Implementing the pipes and filters pattern using actors in Akka for Java

How messages help you decouple, test, and re-use your software’s code.

By Raoul-Gabriel Urma and Richard Warburton
November 16, 2017
Industry Industry (source: bstad)

We would like to introduce a couple of interesting concepts from Akka by giving an overview of how to implement the pipes and filters enterprise integration pattern. This is a commonly used pattern that helps us flexibly compose together sequences of alterations to a message. In order to implement this pattern, we use Akka—a popular library that provides new approaches to write modern reactive software in Java and Scala.

The business problem

Recently, we came across an author-publishing application made available as a service. It was responsible for processing markdown text, and it would execute a series of operations back to back:

Learn faster. Dig deeper. See farther.

Join the O'Reilly online learning platform. Get a free trial today and find answers on the fly, or master something new and useful.

Learn more
  1. Warn authors if they were using forbidden short forms such as “isn’t” or “I’m” (let’s call this the “text checker”).
  2. Transform Latex code to Unicode. E.g., \alpha to α (let’s call this the “latex to unicode transformer”).
  3. Upload the result to an S3 bucket so it is publicly available (an “S3 uploader”).

After playing with the service, it became clear that it would benefit from several extensions:

  1. Use the Latex to Unicode functionality to transform text independently of the text-checking functionality.
  2. Add another functionality to translate the text using American English spelling.
  3. Create a bespoke chain of functionalities—for example, just text checker + S3 uploader.

The software engineering problem

The problem with this application is that the service was written in a way that violates the single responsibility principle. This states that each class should be written so that it only has one responsibility—or reason to change. Our earlier code example, however, was written as one single god class implementation, and the different functionalities were coupled together inside a single pass. For example, one could not use the Latex to Unicode transformer without calling the text checker functionality. Consequently, it was difficult to create new or reuse existing behavior to create a different pipeline of functionalities.

Here’s a simplified version of the problematic code and associated diagram to illustrate the problem:

public class PipesAndFilterProblem {
    public static void main(String[] args) {
        String[] forbiddenWords = {"isn't", "i'm", "don't"};
        String message = "I'm feeling \\beta!";
        String messageLowerCase = message.toLowerCase();
        if(Arrays.stream(forbiddenWords).noneMatch(messageLowerCase::contains)) {
            String converted =  message.replaceAll("\\\\alpha", "α").replaceAll("\\\\beta", "β");
            // ... upload to S3
            System.out.println(converted);
        }
    }
}
Figure 1. All three steps munged into one method.

The above code has two key issues that we need to address.

  • How do you reorder functionalities? For example, Latex to Unicode first and then the text checker.
  • How do you add a new functionality? For example, the text translation.

Pipes and filters: Making independent components from one monolithic task

The pipes-and-filters pattern is a solution for this problem. You break down a complex task into independent components that you can then compose together to create a complex chain. There are several benefits with this approach:

  1. Each component is decoupled from one another and can be maintained independently.
  2. Each component can also be tested in isolation.
  3. You can re-use components to create different chains.
Figure 2. Splitting out the different steps that follow SRP.

A good comparison to understand the decoupling in pipes and filters is to think about the ways commands in Unix work. You can say cat file | uniq | wc -l. Each program is loosely coupled from one another, but can also be arbitrarily composed to create complex pipelines.

A filter is a piece of functionality to execute. A pipe is simply a mechanism to pass the output of one filter to become the input of another one.

One can distinguish two approaches for delivering the output of one filter to another filter:

  • Point-to-point: The output of one filter becomes the input of exactly one filter. In other words, there’s one producer and one consumer
  • Publisher/Subscriber: The output of one filter can be broadcasted to become the input of several filters (one producer and multiple consumers).

For the purpose of our business problem, we will implement the point-to-point semantics.

Akka: Actors bring messages and a simple concurrency model

Akka is a library available in both Java and Scala to build distributed, message-based applications. We will focus on using a new abstraction provided by Akka called “Actors.”

In simplest terms, an actor is a component that processes messages off a queue (its “mailbox”). Actors have other interesting characteristics such as:

  • Actors do not expose internal state, which reduces the scope for concurrency bugs.
  • Messages are sent to actors asynchronously, which enables flexible scheduling of the processing of the messages.
  • Actors can be executed on a distributed system.

Figure 3 exemplifies the interaction between two actors:

Figure 3. Actors, Mailboxes, and local State.

So, how do you implement an actor? The code below represents the text-checker functionality as an independent actor written in Akka. It receives messages to process and check for the use of short forms. If the message is correct, it is messaged back to the sender. Don’t worry about all the details of this code; we encourage users to take a deeper look at the Akka API if you are curious. In a nutshell:

  • An UntypedActor is an abstract class that you can extend from. You need to implement the method onReceive, which consumes messages from the internal mailbox of an actor.
  • An ActorSystem is the entry point to running and using actors.
  • Props lets you configure an actor.
  • ActorRef is a reference to an actor, which protects its internal state.

You can play with the code by modifying the unit tests below. The code is kept simple for the purpose of the example, but one can imagine that a text checker would go through an extensive list of words loaded from a file.

public class SingleTextCheckerActor extends UntypedActor {
    private final static String[] forbiddenWords = {"isn't", "i'm", "don't"};

    @Override
    public void onReceive(Object message) throws Throwable {
        String lowercaseMessage = ((String) message).toLowerCase();
        if(!containsForbiddenWord(lowercaseMessage)){
            getSender().tell(message, getSelf());
        }
    }

    private boolean containsForbiddenWord(String message) {
        return Arrays.stream(forbiddenWords)
                     .anyMatch(message::contains);
    }
}
public class SingleTextCheckerActorTest {
    private ActorSystem system;

