Python Cookbook by David Ascher, Alex Martelli

Using Publish/Subscribe in a Distributed Middleware Architecture

Credit: Graham Dumpleton

Problem

You need to allow distributed services to set themselves up as publishers of information and/or subscribers to that information by writing a suitable central exchange (middleware) server.

Solution

The OSE package supports a publisher/subscriber programming model through its netsvc module. To exploit it, we first need a central middleware process to which all others connect:

# The central.py script -- needs the OSE package from http://ose.sourceforge.net

import netsvc
import signal

dispatcher = netsvc.Dispatcher()
dispatcher.monitor(signal.SIGINT)

exchange = netsvc.Exchange(netsvc.EXCHANGE_SERVER)
exchange.listen(11111)

dispatcher.run()

Then, we need service processes that periodically publish information to the central middleware process, such as:

# The publish.py script -- needs the OSE package from http://ose.sourceforge.net

import netsvc
import signal
import random

class Publisher(netsvc.Service):

    def __init__(self):
        netsvc.Service.__init__(self, "SEED")
        self._count = 0
        time = netsvc.DateTime()
        data = { "time": time }
        self.publishReport("init", data, -1)
        self.startTimer(self.publish, 1, "1")

    def publish(self,name):
        self._count = self._count + 1
        time = netsvc.DateTime()
        value = int(0xFFFF*random.random())
        data = { "time": time, "count": self._count, "value": value }
        self.publishReport("next", data)
        self.startTimer(self.publish, 1, "1")

dispatcher = netsvc.Dispatcher()
dispatcher.monitor(signal.SIGINT)

exchange = netsvc.Exchange(netsvc.EXCHANGE_CLIENT)
exchange.connect("localhost", 11111, 5)

publisher = Publisher()

dispatcher.run()

Finally, we need services that subscribe to the published information, such as:

# The subscribe.py script -- needs the OSE package from http://ose.sourceforge.net

import netsvc
import signal

class Subscriber(netsvc.Service):

    def __init__(self):
        netsvc.Service.__init__(self)
        self.monitorReports(self.seed, "SEED", "next")

    def seed(self, service, subjectName, content):
        print "%s - %s" % (content["time"], content["value"])

dispatcher = netsvc.Dispatcher()
dispatcher.monitor(signal.SIGINT)

exchange = netsvc.Exchange(netsvc.EXCHANGE_CLIENT)
exchange.connect("localhost", 11111, 5)

subscriber = Subscriber()

dispatcher.run()

Discussion

This recipe is a simple example of how to set up a distributed publish/subscribe system. It shows the creation of a central exchange service to which all participating processes connect. Services can then set themselves up as publishers, and other services can subscribe to what is being published. This recipe can form the basis of many different types of applications, ranging from instant messaging to alarm systems for monitoring network equipment and stock market data feeds. (Partly because Python is used at various levels, but also because of how the underlying architecture is designed, you shouldn’t expect applications built in this manner to handle the huge volume of updates that makes up a complete stock market feed. Generally, such applications deal only with a subset of this data anyway.)

The netsvc module comes as part of the OSE package, which is available from http://ose.sourceforge.net. This recipe shows only a small subset of the actual functionality available in OSE. Other middleware-like functionality, such as a system for message-oriented request/reply, is also available (see Recipe 13.9).

The first script in the recipe, central.py, implements the middleware process to which all others connect. Like all OSE processes, it instantiates a Dispatcher. It then instantiates an Exchange in the role of an exchange server, tells it to listen on port 11111, and runs the dispatcher.

The second script, publish.py, implements an example of a publisher service. It, too, instantiates a Dispatcher and then an Exchange, but the latter is an exchange client and, therefore, rather than listening for connections, it connects to port 11111 where the middleware must be running. Before running the dispatcher, its next crucial step is instantiating a Publisher, its own custom subclass of Service. The Publisher constructor calls Service’s publishReport method with an 'init' message, then calls the startTimer method to schedule the publish method one second later. Each time publish runs, it publishes a 'next' message and calls startTimer again, so a new message goes out every second. Each published message is accompanied by an arbitrary dictionary, which, in this recipe, carries just a few demonstration entries.
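The re-arming timer pattern used by publish.py, in which each callback schedules the next one, can be tried without installing OSE. The following is a minimal sketch that uses the standard library's sched module in place of netsvc's startTimer. The FakeClock and TickPublisher classes, the max_ticks limit, and the reports list are hypothetical names introduced for illustration; none of them is part of OSE.

```python
import sched


class FakeClock(object):
    """Simulated clock so the example finishes instantly and repeatably."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        # Advance simulated time instead of blocking.
        self.now += seconds


class TickPublisher(object):
    """Hypothetical stand-in for the recipe's Publisher; no OSE involved."""

    def __init__(self, scheduler, clock, max_ticks):
        self._scheduler = scheduler
        self._clock = clock
        self._max_ticks = max_ticks
        self._count = 0
        # Collected (subject, data) pairs stand in for publishReport calls.
        self.reports = [("init", {"time": clock.time()})]
        # One-shot timer: run self.publish once, one time unit from now.
        scheduler.enter(1, 0, self.publish, ())

    def publish(self):
        self._count += 1
        data = {"time": self._clock.time(), "count": self._count}
        self.reports.append(("next", data))
        if self._count < self._max_ticks:
            # Re-arm the timer; without this, publication would stop here.
            self._scheduler.enter(1, 0, self.publish, ())


clock = FakeClock()
scheduler = sched.scheduler(clock.time, clock.sleep)
publisher = TickPublisher(scheduler, clock, max_ticks=3)
scheduler.run()  # returns once no timers remain

for subject, data in publisher.reports:
    print("%s at t=%s" % (subject, data["time"]))
```

The max_ticks limit exists only so that the sketch terminates; the recipe's Publisher re-arms its timer unconditionally and keeps publishing until the process is stopped.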

The third script, subscribe.py, implements an example of a subscriber for the publisher service in publish.py. Like the latter, it instantiates a Dispatcher and an Exchange, which connects as an exchange client to port 11111 where the middleware must be running. It defines Subscriber, a Service subclass whose constructor calls Service’s monitorReports method for the 'next' message of the SEED service, registering its own seed method to be called back for each such message that is published. The seed method then prints out a couple of the entries from the content dictionary argument so we can check if the whole arrangement is functioning correctly.
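To make the routing idea concrete, here is a small in-process sketch of what the exchange does conceptually: callbacks are registered against a service name and a subject, and each published report is handed to every matching callback. MiniExchange and its methods are hypothetical names, not OSE's API, and exact matching on the (service name, subject) pair is a simplifying assumption; the real exchange works across processes and may match subjects differently.

```python
class MiniExchange(object):
    """Toy in-process stand-in for the central exchange (not OSE's API)."""

    def __init__(self):
        # Maps (service name, subject) to a list of subscriber callbacks.
        self._callbacks = {}

    def monitor_reports(self, callback, service_name, subject):
        key = (service_name, subject)
        self._callbacks.setdefault(key, []).append(callback)

    def publish_report(self, service_name, subject, content):
        # Hand the report to every callback registered for this exact pair.
        for callback in self._callbacks.get((service_name, subject), ()):
            callback(service_name, subject, content)


exchange = MiniExchange()
received_a = []
received_b = []


def seed_a(service, subject_name, content):
    received_a.append(content["value"])


def seed_b(service, subject_name, content):
    received_b.append(content["value"])


# Two independent subscribers, both interested in SEED's "next" reports.
exchange.monitor_reports(seed_a, "SEED", "next")
exchange.monitor_reports(seed_b, "SEED", "next")

exchange.publish_report("SEED", "init", {"time": 0})               # no subscriber
exchange.publish_report("SEED", "next", {"time": 1, "value": 42})  # both receive
exchange.publish_report("OTHER", "next", {"time": 1, "value": 7})  # other service

print("a=%s b=%s" % (received_a, received_b))
```

Neither subscriber sees the 'init' report or the report from the other service, because neither registered for those pairs. This mirrors how the recipe's Subscriber asks only for the 'next' subject of SEED.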

To try this recipe, after downloading and installing the OSE package, run python central.py from one terminal. Then, from one or more other terminals, run an arbitrary mix of python publish.py and python subscribe.py. You will see that all subscribers are regularly notified of all the messages published by every publisher.

In a somewhat different sense, publish/subscribe is also a popular approach to loosening coupling in GUI architectures (see Recipe 9.12).

See Also

Recipe 13.9 describes another feature of the OSE package; Recipe 9.12 shows a different approach to publish/subscribe in a GUI context; the OSE package is available at http://ose.sourceforge.net.
