Updating the Broker Client to Handle Ping-Pong Protocol

Professor.jpg

This afternoon has been spent upgrading the features of my broker client so that it can handle a greater variety of messages and conditions. The first thing I needed to do was to realize that there are more than the two types of connections that I had originally thought. There is the 'call' and the 'subscribe', but those names (I've come to learn) are a little misleading.

The 'call' is really a one-time pull of a specific answer from the data service on the other side of the broker. This is RPC. It can be thought of as a lot more than that, but that's what it really is. The protocol implements it as a 'call' request to the service, and the service responds with a serialized data structure that the client decodes. No problem.

The 'subscribe' is really a request for an open channel to the service, and for that channel to remain open until it's asked to be closed. In the simplest case, the 'subscription' can be for a data item, like a table of values, that then updates asynchronously at the client. Each time the data on the data service updates, it sends "deltas" to the subscribed clients, and they receive it and update their copy of the same data structure.

But it can be so much more than that. Unfortunately, that's as far as my C++ client took it. What I learned what that it can be a general communication channel with as varied and complex a communication between the client and the data service as the service builder wants. It can be a series of questions and replies, setting of values, just about anything you can imagine, and in the end, a 'close channel' command will close the channel and return the socket to the pool for reuse.

What I needed to do was to add in the ability to carry on this more general communication with the data service. It wasn't too hard - I just had to add the ability to send a message to the data service and not worry about what's returned or when. Then I had to implement a listening mechanism on the variant so that when the data service updates it, something can register to act on those updates.

With that, I then had a way to register for the updates (or errors) from the data service so I'd know when the 'pongs' arrived from the service, and I could now send all the 'pings' I needed. I still needed one more thing: an asynchronous read timeout.

When issuing the first request to a data service, I needed to have some idea of a timeout so that I don't hang there for ever waiting on a dead or locked service. With boost asio, it was a lot easier than I thought. My initial entry point for the async read looked like:

  void BEClient::startAsyncMessageRead()
  {
    bool       error = false;
 
    // first, make sure we have something to do
    if (!error && ((mSocket == NULL) || (!mSocket->is_open()))) {
      error = true;
      cLog.warn("[startAsyncMessageRead] the socket hasn't been connected"
                " to a service");
    }
 
    // if all is well, start the receive process for the "header"
    if (!error) {
      using namespace boost::asio;
      async_read(*mSocket,
                 buffer(mInbound.body(), mInbound.size()),
                 boost::bind(&BEClient::asyncHeaderRead, this,
                             placeholders::error,
                             placeholders::bytes_transferred)
                );
    }
  }

and to add in the async read timeout, I only needed to create a boost::asio::deadline_timer as an ivar with the same boost io_service as the socket itself, add a method to act as the target of the timeout, and then change the method to read:

  void BEClient::startAsyncMessageRead()
  {
    bool       error = false;
 
    // first, make sure we have something to do
    if (!error && ((mSocket == NULL) || (!mSocket->is_open()))) {
      error = true;
      cLog.warn("[startAsyncMessageRead] the socket hasn't been connected"
                " to a service");
    }
 
    // if all is well, start the receive process for the "header"
    if (!error) {
      using namespace boost::asio;
      async_read(*mSocket,
                 buffer(mInbound.body(), mInbound.size()),
                 boost::bind(&BEClient::asyncHeaderRead, this,
                             placeholders::error,
                             placeholders::bytes_transferred)
                );
    }
 
    // if we have a non-zero timeout, the start it now
    if (!error && (aTimeoutInMillis > 0) && (mTimer != NULL)) {
      // set the timeout in millis as a time in the future...
      mTimer->expires_from_now(boost::posix_time::milliseconds(aTimeoutInMillis));
      // if it expires, call the right method
      mTimer->async_wait(boost::bind(&BEClient::asyncReadTimeout, this,
                                     boost::asio::placeholders::error)
                        );
    }
  }

and then to capture the timeout, we only need to have a very simple method:

  void BEClient::asyncReadTimeout( const boost::system::error_code & anError )
  {
    // if we have a target, alert them of the error
    if (mTarget != NULL) {
      mTarget->fireUpdateFalied("asynchronous read timeout occurred");
    }
 
    // let's recycle this socket now... it's all used up
    mBoss->recycle(this);
  }

With this, we can make sure that the client has the ability to specify a timeout (in milliseconds) if they wish it, and if that timeout occurs, the async operation will be cancelled and all will be returned to it's starting place. It's not ideal, but hey... it's a timeout.

With this all in and tested, I could call it a day. What a day.