Now that you have seen the basic types of consumers, you should understand where they are really used. Under normal circumstances, you do not have to use them, as you will be configuring them under an XML namespace. The framework provides these components under various namespaces, so you can add the respective elements declaratively straight out of the box. This reduces the amount of coding and encourages a declarative programming model.
But for now, keep in mind that Transformers, Filters, Routers, etc.,
are all supported by the spring XML
namespace. This
means you can declaratively create an instance of any of these flow
components. For example, a transformer
element is used to create a Transformer
component which
will fetch messages from the input
channel to kick off
the transformation.
Understanding the link between the classes and namespace elements will give us much more command of the framework. However, you are strongly encouraged to use namespaces to configure these endpoints, instead of using the API classes.
If you recall from the earlier chapter on channels, there are two types of channels: one is pollable while the other is subscribable. Based on the same definition, we have two types of endpoint consumers: Polling Consumer and an Event-Driven Consumer.
A PollingConsumer
polls the channel for
messages based on a polling configuration. It is driven by the client
program. The EventDrivenConsumer
, on the other hand,
subscribes to a subscribable channel so it will be notified
asynchronously when a message arrives at the channel.
One of the characteristics of polling consumers is to poll for
messages in a timely fashion. Framework provides the PollingConsumer
class that does this job.
The class is instantiated by using a constructor that takes the
reference to a pollable channel and a message handler. The message
handler is a simple interface to handle the messages published onto
the channel.
The following snippet shows how to create a PollingConsumer
object. You can use the
framework’s class as is, or if preferred, create a wrapper around
it.
private MessageHandler positionsHandler = null; private QueueChannel positionsChannel = null; ... // Instantiating a PollingConsumer PollingConsumer consumer = new PollingConsumer(positionsChannel, positionsHandler);
Let’s take an example of a custom consumer.
The PositionsPollingConsumer
is a message consumer that grabs the Position
messages from the positions-channel
. We know that we can use
the receive
method on the channel itself to receive
the message (shown in the following snippet), which has been described
in earlier chapters.
Message m = channel.receive();//or other receive methods System.out.println("Payload: " + m.getPayload());
Instead, what we do here is create an endpoint that polls from these channels. Channels merely act as buckets for messages, which is actually their main function.
As mentioned above, the PollingConsumer
requires a channel and a
handler to be instantiated. The handler is created by implementing the
framework’s MessageHandler
:
public class PositionsHandler implements MessageHandler { public void handleMessage(Message<?> message) throws MessagingException { System.out.println("Handling a message: "+ message.getPayload().toString()); } }
The PollingPositionsConsumer
,
shown below, creates an instance of the framework’s PollingConsumer
to fetch the messages
programmatically, rather than declaratively.
public class PositionsPollingConsumer { private PollingConsumer consumer = null; private PositionsHandler positionsHandler = null; public PositionsPollingConsumer(ApplicationContext ctx, QueueChannel positionsChannel) { //instance of handler positionsHandler = new PositionsHandler(); // now create the framework's consumer consumer = new PollingConsumer(positionsChannel, positionsHandler); //You must set the context, or else an error will be thrown consumer.setBeanFactory(ctx); } public void startConsumer() { consumer.start(); } }
Now that our consumer is coded, it’s just a matter of calling
the startConsumer()
method. This
method calls the start
method on
PollingConsumer
internally:
PositionsPollingConsumer ppc = new PositionsPollingConsumer(ctx, positionsChannel); ppc.startConsumer();
The PollingConsumer
coded
above does not poll—it just picks up the messages. However, we wish
to add the function of polling to the component so it will poll in a
timely fashion. This is achieved using the framework’s
Triggers.
There are two types of triggers provided by the framework:
PeriodicTrigger
, which polls at a
fixed interval, and CronTrigger
,
which polls based on Unix’s cron
expressions. The
CronTrigger
is more flexible when
the task scheduling has complex requirements.
Once an appropriate Trigger is chosen, it needs to be
instantiated and wired onto the consumer. For example, if we want to
poll every two seconds, create the Periodic
Trigger
as
shown below. You can also set other properties, such as
initialDelay
and fixed
Rate
, to control the polling much
further.
PeriodicTrigger periodicTrigger = new PeriodicTrigger(2000); // let the polling kick in after half a second periodicTrigger.setInitialDelay(500); // fixed rate polling? periodicTrigger.setFixedRate(false);
The initialDelay
is set to
start the polling only after the expiry of that time period. The
fixed
Rate
is a Boolean variable that will
indicate if the polling should be done on a regular time interval.
If, for whatever reason, the current message processing has taken
more than the polling period (two seconds in the above example), the
poller will poll for another message if this flag is set to
true
.
The CronTrigger
enables the
consumer to do more sophisticated polling. For example, you have a
job that wakes up at midnight on weekdays to do house clean-up. It
works on setting up cron
expressions which can
cater to such complex scenarios.
Cron
expressions are expressed as
space-separated fields. There are six such fields, each field
representing an aspect of time. Declare an expression that
represents your time requirements and pass it on to the
trigger
object as shown below:
// start polling all weekdays at exactly one minute past midnight String cronExpression="* 01 00 * * MON-FRI"; cronTrigger = new CronTrigger(cronExpression);
The above cron
expression allows the poller
to wake one minute past midnight on all weekdays to check. The
poller checks for any messages when it wakes, processes them, and
goes back to sleep mode.
The second type of endpoint consumers that subscribe rather than
poll to the message stream is categorized as Event-Driven Consumers.
The framework defines this type of consumer as an EventDrivenConsumer
class. Their fundamental
characteristic is that they wait for someone (the framework’s
responsibility) to deliver the message as soon as it appears on the
channel. From our earlier discussions on event messages, you know that
SubscribableChannel
supports this
type of consumer.
The instantiation of this consumer is exactly like that of
PollingConsumer
—supply a channel to
which it should subscribe to get a message stream and a handler to
handle the messages.
private EventDrivenConsumer consumer = null; private PositionsHandler positionsHandler = null; private ApplicationContext ctx = null; public PositionsEventDrivenConsumer(ApplicationContext ctx, PublishSubscribeChannel positionsChannel) { positionsHandler = new PositionsHandler(); // instantiate the event driven consumer consumer = new EventDrivenConsumer(positionsChannel, positionsHandler); consumer.setBeanFactory(ctx);
The PositionsHandler
is a
simple class that implements MessageHandler
with providing the handleMessage
method. Starting the consumer
is as simple as calling the start
method on the EventDrivenConsumer
:
public void startConsumer() { // EventDrivenConsumer exposes start method consumer.start(); }
Unless you have a compelling requirement, use
EventDrivenConsumer
.
Get Just Spring Integration now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.