Queue

Here we will use the ProcessDevice to create a QUEUE device for connecting client and server.

queuedevice.py

import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process
import random

# TCP port the queue device binds for its "in" side; clients connect here.
frontend_port = 5559
# TCP port the queue device binds for its "out" side; the server connects here.
backend_port = 5560
# Number of client processes launched at the end of the script
# (each one announces itself as a "worker" in its output).
number_of_workers = 2

As noted earlier, we do not pass a socket instance to ProcessDevice, but a socket type. Here we also respect the constraint of the request/reply pattern by setting the high water mark to 1 on both sides of the device.

# Build a QUEUE device that runs in its own process. ProcessDevice is given
# socket *types* (XREP for the "in" side, XREQ for the "out" side), not
# socket instances.
queuedevice = ProcessDevice(zmq.QUEUE, zmq.XREP, zmq.XREQ)
# "in" side faces the clients, "out" side faces the server.
queuedevice.bind_in("tcp://127.0.0.1:%d" % frontend_port)
queuedevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
# High water mark of 1 on both sides, in keeping with the request/reply pattern.
queuedevice.setsockopt_in(zmq.HWM, 1)
queuedevice.setsockopt_out(zmq.HWM, 1)
queuedevice.start()
# NOTE(review): presumably gives the device process time to come up before
# any peer connects -- confirm whether the pause is actually required.
time.sleep (2)

The server waits for a request and then replies to it.


def server(backend_port):
    """Answer requests forever on a REP socket connected to the device backend.

    backend_port -- TCP port on 127.0.0.1 that the REP socket connects to.

    Every incoming request is printed and answered with a reply that carries
    this server's randomly chosen id. The function never returns.
    """
    print("Connecting a server to queue device")
    ctx = zmq.Context()
    responder = ctx.socket(zmq.REP)
    responder.connect("tcp://127.0.0.1:%s" % backend_port)

    # The id is drawn once, so the reply text is the same for every request.
    server_id = random.randrange(1, 10005)
    reply = "Response from %s" % server_id

    while True:
        incoming = responder.recv()
        # Single-argument print keeps the output identical to the
        # comma-separated Python 2 print statement (label, space, message).
        print("%s %s" % ("Received request: ", incoming))
        responder.send(reply)

The client makes a request and waits for a reply.

def client(frontend_port, client_id):
    """Send four requests through the device frontend, printing each reply.

    frontend_port -- TCP port on 127.0.0.1 that the REQ socket connects to.
    client_id     -- value embedded in the announcement and in every request.
    """
    print("Connecting a worker #%s to queue device" % client_id)
    ctx = zmq.Context()
    requester = ctx.socket(zmq.REQ)
    requester.connect("tcp://127.0.0.1:%s" % frontend_port)

    # The request text depends only on the client id, so build it once.
    payload = "Request fron client: %s" % client_id

    # Requests are numbered 1 through 4; each send is followed by a blocking
    # receive before the next request goes out.
    for seq in range(1, 5):
        print("Sending request #%s" % seq)
        requester.send(payload)
        answer = requester.recv()
        # Single-argument print reproduces the space-separated output of the
        # comma-separated Python 2 print statement.
        print("%s %s %s %s %s" % ("Received reply ", seq, "[", answer, "]"))

We have already started our device. Now we will bring up the server before bringing up the clients. Each client then makes a few requests to the server connected to our device.

# Bring up the single server process first; it connects to the device backend.
Process(target=server, args=(backend_port,)).start()

# NOTE(review): presumably lets the server finish connecting before any
# client starts sending -- confirm whether the pause is actually required.
time.sleep(2)

# Launch one client process per configured worker; the loop index is its id.
for client_id in range(number_of_workers):
    Process(target=client, args=(frontend_port, client_id,)).start()

Output:

Connecting a server to queue device
Connecting a worker #0 to queue device
Sending request #1
Connecting a worker #1 to queue device
Received request:  Request fron client: 0
Received reply  1 [ Response from 6548 ]
Sending request #2
Received request:  Request fron client: 0
Sending request #1
Received reply  2 [ Response from 6548 ]
Sending request #3
Received request:  Request fron client: 0
Received request:  Request fron client: 1
Received reply  3 [ Response from 6548 ]
Sending request #4
Received request:  Request fron client: 0
Received reply  1 [ Response from 6548 ]
Sending request #2
Received request:  Request fron client: 1
Received reply  4 [ Response from 6548 ]
Received reply  2 [ Response from 6548 ]
Sending request #3
Received request:  Request fron client: 1
Received reply  3 [ Response from 6548 ]
Sending request #4
Received request:  Request fron client: 1
Received reply  4 [ Response from 6548 ]