By Richard Monson-Haefel, David A. Chappell
There were a number of MOM vendors that participated in the creation of JMS. It was an industry effort rather than a Sun effort. Sun was the spec lead and did shepherd the work but it would not have been successful without the direct involvement of the messaging vendors. Although our original objective was to provide a Java API for connectivity to MOM systems, this changed over the course of the work to a broader objective of supporting messaging as a first class Java distributed computing paradigm on equal footing with RPC.—Mark Hapner, JMS spec lead, Sun Microsystems
Chat program to join a specific chat room (topic), and deliver and receive
messages to and from that room:
package chap2.chat;

import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.io.InputStreamReader;
import java.util.Properties;

public class Chat implements javax.jms.MessageListener{
    private TopicSession pubSession;
    private TopicSession subSession;
    private TopicPublisher publisher;
    private TopicConnection connection;
    private String username;

    /* Constructor. Establish JMS publisher and subscriber */
    public Chat(String topicName, String username, String password)
    throws Exception {
        // Obtain a JNDI connection
        Properties env = new Properties( );
        // ... specify the JNDI properties specific to the vendor

        InitialContext jndi = new InitialContext(env);

        // Look up a JMS connection factory
        TopicConnectionFactory conFactory =
        (TopicConnectionFactory)jndi.lookup("TopicConnectionFactory");

        // Create a JMS connection
        TopicConnection connection =
        conFactory.createTopicConnection(username,password);

        // Create two JMS session objects
        TopicSession pubSession =
        connection.createTopicSession(false,
                                      Session.AUTO_ACKNOWLEDGE);
        TopicSession subSession =
        connection.createTopicSession(false,
                                      Session.AUTO_ACKNOWLEDGE);

        // Look up a JMS topic
        Topic chatTopic = (Topic)jndi.lookup(topicName);

        // Create a JMS publisher and subscriber
        TopicPublisher publisher =
            pubSession.createPublisher(chatTopic);
        TopicSubscriber subscriber =
            subSession.createSubscriber(chatTopic);

        // Set a JMS message listener
        subscriber.setMessageListener(this);

        // Initialize the Chat application
        set(connection, pubSession, subSession, publisher, username);

        // Start the JMS connection; allows messages to be delivered
        connection.start( );
    }

    /* Initialize the instance variables */
    public void set(TopicConnection con, TopicSession pubSess,
                    TopicSession subSess, TopicPublisher pub,
                    String username) {
        this.connection = con;
        this.pubSession = pubSess;
        this.subSession = subSess;
        this.publisher = pub;
        this.username = username;
    }

    /* Receive message from topic subscriber */
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            String text = textMessage.getText( );
            System.out.println(text);
        } catch (JMSException jmse){ jmse.printStackTrace( ); }
    }

    /* Create and send message using topic publisher */
    protected void writeMessage(String text) throws JMSException {
        TextMessage message = pubSession.createTextMessage( );
        message.setText(username+" : "+text);
        publisher.publish(message);
    }

    /* Close the JMS connection */
    public void close( ) throws JMSException {
        connection.close( );
    }

    /* Run the Chat client */
    public static void main(String [] args){
        try{
            // Stop here if an argument is missing instead of failing later
            if (args.length!=3) {
                System.out.println("Topic, username, or password missing");
                System.exit(1);
            }

            // args[0]=topicName; args[1]=username; args[2]=password
            Chat chat = new Chat(args[0],args[1],args[2]);

            // Read from command line
            BufferedReader commandLine = new
              java.io.BufferedReader(new InputStreamReader(System.in));

            // Loop until the word "exit" is typed
            while(true){
                String s = commandLine.readLine( );
                // readLine( ) returns null at end of input
                if (s == null || s.equalsIgnoreCase("exit")){
                    chat.close( ); // close down connection
                    System.exit(0);// exit program
                } else
                    chat.writeMessage(s);
            }
        } catch (Exception e){ e.printStackTrace( ); }
    }
}
The Message is the most important part of the
entire JMS specification. All data and events in a JMS application
are communicated with messages, while the rest of JMS exists to
facilitate the transfer of messages. They are the lifeblood of the
system.

A Message object has two parts: the message data
itself, called the payload or message body, and the message headers
and properties (see Figure 3.1).
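
The two-part structure described above can be pictured with a small stand-in class. This is only a sketch built on plain JDK types: SimpleMessage and its methods are hypothetical names, not part of javax.jms, and a real JMS provider supplies its own Message implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a JMS message; NOT the javax.jms.Message API.
class SimpleMessage {
    // Part 1: headers and properties (metadata about the message)
    private final Map<String, Object> headers = new LinkedHashMap<>();
    private final Map<String, Object> properties = new LinkedHashMap<>();
    // Part 2: the payload, also called the message body
    private final String body;

    SimpleMessage(String body) {
        this.body = body;
    }

    void setHeader(String name, Object value) { headers.put(name, value); }
    Object getHeader(String name) { return headers.get(name); }

    void setProperty(String name, Object value) { properties.put(name, value); }
    Object getProperty(String name) { return properties.get(name); }

    String getBody() { return body; }
}

class SimpleMessageDemo {
    public static void main(String[] args) {
        SimpleMessage msg = new SimpleMessage("Hello, chat room");
        msg.setHeader("JMSPriority", 4);         // header: delivery metadata
        msg.setProperty("username", "William");  // property: application-defined
        System.out.println(msg.getBody() + " from " + msg.getProperty("username"));
    }
}
```

Keeping headers and properties in separate maps mirrors the distinction drawn in the text: headers describe delivery, while properties carry whatever extra metadata the application chooses to attach.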
setJMS<HEADER>( ), getJMS<HEADER>( ). Here is a partial definition of the Message interface that shows all the JMS header methods:

public interface Message {
public Destination getJMSDestination( ) throws JMSException;
public void setJMSDestination(Destination destination)
throws JMSException;
public int getJMSDeliveryMode( ) throws JMSException;
public void setJMSDeliveryMode(int deliveryMode)
throws JMSException;
public String getJMSMessageID( ) throws JMSException;
public void setJMSMessageID(String id) throws JMSException;
public long getJMSTimestamp( ) throws JMSException;
public void setJMSTimestamp(long timestamp) throws JMSException;
public long getJMSExpiration( ) throws JMSException;
public void setJMSExpiration(long expiration) throws JMSException;
public boolean getJMSRedelivered( ) throws JMSException;
public void setJMSRedelivered(boolean redelivered)
throws JMSException;
public int getJMSPriority( ) throws JMSException;
public void setJMSPriority(int priority) throws JMSException;
public Destination getJMSReplyTo( ) throws JMSException;
public void setJMSReplyTo(Destination replyTo) throws JMSException;
public String getJMSCorrelationID( ) throws JMSException;
public void setJMSCorrelationID(String correlationID)
throws JMSException;
public byte[] getJMSCorrelationIDAsBytes( ) throws JMSException;
public void setJMSCorrelationIDAsBytes(byte[] correlationID)
throws JMSException;
public String getJMSType( ) throws JMSException;
public void setJMSType(String type) throws JMSException;
}
The Message interface provides several accessor and mutator methods for reading and writing properties. The value of a property can be a String, boolean, byte, short, int, long, float, or double.

Message objects by the application developer; the JMS extension and provider-specific properties are additional headers that are, for the most part, automatically added by the JMS provider.

TextMessage message = pubSession.createTextMessage( );
message.setText(text);
message.setStringProperty("username",username);
publisher.publish(message);
username is not meaningful outside the Chat application; it is used exclusively by the application to filter messages based on the identity of the publisher.

boolean, byte, short, int, long, float, double, or String. The javax.jms.Message interface provides accessor and mutator methods for each of these property value types. Here is a subset of the Message interface definition that shows these methods:

package javax.jms;
public interface Message {
public String getStringProperty(String name)
throws JMSException, MessageFormatException;
...
}

protected void writeMessage(String text) throws JMSException{
TextMessage message = session.createTextMessage( );
message.setText(text);
message.setStringProperty("username",username);
publisher.publish(message);
}
TopicSubscriber subscriber =
session.createSubscriber(chatTopic, " username != 'William' ",false);
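
The selector string passed to createSubscriber( ) above is evaluated by the JMS provider against each message's properties before delivery. The following is a rough JDK-only model of that filtering for the single expression username != 'William'. The class name SelectorSketch, the map-based message representation, and the handling of a missing property are illustrative assumptions; real selectors use a SQL-92-based syntax that the provider parses itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Simplified model of selector-based filtering; NOT the javax.jms API.
class SelectorSketch {
    // Models the selector " username != 'William' ".
    // A message lacking the property is not selected, mirroring the
    // SQL-style rule that a comparison with a missing value is unknown.
    static final Predicate<Map<String, String>> NOT_WILLIAM =
        props -> props.containsKey("username")
              && !"William".equals(props.get("username"));

    // Returns only the messages whose properties satisfy the selector.
    static List<Map<String, String>> deliver(
            List<Map<String, String>> published,
            Predicate<Map<String, String>> selector) {
        List<Map<String, String>> delivered = new ArrayList<>();
        for (Map<String, String> props : published) {
            if (selector.test(props)) {
                delivered.add(props);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Map<String, String>> published = List.of(
            Map.of("username", "William"),
            Map.of("username", "Anna"));
        // Only Anna's message passes the filter.
        System.out.println(deliver(published, NOT_WILLIAM));
    }
}
```

In a real application the filtering happens inside the provider, so messages that fail the selector never reach the subscriber's onMessage( ) callback.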
username property
equal to 'William'.

Message
interface types that must be supported by
JMS providers. Although JMS defines the Message
interfaces, it doesn't define their implementation. This allows
vendors to implement and transport messages in their own way, while
maintaining a consistent and standard interface for the JMS
application developer. The six message interfaces are
Message and its five sub-interfaces:
TextMessage, StreamMessage,
MapMessage, ObjectMessage, and
BytesMessage.

Message interfaces are defined according to
the kind of payload they are designed to carry. In some cases,
Message types were included in JMS to support
legacy payloads that are common and useful, which is the case with
the text, bytes, and
stream message types. In other cases, the
Message types were defined to facilitate emerging
needs; for example, ObjectMessage can transport
serializable Java objects. Some vendors may provide other proprietary
message types. Progress' SonicMQ and SoftWired's iBus,
for example, provide an XMLMessage type that
extends the TextMessage, allowing developers to
deal with the message directly through DOM or SAX interfaces. The
XMLMessage type may become a standard message type
in a future version of the specification. At the time of this
writing, Sun Microsystems was starting discussions about adding an
XMLMessage type.

javax.jms.Message, which serves as the base interface to the other message types. As shown below, the Message type can be created and used as a JMS message with no payload:

// Create and deliver a Message
Message message = session.createMessage( );
publisher.publish(message);
...
// Receive a message on the consumer
public void onMessage(Message message){
// No payload, process event notification
}
Wholesaler and Retailer. In the interest of keeping the code simple, we won't implement a fancy user interface; our application has a rudimentary command-line user interface.

Chat application, the Wholesaler class includes a main( ) method so it can be run as a standalone Java application. It's executed from the command line as follows:

java chap4.B2B.Wholesaler localhost username password

The Retailer class can be executed in the same manner:

java chap4.B2B.Retailer localhost username password
Wholesaler client and a Retailer client in separate command windows. In the Wholesaler client you are prompted to enter an item description, an old price, and a new price. Enter the following as shown:

Wholesaler class creates a temporary topic. This topic is used as a JMSReplyTo destination for messages published to the "Hot Deals" topic in the publishPriceQuotes( ) method:

public Wholesaler(String broker, String username, String password){
try {
...
session =
connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
...
buyOrdersTopic = session.createTemporaryTopic( );
...
}
...
private void publishPriceQuotes(String dealDesc, String username,
String itemDesc, float oldPrice,
float newPrice){
try {
javax.jms.StreamMessage message = session.createStreamMessage( );
...
message.setJMSReplyTo(buyOrdersTopic);
publisher.publish(
message,
javax.jms.DeliveryMode.PERSISTENT,
javax.jms.Message.DEFAULT_PRIORITY,
600000);
...
}
Retailer client decides to respond to a "Hot Deals" message with a buy order, it uses the JMSReplyTo destination, which is the temporary topic created by the Wholesaler application:

private void autoBuy (javax.jms.Message message){
int count = 1000;
try {
StreamMessage strmMsg = (StreamMessage)message;
...
// If price reduction is greater than 10 percent, buy
if (newPrice == 0 || oldPrice / newPrice > 1.1){
...
javax.jms.Topic buytopic =
(javax.jms.Topic)message.getJMSReplyTo( );
publisher = session.createPublisher(

Retailer application up and running, try simulating an abnormal shutdown by typing Ctrl-C in the command window. Leave the Wholesaler running. In the command window for the wholesaler application, type:

Surfboards, 500.00, 299.99
Hockey Sticks, 20.00, 9.99

Retailer application:

java chap4.B2B.Retailer localhost username password
Retailer application, a
topic was registered as durable. When you abnormally terminated the
application, the subscription information was retained by the JMS
provider. When the Retailer application comes back
up, the surfboards and hockey sticks messages are received,
processed, and responded to. Because the Retailer
had a durable subscription to the "Hot Deals" topic, the
JMS server saved the messages that arrived while the
Retailer was down. The messages were then
delivered when the Retailer resubscribed to the
topic.

TopicSession object, the same as with a nondurable
subscription. The Retailer class obtains a durable
subscription in its constructor:

public Retailer( String broker, String username, String password){
try {
...
hotDealsTopic = (Topic)jndi.lookup("Hot Deals");

Wholesaler and Retailer classes publish messages using the persistent delivery mode:

publisher.publish(
message,
javax.jms.DeliveryMode.PERSISTENT,
javax.jms.Message.DEFAULT_PRIORITY,
1800000);
publish( ) method, with parameters that specify delivery mode, priority, and
message expiration. This method provides an alternative to using the
TopicPublisher.setDeliveryMode( ) and
TopicPublisher.setTimeToLive( ) operations, as
discussed in Chapter 3. In JMS, the delivery mode
(persistent, nonpersistent) is a Quality of Service (QoS) setting on
the message itself. Marking the message as persistent ensures that
the message will be saved to a reliable persistent store by the JMS
provider before the publish( ) method returns, and
allows client execution to continue. More on how and why this works
reliably can be found in Chapter 6.

JMSReplyTo header of a message, marks the message as persistent, and publishes it.

JMSCorrelationID as a way for the Retailer to associate its identity with its reply message, as illustrated by the following code in Retailer.autoBuy( ):

private void autoBuy (javax.jms.Message message){
...
publisher = session.createPublisher(buytopic);
textMsg.setJMSCorrelationID("DurableRetailer");
publisher.publish(
textMsg,
javax.jms.DeliveryMode.PERSISTENT,
javax.jms.Message.DEFAULT_PRIORITY,
1800000);
...
}
Wholesaler, the JMSCorrelationID is extracted in the onMessage( ) handler, and simply printed on the command line:

public void onMessage( javax.jms.Message message){
...
System.out.println("Order received - "+text+
" from " + message.getJMSCorrelationID( ));
...
}
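
Beyond printing it, a correlation ID is typically used to match an asynchronous reply to the request that triggered it. The sketch below models that bookkeeping with plain JDK collections. PendingRequests and its method names are hypothetical, and the string IDs merely stand in for the values a provider would assign as JMSMessageID.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping for request-reply correlation; NOT a JMS class.
class PendingRequests {
    // Maps the ID of each outstanding request to a description of it.
    private final Map<String, String> outstanding = new HashMap<>();

    // Record a request under its message ID when it is sent.
    void sent(String messageId, String description) {
        outstanding.put(messageId, description);
    }

    // A reply carries the original message ID as its correlation ID.
    // Returns the matching request description, or null if none is pending.
    String replyReceived(String correlationId) {
        return outstanding.remove(correlationId);
    }

    int pendingCount() { return outstanding.size(); }

    public static void main(String[] args) {
        PendingRequests pending = new PendingRequests();
        pending.sent("ID:1", "Surfboards quote");
        pending.sent("ID:2", "Hockey Sticks quote");
        // Replies may arrive in any order; the correlation ID identifies each.
        System.out.println(pending.replyReceived("ID:2"));
        System.out.println(pending.pendingCount());
    }
}
```

Removing the entry when the reply arrives keeps the map limited to requests that are still unanswered, which also makes a duplicate or unexpected reply easy to detect (the lookup returns null).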
Retailer's
identity with the reply message would be to store something unique in
a message property, or in the message body itself.

JMSCorrelationID is not
for the sake of establishing identity; it is for correlating the
asynchronous reception of a message with a message that had been
previously sent. A message consumer wishing to create a message to be
used as a response may place the JMSMessageID of
the original message in the JMSCorrelationID of
the response message.

JMSReplyTo header. These features can be used
independently or in combination to create an asynchronous
request-reply conversation. On occasion you may want to create a
synchronous request-reply conversation.
There are two ways of doing this. You may call the
TopicSubscriber.receive( ) method directly, or you
may make use of the TopicRequestor class.

receive( )
method is defined in the MessageConsumer interface,
which is the superinterface of TopicSubscriber. The
receive( ) method is a way of proactively asking
for the message rather than passively receiving it through the
onMessage( ) callback. In fact, the use of the
receive( ) method negates the use of the
onMessage( ) callback. The default behavior of the
receive( ) method is to block program execution
until a message is retrieved from the message server. The
receive( ) method effectively changes the
pub/sub model from
a "push" to a "pull" model. From the
client's perspective, you can think of this as a polling
mechanism, although that's not necessarily how it is
implemented by the JMS provider.

receive( ) method:

package javax.jms;
public interface MessageConsumer{
...
Message receive( ) throws JMSException;
Message receive(long timeout) throws JMSException;
Message receiveNoWait( ) throws JMSException;
...
}
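
The blocking behavior of these three methods resembles that of a java.util.concurrent.BlockingQueue. The sketch below is an analogy built on the JDK only, not the JMS API: PullConsumer is a hypothetical name, String stands in for Message, and the deliver( ) method plays the role of the provider.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// JDK-only analogy for the three receive variants; NOT javax.jms.MessageConsumer.
class PullConsumer {
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();

    // Called by the "provider" side to make a message available.
    void deliver(String message) { inbox.add(message); }

    // Like receive( ): blocks until a message is available.
    String receive() throws InterruptedException {
        return inbox.take();
    }

    // Like receive(long timeout): waits at most timeoutMillis, then returns null.
    String receive(long timeoutMillis) throws InterruptedException {
        return inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Like receiveNoWait( ): returns immediately, null if nothing is waiting.
    String receiveNoWait() {
        return inbox.poll();
    }

    public static void main(String[] args) throws InterruptedException {
        PullConsumer consumer = new PullConsumer();
        System.out.println(consumer.receiveNoWait());   // nothing delivered yet
        consumer.deliver("Surfboards, 500.00, 299.99");
        System.out.println(consumer.receive());         // returns at once
        System.out.println(consumer.receive(50));       // times out
    }
}
```

One difference worth noting: in JMS a timeout of zero means the call never expires, whereas this sketch would return immediately for a zero timeout.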
The receive( ) method with no parameters blocks
indefinitely, until a message is received. The
receive(long timeout) method
blocks until a message is received, or until the timeout period
expires, whichever comes first. The receive( )
method will return null if the session is closed
while the method is blocking. The receiveNoWait( )
method does not block at all. It either returns a message if one is
available, or it returns

Retailer.exit( ):

private void exit(String s){
try {
if ( s != null &&
s.equalsIgnoreCase("unsubscribe"))
{
subscriber.close( );
session.unsubscribe("Hot Deals Subscription");
}
connect.close( );
} catch (javax.jms.JMSException jmse){
jmse.printStackTrace( );
}
System.exit(0);
}
close( ) method on the TopicSubscriber class is sufficient.
For durable subscriptions, there is an unsubscribe(String name)
method on the TopicSession object,
which takes the subscription name as its parameter. This informs the
JMS provider that it should no longer store messages on behalf of
this client. It is an error to call the unsubscribe( )
method without first closing the subscription. Hence both
methods need to be called for durable subscriptions.
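
The retain-while-offline behavior of a durable subscription and the close-before-unsubscribe rule can be modeled in a few lines. This is a toy in-memory sketch, not a JMS provider: DurableTopicSketch and its methods are hypothetical, and the IllegalStateException merely stands in for whatever error a real provider reports.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Toy model of durable subscriptions on one topic; NOT a JMS provider.
class DurableTopicSketch {
    // Messages retained per subscription name.
    private final Map<String, Queue<String>> retained = new HashMap<>();
    // Names of subscriptions that currently have an open subscriber.
    private final Set<String> active = new HashSet<>();

    // Open (or reopen) a durable subscription under the given name.
    void subscribe(String name) {
        retained.computeIfAbsent(name, n -> new ArrayDeque<>());
        active.add(name);
    }

    // Close the subscriber; the subscription itself keeps collecting messages.
    void close(String name) { active.remove(name); }

    // Remove the subscription; only legal once its subscriber is closed.
    void unsubscribe(String name) {
        if (active.contains(name)) {
            throw new IllegalStateException("close the subscriber first");
        }
        retained.remove(name);
    }

    // Every known subscription receives a copy, whether active or not.
    void publish(String message) {
        for (Queue<String> queue : retained.values()) {
            queue.add(message);
        }
    }

    // Drain the messages waiting for a subscription.
    List<String> drain(String name) {
        List<String> out = new ArrayList<>();
        Queue<String> queue = retained.get(name);
        while (queue != null && !queue.isEmpty()) {
            out.add(queue.remove());
        }
        return out;
    }

    public static void main(String[] args) {
        DurableTopicSketch topic = new DurableTopicSketch();
        topic.subscribe("Hot Deals Subscription");
        topic.close("Hot Deals Subscription");          // retailer goes down
        topic.publish("Surfboards, 500.00, 299.99");    // sent while it is away
        topic.subscribe("Hot Deals Subscription");      // retailer comes back
        System.out.println(topic.drain("Hot Deals Subscription"));
    }
}
```

The model separates the subscriber (which can be closed and reopened) from the subscription (which persists until unsubscribe is called), which is the distinction the two-step shutdown in Retailer.exit( ) relies on.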
Wholesaler and Retailer
classes, called QWholesaler and
QRetailer. QWholesaler still
uses pub/sub to broadcast price quotes, while
QRetailer uses a p2p queue to respond with
"buy" orders instead of publishing to a temporary topic.

QueueBrowser interface, and load balancing among multiple recipients of a queue.

Queue identifier obtained from a JNDI namespace, the same way that a pub/sub client uses a Topic identifier.

onMessage( ) callback, or a synchronous receive( ) method. Both of these methods are explained in more detail later.

QWholesaler and QRetailer examples that we'll develop now are functionally equivalent to the Wholesaler and Retailer examples introduced in Chapter 4. The difference lies in the use of the point-to-point queue for responses to price quotes. If you wish to see these classes in action, start your JMS provider and execute the following commands, each in a separate command window:

java chap5.B2B.QWholesaler localhost username password
java chap5.B2B.QRetailer localhost username password
QRetailer class in its entirety. Later, we will examine this class in detail:

import java.util.StringTokenizer;
import java.util.Properties;
import javax.naming.InitialContext;
import javax.jms.TopicConnectionFactory;
import javax.jms.QueueConnectionFactory;
import javax.jms.Topic;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
public class QRetailer implements javax.jms.MessageListener {
private javax.jms.QueueConnection qConnect = null;
private javax.jms.QueueSession qSession = null;
private javax.jms.QueueSender qSender = null;
private javax.jms.TopicConnect