My most recent project has involved a few common 0MQ patterns like request-reply, forwarding, and topic subscription. I was using each of these often enough that I moved them out to a shared library, which could be improved as I encountered bugs and new, albeit small, requirements.
First was reliably getting a socket. For some reason I have yet to investigate, the legacy application I’m building on seems to throw a number of interrupts on startup, so I had to create a common retry method for creating sockets.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| void *GetZMQSocketWithRetries(void *context, const char *transport, int socketOptions, bool bind, log4c_category_t *cat); /* Creates a socket via zmq_socket(context, socketOptions), then binds (bind == true) or connects it to transport, trying up to five times; logs to cat and returns NULL on failure. */ |
The user provides the root 0MQ context, the desired transport, any options, whether a bind or a connect should be used, and finally a logging category.
The second was subscribing to a transport and doing something whenever a specific topic (first part of a multi-part message) was received.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| typedef void (* BYTE_ARRAY_CALLBACK)(uint8_t *data, int size); /* Subscriber callback: data is the thread's receive buffer for the payload frame, size is the value zmq_recv returned for it. */ | |
| typedef struct _SubscribeZeroMQDataThreadStruct /* State shared between StartSubscribe/StopSubscribe and the subscriber thread. */ | |
| { | |
| void *context; /* 0MQ context the subscriber socket is created from */ | |
| const char *transport; /* endpoint the subscriber connects to; heap copy made by StartSubscribe */ | |
| const char *topic; /* filter passed to ZMQ_SUBSCRIBE; heap copy made by StartSubscribe */ | |
| BYTE_ARRAY_CALLBACK callback; /* called with each received payload frame */ | |
| const char *logCategory; /* log4c category name; heap copy made by StartSubscribe */ | |
| bool running; /* loop flag: set true by StartSubscribe, cleared by StopSubscribe */ | |
| pthread_t *thread; /* heap-allocated handle of the subscriber thread; joined by StopSubscribe */ | |
| } SubscribeZeroMQDataThreadArgs; | |
| SubscribeZeroMQDataThreadArgs *StartSubscribe(void *zmqContext, const char *zmqTransport, const char *topic, BYTE_ARRAY_CALLBACK callback, const char *logCategory); /* Copies the strings, starts the subscriber thread and returns its heap-allocated handle; release it with StopSubscribe. */ | |
| void StopSubscribe(SubscribeZeroMQDataThreadArgs *args); /* Clears the running flag, joins the subscriber thread and frees the handle and its string copies. */ |
It follows the same pattern: provide the context, transport, desired topic, and logging category, along with a callback that will receive the payload (the second part of the multi-part message).
The above subscribe pattern was modified into a rough request-reply pattern, allowing the user not only to receive both the topic and data in a callback but also to reply directly to the requester with another payload.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| typedef struct _RequestReplyZeroMQReply /* Reply produced by a STRING_CALLBACK; the request-reply thread free()s both data and the struct after sending. */ | |
| { | |
| void *data; /* reply bytes handed to zmq_send; must be releasable with free() */ | |
| int size; /* number of bytes of data to send */ | |
| } RequestReplyZeroMQReply; | |
| typedef RequestReplyZeroMQReply* (*STRING_CALLBACK)(const char *request, const void *argument, const uint8_t *data, const int size); /* Request handler: request is the NUL-terminated first frame, argument is the value given to StartRequestReply, data/size describe the second frame; the returned reply is sent and then freed by the thread. */ | |
| typedef struct _RequestReplyZeroMQThreadStruct /* State shared between StartRequestReply/StopRequestReply and the request-reply thread. */ | |
| { | |
| void *context; /* 0MQ context the ZMQ_REP socket is created from */ | |
| bool bind; /* set to true by StartRequestReply; NOTE(review): the thread function shown passes a literal true instead of reading this field */ | |
| bool running; /* loop flag: set true by StartRequestReply, cleared by StopRequestReply */ | |
| const char *transport; /* endpoint the reply socket is bound to; heap copy made by StartRequestReply */ | |
| const char *logCategory; /* log4c category name; heap copy made by StartRequestReply */ | |
| const void *argument; /* opaque value forwarded to every callback invocation; not copied or freed here */ | |
| STRING_CALLBACK callback; /* produces the reply for each request */ | |
| pthread_t *thread; /* heap-allocated handle of the worker thread; joined by StopRequestReply */ | |
| } RequestReplyZeroMQThreadArgs; | |
| RequestReplyZeroMQThreadArgs *StartRequestReply(void *zmqContext, const char *zmqTransport, STRING_CALLBACK callback, void *callbackArg, const char *logCategory); /* Copies the strings, starts the request-reply thread and returns its heap-allocated handle; release it with StopRequestReply. */ | |
| void StopRequestReply(RequestReplyZeroMQThreadArgs *args); /* Clears the running flag, joins the worker thread and frees the handle and its string copies. */ |
Last was reliably forwarding messages from any number of transports to another transport.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * State shared between StartForward/StopForward and the forwarder thread.
 *
 * All strings are heap copies made by StartForward. At most 10 subscriber
 * transports can be stored, because subTransports is a fixed-size array.
 */
