|
static void *ZMQForward(void *args) |
|
{ |
|
ForwardZeroMQDataThreadArgs *typedArgs = args; |
|
int rc, i, size; |
|
zmq_pollitem_t items[typedArgs->numSubs]; |
|
log4c_category_t *mainLog = log4c_category_get(typedArgs->logCategory); |
|
|
|
void *outgoingSocket = GetZMQSocketWithRetries(typedArgs->context, typedArgs->pubTransport, ZMQ_PUB, true, mainLog); |
|
|
|
if(outgoingSocket == NULL) |
|
return NULL; |
|
|
|
for(i=0;i<(typedArgs->numSubs);i++) |
|
{ |
|
items[i].socket = GetZMQSocketWithRetries(typedArgs->context, typedArgs->subTransports[i], ZMQ_SUB, false, mainLog); |
|
items[i].events = ZMQ_POLLIN; |
|
items[i].fd = 0; |
|
items[i].revents = 0; |
|
|
|
if(items[i].socket == NULL) |
|
{ |
|
log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "unable to create socket %s", zmq_strerror(errno)); |
|
} |
|
else |
|
{ |
|
rc = zmq_setsockopt(items[i].socket, ZMQ_SUBSCRIBE, NULL, 0); |
|
if(rc != 0) |
|
{ |
|
log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "unable to subscribe to all messages on transport [%s]: %s", typedArgs->subTransports[i] , zmq_strerror(errno)); |
|
} |
|
else |
|
{ |
|
log4c_category_log(mainLog, LOG4C_PRIORITY_INFO, "subscribed to all messages on transport to [%s]", typedArgs->subTransports[i]); |
|
} |
|
} |
|
} |
|
|
|
while(typedArgs->running) |
|
{ |
|
uint8_t buf[1024]; |
|
char topic[1024]; |
|
rc = zmq_poll(items, typedArgs->numSubs, -1); |
|
if(rc == ETERM || rc == EFAULT) |
|
{ |
|
log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "couldn't poll all sockets: %s", zmq_strerror(errno)); |
|
break; |
|
} |
|
else if(rc == EINTR) |
|
{ |
|
//Interrupted, no biggie |
|
} |
|
else |
|
{ |
|
for(i=0;i<typedArgs->numSubs;i++) |
|
{ |
|
if(items[i].revents & ZMQ_POLLIN) |
|
{ |
|
size = zmq_recv(items[i].socket, topic, sizeof(topic), 0); |
|
if(size > -1) |
|
{ |
|
topic[size] = '\0'; |
|
log4c_category_log(mainLog, LOG4C_PRIORITY_DEBUG, "received topic of size %d [%s] from transport [%s]", size, topic, typedArgs->subTransports[i]); |
|
size = zmq_send(outgoingSocket, topic, size, ZMQ_SNDMORE); |
|
if(size > -1) |
|
log4c_category_log(mainLog, LOG4C_PRIORITY_DEBUG, "forwarded topic of size %d from transport [%s] to [%s]", size, typedArgs->subTransports[i], typedArgs->pubTransport); |
|
else |
|
log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "error forwarding topic of size on transport [%s] %s", typedArgs->pubTransport, zmq_strerror(errno)); |
|
size = zmq_recv(items[i].socket, buf, sizeof(buf), 0); |
|
if(size > -1) |
|
log4c_category_log(mainLog, LOG4C_PRIORITY_DEBUG, "received data of size %d from transport [%s]", size, typedArgs->subTransports[i]); |
|
else |
|
log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "error receiving data on transport [%s] %s", typedArgs->subTransports[i], zmq_strerror(errno)); |
|
size = zmq_send(outgoingSocket, buf, size, 0); |
|
if (size > -1) |
|
log4c_category_log(mainLog, LOG4C_PRIORITY_DEBUG, "forwarded data of size %d from transport [%s] to [%s]", size, typedArgs->subTransports[i], typedArgs->pubTransport); |
|
else |
|
log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "error forwarding data on transport [%s] %s", typedArgs->pubTransport, zmq_strerror(errno)); |
|
} |
|
else |
|
{ |
|
log4c_category_log(mainLog, LOG4C_PRIORITY_ERROR, "couldn't receive topic from transport [%s] %s", typedArgs->subTransports[i], zmq_strerror(errno)); |
|
} |
|
} |
|
} |
|
} |
|
usleep(500000); |
|
} |
|
return NULL; |
|
} |
|
|
|
ForwardZeroMQDataThreadArgs *StartForward(void *zmqContext, const char *pubTransport, const char *logCategory, int numSubTransports, …) |
|
{ |
|
ForwardZeroMQDataThreadArgs *threadArgs = malloc(sizeof(ForwardZeroMQDataThreadArgs)); |
|
threadArgs->context = zmqContext; |
|
threadArgs->running = true; |
|
|
|
char *logCategoryCopy = malloc(1 + strlen(logCategory)); |
|
strcpy(logCategoryCopy, logCategory); |
|
threadArgs->logCategory = logCategoryCopy; |
|
|
|
char *pubTransportCopy = malloc(1 + strlen(pubTransport)); |
|
strcpy(pubTransportCopy, pubTransport); |
|
threadArgs->pubTransport = pubTransportCopy; |
|
|
|
threadArgs->numSubs = numSubTransports; |
|
int i; |
|
va_list subArgs; |
|
va_start(subArgs, numSubTransports); |
|
char *subTransport; |
|
for(i=0;i<numSubTransports;i++) |
|
{ |
|
subTransport = va_arg(subArgs, char*); |
|
char *subTransportCopy = malloc(1 + strlen(subTransport)); |
|
strcpy(subTransportCopy, subTransport); |
|
threadArgs->subTransports[i] = subTransportCopy; |
|
} |
|
va_end(subArgs); |
|
|
|
threadArgs->thread = malloc(sizeof(pthread_t)); |
|
pthread_create(threadArgs->thread, NULL, ZMQForward, threadArgs); |
|
|
|
return threadArgs; |
|
} |
|
|
|
/*
 * Stops the forwarding thread started by StartForward() and releases the
 * thread arguments.
 *
 * Clears args->running, waits for the thread to finish, then frees the
 * thread handle, every copied transport and category string, and args
 * itself. args must not be used after this call. A NULL args is ignored.
 */
void StopForward(ForwardZeroMQDataThreadArgs *args)
{
    int i;

    if (args == NULL)
    {
        return;
    }

    args->running = false;
    pthread_join(*(args->thread), NULL);
    free(args->thread);

    /* The thread has exited, so the strings it was reading can go now.
     * StartForward() allocates one copy per subscription transport. */
    for (i = 0; i < args->numSubs; i++)
    {
        free((char*)args->subTransports[i]);
    }

    free((char*)args->logCategory);
    free((char*)args->pubTransport);
    free(args);
}