    @Test
    public void testMessageDoesNotContainForbiddenWord() {
        JavaTestKit testProbe = new JavaTestKit(system);
        Props props = Props.create(SingleTextCheckerActor.class);
        ActorRef subject = system.actorOf(props);
        String msg = "I am waiting for you.";
        subject.tell(msg, testProbe.getRef());
        testProbe.expectMsgEquals(duration("1 second"), msg);
    }

    @Test
    public void testMessageContainsForbiddenWord() {
        JavaTestKit testProbe = new JavaTestKit(system);
        Props props = Props.create(SingleTextCheckerActor.class);
        ActorRef subject = system.actorOf(props);
        subject.tell("I'm feeling good!", testProbe.getRef());
        testProbe.expectNoMsg(duration("1 second"));
    }

    @Before
    public void setup() {
        system = ActorSystem.create();
    }

    @After
    public void teardown() {
        JavaTestKit.shutdownActorSystem(system);
    }
}

Extending actors to the pipe and filters pattern

So, how can you extend this to implement the pipe and filters pattern? In a nutshell, a filter can be seen an actor processing a message. A pipe is the mailbox of another actor, as shown in Figure 4.

Figure 4. How Pipes and Filters map to Actors.

We need a way for our text-checker actor to send its output to another actor that will process the message. In fact, all actors should have the ability to pass on the result to another actor. By doing this, you can build a chain of functionalities. Let’s extend our text-checker actor so it can send a message to another actor:

public class TextCheckerActor extends UntypedActor {

    private ActorRef nextActor;
    private final static String[] forbiddenWords = {"isn't", "i'm", "don't"};
    private LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    public TextCheckerActor(ActorRef nextActor) {
        this.nextActor = nextActor;
    }

    @Override
    public void onReceive(Object message) throws Throwable {
        log.info("Received Message: " + message);
        String lowercaseMessage = ((String) message).toLowerCase();
        if(!containsForbiddenWord(lowercaseMessage)){
            nextActor.tell(message, getSelf());
        }
    }

    private boolean containsForbiddenWord(String message) {
        return Arrays.stream(forbiddenWords)
                .anyMatch(message::contains);
    }
}

We can create another actor to convert latex symbols to unicode. This actor will act as another filter:

public class LatexToUnicodeActor extends UntypedActor {

    private ActorRef nextActor;
    private LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    public LatexToUnicodeActor(ActorRef nextActor) {
        this.nextActor = nextActor;
    }

    @Override
    public void onReceive(Object message) throws Throwable {
        log.info("Received Message: " + message);

        String msg = (String) message;
        String result =  msg.replaceAll("\\\\alpha", "α")
                            .replaceAll("\\\\beta", "β");

        nextActor.tell(result, getSelf());
    }
}

Now that you have two filters, how do you connect them? Here’s a test that puts it all together:

public class PipesAndFilterTest {

    private ActorSystem system;

    @Test
    public void testTextCheckerAndLatextoUnicodePipeline() {
        JavaTestKit endProbe = new JavaTestKit(system);
        // LatexToUnicode filter
        Props latexToUnicodeProps = Props.create(LatexToUnicodeActor.class, endProbe.getRef());
        ActorRef latexToUnicodeActor = system.actorOf(latexToUnicodeProps, "latex-to-unicode-actor");
        // TextChecker filter
        Props textCheckerProps = Props.create(TextCheckerActor.class, latexToUnicodeActor);
        ActorRef textCheckerActor = system.actorOf(textCheckerProps, "text-checker-actor");
        // test
        textCheckerActor.tell("I think the answer is \\alpha + \\beta", textCheckerActor);
        endProbe.expectMsgEquals(duration("1 second"), "I think the answer is α + β");
    }

    @Test
    public void testTextCheckerAndNoLatextoUnicodePipeline() {
        JavaTestKit endProbe = new JavaTestKit(system);
        // LatexToUnicode filter
        Props latexToUnicodeProps = Props.create(LatexToUnicodeActor.class, endProbe.getRef());
        ActorRef latexToUnicodeActor = system.actorOf(latexToUnicodeProps, "latex-to-unicode-actor");
        // TextChecker filter
        Props textCheckerProps = Props.create(TextCheckerActor.class, latexToUnicodeActor);
        ActorRef textCheckerActor = system.actorOf(textCheckerProps, "text-checker-actor");
        // test
        textCheckerActor.tell("I'm wondering whether the answer is \\alpha + \\beta", textCheckerActor);
        endProbe.expectNoMsg(duration("1 second"));
    }

    @Before
    public void setup() {
        system = ActorSystem.create();
    }

    @After
    public void teardown() {
        JavaTestKit.shutdownActorSystem(system);
    }
}

The benefits actors bring over a traditional messaging queue system

We’ve overviewed how you can get started working with Akka and actors. You may be wondering about the benefits of using actors here as compared to perhaps implementing this yourself or using a messaging queue system. Here’s a quick list of how using actors is the better choice:

  1. The implementation code is simple.
  2. Akka already provides actors that are a lightweight queue mechanism.
  3. Message passing is asynchronous, which enables flexible scheduling.
  4. Actors are naturally decoupled from one another.
  5. A configuration can be specified to let you run these actors distributedly without having to change the implementation of each actor. Note that by default, Akka guarantees at-most-once delivery.
  6. Akka has support for logging, monitoring, and testing of actors out of the box.
  7. Actors can be exposed as a REST service through Akka http.

Note that if distribution of the services is not an issue, we could have simply solved the problem using function composition, which is more lightweight in comparison to adding actors. However, using actors, you get distribution and elasticity thanks to their location transparency property as well as support for logging and monitoring provided by Akka.

To learn more about how to use actors, we recommend you check out our Programming Actors with Akka learning path.

Post topics: Software Engineering
Share: