Queue

Queue device

This is the intermediary that sits between clients and servers, forwarding request to servers and relaying replies back to client. The ZMQ device takes a device type (ZMQ.QUEUE) and the two sockets bound to well known ports.

../../_images/Queue.png

queue_device.py

import zmq

def main():

    try:
        context = zmq.Context(1)
        # Socket facing clients
        frontend = context.socket(zmq.XREP)
        frontend.bind("tcp://*:5559")
        # Socket facing services
        backend = context.socket(zmq.XREQ)
        backend.bind("tcp://*:5560")

        zmq.device(zmq.QUEUE, frontend, backend)
    except Exception, e:
        print e
        print "bringing down zmq device"
    finally:
        pass
        frontend.close()
        backend.close()
        context.term()

if __name__ == "__main__":
    main()

Note

ZMQ devices are full programs, devices include a while(True) loop and thus block execution permanently once invoked.

Here, you can see that client has not changed at all from our previous example by introduction of an intermediary ZMQ device.

queue_client.py

import zmq
import sys
import random

port = "5559"
context = zmq.Context()
print "Connecting to server..."
socket = context.socket(zmq.REQ)
socket.connect ("tcp://localhost:%s" % port)
client_id = random.randrange(1,10005)
#  Do 10 requests, waiting each time for a response
for request in range (1,10):
    print "Sending request ", request,"..."    
    socket.send ("Hello from %s" % client_id)
    #  Get the reply.
    message = socket.recv()
    print "Received reply ", request, "[", message, "]"

Here, the only change to the server is that it is not bound to a well known port. Instead it connects to a well known port of the intermediary.

queue_server.py

import zmq
import time
import sys
import random

port = "5560"
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.connect("tcp://localhost:%s" % port)
server_id = random.randrange(1,10005)
while True:
    #  Wait for next request from client
    message = socket.recv()
    print "Received request: ", message
    time.sleep (1)  
    socket.send("World from server %s" % server_id)

Execute the following on different shells:

python  queue_device.py
python queue_server.py
python queue_server.py
python queue_client.py
python queue_client.py

If you run a single client, you can see that requests are load balanced among available server:

Connecting to server...
Sending request  1 ...
Received reply  1 [ World from server 7003 ]
Sending request  2 ...
Received reply  2 [ World from server 4411 ]
Sending request  3 ...
Received reply  3 [ World from server 7003 ]
Sending request  4 ...
Received reply  4 [ World from server 4411 ]
Sending request  5 ...
Received reply  5 [ World from server 7003 ]
Sending request  6 ...
Received reply  6 [ World from server 4411 ]
Sending request  7 ...
Received reply  7 [ World from server 7003 ]
Sending request  8 ...
Received reply  8 [ World from server 4411 ]
Sending request  9 ...
Received reply  9 [ World from server 7003 ]