Unix Background Queue
For a side project that runs on a single machine, I needed a background queue. I like self-contained software like SQLite, but I didn’t know of any self-contained background queue. They usually rely on some kind of broker, whether that is Redis or a database. I decided it would be fun to write one! Here’s the weekend story of toying with Unix, Ruby C extensions, MRI and Ruby to create localjob.
Unix inter-process communication
To engineer my self-contained solution I looked into Unix’s IPC functionality. The classics include:
- Files. Persistent, would require handling file locking to work for this case.
- Signals. Only for commands, no information passing.
- Sockets. Good choice, but would require self-handling of persistence.
- Named pipes. Producers block until consumed, do not persist.
- Shared memory. Not persistent. Requires using semaphores or something similar to avoid race conditions.
I stumbled upon the POSIX message queue during my research, which has everything I was looking for:
- Persistent. This gives the ability to push messages to the queue while nothing is listening, and the simplicity of being able to restart workers without maintaining a master transition. Note it’s only persistent till system shutdown.
- Simplicity. Locks and most race conditions are handled for me.
- File descriptors. On Linux (not guaranteed by POSIX) message queue descriptors are implemented as file descriptors, which makes it trivial to support multiple queues. It’s simply a matter of listening on a file descriptor for each queue and acting when something happens, which is what select(2) does.
Creating a Ruby wrapper for the POSIX message queue
Ruby’s standard library does not provide access to the POSIX message queue, which meant I’d have to roll my own with a Ruby C extension.
The POSIX message queue provides blocking calls like mq_receive(3) and mq_send(3). In Ruby, threads are handled by context switching between them; however, if blocking I/O is not handled correctly, a single thread can block the entire VM. This means only the blocking thread, which does nothing useful, will run. To handle this situation you must call rb_thread_wait_fd(fd) before the blocking I/O call, where fd is the file descriptor. That way the Ruby thread scheduler can do a select(2) on the file descriptors and decide which thread to run, ignoring those that are currently waiting for I/O. Below is the source for a function to handle this in a C extension.
VALUE
posix_mqueue_receive(VALUE self)
{
  // Number of bytes received, or -1 if the syscall failed
  ssize_t len;

  // Buffer data from the message queue is read into
  size_t buf_size;
  char *buf;

  // The Ruby string (a VALUE is a Ruby object) that we return to Ruby with the
  // contents of the buffer.
  VALUE str;

  // posix-mqueue's internal data structure, contains information about the
  // queue such as the file descriptor, queue size, etc.
  mqueue_t* data;

  // Get the internal data structure
  TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);

  // The buffer size is one byte larger than the maximum message size
  buf_size = data->attr.mq_msgsize + 1;
  buf = (char*)malloc(buf_size);
  if (buf == NULL) { rb_raise(rb_eNoMemError, "Could not allocate buffer"); }

  // We notify the Ruby scheduler this thread is now waiting for I/O.
  // The Ruby scheduler can resume this thread when the file descriptor in
  // data->fd becomes readable. This file descriptor points to the message
  // queue.
  rb_thread_wait_fd(data->fd);

  // syscall to mq_receive(3) with the message queue file descriptor and our
  // buffer. This call will block, once it returns the buffer will be filled
  // with the frontmost message.
  do {
    len = mq_receive(data->fd, buf, buf_size, NULL);
  } while (len < 0 && errno == EINTR); // Retry interrupted syscall

  if (len < 0) {
    // Free the buffer before raising, preserving errno for rb_sys_fail
    int saved_errno = errno;
    free(buf);
    errno = saved_errno;
    rb_sys_fail("Message retrieval failed");
  }

  // Create a Ruby string from the now filled buffer that contains the message
  str = rb_str_new(buf, len);

  // Free the buffer
  free(buf);

  // Finally return the Ruby string
  return str;
}
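To see from the Ruby side why this matters, here is a minimal sketch using only the standard library. It uses a pipe as a stand-in for the message queue (an assumption for illustration, since pipes already cooperate with the thread scheduler), and the method name run_reader_and_counter is hypothetical. One thread blocks waiting for data while another keeps making progress, which is the behaviour the C extension has to preserve.

```ruby
# Sketch: a thread blocked on I/O should not stop other threads from running.
# A pipe stands in for the message queue here; this is not posix-mqueue code.
def run_reader_and_counter
  reader, writer = IO.pipe
  ticks = 0

  # This thread blocks in a read until something is written to the pipe.
  waiting = Thread.new { reader.gets }

  # Meanwhile another thread keeps doing work.
  counter = Thread.new do
    5.times do
      ticks += 1
      sleep 0.01
    end
  end

  # The counter can only finish if the blocked reader did not freeze the VM.
  counter.join

  # Now wake the reader up by giving it a "job".
  writer.puts "job"
  message = waiting.value

  [ticks, message]
ensure
  reader.close if reader
  writer.close if writer
end
```

Calling run_reader_and_counter returns the number of ticks the counter thread completed together with the line the blocked thread eventually read.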
It was a fun experience creating a Ruby C extension, with a lot of grepping in MRI to find the right methods. Despite being undocumented, the API is pretty nice to work with. The resulting gem is posix-mqueue.
Localjob is born
With access from Ruby to the POSIX message queue through posix-mqueue, I could start writing localjob. Because the POSIX message queue already does almost everything a background queue needs, it’s a very small library, but it does a good number of the things you’d expect from a background queue! I’ll go through a few of the more interesting parts of Localjob.
Signal interruptions
To kill a worker you send it a signal. Localjob currently only traps SIGQUIT, for graceful shutdown. That means if it’s currently working on a job, it won’t throw it away forever and terminate, but will finish the job and then terminate. It’s implemented with an instance variable, waiting. If the worker is waiting for I/O, it’s true. In the signal trap, if waiting is true it’s safe to terminate. If not, the worker is currently handling a job, and another instance variable, shutdown, is set to true. When the worker is done processing the current job it’ll notice that and finally terminate. Here is a simple implementation that doesn’t handle job exceptions or multiple queues:
Signal.trap "QUIT" do
  exit if @waiting
  @shutdown = true
end

@shutdown = false

loop do
  exit if @shutdown

  @waiting = true
  job = queue.shift
  @waiting = false

  process job
end
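The same two-flag idea can be tried without sending real signals. The sketch below is not Localjob’s code: SketchWorker and request_shutdown are hypothetical names, and a Thread::Queue from the standard library stands in for the message queue. It only demonstrates that a shutdown request arriving in the middle of a job lets that job finish and leaves the remaining jobs on the queue.

```ruby
# Sketch of the waiting/shutdown flags. SketchWorker is a hypothetical
# stand-in for a worker; a Thread::Queue replaces the POSIX message queue.
class SketchWorker
  attr_reader :processed

  def initialize(jobs)
    @jobs = jobs
    @waiting = false
    @shutdown = false
    @processed = []
  end

  # What the QUIT trap would call. Returns true when the worker is idle and
  # it is safe to stop right away; otherwise it defers shutdown until the
  # current job is done and returns false.
  def request_shutdown
    return true if @waiting
    @shutdown = true
    false
  end

  def run
    until @shutdown
      @waiting = true
      job = @jobs.pop # blocks; returns nil once the queue is closed and empty
      @waiting = false
      break if job.nil?

      yield job if block_given?
      @processed << job
    end
  end
end
```

In a real process the trap could then be wired up along the lines of Signal.trap("QUIT") { exit if worker.request_shutdown }.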
Multiple queues
I mentioned before that POSIX message queues in Linux are implemented as file descriptors. This comes in handy when you want to support workers popping off multiple queues. We just call select(2) with all of the queue file descriptors, and that call will block until one of the queues is ready for reading, which in this context means it has one or more jobs.
This can lead to a race condition if multiple workers are waiting and one pops before another. To handle this, we instead issue a nonblocking mq_timedreceive(3) call on the file descriptor returned by select(2). The posix-mqueue method for that call raises an exception if receiving a message would block, which it would if another worker already took the job. Thus we can simply try the descriptor that select(2) reported and start over if it turns out to be empty:
def multiple_queue_shift
  (queue,), = IO.select(@queues)

  # This calls mq_timedreceive(3) via posix-mqueue
  # (wrapped in Localjob to deserialize as well).
  # It'll raise an exception if it would block, which
  # means the queue is empty.
  queue.shift

# The job was taken by another worker, and no jobs
# have been pushed in the meanwhile. Start over.
rescue POSIX::Mqueue::QueueEmpty
  retry
end
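The select-then-nonblocking-read pattern is not specific to message queues. As a rough sketch that runs without posix-mqueue, the same shape can be written against plain pipes from the standard library. The method name shift_from_any is hypothetical, and pipes are byte streams rather than message queues, so this only mirrors the control flow: IO::WaitReadable plays the role that POSIX::Mqueue::QueueEmpty plays above.

```ruby
# Sketch: wait on several descriptors, then read without blocking so that a
# descriptor drained by another consumer is skipped instead of hanging.
# Pipes stand in for message queues; this is not Localjob's implementation.
def shift_from_any(readers)
  loop do
    # Blocks until at least one reader has data available.
    ready, = IO.select(readers)

    ready.each do |io|
      begin
        # Returns immediately with whatever is available (up to 4096 bytes).
        return io.read_nonblock(4096)
      rescue IO::WaitReadable
        # Someone else consumed the data between select and the read.
        next
      end
    end
    # Every ready descriptor turned out to be empty: go back to select.
  end
end
```

Given two pipes where only the second has data, shift_from_any([r1, r2]) returns what was written to the second one without waiting on the first.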
Localjob and posix-mqueue
Localjob and posix-mqueue are both open source. Let me know if you have any interesting ideas for the projects or if you are going to use them!