Message Queues
Chapter 12-10
Message Queues
The Message Queue functions provide a way to send information from one Domino or Notes application to another.
Application Message Queues
One possible use of message queues is to send information from the Domino server console to server add-in tasks. This is implemented by extending the server console TELL command. Starting with Release 4, if any text other than "QUIT" follows the name of the add-in task in the TELL command, the server checks to see if the task has created a message queue. If the message queue exists, the server places the rest of the command in a message and puts the message into that queue. The sample program SAMPLES\SERVER\MSG_Q illustrates this type of operation. An additional sample program MSG_T in SAMPLES\SERVER\MSG_Q sends messages to the MSG_Q task directly through the message queue the MSG_Q task created.
Message queues are not limited to the server environment. Queues are built using shared memory provided by the Domino and Notes core and do not depend on any platform-specific interprocess communication (IPC) capability. Any Domino or Notes C API application can create a message queue or open a message queue created by another API application and send messages back and forth. While a message queue does not provide the highest performance IPC for any given platform, it does allow applications to use an IPC mechanism that works the same on all Domino and Notes platforms.
Use the MQCreate function (declared in the header file mq.h) to create a message queue. For server add-in tasks that expect to receive commands from the console, the name of the message queue must consist of the prefix TASK_QUEUE_PREFIX followed by the name of the add-in task. For example, the sample program MSG_Q creates a message queue with the name:
- char MsgQueueName [] = TASK_QUEUE_PREFIX "MSG_Q";
which uses the implicit string concatenation feature of C to create an array of characters containing the prefix followed by the name of the add-in task, "MSG_Q". TASK_QUEUE_PREFIX is defined in stdnames.h as "MQ$", so the name of the queue is actually MQ$MSG_Q. The MQCreate function also has a parameter that can set a limit on the number of messages waiting in the queue. A non-zero value is used as the limit; if there are already that many unprocessed messages in the queue, an attempt to put a message into the queue returns an error. A value of zero indicates that no limit should be placed on the number of messages waiting.
Once you have created the message queue, use the MQOpen function to obtain a handle that is used by the rest of the message queue functions. Call MQClose to close the queue when you're done with it. MQPut adds messages to a queue, including a priority value to control the ordering of messages. MQGet retrieves messages from the queue. It can either return with an error if there are no messages waiting or wait for a message to be placed in the queue with an optional timeout value. MQScan provides a powerful additional capability; it invokes a callback function for each message in the queue. The return value from the callback function determines what happens next: quit the scan, go on to the next message, remove the message from the queue and quit, or remove the message and go on to the next.
Figures 1 and 2 show two different ways to structure a server add-in task. Both examples run a job on the server when a certain number of seconds have elapsed. The flowchart in Figure 1 calls AddInSecondsHaveElapsed in a loop; when this function returns TRUE, the specified time is up and the periodic job is run. One drawback to this design is that the main loop runs continuously; each time through the loop, the add-in has to check to see if AddInSecondsHaveElapsed returns TRUE.
The flowchart in Figure 2 shows how you can implement the same basic process using Message Queues. MQGet is called inside the loop, with the timeout set to the number of seconds between each run of the periodic job. If the add-in should quit (either because the server is shutting down or because the operator used the TELL ... QUIT command), MQGet returns ERR_MQ_QUITTING. If a message arrives from another process (for example, the server console), MQGet returns NOERROR and puts the received message into a buffer for the add-in to process. If the specified number of seconds has elapsed, MQGet returns ERR_MQ_TIMEOUT, indicating that the add-in should run the periodic job. MQGet only returns when there is something for the add-in to do; the add-in is not constantly running through the loop, wasting CPU time on checking.
The QUIT command receives special handling in Message Queues. QUIT has higher priority than any other message and sets the message queue to a special quit state. Once a message queue has been told to quit, no more messages can be placed in the queue. This processing for quit allows a program to create and open a Message Queue and then enter a loop that calls MQGet and processes each message. When the QUIT command is issued, the next call to MQGet returns the status ERR_MQ_QUITTING, indicating that the program should clean up and quit.
Third-Party Administration Requests
You can extend the Administration Process by creating an Administration Request directed to a third-party server add-in task which interprets the request and acts on it. When creating a third party Administration Request, you must specify the message queue name used by the server add-in in the ProxyProcess field of the request. The Administration Process uses this data to pass the request's and response's note IDs to the server add-in. Also specify the server name where the server add-in task is running in the ProxyServer field of the request. In the Proxy Action field of the request, put a text version of an identifier, greater than "5000".
The Administration Process acts on third party requests by oopening the message queue and placing an AdminAddInMessage with the ID of the administration request note and the ID of the log note. The add-in task monitors the message queue and performs the required processing.
Extensions to the Administration Process should not modify database ACLs because their actions could conflict with the Administration Process' standard ACL modification operations.
The Domino Collector Task Message Queue
The Domino Collect task collects statistics from specified servers. You can put collection request messages in the Collector task's message queue. The Collector will return the information you requested by placing STAT_RETURN_BLOCK structures in your own message queue.
In order to receive the Collector's responses to your request messages, you need to create your own message queue.
You place request messages in the Collector 's message queue via the SERVER_MSG_BLOCK structure. In this structure, you specify the server that you want to collect information from, the kind of information that you want collected, and the message queue you created in order to receive responses. See the Reference for details about the SERVER_MSG_BLOCK and STAT_RETURN_BLOCK structures.
To place a message on the Collector's message queue, specify COLLECT_QUEUE_NAME for the QueueName parameter to MQOpen and use MQPut to put your SERVER_MSG_BLOCK structure in this message queue. The Collector will post the information you requested in your own message queue as STAT_RETURN_BLOCK structures.
To retrieve the Collector's responses from your message queue, specify the address of a STAT_RETURN_BLOCK structure (casted to a char *) for the Buffer parameter to MQGet.
The sample program below demonstrates how to put in collection request messages and how to retrieve the returned response.
coltest.c: Sample Add-In for requesting Collection messages |
/ This Addin interfaces with the collector task to get information
on two different servers. /
/ C include files /
#include <stdio.h>
#include <stdlib.h>
/ C API include files /
#include "global.h"
#include "misc.h"
#include "osmisc.h"
#include "ostime.h"
#include "osmem.h"
#include "addin.h"
#include "event.h"
#include "mq.h"
#include "stats.h"
#include "stdnames.h"
#include "intl.h"
#include "miscerr.h"
#define COLLECT_TEST_QUEUE_NAME "TestCollect"
#define CHAR_TAB 0x09
#define CHAR_LF 0x0a
static WORD x;
extern ServerCount = 2;
static FILE fd; / File descriptor. /
extern int UserValue[5] = {0,0,0,0,0};
extern DWORD StartTicks[5] = {0,0,0,0,0};
extern DWORD ElapsedTicks[5] = {0,0,0,0,0};
extern char ServerName[5][MAXSPRINTF] =
{
"Radium",
"Acare"
};
STATUS far PASCAL ParseStats (char pServerName, HANDLE hStats, DWORD StatsLength);
STATUS LNPUBLIC AddInMain (HMODULE hResourceModule, int argc, char argv[])
/ Main entry point for event add-in
/
{
STATUS error = NOERROR;
char filename[MAXPATH]; / File name. /
int Minutes = 0;
char Output[MAXSPRINTF];
TIMEDATE StartTime, EndTime, CurrentTime, LoopTime; / Time variables. /
char StartTimeText[MAXSPRINTF];
char EndTimeText[MAXSPRINTF];
MQHANDLE hCollectorQueue = NULLHANDLE;
MQHANDLE bMsgQueue = NULLHANDLE;
SERVER_MSG_BLOCK ServerMsg;
WORD NameLen = 0;
WORD RandomLen = 0;
WORD CharsLeft = 0;
char TmpRandom[16];
char pRandom;
char TmpQueueName[20];
if (argc < 3)
{
printf("Invalid parameters\nUse: Load coltest <Filename> <Minutes>\n");
return(NOERROR);
}
/ Log 'We just started' /
AddInLogMsg(MSG_ADDIN_STARTED, NULL);
/ Get the filename from param 1/
strncpy(filename, argv[1], sizeof(filename)-1);
fd = fopen (filename, "w+");
if(fd == NULL)
return(ERR_ADDIN_FOPEN);
/ Convert param 2 form ascii to integer (Minutes). /
Minutes = atoi(argv[2]);
/ Get my start time. (OSTIME.H) /
OSCurrentTIMEDATE(&StartTime);
EndTime = LoopTime = StartTime;
/ Adjust my end time by number of minutes specified in param 2. /
TimeDateAdjust(&EndTime, 0, Minutes, 0, 0, 0, 0);
/ Adjust my end time by two of minutes for loop. /
TimeDateAdjust(&LoopTime, 0, 2, 0, 0, 0, 0);
/ Convert start time and end time to text, for header. /
ConvertTIMEDATEToText (NULL, NULL, &StartTime, StartTimeText,
sizeof(StartTimeText)-1, NULL);
ConvertTIMEDATEToText (NULL, NULL, &EndTime, EndTimeText,
sizeof(EndTimeText)-1, NULL);
/ Write header to text file. /
sprintf(Output, "Users count from %s to %s\n", StartTimeText, EndTimeText);
fwrite( Output, sizeof(char), strlen(Output), fd );
/ Write labels to text files. /
sprintf(Output, "Time/Server");
for(x = 0; x < ServerCount; x++)
{
char ServerString[MAXSPRINTF];
sprintf(ServerString, ",%s", ServerName[x]);
strcat (Output, ServerString);
}
strcat(Output, "\n");
fwrite( Output, sizeof(char), strlen(Output), fd );
getnewname:
/ Create random queuename /
NameLen = sprintf(TmpQueueName, "%.11s", COLLECT_TEST_QUEUE_NAME);
CharsLeft = 16-NameLen;
if (CharsLeft)
{
RandomLen = sprintf(TmpRandom, "%08lu", (DWORD)rand());
pRandom = TmpRandom+RandomLen-CharsLeft;
sprintf(TmpQueueName+NameLen, "%s", pRandom);
}
printf("Opening queue (%s)\n", TmpQueueName);
/ create temp queue /
if ((error = MQCreate(TmpQueueName, 128, 0)) == NOERROR)
{
if (error = MQOpen(TmpQueueName, 0, &bMsgQueue))
{
goto done;
}
}
else
{
if(ERR(error) == ERR_DUPLICATE_MQ)
goto getnewname;
bMsgQueue = NULL;
goto done;
}
memset(&ServerMsg, 0, sizeof(SERVER_MSG_BLOCK));
/ Set up message to put in collect queue /
ServerMsg.Task = GET_STAT_TASK;
sprintf(ServerMsg.StatName, "Server.Users");
sprintf(ServerMsg.QueueName, "%s", TmpQueueName);
if((error = MQOpen(COLLECT_QUEUE_NAME, 0, &hCollectorQueue)) != NOERROR)
{
printf("Collector not running.\n");
error = NOERROR;
goto done;
}
/ Main loop.
AddInIdleDelay checks for quit message, therefore the !
Check to see if we have reached end time.
/
while (!AddInIdle())
/ while (!AddInIdleDelay(10) && TimeDateCompare(&CurrentTime, &EndTime) < 0) /
{
WORD ReturnMsgCount = 0;
for(x = 0; x < ServerCount; x++)
{
/ Get starting millisecond count ans save /
StartTicks[x] = GetTickCount();
sprintf(ServerMsg.StatServerName, "%s", ServerName[x]);
if((error = MQPut(hCollectorQueue, NOPRIORITY, (char )&ServerMsg,
sizeof(SERVER_MSG_BLOCK), 0)) != NOERROR)
{
goto done;
}
}
Next:
/ Get the current time to compare against /
OSCurrentTIMEDATE(&CurrentTime);
if(MQGet(bMsgQueue, (char) &pRtnMsg, sizeof(STAT_RETURN_BLOCK), 0, 0, NULL) !=
ERR_MQ_EMPTY)
{
if(!pRtnMsg.error)
{
ParseStats(pRtnMsg.ServerName, pRtnMsg.hStatName, pRtnMsg.StatNameSize);
ReturnMsgCount++;
}
else
{
char szString[256];
OSLoadString( NULLHANDLE, pRtnMsg.error, szString, sizeof(szString)-1);
printf( "\tError = %s\n", szString );
printf("Error contacting %s.\n", pRtnMsg.ServerName);
ReturnMsgCount++;
}
}
/ Did I get messages back from all 5 servers. /
if(!AddInIdleDelay(1) && ReturnMsgCount < ServerCount)
goto Next;
ReturnMsgCount = 0;
ConvertTIMEDATEToText (NULL, NULL, &CurrentTime, EndTimeText,
sizeof(EndTimeText)-1, NULL);
sprintf(Output, "%s", EndTimeText);
for(x = 0; x < ServerCount; x++)
{
char OutString[MAXSPRINTF];
sprintf(OutString, ",%d", UserValue[x]);
strcat (Output, OutString);
sprintf(OutString, ",%d", ElapsedTicks[x]);
strcat (Output, OutString);
}
strcat(Output, ",");
strcat(Output, EndTimeText);
for(x = 0; x < ServerCount; x++)
{
char OutString[MAXSPRINTF];
}
strcat(Output, "\n");
fwrite( Output, sizeof(char), strlen(Output), fd );
/ Wait two minutes /
while(TimeDateCompare(&CurrentTime, &LoopTime) < 0)
{
OSCurrentTIMEDATE(&CurrentTime);
if(AddInIdleDelay(1))
break;
}
LoopTime = CurrentTime;
/ Adjust my end time by two of minutes for loop. (MISC.H) /
TimeDateAdjust(&LoopTime, 0, 2, 0, 0, 0, 0);
}
done:
if(hCollectorQueue)
MQClose(hCollectorQueue, 0);
if(bMsgQueue)
MQClose(bMsgQueue, 0);
/ Write event counts prefixed with Type label. /
fclose(fd);
/ Log 'We are shuting down'. (ADDIN.H) /
AddInLogMsg(MSG_ADDIN_TERMINATING, NULL);
return(error);
}
STATUS far PASCAL ParseStats (char pServerName, HANDLE hStats, DWORD StatsLength)
/ Parse the hStats buffer /
{
char p, pValue;
WORD ValueLength;
char NumberValue[MAXSPRINTF];
p = OSLockObject(hStats);
printf("%s\n", pServerName);
/ Parse stats into Facility, StatName and Value /
while (StatsLength)
{
BOOL PuntuationStripped = FALSE;
pValue = strchr(p, CHAR_TAB);
if (pValue == NULL)
break;
pValue++;
StatsLength -= (WORD) (pValue - p + 1);
p = strchr(pValue, CHAR_LF);
if (p == NULL)
break;
/ Parse the value to store in array /
ValueLength = (WORD) (p - pValue);
p++;
StatsLength -= ValueLength+1;
sprintf(NumberValue, "%.s", ValueLength, pValue);
for(x = 0; x < ServerCount; x++)
{
/ Save user value to correct spot in array */
if(_stricmp (pServerName, ServerName[x]) == 0)
{
UserValue[x] = atoi(NumberValue);
ElapsedTicks[x] = GetTickCount() - StartTicks[x];
break;
}
}
}
OSUnlockObject(hStats);
OSMemFree(hStats);
return(NOERROR);
}