Just Spring Integration by Madhusudhan Konda

For the Curious: Endpoint API

Now that you have seen the basic types of consumers, you should understand where they are really used. Under normal circumstances, you do not have to use them, as you will be configuring them under an XML namespace. The framework provides these components under various namespaces, so you can add the respective elements declaratively straight out of the box. This reduces the amount of coding and encourages a declarative programming model.

For now, keep in mind that Transformers, Filters, Routers, and the other flow components are all supported by the Spring Integration XML namespaces. This means you can declaratively create an instance of any of these components. For example, a transformer element creates a Transformer component, which fetches messages from its input channel to kick off the transformation.

Understanding the link between the classes and namespace elements will give us much more command of the framework. However, you are strongly encouraged to use namespaces to configure these endpoints, instead of using the API classes.

Consumers

If you recall from the earlier chapter on channels, there are two types of channels: pollable and subscribable. Correspondingly, there are two types of endpoint consumers: the Polling Consumer and the Event-Driven Consumer.

A PollingConsumer polls the channel for messages based on a polling configuration. It is driven by the client program. The EventDrivenConsumer, on the other hand, subscribes to a subscribable channel so it will be notified asynchronously when a message arrives at the channel.

Polling Consumers

A defining characteristic of polling consumers is that they poll for messages at regular intervals. The framework provides the PollingConsumer class to do this job. The class is instantiated through a constructor that takes a reference to a pollable channel and a message handler. MessageHandler is a simple interface for handling the messages published onto the channel.

The following snippet shows how to create a PollingConsumer object. You can use the framework’s class as is, or if preferred, create a wrapper around it.

private MessageHandler positionsHandler = null;
private QueueChannel positionsChannel = null;
...

// Instantiating a PollingConsumer
PollingConsumer consumer = new PollingConsumer(positionsChannel, positionsHandler);

Let’s take an example of a custom consumer.

The PositionsPollingConsumer is a message consumer that grabs the Position messages from the positions-channel. We know that we can use the receive method on the channel itself to receive the message (shown in the following snippet), which has been described in earlier chapters.

Message<?> m = channel.receive(); // or one of the other receive methods
System.out.println("Payload: " + m.getPayload());

Instead, what we do here is create an endpoint that polls these channels. The channels themselves merely act as buckets that hold messages.

As mentioned above, the PollingConsumer requires a channel and a handler to be instantiated. The handler is created by implementing the framework’s MessageHandler:

public class PositionsHandler implements MessageHandler {
  public void handleMessage(Message<?> message) throws MessagingException {
    System.out.println("Handling a message: "+ message.getPayload().toString());
  }
}

The PositionsPollingConsumer, shown below, creates an instance of the framework’s PollingConsumer to fetch the messages programmatically, rather than declaratively.

public class PositionsPollingConsumer {
  private PollingConsumer consumer = null;
  private PositionsHandler positionsHandler = null;
  
  public PositionsPollingConsumer(ApplicationContext ctx, QueueChannel positionsChannel) {
    //instance of handler 
    positionsHandler = new PositionsHandler();
    // now create the framework's consumer
    consumer = new PollingConsumer(positionsChannel, positionsHandler);
    //You must set the context, or else an error will be thrown
    consumer.setBeanFactory(ctx);
  }
  public void startConsumer() {
    consumer.start();
  }
}

Now that our consumer is coded, it’s just a matter of calling the startConsumer() method. This method calls the start method on PollingConsumer internally:

PositionsPollingConsumer ppc = new PositionsPollingConsumer(ctx, positionsChannel);

ppc.startConsumer();
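
To make the pull model concrete without the framework on the classpath, here is a minimal JDK-only sketch. It is not Spring Integration's PollingConsumer: the class SimplePollingEndpoint and its pollOnce method are hypothetical names, a java.util.Queue stands in for the pollable channel, and a java.util.function.Consumer stands in for the MessageHandler. Draining the whole queue in one cycle is a simplifying assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical, framework-free stand-in for a polling endpoint.
class SimplePollingEndpoint<T> {
    private final Queue<T> channel;     // plays the role of a pollable channel
    private final Consumer<T> handler;  // plays the role of a MessageHandler

    SimplePollingEndpoint(Queue<T> channel, Consumer<T> handler) {
        this.channel = channel;
        this.handler = handler;
    }

    // One polling cycle: pull whatever is waiting and hand each item to the handler.
    // Returns the number of messages handled in this cycle.
    int pollOnce() {
        int handled = 0;
        T message;
        while ((message = channel.poll()) != null) {
            handler.accept(message);
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        Queue<String> positionsChannel = new ConcurrentLinkedQueue<>();
        List<String> seen = new ArrayList<>();
        SimplePollingEndpoint<String> endpoint =
            new SimplePollingEndpoint<>(positionsChannel, seen::add);

        // The channel only holds the messages; nothing happens until the endpoint pulls.
        positionsChannel.add("position-1");
        positionsChannel.add("position-2");

        System.out.println("Handled " + endpoint.pollOnce() + " messages: " + seen);
    }
}
```

The point of the sketch is the direction of control: the endpoint, not the channel, decides when messages are taken off the queue.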

Polling Using Triggers

The PollingConsumer coded above has no explicit polling schedule; it just picks up the messages. To make it poll on a schedule of our choosing, we use the framework’s Triggers.

There are two types of triggers provided by the framework: PeriodicTrigger, which polls at a fixed interval, and CronTrigger, which polls based on Unix’s cron expressions. The CronTrigger is more flexible when the task scheduling has complex requirements.

Once an appropriate Trigger is chosen, it needs to be instantiated and wired onto the consumer. For example, to poll every two seconds, create the PeriodicTrigger as shown below. You can also set other properties, such as initialDelay and fixedRate, for finer control over the polling.

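PeriodicTrigger periodicTrigger = new PeriodicTrigger(2000);
// let the polling kick in after half a second
periodicTrigger.setInitialDelay(500);
// false = fixed delay: the period is measured from the end of the previous poll
periodicTrigger.setFixedRate(false);
// wire the trigger onto the PollingConsumer created earlier (before calling start())
consumer.setTrigger(periodicTrigger);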

The initialDelay postpones the first poll until that time period has elapsed. The fixedRate flag is a Boolean that determines how the polling interval is measured. When it is true, polls are scheduled at a regular interval counted from the scheduled start of the previous poll, so if processing a message takes longer than the polling period (two seconds in the above example), the next poll is already due and runs without further waiting. When it is false, the period is counted from the completion of the previous poll.
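
The difference between the two modes is easiest to see with numbers. The following is a small sketch of the arithmetic only, not the framework's PeriodicTrigger; the class NextPollCalculator and its method are hypothetical names introduced for illustration, and all times are plain millisecond offsets.

```java
// Hypothetical helper that mirrors the fixed-rate versus fixed-delay distinction.
class NextPollCalculator {

    // fixedRate = true:  next poll = scheduled start of the previous poll + period
    // fixedRate = false: next poll = completion of the previous poll + period
    static long nextPollMillis(long lastScheduledStart, long lastCompletion,
                               long period, boolean fixedRate) {
        long base = fixedRate ? lastScheduledStart : lastCompletion;
        return base + period;
    }

    public static void main(String[] args) {
        long scheduledStart = 0L;  // previous poll was due at t = 0 ms
        long completion = 3000L;   // handling the message took three seconds
        long period = 2000L;       // poll every two seconds

        System.out.println("fixed rate:  "
            + nextPollMillis(scheduledStart, completion, period, true));
        System.out.println("fixed delay: "
            + nextPollMillis(scheduledStart, completion, period, false));
    }
}
```

With these numbers the fixed-rate schedule gives 2000, a time that has already passed when the slow poll finishes at 3000, so the next poll is due immediately. The fixed-delay schedule gives 5000, which always leaves a full two-second gap after the previous poll completes.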

The CronTrigger enables the consumer to do more sophisticated polling. For example, you might have a job that wakes up at midnight on weekdays to do housekeeping. CronTrigger is configured with cron expressions, which can cater to such complex schedules.

Cron expressions are written as space-separated fields. There are six such fields, each representing an aspect of time: second, minute, hour, day of month, month, and day of week. Declare an expression that represents your time requirements and pass it to the trigger object as shown below:

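// poll on all weekdays at exactly one minute past midnight
// fields: second minute hour day-of-month month day-of-week
// (a * in the seconds field would fire on every second of that minute)
String cronExpression = "0 1 0 * * MON-FRI";

CronTrigger cronTrigger = new CronTrigger(cronExpression);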

With the above cron expression, the poller wakes at one minute past midnight on every weekday, checks for messages, processes any it finds, and goes back to sleep.
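
Because the meaning of a cron expression depends entirely on field position, it can help to label the fields before handing the string to a trigger. The sketch below does only that: it splits a six-field expression and pairs each value with its position name. It does not evaluate the schedule, and the class CronFields and its label method are hypothetical names, not part of the framework.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that labels the six fields of a Spring-style cron expression.
class CronFields {
    private static final String[] NAMES =
        {"second", "minute", "hour", "dayOfMonth", "month", "dayOfWeek"};

    // Splits on whitespace and maps each field to its position name, in order.
    static Map<String, String> label(String expression) {
        String[] parts = expression.trim().split("\\s+");
        if (parts.length != NAMES.length) {
            throw new IllegalArgumentException(
                "Expected " + NAMES.length + " fields but found " + parts.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // One minute past midnight on weekdays; the leading 0 pins the seconds field.
        label("0 1 0 * * MON-FRI")
            .forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

Labelling the fields makes one common slip visible: a `*` in the first position means "every second", so an expression meant to fire once at 00:01 needs `0` there.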

Event-Driven Consumers

The second type of endpoint consumer, which subscribes to the message stream rather than polling it, is the Event-Driven Consumer. The framework provides the EventDrivenConsumer class for this type. Its fundamental characteristic is that it waits for someone (this is the framework’s responsibility) to deliver the message as soon as it appears on the channel. From our earlier discussions on event messages, you know that SubscribableChannel supports this type of consumer.

This consumer is instantiated exactly like the PollingConsumer: supply a channel to which it should subscribe to get a message stream, and a handler to handle the messages.

private EventDrivenConsumer consumer = null;
private PositionsHandler positionsHandler = null;
private ApplicationContext ctx = null;

public PositionsEventDrivenConsumer(ApplicationContext ctx,
  PublishSubscribeChannel positionsChannel) {
  // keep a reference to the context
  this.ctx = ctx;
  positionsHandler = new PositionsHandler();

  // instantiate the event driven consumer
  consumer = new EventDrivenConsumer(positionsChannel, positionsHandler);
  // as with PollingConsumer, the context must be set before the consumer is used
  consumer.setBeanFactory(ctx);
}

The PositionsHandler is a simple class that implements MessageHandler by providing the handleMessage method. Starting the consumer is as simple as calling the start method on the EventDrivenConsumer:

public void startConsumer() {
  // EventDrivenConsumer exposes start method
  consumer.start();
}
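
For contrast with polling, the push model can be sketched with the JDK alone. This is not the framework's EventDrivenConsumer or SubscribableChannel: SimpleSubscribableChannel, subscribe, and send are hypothetical names, and a java.util.function.Consumer stands in for the MessageHandler. Delivering to subscribers on the sender's thread is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, framework-free stand-in for a subscribable channel.
class SimpleSubscribableChannel<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    // A handler registers once and is then called back for every message.
    void subscribe(Consumer<T> handler) {
        subscribers.add(handler);
    }

    // Delivery happens as part of send(): every subscriber is invoked right away.
    // Returns the number of subscribers that received the message.
    int send(T message) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(message);
        }
        return subscribers.size();
    }

    public static void main(String[] args) {
        SimpleSubscribableChannel<String> positionsChannel = new SimpleSubscribableChannel<>();
        List<String> received = new ArrayList<>();
        positionsChannel.subscribe(received::add);

        // No polling loop: the handler runs as soon as the message is sent.
        positionsChannel.send("position-1");
        System.out.println("Received: " + received);
    }
}
```

Here the channel drives the handler, which is the reverse of the polling arrangement where the endpoint drives the channel.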

Unless you have a compelling requirement to poll, prefer the EventDrivenConsumer.
