Gauche Devlog

< Better REPL? | Blowfish password hashing >

2010/04/26

Enhanced queue

util.queue module (ref:util.queue) has been completely rewritten for 0.9.1.

The main motivation is to support thread-safe queue (mtqueue). Last couple of years I had quite a few projects that needed to fully utilize multiple cores, and for almost every one of them I ended up writing some kind of thread-safe queue. It is a simple but robust way as a synchronization primitive. Naturally the feature is a strong candidate for Gauche to provide natively.

Writing a thread-safe queue is seemingly a simple task. Yet it took some time for me to come up an API I feel right.

Just protecting access to a queue by a mutex makes the queue thread-safe, but it doesn't make the queue usable for synchronization; we want a consumer thread to block when it finds the queue is empty, and to resume automatically as soon as some data is available in the queue.

Providing a simple-minded thread-safety, that just protects access by a mutex, isn't a good strategy. It doesn't help much to build synchronization operations, since the latter needs a condition variable to cooperate with the mutex of the queue. Such a strategy is a dead-end of abstraction; it does abstract something, but does not useful to build more abstractions on top of it.

So, we use a condition variable and let a consumer thread block on an empty queue. Is that enough? Not really.

If a producer thread fails for some reason, the consumer thread would wait indefinitely. It is usually unacceptable in the production code; the consumer thread must detect abnormality and try to recover, report to supervisor, or shut itself down gracefully. There's also a situation that a consumer thread fails; in such case a producer thread would just keep pushing the data into the queue until it exhausts memory, which is also undesirable.

I explored some options. In one option, a queue had low and high water marks, and a callback was called when the length goes below the low mark or above the high mark, and it adjusted the pace of either a consumer or a producer threads. It worked for the project, but I felt it too complicated for general use.

Finally I ended up on a simple concept of timeouts, plus optional upper-bound of the queue length. The API is parallel to other timeout-able thread operations so that it's easy to remember, and I think it is general enough so that more sophisticated stuff can be built on top of it.

Here's an outline of synchronizing mtqueue operations. See the manual in svn trunk if you are curious to know the details.

  • make-mtqueue :key max-length
    Returns a thread-safe queue, with optional upper bound of the queue length.
  • dequeue/wait! mtqueue :optional (timeout #f) (timeout-val #f)
    Pop an item from mtqueue, or block if the queue is empty, with optional timeout. The semantics of timeout and timeout-val are the same as thread-join! etc. (ref:thread-join!).
  • enqueue/wait! mtqueue obj :optional (timeout #f) (timeout-val #f)
    Push obj into the queue, or block if the queue has maximum number of elements, with optional timeout.

Note that there are also dequeue! and enqueue! that do not block. They work on both mtqueue and normal (non-thread-safe, but cheap) queue. dequeue! throws an error by default if the queue is empty, though you can suppress it by providing a fallback value.

Tags: 0.9.1, util.queue

Post a comment

Name: