O'Reilly logo

Java Message Service, 2nd Edition by Richard Monson-Haefel, Mark Richards, David A Chappell

Stay ahead with the world's most comprehensive technology and business learning platform.

With Safari, you learn the way you learn best. Get unlimited access to videos, live online training, learning paths, books, tutorials, and more.

Start Free Trial

No credit card required

Chapter 4. Point-to-Point Messaging

This chapter focuses on the point-to-point (p2p) messaging model. The point-to-point model is used when you need to send a message to only one message consumer. Even though multiple consumers may be listening on the queue for the same message, only one of those consumer threads will receive the message. This is different from the publish-and-subscribe model described in Chapter 5, where a message is broadcast to (and consumed by) multiple consumers.

In this chapter, we will describe the point-to-point model through the use of a typical messaging scenario involving a borrower and a mortgage lender. In our example, the QBorrower class will submit a loan application via JMS messaging to a QLender class. The QLender class will receive the loan request through a message queue, determine whether to accept or decline the loan based on certain business rules, and send the result (accept or decline) back to the QBorrower class through another message queue. However, before launching into the messaging example, we will first describe the main characteristics and use cases of the p2p messaging model.

Point-to-Point Overview

In the p2p model, the producer is called a sender and the consumer is called a receiver. The most important characteristics of the point-to-point model are as follows:

  • Messages are exchanged through a virtual channel called a queue. A queue is a destination to which producers send messages and a source from which receivers consume messages.

  • Each message is delivered to only one receiver. Multiple receivers may listen on a queue, but each message in the queue may only be consumed by one of the queue’s receivers.

  • Messages are ordered. A queue delivers messages to consumers in the order they were placed in the queue by the message server. As messages are consumed, they are removed from the head of the queue (unless message priority is used).

  • There is no coupling of the producers to the consumers. Receivers and senders can be added dynamically at runtime, allowing the system to grow or shrink in complexity over time. (This is a characteristic of messaging systems in general.)

Point-to-point messaging is based on the concept of sending a message to a named destination. The actual network location of the destination is transparent to the sender, because the p2p client works with a Queue identifier obtained from a JNDI namespace.

As you will see in the next chapter, the pub/sub model is based on a push model, which means that consumers are delivered messages without having to request them. With the p2p messaging model, the p2p receiver can either push or pull messages, depending on whether it uses the asynchronous onMessage() callback or a synchronous receive() method. Both of these methods are explained in more detail later in this chapter.

In the p2p model there is no direct coupling of the producers to the consumers. The destination queue provides a virtual channel that decouples consumers from producers. In the pub/sub model, multiple consumers that subscribe to the same topic each receive their own copy of every message addressed to that topic. In the p2p model, multiple consumers can use the same queue, but each message delivered to the queue can only be received by one of the queue’s consumers. How messages sent to a queue are distributed to the queue’s consumers depends on the policies of the JMS provider. Some JMS providers use load-balancing techniques to distribute messages evenly among consumers, while others will use more arbitrary policies.

Messages intended for a p2p queue can be either persistent or nonpersistent. Persistent messages survive JMS provider failures, while nonpersistent messages do not. Messages may also have a priority and an expiration time. One important difference between point-to-point and publish/subscribe messaging is that p2p messages are always delivered, regardless of the current connection status of the receiver. Once a message is delivered to a queue, it stays there even if no consumer is currently connected.

There are two types of point-to-point messaging: asynchronous fire-and-forget processing and asynchronous request/reply processing. With fire-and-forget processing, the message producer sends a message to a queue and does not expect to receive a response (at least not right away). This type of processing can be used to trigger an event or make a request to a receiver to execute a particular action that does not require a response (or in some cases, an immediate response). For instance, you may want to use asynchronous fire-and-forget processing to send a message to a logging system, make a request to kick off a report, or trigger an event on another process. Asynchronous fire-and-forget processing is illustrated in Figure 4-1.

p2p async fire-and-forget
Figure 4-1. p2p async fire-and-forget

With asynchronous request/reply processing, the message producer sends a message on one queue and then does a blocking wait on a reply queue waiting for the response from the receiver. The request/reply processing provides for a high degree of decoupling between the producer and consumer, allowing the message producer and consumer components to be heterogeneous languages or platforms. Asynchronous request/reply processing is illustrated in Figure 4-2.

p2p async request/reply
Figure 4-2. p2p async request/reply

The specific p2p interfaces for connecting, creating, sending, and receiving are shown in Table 4-1.

Table 4-1. Interfaces for queues

General API

Point-to-point API

ConnectionFactory

QueueConnectionFactory

Destination

Queue

Connection

QueueConnection

Session

QueueSession

MessageConsumer

QueueSender

MessageProducer

QueueReceiver

When to Use Point-to-Point Messaging

The rationale behind the two models (point-to-point and publish-and-subscribe) lies in the origin of the JMS specification. JMS started out as a way of providing a common API for accessing existing messaging systems. At the time of its conception, some messaging vendors had a p2p model, and some had a pub/sub model. Hence JMS needed to provide an API for both models to gain wide industry support.

