Forwarder

Forwarder device

Just as QUEUE acts as the request-reply broker, FORWARDER acts as the pub-sub proxy server. It allows both publishers and subscribers to be moving parts, while the device itself becomes the stable hub that interconnects them.

FORWARDER collects messages from a set of publishers and forwards these to a set of subscribers.

[Figure: forwarder.png]

You will notice that two zmq sockets, a SUB socket and a PUB socket, are bound to well-known ports. The frontend speaks to publishers and the backend speaks to subscribers. You should use ZMQ_FORWARDER with a ZMQ_SUB socket for the frontend and a ZMQ_PUB socket for the backend.
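
To make the data flow concrete, here is a toy in-memory model of that topology. It is only a sketch and does not use ZeroMQ: the `ToyForwarder` class and its `attach` and `publish` methods are hypothetical names invented for illustration. The point is that publishers and subscribers only ever know about the hub, never about each other.

```python
class ToyForwarder:
    """In-memory stand-in for the forwarder device (illustration only, not ZeroMQ)."""

    def __init__(self):
        self.inboxes = []  # one list per attached subscriber

    def attach(self):
        """Attach a new subscriber and return its inbox."""
        inbox = []
        self.inboxes.append(inbox)
        return inbox

    def publish(self, message):
        """Relay a message from any publisher to every attached subscriber."""
        for inbox in self.inboxes:
            inbox.append(message)


hub = ToyForwarder()
first = hub.attach()
second = hub.attach()

# Two independent publishers send through the same hub.
hub.publish("9 server#3581")
hub.publish("9 server#9578")

print(first)   # ['9 server#3581', '9 server#9578']
print(second)  # ['9 server#3581', '9 server#9578']
```

Either side can be added or removed without the other side changing anything, which is what makes the hub the stable part of the system.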

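Another important point is that we want every published message to reach the subscribers, so message filtering must be turned off in the forwarder device. This is done by subscribing the frontend socket to the empty prefix: see the frontend.setsockopt(zmq.SUBSCRIBE, ...) call on line 11 of the listing.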
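
ZeroMQ subscriptions are prefix matches on the message, so an empty subscription matches every message. The sketch below mimics that rule in plain Python. `matches` is a hypothetical helper written for illustration, not a pyzmq function.

```python
def matches(subscription, message):
    """Return True if message would pass a prefix subscription (illustrative helper)."""
    return message.startswith(subscription)


# The empty prefix used by the forwarder frontend lets everything through.
assert matches(b"", b"1 server#3581")
assert matches(b"", b"9 server#9578")

# A subscriber filtering on b"9" only sees messages that start with b"9".
assert matches(b"9", b"9 server#3581")
assert not matches(b"9", b"1 server#3581")

# Because this is a prefix match, b"9" would also match a topic such as b"90".
assert matches(b"9", b"90 server#3581")
```

The last case is worth keeping in mind when choosing topic names: a filter is a prefix, not an exact topic comparison.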

forwarder_device.py

import zmq

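def main():
    context = zmq.Context(1)
    frontend = backend = None
    try:
        # Socket facing publishers
        frontend = context.socket(zmq.SUB)
        frontend.bind("tcp://*:5559")
        # Subscribe to everything so that no message is filtered out
        frontend.setsockopt(zmq.SUBSCRIBE, b"")

        # Socket facing subscribers
        backend = context.socket(zmq.PUB)
        backend.bind("tcp://*:5560")

        # Blocks forever, relaying messages from frontend to backend
        zmq.device(zmq.FORWARDER, frontend, backend)
    except Exception as e:
        print(e)
        print("bringing down zmq device")
    finally:
        # Close only the sockets that were actually created
        if frontend is not None:
            frontend.close()
        if backend is not None:
            backend.close()
        context.term()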

if __name__ == "__main__":
    main()

The only thing that changes here is that the publisher connects to the intermediary instead of binding to a well-known port.

forwarder_server.py

import random
import time

import zmq

port = "5559"
context = zmq.Context()
socket = context.socket(zmq.PUB)
# Connect to the forwarder frontend instead of binding a port
socket.connect("tcp://localhost:%s" % port)
publisher_id = random.randrange(0, 9999)
while True:
    topic = random.randrange(1, 10)
    messagedata = "server#%s" % publisher_id
    print("%s %s" % (topic, messagedata))
    socket.send_string("%d %s" % (topic, messagedata))
    time.sleep(1)

The subscribers are completely unaffected by the introduction of the intermediary, the "forwarder device", and gain the ability to receive messages from different publishers at no cost.

forwarder_subscriber.py

import zmq

port = "5560"
# Socket to talk to the forwarder backend
context = zmq.Context()
socket = context.socket(zmq.SUB)
print("Collecting updates from server...")
socket.connect("tcp://localhost:%s" % port)
# Only receive messages whose text starts with this prefix
topicfilter = "9"
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)
for update_nbr in range(10):
    string = socket.recv_string()
    topic, messagedata = string.split()
    print(topic, messagedata)
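
One detail worth noting in the subscriber: unpacking the result of a bare `split()` into two names fails as soon as the payload itself contains whitespace. A possible variation, sketched below with a hypothetical `parse_update` helper, splits only on the first space so the rest of the message stays intact.

```python
def parse_update(update):
    """Split 'topic payload' on the first space only (hypothetical helper)."""
    topic, _, messagedata = update.partition(" ")
    return topic, messagedata


print(parse_update("9 server#3581"))        # ('9', 'server#3581')
print(parse_update("9 server#3581 is up"))  # ('9', 'server#3581 is up')
```

For the messages sent by the publisher in this example the two approaches give the same result, since the payload never contains a space.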

Execute these programs from separate shells:

python forwarder_device.py
python forwarder_subscriber.py
python forwarder_server.py
python forwarder_server.py

Output on the subscriber:

Collecting updates from server...
9 server#3581
9 server#9578
9 server#3581
9 server#9578
9 server#3581
9 server#9578
9 server#3581
9 server#3581
9 server#9578
9 server#3581