On the Importance of Queueing

Four other Oysters followed them,
And yet another four;
And thick and fast they came at last,
And more, and more, and more…

Queues are an important and often overlooked tool in the developer’s toolbox. At Panda Strike, we use queues in a variety of ways to improve the performance and reliability of the applications we build.

Where Queues Are Useful

Two common scenarios for queues:

A Simple Queue

There are various kinds of queues: FIFO queues, LIFO queues (also known as a stack), priority queues, durable queues, and so on. Let’s start with a simple FIFO (first-in, first-out) queue.

tasks = do (data=[]) ->
  enqueue: (task) -> data.push task
  dequeue: -> data.shift()

Here’s a simple example of using a queue:

tasks.enqueue -> console.log "hello, world"
tasks.enqueue -> console.log "goodbye, world"

do tasks.dequeue() # prints hello, world

Pretty simple isn’t it? This is one reason to get into the habit of thinking in terms of queues: they don’t add much complexity to your code. You might as well just incorporate queues into your design from the start.

A Simple Web API, Using Queues

Of course, you may just as well be thinking, how does this help us? All we accomplished in the example above was come up with a complicated way to call a function. What isn’t as obvious is that we’ve separated the what from the when and even the where.

To demonstrate this idea, consider a simple HTTP server that decides on an action to be performed—the what—and enqueues it.

(require "http").createServer ({method, url}, response) ->
  switch method
    when "GET"
      switch url
        when "/hello"
          tasks.enqueue {response, action: "hello"}
        when "/goodbye"
          tasks.enqueue {response, action: "goodbye"}
.listen 8080, ->
  console.log "Listening on port 8080"

Here are the actions that we support. When these functions run, we’re performing the action—this is the when.

actions:
  hello: -> "Hello, World"
  goodbye: -> "Goodbye, World"

We also need something to get things out of the queue. (In real life, we’d avoid checking an empty queue.)

do process = ->
  task = tasks.dequeue()
  if task?
    result = actions[task.action]()
    task.response.end result
  setImmediate process

Separation of the What, When, and Where

Obviously, this is a very simple-minded request processor. But we’ve separated figuring out what we need to do to fulfill a request from when we’re going to do it. Instead of just dropping whatever we were working on to deal with the latest incoming request, we can simply take note of the request and resume what we were doing.

A Queue-Based Worker

But we can also change the where: we can offload the request processing to an entirely different machine. Here’s a simple worker that accepts tasks over a socket.

lines = require "byline"

(require "net").createServer (socket) ->
  lines(socket).on "data", (action) ->
    tasks.enqueue {socket, action}
.listen 8181, ->
  console.log "Listing on port 8181"

We process the socket’s stream a line at a time, using the byline NPM module. Each line is a task, and we enqueue it accordingly.

We can now move our actions object and our process function over to the worker process. We’ll make one small change to the process function, to reference a socket property instead of the response property.

do process = ->
  task = tasks.dequeue()
  if task?
    result = actions[task.action]()
    task.socket.end result
  setImmediate process

If we run that and telnet to port 8181, we can enqueue tasks and get responses.

Queuing Tasks for the Worker

All that remains is to provide a client proxy with a queuing interface so that our HTTP server can send tasks to the worker. (Again, in real life, we’d probably keep the socket open. Or we’d use a messaging queuing library.)

tasks = do (data=[]) ->
  enqueue: (task) ->
    socket = (require "net").connect port: 8181, ->
      socket.write "#{task.action}\n"
      socket.pipe task.response

We swap this in for the original queue (which now resides in our worker) and we’ve now offloaded our request processing into another process entirely.

And we haven’t changed our HTTP request handling code at all. That is, we’ve changed the when and the where without affecting the what.

Here’s the worker code.

The Worker

# this is the _when_: we actually execute the desired
# code here...
tasks = do (data=[]) ->
  enqueue: (message) -> data.push message
  dequeue: -> data.shift()

actions =
  hello: -> "Hello, World"
  goodbye: -> "Goodbye, World"

do process = ->
  task = tasks.dequeue()
  if task?
    result = actions[task.action]()
    task.socket.end result
  setImmediate process

lines = require "byline"

# This is the _where_: we're running server that
# encapsulates the code, allowing us to run it
# in a different process or machine
(require "net").createServer (socket) ->
  lines(socket).on "data", (action) ->
    tasks.enqueue {socket, action}
.listen 8181, ->
  console.log "Listing on port 8181"

And here’s our HTTP server.

The HTTP Server

# The client proxy, with a convenient queueing interface
tasks = do (data=[]) ->
  enqueue: (task) ->
    socket = (require "net").connect port: 8181, ->
      socket.write "#{task.action}\n"
      socket.pipe task.response

# Here's the _what_, where we describe the request and
# queue it up. This code doesn't change regardless of
# whether we implement these actions locally or remotely
(require "http").createServer ({method, url}, response) ->
  switch method
    when "GET"
      switch url
        when "/hello"
          tasks.enqueue {response, action: "hello"}
        when "/goodbye"
          tasks.enqueue {response, action: "goodbye"}
.listen 8080, ->
  console.log "Listening on port 8080"

Again, once we introduced queuing, we no longer need to change the request handling code, even when we moved the implementation into a different process, or even a different machine. Just as importantly, our request handling code isn’t much more complex than it would have been without using queues.

Benefits of a Queue-based Interface

Obviously, this is a naive implementation of a queue-based worker. In real life, we’d want to use a more sophisticated queueing implementation, probably based on queuing technologies, like ZeroMQ or RabbitMQ. We often use Redis, which isn’t a message queue, but can be used to implement one. The main point here is that we could use these technologies without appreciably changing our request handling. We could add everything from retry logic to durable messages and still use the same queuing interface.

This is a great strategy for making your applications more performant, reliable, and elastic.

Of course, queues aren’t a cure-all. Queuing can introduce problems. Probably the worst of these is simply running out of memory to put more tasks in a queue. This can happen when things are getting placed into the queue faster than they’re being taken out. (Ironically, the solution to this is usually more queuing.) But getting into the habit of using queueing interfaces is never a bad thing by itself. It costs you very little because queues, in principle, are very simple things, and buy you a lot of flexibility.

The Rule

If it’s distributed, or could become so, code to a queue-based interface.

Discussion on HackerNews.