In most cases, the decision about which model to use depends on the distinct characteristics of each model. With pub/sub, any number of subscribers can be listening on a topic, all receiving copies of the same message. The publisher generally does not care how many subscribers there are or how many of them are actively listening on the topic. For example, consider a publisher that broadcasts stock quotes. If any particular subscriber is not currently connected and misses out on a great quote, the publisher is not concerned. In contrast, with point-to-point messaging, a particular message is likely to be intended for a one-on-one conversation with a specific application at the other end. In this scenario, every message matters.

Point-to-point is used when you want one receiver to process any given message once and only once. This is perhaps the most critical difference between the two models: point-to-point guarantees that only one consumer will process a given message. This is extremely important when messages need to be processed separately but in tandem, balancing the load of message processing across many JMS clients. Another advantage is that the point-to-point model provides a QueueBrowser that allows the JMS client to take a snapshot of the queue to see messages waiting to be consumed. Pub/sub does not include a browsing feature. We’ll talk more about the QueueBrowser later in this chapter.

Another use case for point-to-point messaging is when you need synchronous communication between components, but those components are written in different programming languages or implemented in different technology platforms (e.g., J2EE and .NET). For example, you may have a stock trading client written as a Java Swing client that needs to communicate with a .NET/C# trading server to process the stock trade. In this scenario, point-to-point messaging can be used to provide the interoperability between these heterogeneous platforms.

As you will see later in this chapter, another good reason to use point-to-point messaging is to provide a higher degree of throughput to server-side components through the use of message-based load balancing, particularly for homogeneous components (i.e., Java to Java). Introducing p2p messaging allows you to add a degree of concurrent processing to your architecture without having to deal with threads or Java concurrency issues.

The QBorrower and QLender Application

To illustrate how point-to-point messaging works, we will use a simple decoupled request/reply example where a QBorrower class makes a simple mortgage loan request to a QLender class using point-to-point messaging. The QBorrower class sends the loan request to the QLender class using a LoanRequest queue, and based on certain business rules, the QLender class sends a response back to the QBorrower class using a LoanResponseQ queue indicating whether the loan request was approved or denied. Since the QBorrower is interested in finding out right away whether the loan was approved or not, once the loan request is sent, the QBorrower class will block and wait for a response from the QLender class before proceeding. This simple example models a typical messaging request/reply scenario.

Configuring and Running the Application

Before looking at the code, let’s look at how the application works. As with the Chat application, the QBorrower class and QLender class both include a main() method so they can be run as a standalone Java application. To keep the code vendor-agnostic, both classes need the connection factory name and queue names when starting the application. The QLender class is executed from the command line as follows:

java ch04.p2p.QLender ConnectionFactory RequestQueue

where ConnectionFactory is the name of the queue connection factory defined in your JMS provider and RequestQueue is the name of the queue that the QLender class should be listening on to receive loan requests. As you’ll see later in this chapter, the QBorrower sends the destination for the response message in the JMSReplyTo header property, which is why you do not need to specify it when starting the QLender class.

The QBorrower class can be executed in the same manner in a separate command window:

java ch04.p2p.QBorrower ConnectionFactory RequestQueue ReplyQueue            

where ConnectionFactory is the name of the queue connection factory defined in your JMS provider, RequestQueue is the name of the queue that the QBorrower class should send loan requests to, and ReplyQueue is the name of the queue that the QBorrower class should use to receive the results from the QLender class.

You will also need to define a jndi.properties file in your classpath that contains the JNDI connection information for the JMS provider. The jndi.properties file contains the initial context factory class, provider URL, username, and password needed to connect to the JMS server. Each vendor will have a different context factory class and URL name for connecting to the server. You will need to consult the documentation of your specific JMS provider or Java EE container to obtain these values. We have included the steps for configuring ActiveMQ to run the examples in this chapter in Appendix D.

The QBorrower and QLender classes both require the queue connection factory name and queue names to run. We have chosen to name the connection factory QueueCF, and the loan request and loan response queues LoanRequestQ and LoanResponseQ, respectively. These JNDI resources are typically configured in the JMS provider XML configuration files or configuration screens. You will need to consult your JMS provider documentation on how to configure these resources (please refer to Appendix D for the specific configuration settings for ActiveMQ used to run the examples in this chapter).

You can run the QBorrower and QLender classes by entering the following two commands in separate command windows:

java ch04.p2p.QLender QueueCF LoanRequestQ

java ch04.p2p.QBorrower QueueCF LoanRequestQ LoanResponseQ

When the QBorrower class starts, you will be prompted to enter a salary amount and the requested loan amount. When you press enter, the QBorrower class will send the salary and loan amount to the QLender class via the LoanRequestQ queue, wait for the response on the LoanResponseQ queue, and display whether the loan was approved or denied:

QBorrower Application Started
Press enter to quit application
Enter: Salary, Loan_Amount
e.g. 50000, 120000

> 80000, 200000
Loan request was Accepted!

> 50000, 300000
Loan request was Declined
> 

Here’s what happened. The QBorrower sent the salary ($80,000) and the loan amount ($200,000) to the LoanRequestQ queue, then blocked and waited for a response from the QLender class. The QLender class received the request on the LoanRequestQ queue, applied the simple business logic based on the salary to loan ratio, and sent back the response on the LoanResponseQ queue. The message was then received by the QBorrower class, and the contents of the return message displayed on the console. This interaction is illustrated in Figure 4-3.

Producers and consumers in the loan example
Figure 4-3. Producers and consumers in the loan example

