Reusable ZeroMQ C Helpers

My most recent project has involved a few common 0MQ patterns like request-reply, forwarding, and topic subscription. I was using each of these often enough to move them out to a shared library that could be improved as I encountered bugs and needed new, albeit small, improvements.

First was reliably getting a socket. For some reason I have yet to investigate, the legacy application I’m building on seems to throw a number of interrupts on startup, so I had to create a common retry method for creating sockets.


/**
 * Create a 0MQ socket and bind or connect it to a transport, retrying the
 * bind/connect step a few times before giving up.
 *
 * @param context       0MQ context the socket is created in.
 * @param transport     endpoint string handed to zmq_bind() / zmq_connect().
 * @param socketOptions socket type handed to zmq_socket() (e.g. ZMQ_SUB, ZMQ_PUB, ZMQ_REP).
 * @param bind          true to bind the socket, false to connect it.
 * @param cat           log4c category that receives all log output.
 * @return the socket, or NULL if it could not be created or bound/connected.
 */
void *GetZMQSocketWithRetries(void *context, const char *transport, int socketOptions, bool bind, log4c_category_t *cat);

view raw

ZMQHelpers.h

hosted with ❤ by GitHub

The user provides the root 0MQ context, desired transport, any options, if a bind or connect should be used, and finally a logging category.

The second was subscribing to a transport and doing something whenever a specific topic (first part of a multi-part message) was received.


/*
 * Callback invoked by the subscriber thread for every message received.
 * `data` points at the thread's own receive buffer, so it is only valid for
 * the duration of the call; `size` is the value zmq_recv() returned for the
 * payload frame.
 */
typedef void (* BYTE_ARRAY_CALLBACK)(uint8_t *data, int size);
/*
 * State shared between StartSubscribe()/StopSubscribe() and the subscriber
 * thread. Instances are created by StartSubscribe() and released by
 * StopSubscribe(); the string members are private heap copies.
 */
typedef struct _SubscribeZeroMQDataThreadStruct
{
void *context; /* 0MQ context the subscriber socket is created in */
const char *transport; /* endpoint the subscriber connects to */
const char *topic; /* prefix handed to ZMQ_SUBSCRIBE */
BYTE_ARRAY_CALLBACK callback; /* receives the payload frame of each message */
const char *logCategory; /* log4c category name used by the thread */
bool running; /* cleared by StopSubscribe() to make the thread's loop exit */
/* NOTE(review): `running` is a plain bool written by one thread and read by another — confirm this is acceptable on the target platform. */
pthread_t *thread; /* heap-allocated handle of the subscriber thread */
} SubscribeZeroMQDataThreadArgs;
/*
 * Copy the arguments and start a thread that subscribes to `topic` on
 * `zmqTransport` and passes each payload to `callback`. The returned handle
 * must be released with StopSubscribe().
 */
SubscribeZeroMQDataThreadArgs *StartSubscribe(void *zmqContext, const char *zmqTransport, const char *topic, BYTE_ARRAY_CALLBACK callback, const char *logCategory);
/* Ask the subscriber thread to stop, wait for it to finish, and free the handle. */
void StopSubscribe(SubscribeZeroMQDataThreadArgs *args);

view raw

Subscribe.h

hosted with ❤ by GitHub

It follows the same pattern; provide the context, transport, desired topic, and logging category along with a callback that will receive the payload or second part of the multi-part message.

The above subscribe pattern was modified into a rough request-reply pattern, allowing the user not only to receive both the topic and data in a callback but also to reply directly to the requester with another payload.


/*
 * Reply produced by a STRING_CALLBACK. The request-reply thread sends
 * `size` bytes starting at `data` and afterwards calls free() on both `data`
 * and the struct itself, so both must come from malloc().
 */
typedef struct _RequestReplyZeroMQReply
{
void *data; /* reply payload; freed by the request-reply thread */
int size; /* number of bytes of `data` to send */
} RequestReplyZeroMQReply;
/*
 * Callback invoked for every request. `request` is the first frame as a
 * NUL-terminated string, `argument` is the pointer given to
 * StartRequestReply(), and `data`/`size` describe the second frame. `request`
 * and `data` point at the thread's own buffers and are only valid during the
 * call.
 */
