Just as QUEUE acts as a request-reply broker, FORWARDER acts as a pub-sub proxy server. It allows both publishers and subscribers to be moving parts, while the device itself becomes the stable hub that interconnects them.
FORWARDER collects messages from a set of publishers and forwards these to a set of subscribers.
Notice that the device's two zmq sockets, SUB and PUB, are both bound to well-known ports. The frontend speaks to publishers and the backend speaks to subscribers. You should use ZMQ_FORWARDER with a ZMQ_SUB socket for the frontend and a ZMQ_PUB socket for the backend.
Another important point is that we want every published message to reach the subscribers, so message filtering should be off in the forwarder device. This is done by the frontend.setsockopt(zmq.SUBSCRIBE, ...) call in the code, which subscribes to the empty string and therefore matches every message.
    import zmq

    def main():
        context = zmq.Context(1)
        # Socket facing publishers
        frontend = context.socket(zmq.SUB)
        # Socket facing subscribers
        backend = context.socket(zmq.PUB)
        try:
            frontend.bind("tcp://*:5559")
            # An empty subscription turns filtering off: forward everything
            frontend.setsockopt(zmq.SUBSCRIBE, b"")
            backend.bind("tcp://*:5560")

            # Blocks, forwarding messages from frontend to backend
            zmq.device(zmq.FORWARDER, frontend, backend)
        except Exception as e:
            print(e)
            print("bringing down zmq device")
        finally:
            frontend.close()
            backend.close()
            context.term()

    if __name__ == "__main__":
        main()
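The empty subscription on the frontend makes sense once you recall that a SUB socket filters by prefix: a message is delivered when it starts with one of the subscribed byte strings. The sketch below is a plain-Python model of that rule, not pyzmq code. The helper name `matches` and the third sample message are made up for illustration.

```python
def matches(message, subscription):
    """Model of SUB-side filtering: deliver if the message starts with the prefix."""
    return message.startswith(subscription)

# Sample messages; the "90 ..." entry is hypothetical and only shows prefix behaviour
messages = [b"9 server#3581", b"3 server#9578", b"90 server#1"]

# A subscriber-style filter: only messages beginning with "9"
only_nine = [m for m in messages if matches(m, b"9")]

# A forwarder-style filter: the empty prefix matches every message
everything = [m for m in messages if matches(m, b"")]

print(only_nine)
print(everything)
```

Because the empty prefix matches everything, the forwarder passes all topics through and leaves the filtering to each subscriber.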
The only thing that changes here is that the publisher connects to the intermediary instead of binding to a well-known port of its own.
    import zmq
    import random
    import time

    port = "5559"
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    # Connect to the forwarder's frontend rather than binding
    socket.connect("tcp://localhost:%s" % port)
    publisher_id = random.randrange(0, 9999)
    while True:
        # Topics are the integers 1 through 9
        topic = random.randrange(1, 10)
        messagedata = "server#%s" % publisher_id
        print("%s %s" % (topic, messagedata))
        socket.send_string("%d %s" % (topic, messagedata))
        time.sleep(1)
The subscribers are completely unaffected by the introduction of the intermediary, the forwarder device, and gain the ability to receive messages from different publishers at no cost.
    import zmq

    port = "5560"
    # Socket to talk to the forwarder's backend
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    print("Collecting updates from server...")
    socket.connect("tcp://localhost:%s" % port)
    # Only receive messages whose topic starts with "9"
    topicfilter = b"9"
    socket.setsockopt(zmq.SUBSCRIBE, topicfilter)
    for update_nbr in range(10):
        string = socket.recv_string()
        topic, messagedata = string.split()
        print("%s %s" % (topic, messagedata))
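The subscriber splits each message on whitespace, which works here because the payload ("server#<id>") contains no spaces. If a payload could contain spaces, splitting only at the first space would be safer. The following is a minimal sketch of that idea; `parse_update` is a hypothetical helper and not part of pyzmq.

```python
def parse_update(raw):
    """Split 'topic payload' at the first space only, keeping the payload intact."""
    topic, _, messagedata = raw.partition(" ")
    return topic, messagedata

topic, messagedata = parse_update("9 server#3581")
print("%s %s" % (topic, messagedata))
```

With this helper, a message such as "9 hello world" would yield the topic "9" and the payload "hello world", whereas a plain split() would produce three fields and break the two-name unpacking.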
Run these programs from separate shells:

    python forwarder_device.py
    python forwarder_subscriber.py
    python forwarder_server.py
    python forwarder_server.py
Output on the subscriber:
    Collecting updates from server...
    9 server#3581
    9 server#9578
    9 server#3581
    9 server#9578
    9 server#3581
    9 server#9578
    9 server#3581
    9 server#3581
    9 server#9578
    9 server#3581