
Meditations on Writing a Queue, Part 2

Welcome back! In the final part of this series, we'll discuss queue pop and look at how Ruby implements its own queue. Enjoy!

Queue Pop

Now that we have data in our queue, we need a way to get data out of it. Introducing tiny_queue_pop:

void *tiny_queue_pop(tiny_queue_t *queue) {
  pthread_mutex_lock(&queue->mutex);
    while(queue->head == NULL) { // block if buffer is empty
      pthread_cond_wait(&queue->wakeup, &queue->mutex);
    }

    struct tiny_linked_list_t* current_head = queue->head;
    void *data = current_head->data;
    if(queue->head == queue->tail) { // one-element queue: empty it out
      queue->head = queue->tail = NULL;
    }
    else {
      queue->head = queue->head->next; // second element becomes the head
    }
    free(current_head); // free the list node, not the data it points to
  pthread_mutex_unlock(&queue->mutex);

  return data;
}

This function takes a tiny_queue_t pointer as an argument and returns an untyped pointer (void *).

The first thing we do is try to acquire a lock on the mutex. If another thread is pushing to or popping from the queue, this code will wait until the lock is released. The next thing that happens is interesting:

while(queue->head == NULL) { // block if buffer is empty
  pthread_cond_wait(&queue->wakeup, &queue->mutex);
}

While the head of our queue is NULL, there is nothing in the queue. When this occurs, we tell this code to go to sleep by calling pthread_cond_wait. This releases the lock and waits until someone signals the condition variable, in this case named queue->wakeup. When the thread is woken back up, pthread_cond_wait re-acquires the lock before returning.

Remember when we pushed data to the queue we triggered pthread_cond_signal? That call sends a signal to tell anyone that is listening that they can wake up and start processing again. You can either wake up one listener (via signal) or ALL listeners (via pthread_cond_broadcast). In this case, we only wake up one at a time, since each push enqueues only one element.
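For reference, the push from Part 1 looked roughly like this:

void tiny_queue_push(tiny_queue_t *queue, void *x) {
  pthread_mutex_lock(&queue->mutex);
    // allocate a node, then append it at the tail (or make it the head if empty)
    struct tiny_linked_list_t* new_node = malloc(sizeof(struct tiny_linked_list_t));
    new_node->data = x;
    new_node->next = NULL;

    if(queue->head == NULL && queue->tail == NULL) {
      queue->head = queue->tail = new_node;
    } else {
      queue->tail->next = new_node;
      queue->tail       = new_node;
    }
  pthread_mutex_unlock(&queue->mutex);
  pthread_cond_signal(&queue->wakeup); // wake up one sleeping consumer
}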

This allows a thread that is trying to pop something off of the queue to go to sleep instead of burning CPU time trying to pop things from an empty queue. Once we add something to the queue, we signal to any sleeping threads that one element is in the queue and they can start processing.

One thing to note is that we use a while and not an if clause when checking for an empty queue. We do this because, between the time the signal is triggered and the time this code runs, another consumer may have already emptied the queue again; pthread_cond_wait is also allowed to wake up spuriously, so the condition must be re-checked every time.

Let’s say there was something in the queue, or our code was woken up via a push. The next thing we do is grab our head instance and pull our data pointer off of it:

struct tiny_linked_list_t* current_head = queue->head;
void *data = current_head->data;

We're creating a variable called current_head that is a pointer to the linked list element currently at the head. From there, we pull the pointer to whatever we pushed onto the queue into a variable named data.

When we push things onto the queue, we add them to the end (or tail). When we pop them, they come off the top (or head). We need to check to see if we have a one-element queue:

queue->head == queue->tail

If that's the case, then we set head and tail both to NULL, since after we pop one element off of a one-element queue there will be nothing left.

If there is more than one element then we have to move the head pointer:

queue->head = queue->head->next;

We are setting the current head pointer to the next element in the linked list. This means that the second element now becomes the first.

Finally, since we allocated a list element in the push via malloc, we have to deallocate it with a call to free:

free(current_head);

We're only freeing our list element, not the data pointer on the list, which we will return. The last thing is to unlock the mutex so that other threads can push or pop. Note that we do not signal our condition variable here, because popping an element off of the queue does not indicate a change in state that is actionable by a reader or a writer (a push or pop call).

Lastly, we return the pointer to the thing we put in the queue:

return data;

We're done! Told you that wasn't bad. What you're left with is a simple interface: the ability to create a queue, push, and pop. I wrote some examples of usage on my GitHub page, where you can view the code (a minimal sketch follows the list):

  • hello.c pushes strings onto a queue and pops them off.
  • hello_struct.c pushes arbitrary structs onto a queue and pops them off.
  • hello_threads.c pushes numbers onto a queue and has different threads perform work on the numbers.
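
Here's that minimal sketch, in the spirit of hello.c, assuming the tiny_queue_create/tiny_queue_push/tiny_queue_pop API from Part 1:

#include <stdio.h>
#include "tiny_queue.h" // the queue from this series (assumed header name)

int main(void) {
  tiny_queue_t *queue = tiny_queue_create();

  // Push two strings onto the queue...
  tiny_queue_push(queue, "Hello");
  tiny_queue_push(queue, "World");

  // ...then pop them back off in FIFO order.
  printf("%s\n", (char *)tiny_queue_pop(queue)); // => Hello
  printf("%s\n", (char *)tiny_queue_pop(queue)); // => World

  return 0;
}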

Ruby Queue Implementation

Ruby is written in C, and one of my goals in learning C is to eventually contribute to Ruby. I thought it would be interesting to compare my implementation of a queue to how Ruby performs these same three operations.

First off, I was surprised to find that as recently as Ruby 2.0, the Queue was written in Ruby (instead of C). Click on the Queue docs for Ruby 2.0, then "toggle source."

In 2.4.1, it is written in C and lives in thread_sync.c. I'm actually going to look at the most recent implementation on trunk (Ruby uses trunk instead of a master branch). Here's a link to the code I'll be reviewing.

The C code looks a bit different than mine because the interface is intended to be consumed by Ruby and not by other C code. A VALUE, for example, is not a standard C type but one that Ruby can understand.
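
From C's point of view, every Ruby object is handled through a VALUE. Here's a quick sketch using Ruby's C API (an illustration of mine, not code from thread_sync.c):

#include <ruby.h>

// A Ruby String and a Ruby Array are both just VALUEs from C.
// rb_ary_push below is the same function the queue implementation uses.
VALUE build_list(void) {
  VALUE str = rb_str_new_cstr("hello"); // Ruby String
  VALUE ary = rb_ary_new();             // Ruby Array
  rb_ary_push(ary, str);                // same as ary.push(str) in Ruby
  return ary;
}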

Here is the code to push an element onto the queue:

static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);
    int should_block = szqueue_push_should_block(argc, argv);

    while (queue_length(self, &sq->q) >= sq->max) {
        if (!should_block) {
            rb_raise(rb_eThreadError, "queue full");
        }
        else if (queue_closed_p(self)) {
            goto closed;
        }
        else {
            struct queue_waiter qw;

            qw.w.th = GET_THREAD();
            qw.as.sq = sq;
            list_add_tail(&sq->pushq, &qw.w.node);
            sq->num_waiting_push++;

            rb_ensure(queue_sleep, Qfalse, szqueue_sleep_done, (VALUE)&qw);
        }
    }

    if (queue_closed_p(self)) {
      closed:
        raise_closed_queue_error(self);
    }

    return queue_do_push(self, &sq->q, argv[0]);
}

The pointer to the queue is not passed in directly; instead, it is derived from self, which is the execution context (since Ruby is object-oriented):

struct rb_szqueue *sq = szqueue_ptr(self);

You can see that their queue is bounded:

while (queue_length(self, &sq->q) >= sq->max) {

There is a max value, and if you try to push a value onto the queue in a non-blocking fashion while you're past that limit, an exception ("queue full") is raised.

Otherwise, if you're pushing and allowing the call to block, it looks like the current thread adds itself to the end of a list of waiting pushers and goes to sleep until space frees up (I'm not totally sure of every detail here):

struct queue_waiter qw;

qw.w.th = GET_THREAD();
qw.as.sq = sq;
list_add_tail(&sq->pushq, &qw.w.node);
sq->num_waiting_push++;

rb_ensure(queue_sleep, Qfalse, szqueue_sleep_done, (VALUE)&qw);
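
In pthread terms, this plays roughly the role a second condition variable would play in my queue. My tiny queue is unbounded, but a bounded push might look something like this sketch (the size, max, and space_available fields are hypothetical additions, not part of the Part 1 struct):

void bounded_queue_push(tiny_queue_t *queue, void *x) {
  pthread_mutex_lock(&queue->mutex);
    while(queue->size >= queue->max) { // block while the buffer is full
      pthread_cond_wait(&queue->space_available, &queue->mutex);
    }
    // ...append the node exactly as in tiny_queue_push...
    queue->size++;
  pthread_mutex_unlock(&queue->mutex);
  pthread_cond_signal(&queue->wakeup); // wake up one sleeping consumer
}

(A corresponding pop would signal space_available after removing an element.)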

Then at the very end, there is a call to queue_do_push. If you look at that method:

static VALUE
queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
{
    if (queue_closed_p(self)) {
        raise_closed_queue_error(self);
    }
    rb_ary_push(check_array(self, q->que), obj);
    wakeup_one(&q->waitq);
    return self;
}

This looks a lot like our code for push. They check to see if the queue is “closed,” a behavior that’s not implemented in my queue.

If it's not, they add the element onto the end of an array. Functions inside the Ruby source code are prefixed with rb_ if they're exposed. So the function call rb_ary_push is the same as calling [].push("foo") in your Ruby code.

Notice that in this code they don't have to mess around with pointers and heads and tails; that's because they already have a list structure (implemented as an Array) that they can use.

Once the element is added to the array's tail, a condition variable signal is sent to wake up any blocked threads:

wakeup_one(&q->waitq);

One thing you might notice is that there are no mutexes in this code; there's no locking or unlocking. That is because instead of having a lock in each method, there is a global lock on the entire Ruby interpreter. This is called a GIL (global interpreter lock) or GVL (global VM lock), and Python has a similar concept. This lock prevents two threads from running Ruby code at the same time, which means only one thread can be operating on the Array at a time.

A GIL will not totally protect you from thread-safety issues, as there are a number of operations that are not atomic. For instance, @foo ||= 2 or @num += 1 can fail because the interpreter expands them into separate read and write steps, and a thread switch can happen in between. Also, Ruby does allow thread switching (and yes, it uses native threads) when IO is performed, such as a disk read or a network call (like a database query). So thread safety is still important.

I'm guessing that Ruby's C calls are atomic (the GVL is held while they run), so putting all of that code inside queue_do_push means those operations happen as one atomic chunk: check for closed, add the element, signal blocked threads.

This is one of the benefits of having a GIL: from an implementer's perspective, it makes coding much easier because you don't have to worry about wrapping everything in calls to lock and unlock.

This is interesting to me because, at my second RubyConf in Denver, I remember someone asking Matz if we could ever get rid of the GIL. His response was one of shock and horror; I think he basically said "no." After digging in, I can understand a bit better now that it's not just some flag that needs to be unset; rather, the entire codebase would need to be rewritten to be thread-safe, which would be an extremely hard effort for any organization.

This makes efforts like Koichi's work on "guilds" or another concurrency model even more interesting. Essentially, the idea is that instead of getting rid of the GIL, we can have multiple GILs without having to spawn multiple processes. Each "guild" would essentially behave like a cross between a process and a thread. I've always thought of them as a process with a resource-sharing model.

Wrapup

If you've made it this far, congrats. This was some pretty dense stuff. I do have one tip I want to leave readers with if you're working with queues. It's a common "trick" that is not very intuitive if you've never worked with threads. The idea is that if you need to tell your workers when to shut down, you also need to wake them up, since they're blocked at the pop call. You can do this with a "poison" object. In Ruby, it looks like this:

require 'thread'

GLOBAL_QUEUE = Queue.new
POISON       = Object.new

threads = 10.times.map do
  Thread.new do
    loop do
      work = GLOBAL_QUEUE.pop
      break if work == POISON
      puts "Working on #{work}"
    end
  end
end

20.times do |i|
  GLOBAL_QUEUE.push(i)
end

10.times do
  GLOBAL_QUEUE.push(POISON)
end

threads.map {|t| t.join }

Here we create a unique object and assign it to a constant named POISON. Then, when we pop an element from the queue, we check to see if we have that special object and exit. If you know how many threads you have looping infinitely, then you enqueue that same number of poison objects to stop all of them. Since the poison goes in at the end of the queue, you can be sure that the queue is drained before the workers shut down.
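
The same trick works with the C queue from this series. Here's a sketch, assuming the tiny_queue API from Part 1; POISON here is just a unique address we compare against:

#include <stdio.h>
#include <pthread.h>
#include "tiny_queue.h" // the queue from this series (assumed header name)

#define THREAD_COUNT 10

static tiny_queue_t *global_queue;
static int poison; // its address serves as the unique POISON sentinel

void *worker(void *arg) {
  (void)arg;
  while(1) {
    void *work = tiny_queue_pop(global_queue);
    if(work == &poison) break; // poison pill: shut down this worker
    printf("Working on %d\n", *(int *)work);
  }
  return NULL;
}

int main(void) {
  pthread_t threads[THREAD_COUNT];
  int numbers[20];

  global_queue = tiny_queue_create();

  for(int i = 0; i < THREAD_COUNT; i++)
    pthread_create(&threads[i], NULL, worker, NULL);

  for(int i = 0; i < 20; i++) {
    numbers[i] = i;
    tiny_queue_push(global_queue, &numbers[i]);
  }

  // one poison pill per worker, pushed after the real work,
  // so the queue drains before the workers shut down
  for(int i = 0; i < THREAD_COUNT; i++)
    tiny_queue_push(global_queue, &poison);

  for(int i = 0; i < THREAD_COUNT; i++)
    pthread_join(threads[i], NULL);

  return 0;
}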

I also recommend Working with Ruby Threads if you’re new to concepts like threads, queues, and mutexes.

I’m having a blast writing C code. While it does take me 20 hours to do something that would take 20 minutes in Ruby, it’s a huge accomplishment when I get something to work. It’s also neat not having the crutch of a huge standard library with pre-made data structures for me. The real payoff, though, is that the more I learn about C, the less foreign and unapproachable the source code behind Ruby becomes. I’m not saying that everyone should learn C, but if you’re looking for a challenge in a language that’s extremely fast and used all over the world, it’s not a bad language to be familiar with.


Published at DZone with permission of Richard Schneeman, DZone MVB. See the original article here.