typedef RequestReplyZeroMQReply* (*STRING_CALLBACK)(const char *request, const void *argument, const uint8_t *data, const int size);
/*
 * State shared between StartRequestReply()/StopRequestReply() and the
 * request-reply thread. The string members are private heap copies.
 */
typedef struct _RequestReplyZeroMQThreadStruct
{
void *context; /* 0MQ context the REP socket is created in */
bool bind; /* set to true by StartRequestReply() */
bool running; /* cleared by StopRequestReply() to make the thread's loop exit */
const char *transport; /* endpoint the REP socket is bound to */
const char *logCategory; /* log4c category name used by the thread */
const void *argument; /* opaque pointer passed through to the callback */
STRING_CALLBACK callback; /* produces the reply for each request */
pthread_t *thread; /* heap-allocated handle of the request-reply thread */
} RequestReplyZeroMQThreadArgs;
/*
 * Copy the arguments and start a thread that answers requests arriving on
 * `zmqTransport` with whatever `callback` returns. The returned handle must be
 * released with StopRequestReply().
 */
RequestReplyZeroMQThreadArgs *StartRequestReply(void *zmqContext, const char *zmqTransport, STRING_CALLBACK callback, void *callbackArg, const char *logCategory);
/* Ask the request-reply thread to stop, wait for it to finish, and free the handle. */
void StopRequestReply(RequestReplyZeroMQThreadArgs *args);

view raw

RequestReply.h

hosted with ❤ by GitHub

The last was reliably forwarding messages from any number of transports to another transport.


/*
 * State shared between StartForward()/StopForward() and the forwarding
 * thread. The string members are private heap copies made by StartForward().
 */
typedef struct _ForwardZeroMQDataThreadStruct
{
void *context; /* 0MQ context all sockets are created in */
int numSubs; /* number of valid entries in subTransports */
const char *subTransports[10]; /* endpoints to subscribe to; at most 10 are supported */
const char *pubTransport; /* endpoint the publishing socket is bound to */
const char *logCategory; /* log4c category name used by the thread */
bool running; /* cleared by StopForward() to make the thread's loop exit */
/* NOTE(review): `running` is a plain bool written by one thread and read by another — confirm this is acceptable on the target platform. */
pthread_t *thread; /* heap-allocated handle of the forwarding thread */
} ForwardZeroMQDataThreadArgs;
/*
 * Copy the arguments and start a thread that republishes on `pubTransport`
 * everything received from the sub-transports. The variadic tail holds
 * exactly `numSubTransports` endpoint strings (char pointers). The returned
 * handle must be released with StopForward().
 */
ForwardZeroMQDataThreadArgs *StartForward(void *zmqContext, const char *pubTransport, const char *logCategory, int numSubTransports, ...);
/* Ask the forwarding thread to stop, wait for it to finish, and free the handle. */
void StopForward(ForwardZeroMQDataThreadArgs *args);

view raw

Forward.h

hosted with ❤ by GitHub

Below are some recent cuts of the code…


/**
 * Create a 0MQ socket and bind or connect it to a transport, retrying on failure.
 *
 * Up to five bind/connect attempts are made back to back; every attempt and
 * every failure is logged to the supplied category.
 *
 * @param context       0MQ context the socket is created in.
 * @param transport     endpoint string handed to zmq_bind() / zmq_connect().
 * @param socketOptions socket type handed to zmq_socket() (e.g. ZMQ_SUB, ZMQ_PUB, ZMQ_REP).
 * @param bind          true to zmq_bind() the socket, false to zmq_connect() it.
 * @param cat           log4c category that receives all log output.
 * @return the bound/connected socket, or NULL if the socket could not be
 *         created or every attempt failed. On failure the socket is closed
 *         again; on success the caller owns it and releases it with zmq_close().
 */
