Jacek Kunicki

to code or not to be

Implementing a Custom Akka Streams Graph Stage

Background

Akka Streams offers a number of predefined building blocks for your graphs (i.e. processing pipelines). Should you need a non-standard solution, there’s an API to help you write the custom part of the graph. In this post I’m going to walk you through implementing your own graph stage.

Recap: Akka Streams concepts

Since the stream processing terminology heavily depends on the library/toolkit you are using, here is a quick reminder of what things are called in the Akka Streams world: the producer is called a Source, the consumer - a Sink and the processing stages are Flows. Each of those is a specialized graph stage whose type is determined by the number of inputs and outputs - a Source has no inputs and a single output, a Sink has a single input and no outputs, a Flow has a single input and a single output.

In terms of the types, each part of the graph is a GraphStage with a given Shape - with the most basic shapes being: SourceShape, FlowShape and SinkShape. There are also other more complex Shapes available, used for modelling such concepts as broadcasting or merging elements of the stream, but those are out of the scope of this post.

The use case

Let’s say you have a stream of elements of type E and want to observe an arbitrary property of type P of each element, accumulate the elements as long as the property remains unchanged and only emit an immutable.Seq[E] of accumulated elements when the property changes. In a real-life example the elements can be e.g. lines in a CSV file which you would like to group by a given field.
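To make the expected behaviour concrete before any Akka Streams code appears, here is a minimal plain-Scala sketch of the same grouping applied to an in-memory sequence. The names AccumulateSpec and accumulateWhileUnchanged are made up for this illustration and are not part of Akka; the function is only meant as a reference for what the stage should emit.

```scala
import scala.annotation.tailrec
import scala.collection.immutable

object AccumulateSpec {

  // Splits `elements` into consecutive runs that share the same extracted property.
  // Hypothetical helper for illustration only - no streaming, no backpressure.
  def accumulateWhileUnchanged[E, P](elements: immutable.Seq[E])(
      property: E => P): immutable.Seq[immutable.Seq[E]] = {

    @tailrec
    def loop(rest: immutable.Seq[E],
             acc: Vector[immutable.Seq[E]]): Vector[immutable.Seq[E]] =
      rest.headOption match {
        case None => acc
        case Some(first) =>
          val key = property(first)
          // take the leading elements whose property equals that of the first one
          val (group, remaining) = rest.span(e => property(e) == key)
          loop(remaining, acc :+ group)
      }

    loop(elements, Vector.empty)
  }
}
```

For example, AccumulateSpec.accumulateWhileUnchanged(Vector(1, 1, 2, 3, 3))(identity) yields Vector(Vector(1, 1), Vector(2), Vector(3, 3)). Note that equal values which are not adjacent end up in separate groups, which is exactly what the stage built below does as well.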

Anatomy of a custom graph stage

A custom graph stage is nothing more than an implementation of:

abstract class GraphStage[S <: Shape]

In our example the stage is going to have a single input and a single output, which makes it a Flow whose shape is:

FlowShape[E, immutable.Seq[E]]

The definition of the stage thus becomes:

final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] {
  // ...
}

Now you just need to implement two methods:

  • def shape: FlowShape - to provide a concrete shape
  • def createLogic(inheritedAttributes: Attributes): GraphStageLogic - to provide your custom logic of the stage

Let’s now dig into the details of those two methods.

Implementing a custom graph stage

Providing a custom FlowShape

A FlowShape simply consists of an Inlet and an Outlet, i.e. the ports of the stage. To define a port, you need to provide its name and data type. After defining the ports, the stage implementation becomes:

final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

  val in = Inlet[E]("AccumulateWhileUnchanged.in")
  val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

  override def shape = FlowShape(in, out)
}

Providing a custom GraphStageLogic

Since the GraphStages are meant to be reusable, it is crucial to keep them immutable, i.e. not to put any mutable state inside them. On the other hand, however, the stage we are implementing here is definitely stateful - its state consists of the accumulated elements. Here is where the GraphStageLogic comes to the rescue - since a new instance of it is created for every materialization of the flow, it is the one and only place to keep the mutable state in.

Within the GraphStageLogic, apart from keeping the mutable state, you may also define handlers for the onPush() and onPull() events. The onPush() event occurs when a new element from the upstream is available and can be acquired using grab(). The onPull(), on the other hand, occurs when the downstream is ready to accept a new element which can be sent with push().

So here is what a draft implementation of the GraphStageLogic with the handlers is going to look like:

override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

  setHandlers(in, out, new InHandler with OutHandler {

    override def onPush(): Unit = {
      // ...
    }

    override def onPull(): Unit = {
      // ...
    }
  })
}

To implement the actual accumulating logic, you need to:

  • know how to extract the observed property of the incoming elements,
  • keep track of the incoming elements in some kind of a buffer.

Extracting the observed property

The easiest way to know which property to observe is to have the user provide a function which extracts this property - so you need to adjust the stage definition a bit:

final class AccumulateWhileUnchanged[E, P](propertyExtractor: E => P)
  extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

Keeping track of the incoming elements

The internal state of your stage logic will consist of:

  • an Option[P] to keep the current value of the observed property (empty until the first element arrives),
  • a Vector[E] to accumulate the elements (cleared when the observed property changes).

When the next input element arrives (in onPush()), you want to extract its property and check if it differs from the current value. If there is no current value yet or the values are equal, you add the element to the buffer and pull() the input; otherwise you push() the buffer contents downstream, clear the buffer and start a new group with the element that has just arrived. When the downstream requests a new sequence of elements with onPull(), you just need to pull() the input in order to indicate that the stage is ready to accept a new incoming element.

An additional case that you need to handle is when the upstream has completed (i.e. no more input elements are going to arrive) - then you need to push the last elements from the buffer (unless it is empty) and complete the stage afterwards. An error in the upstream is a separate event, onUpstreamFailure(), which by default simply fails the stage, so the buffered elements are not emitted in that case. Moreover, to be nice to memory and the GC, you may wish to clear the buffer after the stage is complete.

The full implementation of the above concepts is going to be something like:

final class AccumulateWhileUnchanged[E, P](propertyExtractor: E => P)
  extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

  val in = Inlet[E]("AccumulateWhileUnchanged.in")
  val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    private var currentState: Option[P] = None
    private val buffer = Vector.newBuilder[E]

    setHandlers(in, out, new InHandler with OutHandler {

      override def onPush(): Unit = {
        val nextElement = grab(in)
        val nextState = propertyExtractor(nextElement)

        if (currentState.isEmpty || currentState.contains(nextState)) {
          buffer += nextElement
          pull(in)
        } else {
          val result = buffer.result()
          buffer.clear()
          buffer += nextElement
          push(out, result)
        }

        currentState = Some(nextState)
      }

      override def onPull(): Unit = {
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        val result = buffer.result()
        if (result.nonEmpty) {
          emit(out, result)
        }
        completeStage()
      }
    })

    override def postStop(): Unit = {
      buffer.clear()
    }
  }
}

If you are wondering why emit() is used instead of push() in onUpstreamFinish(), the answer is - because it is not possible to push a port which has not been pulled. Once the upstream is finished, the buffer may still contain the final group of accumulated elements - but chances are that the output port has not been pulled after the previous group was pushed. You want, however, to send the final group anyway - that is where emit() comes to the rescue - when it detects that the output port is not available (i.e. cannot be pushed), it replaces the OutHandler with a temporary one and only then does it execute the actual push().

Now you are ready to use the custom stage in your application with .via(new AccumulateWhileUnchanged(...)). For example, having a simple domain like:

case class Element(id: Int, value: Int)

object SampleElements {

  val E11 = Element(1, 1)
  val E21 = Element(2, 1)
  val E31 = Element(3, 1)
  val E42 = Element(4, 2)
  val E52 = Element(5, 2)
  val E63 = Element(6, 3)

  val Ones = immutable.Seq(E11, E21, E31)
  val Twos = immutable.Seq(E42, E52)
  val Threes = immutable.Seq(E63)

  val All = Ones ++ Twos ++ Threes
}

when you run:

Source(SampleElements.All)
  .via(new AccumulateWhileUnchanged(_.value))
  .runWith(Sink.foreach(println))

the output will be:

Vector(Element(1,1), Element(2,1), Element(3,1))
Vector(Element(4,2), Element(5,2))
Vector(Element(6,3))

Testing

There are a number of useful utilities to help you test your custom graph stages. With the help of those and using the SampleElements helper defined above, a sample test case for the above stage looks like:

"AccumulateWhileUnchanged" should {

  "emit accumulated elements when the given property changes" in {
    val (_, sink) = Source(SampleElements.All)
      .via(new AccumulateWhileUnchanged(_.value))
      .toMat(TestSink.probe)(Keep.both)
      .run()

    sink.request(42)
    sink.expectNext(SampleElements.Ones, SampleElements.Twos, SampleElements.Threes)
    sink.expectComplete()
  }
}

The TestSink.probe creates an instance of akka.stream.testkit.TestSubscriber.Probe, which offers methods such as expectNext() or expectComplete() to verify whether the stage behaves correctly.

Summary

After diligently going through this post, you should understand how the GraphStage API is designed and how to use it to implement your own graph stage.

For even more details, please refer to the Custom stream processing section of the Akka Streams documentation.

If you find the AccumulateWhileUnchanged stage useful, there is no need to rewrite it from scratch, since it is a part of akka-stream-contrib - a project which groups various add-ons to Akka Streams core.

Multiple Entrypoints in Docker

Background

When using Docker containers for a number of building blocks of your application, the recommended approach is to run a separate container for every building block, so that the components are well separated. And the Docker best practices suggest running a single process per container.

However, you may imagine a scenario in which it’s reasonable to run multiple services in a single container (an example will follow). Then the question arises how to run those services using a single ENTRYPOINT command in the Dockerfile.

Example scenario

Let’s say your application communicates with an external service through an SSH tunnel (or a VPN, or anything more than a direct connection - you name it). Then, there are a number of ways to set up the tunnel along with the application itself:

  1. Set up the tunnel on the host, run the container with the host network mode (i.e. with the --net=host switch) - which means that the container has no separate network stack but uses the host’s one, so it can just access the tunnel.

  2. Set up the tunnel on the host, run the container in the default network mode (i.e. bridge) and somehow access the tunnel on the host from the container. Somehow here means e.g. using the --add-host switch (with the host’s IP) when running the container, which adds an /etc/hosts entry in the container (see the docs for details). Or you can try some other hacks as well.

  3. Run the tunnel in the container.

Now, the problem with options 1 and 2 is that you lose the run anywhere part of the Docker philosophy, since anywhere becomes limited to anywhere-with-an-ssh-tunnel. Plus, your infrastructure is no longer immutable, since by setting up the tunnel you just made a change to the host - a change you need to remember about every time you run the container somewhere else.

Therefore, option 3 seems to be the way to go. But since Docker allows only a single ENTRYPOINT (to be precise, only the last ENTRYPOINT in the Dockerfile has an effect), you need to find a way to run multiple processes (the tunnel and the application) with a single command. Let’s see how you can do it.

Solution

The simplest idea is to create a shell script to run the required processes and use that script as the ENTRYPOINT. But I wouldn’t write a blogpost about writing a shell script, would I? Instead, let’s dig into the recommended technique which uses supervisord - a process control system.

In the big picture, supervisord is a tool which lets you run multiple programs at once from a single place. The benefits over a plain old shell script are the numerous configuration and monitoring options. Here I’m going to cover only the basic usage, which is just enough for our scenario - feel free to explore the more advanced stuff yourself.

To use supervisord, you first need to install it, preferably using a package manager. In an Ubuntu/Debian-based container you need to add the following to the Dockerfile:

RUN apt-get update && apt-get install -y supervisor

Since in this example you’re also going to need an SSH tunnel, let’s install the SSH client too:

RUN apt-get update && apt-get install -y supervisor openssh-client

Note: always remember to combine apt-get update and apt-get install into a single command in order to get the latest package versions - see the docs for details.

Now that supervisord and the SSH client are installed, it’s time for some configuration. Let’s assume that your application’s entrypoint is /opt/myapp/bin/myapp. The supervisord configuration will then be something like:

[supervisord]
nodaemon=true
logfile=/var/log/supervisord/supervisord.log
childlogdir=/var/log/myapp

[program:ssh]
command=ssh -N -L8080:localhost:8080 user@example.com

[program:myapp]
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
command=/opt/myapp/bin/myapp --some-configuration

Note: If you put the configuration file in the default location inside the container, i.e. /etc/supervisor/conf.d/supervisord.conf, it will automatically be picked up by supervisord. However, for the sake of this example, let’s assume you chose a custom location inside the container, e.g. /etc/myapp/supervisord.conf.

The [supervisord] section configures the main supervisord process. The nodaemon=true indicates that the process should stay in the foreground (as the container would be terminated otherwise). Additionally, you can specify a log file for supervisord logs (with the logfile parameter) and a directory for the messages captured from the stdout and stderr of the child processes (here: ssh and myapp) - with the childlogdir parameter.

Next come the configurations of your processes that will run in the container.

For ssh you define your arbitrary port forwarding with the -L switch. Plus you can use the -N flag, which means that no remote command would be executed - which is useful for just forwarding ports, according to the SSH man page.

In myapp configuration the stdout_logfile parameter indicates where the stdout of the process should go - in this case it goes to the container’s stdout. A log rotation policy can be configured with stdout_logfile_maxbytes, where a value of zero means no rotation. The command parameter is self-explanatory - this is the full command to run your application.

Having configured supervisord, the last step is to actually run it when the container starts - with the ENTRYPOINT command:

ENTRYPOINT ["/usr/bin/supervisord", "-c", "/etc/myapp/supervisord.conf"]

Here you can see that we use the -c switch to provide the path to a custom configuration file. This wouldn’t be necessary if the configuration were in /etc/supervisor/conf.d/supervisord.conf.

After running the container you should see a few logs from supervisord, indicating that both the daemon and your processes have been started:

2016-02-09 16:47:13,949 CRIT Supervisor running as root (no user in config file)
2016-02-09 16:47:13,951 INFO supervisord started with pid 1
2016-02-09 16:47:14,953 INFO spawned: 'ssh' with pid 8
2016-02-09 16:47:14,954 INFO spawned: 'myapp' with pid 9
2016-02-09 16:47:16,754 INFO success: ssh entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2016-02-09 16:47:16,755 INFO success: myapp entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)

Summary

You have just learned the recommended way to run multiple processes in a Docker container - if you know what you’re doing, i.e. you really need more than one process in your container. Please refer to supervisord docs if your scenario is anything more than this basic one.

SSL Client Certificates on the JVM

Background

The most common scenario when using SSL/TLS is the basic handshake where the server is the only party that is authenticated with its certificate - the client remains unauthenticated. We may then connect to the server just knowing its address:

openssl s_client -connect google.com:443

In this post I’m going to deal with a less popular scenario - the client-authenticated handshake - in which the client is required to present its certificate as well and use its private key.

Let’s assume our secure server is secure.server.com:443 and we already have the client’s certificate in client.crt and the client’s private key in client.key, both of them in the PEM format. We can again use s_client to test the connection, but this time we need to present the certificate and private key:

openssl s_client -connect secure.server.com:443 -cert client.crt -key client.key

However, things get a little bit less straightforward on the JVM. Any secure HTTP connection on the JVM, no matter which library you use, boils down to using the javax.net.ssl.HttpsURLConnection, which is a part of the Java Secure Socket Extension (JSSE).

JSSE, key stores and trust stores

Among other stuff, JSSE has a concept of key stores and trust stores. The former are containers for keys/certificates presented to the server, the latter let the JVM know whether a given server certificate is signed by a trusted Certificate Authority (CA). The default format for both stores is JKS (which stands for Java keystore), but JSSE is also capable of reading the PKCS #12 format.

Custom key store

As you may already have guessed, in order to use the aforementioned client’s certificate and key, we need to store them in a keystore. We’ll go for the PKCS #12 format and use openssl to do the necessary conversions:

openssl pkcs12 -export -out keystore.p12 -in client.crt -inkey client.key

Please make sure not to provide an empty password when openssl prompts you - not only is it unreasonable from the security point of view, but it will also make mysterious NullPointerExceptions fly around when you attempt to use a key store which has an empty password.
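Before wiring the key store into the JVM, it may be worth checking that it can actually be opened with the password you chose. Below is a minimal sketch using the standard java.security.KeyStore API; the names KeyStoreCheck, loadKeyStore and aliasesOf are made up for this illustration.

```scala
import java.io.FileInputStream
import java.security.KeyStore

object KeyStoreCheck {

  // Opens a key store file of the given type ("PKCS12" or "JKS") with the given password.
  // KeyStore.load throws an IOException if the file is malformed or the password is wrong.
  def loadKeyStore(path: String,
                   password: Array[Char],
                   storeType: String = "PKCS12"): KeyStore = {
    val keyStore = KeyStore.getInstance(storeType)
    val in = new FileInputStream(path)
    try {
      keyStore.load(in, password)
    } finally {
      in.close()
    }
    keyStore
  }

  // Lists the aliases of all entries in the key store.
  def aliasesOf(keyStore: KeyStore): List[String] = {
    val aliases = keyStore.aliases()
    Iterator
      .continually(aliases)
      .takeWhile(_.hasMoreElements)
      .map(_.nextElement())
      .toList
  }
}
```

Calling KeyStoreCheck.aliasesOf(KeyStoreCheck.loadKeyStore("keystore.p12", "<password>".toCharArray)) on the file created above should list a single entry for the client’s key and certificate.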

In order for the JVM to use the custom key store, you need to set the following system properties:

  -Djavax.net.ssl.keyStore=keystore.p12
  -Djavax.net.ssl.keyStoreType=pkcs12
  -Djavax.net.ssl.keyStorePassword=<password>

where <password> is the key store password you chose when prompted by openssl. You may of course set those properties at runtime by calling System.getProperties().put(key, value) (in Java) or sys.props += key -> value (in Scala).
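If you go for setting the properties at runtime, a small helper keeps the three of them together. This is only a sketch; ClientCertProperties and useKeyStore are hypothetical names. As far as I know, the default SSL context reads these properties once, when it is first initialised, so the helper should be called before the application opens its first secure connection.

```scala
object ClientCertProperties {

  // Points JSSE at a PKCS #12 key store by setting the three system properties
  // described above. Call it before the first TLS connection is made.
  def useKeyStore(path: String, password: String): Unit = {
    sys.props("javax.net.ssl.keyStore") = path
    sys.props("javax.net.ssl.keyStoreType") = "pkcs12"
    sys.props("javax.net.ssl.keyStorePassword") = password
  }
}
```

In a real application you would rather read the password from the configuration or the environment than hard-code it in the call to useKeyStore.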

Provided that the certificate of secure.server.com is signed by a trusted CA, the steps so far are enough to get up and running. However, if the server’s certificate is a self-signed one, you need an additional step, which is telling JSSE to trust the self-signed certificate.

Custom trust store

We’re going to achieve this by creating a trust store containing the certificate of the CA (the untrusted one) which signed the server’s certificate. But where do we take the CA’s certificate from? Once again openssl comes to the rescue. After executing

openssl s_client -connect secure.server.com:443 -showcerts < /dev/null

you’re going to see - among other output - a number of certificates in the PEM format, i.e. something like:

  -----BEGIN CERTIFICATE-----
  (some Base64 content)
  -----END CERTIFICATE-----

You’re interested in the last certificate in the sequence, which is going to be the CA’s certificate - you need to save it (including the BEGIN/END CERTIFICATE lines) into a file, e.g. ca.crt.
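If you prefer not to copy and paste the certificate by hand, picking the last PEM block out of the captured openssl output is a simple text-processing task. Here is a minimal sketch; PemBlocks, certificates and lastCertificate are hypothetical names, and the code only looks at the BEGIN/END marker lines without validating the Base64 content in any way.

```scala
object PemBlocks {

  private val Begin = "-----BEGIN CERTIFICATE-----"
  private val End = "-----END CERTIFICATE-----"

  // Returns every PEM certificate block found in `output`, in order of appearance,
  // each one including its BEGIN/END CERTIFICATE lines.
  def certificates(output: String): List[String] = {
    val lines = output.split("\n").toList.map(_.trim)
    // state: (lines of the block currently being read, finished blocks)
    val (_, blocks) =
      lines.foldLeft((Option.empty[Vector[String]], Vector.empty[String])) {
        case ((_, done), Begin)            => (Some(Vector(Begin)), done)
        case ((Some(current), done), End)  => (None, done :+ (current :+ End).mkString("\n"))
        case ((Some(current), done), line) => (Some(current :+ line), done)
        case (state, _)                    => state // text outside of any block
      }
    blocks.toList
  }

  // The last block in the output, i.e. the candidate for ca.crt.
  def lastCertificate(output: String): Option[String] =
    certificates(output).lastOption
}
```

You could then write the result of PemBlocks.lastCertificate(...) to ca.crt. It is still a good idea to have a look at the file before trusting it.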

Now it’s time to decide whether you want to import the CA’s certificate into the global JSSE trust store or just to create a local trust store with a single certificate. The global trust store contains certificates of trusted CAs like VeriSign/Symantec, so it’s necessary if you want to connect to most of the well-known servers like google.com. The tricky part is that when you tell JSSE to use a custom trust store, it won’t be using the global one anymore, so you will only be able to connect to servers whose certificates are signed by the CA in the custom trust store.

Therefore, you have three options to choose from:

  1. Extend the global trust store by importing the untrusted CA’s certificate into it. This is the easiest solution, but you need to remember that it will affect all applications running on the given JVM, i.e. all of them will trust certificates signed by the CA in question.

  2. Make a copy of the global trust store and import the CA’s certificate into the copy, then use the copy as a custom trust store in your application. In this case your application will be able to connect both to the well-known servers and to secure.server.com.

  3. Create a custom trust store with only the certificate of the untrusted CA. Your application is then only going to trust certificates signed by the selected CA and it won’t be able to make a secure connection to a well-known server like google.com.

Let’s now explore the above options in more detail.

1. Extending the global trust store

The global trust store is located in $JAVA_HOME/jre/lib/security/cacerts (on JDK 9 and later, which no longer ship a separate jre directory, it is $JAVA_HOME/lib/security/cacerts). To import the ca.crt into the trust store, we’re going to use JDK’s keytool utility (if you have java in the PATH, you should have keytool as well):

keytool -import -file ca.crt -alias "CA alias of your choice" \
        -keystore $JAVA_HOME/jre/lib/security/cacerts

Note: the default password for the global trust store is changeit (yes, not the most secure one).

Since the global trust store is used by default in a JVM application, no further configuration is needed.

2. Using an extended copy of the global trust store

First simply create a copy of the global trust store:

cp $JAVA_HOME/jre/lib/security/cacerts my-cacerts.jks

Then import ca.crt like in the previous case (again, the default password is changeit):

keytool -import -file ca.crt -alias "CA alias of your choice" -keystore my-cacerts.jks

Finally, you need to tell the JVM to use the custom trust store by setting the following system properties:

  -Djavax.net.ssl.trustStore=my-cacerts.jks
  -Djavax.net.ssl.trustStoreType=JKS
  -Djavax.net.ssl.trustStorePassword=changeit

3. Using a single-certificate trust store

The first step here is to create a new key store (yes, a trust store is actually a key store):

keytool -genkey -dname "cn=CLIENT" -alias truststorekey -keyalg RSA -keystore truststore.jks

The cn value in the dname parameter is an arbitrary name and doesn’t really matter. The same applies to the alias parameter. And again, please remember not to set an empty password.

Then you import ca.crt into the newly created trust store:

keytool -import -file ca.crt -alias "CA alias of your choice" -keystore truststore.jks

Finally, you need to tell the JVM to use the custom trust store by setting the following system properties:

  -Djavax.net.ssl.trustStore=truststore.jks
  -Djavax.net.ssl.trustStoreType=JKS
  -Djavax.net.ssl.trustStorePassword=<password>

where <password> is the password you chose when creating your custom trust store.
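The system properties affect the whole JVM. If you would rather keep the client certificate and the custom trust store local to a single HTTP client, the same stores can be turned into an SSLContext programmatically. This is an alternative which goes beyond the property-based setup described in this post, shown here only as a sketch using the standard javax.net.ssl classes; ClientSslContext and build are hypothetical names.

```scala
import java.security.KeyStore
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}

object ClientSslContext {

  // Builds an SSLContext which presents the client certificate from `keyStore`.
  // With trustStore = None the JVM's default trust store is used,
  // with Some(store) only the certificates in `store` are trusted.
  def build(keyStore: KeyStore,
            keyPassword: Array[Char],
            trustStore: Option[KeyStore] = None): SSLContext = {
    val keyManagers = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
    keyManagers.init(keyStore, keyPassword)

    val trustManagers = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
    val store: KeyStore = trustStore.orNull // null means: fall back to the default trust store
    trustManagers.init(store)

    val context = SSLContext.getInstance("TLS")
    context.init(keyManagers.getKeyManagers, trustManagers.getTrustManagers, null)
    context
  }
}
```

The resulting context.getSocketFactory can be passed to HttpsURLConnection.setSSLSocketFactory() for a single connection. Most HTTP client libraries accept an SSLContext or a socket factory in a similar way, but please check the documentation of the one you use.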

Summary

Hopefully, this post has shed some light on the not-so-common scenario of a secure JVM client authenticating itself with a certificate and private key. You should now be able to seamlessly implement this kind of authentication in your JVM application.