A POSIX Queue Implementation
10 Sep 2014
In UNIX Network Programming Vol. 2, using tight code and clear diagrams, Richard Stevens gives an account of the POSIX message queue that anyone can follow [Ste99, Ch. 5]. His work inspired me to examine the implementation found in the illumos kernel.
What is a POSIX Message Queue?
The POSIX message queue, or queue for short, was first defined in POSIX.1b [IEEE13]. It provides inter-process communication like a pipe, but the similarities quickly stop there. The queue broadens the pipe abstraction in several key ways.
- Unrelated Process Communication
- Queues are named global resources, easily shared among unrelated processes. Pipes are unnamed file descriptors shared between a parent and child process. However, named pipes (FIFOs) can be used to get around this limitation.
- Message Oriented
- Queues store messages with explicit boundaries, FIFO ordering, and priority-based delivery. Pipes are simple byte streams.
- Atomicity
- The delivery of a message is atomic; concurrent writers cannot interleave the contents of their messages. A write to a pipe is not atomic if the PIPE_BUF size is exceeded.
- Asynchrony
- Messages may be placed on a queue that has no current readers. Pipes require a reader in order to write, even in non-blocking mode.
- Kernel Persistence
- A queue’s existence is independent of its usage. A queue may contain messages even if no process has the queue open. It exists until either the kernel reboots or deletion is requested; this property is called kernel persistence. Pipes have process persistence; existence requires the cooperation of at least one process.
- Notification
- Queues can use signals or threads to indicate when an empty queue has received a message. I will not discuss this feature save for a note at the end.
Queues provide message-based communication between unrelated processes using atomic reads and writes, FIFO ordering, and a multi-priority scheme. Their lifecycle is separate from the processes that use them and they allow asynchrony among producers and consumers. But what is a queue in the physical sense?
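For orientation, here is a minimal sketch of the user-facing API. The queue name "/demo" and the message contents are arbitrary, and on some systems you must link with -lrt:

#include <fcntl.h>      /* O_* constants */
#include <mqueue.h>     /* mq_open, mq_send, mq_receive, ... */
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>   /* mode constants */

int
main(void)
{
	/* Create (or open) a queue named "/demo" with default attributes. */
	mqd_t mqd = mq_open("/demo", O_RDWR | O_CREAT, 0600, NULL);
	if (mqd == (mqd_t)-1) {
		perror("mq_open");
		return (1);
	}

	/* Send a six-byte message at priority 5; delivery is atomic. */
	if (mq_send(mqd, "hello", 6, 5) == -1)
		perror("mq_send");

	/* Size the receive buffer from the queue's own attributes. */
	struct mq_attr attr;
	(void) mq_getattr(mqd, &attr);
	char *buf = malloc(attr.mq_msgsize);

	/* Receive the oldest message of the highest priority. */
	unsigned int prio;
	ssize_t n = mq_receive(mqd, buf, attr.mq_msgsize, &prio);
	if (n >= 0)
		printf("got %zd bytes at priority %u: %s\n", n, prio, buf);

	free(buf);
	(void) mq_close(mqd);
	(void) mq_unlink("/demo");   /* remove the name; see "Close and Destroy" */
	return (0);
}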
A Queue is a Glorified File
At its most basic level the illumos Queue is nothing more than a glorified mmap’d file. Actually, it is a set of four files:
- Lock File
- Used as a mutex to achieve atomicity during creation, opening, and destruction of a queue, preventing concurrent initialization and other race conditions. It exists only while mq_open or mq_unlink is executing.
- Permission File
- Prevents unauthorized access to the data file. To open the queue the caller must first open this file. This seemingly superfluous file is needed because the data file must have read-write permissions, even if the opening process intends only to read.
- Description File
- Used to mmap the queue description structure (struct mq_dn), which is a bit-vector of flags per queue descriptor. Currently, the only flag is the O_NONBLOCK flag. This file exists briefly during open for the sole purpose of performing the mmap.
- Data File
- Contains the bytes of the queue data structure and its messages; it is the queue.
All four files are needed, but the data file is most important. Each process maps this file to its local address space, allowing the queue to be treated like any other in-memory data structure.
As shown in Figure 1, each process maps the queue to a different location in its address space. This presents a challenge: pointers cannot be used inside the data structure, because a pointer to a message in one process's address space could cause a segmentation fault in another. Offsets from the beginning of the data file are used instead. An offset, stored as a uint64_t, represents the number of bytes from the beginning of the file. The use of offsets explains the odd macros like MQ_PTR.
#define MQ_PTR(m, n) ((msghdr_t *)((uintptr_t)m + (uintptr_t)n))
MQ_PTR converts an offset to a message pointer. Given a pointer to the queue header (m) and the offset of the message (n) it returns a pointer to the message header. This macro, along with friends HEAD_PTR and TAIL_PTR, bridges the physical memory layout and logical structure. In order to understand this macro better, let us examine the layout of the queue data file.
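To make the offset arithmetic concrete, here is a hedged sketch of how a list of messages could be walked given only the locally mapped base address and offsets. The struct below is a simplification for illustration, not the actual illumos msghdr_t:

#include <stdint.h>

/* Simplified message header; the real msghdr_t carries more fields. */
typedef struct msghdr {
	uint64_t msg_next;   /* offset of the next message; 0 marks the end in this sketch */
	uint64_t msg_len;    /* length of the payload that follows the header */
} msghdr_t;

/* The macro from the illumos source: offset -> message pointer. */
#define MQ_PTR(m, n) ((msghdr_t *)((uintptr_t)m + (uintptr_t)n))

/* Walk one logical list (e.g. the free list) given the offset of its head.
 * Every pointer is recomputed from the local mapping, so the same offsets
 * work in every process regardless of where the file was mapped. */
static void
walk_list(void *base, uint64_t head_off)
{
	for (uint64_t off = head_off; off != 0;
	    off = MQ_PTR(base, off)->msg_next) {
		msghdr_t *msg = MQ_PTR(base, off);
		/* (msg + 1) points at the message body */
		(void) msg;
	}
}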
Anatomy of a Queue
A queue consists of a header structure (mqhdr_t) followed by priority offsets and messages. The header is the window into the queue, containing all the information needed to correctly access the messages. Figure 2 divides the memory layout of the queue into four parts: ❶ general attributes, ❷ concurrency control, ❸ notification, and ❹ the messages. This figure assumes default attributes are used; each row represents a 64-bit cache line.
➊ General
- Total Size
- (mq_totsize) The total size, in bytes, of the entire queue. This includes the header, head and tail offsets, and all the messages.
- Max Message Size
- (mq_maxsz) The maximum size, in bytes, of a message payload. Default of 1024; may be changed during creation.
- Max Number of Messages
- (mq_maxmsg) The maximum number of messages allowed pending in the queue. Default of 128; may be changed during creation.
- Max Priority
- (mq_maxprio) The maximum priority of any message, hard-coded to 32.
➋ Concurrency Control
A mutex provides read and write atomicity. Two semaphores communicate the queue's emptiness or fullness.
- Exclusive Mutex
- (mq_exclusive) Prevents concurrent writes from corrupting the queue and prevents readers from seeing incomplete messages.
- Receive Semaphore
- (mq_notempty) The number of messages currently on the queue. Readers block on this; writers use it to signal that a message is ready to be read. Its initial value is 0.
- Send Semaphore
- (mq_notfull) The number of free messages available. Writers block on this if the queue is full; readers use it to signal that a message has been freed. Its initial value is equal to mq_maxmsg.
➌ Notification
The fields mq_sigid, mq_ntype, mq_des, mq_rblocked, and mq_spawner are all related to notification. I will not discuss them in this post.
➍ Messages
Messages are delivered in FIFO order across 32 priorities. A read returns the oldest message of the highest priority. For efficiency, each priority is its own linked list. These logical linked lists are represented by physical offsets into the queue structure. The offset gymnastics make the implementation appear complicated, but a queue is simply a linked list per priority. Physically, it's a contiguous block of memory. Logically, it's a set of 32 linked lists. A simplified sketch of the layout follows the field list below.
- Offset to Free List
- (mq_freep) Offset to the head of the free list. At least one message must be on this list for a write to occur.
- Offset to Head Offsets
- (mq_headpp) Offset to the start of the head offsets.
- Offset to Tail Offsets
- (mq_tailpp) Offset to the start of the tail offsets.
- Current Max Priority
- (mq_curmaxprio) Records the current maximum priority, providing an index into the head and tail offsets, which are required for reading and writing.
- Priority Mask
- (mq_mask) A bit-vector tracking priorities that have pending messages. Needed to determine the next highest priority when the current maximum is drained.
- Priority N Head & Tail Offset
- The head and tail offsets for priority N. Logically they represent pointers to the front, where messages are read from, and the back, where messages are written to, of that priority's queue.
- Message N
- The header (msghdr_t) and body for message N. Messages are chained via the msg_next offset found in the header.
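Putting the pieces together, the sketch below shows one plausible layout for the header. It is a simplification for illustration, not the actual illumos mqhdr_t, and the notification fields are omitted:

#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>

#define MQ_MAXPRIO 32   /* hard-coded maximum priority */

/* Simplified sketch of the queue header; field names follow the text. */
typedef struct mqhdr {
	/* (1) general attributes */
	uint64_t	mq_totsize;	/* total size of the mapped file    */
	uint64_t	mq_maxsz;	/* max payload size (default 1024)  */
	uint64_t	mq_maxmsg;	/* max pending messages (128)       */
	uint64_t	mq_maxprio;	/* 32                               */

	/* (2) concurrency control */
	pthread_mutex_t	mq_exclusive;	/* one process mutates at a time    */
	sem_t		mq_notempty;	/* count of pending messages        */
	sem_t		mq_notfull;	/* count of free message slots      */

	/* (3) notification fields omitted in this sketch */

	/* (4) message bookkeeping; all values are offsets into the file */
	uint64_t	mq_freep;	/* head of the free list            */
	uint64_t	mq_headpp;	/* start of the per-priority heads  */
	uint64_t	mq_tailpp;	/* start of the per-priority tails  */
	uint64_t	mq_curmaxprio;	/* highest priority with messages   */
	uint32_t	mq_mask;	/* one bit per non-empty priority   */

	/* followed in the file by:
	 *   uint64_t heads[MQ_MAXPRIO];  per-priority head offsets
	 *   uint64_t tails[MQ_MAXPRIO];  per-priority tail offsets
	 *   message headers and bodies
	 */
} mqhdr_t;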
That's all of the data structure; now for the functions. I'm going to stay at a high level to avoid rabbit holes. At roughly 1,100 lines, the implementation is very approachable.
Open and Create
The mq_open function is used for both creation and opening. To create a queue the O_CREAT oflag must be set and four arguments passed: the name (path) of the queue, the oflags which determine how to open the queue, the mode bits which represent the permissions to access the queue, and an optional pointer to an attributes structure for setting message size and max number of pending messages. ① If the path is a valid POSIX object name then the first action is to create the lock file. The lock file acts as a mutex across all processes, protecting actions inside the dotted black rectangle from concurrent execution. ② Next, if the caller cannot access the permission file then it is not allowed to open the queue. A separate permission file exists because the data file must be given read-write permissions in order to modify the queue structure when performing a send or receive. The permission file's permissions are set to the value of the mode argument. ③ If the caller passed O_CREAT, and the queue doesn't already exist, then create the data file. Otherwise, open the existing data file. ④ Allocate the message queue descriptor (mqdes_t), mmap the data file, and mmap the queue description (mq_dn). ⑤ A queue must be initialized before it can be used. This step calls the mq_init function which initializes all mutexes, sets the basic attributes and offsets, and chains all messages together to live on the free list. ⑥ At this point the queue exists and is ready for use. The lock file is deleted, allowing other processes to open this queue. ⑦ In order for the opening process to remember this queue it must place the descriptor on its global list of message queues (mqdes_t *mq_list). This action is protected by the exclusive mutex mq_list_lock, represented as the dotted brown rectangle. Finally, the queue descriptor is returned to the caller.
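As a usage note, the optional attributes argument looks like this in caller code; the queue name and limits here are made up:

#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>

int
main(void)
{
	struct mq_attr attr = { 0 };
	attr.mq_maxmsg = 16;	/* override the default max pending messages */
	attr.mq_msgsize = 256;	/* override the default max payload size */

	/* O_EXCL makes creation fail if the queue already exists. */
	mqd_t mqd = mq_open("/jobs", O_RDWR | O_CREAT | O_EXCL, 0600, &attr);
	if (mqd == (mqd_t)-1) {
		perror("mq_open");
		return (1);
	}
	/* ... use the queue ... */
	return (0);
}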
Send
The mq_send function requires four arguments: the queue descriptor (mqdes), message pointer (msg_ptr), message length (msg_len), and priority (msg_prio). ① The arguments are validated, write permission is checked, and then sem_wait is called on mq_notfull. Once a lock on the semaphore has been acquired, indicating that the queue is not full, the exclusive mutex lock (mq_exclusive) is acquired to enter the critical region shown as the dotted red rectangle. ② With both space on the queue and exclusive access the caller may now pull a message from the head of the free list (mq_freep) and memcpy the user's message into the body. ③ If this is the priority's first message then set the head and tail offsets to this message, flip the appropriate bit in the mask (mq_mask), and change the current maximum priority (mq_curmaxprio) if needed. Otherwise, if a list already exists for this priority, add the new message to the tail of the list. ④ Call sem_post(mq_notempty) to notify readers that a message is ready to be consumed. Now that the sender has finished mutating the queue it may release the mutex and return 0 for success.
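The steps above can be condensed into the following sketch. It reuses the simplified mqhdr_t, msghdr_t, and MQ_PTR from the earlier sketches (plus <string.h>), omits validation, error handling, and notification, and is not the actual illumos code:

/* Condensed sketch of the send path described above. */
static int
sketch_send(mqhdr_t *mq, const char *buf, size_t len, unsigned int prio)
{
	uint64_t *heads = (uint64_t *)MQ_PTR(mq, mq->mq_headpp);
	uint64_t *tails = (uint64_t *)MQ_PTR(mq, mq->mq_tailpp);

	sem_wait(&mq->mq_notfull);		/* (1) wait for a free slot */
	pthread_mutex_lock(&mq->mq_exclusive);

	uint64_t off = mq->mq_freep;		/* (2) pull from the free list */
	msghdr_t *msg = MQ_PTR(mq, off);
	mq->mq_freep = msg->msg_next;
	msg->msg_next = 0;
	msg->msg_len = len;
	memcpy(msg + 1, buf, len);		/* copy payload after the header */

	if (heads[prio] == 0) {			/* (3) first message at this priority */
		heads[prio] = tails[prio] = off;
		mq->mq_mask |= (1u << prio);
		if (prio > mq->mq_curmaxprio)
			mq->mq_curmaxprio = prio;
	} else {				/* append to the tail of its list */
		MQ_PTR(mq, tails[prio])->msg_next = off;
		tails[prio] = off;
	}

	sem_post(&mq->mq_notempty);		/* (4) wake a reader */
	pthread_mutex_unlock(&mq->mq_exclusive);
	return (0);
}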
Receive
The mq_receive function requires four arguments: the queue descriptor (mqdes), a pointer to the buffer to copy the message payload into (msg_ptr), the length of the buffer (msg_len), and a pointer to write the priority to (msg_prio), or NULL if the priority is of no concern. ① The arguments are validated, read permission is checked, the buffer length is verified to be large enough to hold the largest message, and then the reader waits on the mq_notempty semaphore. When the semaphore lock has been acquired the reader knows there is at least one message to read and then waits on the exclusive lock (mq_exclusive). ② Once in the critical region, the reader removes a message from the head of the queue for the highest priority. ③ If this is the last message then turn off the priority's bit in the mask (mq_mask), find and set the next max (mq_curmaxprio), and set the tail offset to NULL to indicate the list is empty. ④ Copy the message payload into the user's buffer and place the message on the head of the free list. ⑤ Post to the mq_notfull semaphore to notify writers that a message slot is free for writing, and release the mutex. Return the length of the message payload that was written to the user's buffer.
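The receive path, under the same assumptions and simplifications as the send sketch, might look like this:

/* Condensed sketch of the receive path described above. */
static ssize_t
sketch_receive(mqhdr_t *mq, char *buf, size_t len, unsigned int *priop)
{
	uint64_t *heads = (uint64_t *)MQ_PTR(mq, mq->mq_headpp);
	uint64_t *tails = (uint64_t *)MQ_PTR(mq, mq->mq_tailpp);

	(void) len;				/* checked against mq_maxsz before this point */

	sem_wait(&mq->mq_notempty);		/* (1) wait for a pending message */
	pthread_mutex_lock(&mq->mq_exclusive);

	unsigned int prio = mq->mq_curmaxprio;
	uint64_t off = heads[prio];		/* (2) unlink the head message */
	msghdr_t *msg = MQ_PTR(mq, off);
	heads[prio] = msg->msg_next;

	if (heads[prio] == 0) {			/* (3) that priority is now empty */
		tails[prio] = 0;
		mq->mq_mask &= ~(1u << prio);
		while (mq->mq_mask != 0 &&
		    (mq->mq_mask & (1u << mq->mq_curmaxprio)) == 0)
			mq->mq_curmaxprio--;	/* find the next highest priority */
	}

	ssize_t n = (ssize_t)msg->msg_len;	/* (4) copy out and recycle the slot */
	memcpy(buf, msg + 1, n);
	msg->msg_next = mq->mq_freep;
	mq->mq_freep = off;

	sem_post(&mq->mq_notfull);		/* (5) wake a writer */
	pthread_mutex_unlock(&mq->mq_exclusive);
	if (priop != NULL)
		*priop = prio;
	return (n);
}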
Timeouts and Blocking
In the above descriptions of send and receive I assumed the default behavior of blocking with no timeout. However, both functions allow blocking calls with timeouts or non-blocking calls. I’ll discuss timeouts first.
Send and receive have three APIs each: mq_<op>, mq_timed<op>, and mq_reltimed<op>_np. The first version, with no timeout, was discussed in the previous sections. The second version, mq_timed<op>, allows the caller to set an absolute timeout; i.e. time out once a specific moment has passed. The third version, mq_reltimed<op>_np, allows a relative timeout; i.e. time out after a duration has passed. In both cases the timeout is applied to the semaphore being waited on in step ① of each diagram, before the exclusive mutex is acquired. The timeout is specified as a timespec_t, which consists of two numbers: a count of seconds and a count of nanoseconds within that second. An absolute timeout is interpreted as seconds (and nanoseconds) since the Unix epoch; a relative timeout treats the same numbers as a duration.
When opening a queue you may specify O_NONBLOCK to indicate that send and receive calls should immediately return EAGAIN if the operation cannot be performed at that moment. To perform in a non-blocking manner the implementation makes use of the sem_trywait function instead of the blocking semaphore API. Calling the timeout versions in non-blocking mode will act as if calling the non-timeout version; i.e. the timeout is ignored.
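For illustration, converting a five-second relative wait into the absolute deadline that mq_timedreceive expects might look like this; the function name and wait time are arbitrary:

#include <errno.h>
#include <mqueue.h>
#include <stdio.h>
#include <sys/types.h>
#include <time.h>

/* Wait at most five seconds for a message on an already-opened queue.
 * mq_timedreceive takes an absolute CLOCK_REALTIME deadline, so the
 * relative wait is converted by adding it to the current time. */
ssize_t
receive_with_deadline(mqd_t mqd, char *buf, size_t len)
{
	struct timespec deadline;
	clock_gettime(CLOCK_REALTIME, &deadline);
	deadline.tv_sec += 5;

	ssize_t n = mq_timedreceive(mqd, buf, len, NULL, &deadline);
	if (n == -1 && errno == ETIMEDOUT)
		fprintf(stderr, "no message within 5 seconds\n");
	return (n);
}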
Close and Destroy
As noted earlier, queues exist separate from the lifecycle of a process. For this to work the actions of closing and destroying must be separated.
A process indicates it is done with a queue by closing it. The mq_close function removes the descriptor from the process list (mq_list), invalidates the descriptor, frees the memory, and unmaps the description and data files. A close does not have any effect on other processes using the same queue.
A process removes the queue from public view by calling mq_unlink to destroy it. A destroyed queue does not immediately invalidate the queue descriptors of the other processes; it only prevents future opens from succeeding. The mq_unlink function deletes the permission and data files. It uses the lock file for atomicity just like mq_open.
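In caller code the two steps look like this, assuming the mqd descriptor and the "/demo" name from the earlier example:

/* Done with the queue in this process; other users are unaffected. */
(void) mq_close(mqd);

/* Remove the name so future opens fail; existing descriptors stay valid
 * and the queue is destroyed once the last of them is closed. */
if (mq_unlink("/demo") == -1)
	perror("mq_unlink");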
A Note on Notification
The notification feature allows a queue to signal a pid or create a thread to handle the case where a queue transitions from empty to containing at least one message. However, there is a subtle race between readers, and explaining the implementation would have lengthened an already lengthy post. This feature also complicates the implementation in a way that irks me. I wager the notification mechanism makes up about half the code, around 500 lines, bloating both the data structure and the functions that operate on it. There is a smaller, cleaner, more elegant implementation trying to free itself from the shackles of the notification system.
Further Reading
To learn more about illumos internals buy Solaris Internals [McD06]. It has a section on the queue implementation as well as many other fascinating details.
References
- IEEE13
- IEEE Std 1003.1, 2013 Edition. Standard for Information Technology—Portable Operating System Interface (POSIX). IEEE and The Open Group.
- McD06
- Richard McDougall and Jim Mauro. Solaris Internals: Solaris 10 and OpenSolaris Kernel Architecture, 2nd Edition. Prentice Hall, 2006.
- Ste99
- W. Richard Stevens. UNIX Network Programming, Volume 2: Interprocess Communications, 2nd Edition. Prentice Hall, 1999.