void *GetZMQSocketWithRetries(void *context, const char *transport, int socketOptions, bool bind, log4c_category_t *cat)
{
    enum { MAX_ATTEMPTS = 5 };

    void *socket = zmq_socket(context, socketOptions);
    if(socket == NULL)
    {
        log4c_category_log(cat, LOG4C_PRIORITY_ERROR, "unable to create socket for transport [%s] %s", transport, zmq_strerror(errno));
        return NULL;
    }
    log4c_category_log(cat, LOG4C_PRIORITY_INFO, "created socket for transport [%s]", transport);

    int rc = -1;
    /* Attempts are numbered from 1 in both log lines so they can be correlated. */
    for(int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++)
    {
        log4c_category_log(cat, LOG4C_PRIORITY_DEBUG, "attempted to connect to [%s] for the %d time", transport, attempt);
        rc = bind ? zmq_bind(socket, transport) : zmq_connect(socket, transport);
        if(rc == 0)
        {
            break;
        }
        log4c_category_log(cat, LOG4C_PRIORITY_WARN, "unable to connect to [%s] during %d attempt: %s", transport, attempt, zmq_strerror(errno));
    }

    if(rc != 0)
    {
        log4c_category_log(cat, LOG4C_PRIORITY_ERROR, "unable to connect to [%s] after five retries", transport);
        /* The caller only ever sees NULL here, so the socket must be released now or it leaks. */
        zmq_close(socket);
        return NULL;
    }
    log4c_category_log(cat, LOG4C_PRIORITY_INFO, "connected socket for transport [%s]", transport);
    return socket;
}

view raw

ZMQHelpers.c

hosted with ❤ by GitHub


static void *ZMQSubscribe(void *args)
{
SubscribeZeroMQDataThreadArgs *typedArgs = args;
log4c_category_t *cat = log4c_category_get(typedArgs->logCategory);
void *subscriber = GetZMQSocketWithRetries(typedArgs->context, typedArgs->transport, ZMQ_SUB, false, cat);
if(subscriber == NULL)
return NULL;
int rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, typedArgs->topic, strlen(typedArgs->topic));
if(rc != 0)
{
log4c_category_log(cat, LOG4C_PRIORITY_ERROR, "unable to subscribe to topic [%s] on transport [%s] %s", typedArgs->topic, typedArgs->transport, zmq_strerror(errno));
return NULL;
}
log4c_category_log(cat, LOG4C_PRIORITY_INFO, "subscribed to topic [%s] on transport [%s]", typedArgs->topic, typedArgs->transport);
uint8_t buf[1024];
char topic[1024];
while(typedArgs->running)
{
int size = zmq_recv(subscriber, topic, sizeof(topic), ZMQ_NOBLOCK);
if(size == -1)
{
if(errno != EAGAIN && errno != EINTR)
log4c_category_log(cat, LOG4C_PRIORITY_WARN, "couldn't receive topic from transport [%s] %s", typedArgs->transport, zmq_strerror(errno));
//no messages available, sleep and go on
usleep(500000);
}
else
{
topic[size] = '\0';
log4c_category_log(cat, LOG4C_PRIORITY_DEBUG, "received a topic of size %d [%s] from transport [%s]", size, topic, typedArgs->transport);
size = zmq_recv(subscriber, buf, 1024, 0);
log4c_category_log(cat, LOG4C_PRIORITY_DEBUG, "received data of size %d from transport [%s]", size, typedArgs->transport);
typedArgs->callback(buf, size);
}
}
return NULL;
}
/**
 * Start a background thread that subscribes to a topic and feeds payloads to a callback.
 *
 * All strings are copied, so the caller's buffers need not outlive the call.
 *
 * @param zmqContext   0MQ context the subscriber socket is created in.
 * @param zmqTransport endpoint to connect to.
 * @param topic        subscription prefix.
 * @param callback     invoked from the subscriber thread with each payload.
 * @param logCategory  log4c category name used by the thread.
 * @return a handle to pass to StopSubscribe(), or NULL if an argument is
 *         NULL, memory is exhausted or the thread cannot be created (in which
 *         case nothing is leaked).
 */
