Akka Streams offers a number of predefined building blocks for your graphs (i.e. processing pipelines). Should you need a non-standard solution, there’s an API to help you write the custom part of the graph. In this post I’m going to walk you through implementing your own graph stage.
Recap: Akka Streams concepts
Since stream processing terminology depends heavily on the library or toolkit you are using, here is a quick reminder of what things are called in the Akka Streams world: the producer is called a Source, the consumer a Sink, and the processing stages are Flows. Each of those is a specialized graph stage whose type is determined by the number of inputs and outputs: a Source has no inputs and a single output, a Sink has a single input and no outputs, and a Flow has a single input and a single output.
In terms of the types, each part of the graph is a GraphStage with a given Shape, with the most basic shapes being SourceShape, FlowShape and SinkShape. There are also other, more complex Shapes available, used for modelling concepts such as broadcasting or merging elements of the stream, but those are out of the scope of this post.
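To make the relation between the three basic building blocks and their shapes concrete, here is a small sketch. The object name Shapes and the sample values are illustrative only; the sketch just inspects the ports and does not run a stream.

```scala
import akka.NotUsed
import akka.stream.{FlowShape, SinkShape, SourceShape}
import akka.stream.scaladsl.{Flow, Sink, Source}

object Shapes {
  // A producer, a processing stage and a consumer built from predefined blocks.
  val source: Source[Int, NotUsed] = Source(1 to 3)
  val flow: Flow[Int, String, NotUsed] = Flow[Int].map(_.toString)
  val sink = Sink.foreach[String](println)

  // Every graph exposes its shape, i.e. its input and output ports.
  val sourceShape: SourceShape[Int] = source.shape       // no inlets, one outlet
  val flowShape: FlowShape[Int, String] = flow.shape     // one inlet, one outlet
  val sinkShape: SinkShape[String] = sink.shape          // one inlet, no outlets
}
```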
The use case
Let’s say that, given a stream of elements of type E, you want to observe an arbitrary property of type P of each element, accumulate the elements as long as the property remains unchanged, and only emit an immutable.Seq[E] of accumulated elements when the property changes. In a real-life example the elements could be lines in a CSV file which you would like to group by a given field.
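Before moving to streams, the intended behaviour can be sketched on a plain in-memory collection. This is only an illustration of the semantics, not the stage itself; the object name AccumulateSemantics, the function name and the CSV-like sample lines are made up for this sketch.

```scala
object AccumulateSemantics {
  // Groups consecutive elements for which the observed property stays the same.
  def accumulateWhileUnchanged[E, P](elements: Seq[E])(property: E => P): Vector[Vector[E]] =
    elements.foldLeft(Vector.empty[Vector[E]]) { (groups, element) =>
      groups.lastOption match {
        // Same property as the group being built: extend that group.
        case Some(current) if property(current.head) == property(element) =>
          groups.init :+ (current :+ element)
        // First element, or the property changed: open a new group.
        case _ =>
          groups :+ Vector(element)
      }
    }

  // CSV-like lines grouped by their first field.
  val csvLines = Seq("a,1", "a,2", "b,3", "a,4")
  val grouped = accumulateWhileUnchanged(csvLines)(_.takeWhile(_ != ','))
}
```

Note that only consecutive elements are grouped, so the last "a" line ends up in a group of its own.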
Anatomy of a custom graph stage
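A custom graph stage is nothing more than an implementation of the abstract class GraphStage[S <: Shape], where S is the shape of the stage.
In our example the stage is going to have a single input and a single output, which makes it a Flow whose shape is FlowShape[E, immutable.Seq[E]].
The definition of the stage thus becomes a class that extends GraphStage[FlowShape[E, immutable.Seq[E]]].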
Now you just need to implement two methods:
- def shape: FlowShape[E, immutable.Seq[E]], to provide a concrete shape,
- def createLogic(inheritedAttributes: Attributes): GraphStageLogic, to provide your custom logic of the stage.
Let’s now dig into the details of those two methods.
Implementing a custom graph stage
Providing a custom FlowShape
A FlowShape simply consists of an Inlet and an Outlet, i.e. the ports of the stage. To define a port, you need to provide its name and data type. After defining the ports, the stage implementation becomes:
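A minimal sketch of the stage with its ports defined might look as follows. The port names are arbitrary labels chosen for this sketch, and createLogic is left unimplemented at this point.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic}
import scala.collection.immutable

final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

  // A port is defined by its name and the type of the elements it carries.
  val in: Inlet[E] = Inlet[E]("AccumulateWhileUnchanged.in")
  val out: Outlet[immutable.Seq[E]] = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

  // The shape simply ties the input port and the output port together.
  override val shape: FlowShape[E, immutable.Seq[E]] = FlowShape(in, out)

  // Placeholder: the logic is not implemented yet in this sketch.
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ???
}
```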
Providing a custom GraphStageLogic
Since GraphStages are meant to be reusable, it is crucial to keep them immutable, i.e. not to put any mutable state inside them. On the other hand, the stage we are implementing here is definitely stateful: its state consists of the accumulated elements. This is where the GraphStageLogic comes to the rescue: since a new instance of it is created for every materialization of the flow, it is the one and only place to keep the mutable state in.
In the GraphStageLogic, apart from keeping the mutable state, you may also define handlers for the onPush() and onPull() events. The onPush() event occurs when a new element from the upstream is available and can be acquired using grab(). onPull(), on the other hand, occurs when the downstream is ready to accept a new element, which can be sent with push().
So here is what a draft implementation of the GraphStageLogic with the handlers is going to look like:
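A draft along these lines is sketched below. The handler bodies are placeholders chosen for this sketch so that the stage compiles and runs: each incoming element is simply forwarded as a one-element sequence, with no accumulation yet.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.immutable

final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

  val in: Inlet[E] = Inlet[E]("AccumulateWhileUnchanged.in")
  val out: Outlet[immutable.Seq[E]] = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

  override val shape: FlowShape[E, immutable.Seq[E]] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandlers(in, out, new InHandler with OutHandler {
        // Upstream delivered an element: grab it and (placeholder) forward it alone.
        override def onPush(): Unit = push(out, immutable.Seq(grab(in)))

        // Downstream signalled demand: ask upstream for the next element.
        override def onPull(): Unit = pull(in)
      })
    }
}
```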
To implement the actual accumulating logic, you need to:
- know how to extract the observed property of the incoming elements,
- keep track of the incoming elements in some kind of a buffer.
Extracting the observed property
The easiest way to know which property to observe is to have the user provide a function which extracts this property, so you need to adjust the stage definition a bit: it gets a second type parameter P and takes a function of type E => P as a constructor argument.
Keeping track of the incoming elements
The internal state of your stage logic will consist of:
- an Option[P] to keep the current value of the observed property (empty until the first element arrives),
- a Vector[E] to accumulate the elements (cleared when the observed property changes).
When the next input element arrives (in onPush()), you want to extract its property and check if it differs from the current value. If there is no current value yet or the values are equal, you add the element to the buffer and pull() the input; otherwise you push() the buffer contents downstream and restart the buffer with the element that has just arrived, so that it opens the next group. When the downstream requests a new sequence of elements with onPull(), you just need to pull() the input in order to indicate that the stage is ready to accept a new incoming element.
An additional case that you need to handle is when the upstream has completed, i.e. no more input elements are going to arrive (an upstream error is signalled separately through onUpstreamFailure(), which by default fails the stage). Then you need to push the last elements from the buffer (unless it is empty) and complete the stage afterwards. Moreover, to be nice to memory and the GC, you may wish to clear the buffer after the stage is complete.
The full implementation of the above concepts is going to be something like:
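The following is a sketch of such an implementation, written from the description above rather than copied from any particular release; the names of the private fields and local values are choices made for this sketch.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.immutable

final class AccumulateWhileUnchanged[E, P](propertyExtractor: E => P)
    extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

  val in: Inlet[E] = Inlet[E]("AccumulateWhileUnchanged.in")
  val out: Outlet[immutable.Seq[E]] = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

  override val shape: FlowShape[E, immutable.Seq[E]] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Mutable state lives in the logic: a fresh instance per materialization.
      private var currentProperty: Option[P] = None
      private var buffer: Vector[E] = Vector.empty

      setHandlers(in, out, new InHandler with OutHandler {
        override def onPush(): Unit = {
          val element = grab(in)
          val property = propertyExtractor(element)
          if (currentProperty.isEmpty || currentProperty.contains(property)) {
            // Property unchanged (or first element): keep accumulating.
            buffer :+= element
            pull(in)
          } else {
            // Property changed: emit the finished group, start the next one.
            val group = buffer
            buffer = Vector(element)
            push(out, group)
          }
          currentProperty = Some(property)
        }

        override def onPull(): Unit = pull(in)

        override def onUpstreamFinish(): Unit = {
          // emit() also works when the output port has not been pulled yet.
          if (buffer.nonEmpty) emit(out, buffer)
          completeStage()
        }
      })

      // Release the accumulated elements once the stage has stopped.
      override def postStop(): Unit = {
        buffer = Vector.empty
      }
    }
}
```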
If you are wondering why emit() is used instead of push() in onUpstreamFinish(), the answer is that it is not possible to push a port which has not been pulled. Once the upstream is finished, the buffer may still contain the final group of accumulated elements, but chances are that the output port has not been pulled after the previous group was pushed. You want, however, to send the final group anyway. That is where emit() comes to the rescue: when it detects that the output port is not available (i.e. cannot be pushed), it replaces the OutHandler with a temporary one and only then, once the port is pulled, does it execute the actual push().
Now you are ready to use the custom stage in your application with .via(new AccumulateWhileUnchanged(...)). For example, given a stream of sample elements and a function extracting the property to observe, the stage emits one sequence per run of consecutive elements that share the same property value.
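As an illustration, here is a hedged usage sketch. The Element case class, its fields and the contents of SampleElements are hypothetical. The import assumes that the akka-stream-contrib artifact is on the classpath and exposes the stage in the akka.stream.contrib package; replace it with your own class otherwise. It also assumes Akka 2.6 or later, where an implicit ActorSystem is enough to run a stream.

```scala
import akka.actor.ActorSystem
import akka.stream.contrib.AccumulateWhileUnchanged // assumption: akka-stream-contrib dependency
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical domain: elements are grouped by their `value` field.
final case class Element(id: Int, value: Int)

object SampleElements {
  val Ones: immutable.Seq[Element] = Vector(Element(1, 1), Element(2, 1), Element(3, 1))
  val Twos: immutable.Seq[Element] = Vector(Element(4, 2), Element(5, 2))
  val Threes: immutable.Seq[Element] = Vector(Element(6, 3))
  val All: immutable.Seq[Element] = Ones ++ Twos ++ Threes
}

object AccumulateExample {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("accumulate-example")
    try {
      val groups = Await.result(
        Source(SampleElements.All)
          .via(new AccumulateWhileUnchanged[Element, Int](_.value))
          .runWith(Sink.seq),
        5.seconds)
      // One line per group: the ones, then the twos, then the threes.
      groups.foreach(println)
    } finally system.terminate()
  }
}
```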
There are a number of useful utilities to help you test your custom graph stages. With the help of those and using the SampleElements helper from the usage example, a sample test case for the stage looks like:
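A test along these lines could be sketched as follows. It is written as a plain object with a main method rather than with a particular test framework, and it assumes that akka-stream-testkit and akka-stream-contrib are on the classpath (Akka 2.6 or later). The Element class and the sample data are hypothetical and repeated here so that the sketch is self-contained.

```scala
import akka.actor.ActorSystem
import akka.stream.contrib.AccumulateWhileUnchanged // assumption: akka-stream-contrib dependency
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import scala.collection.immutable

// Hypothetical domain, grouped by `value`.
final case class Element(id: Int, value: Int)

object AccumulateWhileUnchangedCheck {
  val Ones: immutable.Seq[Element] = Vector(Element(1, 1), Element(2, 1))
  val Twos: immutable.Seq[Element] = Vector(Element(3, 2))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("accumulate-check")
    try {
      // The probe acts as the downstream and lets the test control demand.
      val probe = Source(Ones ++ Twos)
        .via(new AccumulateWhileUnchanged[Element, Int](_.value))
        .runWith(TestSink.probe[immutable.Seq[Element]])

      probe.request(2)             // signal demand for two groups
      probe.expectNext(Ones, Twos) // both groups arrive, in order
      probe.expectComplete()       // the stage completes after the last group
    } finally system.terminate()
  }
}
```

The probe methods throw an AssertionError when an expectation is not met, so a run that returns normally means the checks passed.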
TestSink.probe creates an instance of akka.stream.testkit.TestSubscriber.Probe, which offers methods such as expectNext() and expectComplete() to verify whether the stage behaves correctly.
After diligently going through this post, you should understand how the GraphStage API is designed and how to use it to implement your own graph stage.
For even more details, please refer to the Custom stream processing section of the Akka Streams documentation.
If you find the AccumulateWhileUnchanged stage useful, there is no need to rewrite it from scratch, since it is a part of akka-stream-contrib, a project which groups various add-ons to Akka Streams core.