The rest of this chapter examines the source code for the QBorrower and QLender classes, and covers several advanced subjects related to the point-to-point messaging model.

The QBorrower Class

The QBorrower class is responsible for sending a loan request message to a queue containing a salary and loan amount. The class is fairly straightforward: the constructor establishes a connection to the JMS provider, creates a QueueSession, and gets the request and response queues using a JNDI lookup. The main method instantiates the QBorrower class and, upon receiving a salary and loan amount from standard input, invokes the sendLoanRequest method to send the message to the queue. Here is the listing for the QBorrower class in its entirety. We will be examining the JMS aspects of this class in detail after the full listing:

package ch04.p2p;

import java.io.*;
import java.util.StringTokenizer;
import javax.jms.*;
import javax.naming.*;

public class QBorrower {

   private QueueConnection qConnect = null;    
   private QueueSession qSession = null;
   private Queue responseQ = null;
   private Queue requestQ = null;

   public QBorrower(String queuecf, String requestQueue, 
                   String responseQueue) {    
      try {
         // Connect to the provider and get the JMS connection
         Context ctx = new InitialContext();
         QueueConnectionFactory qFactory = (QueueConnectionFactory)
            ctx.lookup(queuecf);
         qConnect = qFactory.createQueueConnection();

         // Create the JMS Session
         qSession = qConnect.createQueueSession(
            false, Session.AUTO_ACKNOWLEDGE);

         // Lookup the request and response queues
         requestQ = (Queue)ctx.lookup(requestQueue);
         responseQ = (Queue)ctx.lookup(responseQueue);

         // Now that setup is complete, start the Connection
         qConnect.start();

      } catch (JMSException jmse) {
         jmse.printStackTrace(); 
         System.exit(1);
      } catch (NamingException jne) {
         jne.printStackTrace(); 
         System.exit(1);
      }
   }

   private void sendLoanRequest(double salary, double loanAmt) {
      try {
         // Create JMS message
         MapMessage msg = qSession.createMapMessage();
         msg.setDouble("Salary", salary);
         msg.setDouble("LoanAmount", loanAmt);
         msg.setJMSReplyTo(responseQ);

         // Create the sender and send the message
         QueueSender qSender = qSession.createSender(requestQ);
         qSender.send(msg);
        
         // Wait to see if the loan request was accepted or declined
         String filter = 
            "JMSCorrelationID = '" + msg.getJMSMessageID() + "'";
         QueueReceiver qReceiver = qSession.createReceiver(responseQ, filter);
         TextMessage tmsg = (TextMessage)qReceiver.receive(30000);
         if (tmsg == null) {
            System.out.println("QLender not responding");
         } else {
            System.out.println("Loan request was " + tmsg.getText());
         }
        
      } catch (JMSException jmse) {
         jmse.printStackTrace(); 
         System.exit(1);
      }
   }

   private void exit() {
      try {
         qConnect.close();
      } catch (JMSException jmse) {
         jmse.printStackTrace();
      }
      System.exit(0);
   }

   public static void main(String argv[]) {
      String queuecf = null;
      String requestq = null;
      String responseq = null;
      if (argv.length == 3) {
         queuecf = argv[0];
         requestq = argv[1];
         responseq = argv[2];
      } else {
         System.out.println("Invalid arguments. Should be: ");
         System.out.println
            ("java QBorrower factory requestQueue responseQueue");
         System.exit(0);
      }
      
      QBorrower borrower = new QBorrower(queuecf, requestq, responseq);
      
      try {
         // Read all standard input and send it as a message
         BufferedReader stdin = new BufferedReader
            (new InputStreamReader(System.in));
         System.out.println ("QBorrower Application Started");
         System.out.println ("Press enter to quit application");
         System.out.println ("Enter: Salary, Loan_Amount");
         System.out.println("\ne.g. 50000, 120000");

         while (true) {
            System.out.print("> ");
            String loanRequest = stdin.readLine();
            if (loanRequest == null || 
                loanRequest.trim().length() <= 0) {
               borrower.exit();
            }
            
            // Parse the deal description
            StringTokenizer st = new StringTokenizer(loanRequest, ",") ;
            double salary = 
               Double.valueOf(st.nextToken().trim()).doubleValue();
            double loanAmt = 
               Double.valueOf(st.nextToken().trim()).doubleValue();

            borrower.sendLoanRequest(salary, loanAmt);
         }
      } catch (IOException ioe) {
         ioe.printStackTrace();
      }
   }
}

The main method of the QBorrower class accepts three arguments from the command line: the JNDI name of the queue connection factory, the JNDI name of the loan request queue, and finally, the JNDI name of the loan response queue where the response from the QLender class will be received. Once the input parameters have been validated, the QBorrower class is instantiated and a loop is started that reads the salary and loan amount into the class from the console:

String loanRequest = stdin.readLine();

The salary and loan amount input data is then parsed, and finally the sendLoanRequest method invoked. The input loop continues until the user presses enter on the console without entering any data:

if (loanRequest == null || 
    loanRequest.trim().length() <= 0) { 
   borrower.exit(); 
}

Now let’s look at the JMS portion of the code in detail, starting with the constructor and ending with the sendLoanRequest method.

JMS Initialization

In the QBorrower class example, all of the JMS initialization logic is handled in the constructor. The first thing the constructor does is establish a connection to the JMS provider by creating an InitialContext:

