Credit: Graham Dumpleton
You need to allow distributed services to set themselves up as publishers of information and/or subscribers to that information by writing a suitable central exchange (middleware) server.
The OSE package supports a publisher/subscriber programming model through its netsvc module. To exploit it, we first need a central middleware process to which all others connect:
# The central.py script -- needs the OSE package from http://ose.sourceforge.net

import netsvc
import signal

dispatcher = netsvc.Dispatcher()
dispatcher.monitor(signal.SIGINT)

exchange = netsvc.Exchange(netsvc.EXCHANGE_SERVER)
exchange.listen(11111)

dispatcher.run()
Then, we need service processes that periodically publish information to the central middleware process, such as:
# The publish.py script -- needs the OSE package from http://ose.sourceforge.net

import netsvc
import signal
import random

class Publisher(netsvc.Service):

    def __init__(self):
        netsvc.Service.__init__(self, "SEED")
        self._count = 0
        time = netsvc.DateTime()
        data = { "time": time }
        self.publishReport("init", data, -1)
        self.startTimer(self.publish, 1, "1")

    def publish(self, name):
        self._count = self._count + 1
        time = netsvc.DateTime()
        value = int(0xFFFF*random.random())
        data = { "time": time, "count": self._count, "value": value }
        self.publishReport("next", data)
        self.startTimer(self.publish, 1, "1")

dispatcher = netsvc.Dispatcher()
dispatcher.monitor(signal.SIGINT)

exchange = netsvc.Exchange(netsvc.EXCHANGE_CLIENT)
exchange.connect("localhost", 11111, 5)

publisher = Publisher()

dispatcher.run()
Finally, we need services that subscribe to the published information, such as:
# The subscribe.py script -- needs the OSE package from http://ose.sourceforge.net

import netsvc
import signal

class Subscriber(netsvc.Service):

    def __init__(self):
        netsvc.Service.__init__(self)
        self.monitorReports(self.seed, "SEED", "next")

    def seed(self, service, subjectName, content):
        # Parenthesized form prints the same text under Python 2 and Python 3
        print("%s - %s" % (content["time"], content["value"]))

dispatcher = netsvc.Dispatcher()
dispatcher.monitor(signal.SIGINT)

exchange = netsvc.Exchange(netsvc.EXCHANGE_CLIENT)
exchange.connect("localhost", 11111, 5)

subscriber = Subscriber()

dispatcher.run()
This recipe is a simple example of how to set up a distributed publish/subscribe system. It shows the creation of a central exchange service that all participating processes connect to. Services can then set themselves up as publishers, and other services can subscribe to what is being published. This recipe can form the basis of many different types of applications, ranging from instant messaging to alarm systems for monitoring network equipment and stock market data feeds. (Partly because Python is used at various levels, but also because of how the underlying architecture is designed, you shouldn’t expect applications built in this manner to handle the huge volume of data updates that make up a complete stock market feed. Generally, such applications deal only with a subset of this data anyway.)
The netsvc module comes as part of the OSE package, which is available from http://ose.sourceforge.net. This recipe shows only a small subset of the actual functionality available in OSE. Other middleware-like functionality, such as a system for message-oriented request/reply, is also available (see Recipe 13.9).
The first script in the recipe, central.py, implements the middleware process to which all others connect. Like all OSE processes, it instantiates a Dispatcher. It then instantiates an Exchange in the role of an exchange server, tells it to listen on port 11111, and runs the dispatcher.
The second script, publish.py, implements an example of a publisher service. It, too, instantiates a Dispatcher and then an Exchange, but the latter is an exchange client and, therefore, rather than listening for connections, it connects to port 11111, where the middleware must be running. Before running the dispatcher, its next crucial step is instantiating a Publisher, its own custom subclass of Service. The constructor calls Service’s publishReport method with an 'init' message and then calls the startTimer method to schedule the publish method one second later. Each call to publish sends a 'next' message and restarts the timer, so a new message goes out every second. Each published message is accompanied by an arbitrary dictionary, which, in this recipe, carries just a few demonstration entries.
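The timer pattern in publish.py (a one-shot timer that the callback restarts on every run) can be tried without OSE. The following is a minimal sketch that uses the standard library's sched module with a simulated clock in place of the OSE dispatcher. The names FakeClock, TickingPublisher, and run_demo are hypothetical and are not part of netsvc, and the limit parameter exists only so the demonstration terminates.

```python
import sched


class FakeClock:
    """Simulated clock so the example finishes instantly (an assumption)."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        # Instead of blocking, just advance the simulated time.
        self.now += seconds


class TickingPublisher:
    """Hypothetical stand-in for the recipe's Publisher; not an OSE class."""

    def __init__(self, scheduler, reports, limit):
        self._scheduler = scheduler
        self._reports = reports
        self._limit = limit
        self._count = 0
        # Publish an initial report, then arm a one-shot timer.
        self._reports.append(("init", {"count": 0}))
        self._scheduler.enter(1, 0, self.publish)

    def publish(self):
        self._count += 1
        self._reports.append(("next", {"count": self._count}))
        # Re-arm the one-shot timer; this is what makes publication periodic.
        if self._count < self._limit:
            self._scheduler.enter(1, 0, self.publish)


def run_demo(limit=3):
    clock = FakeClock()
    scheduler = sched.scheduler(clock.time, clock.sleep)
    reports = []
    TickingPublisher(scheduler, reports, limit)
    scheduler.run()
    return reports, clock.now
```

Calling run_demo(3) records one 'init' report followed by three 'next' reports, one per simulated second. If the callback did not re-arm the timer, only the first 'next' report would ever be produced.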
The third script, subscribe.py, implements an example of a subscriber for the publisher service in publish.py. Like the latter, it instantiates a Dispatcher and an Exchange, which connects as an exchange client to port 11111 where the middleware must be running. It implements Subscriber, the Service subclass, which calls Service’s monitorReports method for the 'next' message, registering its own seed method to be called back for each such message that is published. The latter method then prints out a couple of the entries from the content dictionary argument so we can check if the whole arrangement is functioning correctly.
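To see why the subscriber's callback runs for 'next' messages but not for 'init', it can help to model the subscription as a lookup keyed on service name and subject. The sketch below is an in-process stand-in only: MiniExchange and its methods are hypothetical names, it uses exact string matching as a simplifying assumption, and it says nothing about how OSE implements monitorReports internally. The callback keeps the same three-argument shape as seed in subscribe.py.

```python
class MiniExchange:
    """Hypothetical in-process stand-in for report routing; not OSE's API."""

    def __init__(self):
        # Maps (service name, subject) to the callbacks registered for it.
        self._subscriptions = {}

    def monitor_reports(self, callback, service_name, subject):
        key = (service_name, subject)
        self._subscriptions.setdefault(key, []).append(callback)

    def publish_report(self, service_name, subject, content):
        # Exact match only (an assumption made to keep the sketch short).
        for callback in self._subscriptions.get((service_name, subject), []):
            callback(service_name, subject, content)


def demo():
    exchange = MiniExchange()
    seen = []

    def seed(service, subject_name, content):
        # Same formatting as the seed method in subscribe.py.
        seen.append("%s - %s" % (content["time"], content["value"]))

    exchange.monitor_reports(seed, "SEED", "next")

    # Not delivered: the subject differs from the one subscribed to.
    exchange.publish_report("SEED", "init", {"time": "t0"})
    # Delivered: both service name and subject match.
    exchange.publish_report("SEED", "next", {"time": "t1", "count": 1, "value": 42})
    # Not delivered: the service name differs.
    exchange.publish_report("OTHER", "next", {"time": "t2", "count": 1, "value": 7})
    return seen
```

Because the 'init' report never reaches seed in this model, the callback can safely index content["value"], a key that only the 'next' reports carry.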
To try this recipe, after downloading and installing the OSE package, run python central.py from one terminal. Then, from one or more other terminals, run an arbitrary mix of python publish.py and python subscribe.py. You will see that all subscribers are regularly notified of all the messages published by every publisher.
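The fan-out behavior described here (every subscriber sees every message from every publisher) can be illustrated in a single process. This is a rough sketch under stated assumptions: Hub, attach_subscriber, drain, and run_demo are hypothetical names, each subscriber is modeled as a queue.Queue inbox rather than as a separate process, and no networking is involved.

```python
import queue


class Hub:
    """Hypothetical stand-in for the central process; not part of OSE."""

    def __init__(self):
        self._inboxes = []

    def attach_subscriber(self):
        # Each subscriber gets its own inbox, mimicking a separate process.
        inbox = queue.Queue()
        self._inboxes.append(inbox)
        return inbox

    def publish(self, publisher_id, content):
        # Fan out: every attached subscriber receives a copy of the message.
        for inbox in self._inboxes:
            inbox.put((publisher_id, dict(content)))


def drain(inbox):
    """Collect everything currently waiting in one subscriber's inbox."""
    items = []
    while not inbox.empty():
        items.append(inbox.get_nowait())
    return items


def run_demo(publishers=2, subscribers=3, rounds=2):
    hub = Hub()
    inboxes = [hub.attach_subscriber() for _ in range(subscribers)]
    for count in range(1, rounds + 1):
        for publisher_id in range(publishers):
            hub.publish(publisher_id, {"count": count})
    return [drain(inbox) for inbox in inboxes]
```

With two publishers, three subscribers, and two rounds, each of the three inboxes ends up with the same four messages, which mirrors what the terminals running subscribe.py display when several copies of publish.py are active.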
In a somewhat different sense, publish/subscribe is also a popular approach to loosening coupling in GUI architectures (see Recipe 9.12).
Recipe 13.9 describes another feature of the OSE package, while Recipe 9.12 shows a different approach to publish/subscribe in a GUI context; the OSE package is available at http://ose.sourceforge.net.