We have touched upon one of the common endpoints in an earlier
chapter—Service Activators. In this section, we will discuss this in
detail, and other endpoints, too. First, make sure that you have the
integration
namespace declared in your
XML file for these endpoints:
... xmlns:int="http://www.springframework.org/schema/integration" xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd" ...
The Service Activator is a generic endpoint which invokes a method on a bean whenever a message arrives on the channel. If the method has a return value, then the value will be sent to an output channel if the channel is configured.
Configuring the activator using the namespace is relatively
straightforward. Use the service-activator
element, setting input-channel
and a ref
to the bean:
<int:service-activator input-channel="positions-channel" ref="newTradeActivator" method="processNewPosition"> </int:service-activator> <bean id="newTradeActivator" class="com.madhusudhan.jsi.endpoints.common.NewTradeActivator" />
Any message arriving at positions-channel
will be passed on to a
NewTradeActivator
(which the
attribute ref
points to) and the
processNewPosition
method is invoked
which is declared using the method
attribute. If the bean class has only one method, then you do not have
to declare the method attribute—the framework resolves it as the service
method and invokes it appropriately.
The NewTradeActivator
is a
simple class that has a single method which expects a Position
object. This class is basically the
entry point to acting as a service.
public class NewTradeActivator { Position position = .. public void processNewPosition(Position t) { System.out.println("Method invoked to process the new Position"+t); // process the position.. // ... } }
The method can return a non-null
value which is
wrapped in a Message
and sent to an
output-channel
. For example, if you
wish to send a reply to another channel after processing the Position
, you can do this by simply returning
the position
as the method’s return value:
// Return value will be wrapped in a Message // and sent to an output channel public Position processNewPosition(Position t) { System.out.println("Method invoked to process the new Position"+t); // process the position.. // ... return position; }
You may omit declaring the optional output-channel
attribute. If you do, and if
your method has a return value, then the framework will use the message
header property called replyChannel
to send the reply. An exception will be thrown if no replyChannel
header property is found.
The service method can have either Message
or a Java object as an argument. In
the latter case, the payload from the incoming message is extracted and
passed on to the message. As the incoming message is a Java Object, this
mode will not tie our implementation to Spring API, making it a
preferred option. In the above example, a Position
is wrapped up in a Message
and sent to the channel.
A MessageBridge
is a simple
endpoint that couples different messaging modes or adapters. An example
of a common use of the bridge is to tie up a
point-to-point
(P2P) mode channel to a
Publish/Subscribe
(Pub/Sub) mode. In a
P2P
mode, a PollableChannel
is used by the endpoint,
whereas a PublishSubscribeChannel
is
used in Pub/Sub
mode.
The MessageBridge
is declared
using the bridge
element in the
integration namespace:
<int:publish-subscribe-channel id="trades-in-channel" /> <int:channel id="trades-out-channel"> <int:queue capacity="10" /> </int:channel> <!-- Bridges pub/sub channel to a p2p channel --> <int:bridge input-channel="trades-in-channel" output-channel="trades-out-channel" />
In the above snippet, the bridge picks up a message from the input
channel and publishes onto the output channel. The input channel is
PublishSubscribeChannel
, while the
output channel is QueueChannel
.
To complete our example, a service activator is hooked onto the
output channel. As soon as the message arrives at the output channel
(via the bridge endpoint), the Position
Receiver
bean is invoked for
action:
<int:service-activator input-channel="trades-out-channel" ref="positionReceiver"/> <bean id="positionReceiver" class="com.madhusudhan.jsi.endpoints.PositionReceiver"/>
A Message Enricher
component enriches the
incoming message with additional information and sends the updated
object to the downstream consumers. For example, a Trade normally
consists of a piece of coded information, such as a security ID or a
customer account number. The reason for this is not only to keep the
Trade
object slim and sleek, but also to protect the
confidential information from other systems. This data is attached while
the Trade passes through different stages as and when required.
The Framework provides two types for enriching messages:
Header Enricher
and Payload
Enricher
.
You can add additional header attributes to the message using
the Header Enricher
component. Let’s say the
incoming Trade
message needs to have a couple of
header properties: SRC_SYSTEM
and
TARGET_SYSTEM
. As these properties were not in the
original message, the message needs to be enriched. We use the
header-enricher
tag to do this:
<int:header-enricher
input-channel="in-channel"
output-channel="out-channel">
<int:header name="SRC_SYSTEM" value="BBG" />
<int:header name="TARGET_SYSTEM" value="LOCAL" />
</int:header-enricher>
As you can see, we added two properties to the outgoing message. So, if you print out the headers of this message, the newly added properties should appear on the console:
Headers: {timestamp=1328187611172, SRC_SYSTEM=BBG, TARGET_SYSTEM=LOCAL, id=...}
You can set a number of predefined properties such as priority
, reply-channel
, error-channel
, etc.
This is the enhanced configuration for header-enricher
:
<int:header-enricher id="maxi-enricher" input-channel="in-channel" output-channel="out-channel"> <int:priority value="10"/> <int:error-channel ref="myErrorChannel"/> <int:correlation-id value="APP_OWN_ID"/> <int:reply-channel value="reply-channel"/> <int:header name="SRC_SYSTEM" value="BBG" /> </int:header-enricher> <int:publish-subscribe-channel id="myErrorChannel" />
Note that the ref
tag looks
for a named bean while the value
tag takes a literal value only.
Framework also supports setting header properties using
payload
by allowing the header-enricher
’s header property to refer
to a bean:
<int:header-enricher id="pojo-enricher"
input-channel="in-channel"
output-channel="out-channel">
<int:header name="ID"
ref="tradeEnricher" method="enrichHeader"/>
</int:header-enricher>
The ID
is set by extracting
data from the payload with the help of the TradeEnricher
bean. The following snippet
shows this bean, which has a simple functionality in returning the
ID
attribute by reading the Trade’s
ID and adding SRC
to it at the
end.
public class TradeEnricher { public String enrichHeader(Message m) { Trade t = (Trade)m.getPayload(); return t.getId()+"SRC"; } }
If the requirement is to add or enrich the payload with
additional information, use the PayloadEnricher
component. The
enricher
tag in the integration
namespace is used to configure the payload enricher. The workings of a
payload enricher require a closer look.
Let’s see the configuration first:
<int:enricher input-channel="in-channel" request-channel="enricher-req-channel" output-channel="stdout"> <int:property name="price" expression="payload.price"/> <int:property name="instrument" expression="payload.instrument"/> </int:enricher> <int:service-activator input-channel="enricher-req-channel" ref="tradeEnricher"> </int:service-activator> <bean id="enricherBean" class="com.madhusudhan.jsi.endpoints.enricher.Enricher" /> <bean id="tradeEnricher" class="com.madhusudhan.jsi.endpoints.enricher.PriceEnricher" />
There’s a lot going on here. Like any other endpoint, the
enricher expects a message in the input-channel
, too. It picks up the message
and passes it on to request-channel
and waits for a reply. There should be some other component listening
on this request-channel
to enrich
the message. After enriching the payload, this component then
publishes the reply back to the reply channel. The reply channel is
declared as a header
property on the message itself
(see the Price
Message
below). Once the enricher gets a
reply, it sets the properties with the enriched data by using
expressions (See Spring Expressions later in this
chapter for details about expressions).
In the above configuration, a Price
is posted onto the in-channel
. The Price
message does not have any data—no
instrument
or price
set. The enricher then posts this
Price
onto the enricher-req-channel
and waits for a reply.
A service activator, which acts as the enricher listening on the
enricher-req-channel
, consumes the
messages and enriches and returns the Price
. The return value is published on the
reply-channel
. The enricher
continues processing once it receives a message on the reply-channel
. It adds the additional
properties such as price
and
instrument
to the message and sends
them to the output-channel
.
The following snippet shows the published
Price
. Note that the Price
does
not have any initial values set (they will be set via the PriceEnricher
).
public void publishPrice() { //Create a Price object with no values Price p = new Price(); // note the reply-channel as header property Message<Price> msg = MessageBuilder.withPayload(p) .setHeader(MessageHeaders.REPLY_CHANNEL, "reply-channel") .build(); channel.send(msg, 10000); System.out.println("Price Message published."); }
The PriceEnricher
is given
the message (via Service Activator
) to enrich. We
can use any complex logic here to set the data:
public class PriceEnricher { public Price enrichHeader(Message m) { Price p = (Price)m.getPayload(); p.setInstrument("IBM"); p.setPrice(111.11); return p; } }
The Enricher
component follows a Gateway
pattern, which is discussed in the next section.
If the prime requirement of your project is to write applications without requiring knowledge of the messaging system or connecting to a messaging framework, then the Gateway pattern is the one to use. We have previously seen some examples of sending and receiving messages by various publishers and consumers. However, we were fetching a reference to the channels from the application context every time we wanted to publish or consume a message. This means that our client code is tied to Framework’s messaging components.
When you use the Gateway pattern, you will not be using any of the messaging components, but will be dealing with a simple interface that will expose your functionality.
Essentially, there are two types of Gateways: Synchronous Gateway and Asynchronous Gateway. In Synchronous Gateway, the message call will be blocked until the process is completed. In Asynchronous Gateway, the message call is not blocked. More on each gateway appears in their respective sections below.
The first step in writing a gateway is to define an interface
that describes the interaction methods with the messaging system. In
our example, we have an ITradeGateway
interface with a single method
processTrade
. This is the only
interface that will be exposed to the client with no implementation
provided.
public interface ITradeGateway { public Trade processTrade(Trade t); }
The next step is to configure a gateway:
<int:gateway id="tradeGateway" default-request-channel="trades-in-channel" default-reply-channel="trades-out-channel" service-interface="com.madhusudhan.jsi.endpoints.gateway.ITradeGateway" />
There’s a lot happening in the backend.
When the application context is loaded with the above
configuration, a gateway endpoint is created with default request and
reply channels. The gateway has a service-interface
attribute which points to
our ITradeGateway
interface. The
framework’s GatewayProxyFactoryBean
creates a proxy for this service interface (and that’s the reason you
don’t have to provide any implementation for the interface). The proxy
will serve the client’s incoming and outgoing requests using the
channels provided.
So, if a client calls a processTrade
method, it will be served by
the proxy. It publishes a Message
with a Trade
object onto the
trades-in-channel
. The proxy then
blocks the call until it receives a reply from the trades-out-channel
. The reply is then passed
back to the client. There will be another component picking up a
message from the trades-in-channel
to process the Trade accordingly.
The client code looks like this:
public GatewayEndpointTest() { ... public GatewayEndpointTest() { ctx = new ClassPathXmlApplicationContext("endpoints-gateway-beans.xml"); // obtain our service interface tradeGateway = ctx.getBean("tradeGateway",ITradeGateway.class); } public void publishTrade(Trade t) { // call the method to publish the trade! Trade it = tradeGateway.processTrade(t); System.out.println("Trade Message published (Reply)."+it.getStatus()); } public static void main(String[] args) { GatewayEndpointTest test = new GatewayEndpointTest(); Trade t = new Trade(); test.publishTrade(t); } }
We get the tradeGateway
bean
(which is the service interface) from the application context and
invoke the processTrade
method.
There is no dependency on the messaging framework in this code. From
the client’s perspective, the client is invoking a method on a service
interface.
To complete the example, we can configure a Service Activator to
pick up messages from the trades-in-channel
(this is the same channel
where the messages are published by the proxy) and post the replies to
the trades-out-channel
(this is the same channel
where the proxy is listening for replies). The following code snippet
illustrates the Service Activator
endpoint:
<int:service-activator input-channel="trades-in-channel" output-channel="trades-out-channel" ref="tradeProcessor" method="receiveTrade" > </int:service-activator> <bean id="tradeProcessor" class="com.madhusudhan.jsi.endpoints.gateway.TradeProcessor" />
The TradeProcessor
is a
simple class that is invoked by the Activator
endpoint when a message arrives at the trades-in-channel
. It then processes the
Trade and sends a reply (via the return value):
public class TradeProcessor { public Trade receiveTrade(Trade t) { System.out.println("Received the Trade via Gateway:"+t); t.setStatus("PROCESSED"); return t; } }
The client in the above example will be blocked until it gets a reply from the processors. If the client’s requirement is to fire and continue, using Asynchronous Gateway is the right choice.
In order to achieve the asynchronous behavior, the service
interface is required to have the return type changed so it now
returns a Future
object:
import java.util.concurrent.Future;
public interface ITradeGatewayAsync {
public Future<Trade> processTrade(Trade t);
}
This is the only change required to make the gateway behave
asynchronously. In your client program, the processTrade
will now return a reply to your
message as a Future
object:
public void publishTrade(Trade t) { Future<Trade> f = tradeGateway.processTrade(t); try { Trade ft = f.get(); } catch (Exception e) { .. } }
The Delayer
endpoint is used to
introduce delay between sender and receiver. This component forces the
messages to be delivered at a later time based on the configuration. It
will pick up a message from an input channel, apply the delay and send
to the output channel when delay expires.
The configuration is simple, as demonstrated in the following snippet:
<int:delayer default-delay="5000" input-channel="in-channel" output-channel="out-channel"> </int:delayer>
All messages arriving at the in-channel
will be delivered to the out-channel
after a delay of five seconds. The
messages will be delivered instantly if the default-delay
is set to zero or
negative.
You can also use a header
field to define the
delay period for each message. In order to do this, you need to let the
framework know by using the attribute delay-header-name
as shown below:
<int:delayer default-delay="5000"
input-channel="prices-in-channel"
output-channel="prices-out-channel"
delay-header-name="MSG_DELAY">
</int:delayer>
All messages which have an MSG_DELAY
header attribute will have a delay
set by the value of the header field. Messages with no MSG_DELAY
header attribute will have default-delay
set on them.
Get Just Spring Integration now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.