Common Endpoints

We have touched upon one of the common endpoints in an earlier chapter—Service Activators. In this section, we will discuss this in detail, and other endpoints, too. First, make sure that you have the integration namespace declared in your XML file for these endpoints:

...
xmlns:int="http://www.springframework.org/schema/integration"

xsi:schemaLocation="http://www.springframework.org/schema/integration 
  http://www.springframework.org/schema/integration/spring-integration-2.1.xsd"
...

Service Activator

The Service Activator is a generic endpoint which invokes a method on a bean whenever a message arrives on the channel. If the method has a return value, then the value will be sent to an output channel if the channel is configured.

Configuring the activator using the namespace is relatively straightforward. Use the service-activator element, setting input-channel and a ref to the bean:

<int:service-activator 
  input-channel="positions-channel"
  ref="newTradeActivator" 
  method="processNewPosition">
</int:service-activator>

<bean id="newTradeActivator" 
  class="com.madhusudhan.jsi.endpoints.common.NewTradeActivator" />

Any message arriving at positions-channel will be passed on to a NewTradeActivator (which the attribute ref points to) and the processNewPosition method is invoked which is declared using the method attribute. If the bean class has only one method, then you do not have to declare the method attribute—the framework resolves it as the service method and invokes it appropriately.

The NewTradeActivator is a simple class that has a single method which expects a Position object. This class is basically the entry point to acting as a service.

public class NewTradeActivator {
  Position position = ..
  public void processNewPosition(Position t) {
    System.out.println("Method invoked to process the new Position"+t);
    // process the position..
    // ...
  }
}

The method can return a non-null value which is wrapped in a Message and sent to an output-channel. For example, if you wish to send a reply to another channel after processing the Position, you can do this by simply returning the position as the method’s return value:

// Return value will be wrapped in a Message
// and sent to an output channel 
public Position processNewPosition(Position t) {
  System.out.println("Method invoked to process the new Position"+t);
  // process the position..
  // ...
  return position;
}

You may omit declaring the optional output-channel attribute. If you do, and if your method has a return value, then the framework will use the message header property called replyChannel to send the reply. An exception will be thrown if no replyChannel header property is found.

The service method can have either Message or a Java object as an argument. In the latter case, the payload from the incoming message is extracted and passed on to the message. As the incoming message is a Java Object, this mode will not tie our implementation to Spring API, making it a preferred option. In the above example, a Position is wrapped up in a Message and sent to the channel.

Message Bridge

A MessageBridge is a simple endpoint that couples different messaging modes or adapters. An example of a common use of the bridge is to tie up a point-to-point (P2P) mode channel to a Publish/Subscribe (Pub/Sub) mode. In a P2P mode, a PollableChannel is used by the endpoint, whereas a PublishSubscribeChannel is used in Pub/Sub mode.

The MessageBridge is declared using the bridge element in the integration namespace:

<int:publish-subscribe-channel 
  id="trades-in-channel" />

<int:channel id="trades-out-channel">
  <int:queue capacity="10" />
</int:channel>

<!-- Bridges pub/sub channel to a p2p channel -->

<int:bridge input-channel="trades-in-channel" 
            output-channel="trades-out-channel" />

In the above snippet, the bridge picks up a message from the input channel and publishes onto the output channel. The input channel is PublishSubscribeChannel, while the output channel is QueueChannel.

To complete our example, a service activator is hooked onto the output channel. As soon as the message arrives at the output channel (via the bridge endpoint), the PositionReceiver bean is invoked for action:

<int:service-activator 
  input-channel="trades-out-channel" 
  ref="positionReceiver"/>

<bean id="positionReceiver" 
  class="com.madhusudhan.jsi.endpoints.PositionReceiver"/>

Message Enricher

A Message Enricher component enriches the incoming message with additional information and sends the updated object to the downstream consumers. For example, a Trade normally consists of a piece of coded information, such as a security ID or a customer account number. The reason for this is not only to keep the Trade object slim and sleek, but also to protect the confidential information from other systems. This data is attached while the Trade passes through different stages as and when required.

The Framework provides two types for enriching messages: Header Enricher and Payload Enricher.

Header Enricher

You can add additional header attributes to the message using the Header Enricher component. Let’s say the incoming Trade message needs to have a couple of header properties: SRC_SYSTEM and TARGET_SYSTEM. As these properties were not in the original message, the message needs to be enriched. We use the header-enricher tag to do this:

<int:header-enricher 
  input-channel="in-channel"
  output-channel="out-channel">
  <int:header name="SRC_SYSTEM" value="BBG" />
  <int:header name="TARGET_SYSTEM" value="LOCAL" />

</int:header-enricher>

As you can see, we added two properties to the outgoing message. So, if you print out the headers of this message, the newly added properties should appear on the console:

Headers: {timestamp=1328187611172, 
  SRC_SYSTEM=BBG, TARGET_SYSTEM=LOCAL, id=...}

You can set a number of predefined properties such as priority, reply-channel, error-channel, etc.

This is the enhanced configuration for header-enricher:

<int:header-enricher id="maxi-enricher" input-channel="in-channel"
  output-channel="out-channel">
    <int:priority value="10"/>
    <int:error-channel ref="myErrorChannel"/>
    <int:correlation-id value="APP_OWN_ID"/>
    <int:reply-channel value="reply-channel"/>
    <int:header name="SRC_SYSTEM" value="BBG" />
</int:header-enricher>

<int:publish-subscribe-channel id="myErrorChannel" />

Note that the ref tag looks for a named bean while the value tag takes a literal value only.

Framework also supports setting header properties using payload by allowing the header-enricher’s header property to refer to a bean:

<int:header-enricher id="pojo-enricher" 
  input-channel="in-channel"
  output-channel="out-channel">
  <int:header name="ID" 
  ref="tradeEnricher" method="enrichHeader"/>
</int:header-enricher>

The ID is set by extracting data from the payload with the help of the TradeEnricher bean. The following snippet shows this bean, which has a simple functionality in returning the ID attribute by reading the Trade’s ID and adding SRC to it at the end.

public class TradeEnricher {
  public String enrichHeader(Message m) {
    Trade t = (Trade)m.getPayload();
    return t.getId()+"SRC";
  }
}

Payload Enricher

If the requirement is to add or enrich the payload with additional information, use the PayloadEnricher component. The enricher tag in the integration namespace is used to configure the payload enricher. The workings of a payload enricher require a closer look.

Let’s see the configuration first:

<int:enricher input-channel="in-channel" 
  request-channel="enricher-req-channel" 
  output-channel="stdout">
  <int:property name="price" expression="payload.price"/>
  <int:property name="instrument" expression="payload.instrument"/>
</int:enricher>

<int:service-activator input-channel="enricher-req-channel"
  ref="tradeEnricher">
</int:service-activator>

<bean id="enricherBean" class="com.madhusudhan.jsi.endpoints.enricher.Enricher" />
<bean id="tradeEnricher" class="com.madhusudhan.jsi.endpoints.enricher.PriceEnricher" />

There’s a lot going on here. Like any other endpoint, the enricher expects a message in the input-channel, too. It picks up the message and passes it on to request-channel and waits for a reply. There should be some other component listening on this request-channel to enrich the message. After enriching the payload, this component then publishes the reply back to the reply channel. The reply channel is declared as a header property on the message itself (see the Price Message below). Once the enricher gets a reply, it sets the properties with the enriched data by using expressions (See Spring Expressions later in this chapter for details about expressions).

In the above configuration, a Price is posted onto the in-channel. The Price message does not have any data—no instrument or price set. The enricher then posts this Price onto the enricher-req-channel and waits for a reply. A service activator, which acts as the enricher listening on the enricher-req-channel, consumes the messages and enriches and returns the Price. The return value is published on the reply-channel. The enricher continues processing once it receives a message on the reply-channel. It adds the additional properties such as price and instrument to the message and sends them to the output-channel.

The following snippet shows the published Price. Note that the Price does not have any initial values set (they will be set via the PriceEnricher).

public void publishPrice() {
  //Create a Price object with no values
  Price p = new Price();

  // note the reply-channel as header property
  Message<Price> msg = MessageBuilder.withPayload(p)
    .setHeader(MessageHeaders.REPLY_CHANNEL, "reply-channel")
    .build();

  channel.send(msg, 10000);
  System.out.println("Price Message published.");
}

The PriceEnricher is given the message (via Service Activator) to enrich. We can use any complex logic here to set the data:

public class PriceEnricher {
  public Price enrichHeader(Message m) {
    Price p = (Price)m.getPayload();
    p.setInstrument("IBM");
    p.setPrice(111.11);
    return p;
  }
}

The Enricher component follows a Gateway pattern, which is discussed in the next section.

Gateway

If the prime requirement of your project is to write applications without requiring knowledge of the messaging system or connecting to a messaging framework, then the Gateway pattern is the one to use. We have previously seen some examples of sending and receiving messages by various publishers and consumers. However, we were fetching a reference to the channels from the application context every time we wanted to publish or consume a message. This means that our client code is tied to Framework’s messaging components.

When you use the Gateway pattern, you will not be using any of the messaging components, but will be dealing with a simple interface that will expose your functionality.

Essentially, there are two types of Gateways: Synchronous Gateway and Asynchronous Gateway. In Synchronous Gateway, the message call will be blocked until the process is completed. In Asynchronous Gateway, the message call is not blocked. More on each gateway appears in their respective sections below.

Synchronous Gateway