typedef struct _ForwardZeroMQDataThreadStruct
{
    void *context;                 /* 0MQ context every socket is created from */
    int numSubs;                   /* number of valid entries in subTransports */
    const char *subTransports[10]; /* endpoints the forwarder subscribes to */
    const char *pubTransport;      /* endpoint the forwarder publishes on */
    const char *logCategory;       /* log4c category name */
    bool running;                  /* loop flag: set by StartForward, cleared by StopForward */
    pthread_t *thread;             /* heap-allocated handle of the forwarder thread */
} ForwardZeroMQDataThreadArgs;

/**
 * Start a thread that republishes everything received on the given subscriber
 * transports onto pubTransport.
 *
 * @param zmqContext        0MQ context used for all sockets.
 * @param pubTransport      Endpoint the outgoing ZMQ_PUB socket binds to.
 * @param logCategory       log4c category name used by the thread.
 * @param numSubTransports  Number of variadic arguments that follow.
 * @param ...               numSubTransports endpoint strings (char *), one per
 *                          transport to subscribe to.
 * @return Heap-allocated handle to pass to StopForward().
 */
ForwardZeroMQDataThreadArgs *StartForward(void *zmqContext, const char *pubTransport, const char *logCategory, int numSubTransports, ...);

/** Ask the forwarder thread to stop, wait for it, and release its handle. */
void StopForward(ForwardZeroMQDataThreadArgs *args);
Below are some recent cuts of the code…
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| void *GetZMQSocketWithRetries(void *context, const char *transport, int socketOptions, bool bind, log4c_category_t *cat) | |
| { | |
| void *socket = zmq_socket(context, socketOptions); | |
| if(socket == NULL) | |
| { | |
| log4c_category_log(cat, LOG4C_PRIORITY_ERROR, "unable to create socket for transport [%s] %s", transport, zmq_strerror(errno)); | |
| return NULL; | |
| } | |
| log4c_category_log(cat, LOG4C_PRIORITY_INFO, "created socket for transport [%s]", transport); | |
| int rc; | |
| int retryCount = 0; | |
| do{ | |
| log4c_category_log(cat, LOG4C_PRIORITY_DEBUG, "attempted to connect to [%s] for the %d time", transport, retryCount); | |
| if(!bind) | |
| rc = zmq_connect(socket, transport); | |
| else | |
| rc = zmq_bind(socket, transport); | |
| retryCount++; | |
| if(rc != 0) | |
| log4c_category_log(cat, LOG4C_PRIORITY_WARN, "unable to connect to [%s] during %d attempt: %s", transport, retryCount, zmq_strerror(errno)); | |
| }while(rc != 0 && retryCount < 5); | |
| if(rc != 0) | |
| { | |
| log4c_category_log(cat, LOG4C_PRIORITY_ERROR, "unable to connect to [%s] after five retries", transport); | |
| return NULL; | |
| } | |
| log4c_category_log(cat, LOG4C_PRIORITY_INFO, "connected socket for transport [%s]", transport); | |
| return socket; | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| static void *ZMQSubscribe(void *args) | |
| { | |
| SubscribeZeroMQDataThreadArgs *typedArgs = args; | |
| log4c_category_t *cat = log4c_category_get(typedArgs->logCategory); | |
| void *subscriber = GetZMQSocketWithRetries(typedArgs->context, typedArgs->transport, ZMQ_SUB, false, cat); | |
| if(subscriber == NULL) | |
| return NULL; | |
| int rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, typedArgs->topic, strlen(typedArgs->topic)); | |
| if(rc != 0) | |
| { | |
| log4c_category_log(cat, LOG4C_PRIORITY_ERROR, "unable to subscribe to topic [%s] on transport [%s] %s", typedArgs->topic, typedArgs->transport, zmq_strerror(errno)); | |
| return NULL; | |
| } | |
| log4c_category_log(cat, LOG4C_PRIORITY_INFO, "subscribed to topic [%s] on transport [%s]", typedArgs->topic, typedArgs->transport); | |
| uint8_t buf[1024]; | |
| char topic[1024]; | |
| while(typedArgs->running) | |
| { | |
| int size = zmq_recv(subscriber, topic, sizeof(topic), ZMQ_NOBLOCK); | |
| if(size == -1) | |
| { | |
| if(errno != EAGAIN && errno != EINTR) | |
| log4c_category_log(cat, LOG4C_PRIORITY_WARN, "couldn't receive topic from transport [%s] %s", typedArgs->transport, zmq_strerror(errno)); | |
| //no messages available, sleep and go on | |
| usleep(500000); | |
| } | |
| else | |
| { | |
| topic[size] = '\0'; | |
| log4c_category_log(cat, LOG4C_PRIORITY_DEBUG, "received a topic of size %d [%s] from transport [%s]", size, topic, typedArgs->transport); | |
| size = zmq_recv(subscriber, buf, 1024, 0); | |
| log4c_category_log(cat, LOG4C_PRIORITY_DEBUG, "received data of size %d from transport [%s]", size, typedArgs->transport); | |
| typedArgs->callback(buf, size); | |
| } | |
| } | |
| return NULL; | |
| } | |
| SubscribeZeroMQDataThreadArgs *StartSubscribe(void *zmqContext, const char *zmqTransport, const char *topic, BYTE_ARRAY_CALLBACK callback, const char *logCategory) | |
| { | |
| SubscribeZeroMQDataThreadArgs *threadArgs = malloc(sizeof(SubscribeZeroMQDataThreadArgs)); | |
| threadArgs->callback = callback; | |
| threadArgs->context = zmqContext; | |
| threadArgs->running = true; | |
| char *topicCopy = malloc(1 + strlen(topic)); | |
| strcpy(topicCopy, topic); | |
| threadArgs->topic = topicCopy; | |
| char *logCategoryCopy = malloc(1 + strlen(logCategory)); | |
| strcpy(logCategoryCopy, logCategory); | |
| threadArgs->logCategory = logCategoryCopy; | |
| char *transportCopy = malloc(1 + strlen(zmqTransport)); | |
| strcpy(transportCopy, zmqTransport); | |
| threadArgs->transport = transportCopy; | |
| threadArgs->thread = malloc(sizeof(pthread_t)); | |
| pthread_create(threadArgs->thread, NULL, ZMQSubscribe, threadArgs); | |
| return threadArgs; | |
| } | |
| void StopSubscribe(SubscribeZeroMQDataThreadArgs *args){ | |
| args->running = false; | |
| pthread_join(*(args->thread), NULL); | |
| free(args->thread); | |
| free((char*)args->logCategory); | |
| free((char*)args->transport); | |
| free((char*)args->topic); | |
| free(args); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| static void *ZMQRequestReply(void *args) | |
| { | |
| RequestReplyZeroMQThreadArgs *typedArgs = args; | |
| log4c_category_t *cat = log4c_category_get(typedArgs->logCategory); | |
| log4c_category_log(cat, LOG4C_PRIORITY_INFO, "starting request reply thread"); | |
| void *server = GetZMQSocketWithRetries(typedArgs->context, typedArgs->transport, ZMQ_REP, true, cat); | |
| if(server == NULL) | |
| return NULL; | |
| uint8_t buf[4096]; | |
| char request[4096]; | |
| while(typedArgs->running) { | |
| int size = zmq_recv(server, request, 4096, ZMQ_NOBLOCK); | |
| if(size == -1) | |
| { | |
| if(errno != EAGAIN && errno != EINTR) | |
| log4c_category_log(cat, LOG4C_PRIORITY_WARN, "couldn't receive request from transport [%s] %s", typedArgs->transport, zmq_strerror(errno)); | |
| //no messages available, sleep and go on | |
| usleep(500000); | |
| } | |
| else | |
| { | |
| request[size] = '\0'; | |
| log4c_category_log(cat, LOG4C_PRIORITY_DEBUG, "received a request of size %d [%s] from transport [%s]", size, request, typedArgs->transport); | |
| size = zmq_recv(server, buf, 4096, 0); | |
| log4c_category_log(cat, LOG4C_PRIORITY_DEBUG, "received a request payload of size %d from transport [%s]", size, typedArgs->transport); | |
| RequestReplyZeroMQReply *toReturn = typedArgs->callback(request, typedArgs->argument, buf, size); | |
| zmq_send(server, toReturn->data, toReturn->size, 0); | |
| log4c_category_log(cat, LOG4C_PRIORITY_DEBUG, "sent reply"); | |
| free(toReturn->data); | |
| free(toReturn); | |
| } | |
| } | |
| log4c_category_log(cat, LOG4C_PRIORITY_DEBUG, "exiting request reply thread for %s", typedArgs->transport); | |
| zmq_close(server); | |
| return NULL; | |
| } | |
| RequestReplyZeroMQThreadArgs *StartRequestReply(void *zmqContext, const char *zmqTransport, STRING_CALLBACK callback, void *callbackArg, const char *logCategory) | |
| { | |
| RequestReplyZeroMQThreadArgs *threadArgs = malloc(sizeof(RequestReplyZeroMQThreadArgs)); | |
| threadArgs->bind = true; | |
| threadArgs->callback = callback; | |
| threadArgs->context = zmqContext; | |
| threadArgs->running = true; | |
| threadArgs->argument = callbackArg; | |
| char *logCategoryCopy = malloc(1 + strlen(logCategory)); | |
| strcpy(logCategoryCopy, logCategory); | |
| threadArgs->logCategory = logCategoryCopy; | |
| char *transportCopy = malloc(1 + strlen(zmqTransport)); | |
| strcpy(transportCopy, zmqTransport); | |
| threadArgs->transport = transportCopy; | |
| threadArgs->thread = malloc(sizeof(pthread_t)); | |
| pthread_create(threadArgs->thread, NULL, ZMQRequestReply, threadArgs); | |
| return threadArgs; | |
| } | |
| void StopRequestReply(RequestReplyZeroMQThreadArgs *args) | |
| { | |
| args->running = false; | |
| pthread_join(*(args->thread), NULL); | |
| free(args->thread); | |
| free((char*)args->logCategory); | |
| free((char*)args->transport); | |
| free(args); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| static void *ZMQForward(void *args) | |
| { | |
| ForwardZeroMQDataThreadArgs *typedArgs = args; | |
| int rc, i, size; | |
| zmq_pollitem_t items[typedArgs->numSubs]; | |
| log4c_category_t *mainLog = log4c_category_get(typedArgs->logCategory); | |
| void *outgoingSocket = GetZMQSocketWithRetries(typedArgs->context, typedArgs->pubTransport, ZMQ_PUB, true, mainLog); | |
| if(outgoingSocket == NULL) | |
| return NULL; | |
| for(i=0;i<(typedArgs->numSubs);i++) | |
| { | |
| items[i].socket = GetZMQSocketWithRetries(typedArgs->context, typedArgs->subTransports[i], ZMQ_SUB, false, mainLog); | |
| items[i].events = ZMQ_POLLIN; | |
| items[i].fd = 0; | |
| items[i].revents = 0; | |
| if(items[i].socket == NULL) | |
| { | |
| log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "unable to create socket %s", zmq_strerror(errno)); | |
| } | |
| else | |
| { | |
| rc = zmq_setsockopt(items[i].socket, ZMQ_SUBSCRIBE, NULL, 0); | |
| if(rc != 0) | |
| { | |
| log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "unable to subscribe to all messages on transport [%s]: %s", typedArgs->subTransports[i] , zmq_strerror(errno)); | |
| } | |
| else | |
| { | |
| log4c_category_log(mainLog, LOG4C_PRIORITY_INFO, "subscribed to all messages on transport to [%s]", typedArgs->subTransports[i]); | |
| } | |
| } | |
| } | |
| while(typedArgs->running) | |
| { | |
| uint8_t buf[1024]; | |
| char topic[1024]; | |
| rc = zmq_poll(items, typedArgs->numSubs, -1); | |
| if(rc == ETERM || rc == EFAULT) | |
| { | |
| log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "couldn't poll all sockets: %s", zmq_strerror(errno)); | |
| break; | |
| } | |
| else if(rc == EINTR) | |
| { | |
| //Interrupted, no biggie | |
| } | |
| else | |
| { | |
| for(i=0;i<typedArgs->numSubs;i++) | |
| { | |
| if(items[i].revents & ZMQ_POLLIN) | |
| { | |
| size = zmq_recv(items[i].socket, topic, sizeof(topic), 0); | |
| if(size > -1) | |
| { | |
| topic[size] = '\0'; | |
| log4c_category_log(mainLog, LOG4C_PRIORITY_DEBUG, "received topic of size %d [%s] from transport [%s]", size, topic, typedArgs->subTransports[i]); | |
| size = zmq_send(outgoingSocket, topic, size, ZMQ_SNDMORE); | |
| if(size > -1) | |
| log4c_category_log(mainLog, LOG4C_PRIORITY_DEBUG, "forwarded topic of size %d from transport [%s] to [%s]", size, typedArgs->subTransports[i], typedArgs->pubTransport); | |
| else | |
| log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "error forwarding topic of size on transport [%s] %s", typedArgs->pubTransport, zmq_strerror(errno)); | |
| size = zmq_recv(items[i].socket, buf, sizeof(buf), 0); | |
| if(size > -1) | |
| log4c_category_log(mainLog, LOG4C_PRIORITY_DEBUG, "received data of size %d from transport [%s]", size, typedArgs->subTransports[i]); | |
| else | |
| log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "error receiving data on transport [%s] %s", typedArgs->subTransports[i], zmq_strerror(errno)); | |
| size = zmq_send(outgoingSocket, buf, size, 0); | |
| if (size > -1) | |
| log4c_category_log(mainLog, LOG4C_PRIORITY_DEBUG, "forwarded data of size %d from transport [%s] to [%s]", size, typedArgs->subTransports[i], typedArgs->pubTransport); | |
| else | |
| log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "error forwarding data on transport [%s] %s", typedArgs->pubTransport, zmq_strerror(errno)); | |
| } | |
| else | |
| { | |
| log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "couldn't receive topic from transport [%s] %s", typedArgs->subTransports[i], zmq_strerror(errno)); | |
| } | |
| } | |
| } | |
| } | |
| usleep(500000); | |
| } | |
| return NULL; | |
| } | |
| ForwardZeroMQDataThreadArgs *StartForward(void *zmqContext, const char *pubTransport, const char *logCategory, int numSubTransports, …) | |
| { | |
| ForwardZeroMQDataThreadArgs *threadArgs = malloc(sizeof(ForwardZeroMQDataThreadArgs)); | |
| threadArgs->context = zmqContext; | |
| threadArgs->running = true; | |
| char *logCategoryCopy = malloc(1 + strlen(logCategory)); | |
| strcpy(logCategoryCopy, logCategory); | |
| threadArgs->logCategory = logCategoryCopy; | |
| char *pubTransportCopy = malloc(1 + strlen(pubTransport)); | |
| strcpy(pubTransportCopy, pubTransport); | |
| threadArgs->pubTransport = pubTransportCopy; | |
| threadArgs->numSubs = numSubTransports; | |
| int i; | |
| va_list subArgs; | |
| va_start(subArgs, numSubTransports); | |
| char *subTransport; | |
| for(i=0;i<numSubTransports;i++) | |
| { | |
| subTransport = va_arg(subArgs, char*); | |
| char *subTransportCopy = malloc(1 + strlen(subTransport)); | |
| strcpy(subTransportCopy, subTransport); | |
| threadArgs->subTransports[i] = subTransportCopy; | |
| } | |
| va_end(subArgs); | |
| threadArgs->thread = malloc(sizeof(pthread_t)); | |
| pthread_create(threadArgs->thread, NULL, ZMQForward, threadArgs); | |
| return threadArgs; | |
| } | |
| void StopForward(ForwardZeroMQDataThreadArgs *args) | |
| { | |
| args->running = false; | |
| pthread_join(*(args->thread), NULL); | |
| free(args->thread); | |
| free((char*)args->logCategory); | |
| free((char*)args->pubTransport); | |
| free(args); | |
| } |
[…] Now it’s really easy to throw data around using the 0MQ helpers written up before! […]