Friday, September 10, 2010

event handling with message queue on top of pipes: part 2

The previous part stopped just short of the details of the sending and receiving mechanism.
Each sent message is expected to receive an ACK from the handler once it has been processed. The ACK can be embedded in the handler's response or, if no answer is expected, sent back as a dedicated message. Either way it indicates that the handler has finished processing the message. The sender can also state that no ACK is needed at all.
On the one hand, the sender can use the ACK to conclude that message processing is complete. On the other hand, ACK timing makes it possible to estimate statistically how long a handler takes to process a specific kind of message. The current rough implementation does not use this information, but it could feed probabilistic scheduling.
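The timing idea can be sketched roughly as follows. This is not part of the mq sources: struct handler_stats, stats_on_ack, stats_fastest and the 1/8 smoothing weight are all hypothetical names and values chosen for illustration. Each ACK contributes one sample (ACK time minus send time) to a smoothed per-handler average.

```c
#include <assert.h>

/* Hypothetical per-handler statistics; not part of the mq sources. */
struct handler_stats {
    unsigned long acks;   /* number of ACKs seen so far */
    double avg_ms;        /* smoothed processing time in milliseconds */
};

/* Fold one ACK into the running average.
 * sent_ms and acked_ms are timestamps in milliseconds.
 * The first sample seeds the average; later samples get weight 1/8
 * (an arbitrary smoothing factor picked for this sketch). */
static void
stats_on_ack(struct handler_stats *st,
             unsigned long long sent_ms,
             unsigned long long acked_ms)
{
    double sample;

    assert(st && acked_ms >= sent_ms);
    sample = (double)(acked_ms - sent_ms);

    if (st->acks == 0)
        st->avg_ms = sample;
    else
        st->avg_ms += (sample - st->avg_ms) / 8.0;
    ++st->acks;
}

/* Simplest possible use of the statistics: the index of the handler
 * with the lowest smoothed processing time. */
static int
stats_fastest(const struct handler_stats *st, int n)
{
    int i, best = 0;

    assert(st && n > 0);
    for (i = 1; i < n; ++i)
        if (st[i].avg_ms < st[best].avg_ms)
            best = i;
    return best;
}
```

A probabilistic scheduler would more likely weight handlers by the inverse of avg_ms than always pick the fastest one; the deterministic choice above only shows where the number would be consumed.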

The prototype of the routine for sending a message is straightforward:

mq_cookie
mq_enqueue_request(struct mq *mq,
                   struct mq_request *req,
                   int broadcast)
It accepts the message queue descriptor as the first argument and the message itself as the second. The third argument states whether the message should be delivered to all handlers or to just one. The function returns a cookie which the caller can use to obtain the answer (or ACK) from the handler.
If the request is not a broadcast, mq_enqueue_request looks for the handler with the shortest message backlog. It also tries to pick a handler whose pipe can be written to without blocking. If all pipes are full, however, the call may block.
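One way to combine the two criteria is sketched below. It is an illustration under assumptions, not the code from the mq sources: struct handler_slot, pick_handler, send_to_best and MAX_HANDLERS are hypothetical, and the backlog is assumed to be a simple counter of requests not yet ACKed. poll() with a zero timeout reports which write ends currently have room; note that a write larger than the free space can still block even when POLLOUT is reported.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

#define MAX_HANDLERS 64   /* hypothetical limit for this sketch */

/* Hypothetical view of a handler; not the real mq structures. */
struct handler_slot {
    int wfd;                /* write end of the request pipe */
    unsigned int backlog;   /* requests sent but not yet ACKed */
};

/* Return the index of the handler to send to. Handlers whose pipe is
 * reported writable are preferred; among those the shortest backlog wins.
 * If no pipe is writable, fall back to the shortest backlog overall, in
 * which case the following write() may block. */
static int
pick_handler(const struct handler_slot *h, int n)
{
    struct pollfd pfd[MAX_HANDLERS];
    int i, best = -1, best_any = 0;

    assert(h && n > 0 && n <= MAX_HANDLERS);

    for (i = 0; i < n; ++i) {
        pfd[i].fd = h[i].wfd;
        pfd[i].events = POLLOUT;
        pfd[i].revents = 0;
    }
    /* Zero timeout: only a readiness snapshot, never a wait. */
    if (poll(pfd, (nfds_t)n, 0) < 0) {
        for (i = 0; i < n; ++i)
            pfd[i].revents = 0;   /* treat every pipe as not writable */
    }

    for (i = 0; i < n; ++i) {
        if (h[i].backlog < h[best_any].backlog)
            best_any = i;
        if ((pfd[i].revents & POLLOUT) && !(pfd[i].revents & POLLERR) &&
            (best < 0 || h[i].backlog < h[best].backlog))
            best = i;
    }
    return best >= 0 ? best : best_any;
}

/* Write one already serialized request to the chosen handler and account
 * for it in the backlog. Returns the handler index, or -1 on a failed or
 * short write. */
static int
send_to_best(struct handler_slot *h, int n, const void *buf, size_t len)
{
    int i = pick_handler(h, n);

    if (write(h[i].wfd, buf, len) != (ssize_t)len)
        return -1;
    ++h[i].backlog;
    return i;
}
```

The backlog counter would be decremented when the ACK for a request arrives, which is one more reason to collect ACKs even when the caller does not need an answer.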

The aforementioned poll thread keeps handlers from blocking when they write responses (or ACKs) back. The thread iterates over the handlers' pipes, gathers the responses and puts them into a list. Each response has a lifetime, after which its resources are freed.
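A single pass of such a gathering loop might look like the sketch below. The wire format is an assumption: struct wire_rsp is a hypothetical fixed-size record, and gather_once, rsp_sink, keep_last and MAX_PIPES are illustrative names that do not come from the mq sources. The real thread would call something like this in a loop and append each record to the response list.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

#define MAX_PIPES 64   /* hypothetical limit for this sketch */

/* Hypothetical fixed-size wire record; the real mq format may differ. */
struct wire_rsp {
    unsigned long long cookie;
    unsigned int status;
};

/* Callback that receives every collected record. */
typedef void (*rsp_sink)(const struct wire_rsp *rsp, void *arg);

/* One pass: wait up to timeout_ms for any response pipe to become
 * readable, read one record from each ready pipe and hand it to sink.
 * Returns the number of records collected, or -1 if poll() failed. */
static int
gather_once(const int *rfds, int n, int timeout_ms, rsp_sink sink, void *arg)
{
    struct pollfd pfd[MAX_PIPES];
    struct wire_rsp rsp;
    int i, got = 0;

    assert(rfds && sink && n > 0 && n <= MAX_PIPES);

    for (i = 0; i < n; ++i) {
        pfd[i].fd = rfds[i];
        pfd[i].events = POLLIN;
        pfd[i].revents = 0;
    }
    if (poll(pfd, (nfds_t)n, timeout_ms) < 0)
        return -1;

    for (i = 0; i < n; ++i) {
        if (!(pfd[i].revents & POLLIN))
            continue;
        /* Writes of at most PIPE_BUF bytes are atomic, so as long as
         * handlers write whole records a readable pipe holds at least
         * one complete record. */
        if (read(pfd[i].fd, &rsp, sizeof(rsp)) == (ssize_t)sizeof(rsp)) {
            sink(&rsp, arg);
            ++got;
        }
    }
    return got;
}

/* Example sink: remember the cookie of the last record seen. */
static void
keep_last(const struct wire_rsp *rsp, void *arg)
{
    *(unsigned long long *)arg = rsp->cookie;
}
```

Reading one record per ready pipe per pass keeps the loop fair across handlers; a busy handler simply shows up as readable again on the next pass.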
In kernels prior to 2.6.35 there was no way to configure the pipe capacity; now it is possible via the F_SETPIPE_SZ fcntl (the maximum size for unprivileged processes is limited by /proc/sys/fs/pipe-max-size). Since this thread fetches messages from the pipes on a regular basis, there is no need for a big pipe. The current upper bound of 1MB is big enough to hold thousands of short messages, and the size could be limited to a few KB unless big messages are required.
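For reference, resizing a pipe takes a single fcntl call. The helper name pipe_set_capacity is hypothetical; F_SETPIPE_SZ and F_GETPIPE_SZ are the Linux-specific commands (2.6.35 and later), and the numeric fallbacks below are only there for the case where the headers were included without _GNU_SOURCE.

```c
#ifndef _GNU_SOURCE
#define _GNU_SOURCE   /* exposes F_SETPIPE_SZ / F_GETPIPE_SZ in <fcntl.h> */
#endif
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>

/* Fallback values from the Linux UAPI headers (F_LINUX_SPECIFIC_BASE + 7
 * and + 8), used only if <fcntl.h> did not provide them. */
#ifndef F_SETPIPE_SZ
#define F_SETPIPE_SZ 1031
#endif
#ifndef F_GETPIPE_SZ
#define F_GETPIPE_SZ 1032
#endif

/* Ask the kernel to resize the pipe behind fd to at least `bytes`.
 * Returns the resulting capacity, which the kernel may round up, or -1 on
 * error (for example EPERM when an unprivileged caller asks for more than
 * pipe-max-size, or EBUSY when shrinking below the amount of data
 * currently buffered). */
static int
pipe_set_capacity(int fd, int bytes)
{
    assert(bytes > 0);

    if (fcntl(fd, F_SETPIPE_SZ, bytes) < 0)
        return -1;
    return fcntl(fd, F_GETPIPE_SZ);
}
```

Either end of the pipe can be passed as fd, since both descriptors refer to the same kernel buffer. Because the kernel rounds the request up, the returned value rather than the requested one should be treated as the real capacity.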

Collecting responses into a dedicated list is also what lets a caller obtain them, since a pipe cannot be iterated over. Once data has been read from a pipe it cannot simply be put back (to be more precise, it is possible, but iterating over the pipe would mean reading all the messages and then writing them back; the overhead is too big, and it would also require some synchronization mechanism to guard the write end of the pipe). mq_get_response is used to fetch a response from the global list:
struct mq_response *
mq_get_response(struct mq *mq,
                mq_cookie cookie)
The second argument, cookie, is the response identifier returned by mq_enqueue_request. If the response is not in the list of already fetched responses, this routine returns NULL. At the moment there is no way to tell whether the response is still on its way or has already been purged.
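The lookup-by-cookie and lifetime behaviour can be illustrated with a small singly linked store. Everything here is hypothetical (struct stored_rsp, struct rsp_store, store_put, store_purge, store_take) and much simpler than the real struct mq_response handling; in particular, the locking needed between the poll thread and the caller is left out, and the sketch assumes that a fetched response is detached from the list.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stored response; the real struct mq_response differs. */
struct stored_rsp {
    struct stored_rsp *next;
    unsigned long long cookie;
    unsigned long long expires_ms;   /* absolute deadline in milliseconds */
};

struct rsp_store {
    struct stored_rsp *head;
};

/* Called by the gathering thread for every collected response.
 * Returns 0 on success, -1 if out of memory. */
static int
store_put(struct rsp_store *s, unsigned long long cookie,
          unsigned long long now_ms, unsigned long long lifetime_ms)
{
    struct stored_rsp *r;

    assert(s);
    r = malloc(sizeof(*r));
    if (!r)
        return -1;
    r->cookie = cookie;
    r->expires_ms = now_ms + lifetime_ms;
    r->next = s->head;
    s->head = r;
    return 0;
}

/* Free every response whose lifetime has run out; returns how many. */
static int
store_purge(struct rsp_store *s, unsigned long long now_ms)
{
    struct stored_rsp **pp = &s->head;
    int purged = 0;

    while (*pp) {
        struct stored_rsp *r = *pp;

        if (r->expires_ms <= now_ms) {
            *pp = r->next;   /* unlink, then release */
            free(r);
            ++purged;
        } else {
            pp = &r->next;
        }
    }
    return purged;
}

/* Detach and return the response for cookie, or NULL if it is not in the
 * store. NULL covers both "not arrived yet" and "already purged"; the
 * caller owns the returned entry and frees it. */
static struct stored_rsp *
store_take(struct rsp_store *s, unsigned long long cookie)
{
    struct stored_rsp **pp;

    for (pp = &s->head; *pp; pp = &(*pp)->next) {
        if ((*pp)->cookie == cookie) {
            struct stored_rsp *r = *pp;

            *pp = r->next;
            r->next = NULL;
            return r;
        }
    }
    return NULL;
}
```

Distinguishing "still on the way" from "purged" would need extra bookkeeping, for example remembering the highest purged cookie or keeping a small tombstone per expired entry; the sketch deliberately shares the limitation described above.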

The following example illustrates the flow of sending messages and receiving responses:
    int id;
    for (id = 0; id < 10000; ++id) {
        struct mq_list *e, *t;
        struct mq_cookie_list *entry;
        struct mq_response *rsp;
        mq_cookie cookie = mq_enqueue_request(&mq,
                                              mq_request(&req,
                                                         MQ_EVENT_REQUEST,
                                                         ((unsigned long long)pid << 32) | id,
                                                         timems(),
                                                         "",
                                                         0),
                                              0);

        /* First retry the cookies whose responses were not ready earlier */
        mq_list_for_each_safe(&cookies.list, e, t) {
            entry = container_of(e, struct mq_cookie_list, list);

            if ((rsp = mq_get_response(&mq, entry->cookie))) {
                /* Do something with the response */
                
                mq_release_response(rsp);
                mq_list_detach(&entry->list);
                free(entry);
            }
        }

        /* Then try the cookie of the request that was just sent */
        if ((rsp = mq_get_response(&mq, cookie))) {
            /* Do something with the response */
            
            mq_release_response(rsp);
        } else {
            /* Not ready yet: remember the cookie and retry on a later pass */
            entry = malloc(sizeof(*entry));
            if (!entry)
                break; /* out of memory: stop sending */
            entry->cookie = cookie;
            mq_list_add_tail(&cookies.list, &entry->list);
        }
    }

A basic handler should deal with two kinds of events: MQ_EVENT_EXIT and MQ_EVENT_REQUEST. The following function is a reasonable skeleton for a message handler:
void
handler(int endpoints[2])
{
    struct mq_request req;
    struct mq_response rsp;
    unsigned int served = 0, id = 0;
    unsigned int pid = getpid();

    printf("Event Handler %u\n", pid);

    for (;;) {
        mq_get_request(&req, endpoints);

        ++served;

        if (req.event & MQ_EVENT_EXIT) {
            printf("Event Handler %u: exiting, request %u:%u, served %u\n",
                   pid, (unsigned int)(req.id >> 32),
                   (unsigned int)(req.id & 0xffffffff), served);
            mq_free_request(&req, endpoints);
            return;
        } else if (req.event & MQ_EVENT_REQUEST) {
            if (req.data.len) {
                printf("Event Handler %u: request %u:%u, %s\n",
                       pid, (unsigned int)(req.id >> 32),
                       (unsigned int)(req.id & 0xffffffff),
                       (char *)req.data.data);
            }

            mq_send_response_free_request(&req,
                                          mq_response(&rsp,
                                                      &req,
                                                      ((unsigned long long)pid << 32) | id++,
                                                      timems(),
                                                      &pid,
                                                      sizeof(pid)),
                                          endpoints);
        } else {
            printf("Event Handler %u: unknown event: %d\n", pid, (int)req.event);
            mq_free_request(&req, endpoints);
        }
    }
}
The sources of the pipe message queue can be found on GitHub.