SubscribeZeroMQDataThreadArgs *StartSubscribe(void *zmqContext, const char *zmqTransport, const char *topic, BYTE_ARRAY_CALLBACK callback, const char *logCategory)
{
    if(zmqTransport == NULL || topic == NULL || logCategory == NULL || callback == NULL)
    {
        return NULL;
    }

    /* Allocate everything first so a single check covers every failure. */
    SubscribeZeroMQDataThreadArgs *threadArgs = malloc(sizeof *threadArgs);
    char *topicCopy = malloc(1 + strlen(topic));
    char *logCategoryCopy = malloc(1 + strlen(logCategory));
    char *transportCopy = malloc(1 + strlen(zmqTransport));
    pthread_t *thread = malloc(sizeof *thread);

    if(threadArgs != NULL && topicCopy != NULL && logCategoryCopy != NULL && transportCopy != NULL && thread != NULL)
    {
        strcpy(topicCopy, topic);
        strcpy(logCategoryCopy, logCategory);
        strcpy(transportCopy, zmqTransport);

        threadArgs->callback = callback;
        threadArgs->context = zmqContext;
        threadArgs->running = true;
        threadArgs->topic = topicCopy;
        threadArgs->logCategory = logCategoryCopy;
        threadArgs->transport = transportCopy;
        threadArgs->thread = thread;

        if(pthread_create(thread, NULL, ZMQSubscribe, threadArgs) == 0)
        {
            return threadArgs;
        }
    }

    /* Some allocation or the thread creation failed: undo everything. */
    free(thread);
    free(transportCopy);
    free(logCategoryCopy);
    free(topicCopy);
    free(threadArgs);
    return NULL;
}
/**
 * Stop a subscriber started with StartSubscribe() and release its handle.
 *
 * Clears the `running` flag, waits for the thread to exit and then frees the
 * handle together with the string copies it owns. The handle must not be
 * used afterwards.
 *
 * @param args handle returned by StartSubscribe(); NULL is ignored.
 */
void StopSubscribe(SubscribeZeroMQDataThreadArgs *args)
{
    if(args == NULL)
    {
        return;
    }
    args->running = false;
    /* The thread still reads the strings below, so join before freeing them. */
    pthread_join(*(args->thread), NULL);
    free(args->thread);
    free((char*)args->logCategory);
    free((char*)args->transport);
    free((char*)args->topic);
    free(args);
}

view raw

Subscribe.c

hosted with ❤ by GitHub


static void *ZMQRequestReply(void *args)
{
RequestReplyZeroMQThreadArgs *typedArgs = args;
log4c_category_t *cat = log4c_category_get(typedArgs->logCategory);
log4c_category_log(cat, LOG4C_PRIORITY_INFO, "starting request reply thread");
void *server = GetZMQSocketWithRetries(typedArgs->context, typedArgs->transport, ZMQ_REP, true, cat);
if(server == NULL)
return NULL;
uint8_t buf[4096];
char request[4096];
while(typedArgs->running) {
int size = zmq_recv(server, request, 4096, ZMQ_NOBLOCK);
if(size == -1)
{
if(errno != EAGAIN && errno != EINTR)
log4c_category_log(cat, LOG4C_PRIORITY_WARN, "couldn't receive request from transport [%s] %s", typedArgs->transport, zmq_strerror(errno));
//no messages available, sleep and go on
usleep(500000);
}
else
{
request[size] = '\0';
log4c_category_log(cat, LOG4C_PRIORITY_DEBUG, "received a request of size %d [%s] from transport [%s]", size, request, typedArgs->transport);
size = zmq_recv(server, buf, 4096, 0);
log4c_category_log(cat, LOG4C_PRIORITY_DEBUG, "received a request payload of size %d from transport [%s]", size, typedArgs->transport);
RequestReplyZeroMQReply *toReturn = typedArgs->callback(request, typedArgs->argument, buf, size);
zmq_send(server, toReturn->data, toReturn->size, 0);
log4c_category_log(cat, LOG4C_PRIORITY_DEBUG, "sent reply");
free(toReturn->data);
free(toReturn);
}
}
log4c_category_log(cat, LOG4C_PRIORITY_DEBUG, "exiting request reply thread for %s", typedArgs->transport);
zmq_close(server);
return NULL;
}
/**
 * Start a background thread that answers requests arriving on a transport.
 *
 * All strings are copied, so the caller's buffers need not outlive the call.
 * `callbackArg` is stored as-is and must stay valid while the thread runs.
 *
 * @param zmqContext   0MQ context the reply socket is created in.
 * @param zmqTransport endpoint to bind to.
 * @param callback     invoked from the thread for each request; returns the reply.
 * @param callbackArg  opaque pointer passed through to every callback call.
 * @param logCategory  log4c category name used by the thread.
 * @return a handle to pass to StopRequestReply(), or NULL if a required
 *         argument is NULL, memory is exhausted or the thread cannot be
 *         created (in which case nothing is leaked).
 */
