Completed the UDP Receiver for DKit

DKit Laboratory

This afternoon I finally finished the UDP receiver for DKit - and it's everything I'd hoped for in this incarnation. When I started all this work, I wanted to make something that was easy to use, free, and something I could build on easily should I ever find myself in the same place where I needed to create an exchange feed system. Some of the limitations that I've run into in the past are:

  • Too Much in the Receiver Class - this happens all the time at a job: you want to do it "right", but there's no time. So as soon as you get something going, you add to it, as opposed to taking the time to see where it's appropriate to cut the design off and leave the component alone. In the past, this has really hurt the complexity of my UDP receivers.
  • Too Much Copy-n-Paste Reuse - as with the lockless containers and pools, I ended up not working hard enough to make the important template classes, and instead just hammered out code that worked - regardless of the line count. That's a horrible mistake to make.
  • Queueing in the Wrong Places - because of these other two mistakes, I have ended up putting in queueing in the wrong places. This makes things far less reusable as there is a built-in limitation that can't easily be overcome. In the past, this has even further introduced complexity as I've tried to make these queues "switchable".

So this time I was trying to really get it right. And I think I'm a lot closer now than I ever have been in the past. This new implementation in DKit has all the things fixed that I didn't like about my previous versions: better templates, fewer lines of code, and most importantly: it's the right tool for the job.

Specifically, there are several features in this version that eliminate many of the limitations and problems of the last one:

  • The ability to have any number of receivers on the same service thread - the old problem here was context switching, with all the threads doing a lot of waiting. In the old version, I had one thread for each pair of UDP sockets, and that was far too few sockets per thread. I could have easily gone up to 15 or 20 per thread, but the architecture of the receivers didn't allow the sharing of the boost::io_service instances like DKit allows. This will make it much easier to effectively load up a box and not pay a high price for context switching.
  • The ability to have any type for a source and sink - the old implementation had a single type of message moving between all sources and sinks, and while this may seem OK, it really meant that I had to have a source generate a message - it couldn't generate a datagram. The ability to have the source generate a datagram means that I can leave that component alone and now make components that sink datagrams and source messages. These are then universal in what they do, and we don't have to worry about putting too much logic into any one component.
  • The minimal feature set helps efficiency and performance - there's a lot to be said for a simple component that does one thing very well, and the templates make the compile time a little long, but in the end, it's incredibly flexible. I've always said that the best designs are the simplest designs, and this is certainly true of the receiver as it exists now. There will be no need for subclassing this guy - he's done. I now just need to make the combined sink/source to act as the subclass for those components that will mutate/translate/convert the output of one source to the input of the necessary sink. This design is far better than what we had.
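To make the shared-service idea from the first point concrete, here's a minimal sketch of how reference-counted sharing could look. It is not DKit's code: the `service` and `receiver` classes, and every method on them other than the `shareService` name, are hypothetical stand-ins built on `std::shared_ptr`, and the "service" is just a queue of handlers instead of a real boost::io_service.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical stand-in for a service loop (the role boost::io_service
// plays in the post): it queues handlers and runs them in order.
class service {
  public:
    void post( const std::function<void()> &handler )
    {
      _pending.push_back(handler);
    }

    // Runs every queued handler on the calling thread and returns
    // how many were run.
    std::size_t run()
    {
      std::vector<std::function<void()> > work;
      work.swap(_pending);
      for (std::size_t i = 0; i < work.size(); ++i) {
        work[i]();
      }
      return work.size();
    }

  private:
    std::vector<std::function<void()> > _pending;
};

// Hypothetical receiver: owns a service by default, but can drop it
// and adopt the service of another receiver.
class receiver {
  public:
    receiver() : _svc(std::make_shared<service>()), _handled(0) { }

    // The shared_ptr does the reference counting for us.
    void shareService( const receiver &other )
    {
      assert(other._svc);
      _svc = other._svc;
    }

    // Simulate a datagram arriving: queue a handler on the service.
    void arrive() { _svc->post([this]() { ++_handled; }); }

    std::size_t runService() { return _svc->run(); }
    long serviceUsers() const { return _svc.use_count(); }
    std::size_t handled() const { return _handled; }

  private:
    std::shared_ptr<service>  _svc;
    std::size_t               _handled;
};

// Two receivers, one service: a single run() drains the work of both.
inline std::size_t sharedServiceDemo()
{
  receiver a, b;
  b.shareService(a);
  a.arrive();
  b.arrive();
  b.arrive();
  return a.runService();
}
```

In this sketch, one loop services every receiver that shares it, so adding a receiver doesn't add a thread. The service goes away when the last receiver holding it does.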

The usage of this is pretty simple. You first need to make a subclass of the sink and have its recv() call out to a method of the right type. Then you just need to create a receiver, add the sink as a listener, start it listening, and you're there:

  /**
   * These are the tests for the udp_receiver
   */
  //  System Headers
  #include <iostream>
  #include <string>
  #include <stdint.h>   //  uint32_t, uint64_t
  #include <unistd.h>   //  sleep()
 
  //  Third-Party Headers
 
  //  Other Headers
  #include "io/udp_receiver.h"
  #include "sink.h"
  #include "util/timer.h"
 
  using namespace dkit::io;
 
  /**
   * I need to have a subclass of sink<T> that's going to handle my messages.
   * Because it's still a template class, I need to call out to a specialized
   * method. It's all still in the class, but it's required for the virtual
   * template class to actually work.
   */
  template <class T> class MySink :
    public dkit::sink<T>
  {
    public:
      MySink() :
        _cnt(0),
        _last_time(0)
      { }
 
      /**
       * This is the main receiver method that we need to call out to
       * a concrete method for the type we're using. It's what we have
       * to do to really get a virtual template class working for us.
       */
      virtual bool recv( const T anItem )
      {
        return onMessage(anItem);
      }
 
      /**
       * This method is called when we get a new datagram, and because
       * we are expecting to instantiate this template class with the
       * type 'T' being a <datagram *>, this is the method we're
       * expecting to get hit. It's just that simple.
       */
      bool onMessage( const datagram *dg ) {
        if (dg == NULL) {
          std::cout << "got a NULL" << std::endl;
        } else {
          std::cout << "got: " << dg->contents() << std::endl;
          _last_time = dkit::util::timer::usecStamp();
          ++_cnt;
        }
        return true;
      }
 
      /**
       * This method will return 'true' if we've received ANY datagrams
       * and if the last one was more than 5 sec ago. That's the timeout
       * for "no more data is coming our way."
       */
      bool allDone()
      {
        using namespace dkit::util;
        static const uint64_t limit_usec = 5 * 1000000ULL;
        return ((_cnt > 0) && ((timer::usecStamp() - _last_time) > limit_usec));
      }
 
    private:
      uint32_t    _cnt;
      uint64_t    _last_time;
  };
 
 
  /**
   * This is the main testing app where we'll listen on a specific URL for
   * UDP multicast data, and then process it until there's a timeout. It's
   * going to also use the "shared io_service" capabilities just to make
   * sure that the reference counting in the udp_receiver is working right.
   */
  int main(int argc, char *argv[]) {
    bool  error = false;
 
    /**
     * To wire up as a listener to the udp_receiver, we need to be a
     * subclass of sink<datagram*>… so now that it's made, construct
     * one with its own io_service.
     */
    MySink<datagram*> dump;
    udp_receiver  rcvr(multicast_channel("udp://239.255.0.1:30001"));
    rcvr.addToListeners(&dump);
    rcvr.listen();
    /**
     * At this point, make a new udp_receiver but share the io_service
     * thread from the one we just made. This will mean that both these
     * sockets are serviced on the same thread. Just a nice way to prove
     * that the reference counting is working.
     */
    udp_receiver  hold;
    hold.shareService(rcvr);
    hold.init();
    /**
     * Now let's stay in this loop as long as we need to...
     */
    while (rcvr.isListening() && !dump.allDone()) {
      sleep(1);
    }
    std::cout << "shutting down due to inactivity..." << std::endl;
    rcvr.shutdown();
 
    std::cout << (error ? "FAILED!" : "SUCCESS") << std::endl;
    return (error ? 1 : 0);
  }

The next thing I need to add to DKit is the adapter which combines the sink and source in a single template class to be able to do all the steps in between. Once I have that, it's a simple matter of making the exchange codecs exchange adapters and then I'm out the door with a ZeroMQ transmitter that can take datagrams or payloads of some kind, and send them out with a reliable multicast transport.
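As a rough idea of what such an adapter might look like, here's a small sketch. Everything in it is an assumption for illustration: the `sink` and `source` templates are simplified stand-ins for the DKit ones, and the `adapter`, `send()`, `length_codec`, and `collector` names are hypothetical. The "codec" just turns a string into its length, where a real one would turn a datagram into a message.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical, simplified stand-ins for the sink and source templates.
template <class T> class sink {
  public:
    virtual ~sink() { }
    virtual bool recv( const T anItem ) = 0;
};

template <class T> class source {
  public:
    virtual ~source() { }

    void addToListeners( sink<T> *aSink )
    {
      assert(aSink != NULL);
      _listeners.push_back(aSink);
    }

  protected:
    // Hand the item to every listener; true only if all accepted it.
    bool send( const T anItem )
    {
      bool ok = true;
      for (std::size_t i = 0; i < _listeners.size(); ++i) {
        if (!_listeners[i]->recv(anItem)) {
          ok = false;
        }
      }
      return ok;
    }

  private:
    std::vector<sink<T> *>  _listeners;
};

// The adapter is a sink of TIN and a source of TOUT in one class.
template <class TIN, class TOUT> class adapter :
  public sink<TIN>,
  public source<TOUT>
{ };

// Example "codec": takes raw text in, sends its length out.
class length_codec :
  public adapter<std::string, std::size_t>
{
  public:
    virtual bool recv( const std::string anItem )
    {
      return send(anItem.size());
    }
};

// Terminal sink that just remembers what it was given.
class collector :
  public sink<std::size_t>
{
  public:
    virtual bool recv( const std::size_t anItem )
    {
      seen.push_back(anItem);
      return true;
    }

    std::vector<std::size_t>  seen;
};
```

Wiring it up would be the same as with the receiver: create a `length_codec`, call `addToListeners()` with a `collector`, and feed the codec with `recv()`. The component upstream only knows it's talking to a sink of its output type, and the one downstream only sees a source of its input type.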

It's looking pretty good.