This chapter focuses on the publish-and-subscribe (pub/sub) messaging model that was introduced in Chapter 2. The pub/sub messaging model allows a message producer (also called a publisher) to broadcast a message to one or more consumers (called subscribers). There are three important aspects of the pub/sub model:
Messages are pushed to consumers, which means that consumers are delivered messages without having to request them. Messages are exchanged through a virtual channel called a topic. A topic is a destination where producers can publish, and subscribers can consume, messages. Messages delivered to a topic are automatically pushed to all qualified consumers.
As in enterprise messaging in general, there is no coupling of the producers to the consumers. Subscribers and publishers can be added dynamically at runtime, which allows the system to grow or shrink in complexity over time.
Every client that subscribes to a topic receives its own copy of messages published to that topic. A single message produced by one publisher may be copied and distributed to hundreds, or even thousands of subscribers.
In Chapter 2 you learned the basics of the pub/sub model by developing a simple chat client. In this chapter we will build on those lessons and examine more advanced features of this model, including guaranteed messaging, topic-based addressing, durable subscriptions, request-reply, and temporary topics.
In this chapter we abandon the simple chat example for a more complex and real-world Business-to-Business (B2B) scenario. In our new example, a wholesaler wants to distribute price information to retailers, and the retailers want to respond by generating orders. We’ll implement this scenario using the publish-and-subscribe model: the wholesaler will publish messages containing new prices and hot deals, and the retailers will respond by creating their own messages to order stock.
This scenario is typical of many Business-to-Business operations. We call the clients retailers and wholesalers, but these names are really only for convenience. There’s little difference between our wholesaler/retailer scenario and a stock broker broadcasting stock prices to investors, or a manufacturer broadcasting bid requests to multiple suppliers. The fact that we use a retailer and a wholesaler to illustrate our example is much less important than the way we apply JMS.
Our simple trading system is implemented by two classes, both of
which are JMS clients:
Wholesaler
and
Retailer
.
In the interest of keeping the code simple, we won’t implement
a fancy user interface; our application has a rudimentary
command-line user interface.
Before looking at the code, let’s look at how the application
works. As with the Chat
application, the
Wholesaler
class includes a main(
)
method so it can be run as a standalone
Java application. It’s executed from the command line as
follows:
java chap4.B2B.Wholesaler localhost username password
username and password are
the authentication information for the client. The
Retailer
class can be executed in the same manner:
java chap4.B2B.Retailer localhost username password
Start your JMS server, then run one instance of a
Wholesaler
client and a
Retailer
client in separate command windows. In
the Wholesaler
client you are prompted to enter an
item description, an old price, and a new price. Enter the following
as shown:
Bowling Shoes, 100.00, 55.00
Upon hitting the Enter key, you should see the
Retailer
application display information on the
screen indicating that it has received a price change notice. You
should then see the Wholesaler
indicating that it
has received a “buy” order from the
Retailer
. Here’s the complete interaction
with the Wholesaler
and the
Retailer
:[1]
java chap4.B2B.Wholesaler localhost WHOLESALER passwd1
Enter: Item, Old Price, New Price e.g., Bowling Shoes, 100.00, 55.00Bowling Shoes, 100.00, 55.00
Order received - 1000 Bowling Shoes from DurableRetailer -----------------------java chap4.B2B.Retailer localhost RETAILER passwd2
Retailer application started. Received Hot Buy: Bowling Shoes, 100.00, 55.00 Buying 1000 Bowling Shoes
Here’s what happened. The Wholesaler
publishes a price quotation on a topic, “Hot Deals,”
which is intended for one or more Retailer
s. The
Retailer
s subscribe to the “Hot Deals”
topic in order to receive price quotes. The
Retailer
application has no interaction with a
live user. Instead, it has an autoBuy(
)
method that examines the old price and the new price. If the new
price represents a reduction of greater than ten percent, the
Retailer
sends a message back to the
Wholesaler
on the “Buy Order” topic,
telling it to purchase 1,000 items. In JMS terms, the
Wholesaler
is a
producer
of the “Hot Deals” topic and a
consumer
of the “Buy Order” topic. Conversely, the
Retailer
is a consumer of the “Hot
Deals” topic and a producer of the “Buy Order”
topic, as illustrated in Figure 4.1.
The
rest of this chapter examines the source code for the
Wholesaler
and Retailer
classes, and covers several advanced subjects related to the pub/sub
messaging model.
After the listing, we will take a brief tour of the methods in this
class, and discuss their responsibilities. We will go into detail
about the implementation later in this chapter. Now, here is the
complete definition of the Wholesaler
class, which
is responsible for publishing items to the “Hot Deals”
topic and receiving “Buy Orders” on those deals from
retailers:
public class Wholesaler implements javax.jms.MessageListener{ private javax.jms.TopicConnection connect = null; private javax.jms.TopicSession pubSession = null; private javax.jms.TopicSession subSession = null; private javax.jms.TopicPublisher publisher = null; private javax.jms.TopicSubscriber subscriber = null; private javax.jms.Topic hotDealsTopic = null; private javax.jms.TemporaryTopic buyOrdersTopic = null; public Wholesaler(String broker, String username, String password){ try { Properties env = new Properties( ); // ... specify the JNDI properties specific to the vendor InitialContext jndi = new InitialContext(env); TopicConnectionFactory factory = (TopicConnectionFactory)jndi.lookup(broker); connect = factory.createTopicConnection (username, password); pubSession = connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); subSession = connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); hotDealsTopic = (Topic)jndi.lookup("Hot Deals"); publisher = pubSession.createPublisher(hotDealsTopic); buyOrdersTopic = subSession.createTemporaryTopic( ); subscriber = subSession.createSubscriber(buyOrdersTopic); subscriber.setMessageListener(this); connect.start( ); } catch (javax.jms.JMSException jmse){ jmse.printStackTrace( ); System.exit(1); } catch (javax.naming.NamingException jne){ jne.printStackTrace( ); System.exit(1); } } private void publishPriceQuotes(String dealDesc, String username, String itemDesc, float oldPrice, float newPrice){ try { javax.jms.StreamMessage message = pubSession.createStreamMessage( ); message.writeString(dealDesc); message.writeString(itemDesc); message.writeFloat(oldPrice); message.writeFloat(newPrice); message.setStringProperty("Username", username); message.setStringProperty("Itemdesc", itemDesc); message.setJMSReplyTo(buyOrdersTopic); publisher.publish( message, javax.jms.DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, 1800000); } catch ( javax.jms.JMSException jmse ){ jmse.printStackTrace( ); } } public void onMessage( javax.jms.Message message){ try { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText( ); System.out.println("\nOrder received - "+text+ " from " + message.getJMSCorrelationID( )); } catch (java.lang.Exception rte){ rte.printStackTrace( ); } } public void exit( ){ try { connect.close( ); } catch (javax.jms.JMSException jmse){ jmse.printStackTrace( ); } System.exit(0); } public static void main(String argv[]) { String broker, username, password; if (argv.length == 3){ broker = argv[0]; username = argv[1]; password = argv[2]; } else { System.out.println("Invalid arguments. Should be: "); System.out.println("java Wholesaler broker username password"); return; } Wholesaler wholesaler = new Wholesaler(broker,username,password); try { // Read all standard input and send it as a message. java.io.BufferedReader stdin = new java.io.BufferedReader (new java.io.InputStreamReader( System.in ) ); System.out.println ("Enter: Item, Old Price, New Price"); System.out.println("\ne.g., Bowling Shoes, 100.00, 55.00"); while ( true ){ String dealDesc = stdin.readLine( ); if (dealDesc != null && dealDesc.length( ) > 0){ // Parse the deal description StringTokenizer tokenizer = new StringTokenizer(dealDesc,",") ; String itemDesc = tokenizer.nextToken( ); String temp = tokenizer.nextToken( ); float oldPrice = Float.valueOf(temp.trim()).floatValue( ); temp = tokenizer.nextToken( ); float newPrice = Float.valueOf(temp.trim()).floatValue( ); wholesaler.publishPriceQuotes(dealDesc,username, itemDesc,oldPrice,newPrice); } else { wholesaler.exit( ); } } } catch ( java.io.IOException ioe ){ ioe.printStackTrace( ); } } }
The main(
)
method creates an instance of the
Wholesaler
class, passing it the information it
needs to set up its publishers and subscribers.
In the Wholesaler
class’s
constructor, JNDI is used to obtain the
“Hot Deals” topic identifier, which is then used to
create a publisher. Most of this should look familiar to you;
it’s similar in many ways to the Chat
application, except for the creation of a temporary topic, which is
discussed in more detail later in this section.
Once the Wholesaler
is instantiated, the
main( )
method continues to monitor the command
line for new “Hot Deals.” When a “Hot Deal”
is entered at the command prompt, the main( )
method parses the information and passes it to the
Wholesaler
instance via the
publishPriceQuotes(
)
method.
The publishPriceQuotes( )
method is responsible
for publishing messages containing information about price quotes to
the “Hot Deals” topic.
The onMessage( )
method receives messages from clients
responding to deals published on the “Hot Deals” topic.
The contents of these messages are simply printed to the command
line.
Here is the complete definition of the
Retailer
class,
which subscribes to the “Hot Deals” topic and responds
with “Buy Orders” on attractive deals:
public class Retailer implements javax.jms.MessageListener{ private javax.jms.TopicConnection connect = null; private javax.jms.TopicSession session = null; private javax.jms.TopicPublisher publisher = null; private javax.jms.Topic hotDealsTopic = null; public Retailer( String broker, String username, String password){ try { Properties env = new Properties( ); // ... specify the JNDI properties specific to the vendor InitialContext jndi = new InitialContext(env); TopicConnectionFactory factory = (TopicConnectionFactory)jndi.lookup(broker); connect = factory.createTopicConnection(username, password); connect.setClientID("DurableRetailer"); session = connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); hotDealsTopic = (Topic)jndi.lookup("Hot Deals"); javax.jms.TopicSubscriber subscriber = session.createDurableSubscriber(hotDealsTopic, "Hot Deals Subscription"); subscriber.setMessageListener(this); connect.start( ); } catch (javax.jms.JMSException jmse){ jmse.printStackTrace( ); System.exit(1); } catch (javax.naming.NamingException jne){ jne.printStackTrace( ); System.exit(1); } } public void onMessage(javax.jms.Message aMessage){ try { autoBuy(aMessage); } catch (java.lang.RuntimeException rte){ rte.printStackTrace( ); } } private void autoBuy (javax.jms.Message message){ int count = 1000; try { StreamMessage strmMsg = (StreamMessage)message; String dealDesc = strmMsg.readString( ); String itemDesc = strmMsg.readString( ); float oldPrice = strmMsg.readFloat( ); float newPrice = strmMsg.readFloat( ); System.out.println("Received Hot Buy :"+dealDesc); // If price reduction is greater than 10 percent, buy if (newPrice == 0 || oldPrice / newPrice > 1.1){ System.out.println("\nBuying " + count +" "+ itemDesc); TextMessage textMsg = session.createTextMessage( ); textMsg.setText(count + " " + itemDesc ); javax.jms.Topic buytopic = (javax.jms.Topic)message.getJMSReplyTo( ); publisher = session.createPublisher(buytopic); textMsg.setJMSCorrelationID("DurableRetailer"); publisher.publish( textMsg, javax.jms.DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, 1800000); } else { System.out.println ("\nBad Deal- Not buying."); } } catch (javax.jms.JMSException jmse){ jmse.printStackTrace( ); } } private void exit(String s){ try { if ( s != null && s.equalsIgnoreCase("unsubscribe")) { subscriber.close( ); session.unsubscribe("Hot Deals Subscription"); } connect.close( ); } catch (javax.jms.JMSException jmse){ jmse.printStackTrace( ); } System.exit(0); } public static void main(String argv[]) { String broker, username, password; if (argv.length == 3){ broker = argv[0]; username = argv[1]; password = argv[2]; } else { System.out.println("Invalid arguments. Should be: "); System.out.println ("java Retailer broker username password"); return; } Retailer retailer = new Retailer(broker, username, password); try { System.out.println("\nRetailer application started.\n"); // Read all standard input and send it as a message. java.io.BufferedReader stdin = new java.io.BufferedReader ( new java.io.InputStreamReader( System.in ) ); while ( true ){ String s = stdin.readLine( ); if ( s == null )retailer.exit(null); else if ( s.equalsIgnoreCase("unsubscribe") ) retailer.exit ( s ); } } catch ( java.io.IOException ioe ){ ioe.printStackTrace( ); } } }
The
main( )
method of
Retailer
is much like the main(
)
method of
Wholesaler
. It creates an instance of the
Retailer
class and passes it the information it
needs to set up its publishers and subscribers.
The constructor of the
Retailer
class is also similar to that of the
Wholesaler
class, except that it creates a
durable subscription using the “Hot
Deals” topic. Durable subscriptions will be discussed in more
detail later in this section.
Once the Retailer
is instantiated, the
main( )
method uses the readLine(
)
method as a way of blocking program execution in order to monitor for
message input.
The publishPriceQuotes(
)
method is responsible for publishing messages
containing information about price quotes to the “Hot
Deals” topic.
The onMessage( )
method receives messages from the
Wholesaler
client, then delegates its work to the
autoBuy( )
method. The autoBuy( )
method examines the message, determines whether the price change is
significant, and arbitrarily orders 1000 items. It orders the items
by publishing a persistent message back to the
Wholesaler
client’s temporary topic, using
the
JMSCorrelationID
as a way of identifying itself. We will examine persistent publishing
and temporary topics in the next section.
In the chat example we explored in Chapter 2, we assumed that JMS clients would communicate with each other using established topics on which messages are asynchronously produced and consumed. In the next sections, we’ll explore ways to augment this basic mechanism. We’ll start by looking at temporary topics, which is a mechanism for JMS clients to create topics dynamically.
The constructor of the
Wholesaler
class creates a temporary topic. This
topic is used as a
JMSReplyTo
destination for messages published
to the “Hot Deals” topic in the
publishPriceQuotes( )
method:
public Wholesaler(String broker, String username, String password){ try { ... session = connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); ...buyOrdersTopic = session.createTemporaryTopic( );
... } ... private void publishPriceQuotes(String dealDesc, String username, String itemDesc, float oldPrice, float newPrice){ try { javax.jms.StreamMessage message = session.createStreamMessage( ); ...message.setJMSReplyTo(buyOrdersTopic);
publisher.publish( message, javax.jms.DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, 600000); ... }
When the Retailer
client decides to respond to a
“Hot Deals” message with a buy order, it uses the
JMSReplyTo
destination, which is the temporary
topic created by Wholesaler
application:
private void autoBuy (javax.jms.Message message){ int count = 1000; try { StreamMessage strmMsg = (StreamMessage)message; ... // If price reduction is greater than 10 percent, buy if (newPrice == 0 || oldPrice / newPrice > 1.1){ ...javax.jms.Topic buytopic = (javax.jms.Topic)message.getJMSReplyTo( );
publisher = session.createPublisher(buytopic
); ... }
A temporary topic is a topic that is dynamically created by the JMS
provider, using the createTemporaryTopic(
)
method of the
TopicSession
object. A temporary topic is associated with the connection that
belongs to the TopicSession
that created it. It is
only active for the duration of the connection, and it is guaranteed
to be unique across all connections. Since it is temporary, it
can’t be durable: it lasts only as long as its associated
client connection is active. In all other respects it is just like a
“regular” topic.
Since a temporary topic is unique across all client
connections—-it is obtained dynamically through a method call
on a client’s session object—it is unavailable to other
JMS clients unless the topic identity is transferred using the
JMSReplyTo
header. While any client may publish
messages on another client’s temporary topic, only the sessions
that are associated with the JMS client connection that created the
temporary topic may subscribe to it. JMS clients can also, of course,
publish messages to their own temporary topics.
In the interest of exploring concepts like temporary topics we have
designed our B2B example so that the consumer responds directly to
the producer. In larger real-world applications, however, there may
be many publishers and subscribers exchanging messages across many
topics. A message may represent a workflow, which may take multiple
hops through various stages of a business process. In that type of
scenario the consumer of a message may never respond directly to the
producer that originated the message. It is more likely that the
response to the message will be forwarded to some other process.
Thus, the JMSReplyTo
header can be used as a place
to specify a forwarding address, rather than the destination address
of the original sender.
JMS provides a set of design patterns and helper classes for performing a direct request-reply conversation, which we will get into later in Section 4.6 of this chapter.
A durable subscription is one that outlasts a client’s connection with a message server. While a durable subscriber is disconnected from the JMS server, it is the responsibility of the server to store messages the subscriber misses. When the durable subscriber reconnects, the message server sends it all the unexpired messages that accumulated. This behavior is commonly referred to as store-and-forward messaging . Store-and-forward messaging is a key component of the guaranteed messaging solution. Durable subscriptions make a JMS consumer tolerant of disconnections, whether they are intentional or the result of a partial failure
We can demonstrate durable subscriptions with the B2B example. If you
still have the Retailer
application up and
running, try simulating an abnormal shutdown by typing Ctrl-C in the
command window. Leave the Wholesaler
running. In
the command window for the wholesaler application, type:
Surfboards, 500.00, 299.99 Hockey Sticks, 20.00, 9.99
Once the deals have been entered, restart the
Retailer
application:
java chap4.B2B.Retailer localhost username password
The first time you ran the Retailer
application, a
topic was registered as durable. When you abnormally terminated the
application, the subscription information was retained by the JMS
provider. When the Retailer
application comes back
up, the surfboards and hockey sticks messages are received,
processed, and responded to. Because the Retailer
had a durable subscription to the “Hot Deals” topic, the
JMS server saved the messages that arrived while the
Retailer
was down. The messages were then
delivered when the Retailer
resubscribed to the
topic.
Here’s how we set up the durable subscription. A
durable subscription is created by a
TopicSession
object, the same as with a nondurable
subscription. The Retailer
class obtains a durable
subscription in its
constructor:
public Retailer( String broker, String username, String password){
try {
...
hotDealsTopic = (Topic)jndi.lookup("Hot Deals");
javax.jms.TopicSubscriber subscriber =
session.createDurableSubscriber(hotDealsTopic,
"Hot Deals Subscription");
subscriber.setMessageListener(this);
connect.start( );
....
}
The createDurableSubscriber(
)
method takes two
parameters: a topic
name, and a subscription name. In our example we are using the
String
“Hot Deals Subscription” to
identify the subscription name. While topic names are specified as
being supported as JMS administered objects, subscription names are
not. While not required by JMS, it is good practice for a JMS
provider to provide an administration tool that monitors active
subscription names, as illustrated in Figure 4.2.
A durable subscription’s uniqueness is defined by the client ID and the subscription name. In the event that the client disconnects without unsubscribing, a JMS provider will store these messages until they can be delivered later. Upon reconnecting and resubscribing, the JMS provider will match up the messages based on these two identifiers, and deliver them to the subscriber.
You might think that the client ID and the topic would be enough for the provider to uniquely identify a durable subscription. However, a client may have multiple subscriptions on the same topic; for example, a client may want to use different message selectors to sort the incoming messages. (Message selectors are discussed in detail in Appendix D.) Therefore, durable subscriptions must be identified by their own name; simply using the topic name and the client ID will not suffice.
The JMS specification is intentionally vague about how the JMS
provider determines the uniqueness of a client ID. Various provider
implementations are allowed to have their own internal rules for what
constitutes a unique client. The setClientID( )
method on the connection object is provided in the API as a hint. The
client ID is set in the constructor of our
Retailer
example:
public Retailer( String broker, String username, String password){
try {
....
connect = factory.createTopicConnection (username, password);
connect.setClientID(username);
....
}
....
}
Both the Wholesaler
and
Retailer
classes publish messages using the
persistent delivery mode:
publisher.publish(
message,
javax.jms.DeliveryMode.PERSISTENT,
javax.jms.Message.DEFAULT_PRIORITY,
1800000);
Note the use of the overloaded publish(
)
method, with parameters that specify delivery mode, priority, and
message expiration. This method provides an alternative to using the
Message.setJMSDeliveryMode( )
and
TopicPublisher.setTimeToLive( )
operations, as
discussed in Chapter 3. In JMS, the delivery mode
(persistent, nonpersistent) is a Quality of Service (QoS) setting on
the message itself. Marking the message as persistent ensures that
the message will be saved to a reliable persistent store by the JMS
provider before the publish( )
method returns, and
allows client execution to continue. More on how and why this works
reliably can be found in Chapter 6.
When you are using a temporary topic as a way of posting a reply to a message, you should realize that the total round trip (the initial message and the reply) isn’t guaranteed to survive a certain failure condition, even if you use persistent messages. The problem is that temporary topics cannot be used for durable subscriptions. Consider the following scenario:
A JMS client (producer) creates a temporary topic, puts it in the
JMSReplyTo
header of a message, marks the message as persistent, and publishes it.The subscriber gets the message and publishes a response on the temporary topic using a persistent message.
The original producer expects a reply on the temporary topic, but disconnects or crashes before it is received.
The original producer restarts, and is no longer able to subscribe to the original temporary topic that it had established in its previous life. It can’t resubscribe because the temporary topic was only valid for the duration of the previous connection. Calling
createTemporaryTopic( )
in the new session returns a new temporary topic, not the previous one.
This is a subtle point, since any client with a nondurable
subscription will not get messages during a failure. In other
scenarios it may be acceptable to lose messages for a time, yet still
be able to start receiving newly published “responses”
when the original producer of the message starts up again. In the B2B
example, a failure of the Wholesaler
means that
the reply messages sent to the temporary topic will be lost. An
alternative and superior design would use the
JMSReplyTo
header, with an established topic
instead of a temporary one. Chapter 6 provides
more detail on message delivery semantics, Quality of Service, and
failure conditions.
In the B2B example, we are using the
JMSCorrelationID
as a way for the Retailer
to associate its
identity with its reply message, as
illustrated by the following code in Retailer.autoBuy(
)
:
private void autoBuy (javax.jms.Message message){
...
publisher = session.createPublisher(buytopic);
textMsg.setJMSCorrelationID("DurableRetailer");
publisher.publish(
textMsg,
javax.jms.DeliveryMode.PERSISTENT,
javax.jms.Message.DEFAULT_PRIORITY,
1800000);
...
}
In Wholesaler
, the
JMSCorrelationID
is extracted in the
onMessage( )
handler, and simply printed on the
command line:
public void onMessage( javax.jms.Message message){
...
System.out.println("Order received - "+text+
" from " + message.getJMSCorrelationID( )
);
...
}
Another way to associate the Retailer
’s
identity with the reply message would be to store something unique in
a message property, or in the message body itself.
A more common use of the JMSCorrelationID
is not
for the sake of establishing identity; it is for correlating the
asynchronous reception of a message with a message that had been
previously sent. A message consumer wishing to create a message to be
used as a response may place the JMSMessageID
of
the original message in the JMSCorrelationID
of
the response message.
JMS
provides design patterns and helper classes to make it easier to
write applications that need a direct request-reply between two end
points. We have already shown two JMS features that can be used as
part of a request-reply solution: temporary topics and the
JMSReplyTo
header. These features can be used
independently or in combination to create an asynchronous
request-reply conversation. On occasion you may want to create a
synchronous request-reply conversation.
There are two ways of doing this. You may call the
TopicSubscriber.receive( )
method directly, or you
may make use of the TopicRequestor
class.
The receive( )
method is defined in the MessageConsumer
class,
which is the superclass of TopicSubscriber
. The
receive( )
method is a way of proactively asking
for the message rather than passively receiving it through the
onMessage( )
callback. In fact, the use of the
receive( )
method negates the use of the
onMessage( )
callback. The default behavior of the
receive( )
method is to block program execution
until a message is retrieved from the message server. The
receive( )
method effectively changes the
pub/sub model from
a “push” to a “pull” model. From the
client’s perspective, you can think of this as a polling
mechanism; although that’s not necessarily how it is
implemented by the JMS provider.
There are three flavors of the receive( )
method:
package javax.jms; public interface MessageConsumer{ ... Message receive( ); Message receive(long timeout); Message receiveNoWait( ); ... }
The receive( )
method with no parameters blocks
indefinitely, until a message is received. The
receive(long
timeout)
method
blocks until a message is received, or until the timeout period
expires, whichever comes first. The receive( )
method will return null
if the session is closed
while the method is blocking. The receiveNoWait( )
method does not block at all. It either returns a message if one is
available, or it returns null
, if there is nothing
currently pending to be delivered. Here is a slightly modified
version of Wholesaler.publishPriceQuotes()
that
makes use of the receive( )
method:
private void publishPriceQuotes(String dealDesc, String username,
String itemDesc, float oldPrice,
float newPrice){
...
System.out.println("\nInitiating Synchronous Request");
// Publish the message persistently
publisher.publish(
msg, //message
javax.jms.DeliveryMode.PERSISTENT, //publish persistently
javax.jms.Message.DEFAULT_PRIORITY,//priority
MESSAGE_LIFESPAN); //Time to Live
javax.jms.Message aMessage = subscriber.receive( );
System.out.println("\nRequest Sent, Reply Received!");
if (aMessage != null)
{
onMessage(aMessage);
}
...
}
In this example the subscriber, which subscribes to the “Buy
Order” temporary topic, has its receive( )
method called. The receive( )
method blocks until
a message is published by the Retailer
to the
“Buy Order” topic. The Wholesaler
client becomes a synchronous client waiting for the
Retailer
to respond. When the receive(
)
method returns with a message, the
Wholesaler
simply calls onMessage(
)
directly to process the message.
Due to
threading restrictions imposed on a
JMS session object, it is impractical to have both synchronous and
asynchronous operations on a session. Hence the
Wholesaler
’s constructor does not make a
call to setMessageListener(this)
. The
onMessage( )
handler will never get called
automatically.
The recipient side of the conversation still looks the same as in our
previous example. The Retailer.autoBuy( )
method
receives the message, gets the return address from the
JMSReplyTo
header, and publishes a response using
that topic.
It is erroneous for a session to be operated by more than one thread
of control at any given time. In our example, there appears to be
only one thread of control: the main thread of the application.
However, when the onMessage( )
handler is invoked,
it is being called by another thread that is owned by the JMS
provider. Due to the asynchronous nature of the onMessage(
)
callback, it could possibly be invoked while the main
thread is blocking on a synchronous receive(
)
.
The
TopicRequestor
class is distributed in source code form as a part of the JMS 1.0.2
distribution package. The class is very simple. Its constructor takes
two parameters: a session and a topic. The constructor creates a
temporary topic to be used for the duration of the session. Its most
important method is request(
)
,
which looks like this:
public Message request(Message message) throws JMSException {
message.setJMSReplyTo(tempTopic);
publisher.publish(message);
return(subscriber.receive( )
);
}
The use of the TopicRequestor
is similar to our
receive( )
example, except that the calls to
publish( )
and receive( )
are
replaced with one call to request( )
. Here is a
modified excerpt from Wholesaler.publishPriceQuotes(
)
illustrating how to use a
TopicRequestor
:
private void publishPriceQuotes(String dealDesc, String username,
String itemesc, float oldPrice,
float newPrice){
...
System.out.println("\nInitiating Synchronous Request");
javax.jms.TopicRequestor requestor =
new javax.jms.TopicRequestor(session, pricetopic);
javax.jms.Message aMessage = requestor.request(msg)
;
System.out.println("\nRequest Sent, Reply Received!");
if (aMessage != null)
{
onMessage(aMessage);
}
...
}
As in our previous receive( )
example, the
recipient side of the conversation remains unchanged.
Retailer.autoBuy( )
receives the message, gets the
return address from the JMSReplyTo
header, and
publishes a response using that topic.
As you can see, the TopicRequestor
object is a
higher-level abstraction built on top of the
TopicSubscriber.receive( )
mechanism. It is very
handy if you are willing to live with its limitations. Here are some
reasons why you may want to call receive( )
yourself instead of using the TopicRequestor
:
You may want to set time-to-live or persistent properties on the message.
You may not want to use a temporary topic.
TopicRequestor
creates its own temporary topic as its way of getting a response back.You want to use the alternate
receive(long
timeout)
orreceiveNoWait( )
options.You may want to publish on a topic, and receive responses on a p2p queue.
You may want to receive more than one message in response to a request.
TopicRequestor.close( )
will arbitrarily close the session. It may not be the behavior you are looking for.You may want to receive the responses using a transaction. (More on JMS transactions can be found in Chapter 6.)
Upon closing the session, the JMS provider should automatically take
care of unsubscribing any nondurable
subscriptions that were created by the session. But there may be
cases where you want to explicitly unsubscribe a durable subscriber
in a client application. Here is how that is accomplished in
Retailer.exit( )
:
private void exit(String s){
try {
if ( s != null &&
s.equalsIgnoreCase("unsubscribe"))
{
subscriber.close( );
session.unsubscribe("Hot Deals Subscription");
}
connect.close( );
} catch (javax.jms.JMSException jmse){
jmse.printStackTrace( );
}
System.exit(0);
}
For nondurable subscriptions, calling the close( )
method on the TopicSubscriber
class is sufficient.
For durable subscriptions, there is a unsubscribe(String
name)
method on the TopicSession
object,
which takes the subscription name as its parameter. This informs the
JMS provider that it should no longer store messages on behalf of
this client. It is an error to call the unsubscribe(
)
method without first closing the subscription. Hence both
methods need to be called for durable subscriptions.
[1]
WHOLESALER
and RETAILER
are usernames you have set up when configuring your JMS server.
passwd1
and passwd2
are the
passwords you’ve assigned to those usernames. If you are using
an evaluation version of a JMS provider, it may not be necessary to
set up usernames and passwords; check your vendor’s
documentation for more information.
Get Java Message Service now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.