RequestReplyZeroMQThreadArgs *StartRequestReply(void *zmqContext, const char *zmqTransport, STRING_CALLBACK callback, void *callbackArg, const char *logCategory)
{
    if(zmqTransport == NULL || logCategory == NULL || callback == NULL)
    {
        return NULL;
    }

    /* Allocate everything first so a single check covers every failure. */
    RequestReplyZeroMQThreadArgs *threadArgs = malloc(sizeof *threadArgs);
    char *logCategoryCopy = malloc(1 + strlen(logCategory));
    char *transportCopy = malloc(1 + strlen(zmqTransport));
    pthread_t *thread = malloc(sizeof *thread);

    if(threadArgs != NULL && logCategoryCopy != NULL && transportCopy != NULL && thread != NULL)
    {
        strcpy(logCategoryCopy, logCategory);
        strcpy(transportCopy, zmqTransport);

        threadArgs->bind = true;
        threadArgs->callback = callback;
        threadArgs->context = zmqContext;
        threadArgs->running = true;
        threadArgs->argument = callbackArg;
        threadArgs->logCategory = logCategoryCopy;
        threadArgs->transport = transportCopy;
        threadArgs->thread = thread;

        if(pthread_create(thread, NULL, ZMQRequestReply, threadArgs) == 0)
        {
            return threadArgs;
        }
    }

    /* Some allocation or the thread creation failed: undo everything. */
    free(thread);
    free(transportCopy);
    free(logCategoryCopy);
    free(threadArgs);
    return NULL;
}
/**
 * Stop a request-reply thread started with StartRequestReply() and release its handle.
 *
 * Clears the `running` flag, waits for the thread to exit and then frees the
 * handle together with the string copies it owns. The handle must not be
 * used afterwards.
 *
 * @param args handle returned by StartRequestReply(); NULL is ignored.
 */
void StopRequestReply(RequestReplyZeroMQThreadArgs *args)
{
    if(args == NULL)
    {
        return;
    }
    args->running = false;
    /* The thread still reads the strings below, so join before freeing them. */
    pthread_join(*(args->thread), NULL);
    free(args->thread);
    free((char*)args->logCategory);
    free((char*)args->transport);
    free(args);
}

view raw

RequestReply.c

hosted with ❤ by GitHub


