
A POSIX Queue Implementation

10 Sep 2014

In UNIX Network Programming Vol. 2, using tight code and clear diagrams, Richard Stevens gives an account of the POSIX message queue that anyone can follow [Ste99, Ch. 5]. His work inspired me to examine the implementation found in the illumos kernel.

What is a POSIX Message Queue?

The POSIX message queue, or queue for short, was first defined in POSIX.1b [IEEE13]. It provides inter-process communication like a pipe, but the similarities end there. The queue broadens the pipe abstraction in several key ways.

Queues provide message-based communication between unrelated processes using atomic reads and writes, FIFO ordering, and a multi-priority scheme. Their lifecycle is independent of the processes that use them, and they allow producers and consumers to operate asynchronously. But what is a queue in the physical sense?

A Queue is a Glorified File

At its most basic level the illumos queue is nothing more than a glorified mmap’d file. Actually, it is a set of four files: a lock file, a permission file, a data file, and a description file.

All four files are needed, but the data file is most important. Each process maps this file to its local address space, allowing the queue to be treated like any other in-memory data structure.

As shown in Figure 1, each process maps the queue to a different location in its address space. This presents a challenge: pointers cannot be stored inside the data structure. A pointer to a message in one process’s address space could cause a segmentation fault in another. Offsets from the beginning of the data file are used instead. An offset, stored as a uint64_t, represents the number of bytes from the beginning of the file. The use of offsets explains odd-looking macros such as MQ_PTR.

#define MQ_PTR(m, n) ((msghdr_t *)((uintptr_t)m + (uintptr_t)n))

MQ_PTR converts an offset to a message pointer. Given a pointer to the queue header (m) and the offset of the message (n), it returns a pointer to the message header. This macro, along with its friends HEAD_PTR and TAIL_PTR, bridges the physical memory layout and the logical structure. To understand this macro better, let us examine the layout of the queue data file.
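To see why offsets work where pointers would not, here is a small sketch. It uses a hypothetical two-field msghdr_t (the real header has more fields) and assumes offset 0 means "no next message", which is safe because the queue header itself sits at offset 0. Copying the bytes to a second buffer stands in for a second process mapping the same file at a different address.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, trimmed-down message header: chained by offset only. */
typedef struct msghdr {
	uint64_t msg_next;	/* offset of the next message; 0 means none */
	uint64_t msg_len;	/* payload length in bytes */
} msghdr_t;

/* The macro quoted above, with its arguments parenthesized. */
#define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)(m) + (uintptr_t)(n)))

/* Count the messages on a chain that starts at offset `first` in `base`. */
static int
count_chain(void *base, uint64_t first)
{
	int n = 0;

	for (uint64_t off = first; off != 0; off = MQ_PTR(base, off)->msg_next)
		n++;
	return (n);
}

/*
 * Build a three-message chain in one buffer, copy the bytes to a second
 * buffer at a different address (a stand-in for another process's
 * mapping), and walk the chain in both. Returns 1 if both walks agree.
 */
static int
offsets_survive_relocation(void)
{
	enum { SZ = 256 };
	const uint64_t offs[] = { 64, 96, 128 };	/* offset 0 is the header */
	unsigned char *a = calloc(1, SZ);
	unsigned char *b = calloc(1, SZ);

	assert(a != NULL && b != NULL);
	for (int i = 0; i < 3; i++)
		MQ_PTR(a, offs[i])->msg_next = (i < 2) ? offs[i + 1] : 0;

	memcpy(b, a, SZ);	/* same bytes, different base address */
	int ok = count_chain(a, offs[0]) == 3 && count_chain(b, offs[0]) == 3;
	free(a);
	free(b);
	return (ok);
}
```

Had msg_next held a raw pointer into the first buffer, the walk over the copy would have wandered back into the first buffer, or, across processes, into an unmapped address.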

Anatomy of a Queue

A queue consists of a header structure (mqhdr_t) followed by priority offsets and messages. The header is the window into the queue, containing all the information needed to correctly access the messages. Figure 2 divides the memory layout of the queue into four parts: ❶ general attributes, ❷ concurrency control, ❸ notification, and ❹ the messages. This figure assumes the default attributes are used; each row represents 64 bits.

➊ General

Total Size
(mq_totsize) The total size, in bytes, for the entire queue. This includes the header, head and tail offsets, and all the messages.
Max Message Size
(mq_maxsz) The maximum size, in bytes, of a message payload. Defaults to 1024; may be changed at creation.
Max Number of Messages
(mq_maxmsg) The maximum number of messages allowed pending in the queue. Defaults to 128; may be changed at creation.
Max Priority
(mq_maxprio) The maximum priority of any message, hard-coded to 32.

➋ Concurrency Control

A mutex provides read and write atomicity. Two semaphores track whether the queue is empty or full.

Exclusive Mutex
(mq_exclusive) Prevents concurrent writers from corrupting the queue and readers from seeing incomplete messages.
Receive Semaphore
(mq_notempty) The number of messages currently on the queue. Readers block on this; writers use it to signal that a message is ready to be read. Its initial value is 0.
Send Semaphore
(mq_notfull) The number of free message slots available. Writers block on this if the queue is full. Readers use it to signal that a message has been freed. Its initial value is equal to mq_maxmsg.
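A sketch of how these three fields might be initialized with portable POSIX calls. This is not the illumos code; the cut-down mq_sync_t struct and the mq_sync_init function are hypothetical. The parts that matter are the process-shared flags, since the header lives in a file mapped by many processes, and the initial semaphore values: 0 for mq_notempty and mq_maxmsg for mq_notfull.

```c
#define	_POSIX_C_SOURCE	200809L
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

/* Hypothetical cut-down header holding only the concurrency fields. */
typedef struct {
	pthread_mutex_t mq_exclusive;	/* guards every change to the queue */
	sem_t mq_notempty;		/* messages ready to be read */
	sem_t mq_notfull;		/* free message slots */
} mq_sync_t;

/*
 * Initialize the primitives so they can be shared by every process that
 * maps the struct with MAP_SHARED. Returns 0 on success, -1 on error.
 */
static int
mq_sync_init(mq_sync_t *s, unsigned int maxmsg)
{
	pthread_mutexattr_t attr;
	int rc;

	assert(s != NULL);
	if (pthread_mutexattr_init(&attr) != 0)
		return (-1);
	rc = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
	if (rc == 0)
		rc = pthread_mutex_init(&s->mq_exclusive, &attr);
	(void) pthread_mutexattr_destroy(&attr);
	if (rc != 0)
		return (-1);

	/* Second argument 1 = shared between processes. */
	if (sem_init(&s->mq_notempty, 1, 0) != 0)	/* starts empty */
		return (-1);
	if (sem_init(&s->mq_notfull, 1, maxmsg) != 0)	/* all slots free */
		return (-1);
	return (0);
}
```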

➌ Notification

The fields mq_sigid, mq_ntype, mq_des, mq_rblocked, and mq_spawner are all related to notification. I will not discuss them in this post.

➍ Messages

Messages are delivered in FIFO order across 32 priorities. A read returns the oldest message of the highest priority. For efficiency, each priority is its own linked list. These logical linked lists are represented by physical offsets into the queue structure. The offset gymnastics make the implementation appear complicated, but a queue is simply a linked list per priority. Physically, it’s a contiguous block of memory. Logically, it’s a set of 32 linked lists.

Offset to Free List
(mq_freep) Offset to head of the free list. At least one message must be on this list for a write to occur.
Offset to Head Offsets
(mq_headpp) Offset to the start of the head offsets.
Offset to Tail Offsets
(mq_tailpp) Offset to the start of the tail offsets.
Current Max Priority
(mq_curmaxprio) Records the current maximum priority, providing an index into the head and tail offsets, which are required for reading and writing.
Priority Mask
(mq_mask) A bit-vector tracking priorities that have pending messages. Needed to determine the next highest priority when the current maximum is drained.
Priority N Head & Tail Offset
The head and tail offsets for priority N. Logically, they point to the front of that priority’s queue, where messages are read, and to the back, where messages are written.
Message N
The header (msghdr_t) and body for message N. Messages are chained via the msg_next offset found in the header.
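The layout above can be sketched in C. The structs keep only a subset of the fields listed in ➊ and ➍ (locking and notification fields are left out), use 0 as the empty-offset marker, and round each message slot up to 8 bytes. Those choices, and the helper names below, are my assumptions rather than the illumos definitions. The final loop in mq_layout_init is the initialization step that chains every message onto the free list.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

#define	MQ_MAXPRIO	32	/* the hard-coded maximum described above */

/* Hypothetical subset of mqhdr_t; locking and notification fields omitted. */
typedef struct {
	uint64_t mq_totsize;	/* bytes in the whole region */
	uint64_t mq_maxsz;	/* max payload bytes per message */
	uint64_t mq_maxmsg;	/* max pending messages */
	uint64_t mq_freep;	/* offset of the first free message */
	uint64_t mq_headpp;	/* offset of the head-offset array */
	uint64_t mq_tailpp;	/* offset of the tail-offset array */
	uint32_t mq_mask;	/* bit p set => priority p has messages */
	uint32_t mq_curmaxprio;	/* highest priority with pending messages */
} mqhdr_t;

/* Hypothetical subset of msghdr_t; the payload follows the header. */
typedef struct {
	uint64_t msg_next;	/* offset of the next message; 0 ends the list */
	uint64_t msg_len;	/* payload bytes */
} msghdr_t;

/* One message slot, rounded up to a multiple of 8 bytes. */
static uint64_t
msg_slot_size(uint64_t maxsz)
{
	return ((sizeof (msghdr_t) + maxsz + 7) & ~(uint64_t)7);
}

/* Offset of the first message: header, then head array, then tail array. */
static uint64_t
first_msg_off(void)
{
	return (sizeof (mqhdr_t) + 2 * MQ_MAXPRIO * sizeof (uint64_t));
}

static uint64_t
mq_region_size(uint64_t maxsz, uint64_t maxmsg)
{
	return (first_msg_off() + maxmsg * msg_slot_size(maxsz));
}

/* Fill in the header and chain every message slot onto the free list. */
static void
mq_layout_init(mqhdr_t *m, uint64_t maxsz, uint64_t maxmsg)
{
	uint64_t slot = msg_slot_size(maxsz), first = first_msg_off();

	assert(maxmsg > 0);
	m->mq_totsize = mq_region_size(maxsz, maxmsg);
	m->mq_maxsz = maxsz;
	m->mq_maxmsg = maxmsg;
	m->mq_headpp = sizeof (mqhdr_t);
	m->mq_tailpp = m->mq_headpp + MQ_MAXPRIO * sizeof (uint64_t);
	m->mq_mask = 0;
	m->mq_curmaxprio = 0;

	uint64_t *heads = (uint64_t *)((uintptr_t)m + m->mq_headpp);
	uint64_t *tails = (uint64_t *)((uintptr_t)m + m->mq_tailpp);
	for (int p = 0; p < MQ_MAXPRIO; p++)
		heads[p] = tails[p] = 0;	/* every priority starts empty */

	m->mq_freep = first;
	for (uint64_t i = 0; i < maxmsg; i++) {
		msghdr_t *h = (msghdr_t *)((uintptr_t)m + first + i * slot);
		h->msg_next = (i + 1 < maxmsg) ? first + (i + 1) * slot : 0;
		h->msg_len = 0;
	}
}

/* Allocate a zeroed region (standing in for the mmap'd data file). */
static mqhdr_t *
mq_region_new(uint64_t maxsz, uint64_t maxmsg)
{
	mqhdr_t *m = calloc(1, mq_region_size(maxsz, maxmsg));

	if (m != NULL)
		mq_layout_init(m, maxsz, maxmsg);
	return (m);
}
```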

That covers the data structure; now for the functions. I'm going to stay at a high level to avoid rabbit holes. At roughly 1,100 lines, the implementation is very approachable.

Open and Create

The mq_open function is used both to create and to open a queue. To create a queue, the O_CREAT flag must be set and four arguments passed: the name (path) of the queue, the oflag value that determines how to open the queue, the mode bits that set the permissions for accessing the queue, and an optional pointer to an attributes structure for setting the message size and maximum number of pending messages. ① If the path is a valid POSIX object name, the first action is to create the lock file. The lock file acts as a mutex across all processes, protecting the actions inside the dotted black rectangle from concurrent execution. ② Next, if the caller cannot access the permission file, it is not allowed to open the queue. A separate permission file exists because the data file must be given read-write permissions in order to modify the queue structure when performing a send or receive. The permission file’s permissions are set to the value of the mode argument. ③ If the caller passed O_CREAT and the queue doesn’t already exist, create the data file. Otherwise, open the existing data file. ④ Allocate the message queue descriptor (mqdes_t), mmap the data file, and mmap the queue description (mq_dn). ⑤ A queue must be initialized before it can be used. This step calls the mq_init function, which initializes the synchronization primitives, sets the basic attributes and offsets, and chains all messages together on the free list. ⑥ At this point the queue exists and is ready for use. The lock file is deleted, allowing other processes to open this queue. ⑦ For the opening process to remember this queue, it must place the descriptor on its global list of message queues (mqdes_t *mq_list). This action is protected by the exclusive mutex mq_list_lock, represented as the dotted brown rectangle. Finally, the queue descriptor is returned to the caller.
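The lock file in steps ① and ⑥ relies on an old UNIX trick: creating a file with O_CREAT | O_EXCL is atomic, so only one process can succeed at a time. Here is a minimal sketch of that technique. The function names and the 1 ms back-off are my own choices, and I have not reproduced the exact retry policy illumos uses.

```c
#define	_POSIX_C_SOURCE	200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <time.h>
#include <unistd.h>

/*
 * Acquire a cross-process lock by atomically creating `path`. If another
 * process already holds it, open() fails with EEXIST and we retry.
 * Returns 0 once held, or -1 on any other error.
 */
static int
lockfile_acquire(const char *path)
{
	const struct timespec backoff = { 0, 1000000L };	/* 1 ms */

	assert(path != NULL);
	for (;;) {
		int fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0600);
		if (fd >= 0) {
			(void) close(fd);	/* the file's existence is the lock */
			return (0);
		}
		if (errno != EEXIST)
			return (-1);
		(void) nanosleep(&backoff, NULL);	/* held elsewhere; retry */
	}
}

/* Release the lock by deleting the file, letting the next opener in. */
static int
lockfile_release(const char *path)
{
	return (unlink(path));
}
```

One weakness of any lock built this way is that a process which dies while holding it leaves the file behind, and later openers wait until something removes it.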

Send

The mq_send function requires four arguments: the queue descriptor (mqdes), message pointer (msg_ptr), message length (msg_len), and priority (msg_prio). ① The arguments are validated, write permission is checked, and then sem_wait is called on mq_notfull. Once sem_wait returns, indicating that the queue has a free slot, the exclusive mutex (mq_exclusive) is acquired to enter the critical region shown as the dotted red rectangle. ② With both space on the queue and exclusive access, the caller may now pull a message from the head of the free list (mq_freep) and memcpy the user’s message into the body. ③ If this is the priority’s first message, set the head and tail offsets to this message, set the appropriate bit in the mask (mq_mask), and raise the current maximum priority (mq_curmaxprio) if needed. Otherwise, if a list already exists for this priority, append the new message to its tail. ④ Call sem_post(mq_notempty) to notify readers that a message is ready to be consumed. Now that the sender has finished mutating the queue, it releases the mutex and returns 0 for success.
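The send path can be sketched on a toy, in-process queue. To keep it short, array indices stand in for byte offsets, -1 (NIL) marks an empty list, and the semaphores are created with pshared = 0. The names toyq_t, toyq_init, and toyq_send are hypothetical. The numbered comments follow the steps above.

```c
#define	_POSIX_C_SOURCE	200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <string.h>

#define	MAXPRIO	32
#define	MAXMSG	8
#define	MAXSZ	64
#define	NIL	(-1)	/* plays the role of the 0/NULL offset */

/* Toy queue: array indices stand in for the real byte offsets. */
typedef struct {
	pthread_mutex_t exclusive;
	sem_t notempty, notfull;
	int freep;				/* first free slot */
	int head[MAXPRIO], tail[MAXPRIO];	/* per-priority FIFO ends */
	uint32_t mask;				/* bit p set => p non-empty */
	unsigned curmaxprio;
	struct { int next; size_t len; char body[MAXSZ]; } msg[MAXMSG];
} toyq_t;

static void
toyq_init(toyq_t *q)
{
	(void) pthread_mutex_init(&q->exclusive, NULL);
	(void) sem_init(&q->notempty, 0, 0);
	(void) sem_init(&q->notfull, 0, MAXMSG);
	for (int p = 0; p < MAXPRIO; p++)
		q->head[p] = q->tail[p] = NIL;
	for (int i = 0; i < MAXMSG; i++)	/* chain all slots as free */
		q->msg[i].next = (i + 1 < MAXMSG) ? i + 1 : NIL;
	q->freep = 0;
	q->mask = 0;
	q->curmaxprio = 0;
}

/* Steps (1)-(4) of the send description. Returns 0, or -1 with errno. */
static int
toyq_send(toyq_t *q, const char *buf, size_t len, unsigned prio)
{
	if (len > MAXSZ) {				/* (1) validate */
		errno = EMSGSIZE;
		return (-1);
	}
	if (prio >= MAXPRIO) {
		errno = EINVAL;
		return (-1);
	}
	while (sem_wait(&q->notfull) != 0)		/* (1) reserve a slot */
		if (errno != EINTR)
			return (-1);
	(void) pthread_mutex_lock(&q->exclusive);

	int m = q->freep;				/* (2) pop free list */
	assert(m != NIL);			/* guaranteed by the semaphore */
	q->freep = q->msg[m].next;
	memcpy(q->msg[m].body, buf, len);
	q->msg[m].len = len;
	q->msg[m].next = NIL;

	if (q->head[prio] == NIL) {			/* (3) first at prio */
		q->head[prio] = m;
		q->mask |= (uint32_t)1 << prio;
		if (prio > q->curmaxprio)
			q->curmaxprio = prio;
	} else {					/* (3) append */
		q->msg[q->tail[prio]].next = m;
	}
	q->tail[prio] = m;

	(void) sem_post(&q->notempty);			/* (4) wake a reader */
	(void) pthread_mutex_unlock(&q->exclusive);
	return (0);
}
```

For example, sending "a" and "b" at priority 3 and then "c" at priority 7 leaves priority 3 holding a two-message list (a then b), sets bits 3 and 7 in the mask, and raises the current maximum to 7.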

Receive

The mq_receive function requires four arguments: the queue descriptor (mqdes), a pointer to the buffer that receives the message payload (msg_ptr), the length of that buffer (msg_len), and a pointer to write the priority to (msg_prio), or NULL if the priority is of no concern. ① The arguments are validated, read permission is checked, the buffer length is verified to be large enough to hold the largest possible message (mq_maxsz), and then the reader waits on the mq_notempty semaphore. When sem_wait returns, the reader knows there is at least one message to read and then waits on the exclusive mutex (mq_exclusive). ② Once in the critical region, the reader removes the message at the head of the list for the highest priority. ③ If this was that priority’s last message, clear the priority’s bit in the mask (mq_mask), find and set the next maximum (mq_curmaxprio), and set the tail offset to NULL to indicate the list is empty. ④ Copy the message payload into the user’s buffer and place the message on the head of the free list. ⑤ Post to the mq_notfull semaphore to notify writers that a message slot is free, and release the mutex. Return the length of the message payload that was written to the user’s buffer.
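Receive is the mirror image of send. This sketch uses a toy layout in which array indices stand in for offsets and -1 marks an empty list. It leaves out the semaphore and mutex handling, which follows the same pattern as send, and finds the next maximum priority by scanning mq_mask for its highest set bit. All names are hypothetical, and the illumos code may compute the next priority differently. The small unlocked toyq_put exists only so there is something to receive.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define	MAXPRIO	32
#define	MAXMSG	8
#define	MAXSZ	64
#define	NIL	(-1)	/* plays the role of the 0/NULL offset */

/* Toy queue without locks: array indices stand in for byte offsets. */
typedef struct {
	int freep;
	int head[MAXPRIO], tail[MAXPRIO];
	uint32_t mask;
	unsigned curmaxprio;
	struct { int next; size_t len; char body[MAXSZ]; } msg[MAXMSG];
} toyq_t;

/* Index of the highest set bit in mask, or 0 if mask is 0. */
static unsigned
highest_prio(uint32_t mask)
{
	unsigned p = 0;

	while (mask >>= 1)
		p++;
	return (p);
}

static void
toyq_init(toyq_t *q)
{
	for (int p = 0; p < MAXPRIO; p++)
		q->head[p] = q->tail[p] = NIL;
	for (int i = 0; i < MAXMSG; i++)
		q->msg[i].next = (i + 1 < MAXMSG) ? i + 1 : NIL;
	q->freep = 0;
	q->mask = 0;
	q->curmaxprio = 0;
}

/* Unlocked enqueue mirroring the send path described above. */
static void
toyq_put(toyq_t *q, const char *buf, size_t len, unsigned p)
{
	int m = q->freep;

	assert(m != NIL && len <= MAXSZ && p < MAXPRIO);
	q->freep = q->msg[m].next;
	memcpy(q->msg[m].body, buf, len);
	q->msg[m].len = len;
	q->msg[m].next = NIL;
	if (q->head[p] == NIL)
		q->head[p] = m;
	else
		q->msg[q->tail[p]].next = m;
	q->tail[p] = m;
	q->mask |= (uint32_t)1 << p;
	if (p > q->curmaxprio)
		q->curmaxprio = p;
}

/*
 * Steps (2)-(4) of the receive description. The caller is assumed to
 * hold mq_notempty and the exclusive mutex. Returns the payload length.
 */
static size_t
toyq_take(toyq_t *q, char *buf, unsigned *prio)
{
	unsigned p = q->curmaxprio;
	int m = q->head[p];			/* (2) oldest, top priority */

	assert(m != NIL);			/* the semaphore promised one */
	q->head[p] = q->msg[m].next;
	if (q->head[p] == NIL) {		/* (3) priority drained */
		q->tail[p] = NIL;
		q->mask &= ~((uint32_t)1 << p);
		q->curmaxprio = highest_prio(q->mask);
	}

	size_t len = q->msg[m].len;		/* (4) copy out, recycle slot */
	memcpy(buf, q->msg[m].body, len);
	q->msg[m].next = q->freep;
	q->freep = m;
	if (prio != NULL)
		*prio = p;
	return (len);
}
```

Putting "lo1" and "lo2" at priority 2 with "hi" at priority 9 in between, three takes return "hi", then "lo1", then "lo2": highest priority first, FIFO within a priority.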

Timeouts and Blocking

In the above descriptions of send and receive I assumed the default behavior of blocking with no timeout. However, both functions allow blocking calls with timeouts or non-blocking calls. I’ll discuss timeouts first.

Send and receive have three APIs each: mq_<op>, mq_timed<op>, and mq_reltimed<op>_np. The first version, with no timeout, was discussed in the previous sections. The second version, mq_timed<op>, allows the caller to set an absolute timeout, i.e., time out once a specific moment has passed. The third version, mq_reltimed<op>_np, allows a relative timeout, i.e., time out after a duration has passed. In both cases the timeout applies to the semaphore wait in step ① of each diagram, before the exclusive mutex is acquired. The timeout is specified as a timespec_t, which consists of two numbers: whole seconds and the nanoseconds within that second. An absolute timeout counts its seconds from the Unix epoch. A relative timeout treats the seconds as a duration.
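One way to express a relative timeout in terms of an absolute one is to add the duration to the current CLOCK_REALTIME time and hand the result to sem_timedwait. This is a sketch of that technique, not necessarily what illumos does, since illumos also offers relative semaphore waits of its own. The helpers deadline_after and sem_wait_for are hypothetical, and rel.tv_nsec is assumed to lie in [0, 1,000,000,000).

```c
#define	_POSIX_C_SOURCE	200809L
#include <assert.h>
#include <errno.h>
#include <semaphore.h>
#include <time.h>

#define	NSEC_PER_SEC	1000000000L

/* Convert a relative duration into an absolute CLOCK_REALTIME deadline. */
static struct timespec
deadline_after(struct timespec rel)
{
	struct timespec now;

	assert(rel.tv_nsec >= 0 && rel.tv_nsec < NSEC_PER_SEC);
	(void) clock_gettime(CLOCK_REALTIME, &now);
	now.tv_sec += rel.tv_sec;
	now.tv_nsec += rel.tv_nsec;
	if (now.tv_nsec >= NSEC_PER_SEC) {	/* carry into the seconds */
		now.tv_sec++;
		now.tv_nsec -= NSEC_PER_SEC;
	}
	return (now);
}

/*
 * Wait on sem for at most rel. Returns 0 once acquired, or -1 with errno
 * set to ETIMEDOUT if the deadline passes first.
 */
static int
sem_wait_for(sem_t *sem, struct timespec rel)
{
	const struct timespec deadline = deadline_after(rel);
	int rc;

	/* Retrying after EINTR keeps the original deadline. */
	while ((rc = sem_timedwait(sem, &deadline)) != 0 && errno == EINTR)
		continue;
	return (rc);
}
```

Computing the deadline once, up front, is what keeps the retry loop honest: a wait interrupted by a signal resumes against the same deadline instead of restarting the full duration.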

When opening a queue you may specify O_NONBLOCK to indicate that send and receive calls should immediately return EAGAIN if the operation cannot be performed at that moment. To avoid blocking, the implementation calls sem_trywait instead of the blocking semaphore API. Calling the timeout versions in non-blocking mode behaves like calling the non-timeout version; i.e., the timeout is ignored.

Close and Destroy

As noted earlier, queues exist independently of the lifecycle of any process. For this to work, closing and destroying must be separate actions.

A process indicates it is done with a queue by closing it. The mq_close function removes the descriptor from the process list (mq_list), invalidates the descriptor, frees the memory, and unmaps the description and data files. A close does not have any effect on other processes using the same queue.

A process removes the queue from public view by calling mq_unlink to destroy it. A destroyed queue does not immediately invalidate the queue descriptors of the other processes. It only prevents future opens from succeeding. The mq_unlink function deletes the permission and data files. It uses the lock file for atomicity just like mq_open.
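The separation between close and unlink follows from ordinary UNIX file semantics, which I believe is what keeps existing descriptors valid: unlinking a file removes its name, but existing mappings keep the underlying storage alive until they are unmapped. The sketch below demonstrates this with a temporary file standing in for the data file. It illustrates the file-system behavior, not the mq_unlink code itself, and the function name is hypothetical.

```c
#define	_POSIX_C_SOURCE	200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>

/*
 * Map a temporary file, unlink it, then check that (a) the mapping still
 * works and (b) opening the name again fails with ENOENT.
 * Returns 1 if both hold, 0 if not, -1 on setup failure.
 */
static int
unlink_keeps_mapping(size_t len)
{
	static const char msg[] = "still here";
	char path[] = "/tmp/mq_unlink_demo_XXXXXX";

	assert(len >= sizeof (msg));
	int fd = mkstemp(path);			/* stands in for the data file */
	if (fd < 0)
		return (-1);
	if (ftruncate(fd, (off_t)len) != 0) {
		(void) close(fd);
		(void) unlink(path);
		return (-1);
	}
	char *p = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
	(void) close(fd);			/* the mapping keeps the file alive */
	if (p == MAP_FAILED) {
		(void) unlink(path);
		return (-1);
	}

	(void) unlink(path);			/* analogous to mq_unlink */
	memcpy(p, msg, sizeof (msg));		/* the old mapping still works */
	int ok = strcmp(p, msg) == 0;

	int fd2 = open(path, O_RDWR);		/* analogous to a later mq_open */
	ok = ok && fd2 == -1 && errno == ENOENT;
	if (fd2 >= 0)
		(void) close(fd2);

	(void) munmap(p, len);
	return (ok);
}
```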

A Note on Notification

The notification feature allows a queue to signal a pid or create a thread to handle the case where a queue transitions from empty to containing at least one message. However, there is a subtle race between readers, and explaining the implementation would have lengthened an already lengthy post. This feature also complicates the implementation in a way that irks me. I wager the notification mechanism makes up about half the code, around 500 lines, bloating both the data structure and the functions that operate on it. There is a smaller, cleaner, more elegant implementation trying to free itself from the shackles of the notification system.

Further Reading

To learn more about illumos internals, buy Solaris Internals [McD06]. It has a section on the queue implementation as well as many other fascinating details.

References

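IEEE13: IEEE Std 1003.1, 2013 Edition (POSIX.1).
McD06: Richard McDougall and Jim Mauro. Solaris Internals: Solaris 10 and OpenSolaris Kernel Architecture, 2nd ed. Prentice Hall, 2006.
Ste99: W. Richard Stevens. UNIX Network Programming, Volume 2: Interprocess Communications, 2nd ed. Prentice Hall, 1999.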