Late yesterday I realized that I had a nasty problem with the ZeroMQ receiver component in my ticker plant codebase. Basically, if the publisher stopped before the client did, there was no clean exit for the client: it'd be stuck in a blocking zmq::socket_t::recv() call, and there are no timeouts or interruptions from that guy. I hadn't figured this out because ticker plants are almost always ticking, so there was always another message on the way and it was easy to stop the clients. Stopping and restarting a publisher is easy too, but I'd never planned for the case where the publisher shuts down, doesn't come back up, and the client needs to quit. Well... my mistake, to be sure.
Now we have to fix it.
The idea I came up with was nothing special: have every new zmq::socket_t bind to a unique inproc: URL so that there's a "back door" for sending it messages quickly. I can then continue on as normal, but in my shutdown() code I can tell the thread to stop, and on the off-chance that the thread is sitting in zmq::socket_t::recv(), I'll connect to that same inproc: URL and send it a zero-length message. This will cause it to break out of the recv() call, and then it'll detect that it's supposed to shut down.
The code is pretty simple, but even so, it took a few tries to get all the pieces of the inproc: connections working. In my initialization code I now have:
// nope... so let's make a new socket for this guy
mSocket = new zmq::socket_t(cContext, ZMQ_SUB);
if (mSocket == NULL) {
    error = true;
    cLog.error("[initialize] couldn't create the ZMQ socket!");
} else {
    // ...and subscribe to all the messages
    mSocket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
    // we need to set this guy up properly
    mSocket->setsockopt(ZMQ_SNDBUF, &__sndbuf, sizeof(__sndbuf));
    mSocket->setsockopt(ZMQ_RCVBUF, &__rcvbuf, sizeof(__rcvbuf));
    mSocket->setsockopt(ZMQ_RATE, &__rate, sizeof(__rate));
    mSocket->setsockopt(ZMQ_RECOVERY_IVL_MSEC, &__rec, sizeof(__rec));
    // let's get a new, unique instance number for this socket
    uint32_t inst = __sync_fetch_and_add(&cInstance, 1);
    char name[80];
    // inst is unsigned, so format it with %u; snprintf() always
    // NUL-terminates, so we can hand it the full buffer size
    snprintf(name, sizeof(name), "inproc://sock_%u", inst);
    mBailOutURL = name;
    // now let's bind to that URL so shutdown() can reach us
    mSocket->bind(name);
}
After I set the socket options with the calls to setsockopt(), I atomically increment a class variable that holds the next socket instance number. Putting this number on the end of the URL makes it unique, and I save the URL so that my shutdown() code knows who to talk to.
Finally, I bind() the socket to this inproc: URL. The distinction is important: in the case of OpenPGM/ZeroMQ, it's a call to connect() regardless of whether you're a sender or a receiver, but here, with the inproc: transport, it matters. The receiver has to bind() and the transmitter, as we shall see, has to connect(). OK. Got that cleared up.
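The unique-name step is easy to try on its own. Here's a minimal sketch of it, assuming a C++11 compiler and using std::atomic in place of the GCC __sync_fetch_and_add builtin that my code uses. The names gInstance and nextBailOutURL() are made up for this example and aren't in the ticker plant code.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <string>

// hypothetical stand-in for the class-level instance counter
static std::atomic<uint32_t> gInstance(0);

// hypothetical helper: builds the next unique inproc: URL
std::string nextBailOutURL()
{
    // fetch_add() returns the old value and bumps the counter atomically,
    // so two threads can never be handed the same instance number
    uint32_t inst = gInstance.fetch_add(1);
    char name[80];
    // cast so that %u is the right conversion on every platform
    int n = std::snprintf(name, sizeof(name), "inproc://sock_%u",
                          static_cast<unsigned int>(inst));
    // the URL is short, so it should always fit in the buffer
    assert(n > 0 && static_cast<std::size_t>(n) < sizeof(name));
    return std::string(name);
}
```

Each call hands back a different URL, so every socket gets its own back door no matter which thread created it.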
In my shutdown() code, I now have something like this:
void ZMQReceiver::shutdown()
{
    // first, see if we're still running, and shut it down
    if (isRunning()) {
        // first, tell the thread it's time to stop processing
        setTimeToDie(true);
        // next, make the ZMQ socket for the inproc: transport
        zmq::socket_t bail(cContext, ZMQ_PUB);
        // ...and set him up properly
        static int64_t __rate = 500000;
        static int64_t __rec = 100;
        bail.setsockopt(ZMQ_RATE, &__rate, sizeof(__rate));
        bail.setsockopt(ZMQ_RECOVERY_IVL_MSEC, &__rec, sizeof(__rec));
        // connect to the right inproc: URL
        bail.connect(mBailOutURL.c_str());
        // make a ZMQ message to send this guy
        zmq::message_t msg(0);
        // ...and WHOOSH! out it goes
        bail.send(msg);
        // finally, wait for the thread to stop
        join();
    }
    // next, clear out all the channels and shut down the socket
    clearChannels();
    // ...but make sure the socket is really dropped
    if (mSocket != NULL) {
        delete mSocket;
        mSocket = NULL;
    }
    // let's drop the conflation queue and all it holds
    if (mMessageQueue != NULL) {
        delete mMessageQueue;
        mMessageQueue = NULL;
    }
    // ...and clear out the commands to the socket
    mCommandQueue.clear();
}
It's a little complex, but it's really worth it. We have a way to use the blocking recv() for efficiency, but we also have a way to shut it down. That's pretty nice.
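To see the pattern in isolation, here's a rough model of it that doesn't use ZeroMQ at all. BlockingChannel is a made-up stand-in for a socket whose recv() blocks with no timeout, and SketchReceiver is a made-up class that follows the same three steps as my shutdown(): set the flag, send a zero-length message, join the thread. It's only a sketch, assuming C++11 threads, and not the ticker plant code.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical stand-in for a socket whose recv() blocks with no timeout.
// This is NOT ZeroMQ; it only models the blocking behavior.
class BlockingChannel {
public:
    void send(const std::string& msg) {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mQueue.push_back(msg);
        }
        mReady.notify_one();
    }

    // blocks until a message (possibly zero-length) is available
    std::string recv() {
        std::unique_lock<std::mutex> lock(mMutex);
        mReady.wait(lock, [this] { return !mQueue.empty(); });
        std::string msg = mQueue.front();
        mQueue.pop_front();
        return msg;
    }

private:
    std::mutex mMutex;
    std::condition_variable mReady;
    std::deque<std::string> mQueue;
};

// Hypothetical receiver that mirrors the flag + wake-up + join sequence.
class SketchReceiver {
public:
    SketchReceiver() : mTimeToDie(false), mProcessed(0) {}
    ~SketchReceiver() { shutdown(); }

    void start() {
        assert(!mThread.joinable());   // only start once
        mThread = std::thread(&SketchReceiver::run, this);
    }

    // normal data path: what the publisher would be sending
    void deliver(const std::string& msg) { mChannel.send(msg); }

    void shutdown() {
        if (mThread.joinable()) {
            mTimeToDie = true;               // first, tell the thread to stop
            mChannel.send(std::string());    // then wake it with a zero-length message
            mThread.join();                  // finally, wait for it to exit
        }
    }

    int processed() const { return mProcessed.load(); }

private:
    void run() {
        for (;;) {
            std::string msg = mChannel.recv();   // may block indefinitely
            if (mTimeToDie) break;               // we were woken up to quit
            if (msg.empty()) continue;           // stray wake-up, nothing to do
            ++mProcessed;                        // "process" a real message
        }
    }

    BlockingChannel mChannel;
    std::atomic<bool> mTimeToDie;
    std::atomic<int> mProcessed;
    std::thread mThread;
};
```

The order in shutdown() matters: the flag has to be set before the wake-up message goes out, otherwise the thread could pick up the empty message, see that it's not time to die yet, and go right back to blocking in recv().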