Context ctx = new InitialContext();

The connection information needed to connect to the JMS provider is specified in the jndi.properties file located in the classpath (see Appendix D for an example). Once we have a JNDI context, we can get the QueueConnectionFactory using the JNDI connection factory name passed into the constructor arguments. The QueueConnectionFactory is then used to create the QueueConnection using a factory method on the QueueConnectionFactory:

QueueConnectionFactory qFactory = 
   (QueueConnectionFactory) ctx.lookup(queuecf);
qConnect = qFactory.createQueueConnection();

Alternatively, you can pass a username and password into the createQueueConnection method as String arguments to perform basic authentication on the connection. A JMSSecurityException will be thrown if the user fails to authenticate:

qConnect = qFactory.createQueueConnection("system", "manager");

At this point a connection is created to the JMS provider. When the QueueConnection is created, the connection is initially in stopped mode. This means you can send messages to the queue, but no message consumers (including the QBorrower class, which is also a message consumer) may receive messages from this connection until it is started.

The QueueConnection object is used to create a JMS Session object (specifically, a QueueSession), which is the working thread and transactional unit of work in JMS. Unlike JDBC, which requires a connection for each transactional unit of work, JMS uses a single connection and multiple Session objects. Typically, applications will create a single JMS Connection on application startup and maintain a pool of Session objects for use whenever a message needs to be produced or consumed.

The QueueSession object is created through a factory object on the QueueConnection object. The QueueConnection variable is declared outside of the constructor in our example so that the connection can be closed in the exit method of the QBorrower class. It is important to close the connection after it is no longer being used to free up resources. Closing the Connection object also closes any open Session objects associated with the connection. The statement in the constructor to create the QueueSession is as follows:

qSession = 
   qConnect.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

Notice that the createQueueSession method takes two parameters. The first parameter indicates whether the QueueSession is transacted or not. A value of true indicates that the session is transacted, meaning that messages sent to queues during the lifespan of the QueueSession will not be delivered to the receivers until the commit method is invoked on the QueueSession. Likewise, invoking the rollback method on the QueueSession will remove any messages sent during the transacted session. The second parameter indicates the acknowledgment mode. The three possible values are Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, and Session.DUPS_OK_ACKNOWLEDGE. The acknowledgment mode is ignored if the session is transacted. Acknowledgment modes are discussed in more detail in Chapter 7.

The next two lines in the constructor perform a JNDI lookup to the JMS provider to obtain the administered destinations. In our case, the JMS destination is cast to a Queue. The argument supplied to each of the lookup methods is a String value containing the JNDI name of the queues we are using in the class:

requestQ = (Queue)ctx.lookup(requestQueue); 
responseQ = (Queue)ctx.lookup(responseQueue);

The final line of code starts the connection, allowing messages to be received on this connection. It is generally a good idea to perform all of your initialization logic before starting the connection:

qConnect.start();

Interestingly enough, you do not need to start the connection if all you are doing is sending messages. However, it is generally advisable to start the connection to avoid future issues if there is a chance the connection may be shared or request/reply processing added to the sender class.

Another useful thing you can obtain from the JMS Connection is the metadata about the connection. Invoking the getMetaData method on the Connection object gives you a ConnectionMetaData object that provides useful information such as the JMS version, JMS provider name, JMS provider version, and the JMSX property name extensions supported by the JMS provider:

import java.util.Enumeration;
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class MetaData {
  public static void main(String[] args) {    
    try {
      Context ctx = new InitialContext();
      QueueConnectionFactory qFactory = (QueueConnectionFactory)
         ctx.lookup("QueueCF");
      QueueConnection qConnect = qFactory.createQueueConnection();
      ConnectionMetaData metadata = qConnect.getMetaData();
      System.out.println("JMS Version:  " + 
                          metadata.getJMSMajorVersion() + "." + 
                          metadata.getJMSMinorVersion());
      System.out.println("JMS Provider: " + 
                          metadata.getJMSProviderName());
      System.out.println("JMSX Properties Supported: ");
      Enumeration e = metadata.getJMSXPropertyNames();
      while (e.hasMoreElements()) {
        System.out.println("   " + e.nextElement());
      }
    } catch (Exception ex) {
      ex.printStackTrace(); 
      System.exit(1);
    }
  }
}

For example, invoking the previous code using the ActiveMQ open source JMS provider will yield the following results:

JMS Version:  1.1
JMS Provider: ActiveMQ
JMSX Properties Supported: 
   JMSXGroupID
   JMSXGroupSeq
   JMSXDeliveryCount
   JMSXProducerTXID

This information can be logged on application startup, indicating the JMS provider and version numbers. It is particularly useful for products or applications that may use multiple providers.

Sending the message and receiving the response

Once the QBorrower class is initialized, the salary and loan amounts are entered through the command line. At this point, the sendLoanRequest method is invoked from the main method to send the loan request to the queue and wait for the response from the QLender class. At the start of this method, we chose to create a MapMessage but we could have used any of the five JMS message types:

MapMessage msg = qSession.createMapMessage(); 
msg.setDouble("Salary", salary); 
msg.setDouble("LoanAmount", loanAmt);
msg.setJMSReplyTo(responseQ);

