Unifying Caching and Building Conflation Queues

Today I just wanted to build a conflation queue and integrate it into a few of my endpoints in the message flow so that slow consumers, or very fast producers, don't overwhelm the system. It's a simple design: a queue linked to a map, so that the map can quickly tell the insert routine whether a message is already in the queue, in which case just its contents are replaced. It's standard stuff, but there are always a few tricks.

In my case, I wanted the uniqueness of the messages dictated by the type, an unsigned 8-bit integer, and a conflation key, a 64-bit unsigned integer returned by each message that indicates its uniqueness within that family of messages. I decided to use a simple STL std::pair, as it could be used as the element in the std::deque as well as the key in the std::map.
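
To make the design concrete, here is a minimal sketch of that deque-plus-map arrangement. It is not the actual code from this project: the class name ConflationQueue, the push/pop methods, and the use of std::string as a stand-in for the message payload are all assumptions made for illustration, and the sketch is not thread-safe.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>
#include <string>
#include <utility>

// (message type, conflation key) identifies a message within the queue.
using ConflationKey = std::pair<std::uint8_t, std::uint64_t>;

// Hypothetical sketch: std::string stands in for the real message type.
class ConflationQueue {
public:
    // Returns true if the message was queued as new, false if it
    // replaced (conflated with) a message already waiting in the queue.
    bool push(std::uint8_t type, std::uint64_t key, std::string payload) {
        ConflationKey k(type, key);
        auto it = latest_.find(k);
        if (it != latest_.end()) {
            // Already queued: keep its position, replace the contents.
            it->second = std::move(payload);
            return false;
        }
        latest_.emplace(k, std::move(payload));
        order_.push_back(k);
        return true;
    }

    // Removes the oldest queued key and hands back its latest contents.
    // Returns false if the queue is empty.
    bool pop(std::string& out) {
        if (order_.empty()) return false;
        ConflationKey k = order_.front();
        order_.pop_front();
        auto it = latest_.find(k);
        out = std::move(it->second);
        latest_.erase(it);
        return true;
    }

    std::size_t size() const { return order_.size(); }

private:
    std::deque<ConflationKey> order_;              // delivery order of keys
    std::map<ConflationKey, std::string> latest_;  // newest payload per key
};
```

With this sketch, pushing (1, 100) twice and (2, 100) once leaves two entries in the queue, and the first pop returns the second payload pushed for (1, 100). A real version would need a mutex, or some lock-free scheme, around push and pop before it could sit between producer and consumer threads.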

When I got into the code, though, I realized that there were plenty of places I wasn't doing any kind of buffering - specifically, the TCP endpoints. So I had to go into those components and put in a simple byte-level buffer with an additional thread for de-spooling. That took time. Then I saw that there were a few more that needed some work, and pretty soon the entire morning was gone.

Finally, though, I got all the components working and testing out just fine - which wasn't as easy as it used to be, with additional threads running around. In fact, it was a mess of segmentation faults until I changed the way I was terminating the de-spooling threads. But I got it all going.
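
For illustration, here is one common way to build a byte-level buffer with a de-spooling thread that shuts down cleanly. This is a sketch under assumptions, not the change that was actually made here: the ByteSpooler class, its Sink callback, and the write/stop methods are hypothetical names. The idea is to set a stop flag under the lock, wake the thread, and join it before any of the members it touches are destroyed.

```cpp
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch: buffers bytes and hands them to a sink callback
// on a separate de-spooling thread.
class ByteSpooler {
public:
    using Sink = std::function<void(const std::vector<std::uint8_t>&)>;

    explicit ByteSpooler(Sink sink)
        : sink_(std::move(sink)), worker_([this] { run(); }) {}

    ~ByteSpooler() { stop(); }

    ByteSpooler(const ByteSpooler&) = delete;
    ByteSpooler& operator=(const ByteSpooler&) = delete;

    // Producer side: append bytes and wake the de-spooling thread.
    void write(const std::uint8_t* data, std::size_t len) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            buffer_.insert(buffer_.end(), data, data + len);
        }
        cv_.notify_one();
    }

    // Ask the thread to finish, then wait for it. Safe to call twice.
    // Bytes written after stop() are not delivered.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_one();
        if (worker_.joinable()) worker_.join();
    }

private:
    void run() {
        std::vector<std::uint8_t> chunk;
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            cv_.wait(lock, [this] { return stop_ || !buffer_.empty(); });
            if (buffer_.empty()) return;  // only reachable once stop_ is set
            chunk.clear();
            chunk.swap(buffer_);          // take everything buffered so far
            lock.unlock();
            sink_(chunk);                 // deliver without holding the lock
            lock.lock();
        }
    }

    Sink sink_;
    std::mutex mutex_;
    std::condition_variable cv_;
    std::vector<std::uint8_t> buffer_;
    bool stop_ = false;
    std::thread worker_;  // declared last so it starts after the other members exist
};
```

The two details that tend to matter for avoiding crashes at shutdown are the member order (the thread is constructed last, so everything it uses already exists) and the join in stop(), which guarantees the thread has exited before the buffer and mutex go away. Any bytes still buffered when stop() is called are drained before the thread returns.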

The afternoon was devoted to getting the conflation queue working and tested out. It's nice, and it should be pretty fast, but I'll have to wait and see how it tests out for performance when it's getting hit with some of these exchange feeds. But hey, that's the point - we need to be able to decouple the producers from consumers with this conflation queue.

Tomorrow I'll be putting these into some components.