static void *ZMQForward(void *args)
{
ForwardZeroMQDataThreadArgs *typedArgs = args;
int rc, i, size;
zmq_pollitem_t items[typedArgs->numSubs];
log4c_category_t *mainLog = log4c_category_get(typedArgs->logCategory);
void *outgoingSocket = GetZMQSocketWithRetries(typedArgs->context, typedArgs->pubTransport, ZMQ_PUB, true, mainLog);
if(outgoingSocket == NULL)
return NULL;
for(i=0;i<(typedArgs->numSubs);i++)
{
items[i].socket = GetZMQSocketWithRetries(typedArgs->context, typedArgs->subTransports[i], ZMQ_SUB, false, mainLog);
items[i].events = ZMQ_POLLIN;
items[i].fd = 0;
items[i].revents = 0;
if(items[i].socket == NULL)
{
log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "unable to create socket %s", zmq_strerror(errno));
}
else
{
rc = zmq_setsockopt(items[i].socket, ZMQ_SUBSCRIBE, NULL, 0);
if(rc != 0)
{
log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "unable to subscribe to all messages on transport [%s]: %s", typedArgs->subTransports[i] , zmq_strerror(errno));
}
else
{
log4c_category_log(mainLog, LOG4C_PRIORITY_INFO, "subscribed to all messages on transport to [%s]", typedArgs->subTransports[i]);
}
}
}
while(typedArgs->running)
{
uint8_t buf[1024];
char topic[1024];
rc = zmq_poll(items, typedArgs->numSubs, -1);
if(rc == ETERM || rc == EFAULT)
{
log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "couldn't poll all sockets: %s", zmq_strerror(errno));
break;
}
else if(rc == EINTR)
{
//Interrupted, no biggie
}
else
{
for(i=0;i<typedArgs->numSubs;i++)
{
if(items[i].revents & ZMQ_POLLIN)
{
size = zmq_recv(items[i].socket, topic, sizeof(topic), 0);
if(size > -1)
{
topic[size] = '\0';
log4c_category_log(mainLog, LOG4C_PRIORITY_DEBUG, "received topic of size %d [%s] from transport [%s]", size, topic, typedArgs->subTransports[i]);
size = zmq_send(outgoingSocket, topic, size, ZMQ_SNDMORE);
if(size > -1)
log4c_category_log(mainLog, LOG4C_PRIORITY_DEBUG, "forwarded topic of size %d from transport [%s] to [%s]", size, typedArgs->subTransports[i], typedArgs->pubTransport);
else
log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "error forwarding topic of size on transport [%s] %s", typedArgs->pubTransport, zmq_strerror(errno));
size = zmq_recv(items[i].socket, buf, sizeof(buf), 0);
if(size > -1)
log4c_category_log(mainLog, LOG4C_PRIORITY_DEBUG, "received data of size %d from transport [%s]", size, typedArgs->subTransports[i]);
else
log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "error receiving data on transport [%s] %s", typedArgs->subTransports[i], zmq_strerror(errno));
size = zmq_send(outgoingSocket, buf, size, 0);
if (size > -1)
log4c_category_log(mainLog, LOG4C_PRIORITY_DEBUG, "forwarded data of size %d from transport [%s] to [%s]", size, typedArgs->subTransports[i], typedArgs->pubTransport);
else
log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "error forwarding data on transport [%s] %s", typedArgs->pubTransport, zmq_strerror(errno));
}
else
{
log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "couldn't receive topic from transport [%s] %s", typedArgs->subTransports[i], zmq_strerror(errno));
}
}
}
}
usleep(500000);
}
return NULL;
}
ForwardZeroMQDataThreadArgs *StartForward(void *zmqContext, const char *pubTransport, const char *logCategory, int numSubTransports, …)
{
ForwardZeroMQDataThreadArgs *threadArgs = malloc(sizeof(ForwardZeroMQDataThreadArgs));
threadArgs->context = zmqContext;
threadArgs->running = true;
char *logCategoryCopy = malloc(1 + strlen(logCategory));
strcpy(logCategoryCopy, logCategory);
threadArgs->logCategory = logCategoryCopy;
char *pubTransportCopy = malloc(1 + strlen(pubTransport));
strcpy(pubTransportCopy, pubTransport);
threadArgs->pubTransport = pubTransportCopy;
threadArgs->numSubs = numSubTransports;
int i;
va_list subArgs;
va_start(subArgs, numSubTransports);
char *subTransport;
for(i=0;i<numSubTransports;i++)
{
subTransport = va_arg(subArgs, char*);
char *subTransportCopy = malloc(1 + strlen(subTransport));
strcpy(subTransportCopy, subTransport);
threadArgs->subTransports[i] = subTransportCopy;
}
va_end(subArgs);
threadArgs->thread = malloc(sizeof(pthread_t));
pthread_create(threadArgs->thread, NULL, ZMQForward, threadArgs);
return threadArgs;
}
/**
 * Stop a forwarder started with StartForward() and release its handle.
 *
 * Clears the `running` flag, waits for the thread to exit and then frees the
 * handle together with every string copy it owns, including the
 * sub-transport endpoints. The handle must not be used afterwards.
 *
 * @param args handle returned by StartForward(); NULL is ignored.
 */
void StopForward(ForwardZeroMQDataThreadArgs *args)
{
    if(args == NULL)
    {
        return;
    }
    args->running = false;
    /* The thread still reads the strings below, so join before freeing them. */
    pthread_join(*(args->thread), NULL);
    free(args->thread);

    /* Never walk past the end of the fixed-size array, whatever numSubs says. */
    int subCount = args->numSubs;
    const int maxSubs = (int)(sizeof args->subTransports / sizeof args->subTransports[0]);
    if(subCount > maxSubs)
    {
        subCount = maxSubs;
    }
    for(int i = 0; i < subCount; i++)
    {
        free((char*)args->subTransports[i]);
    }

    free((char*)args->logCategory);
    free((char*)args->pubTransport);
    free(args);
}

view raw

Forward.c

hosted with ❤ by GitHub

Tagged with: , , ,
Posted in C, Messaging, ZeroMQ
One comment on “Reusable ZeroMQ C Helpers
  1. […] Now it’s really easy to throw data around using the 0MQ helpers written up before! […]

Leave a comment

Is this your new site? Log in to activate admin features and dismiss this message
Log In