Notice that the JMS message is created from the Session object via a factory method matching the message type. Instantiating a new JMS message object using the new keyword will not work; it must be created from the Session object. After creating and loading the message object, we are also setting the JMSReplyTo message header property to the response queue, which further decouples the producer from the consumer. The practice of setting the JMSReplyTo header property in the message producer as opposed to specifying the reply-to queue in the message consumer is a standard practice when using the request/reply model.

After the message is created, we then create the QueueSender object, specifying the queue we wish to send messages to, and then send the message using the send method:

QueueSender qSender = qSession.createSender(requestQ); 
qSender.send(msg);

There are several overridden send methods available in the QueueSender object. The one we are using here accepts only the JMS message object as the single argument. The other overridden methods allow you to specify the Queue, the delivery mode, the message priority, and finally the message expiry. Since we are not specifying any of the other values in the example just shown, the message priority is set to normal (4), the delivery mode is set to persistent messages (DeliveryMode.PERSISTENT), and the message expiry (time to live) is set to 0, indicating that the message will never expire. All of these parameters can be overridden by using one of the other send methods.

Once the message has been sent, the QBorrower class will block and wait for a response from the QLender on whether the loan was approved or denied. The first step in this process is to set up a message selector so that we can correlate the response message with the one we sent. This is necessary because there may be many other loan requests being sent to and from the loan request queues while we are making our loan request. To make sure we get the proper response back, we would use a technique called message correlation. Message correlation is required when using the request/reply model of point-to-point messaging where the queue is being shared by multiple producers and consumers (see Message Correlation for more details):

String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'"; 
QueueReceiver qReceiver = qSession.createReceiver(responseQ, filter); 

Notice we specify the filter when creating the QueueReceiver, indicating that we only want to receive messages when the JMSCorrelationID is equal to the original JMSMessageID. Now that we have a QueueReceiver, we can invoke the receive method to do a blocking wait until the response message is received. In this case, we are using the overridden receive method that accepts a timeout value in milliseconds:

TextMessage tmsg = (TextMessage)qReceiver.receive(30000); 
if (tmsg == null) { 
   System.out.println("QLender not responding"); 
} else { 
   System.out.println("Loan request was " + tmsg.getText()); 
}

It is a good idea to always specify a reasonable timeout value on the receive method; otherwise, it will sit there and wait forever (in effect, the application would “hang”). Specifying a reasonable timeout value allows the request/reply sender (in this case the QBorrower) to take action in the event the message has not been delivered in a timely fashion or there is a problem on the receiving side (in this case the QLender). If a timeout condition does occur, the message returned from the receive method will be null. Note that it is the entire message object that is null, not just the message payload. The receive method returns a Message object. If the message type is known, then you can cast the return message as we did in the preceding code example. However, a more failsafe technique would be to check the return Message type using the instanceof keyword as indicated here:

Message rmsg = qReceiver.receive(30000); 
if (rmsg == null) { 
   System.out.println("QLender not responding"); 
} else {
   if (rmsg instanceof TextMessage) { 
   TextMessage tmsg = (TextMessage)rmsg;
      System.out.println("Loan request was " + tmsg.getText());
   } else {
      throw new IllegalStateException("Invalid message type);
   } 
}

Notice that the message received does not need to be of the same message type as the one sent. In the example just shown, we sent the loan request using a MapMessage, yet we received the response from the receiver as a TextMessage. While you could potentially increase the level of decoupling between the sender and receiver by including the message type as part of the application properties of the message, you would still need to know how to interpret the payload in the message. For example, with a StreamMessage or BytesMessage you would still need to know the order of data being sent so that you could in turn read it in the proper order and data type. As you can guess, because of the “contract” of the data between the sender and receiver, there is still a fair amount of coupling in the point-to-point model, at least from the payload perspective.

The QLender Class

The role of the QLender class is to listen for loan requests on the loan request queue, determine if the salary meets the necessary business requirements, and finally send the results back to the borrower. Notice that the QLender class is structured a bit differently from the QBorrower class. In our example, the QLender class is referred to as a message listener and, as such, implements the javax.jms.MessageListener interface and overrides the onMessage() method. Here is the complete listing for the QLender class:

package ch04.p2p;

import java.io.*;
import javax.jms.*;
import javax.naming.*;

public class QLender implements MessageListener {

   private QueueConnection qConnect = null;    
   private QueueSession qSession = null;
   private Queue requestQ = null;

   public QLender(String queuecf, String requestQueue) {    
      try {
         // Connect to the provider and get the JMS connection
         Context ctx = new InitialContext();
         QueueConnectionFactory qFactory = (QueueConnectionFactory)
            ctx.lookup(queuecf);
         qConnect = qFactory.createQueueConnection();

         // Create the JMS Session
         qSession = qConnect.createQueueSession(
            false, Session.AUTO_ACKNOWLEDGE);

         // Lookup the request queue
         requestQ = (Queue)ctx.lookup(requestQueue);

         // Now that setup is complete, start the Connection
         qConnect.start();

         // Create the message listener
         QueueReceiver qReceiver = qSession.createReceiver(requestQ);
         qReceiver.setMessageListener(this);

         System.out.println("Waiting for loan requests...");

      } catch (JMSException jmse) {
         jmse.printStackTrace(); 
         System.exit(1);
      } catch (NamingException jne) {
         jne.printStackTrace(); 
         System.exit(1);
      }
   }

   public void onMessage(Message message) {
      try {
         boolean accepted = false;

         // Get the data from the message
         MapMessage msg = (MapMessage)message;
         double salary = msg.getDouble("Salary");
         double loanAmt = msg.getDouble("LoanAmount");
         
         // Determine whether to accept or decline the loan
         if (loanAmt < 200000) {
            accepted = (salary / loanAmt) > .25; 
         } else {
            accepted = (salary / loanAmt) > .33;
         }
         System.out.println("" +
            "Percent = " + (salary / loanAmt) + ", loan is " 
            + (accepted ? "Accepted!" : "Declined"));
         
         // Send the results back to the borrower
         TextMessage tmsg = qSession.createTextMessage();
         tmsg.setText(accepted ? "Accepted!" : "Declined");
         tmsg.setJMSCorrelationID(message.getJMSMessageID());
         
         // Create the sender and send the message
         QueueSender qSender = 
            qSession.createSender((Queue)message.getJMSReplyTo());
         qSender.send(tmsg);

         System.out.println("\nWaiting for loan requests...");

      } catch (JMSException jmse) {
         jmse.printStackTrace(); 
         System.exit(1);
      } catch (Exception jmse) {
         jmse.printStackTrace(); 
         System.exit(1);
      }
   }
    
   private void exit() {
      try {
         qConnect.close();
      } catch (JMSException jmse) {
         jmse.printStackTrace();
      }
      System.exit(0);
   }

   public static void main(String argv[]) {
      String queuecf = null;
      String requestq = null;
      if (argv.length == 2) {
         queuecf = argv[0];
         requestq = argv[1];
      } else {
         System.out.println("Invalid arguments. Should be: ");
         System.out.println
            ("java QLender factory request_queue");
         System.exit(0);
      }
      
      QLender lender = new QLender(queuecf, requestq);
      
      try {
         // Run until enter is pressed
         BufferedReader stdin = new BufferedReader
            (new InputStreamReader(System.in));
         System.out.println ("QLender application started");
         System.out.println ("Press enter to quit application");
         stdin.readLine();
         lender.exit();
      } catch (IOException ioe) {
         ioe.printStackTrace();
      }
   }
}

The QLender class is what is referred to as an asynchronous message listener, meaning that unlike the prior QBorrower class it will not block when waiting for messages. This is evident from the fact that the QLender class implements the MessageListener interface and overrides the onMessage method.

The main method of the QLender class validates the command-line arguments and invokes the constructor by instantiating a new QLender class. It then keeps the primary thread alive until the enter key is pressed on the command line.

The constructor in the QLender class works much in the same way as the QBorrower class. The first part of the constructor establishes a connection to the provider, does a JNDI lookup to get the queue, creates a QueueSession, and starts the connection:

...
// Connect to the provider and get the JMS connection
Context ctx = new InitialContext();
QueueConnectionFactory qFactory = (QueueConnectionFactory)
   ctx.lookup(queuecf);
qConnect = qFactory.createQueueConnection();

// Create the JMS Session
qSession = qConnect.createQueueSession(
   false, Session.AUTO_ACKNOWLEDGE);

// Lookup the request queue
requestQ = (Queue)ctx.lookup(requestQueue);

// Now that setup is complete, start the Connection
qConnect.start();
...

Once the connection is started, the QLender class can begin to receive messages. However, before it can receive messages, it must be registered by the QueueReceiver as a message listener:

QueueReceiver qReceiver = qSession.createReceiver(requestQ); 
qReceiver.setMessageListener(this);

At this point, a separate listener thread is started. That thread will wait until a message is received, and upon receipt of a message, will invoke the onMessage method of the listener class. In this case, we set the message listener to the QLender class using the this keyword in the setMessageListener method. We could have easily delegated the messaging work to another class that implemented the MessageListener interface:

qReceiver.setMessageListener(someOtherClass);

When a message is received on the queue specified in the createReceiver method, the listener thread will asynchronously invoke the onMessage method of the listener class (in our case, the QLender class is also the listener class). The onMessage method first casts the message to a MapMessage (the message type we are expecting to receive from the borrower). It then extracts the salary and loan amount requested from the message payload, checks the salary to loan amount ratio, then determines whether to accept or decline the loan request:

...
public void onMessage(Message message) {
   try {
      boolean accepted = false;

      // Get the data from the message
      MapMessage msg = (MapMessage)message;
      double salary = msg.getDouble("Salary");
      double loanAmt = msg.getDouble("LoanAmount");
         
      // Determine whether to accept or decline the loan
      if (loanAmt < 200000) {
         accepted = (salary / loanAmt) > .25; 
      } else {
         accepted = (salary / loanAmt) > .33;
      }
      System.out.println("" +
         "Percent = " + (salary / loanAmt) + ", loan is " 
         + (accepted ? "Accepted!" : "Declined"));
      ...         

Again, to make this more failsafe, it would be better to check the JMS message type using the instanceof keyword in the event another message type was being sent to that queue:

if (message instanceof MapMessage) {
   //process request
} else {
   throw new IllegalArgumentException("unsupported message type");
}

Once the loan request has been analyzed and the results determined, the QLender class needs to send the response back to the borrower. It does this by first creating a JMS message to send. The response message does not need to be the same JMS message type as the loan request message that was received by the QLender. To illustrate this point the QLender returns a TextMessage back to the QBorrower:

TextMessage tmsg = qSession.createTextMessage(); 
tmsg.setText(accepted ? "Accepted!" : "Declined"); 

The next statement sets the JMSCorrelationID, which is the JMS header property that is used by the QBorrower class to filter incoming response messages:

tmsg.setJMSCorrelationID(message.getJMSMessageID()); 

Message correlation is discussed in more detail in the next section of this chapter.

Once the message is created, the onMessage method then sends the message to the response queue specified by the JMSReplyTo message header property. As you may remember, in the QBorrower class we set the JMSReplyTo header property when sending the original loan request. The QLender class can now use that property as the destination to send the response message to:

QueueSender qSender = 
   qSession.createSender((Queue)message.getJMSReplyTo()); 
qSender.send(tmsg); 

Message Correlation

In the previous code example, the borrower sent a loan request on a request queue and waited for a reply from the lender on a response queue. Many borrowers may be making requests at the same time, meaning that the lender application is sending many messages to the response queue. Since the response queue may contain many messages, how can you be sure that the response you received from the lender was meant for you and not another borrower?

In general, whenever using the request/reply model, you must make sure the response you are receiving is associated with the original message you sent. Message correlation is the technique used to ensure that you receive the right message. The most popular method for correlating messages is leveraging the JMSCorrelationID message header property in conjunction with the JMSMessageID header property. The JMSCorrelationID property contains a unique String value that is known by both the sender and receiver. The JMSMessageID is typically used, since it is unique and is available to the sender and receiver.

When the message consumer (e.g., QLender) is ready to send the reply message, it sets the JMSCorrelationID message property to the message ID from the original message:

public class QLender implements MessageListener {

   ...
   public void onMessage(Message message) {
      try {
         ...         
         // Send the results back to the borrower
         TextMessage tmsg = qSession.createTextMessage();
         tmsg.setText(accepted ? "Accepted!" : "Declined");
         tmsg.setJMSCorrelationID(message.getJMSMessageID());
         
         // Create the sender and send the message
         QueueSender qSender = 
            qSession.createSender((Queue)message.getJMSReplyTo());
         qSender.send(tmsg);

         System.out.println("\nWaiting for loan requests...");
         ...
      }
   }
   ...            

The original message producer (e.g., QBorrower) expecting the response about whether the loan was approved creates a message selector based on the JMSCorrelationID message property:

public class QBorrower {

   ...
   private void sendLoanRequest(double salary, double loanAmt) {
      try {
         ...

         // Wait to see if the loan request was accepted or declined
         String filter = 
            "JMSCorrelationID = '" + msg.getJMSMessageID() + "'";
         QueueReceiver qReceiver = qSession.createReceiver(responseQ, filter);
         TextMessage tmsg = (TextMessage)qReceiver.receive(30000);
         ...
      }
   }
   ...            

Although the JMSMessageID is typically used to identify the unique message, it certainly is not a requirement. You can use anything that would correlate the request and reply messages. For example, as an alternative you could use the Java UUID class to generate a unique ID. In the following code example, the QBorrower class generates a unique ID and sets an application message property called “UUID” to the generated value:

public class QBorrower {

   ...
   private void sendLoanRequest(double salary, double loanAmt) {
      try {
         // Create JMS message
         MapMessage msg = qSession.createMapMessage();
         msg.setDouble("Salary", salary);
         msg.setDouble("LoanAmount", loanAmt);
         msg.setJMSReplyTo(responseQ);
         UUID uuid = UUID.randomUUID();
         String uniqueId = uuid.toString();
         msg.setStringProperty("UUID", uniqueId);

         // Create the sender and send the message
         QueueSender qSender = qSession.createSender(requestQ);
         qSender.send(msg);
        
         // Wait to see if the loan request was accepted or declined
         String filter = 
            "JMSCorrelationID = '" + uniqueId + "'";
         QueueReceiver qReceiver = qSession.createReceiver(responseQ, filter);
         TextMessage tmsg = (TextMessage)qReceiver.receive(30000);
         ...
      }
   }
   ...            

The QLender application must now get the UUID property from the original message and set the JMSCorrelationID message property to this value:

public class QLender implements MessageListener {

   ...
   public void onMessage(Message message) {
      try {
         ...         
         // Send the results back to the borrower
         TextMessage tmsg = qSession.createTextMessage();
         tmsg.setText(accepted ? "Accepted!" : "Declined");
         tmsg.setJMSCorrelationID(message.getStringProperty("UUID"));
         
         // Create the sender and send the message
         QueueSender qSender = 
            qSession.createSender((Queue)message.getJMSReplyTo());
         qSender.send(tmsg);

         System.out.println("\nWaiting for loan requests...");
         ...
      }
   }
   ...            

Although it is commonly used, you are not required to use the JMSCorrelationID message header property to correlate messages. As a matter of fact, you could set the correlation property to any application property in the message. While this is certainly possible, you should leverage the header properties if they exist for full compatibility with messaging servers, third-party brokers, and third-party message bridges.

Dynamic Versus Administered Queues

Dynamic queues are queues that are created through the application source code using a vendor-specific API. Administered queues are queues that are defined in the JMS provider configuration files or administration tools.

The setup and configuration of dynamic queues tends to be vendor-specific. A queue may be used exclusively by one consumer or shared by multiple consumers. It may have a size limit (limiting the number of unconsumed messages held in the queue) with options for in-memory storage versus overflow to disk. In addition, a queue may be configured with a vendor-specific addressing syntax or special routing capabilities.

JMS does not attempt to define a set of APIs for all the possible options on a queue. It should be possible to set these options administratively, using the vendor-specific administration capabilities. Most vendors supply a command-line administration tool, a graphical administration tool, or an API for administering queues at runtime. Some vendors supply all three. Using vendor-specific administration APIs to create and configure a queue may be convenient at times. However, it is not very portable and may require that the application have administrator privileges.

JMS provides a QueueSession.createQueue(String queueName) method, but this is not intended to define a new queue in the messaging system. It is intended to return a Queue object that represents an existing queue. There is also a JMS-defined method for creating a temporary queue that can only be consumed by the JMS client that created it using the QueueSession.createTemporaryQueue() method.

Creating dynamic queues is useful if you have a large number of queues that may increase over time. For example, consider the scenario where a book publisher has relationships with a large number of bookstores. The book publisher regularly sends new book information and order status to the bookstores. Let’s assume that there are 1,000 bookstores related to the book publisher. That equates to 1,000 queues—somewhat excessive to administer. The book publisher can dynamically create the bookstore queues based on a numbering scheme, therefore quickly defining the queues necessary for this scenario (e.g., BookstoreQ1, BookstoreQ2, etc.).

Load Balancing Using Multiple Receivers

A queue may have multiple receivers attached to it for the purpose of distributing the workload of message processing. The JMS specification states that this capability must be implemented by a JMS provider, although it does not define the rules for how the messages are distributed among consumers. A sender could use this feature to distribute messages to multiple instances of an application, each of which would provide its own receiver.

When multiple receivers are attached to a queue, each message in the queue is delivered to one receiver. The absolute order of messages cannot be guaranteed, since one receiver may process messages faster than another. From the receiver’s perspective, the messages it consumes should be in relative order; messages delivered to the queue earlier are consumed first. However, if a message needs to be redelivered due to an acknowledgment failure, it is possible that it could be delivered to another receiver. The other receiver may have already processed more recently delivered messages, which would place the redelivered message out of the original order.

If you would like to see multiple recipients in action, try starting two instances of the QLender class and one instance of the QBorrower class, each in a separate command window:

java ch04.p2p.QLender QueueCF LoanRequestQ
java ch04.p2p.QLender QueueCF LoanRequestQ
java ch04.p2p.QBorrower QueueCF LoanRequestQ LoanResponseQ         

Now, when entering a salary and loan amount in the command window, you will notice that the message is delivered to one or the other QLender application, but not both. The exact load balancing scheme will vary between JMS providers. Some may use a round-robin load balancing scheme, whereas others may use a first-available balancing scheme. You will need to consult your JMS provider documentation to determine the specific load balancing algorithm used.

Examining a Queue

A QueueBrowser is a specialized object that allows you to peek ahead at pending messages on a Queue without actually consuming them. This feature is unique to point-to-point messaging. Queue browsing can be useful for monitoring the contents of a queue from an administration tool or for browsing through multiple messages to locate a message that is more important than the one at the head of the queue. It is also useful for other monitoring tasks, such as determining the current queue depth.

Messages obtained from a QueueBrowser are copies of messages contained in the queue and are not considered to be consumed—they are merely for browsing. It is also important to note that the QueueBrowser is not guaranteed to have a definitive list of messages in the queue. The QueueBrowser contains only a snapshot, or a copy of, the queue as it appears at the time the QueueBrowser is created. The contents of the queue may change between the time the browser is created and the time you examine its contents. However, no matter how small that window of time is, new messages may arrive and other messages may be consumed by other JMS clients.

A QueueBrowser is created from the Session object using the createBrowser() method. This method takes as an argument the queue from which you would like to view the messages. It is during the createBrowser() method invocation that the snapshot is taken from the queue. You can then get a list of the messages by using the method getEnumeration() from the QueueBrowser:

...
QueueBrowser browser = session.createBrowser(queue);
Enumeration e = browser.getEnumeration();
while (e.hasMoreElements()) {
   //display messages
}
...         

The full LoanRequestQueueBrowser class is listed here:

import java.util.Enumeration;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class LoanRequestQueueBrowser {

  public static void main(String[] args) {
    try {
      //establish connection
      Context context = new InitialContext();
      QueueConnectionFactory factory = (QueueConnectionFactory) 
         context.lookup("QueueCF");
      QueueConnection connection = factory.createQueueConnection();
      connection.start();

      //establish session 
      Queue queue = (Queue) context.lookup("LoanRequestQ");
      QueueSession session = connection.createQueueSession
         (false, Session.AUTO_ACKNOWLEDGE);
      QueueBrowser browser = session.createBrowser(queue);

      Enumeration e = browser.getEnumeration();
      while (e.hasMoreElements()) {
        TextMessage msg = (TextMessage)e.nextElement();
        System.out.println("Browsing: " + msg.getText());
      }
        
      browser.close();
      connection.close();
      System.exit(0);
        
    } catch (Exception exception) {
       exception.printStackTrace();
    }
  }
}

With Safari, you learn the way you learn best. Get unlimited access to videos, live online training, learning paths, books, interactive tutorials, and more.

Start Free Trial

No credit card required