The first step in writing a gateway is to define an interface that describes the interaction methods with the messaging system. In our example, we have an ITradeGateway interface with a single method processTrade. This is the only interface that will be exposed to the client with no implementation provided.

public interface ITradeGateway {
  public Trade processTrade(Trade t);
}

The next step is to configure a gateway:

<int:gateway id="tradeGateway" 
  default-request-channel="trades-in-channel"
  default-reply-channel="trades-out-channel" 
  service-interface="com.madhusudhan.jsi.endpoints.gateway.ITradeGateway" />

There’s a lot happening in the backend.

When the application context is loaded with the above configuration, a gateway endpoint is created with default request and reply channels. The gateway has a service-interface attribute which points to our ITradeGateway interface. The framework’s GatewayProxyFactoryBean creates a proxy for this service interface (and that’s the reason you don’t have to provide any implementation for the interface). The proxy will serve the client’s incoming and outgoing requests using the channels provided.

So, if a client calls a processTrade method, it will be served by the proxy. It publishes a Message with a Trade object onto the trades-in-channel. The proxy then blocks the call until it receives a reply from the trades-out-channel. The reply is then passed back to the client. There will be another component picking up a message from the trades-in-channel to process the Trade accordingly.

The client code looks like this:

public GatewayEndpointTest() {
  ...
  public GatewayEndpointTest() {
    ctx = new ClassPathXmlApplicationContext("endpoints-gateway-beans.xml");
    // obtain our service interface
    tradeGateway = ctx.getBean("tradeGateway",ITradeGateway.class);
  }

  public void publishTrade(Trade t) {
   // call the method to publish the trade!
   Trade it = tradeGateway.processTrade(t);
   System.out.println("Trade Message published (Reply)."+it.getStatus());
  }
  public static void main(String[] args) {
    GatewayEndpointTest test = new GatewayEndpointTest();
    Trade t = new Trade();
    test.publishTrade(t);
  }
}

We get the tradeGateway bean (which is the service interface) from the application context and invoke the processTrade method. There is no dependency on the messaging framework in this code. From the client’s perspective, the client is invoking a method on a service interface.

To complete the example, we can configure a Service Activator to pick up messages from the trades-in-channel (this is the same channel where the messages are published by the proxy) and post the replies to the trades-out-channel (this is the same channel where the proxy is listening for replies). The following code snippet illustrates the Service Activator endpoint:

<int:service-activator 
  input-channel="trades-in-channel"
  output-channel="trades-out-channel" 
  ref="tradeProcessor" 
  method="receiveTrade" >
</int:service-activator>
    
<bean id="tradeProcessor" 
  class="com.madhusudhan.jsi.endpoints.gateway.TradeProcessor" />

The TradeProcessor is a simple class that is invoked by the Activator endpoint when a message arrives at the trades-in-channel. It then processes the Trade and sends a reply (via the return value):

public class TradeProcessor {
  public Trade receiveTrade(Trade t) {
    System.out.println("Received the Trade via Gateway:"+t);
    t.setStatus("PROCESSED");
    return t;
  }
}

Asynchronous Gateway

The client in the above example will be blocked until it gets a reply from the processors. If the client’s requirement is to fire and continue, using Asynchronous Gateway is the right choice.

In order to achieve the asynchronous behavior, the service interface is required to have the return type changed so it now returns a Future object:

import java.util.concurrent.Future;

public interface ITradeGatewayAsync {
  public Future<Trade> processTrade(Trade t);
}

This is the only change required to make the gateway behave asynchronously. In your client program, the processTrade will now return a reply to your message as a Future object:

public void publishTrade(Trade t) {
  Future<Trade> f = tradeGateway.processTrade(t);
  try { 
    Trade ft = f.get();
  } catch (Exception e) { .. }
}

Delayer

The Delayer endpoint is used to introduce delay between sender and receiver. This component forces the messages to be delivered at a later time based on the configuration. It will pick up a message from an input channel, apply the delay and send to the output channel when delay expires.

The configuration is simple, as demonstrated in the following snippet:

<int:delayer default-delay="5000" 
  input-channel="in-channel"
  output-channel="out-channel">
</int:delayer>

All messages arriving at the in-channel will be delivered to the out-channel after a delay of five seconds. The messages will be delivered instantly if the default-delay is set to zero or negative.

You can also use a header field to define the delay period for each message. In order to do this, you need to let the framework know by using the attribute delay-header-name as shown below:

<int:delayer default-delay="5000" 
  input-channel="prices-in-channel"
  output-channel="prices-out-channel" 
  delay-header-name="MSG_DELAY">
</int:delayer>

All messages which have an MSG_DELAY header attribute will have a delay set by the value of the header field. Messages with no MSG_DELAY header attribute will have default-delay set on them.

Get Just Spring Integration now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.