Archive for the ‘Coding’ Category

Lockless Multiple-Producer/Single-Consumer Ring FIFO Queue

Wednesday, February 16th, 2011

I'm still on the quest for better performance out of my ticker plants and their clients, and to that end today I spent quite a bit of time on building a multiple-producer/single-consumer, lockless, FIFO ring buffer. The problem I have with my LinkedFIFO queue is that it's constantly doing malloc and free - OK, new and delete, but it's the same thing. The problem is all this thrashing. What I want is to be able to pre-allocate the memory for the buffer and then just use it.

So I spent a lot of time - more than I'd like to have spent - getting this guy right. It wasn't as easy as I'd have hoped, but in the end, I think it's going to be very useful for me.

The hard part was the multiple-producers. No question. The obvious solution to the multiple-producers is to get a concept of the 'new' Tail, put your data at the old Tail, and update the new Tail. You have to check for queue crashes in there, and all this has to be atomic. It's that last part that's the kicker.

What I came up with was the idea of an insert point for the queue. This isn't the Tail: the insert point is moved atomically by the push() method, and only after we've checked for a crash and set the value do we update the tail.

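    bool push( const T & anElem )
    {
      /**
       * Reserve a slot by moving the insertion point - but only if that
       * won't crash the queue all the way around. (Assumes mInsPt, mTail
       * and mHead are free-running counters, and eBitMask is the
       * power-of-two capacity minus one.)
       */
      size_t  insPt;
      do {
        insPt = mInsPt;
        if ((insPt - mHead) > eBitMask) {
          cLog.error("[push] the queue is full! Can't push this item");
          return false;
        }
      } while (!__sync_bool_compare_and_swap(&mInsPt, insPt, insPt + 1));
 
      // save the data in the right spot for the size
      const_cast<T &>(mData[insPt & eBitMask]) = anElem;
      // ...and finally update the 'tail' - in insertion-point order, so
      // spin until every producer with an earlier slot has published
      while (!__sync_bool_compare_and_swap(&mTail, insPt, insPt + 1)) {
        // a slower producer still owns an earlier slot
      }
 
      return true;
    }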

If you move the tail before it's got a value, then it's possible that the consumer can see that the tail has moved, assume that it contains valid data, and process it. Bad move. So it's clear you have to update the tail last. But then how do you know where to put the new data? Hence the insert point.

This is working fine for me, and while it's still pretty new code, I have a feeling it's solid - there just isn't that much to it, and the tests I've come up with are pretty thorough. It's nice to have several efficient FIFO queues to choose from.
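For reference, here's a self-contained sketch of the whole ring, including a single-consumer pop() that the post doesn't show. It assumes C++11 std::atomic in place of the GCC built-ins, and the MPSCRing name is hypothetical. Two details carry the weight. First, a producer only reserves a slot after checking that it won't lap the consumer, so a full queue leaves nothing to undo. Second, the tail is advanced strictly in insert-point order, so a fast producer can never publish past a slot that a slower producer hasn't filled yet.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical bounded multiple-producer/single-consumer ring.
// mInsPt, mTail and mHead are free-running counters; a slot is
// mData[counter & (Size - 1)], so Size must be a power of two.
template <class T, std::size_t Size>
class MPSCRing {
  static_assert(Size != 0 && (Size & (Size - 1)) == 0,
                "Size must be a power of two");

public:
  MPSCRing() : mInsPt(0), mTail(0), mHead(0) {}

  // Any number of producer threads may call push().
  bool push(const T &anElem) {
    // reserve a slot by moving the insertion point - only if it won't
    // lap the consumer, so a full queue leaves nothing to undo
    std::size_t insPt = mInsPt.load();
    do {
      if (insPt - mHead.load() >= Size) {
        return false;
      }
    } while (!mInsPt.compare_exchange_weak(insPt, insPt + 1));

    // the slot is ours alone, so fill it
    mData[insPt & (Size - 1)] = anElem;

    // publish in insertion-point order: wait until every earlier
    // reservation has moved the tail, then move it past our slot
    std::size_t expected = insPt;
    while (!mTail.compare_exchange_weak(expected, insPt + 1)) {
      expected = insPt;
    }
    return true;
  }

  // Only a single consumer thread may call pop().
  bool pop(T &anElem) {
    std::size_t head = mHead.load();
    if (head == mTail.load()) {
      return false;  // nothing published yet
    }
    anElem = mData[head & (Size - 1)];
    mHead.store(head + 1);  // hand the slot back to the producers
    return true;
  }

private:
  T                        mData[Size];
  std::atomic<std::size_t> mInsPt;
  std::atomic<std::size_t> mTail;
  std::atomic<std::size_t> mHead;
};
```

The price of in-order publishing is that a producer stalled between reserving and publishing holds up every producer that reserved after it. The consumer itself never waits on anything but the tail.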

Tracking Down a Problem I Already Solved

Tuesday, February 15th, 2011

I hate being dense. Really. In the recent past, I realized that I can't really use the compare-and-swap in a multi-threaded environment like this:

  Node  *old = __sync_val_compare_and_swap(&mTail,
                                           mTail,
                                           newbie);

because it's possible that between marshaling the args (reading mTail) and the atomic operation itself, another thread could get in there and change the value of mTail. Then the compare-and-swap would fail, and while it would return the current value, mTail would not be newbie, which is what I wanted.

What I was looking for was simply an exchange - I don't care about the current value, I just want to swap what's there now for something new, and give me the old while I give you the new.

No such luck. I couldn't find anything like that in GCC.

So I'm back to the only thing I can do:

  Node   *t = mTail;
  while (!__sync_bool_compare_and_swap(&mTail, t, newbie)) {
    t = mTail;
  }

and in this we are forced to loop until we get a consistent set, but in the end it's still just a swap. Sure do wish they had one of those.
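For what it's worth, C++11, which is newer than the compiler in this post, adds exactly this operation. std::atomic<T>::exchange() stores the new value and hands back the old one in a single atomic step, so no retry loop is needed. Here's a minimal sketch; the Node type and the swapTail() helper are hypothetical.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical node type standing in for the queue's Node.
struct Node {
  int   value;
  Node *next;
};

// Swap in a new tail and get the old one back in one atomic step -
// no compare-and-swap loop, and no window for another thread to sneak in.
Node *swapTail(std::atomic<Node *> &tail, Node *newbie) {
  return tail.exchange(newbie);
}
```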

Anyway, I'm miffed at myself for spending several hours on this when I'd already solved it. The wrinkle came in the multi-threaded usage of the LinkedFIFO, and while my test cases worked, I didn't really check that there was no corruption in the data. I added those tests to the app and all of a sudden it was clear I had an issue. It took me about 5 mins to fix, and all was well, but finding the problem took hours.

Sometimes, I'm my own worst enemy. However, I will say that I'm getting far more comfortable with all this stuff the more I use it. Guess there's a silver lining in this after all.

Getting Ever Closer to the Goal

Monday, February 14th, 2011

This morning has been better than most, but still challenging. Today I was able to handle the open with my ticker plants, and the client's latency stayed under 368 msec. This is great news. I had expected the upper limit to be around 400 msec, based on the tests I've been running with the individual components. At about 100,000 msg/sec from the exchange, a processing time of 0.4 usec equates to 400 msec. So we're probably a little shy of the 100,000 msg/sec, and that was what we were seeing.

Great news.

But it was not without sadness. The multiple-producer/single-consumer linked FIFO was not being a good citizen and was corrupting the malloc/free heap, which caused a lot of problems. It was easy to track down, as it was a big user of new and delete and it was newly added to the code. When I added a simple spinlock to the code, everything cleared up, and slowed down. So I went on a little fishing expedition for some other implementations to look at.
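The post doesn't show the spinlock itself. Here's a minimal sketch of what "a simple spinlock" can look like using the same family of GCC built-ins: __sync_lock_test_and_set(), which has acquire semantics, and __sync_lock_release(), which has release semantics. The SpinLock class is hypothetical.

```cpp
#include <cassert>

// Hypothetical minimal spinlock on GCC's __sync lock built-ins.
class SpinLock {
public:
  SpinLock() : mFlag(0) {}

  void lock() {
    // atomically set the flag to 1; a returned 1 means someone holds it
    while (__sync_lock_test_and_set(&mFlag, 1)) {
      // wait on a plain read until it looks free, then retry the swap
      while (mFlag) { }
    }
  }

  // grab the lock only if it's free right now
  bool try_lock() { return __sync_lock_test_and_set(&mFlag, 1) == 0; }

  // write 0 back with release semantics
  void unlock() { __sync_lock_release(&mFlag); }

private:
  volatile int mFlag;
};
```

Because it provides lock() and unlock(), it also works with std::lock_guard<SpinLock> around the queue's push() and pop() bodies.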

What I found was that by starting with an empty (dummy) Node and pointing both the head and the tail at it, you can really simplify the code. It also lets you confine the head pointer to pop() and the tail pointer to push(), so there's no chance of cross-contamination. Much nicer. I put that in and things are looking much better.

I'll still need to test for several days to be sure, but things are looking a lot better. Next up is to rebuild the two other development machines with Ubuntu 10.04.1, as I've had some real issues with this code on CentOS 5. I believe it's due to the memory allocation scheme in the kernel, and Ubuntu is a lot newer than CentOS 5. Hard to say. What isn't hard to see is that on CentOS the code won't run for more than 2 mins, and on Ubuntu, it seems to run forever.

I'm hoping to get these boxes rebuilt today and then all my stuff moved back over to them so I can be working off the set of three for tomorrow's open. That would be a really nice goal.

Using ZeroMQ’s Zero Copy Message Constructor

Monday, February 14th, 2011

This morning I was looking at some code that was left over from last week, and was causing a lot of problems with the CentOS build of my ticker plants. The problem was in the zmq::message_t's data() method call, and I was trying to dig into the ZeroMQ code to see what was happening there that might be causing the problems on CentOS and not on Ubuntu.

Nothing really popped out. But at the same time, my existing transmitter code looks something like this:

  try {
    // make a ZMQ message of the right size
    zmq::message_t  msg(aPayload.size());
    // ...copy in the data we need from the payload
    memcpy(msg.data(), aPayload.data(), aPayload.size());
    // ...and WOOSH! out it goes
    aTopic.socket->send(msg);
  } catch (std::exception & e) {
  }

It's a design rule that you need to create a new ZMQ message for each send, and while it's possible to copy messages, I hadn't really dug into that much up to now. But this morning I decided to see if I could use the zero-copy version of the message constructor to minimize heap allocations and, hopefully, bring a little stability to the CentOS build.

It turns out to be pretty easy. In my code, I've got one 'buffer' - a std::string that I clear out every message send and fill with the serialized form of the outgoing message. I'm already passing this into the sending method as aPayload, and copying the data out of it into the new message.

For the zero copy to work, we need to have a function that can be called to "free" the memory passed into the ZMQ message's constructor. Since I've got this one buffer, and I'm reusing it over and over again, all this "free" method needed to do was exactly nothing. I just needed to satisfy the ZeroMQ contract.

So I made a static do-nothing method:

  static void payloadRecycle( void *data, void *hint );

and then changed my code to look like:

  try {
    // make a ZMQ message with the payload's data
    zmq::message_t  msg((void*)aPayload.data(), aPayload.size(), payloadRecycle);
    // ...and WOOSH! out it goes
    aTopic.socket->send(msg);
  } catch (std::exception & e) {
  }

At this point, the zmq::message_t is going to create its structures around this buffer, send out the message, and then call my "free" function. I'll do nothing, and then we'll return to the loop where I clear out this buffer and do it all again. Very slick!

Unfortunately, this didn't help the CentOS build. The error messages appeared to be coming from this call, and they didn't stop. Ubuntu, however, is running strong. Can't figure that out, but I'm switching to Ubuntu anyway, so it really doesn't matter.

Still... I got rid of the buffer creation and copy, and that's got to help the performance a little.

UPDATE: I had a problem on the receiver side of things. It seems that when I do the zero-copy, the payload ZMQ delivers is off by a byte or two on either side of the actual payload. When I use the memcpy() code, it's just fine. Sounds like a bug in the ZMQ code to me. I'll see if I can find out how extensively they've tested that code.

[9:22] UPDATE: the problem is more subtle. ZMQ's send() isn't synchronous: it queues the message and returns as soon as it can, and the actual sending happens later. I, on the other hand, assumed it was done with the data when it returned, so I was refilling the buffer with the next message while ZMQ was still holding on to it. Not good. The problem could be solved with a string buffer pool, but then we're copying into that, and the message still needs to be constructed.

I suppose that if I put the string pool in the "filling" part, it'd work as we're only going to fill one string, and that would save a copy. But for now, I know why it's not working, and I'm OK with the old code.
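Here's a sketch of that "pool in the filling part" idea, under a few assumptions. Each outgoing message is serialized directly into a buffer checked out of a pool. The zero-copy free function puts the buffer back only when ZeroMQ says it's done with it. The PayloadPool class, the gPayloadPool global and payloadRelease() are all hypothetical. The free function has the same void (void *data, void *hint) shape as payloadRecycle() above, and the pool is locked because ZeroMQ may call the free function from its own I/O thread rather than the sender's.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical pool of reusable payload buffers. A buffer is checked out
// for one message and only comes back when the transport is done with it.
class PayloadPool {
public:
  PayloadPool() = default;
  PayloadPool(const PayloadPool &) = delete;
  PayloadPool &operator=(const PayloadPool &) = delete;

  ~PayloadPool() {
    for (std::string *buf : mFree) {
      delete buf;
    }
  }

  // Get an empty buffer to serialize the next outgoing message into.
  std::string *acquire() {
    std::lock_guard<std::mutex> lock(mMutex);
    if (mFree.empty()) {
      return new std::string();
    }
    std::string *buf = mFree.back();
    mFree.pop_back();
    buf->clear();
    return buf;
  }

  // Put a buffer back; may be called from a different thread than acquire().
  void release(std::string *buf) {
    std::lock_guard<std::mutex> lock(mMutex);
    mFree.push_back(buf);
  }

private:
  std::mutex                 mMutex;
  std::vector<std::string *> mFree;
};

// Hypothetical process-wide pool used by the free function below.
static PayloadPool gPayloadPool;

// Same shape as payloadRecycle(), but the hint is the std::string that owns
// 'data', and it goes back to the pool rather than being refilled early.
static void payloadRelease(void *data, void *hint) {
  (void)data;  // points into the string's storage; the string is the hint
  gPayloadPool.release(static_cast<std::string *>(hint));
}
```

On the send side, the message would then be built with the four-argument form of the same constructor: zmq::message_t msg((void*)buf->data(), buf->size(), payloadRelease, buf), where buf came from gPayloadPool.acquire() and was filled by the serializer.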

At least I know what's happening and what it takes to really fix it.

Optimization Brings Its Own Problems

Friday, February 11th, 2011

Today has been an up-and-down day... on the one hand, the changes I made yesterday have pushed the speed of the ZeroMQ transmitter to new heights, but with that have come other problems - most notably on CentOS 5. I'm now seeing real problems with the ticker plants on CentOS 5 when the message rates reach even moderate levels. It's always in the ZeroMQ code, but I've dug into that code, and I can't see anything it's doing wrong other than calling malloc() quite a bit.

Interestingly enough, when I run it on Ubuntu 10.04.1, it's fine. So I'm currently inclined to think it's something in the Linux kernel to do with memory allocation/deallocation. My guess is it was fixed in a later kernel that simply isn't available for CentOS 5. We'll have to run more tests on Monday, but for now, I'm at a bit of a loss.

Thankfully, we're moving to Ubuntu, and so long as it runs there, I'm good to go. The only wrinkle is the client - does this run on a CentOS 5 client? Don't know, and we'll have to see next week. It could go either way.

I did a lot of other little optimizations today - things that weren't an issue before have now become an issue because of the speed of the overall process. Locks on maps that aren't really needed... things like this. I spent the entire day finding these little issues and fixing/updating them, and checking CentOS for a possible fix.

They all worked on Ubuntu, and it seemed that nothing I could do was going to get it running on CentOS again. We'll have to start back at it on Monday.

Multiple-Producer/Single-Consumer Lockless FIFO Queue

Thursday, February 10th, 2011

I've been working on trying to get more speed out of the components of my ticker plants as they run fine during the day, but the crush at the open is still something that gets ahead of them. To that end, I wanted to make my conflation queue lockless. The trie that holds the messages is lockless - using just the GCC built-ins for compare-and-swap. But the STL std::deque was certainly not thread-safe, and updating the two together was something I left in the lock as well.

Then I got to thinking about the order of operations in the conflation queue, and I became convinced that I could order things properly. If I had a lockless FIFO queue that allowed for multiple producers, it would then be possible to have a lockless conflation queue. As it is now, the only lockless FIFO queue I have is a nice, fixed-size, circular FIFO that doesn't even use compare-and-swap. But that wasn't going to be good enough.

So I started writing. My goal: lockless, multiple-producer/single-consumer, FIFO queue using compare-and-swap. It's all sitting in this gist.

There are a few things I'm really pleased with in this class. First, it's a nice, general template class. The problem with my circular FIFO class is that it uses the volatile keyword to synchronize data between the CPUs. That's nice, as it doesn't need any compare-and-swap operations, but it makes it very hard to put your own classes into the queue, so it's best suited for simple values like pointers. It's nice and useful, but having something like this is even better.
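For contrast, the single-producer/single-consumer kind of circular FIFO needs no compare-and-swap at all, because each index is written by only one thread. Here's a minimal sketch. It assumes C++11 std::atomic loads and stores with acquire/release ordering standing in for volatile, since in the C++11 memory model volatile by itself doesn't order memory accesses between threads. The SPSCRing name is hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical single-producer/single-consumer ring. The producer is the
// only writer of mTail and the consumer the only writer of mHead, so plain
// atomic loads and stores are enough - no compare-and-swap.
template <class T, std::size_t Size>
class SPSCRing {
  static_assert(Size >= 2 && (Size & (Size - 1)) == 0,
                "Size must be a power of two");

public:
  SPSCRing() : mHead(0), mTail(0) {}

  // producer thread only
  bool push(const T &anElem) {
    std::size_t tail = mTail.load(std::memory_order_relaxed);
    std::size_t next = (tail + 1) & (Size - 1);
    if (next == mHead.load(std::memory_order_acquire)) {
      return false;  // full: one slot stays empty to tell full from empty
    }
    mData[tail] = anElem;
    mTail.store(next, std::memory_order_release);  // publish the slot
    return true;
  }

  // consumer thread only
  bool pop(T &anElem) {
    std::size_t head = mHead.load(std::memory_order_relaxed);
    if (head == mTail.load(std::memory_order_acquire)) {
      return false;  // empty
    }
    anElem = mData[head];
    mHead.store((head + 1) & (Size - 1), std::memory_order_release);
    return true;
  }

private:
  T                        mData[Size];
  std::atomic<std::size_t> mHead;
  std::atomic<std::size_t> mTail;
};
```

Note that this layout holds at most Size - 1 items. Keeping one slot empty is what lets head == tail mean "empty" without a separate count.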

The one section of code that I wish were a little better is in the pop() call:

   1. /**
   2.  * This method updates the passed-in reference with the value on the
   3.  * top of the queue - if it can. If so, it'll return the value and
   4.  * 'true', but if it can't, as in the queue is empty, then the method
   5.  * will return 'false' and the value will be untouched.
   6.  */
   7. bool pop( T & anElem )
   8. {
   9.   bool error = false;
  10.
  11.   // see if the head is NULL - i.e. empty queue
  12.   if (__sync_bool_compare_and_swap(&mHead, NULL, mHead)) {
  13.     error = true;
  14.   }
  15.
  16.   /**
  17.    * This is interesting... when we pop, we need to see if the
  18.    * head and tail are pointing to the same thing - the only
  19.    * element in the queue. If they are, then the best thing to
  20.    * do is to NULL-out the tail, so push() can work, and then
  21.    * quickly swap out the head so it's NULL as well. Then we
  22.    * have time to do what we need.
  23.    *
  24.    * However, if there are several elements in the queue, then
  25.    * we only need to remove the head and process it.
  26.    */
  27.   if (!error) {
  28.     // get what I *think* is the head (should be)
  29.     Node *h = mHead;
  30.     /**
  31.      * If we have the head == tail, then we have only one item
  32.      * in the list. In order to make this a clean break, NULL
  33.      * out the tail so new push() calls can start building the
  34.      * list again.
  35.      */
  36.     __sync_bool_compare_and_swap(&mTail, h, NULL);
  37.     // now let's move the head to the next in the list
  38.     while (!__sync_bool_compare_and_swap(&mHead, h, mHead->next)) {
  39.       h = mHead;
  40.     }
  41.
  42.     // get the value out of the node and into the arg
  43.     if (h != NULL) {
  44.       anElem = h->value;
  45.       // at this point we're done with this node - delete it
  46.       delete h;
  47.       h = NULL;
  48.     } else {
  49.       error = true;
  50.     }
  51.   }
  52.
  53.   return !error;
  54. }

In lines 29-40 we are trying to deal with the case where there is one, and only one, value in the queue. This means that the head and the tail point to the same Node. What I wanted to do was to NULL out the head and tail, but save the Node for later extraction. Since I can't atomically swap two variables, I had to do the one that I believe is more critical - the tail, as that's affecting the push() calls. I then very quickly NULL out the head and grab that value for processing.

I may come back to this soon, but for now all my tests look good and I'm very happy with the results. I'm able to have a lockless conflation queue which tests out very nicely, and with this as a general component, I have a lot more flexibility in what I use in the tighter performance sections of the code.

Good day's work.

[2/14] UPDATE: I've been doing a lot of reading on queues like this, and it turns out that if you start with an 'empty' Node, the code for push() and pop() gets a lot easier. The complete class is still in the gist, but this is the new push():

    /**
     * This method takes an item and places it in the queue - if it can.
     * If so, then it will return 'true', otherwise, it'll return 'false'.
     */
    bool push( const T & anElem )
    {
      bool    error = false;
 
      // create a new Node with the provided value
      Node   *me = new Node(anElem);
      if (me == NULL) {
        error = true;
      } else {
        /**
         * We need to add the new value to the tail and then link it
         * back into the list. Not too bad.
         */
        // put in the new tail, and get the old one
        Node  *oldTail = mTail;
        while (!__sync_bool_compare_and_swap(&mTail, oldTail, me)) {
          oldTail = mTail;
        }
        // OK, make sure that the list remains intact
        if (oldTail != NULL) {
          oldTail->next = me;
        }
      }
 
      return !error;
    }

and pop() becomes:

    /**
     * This method updates the passed-in reference with the value on the
     * top of the queue - if it can. If so, it'll return the value and
     * 'true', but if it can't, as in the queue is empty, then the method
     * will return 'false' and the value will be untouched.
     */
    bool pop( T & anElem )
    {
      bool    error = false;
 
      // if the next guy is NULL, we're empty
      if (__sync_bool_compare_and_swap(&(mHead->next), NULL, NULL)) {
        error = true;
      } else {
        // move the head to the next Node and get the value
        Node  *oldHead = __sync_val_compare_and_swap(&mHead, mHead, mHead->next);
        anElem = mHead->value;
        // if there is an oldHead (should be), delete it now
        if (oldHead != NULL) {
          delete oldHead;
        }
      }
 
      return !error;
    }

It's important to note that we now touch mTail only in push() and mHead only in pop(). This is much cleaner than the previous implementation. In addition, since pop() is the only thing touching mHead, it no longer needs a "spin to replace" loop; the only one left is the tail swap in push().
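Here's a self-contained sketch of the same dummy-Node design. It assumes C++11 std::atomic in place of the GCC built-ins, and the LinkedMPSC name is hypothetical. Because std::atomic<T>::exchange() swaps the tail in one step, push() needs no retry loop at all. pop() reads the value from the Node after the dummy, frees the old dummy, and lets that next Node become the new dummy.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-alone dummy-Node MPSC queue. mHead is touched only by
// the single consumer in pop(); mTail only by the producers in push().
template <class T>
class LinkedMPSC {
  struct Node {
    T                   value;
    std::atomic<Node *> next;
    Node() : value(), next(nullptr) {}
    explicit Node(const T &aValue) : value(aValue), next(nullptr) {}
  };

public:
  // start with one empty Node that both the head and the tail point at
  LinkedMPSC() : mHead(new Node()), mTail(mHead) {}
  LinkedMPSC(const LinkedMPSC &) = delete;
  LinkedMPSC &operator=(const LinkedMPSC &) = delete;

  ~LinkedMPSC() {
    while (mHead != nullptr) {
      Node *next = mHead->next.load();
      delete mHead;
      mHead = next;
    }
  }

  // safe to call from any number of producer threads
  void push(const T &anElem) {
    Node *me = new Node(anElem);
    // swap in the new tail and get the old one back in one atomic step...
    Node *oldTail = mTail.exchange(me);
    // ...then link it into the list so pop() can reach it
    oldTail->next.store(me);
  }

  // only one consumer thread may call this
  bool pop(T &anElem) {
    Node *next = mHead->next.load();
    if (next == nullptr) {
      return false;  // empty, or a push() hasn't linked its Node in yet
    }
    anElem = next->value;  // the Node after the dummy holds the value...
    delete mHead;          // ...and the old dummy is no longer needed
    mHead = next;          // so 'next' becomes the new dummy
    return true;
  }

private:
  Node               *mHead;  // declared first: mTail is initialized from it
  std::atomic<Node *> mTail;
};
```

One consequence of this layout: between a producer's exchange() and its store to oldTail->next, pop() can report the queue as empty even though a push() is underway. A false return is best read as "nothing ready yet" rather than "nothing coming".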

Hard to Find a Good Fine-Grained Timer on Linux

Wednesday, February 9th, 2011

I'm looking at some logs of a client of my ticker plant. I'm trying to iron out the last few bugs in the system, and I'm coming across the following data that's making no sense whatsoever. Basically, every 10 sec, my client prints stats on the messages it's received. These include the average messages received per second, the messages conflated, and the size of the conflation queue. They also include the min, max, and average delay of the messages, measured from the timestamp set when each message arrives from the exchange to the time it's received by my client code.

What I'm seeing is something like this:

  Time          Messages   Conf Msgs   Conf Queue   Max Delay
                (/sec)     (/sec)      (msgs)       (msec)
  13:30:47.01   4083.4     550.6        0             838.6
  13:30:57.01   6561.7     547.6       24           13459.0
  13:31:07.01   5170.2     797.5        0             410.7

My concern is the max delay - the last column - for the middle time slice. It's showing that, on a nearly empty conflation queue (24 messages), this interval had a maximum delay from the source of more than 13 sec! That's longer than the sampling interval. It's as if it was hiding upstream!

I initially suspected the time base I was using. Originally, I was using gettimeofday() as it's pretty universal, and calculating the microseconds since epoch with:

  struct timeval  tv;
  gettimeofday(&tv, NULL);
  // 64-bit math so tv_sec * 1000000 can't overflow a 32-bit long
  return ((int64_t)tv.tv_sec * 1000000 + tv.tv_usec);

Then I read on Stack Overflow that this isn't a very good timebase, and I switched to the supposedly more stable clock_gettime():

  struct timespec  tv;
  clock_gettime(CLOCK_REALTIME, &tv);
  // 64-bit math so tv_sec * 1000000 can't overflow a 32-bit long
  return ((int64_t)tv.tv_sec * 1000000 + tv.tv_nsec / 1000);

I'm not sure if this is going to be a lot better, as I think there's a problem still lurking in my code, but it's a start. Getting a good timer is important when you're timing things this close.
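One thing worth knowing about the switch: CLOCK_REALTIME is the same wall clock that gettimeofday() reports, just with a nanosecond field. It can still jump when the system time is set. CLOCK_MONOTONIC isn't affected by those jumps, but its zero point is arbitrary. That makes it good for intervals measured on one box, but not for comparing against the exchange's timestamps. Here's a sketch that takes the clock as a parameter, assuming POSIX clock_gettime(); the usecOnClock() name is hypothetical.

```cpp
#include <cassert>
#include <stdint.h>
#include <time.h>

// Microseconds on the given POSIX clock, using 64-bit math so that
// tv_sec * 1000000 can't overflow a 32-bit long.
int64_t usecOnClock(clockid_t aClock) {
  struct timespec ts;
  clock_gettime(aClock, &ts);
  return (int64_t)ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}
```

For exchange-to-client delays, which cross machines, CLOCK_REALTIME is the only choice, and the result is only as good as the clock synchronization between the boxes. For in-process latency, usecOnClock(CLOCK_MONOTONIC) avoids the jumps.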

Google Chrome dev 10.0.648.45 is Out

Wednesday, February 9th, 2011

This morning I noticed that Google Chrome dev 10.0.648.45 was out with a few really nice additions. Among them are the latest V8 engine (3.0.12.12) and Flash (10.2), but also "many crash fixes". Good enough for them. I'm not using Flash anywhere else on my MacBook Pro, so it's my one place to view Flash content - and I use it every now and then, but not often.

When I restarted it, it was amazingly fast, and I was jazzed for the first time in a long time. Wouldn't this be great! I thought. But after a bit it was back to 'normal' speed. Guess Comcast was just speedy then.

Rebalancing the Multicast Channels Seems to Reduce Errors

Tuesday, February 8th, 2011

I've been trying to track down some ZeroMQ errors I've been seeing today as I try to build a clean multi-threaded client for those folks that want more performance out of the ticker plant client. It's a decent design in that it devolves to the same client as I have now if you only create one client. The zmq::context_t is essentially a process singleton, by design, so I put that as a static class variable on my ZMQ receiver class. Each receiver instance then has its own zmq::socket_t, which is what you connect() to the URLs of the reliable multicast channels.

It all makes sense on paper, but I have been seeing a lot of errors that seem to be coming from ZeroMQ, and in my experience that's pretty unlikely. So I started to wonder if some external factor was causing ZeroMQ to have these problems. I had initially cut up the multicast channels into 27 channels per message type: A through Z and 'other'. I decided that OPRA had a balanced scheme for 48 channels, and that couldn't be worse, so I reconfigured my channels to match OPRA's.
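Here's a minimal sketch of the original 27-way split, assuming the channel is picked from the first character of the symbol. The channelFor() name and the 0-26 numbering are hypothetical.

```cpp
#include <cassert>
#include <string>

// Hypothetical version of the original split: one channel per leading
// letter A-Z (0-25), with everything else on the 'other' channel (26).
int channelFor(const std::string &symbol) {
  if (!symbol.empty() && symbol[0] >= 'A' && symbol[0] <= 'Z') {
    return symbol[0] - 'A';
  }
  return 26;
}
```

With a split like this, each channel carries whatever share of the traffic comes from symbols starting with its letter. Nothing in the scheme itself keeps the channels evenly loaded, which is what a deliberately balanced layout like OPRA's is after.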

As it's running, it's certainly doing better than before - even if it's not perfect, it's better. I'll have to keep an eye on it and get the latest ZeroMQ from GitHub to see if they've fixed any issues. I know we're going to need something, as ZeroMQ is still a bit flaky. But to be fair, I'm pushing it pretty hard.

UPDATE: OK, not perfect... we're still seeing errors, but it's better. Still have something to hammer on... I'll get it - eventually.

Nice Re-Org of Client Class Allows Users to Balance Load

Tuesday, February 8th, 2011

This morning I was working on how to decrease the latency of my ticker plant client for very large subscription sets when it hit me. In my original design, I had a single ZeroMQ receiver that fed any number of subscribed clients. That's OK for small sets, and maybe that's OK for large sets with conflation, but I wanted something that allowed the user a lot more flexibility.

So I stopped making the ZeroMQ receiver a static ivar of the client class, and made it a regular ivar of the client. I then changed the ZeroMQ receiver to make its zmq::context_t a singleton in the process (static ivar of the ZeroMQ receiver class) so that I could control the thread usage there. There will still be a single zmq::socket_t for each receiver. Since the receivers are paired with the clients, we'll be able to use the filtering of the reliable multicast on the individual client subscriptions.

Now, if you want to subscribe to a large dataset you can. It'll all come to the one client. But if you want to handle things a little more quickly, subscribe to smaller groups, and each client will get just that portion of the whole that it asks for, but will have the threading to be able to take it as fast as it can.

Controllable speed. If you don't need it, just subscribe to a big block. But if you need it, it's there. This is a great little compromise that should give the users a lot more flexibility than they have now.

Looking forward to seeing the results at the open. Should be exciting.