Concourse CI

I know ‘Docker and Flume with Grandad’ isn’t finished yet but I’ve been tinkering with Concourse CI recently and wanted to understand the bare basics.

As usual, I’ve written about the experience and rather than wait until I’ve got it all formatted for publishing here, I thought I’d offer up a direct link to the github readme for those that are interested.

So, you can check out my notes on how to get Concourse CI up and running for a simple Java project over here (https://github.com/neildunlop/simplejava).

Docker & Flume with Grandad – Defining and Running our Flume Container

We have already seen how to write a Dockerfile that will create an image for us. This image can then be used to create container instances that run it. We have successfully started the RabbitMQ container that will act as an input to our system, but this tutorial is called ‘Docker and Flume with Grandad‘, so we should build our Apache Flume image next and get a container up and running with that.

Note that this section is built upon the tutorial I found at (http://probablyfine.co.uk/2014/05/05/using-docker-with-apache-flume-1/), and the ‘probablyfine/flume‘ base image seems to be one of the more popular flume images available on Docker Hub. If you want more details check out the link.

To create a simple Flume image that we’ll build on and modify, do the following:

  1. Open a terminal
  2. Navigate to the ‘docker-poc’ folder we created earlier; it should already contain the ‘rabbit’ folder.
  3. Create a new folder to hold our Dockerfile and associated assets for our Apache Flume container with `mkdir flume`
  4. Navigate into this new folder with `cd flume`.
  5. Create an empty Dockerfile with `touch Dockerfile`.
  6. Edit the newly created file with `nano Dockerfile`.
  7. Add the following content to the Dockerfile we just created:
FROM probablyfine/flume

ADD flume.conf /var/tmp/flume.conf

EXPOSE 44444

ENTRYPOINT [ "flume-ng", "agent", "-c", "/opt/flume/conf", "-f", "/var/tmp/flume.conf", "-n", "docker", "-Dflume.root.logger=INFO,console" ]

Remember to save the file.

What does this file do?

Like the last Dockerfile we created, this file contains some commands that create the base of our image and perform some operations.

As before, the `FROM` command sets the base image that we want to build our image on. This is basically using the ‘probablyfine/flume‘ image as a starting point; you can always go and check out its contents. This particular image has a basic OS and all the software needed to get Flume working.

The `ADD` command is one we have not used before. It takes a named file from the same directory as the Dockerfile and copies it to a specified location and filename in the image. This is an easy way for us to add files to the image; I guess the command name gives that away! Since Flume uses a config file to tell it where data comes from, how it should process that data and where it should send it, we will need to create this file before we build the image.

As before, the `EXPOSE` command makes a port, 44444 in this case, accessible to other Docker containers and allows it to be mapped to a port on the host machine.

The `ENTRYPOINT` command allows us to pass commands to the running Docker container. Crucially, it allows us to specify what program should run those commands. Docker has a default entrypoint of ‘/bin/sh -c’, which means that in our rabbitmq image Dockerfile, when we used ‘CMD rabbitmq-server start‘, what Docker was doing behind the scenes was ‘/bin/sh -c rabbitmq-server start‘. Docker was passing our commands to the shell.

In this Dockerfile we actually want to pass commands to a different program called ‘flume-ng‘. ‘flume-ng‘ is the program we want to run; everything else is an argument passed to ‘flume-ng‘. Specifically, the arguments that Flume expects are: the configuration directory, the name of the configuration file and the name to be given to the running agent. (A Flume instance can run more than one data processing pipeline and each pipeline is managed by an agent.)

So, if you want to run things on the command line inside your Docker container, use CMD; if you want to pass commands to a specific program inside your Docker container, use ENTRYPOINT. See http://blog.stapps.io/docker-cmd-vs-entrypoint/ for a good overview of CMD and ENTRYPOINT.
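To make the difference concrete, here is a minimal side-by-side (illustrative only, these lines are not part of our Dockerfiles):

# shell form - Docker wraps the command with its default entrypoint, '/bin/sh -c'
CMD rabbitmq-server start

# exec form - the named program runs directly, with the listed strings as its arguments
ENTRYPOINT [ "flume-ng", "agent", "-n", "docker" ]

Both are valid Dockerfile instructions; the important difference is which process ends up running your command.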

These four commands pull an image from the Docker Repository, add a config file, expose a port to the outside world and run flume as a service. Voila, one flume server.

Creating our Flume config file

As noted above, our Dockerfile contains a command to ADD a file to the container. The file being added is ‘flume.conf’ and it is used to tell Flume where it will get data from, how it will process that data and where it should send that data once it has been processed. To create this ‘flume.conf‘ file, perform the following steps:

  1. Navigate to ‘/docker-poc/flume’
  2. Create the file with `touch flume.conf`.
  3. Edit the file using `nano flume.conf`.
  4. Add the following to the file contents:
docker.sources = netcatSource
docker.channels = inMemoryChannel
docker.sinks = logSink

docker.sources.netcatSource.type = netcat
docker.sources.netcatSource.bind = 0.0.0.0
docker.sources.netcatSource.port = 44444
docker.sources.netcatSource.channels = inMemoryChannel

docker.channels.inMemoryChannel.type = memory
docker.channels.inMemoryChannel.capacity = 1000
docker.channels.inMemoryChannel.transactionCapacity = 100

docker.sinks.logSink.type = logger
docker.sinks.logSink.channel = inMemoryChannel

Remember to save the file.

Flume Configuration Overview

While it’s not strictly related to Docker, having a broad overview of how Flume is configured might help us to get things up and running. Flume has three main concepts:

Sources – Places where data comes into the system. A large range of adapters exists to get data from external systems.
Channels – Pathways data travels down within the system, being processed by various components along the way.
Sinks – Places where data exits the system. Again, a large range of adapters exists to help connect to external systems.

A Flume instance can run more than one data processing pipeline. Each pipeline is managed by an ‘agent‘. Each agent has a name. When we launch Flume inside our docker container, one of the arguments we pass to Flume in our ENTRYPOINT arguments is the name we want to give the agent. In our case, the name of the agent is ‘docker’.

We use the same name as the starting element in the flume configuration. This is how each agent knows how to identify its configuration elements within the configuration file.
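For example, if we had launched Flume with ‘-n myAgent’ instead of ‘-n docker’, the same configuration lines would need to start with that prefix (a hypothetical snippet, just to show the correspondence):

myAgent.sources = netcatSource
myAgent.channels = inMemoryChannel
myAgent.sinks = logSink

An agent only reads the configuration lines that begin with its own name.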

In our configuration we set up a pretty simple pipeline that has a source to get data in, a channel to carry that data through the agent and a sink to get the data out. The key elements in the configuration are:

  1. A NetcatSource that uses a built-in adapter to read data arriving on port 44444 and turn each line into a Flume event.
  2. An InMemoryChannel that buffers the incoming events in memory to give us some flexibility if events arrive faster than we can process them.
  3. A LogSink, which just logs the events it receives as a form of output.

Firing up the Flume Container

Now that we have our Dockerfile in place we can actually execute it and get our container up and running. Assuming you are running on a Mac:

  1. Ensure you are running the `Docker Quickstart Terminal`.
  2. Navigate to the `docker-poc/flume` folder created earlier.
  3. This folder should contain the Dockerfile we just created.
  4. Type `docker build -t flume .` to use the dockerfile in the current directory to create a docker image with the name ‘flume’.
  5. The output should look something like:
Sending build context to Docker daemon 3.072 kB
Step 1 : FROM probablyfine/flume
 ---> dcc1d3a26f17
Step 2 : ADD flume.conf /var/tmp/flume.conf
 ---> f2081cfbe5d5
Removing intermediate container e41cd0d0cfa9
Step 3 : EXPOSE 44444
 ---> Running in 93f57649da85
 ---> 2154236de5e3
Removing intermediate container 93f57649da85
Step 4 : ENTRYPOINT flume-ng agent -c /opt/flume/conf -f /var/tmp/flume.conf -n docker -Dflume.root.logger=INFO,console
 ---> Running in 2cc534aaaece
 ---> 0521c0ecf2b0
Removing intermediate container 2cc534aaaece
Successfully built 0521c0ecf2b0

6. Type `docker images` to list all of the images that exist on the host machine. The output will look like:

REPOSITORY  TAG     IMAGE ID       CREATED         SIZE
flume       latest  0521c0ecf2b0   41 seconds ago  418.9 MB
rabbit      latest  68102a356d7d   20 hours ago    300.8 MB
...........

As you can see, the ‘flume’ image that we just built is shown at the top of the list along with the rabbit image we built earlier. Now that the image is in place we can use it to create a running Docker container, which is an instance of the image.

7. To start a container using the `flume` image we have created, type:

docker run -e FLUME_AGENT_NAME=docker -e FLUME_CONF_FILE=/var/tmp/flume.conf -p 44444:44444 -t flume

The log output will be displayed on your terminal. Something similar to:

Info: Including Hive libraries found via () for Hive access
+ exec /opt/java/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/flume/conf:/opt/flume/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application -f /var/tmp/flume.conf -n docker
2016-05-25 09:07:31,016 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2016-05-25 09:07:31,024 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:/var/tmp/flume.conf
2016-05-25 09:07:31,034 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:logSink
2016-05-25 09:07:31,039 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)] Added sinks: logSink Agent: docker
2016-05-25 09:07:31,040 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:logSink
2016-05-25 09:07:31,071 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)] Post-validation flume configuration contains configuration for agents: [docker]
2016-05-25 09:07:31,071 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)] Creating channels
2016-05-25 09:07:31,083 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel inMemoryChannel type memory
2016-05-25 09:07:31,087 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)] Created channel inMemoryChannel
2016-05-25 09:07:31,088 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source netcatSource, type netcat
2016-05-25 09:07:31,111 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: logSink, type: logger
2016-05-25 09:07:31,127 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)] Channel inMemoryChannel connected to [netcatSource, logSink]
2016-05-25 09:07:31,145 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{netcatSource=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcatSource,state:IDLE} }} sinkRunners:{logSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@302477bc counterGroup:{ name:null counters:{} } }} channels:{inMemoryChannel=org.apache.flume.channel.MemoryChannel{name: inMemoryChannel}} }
2016-05-25 09:07:31,155 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel inMemoryChannel
2016-05-25 09:07:31,259 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: CHANNEL, name: inMemoryChannel: Successfully registered new MBean.
2016-05-25 09:07:31,259 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: inMemoryChannel started
2016-05-25 09:07:31,261 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink logSink
2016-05-25 09:07:31,262 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source netcatSource
2016-05-25 09:07:31,263 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:150)] Source starting
2016-05-25 09:07:31,271 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:44444]

8. Open a new terminal and type `echo foo bar baz | nc 192.168.99.100 44444`
9. You should see ‘OK‘ as a response.
10. Switch back to your original docker terminal and check the last line of the output. It should contain:

2016-05-25 09:14:43,009 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 66 6F 6F 20 62 61 72 20 62 61 7A foo bar baz }

What did we just do?

Firing data into the input of our Flume container instance and seeing it appear in the output log proves that we have our Flume container up and running. The container is listening on a defined port for input, processing any received messages and then outputting them to its defined output. We have another piece of our system up and running. Next we’ll move on to getting our last component, Kafka, into a container.

Docker & Flume with Grandad – Housekeeping

Some Basic Housekeeping

Now that we have our container up and we’ve had a bit of a tinker, we might want to stop the container. Running containers do consume system resources so it’s worth knowing how to stop them. As an additional step, it’s worth knowing how to remove containers once we are done with them. The idea is that it should be easy to recreate a container from an image, so you shouldn’t be wary of removing them.

Listing running containers

To list all of the currently running Docker containers use:

docker ps

The output will be similar to:

CONTAINER ID  IMAGE   COMMAND                  CREATED            STATUS            PORTS  NAMES
15cafdc4760c  rabbit  "/docker-entrypoint.s"   About an hour ago  Up About an hour  .....  big_meitner

(Note I’ve omitted the ports information for the sake of brevity.)

Notice that the container has a CONTAINER ID and a NAME. Either of these can be used to identify the container when you want to perform operations on it. The names are supposed to be more memorable than the ids, but your mileage may vary!
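If the auto-generated names annoy you, you can pick your own when you start a container by passing the `--name` option to `docker run`, for example:

docker run -d -p 15672:15672 --name my-rabbit rabbit

The container will then show up as ‘my-rabbit’ in the `docker ps` output and can be referenced by that name in all of the commands below.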

Stopping a running container

Stopping a running container is easy, use:

docker stop <container_name>

So, in our case this would be:

docker stop big_meitner

If you enter this command you will get a response of ‘big_meitner’ when the container stops.

Starting a container

Clearly, when we have stopped a container we might want to start it up again to use it some more. You might be tempted to use ‘docker run ….‘ to get things fired up again, but you’d be wrong. ‘docker run‘ starts a new container which is totally separate from the instance you had running previously. The correct way to start up an existing container is:

docker start <container_name>

Obviously, to start a Docker container you need to know its name. The ‘docker ps‘ command only lists running containers, and the whole point is that we want to start a container that is not running. To get a list of all containers, running or not, we can use:

docker ps -a

The output will be in exactly the same format as the ‘docker ps‘ output; there will just be more entries.

CONTAINER ID  IMAGE    COMMAND                  CREATED            STATUS                     PORTS  NAMES
15cafdc4760c  rabbit   "/docker-entrypoint.s"   About an hour ago  Up About an hour           .....  big_meitner
b01d48197924  jenkins  "/bin/tini -- /usr/lo"   10 weeks ago       Exited (143) 10 weeks ago  .....  dreamy_wescoff

Deleting a container

If you’ve used your Docker container for a while, changed a few things in the running container (more on that later) and got it into an odd state, you can remove it with:

docker rm <container_name>

When the container is removed, the command will reply with <container_name> (where ‘container_name’ is replaced with the actual name of your container).
If you get everything into a mess, or you have made a change to your Dockerfile and want to make sure it takes effect, you can delete an existing container and create a fresh one from the image.
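Putting the last few commands together, a typical ‘throw it away and start again’ cycle for our rabbit container looks something like this (using the container name from the example above):

docker stop big_meitner
docker rm big_meitner
docker run -d -p 15672:15672 rabbit

The new container will get a new automatically generated name, but it is built from exactly the same image.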

Listing images

We use our Dockerfile to create images, and we use images to create containers. To view a list of all existing local images that we can make containers from, use:

docker images

The output will be:

REPOSITORY TAG    IMAGE ID     CREATED     SIZE
rabbit     latest ceab5a1ef540 12 days ago 300.8 MB
...

Removing an image

Occasionally you will want to make a change to your Dockerfile and update the image that is used to build your containers. The easy way to remove an image is to use:

docker rmi <image_id>

If an image is being used by a container then you will not be able to remove the image. You will receive an error message similar to:

Failed to remove image (<image_id>): Error response from daemon: 
conflict: unable to delete <image_id> (must be forced) - image is being 
used by stopped container 05434d580fbd.

To force the removal of the image you can use:

docker rmi -f <image_id>

The output will list the image tags and layers that were removed as a result of the operation. Note that forcing removal of an image does not delete the containers that were created from it; remove those separately with ‘docker rm‘ if you no longer need them.

Docker & Flume with Grandad – Getting Started

Installing Docker

Installing Docker is an easy process. The easiest way, for Mac and Windows users at least, is to install ‘Docker Toolbox’ from https://www.docker.com/products/docker-toolbox. This will install all the tools necessary to get Docker up and running on your host machine.

Docker files, Docker images and Docker Hub

To define what we want to run inside our Docker container we use a simple text file called a Dockerfile. A Dockerfile is simply a list of commands that will be executed by Docker to build an image. These commands can perform a range of operations. They usually start by defining what base image we want to use and then allow us to modify that image by adding software and making configuration changes. A base image is simply another Docker image that has been defined by somebody else that we can reuse. Docker has a whole host of these base images that can be reused by anyone, for any purpose.

These base images are shared on ‘Docker Hub‘ (https://hub.docker.com/) and can be downloaded by docker using the instructions in a Dockerfile. Once you have created a useful Docker image you can share it on Docker Hub for others to use, should you choose to. This makes it really easy for us to get a headstart on our Docker use by using images created by other people.
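You don’t have to wait for a Dockerfile build to fetch a base image; you can pull one from Docker Hub directly from the command line, for example:

docker pull rabbitmq:3-management

This is the image we will use as our starting point in the next section.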

Setting up

Ok, let’s get hands-on and start creating our first Docker image. Our first Docker image will be an image of RabbitMQ that we can fire up; from our host machine we will browse to the web admin UI built into Rabbit and send some messages. The image we are going to create makes use of an existing Docker image for RabbitMQ. We will use the existing image as a base and make some very simple modifications so that we can use the RabbitMQ container in the way that we want.

We will start slow by getting a basic Docker container up and running. Once this works we’ll modify it a little to make it more useful. Along the way we’ll learn some basic Docker concepts and commands. We will also learn a little about RabbitMQ.

  1. Open a terminal
  2. Create a new folder called ‘docker-poc’ with `mkdir docker-poc`
  3. Navigate into the new directory with `cd docker-poc`
  4. Create a new folder to hold our Dockerfile and associated assets for our RabbitMQ container with `mkdir rabbit`
  5. Navigate into this new folder with `cd rabbit`.
  6. Create an empty Dockerfile with `touch Dockerfile`.
  7. Edit the newly created file with `nano Dockerfile`.
  8. Add the following content to the Dockerfile we just created:
FROM rabbitmq:3-management

EXPOSE 15672

CMD rabbitmq-server start

Save the file.

What does this file do?

As you can probably tell, this Dockerfile contains three commands. If you want to take a look at the official documentation for Dockerfile commands, head over to (https://docs.docker.com/engine/reference/builder/).

The `FROM` command sets the base image that we want to use for our image. This is basically using somebody else’s image as a starting point. Images are held in the public Docker Hub repository (https://hub.docker.com). Each image in Docker Hub has a unique name. Images can also have tags that allow us to grab different versions of the same image, much like tags in source code repositories. In our case, we are using the `rabbitmq` image, and we are specifically using the tagged version that contains the RabbitMQ management plugin. You can see it in the repository at (https://hub.docker.com/_/rabbitmq/).

The `EXPOSE` command exposes a port on the container, 15672 in this case. Together with the port mapping we’ll specify when we start the container, this means that port 15672 in our rabbitmq container will be accessible from the host machine. If we didn’t do this, even if we knew the IP address of the Docker container, we wouldn’t be able to hit the port. Port 15672 is the default port used by the RabbitMQ management plugin’s web user interface. We’ll connect to this using the web browser on our host machine to confirm that the container is up and running by viewing the Rabbit management UI.

The `CMD` command runs the text that follows it on the command line of the running container. This is what our container will do by default when it starts up. In this case, we are running rabbitmq-server as a service inside the container. Any normal command that can be used in a terminal can be placed here. It’s a good way of performing actions on your container just after it has started up.

These three commands pull an image from the Docker Repository, expose a port to the outside world and run rabbitmq as a service. Voila, one rabbitmq server.

Firing up the RabbitMQ Container

Now that we have our Dockerfile in place we can actually execute it and get our container up and running. Assuming you are running on a Mac:

  1. Open the Launcher and locate the `Docker Quickstart Terminal`.
  2. Click the icon. This will fire up a terminal that has Docker configured and running. (On a Mac your Docker instance will run inside a virtual machine. This will have an IP address shown when it starts up. This is usually 192.168.99.100.)
  3. Navigate to the `docker-poc/rabbit` folder created earlier.
  4. This folder should contain the Dockerfile we just created.
  5. Type `docker build -t rabbit .` to use the dockerfile in the current directory to create a docker image with the name ‘rabbit’. (-t tells docker we want to name the resulting image and the ‘.’ is the path to the docker file to build)
  6. The output should look something like:
Sending build context to Docker daemon 2.048 kB
 Step 1 : FROM rabbitmq:3-management
 ---> f35ba29f7d29
 Step 2 : EXPOSE 15672
 ---> Running in f80cb02caf91
 ---> da643941535c
 Removing intermediate container f80cb02caf91
 Step 3 : CMD service rabbitmq-server
 ---> Running in c4c9d804b41f
 ---> 419f25976a1d
 Removing intermediate container c4c9d804b41f
 Successfully built 419f25976a1d

7. Type `docker images` to list all of the images that exist on the host machine. The output will look like:

REPOSITORY              TAG     IMAGE ID       CREATED         SIZE
rabbit                  latest  af5eb3695063   5 seconds ago   300.8 MB
flumecompose_frontend   latest  dba1f0ee100d   2 weeks ago     419.4 MB
flumecompose_backend    latest  467d59055ebc   2 weeks ago     339 MB
flumeexample2           latest  6f0089395980   3 weeks ago     419.4 MB
shedlabshelloworld_web  latest  6658786e8a22   4 weeks ago     707 MB
...........

As you can see, the ‘rabbit’ image that we just built is shown at the top of the list along with some other relevant details. This list shows all the Docker images that are cached locally on the host machine. Having them cached locally means we don’t need to go out to the online Docker Hub if we want to use them. Now that the image is in place we can use it to create a running Docker container, which is an instance of the image.

8. To start a container using the `rabbit` image we have created, type `docker run -d -p 15672:15672 rabbit`.
9. The output will be a unique identifier for the running container, something like ‘b01d48197924d57cd24a8a02dbb49e37475bf3a80557361de3296ecc68df7258’.
10. Fire up your browser and navigate to http://192.168.99.100:15672
11. The RabbitMQ Admin UI login screen will appear.
12. Enter a username of ‘guest‘ and a password of ‘guest‘. The Rabbit admin UI will appear. This is being served up from within your docker container.

What did we just do?

We glossed over a few things in our instructions, particularly in step 8. Let’s take a moment to drill into a little more detail. The command we used to start the container was:

docker run -d -p 15672:15672 rabbit

The ‘docker run‘ part simply tells Docker to execute the run command. Fairly self-explanatory.

The next argument, ‘-d‘, tells Docker that we want to run the container in ‘detached’ mode. This simply means that we want it to fire up the container and run it in the background. Once it’s running we get our command prompt back instead of the terminal hanging while the container runs.
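Because the container is running in the background, its output isn’t shown in your terminal. If you want to see it, the `docker logs` command will print the output of a container at any time:

docker logs <container_name>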

The next argument, ‘-p‘, tells Docker that we want to map a port on the host machine to a port on the container. The first value, 15672, is the port on the host we want to use; the second value, after the ‘:’, is the port on the container it maps to. So, in short, when we try to get to port 15672 on the host machine we are actually redirected to port 15672 on the running Docker container. (Here both values happen to be the same, so the distinction is easy to miss.)

The last argument is the name of the docker image we want to run in the new container.
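As an aside, the two port numbers don’t have to match. If, for example, port 15672 was already in use on your host, you could map the admin UI to host port 8080 instead (just an illustration):

docker run -d -p 8080:15672 rabbit

and then browse to http://192.168.99.100:8080 to reach the same UI.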


NOTE: Exposing and Mapping Ports

You might have noticed that when we started the Docker container we mapped port 15672 on the host to port 15672 on the container. You might have also noticed that we used the ‘EXPOSE 15672‘ command in our Dockerfile earlier. So, why do we need to use the EXPOSE command in the Dockerfile and also map the ports when we start the container? It looks a little like these two commands do the same thing.

In short, the EXPOSE command tells Docker to expose a port to other Docker containers. If you only use EXPOSE then the port will not be available outside of Docker. If you use the ‘publish’ command line option (which is what ‘-p’ is) then the port is accessible outside of Docker.

There is also another version of publish using an uppercase P. For example:

docker run -d -P rabbit

This will run the Docker container and automatically publish all exposed ports, but you don’t get to pick which port number is used on the host machine. It’s usually a high, non-memorable number.

As an aside, if you publish a port but forget to use EXPOSE in your Dockerfile, Docker will do an implicit EXPOSE for you.
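If you do use `-P` and want to find out which host ports Docker picked, the `docker port` command lists the mappings for a container:

docker port <container_name>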



Docker and Flume with Grandad

Introduction

I recently attended an Apache Spark workshop hosted by the clever guys at DataShed. Ed Thewlis, Managing Director of DataShed, friend and fellow data tinkerer, was running the show and invited me to come along and join in. My main objective was to spend a whole, uninterrupted afternoon getting hands-on with Spark and make it do something more interesting than the ubiquitous ‘Word Count’ example.

Spark is one of those technologies that I really love but I’ve not had the chance to do anything meaningful with it in production. I’ve built quite a few personal examples using vehicle and flight tracking data, but nothing that I would consider useful or really interesting. An afternoon with Ed and the team would be the perfect chance to get a no-nonsense look at Hadoop and Spark.

The workshop was, as hoped, very comprehensive and an absolute full throttle ride through building a Hadoop cluster with Spark sat on top to analyse vehicle sales data using a range of Spark features. The workshop made heavy use of Docker to setup and manage the cluster and associated environments.

During the course of the afternoon it became apparent that I didn’t know as much Docker as I thought and had to get a little assistance from my colleague and friend Ben Foster. As Ben was patiently helping me reconfigure Docker to get the best performance out of my slightly struggling baby Mac, Ed wandered over and with a chuckle quipped “Cor! Its like coding with Grandad!” and to be honest, he wasn’t far wrong! The after workshop drinks more than made up for his slight upon my technical honour, but he did have a point.

To fix this very clear gap in my skills I decided, as I usually do, to build ‘something’ with Docker and document the whole thing for three main reasons.

  1. Get some real world experience of Docker and fix all the problems encountered along the way.
  2. Document the whole thing so that I remember it better and can refer back to it later.
  3. Write it up so that I can share my learning with others in the same situation (that’s probably you!)

Overview

By the end of this series you will be able to spin up three Docker containers using Docker Compose to allow the containers to see one another and to control the order that they start in. One container will be an instance of RabbitMQ that you can access from your machine. Another container will be an instance of Apache Flume that will buffer messages, translate messages from Rabbit to Kafka format and then forward them to Kafka. The last container will be a Kafka instance which will receive messages and allow you to view them from the terminal on your host machine. It will be possible to spin up all these related containers with a single command and they will all be appropriately configured and working.

The problem

I’ve recently been working on a Global Trading Platform for one of the UK’s leading bookmakers. The purpose of the system is two-fold: the first objective is to remove a rat’s nest of direct integrations between internal and external systems; the second is to create a high-performance internal platform that seamlessly integrates data from a range of external systems, standardises that data and distributes it to a range of company systems around the globe. As the platform expands and we integrate more and more systems, the range of technologies that we need to communicate with increases.

The latest addition to the system required us to send the output of the system to another internal system that uses Kafka as its message bus technology. For now, that is our only requirement, get a message from Rabbit and dump it into Kafka in the order that it was received.

The solution

There are a number of ways of tackling this problem, including using Akka and Scala to receive messages from RabbitMQ, translate them and publish them to Kafka, but we’ll talk about that another time. A far simpler approach is to use Apache Flume to simply move messages from Rabbit to Kafka. We don’t want to do any message translation, we simply want to reliably move messages from one message queue to another.

If we take a look at the Apache Flume project (https://flume.apache.org/) we can see a quick description of what it does:

“Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.”

This seems to tick all the boxes for us with ‘distributed’ and ‘reliable’ standing out. A little bit of digging soon tells me that via plugins it can deal with pretty much any type of data, not just log data. Handily there are plugins that allow us to consume and publish data from and to Rabbit and Kafka.

Putting the pieces together

A ‘back of a fag packet’ sketch of what I want to achieve seems to suggest that we need an instance of RabbitMQ exposing a queue that is connected to an instance of Flume, which pushes messages onto a topic on an instance of Kafka. That seems like a lot of stuff to install and configure. It would be useful if I could isolate all that stuff and blow it away and recreate it as needed. This sounds a lot like a job for Docker.

Docker

If we take a look at the Docker homepage (www.docker.com) we get this:

“Docker allows you to package an application with all of its dependencies into a standardized unit for software development.”

Which is slightly cryptic, but basically Docker allows you to host an entire server, and any software you want to install on that server, inside an isolated container. This container acts like a machine in its own right with its own isolated processes and its own network connections. Docker allows us to stop, start, destroy and recreate that machine whenever we like. From your host machine, the Docker container appears just like another machine on the network.

Using Docker we have a nice way of making a totally recreatable instance of the system we are trying to build. Everything is held inside the container and we can blow it away if we get something wrong or we don’t need it any more.

Docker Compose

We could put all of our applications, RabbitMQ, Flume and Kafka, into the same Docker container but that feels a little messy. Helpfully, Docker has another component, Docker Compose (https://docs.docker.com/compose/), that helps us run multiple Docker containers together as one application. We will make use of Compose to keep things together.
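To give a flavour of where we are heading, the docker-compose.yml for this project will look roughly like the sketch below. Treat it as a placeholder: the service definitions, and in particular the Kafka image, are assumptions that we will flesh out properly later in the series.

rabbit:
  build: ./rabbit
  ports:
    - "15672:15672"

kafka:
  image: some-kafka-image   # placeholder - we'll pick a real Kafka image later

flume:
  build: ./flume
  links:
    - rabbit
    - kafka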

In the next post we’ll actually get started on implementing this stuff.

The Problem Isn’t Agile, it’s probably your team

I’ve recently seen a lot of posts about why Scrum/Kanban/Lean/Whatever does not work and how some of the ceremonies associated with these ways of working are evil and should be abandoned immediately.  It has struck me that many of these discussions are focused on how “Bad managers need Standups” and “Good programmers don’t need ceremonies” and many of these discussions turn into a critique of the process.  However, I don’t believe the problem lies with ‘the process’, whatever your process happens to be.

In my opinion, most processes are inherently tied to the type of people using them.  In what I would consider a terrible organization, a process like waterfall is used to provide ‘guide rails’, ‘training wheels’ or a ‘safety net’ for when staff don’t do their jobs properly.

Processes are used as a guarantee against poorly skilled or poorly committed staff.  They are almost used as a way of spotting when staff aren’t doing what they are told so that we can ‘correct’ their behavior.  In many ways, for this kind of organization, the process is a way of making sure the bare minimum of acceptable work gets done.

In some organizations whole teams of people exist just to manage and regulate the process.  They are simply there to ensure the process is followed.  It’s not an approach that I’m particularly fond of, but in a company where there is no trust between the people doing the work and the people who want the work doing, it can be one way of getting the bare minimum amount of work done.  There will be no innovation, no initiative, but eventually something will be produced.  It will probably be rubbish, and late, and costly, but we can all tick the boxes and praise the process for our forward movement.  And, crucially, nobody had to think!

The other extreme is where the staff are so engaged, so switched on and so skilled that they live and breathe the ‘inspect and adapt’ mantra.  They know where they need to get to and they know how they want to get there.  They collaborate closely with one another and make sure an honest and open discussion happens between all team members.  The team is probably quite small and alignment is easy, even when the whole team is moving fast.

For these teams, any process other than the lightest of touches is simply a hindrance.  They certainly don’t need managing and they certainly don’t need any ceremony heavy approaches to give a false feeling of alignment and forward motion.  For this team, forward motion happens all the time and they all know why.

The problem is not that the process or any of its associated ceremonies are right or wrong.  The problem usually comes down to the ‘wrong process for the team’.  Sensible organizations ask teams to determine their own rules of engagement and their own ways of working, within some sensible guidelines.  This can work quite well, but there is one additional problem.

A problem that emerges in bigger organizations is that teams are often a mix of differing skill levels, different levels of commitment and different levels of engagement.  The problem is that half the team needs to be managed.  If they aren’t told to do anything then they won’t do anything; they will sit at their desks and happily fill the day with random things.  These guys love process: process tells them where the bare minimum effort line lies and how little they need to do.  It usually tells them how they need to do it too.

These guys are the equivalent of digital shelf stackers.  It’s just a job and when 5pm rolls around they are outta here.  For the other half of the team, process is the thing that slows them down: the pointless meetings, the needless ceremony and the condescending management input.  For them, it’s like training wheels on a superbike… pointless and humiliating.  The team will, at best, perform moderately well.  The fans of process will say that it’s down to the process that they are doing well.  The people that don’t like process will say it could be twice as good if the process was removed.  Sadly, if the team moves away from this finely balanced level of process they will probably fall into a pit of unproductivity.

If we kill off the process then the half of the team that needs process will suffer a drop in productivity.  If you expect them to think and use their initiative, they simply won’t.  You may get some of your more self-motivated staff spending time trying to mentor the process lovers in how to show more initiative and take more responsibility for their own workload.  This will probably kill their productivity too.  Welcome to a death slide into unproductivity.

If we add more process, then the more self-motivated half of the team will be crippled with irrelevant process, their time spent doing productive things will nose-dive and they will slowly start to care less.  In their view, what is the point of working hard when you are always dragging rocks along with you, only to then be told ‘you’re not following process’.

Of course, the process-loving half of the team will now have much heavier management and be guided much more explicitly.  Their productivity is likely to go up… a little.  This approach means we’re going to need many more managers, usually people that can’t actually do anything apart from manage and report…. You’ve probably got some of those people in a room somewhere, looking busy; now they have an ideal excuse to look even busier while not actually being part of the team.

So, the moral of the story is that the members of your team do not all have the same levels of skill, motivation or engagement, and because of that they will respond differently to different development methodologies.  You need to get the right methodology for the right team.  It’s about how you execute your process that matters, not what that process is!

Getting Started with Reactive Streams – Part 5

Experiments with Backpressure

Finally! It’s been a long haul to get here, but we are finally in a position to conduct some experiments with backpressure.

The inspiration for this set of scenarios to demonstrate backpressure came from an article written by Jos Dirksen. You can check it out here.

Scenario – Fast Source, Slowing Sink

We have already seen a fast-producing Source and a fast-consuming Sink. Things get a little more interesting when we have a Sink that consumes messages more slowly over time. This is a classic situation where, normally, the Source would just keep pumping out messages and the Sink would be expected to buffer the messages until it could catch up. As we all know, this rarely happens. Usually the buffer fills and the Sink gets overwhelmed.

To conduct our experiment we need another actor that gets slower at processing messages over time. Introducing the SlowDownActor.scala.

SlowDownActor.scala

package com.datinko.streamstutorial.actors

import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{RequestStrategy, OneByOneRequestStrategy, ActorSubscriber}
import com.typesafe.scalalogging.LazyLogging
import kamon.Kamon

/**
 * An actor that progressively slows as it processes messages.
 */
class SlowDownActor(name: String, delayPerMsg: Long, initialDelay: Long) extends ActorSubscriber with LazyLogging {
  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  // setup actorname to provided name for better tracing of stats
  val actorName = name
  val consumeCounter = Kamon.metrics.counter("slowdownactor-consumed-counter")

  // default delay is 0
  var delay = 0l

  def this(name: String) {
    this(name, 0, 0)
  }

  def this(name: String, delayPerMsg: Long) {
    this(name, delayPerMsg, 0)
  }

  override def receive: Receive = {

    case OnNext(msg: String) =>
      delay += delayPerMsg
      Thread.sleep(initialDelay + (delay / 1000), delay % 1000 toInt)
      logger.debug(s"Message in slowdown actor sink ${self.path} '$actorName': $msg")
      consumeCounter.increment(1)
    case msg =>
      logger.debug(s"Unknown message $msg in $actorName: ")
  }
}

Once we have this we can add a new scenario to our existing Scenarios.scala object.

Scenarios.scala

package com.datinko.streamstutorial

import akka.actor.Props
import akka.stream.{ActorMaterializer, ClosedShape, OverflowStrategy}
import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, RunnableGraph, Sink}  // Flow, Broadcast and Balance are used by the scenarios added later in this post
import com.datinko.streamstutorial.actors.SlowDownActor
import scala.concurrent.duration._
/**
 * A set of test scenarios to demonstrate Akka Stream back pressure in action.  Metrics are exported
 * to StatsD by Kamon so they can be graphed in Grafana.
 */
object Scenarios {

   ... existing scenarios ...

  def fastPublisherSlowingSubscriber(implicit materializer: ActorMaterializer) = {

    val theGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[Unit] =>

      val source = builder.add(ThrottledProducer.produceThrottled(1 second, 30 milliseconds, 20000, "fastProducer"))
      val slowingSink = builder.add(Sink.actorSubscriber(Props(classOf[SlowDownActor], "slowingDownSink", 10l)))

      import GraphDSL.Implicits._

      source ~> slowingSink

      ClosedShape
    })
    theGraph
  }
}

If we execute this scenario in Start.scala with

Scenarios.fastPublisherSlowingSubscriber().run()

and take a look at Grafana (having updated our Sink metric to be ‘slowdownactor-consumed-counter’) we get:

This chart shows that as the Sink processes messages at an increasingly slow rate, the producer slows its production rate. This is because of the backpressure in the stream flowing from the Sink to the Source. As it struggles to consume messages, the Sink signals that it needs messages less often, and this causes the Source to slow down. The message production rate is effectively controlled by the SlowDownActor (the Sink).

Scenario – Fast Source, Slowing Sink with Drop Buffer

Akka Streams has a built-in component, the buffer, that helps us deal with situations where the Source is going too fast for the Sink. This is useful in situations where the Source isn’t reactive and doesn’t respond to backpressure signals.

To illustrate the buffer in action, add the following scenario to Scenarios.scala:

Scenarios.scala

def fastPublisherSlowingSubscriberWithDropBuffer() = {

    val theGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[Unit] =>

      val source = builder.add(ThrottledProducer.produceThrottled(1 second, 30 milliseconds, 20000, "fastProducer"))
      val slowingSink = builder.add(Sink.actorSubscriber(Props(classOf[SlowDownActor], "slowingDownSink", 50l)))

      // create a buffer, with 100 messages, with an overflow
      // strategy that starts dropping the oldest messages when it is getting
      // too far behind.
      val bufferFlow = Flow[String].buffer(100, OverflowStrategy.dropHead)

      import GraphDSL.Implicits._

      source ~> bufferFlow ~> slowingSink

      ClosedShape
    })
    theGraph
  }

Running this scenario results in the following chart in Grafana:

From this chart we can see that the Source essentially ignores the Sink, as all of its messages are sent to the buffer. Because of the strategy used in the buffer, it will never signal any backpressure; it will simply drop messages.

It is interesting to note that once the Source has produced all of the messages it was configured to send, the Sink keeps running to empty the buffer of the remaining messages. Bear in mind that the buffer is dropping messages whenever it contains more than 100 messages.

This is not exactly an ideal arrangement and we can improve upon it.

Scenario – Fast Source, Slowing Sink with Backpressure Buffer

Having a buffer that drops messages is less than ideal. In situations where we have a reactive Source that will respond to backpressure signals but we still want to use a buffer, we can simply change the OverflowStrategy of the buffer to OverflowStrategy.backpressure.
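The only change from the previous scenario is the buffer definition (a sketch; the rest of the graph stays exactly the same):

val bufferFlow = Flow[String].buffer(100, OverflowStrategy.backpressure)

With that single change in place we get the following chart in Grafana: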

As we can see from this chart, the Source ignores the Sink up until the buffer is full. Once the buffer is full it signals backpressure and the Source slows its rate to meet the requirements of the Sink. Once the message run is complete, the producer stops and the Sink empties the buffer.

Scenario – Fast Source With Two Sinks, One Fast, One Slowing

So far we have only looked at stream graphs with one Source and one Sink. It would be interesting to look at what happens when we have two Sinks. This also gives us an opportunity to look at another component in the akka stream toolbox.

Let’s create another scenario to test this out:

Scenarios.scala

def fastPublisherFastSubscriberAndSlowingSubscriber() = {

    val theGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[Unit] =>

      val source = builder.add(ThrottledProducer.produceThrottled(1 second, 30 milliseconds, 9000, "fastProducer"))
      val fastSink = builder.add(Sink.actorSubscriber(Props(classOf[actors.DelayingActor], "fastSink")))
      val slowingSink = builder.add(Sink.actorSubscriber(Props(classOf[SlowDownActor], "slowingDownSink", 50l)))

      // create the broadcast component
      val broadcast = builder.add(Broadcast[String](2))

      import GraphDSL.Implicits._

      source ~> broadcast.in
      broadcast.out(0) ~> fastSink
      broadcast.out(1) ~> slowingSink

      ClosedShape
    })
    theGraph
  }

The Broadcast component accepts one input and duplicates it to as many outputs as you wish. In this example we have wired two outputs: one goes to our fast Sink, the other to our slowing Sink.

Running this scenario gives us the following Grafana chart:

Looking closely we can see that the whole stream is limited to the rate of the slowest sink. This sink signals that it needs the producer to slow down, so the producer slows, which also slows the rate at which messages are delivered to the fast sink.

Scenario – Fast Source With Two Sinks, One Fast, One Slowing with a Dropping Buffer

Having a stream graph that is limited to the rate of the slowest sink is not ideal. One way of overcoming this limitation is to introduce a buffer between the source and the slow sink. Consider the following scenario.

Scenarios.scala

def fastPublisherFastSubscriberAndSlowingSubscriberWithDroppingBuffer() = {

    val theGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[Unit] =>

      val source = builder.add(ThrottledProducer.produceThrottled(1 second, 30 milliseconds, 9000, "fastProducer"))
      val fastSink = builder.add(Sink.actorSubscriber(Props(classOf[actors.DelayingActor], "fastSink")))
      val slowingSink = builder.add(Sink.actorSubscriber(Props(classOf[SlowDownActor], "slowingDownSink", 50l)))
      val broadcast = builder.add(Broadcast[String](2))

      val bufferFlow = Flow[String].buffer(300, OverflowStrategy.dropHead)

      import GraphDSL.Implicits._

      source ~> broadcast.in
      broadcast.out(0) ~> fastSink
      broadcast.out(1) ~> bufferFlow ~> slowingSink

      ClosedShape
    })
    theGraph
  }

Running this scenario we get the following output:

As we can see, the source and the fast sink continue at maximum speed until all the messages are produced. The slow sink proceeds at its own rate because the buffer ensures that it never gets overwhelmed. However, this is at the cost of dropping messages!

Scenario – Fast Source With Two Sinks, One Fast, One Slowing with a Backpressure Buffer

While buffering is useful when dealing with sinks that process messages at different rates, dropping messages is far from ideal in most situations. If we use a bigger buffer with a backpressure overflow strategy, we can improve on our previous example.

Scenarios.scala

def fastPublisherFastSubscriberAndSlowingSubscriberWithBackpressureBuffer() = {

    val theGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[Unit] =>

      val source = builder.add(ThrottledProducer.produceThrottled(1 second, 30 milliseconds, 9000, "fastProducer"))
      val fastSink = builder.add(Sink.actorSubscriber(Props(classOf[actors.DelayingActor], "fastSink")))
      val slowingSink = builder.add(Sink.actorSubscriber(Props(classOf[SlowDownActor], "slowingDownSink", 50l)))
      val broadcast = builder.add(Broadcast[String](2))

      val bufferFlow = Flow[String].buffer(3500, OverflowStrategy.backpressure)

      import GraphDSL.Implicits._

      source ~> broadcast.in
      broadcast.out(0) ~> fastSink
      broadcast.out(1) ~> bufferFlow ~> slowingSink

      ClosedShape
    })
    theGraph
  }

This produces the following output:

In this scenario, the source and the fast sink process messages as fast as possible until the buffer for the slow sink fills to its limit. Once this happens, the buffer signals backpressure and the source slows to the rate of the slow sink.

Scenario – Fast Source With Two Sinks, One Fast, One Slowing with a Balancer

Buffering with backpressure is a good solution to our problem but there is another alternative which is a little more adaptive and ensures good utilisation of both subscribers.

The Balance component sends each message to whichever sink is currently available. As the slow subscriber gets slower, more messages will be sent to the fast sink; eventually the majority of messages will be sent to the fast sink.

Scenarios.scala

def fastPublisherFastSubscriberAndSlowingSubscriberWithBalancer() = {

    val theGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[Unit] =>

      val source = builder.add(ThrottledProducer.produceThrottled(1 second, 30 milliseconds, 9000, "fastProducer"))
      val fastSink = builder.add(Sink.actorSubscriber(Props(classOf[actors.DelayingActor], "fastSink")))
      val slowingSink = builder.add(Sink.actorSubscriber(Props(classOf[SlowDownActor], "slowingDownSink", 50l)))
      val balancer = builder.add(Balance[String](2))

      import GraphDSL.Implicits._

      source ~> balancer.in
      balancer.out(0) ~> fastSink
      balancer.out(1) ~> slowingSink

      ClosedShape
    })
    theGraph
  }

Running this scenario gives us the following output:

Although it’s fairly subtle on this graph, as the slow sink gets slower the fast sink picks up more of the messages. Throughout this scenario the source never slows, as the balancer never signals any backpressure. It is worth noting that if the fast sink gets overloaded, it will signal backpressure to the balancer and the balancer will signal the source, which will slow its rate of message production.

Conclusion

This has been a rather lengthy journey into reactive streams using akka-streams and the supporting tooling. Along the way we’ve learnt a lot about how to create stream graphs, how to monitor them and how to use akka actors to encapsulate the functionality we need.

There are more components to explore in akka-streams, and more and more common tools are exposing reactive endpoints. For example, both RabbitMQ and Kafka have reactive implementations that allow us to propagate backpressure from our Akka streams to those components.

Reactive streams give us a really effective way of building flexible and adaptive message processing pipelines. Hopefully this tutorial has provided enough information for you to take a look for yourselves.

Other posts in this series:

Part One
Part Two
Part Three
Part Four
Part Five
Code can be found on Github

Getting Started with Reactive Streams – Part 4

Kamon – Visualising what’s happening

In their own words,

“Kamon is a reactive-friendly toolkit for monitoring applications that run on top of the Java Virtual Machine. We are specially enthusiastic to applications built with the Typesafe Reactive Platform (using Scala, Akka, Spray and/or Play!) but as long as you are on the JVM, Kamon can help get the monitoring information you need from your application.”

this sounds like a great way of monitoring our application and being able to see what is really happening when we run our stream graphs, but to be able to visualise the information that Kamon exposes we’ll need to add a few more components to the mix.

Kamon is able to gather metrics about performance and health of our application and export this information regularly to a backend that is able to store and aggregate this information. Other tools are then able to render automatically updating charts based on this information. A very common collection of components to do this work are Kamon, Graphite, ElasticSearch and Grafana.

  • Kamon is a library that uses AspectJ to hook into the method calls made by your ActorSystem and record events of different types. There are a range of actor system metrics that are gathered automatically. You can also add your own metrics to be recorded as the application runs. The default configuration works pretty well but it takes a little digging to get everything up and running, mostly because the library is evolving quickly.
  • StatsD is a network daemon that runs on the Node.js platform and listens for information like counters and timers sent over UDP. StatsD then sends its information to be aggregated by a range of pluggable backend services. In our case, we will use Graphite.
  • Grafana is a frontend dashboard that provides a very attractive and flexible way of presenting the information aggregated by Graphite. All of this updates in realtime.

Because getting all these components up and running can be challenging, we are going to use a pre-configured Docker container where all the hard work has already been done. This builds on the Kamon Docker Image that the Kamon team have already created. An even simpler implementation that we will use is based on a tutorial by Nepomuk Seiler which you can see here if you want more details.
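For reference, firing up that pre-built image looks something like the command below. I’m quoting this from memory, so treat the image name and the port numbers as assumptions and check them against the Kamon documentation and the tutorial linked above before relying on them:

docker run -d --name kamon-grafana-dashboard -p 80:80 -p 8125:8125/udp -p 8126:8126 kamon/grafana_graphite

Port 80 serves the Grafana UI, 8125/udp is the StatsD input that Kamon will send metrics to, and 8126 is the StatsD admin port.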

Adding Dependencies for Kamon

To be able to use Kamon, we need to add a couple of dependencies, some AspectJ configuration, an SBT plugin and a configuration file to our application.

Add the following dependencies and AspectJ configuration to your build.sbt file.

build.sbt

scalaVersion := "2.11.7"

... existing version numbers ...
val kamonVersion    = "0.5.2"


/* dependencies */
libraryDependencies ++= Seq (

  ... existing dependencies ...

  // -- kamon monitoring dependencies --
  ,"io.kamon" % "kamon-core_2.11" % kamonVersion
  ,"io.kamon" %% "kamon-core" % kamonVersion
  ,"io.kamon" %% "kamon-scala" % kamonVersion
  ,"io.kamon" %% "kamon-akka" % kamonVersion
  ,"io.kamon" %% "kamon-statsd" % kamonVersion
  ,"io.kamon" %% "kamon-log-reporter" % kamonVersion
  ,"io.kamon" %% "kamon-system-metrics" % kamonVersion
  ,"org.aspectj" % "aspectjweaver" % "1.8.5"

  ... existing dependencies ...

)

//configure aspectJ plugin to enable Kamon monitoring
aspectjSettings
javaOptions <++= AspectjKeys.weaverOptions in Aspectj
fork in run := true

Kamon depends on AspectJ to inject its monitoring code in all the right places in our akka actor system. To make sure this can happen we need to add a build plug-in to our plugins.sbt file.

Ensure that your /project/plugins.sbt looks like this:

logLevel := Level.Warn

// The Typesafe repository
resolvers += Resolver.typesafeRepo("releases")

addSbtPlugin("com.typesafe.sbt" % "sbt-aspectj" % "0.9.4")

Configuring Kamon

Once we have all the dependencies and SBT plugins configured we need to add some more configuration to our application to tell Kamon what to monitor and where to send its data.

Add the following content to /src/main/resources/application.conf

application.conf

akka {
  loglevel = INFO

  extensions = ["kamon.akka.Akka", "kamon.statsd.StatsD"]
}


# Kamon Metrics
# ~~~~~~~~~~~~~~

kamon {

  metric {

    # Time interval for collecting all metrics and send the snapshots to all subscribed actors.
    tick-interval = 1 seconds

    # Disables a big error message that will be typically logged if your application wasn't started
    # with the -javaagent:/path-to-aspectj-weaver.jar option. If you are only using KamonStandalone
    # it might be ok for you to turn this error off.
    disable-aspectj-weaver-missing-error = false

    # Specify if entities that do not match any include/exclude filter should be tracked.
    track-unmatched-entities = yes

    filters {
      akka-actor {
        includes = ["*/user/*"]
        excludes = [ "*/system/**", "*/user/IO-**", "*kamon*" ]
      }

      akka-router {
        includes = ["*/user/*"]
        excludes = []
      }

      akka-dispatcher {
        includes = ["*/user/*"]
        excludes = []
      }

      trace {
        includes = [ "**" ]
        excludes = [ ]
      }
    }
  }

  # Controls whether the AspectJ Weaver missing warning should be displayed if any Kamon module requiring AspectJ is
  # found in the classpath but the application is started without the AspectJ Weaver.
  show-aspectj-missing-warning = yes

  statsd {

    # Hostname and port on which your StatsD is running. Remember that StatsD packets are sent using UDP, so
    # unreachable hosts and/or closed ports will not be flagged by Kamon and your data simply won't go anywhere.
    # If you're running Docker on Linux this will probably be 127.0.0.1; if you're running Windows or OSX, like me,
    # it will probably be 192.168.99.100
    hostname = "192.168.99.100"
    port = 8125

    # Interval between metrics data flushes to StatsD. It's value must be equal or greater than the
    # kamon.metric.tick-interval setting.
    flush-interval = 1 seconds

    # Max packet size for UDP metrics data sent to StatsD.
    max-packet-size = 1024 bytes

    # Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics
    # collection for your desired entities must be activated under the kamon.metrics.filters settings.
    subscriptions {
      histogram       = [ "**" ]
      min-max-counter = [ "**" ]
      gauge           = [ "**" ]
      counter         = [ "**" ]
      trace           = [ "**" ]
      trace-segment   = [ "**" ]
      akka-actor      = [ "**" ]
      akka-dispatcher = [ "**" ]
      akka-router     = [ "**" ]
      system-metric   = [ "**" ]
      http-server     = [ "**" ]
    }

    # FQCN of the implementation of `kamon.statsd.MetricKeyGenerator` to be instantiated and used for assigning
    # metric names. The implementation must have a single parameter constructor accepting a `com.typesafe.config.Config`.
    metric-key-generator = kamon.statsd.SimpleMetricKeyGenerator

    simple-metric-key-generator {

      # Application prefix for all metrics pushed to StatsD. The default namespacing scheme for metrics follows
      # this pattern:
      #    application.host.entity.entity-name.metric-name
      application = "streamstutorial"

      # Includes the name of the hostname in the generated metric. When set to false, the scheme for the metrics
      # will look as follows:
      #    application.entity.entity-name.metric-name
      include-hostname = true

      # Allow users to override the name of the hostname reported by kamon. When changed, the scheme for the metrics
      # will have the following pattern:
      #   application.hostname-override-value.entity.entity-name.metric-name
      hostname-override = none

      # When the sections that make up the metric names have special characters like dots (very common in dispatcher
      # names) or forward slashes (all actor metrics) we need to sanitize those values before sending them to StatsD
      # with one of the following strategies:
      #   - normalize: changes ': ' to '-' and ' ', '/' and '.' to '_'.
      #   - percent-encode: percent encode the section on the metric name. Please note that StatsD doesn't support
      #     percent encoded metric names, this option is only useful if using our docker image which has a patched
      #     version of StatsD or if you are running your own, customized version of StatsD that supports this.
      metric-name-normalization-strategy = normalize
    }
  }

  # modules can be disabled at startup using yes/no arguments.
  modules {
    kamon-log-reporter.auto-start = no
    kamon-system-metrics.auto-start = no
    #seems like you need to leave this set to 'no' or kamon double reports all statsd metrics..
    kamon-statsd.auto-start = no
    kamon-akka.auto-start = yes
  }
}

This is a fairly standard setup lifted straight from the Kamon website. The only point to note is the StatsD hostname: when you’re running Docker on Linux it is likely to be 127.0.0.1, while if you are running Docker Toolbox on Windows or Mac OSX it is likely to be 192.168.99.100.
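
If you are not sure which address applies to your setup and you are using Docker Toolbox, the docker-machine command can tell you (this assumes the default machine name of ‘default’):

# prints the IP address of the Docker Toolbox VM, typically 192.168.99.100
docker-machine ip default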

Getting the StatsD/Graphite/Grafana Docker Image Running

I’m assuming you have some understanding of Docker and its ability to provide lightweight, fully configured images of servers and applications, so I won’t delve into too much detail here. To get our chosen Docker image running, start the Docker Quickstart Terminal if you’re on Windows or OSX (if you’re on Linux you can enter commands directly on the command line). Once it’s running, enter the following command:

docker run -p 80:80 -p 8125:8125/udp -p 8126:8126 -p 8083:8083 -p 8086:8086 -p 8084:8084 --name kamon-grafana-dashboard muuki88/grafana_graphite:latest

This starts a container from the image and maps all its ports to the same ports on our local machine, so as far as local applications are concerned, StatsD, Graphite and Grafana are running locally.

(If you have not run this image before, the image will be pulled from the Docker repository and this may take a little time).
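
To double-check that the container is actually running before heading to the browser (an optional sanity check):

# lists the running container and its port mappings
docker ps --filter "name=kamon-grafana-dashboard"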

Once the image has been pulled you should be able to open your browser and navigate to:

http://192.168.99.100/ (or http://127.0.0.1/ if you’re running Linux).

If everything worked, you should see something similar to this:

Updating our Application to Report to Kamon

Now we have our Docker container running, we need to make a few minor changes to get our stream graph application to report useful metrics.

Although Kamon is able to expose some standard metrics such as ‘Mailbox Size’ and ‘Time Spent in Mailbox’ I wasn’t able to interpret them usefully so I decided to do something a little more direct and implement my own metrics.

Updating the ThrottledProducer

The first step is to update our ThrottledProducer so that it increments a counter every time it produces a message. Let’s take a look at the fully updated class and then go through the details.

ThrottledProducer.scala

import akka.stream.{SourceShape}
import akka.stream.scaladsl.{Flow, Zip, GraphDSL, Source}
import com.datinko.streamstutorial.Tick
import kamon.Kamon

import scala.concurrent.duration.FiniteDuration

/**
 * An Akka Streams Source helper that produces  messages at a defined rate.
 */
object ThrottledProducer {

  def produceThrottled(initialDelay: FiniteDuration, interval: FiniteDuration, numberOfMessages: Int, name: String) = {

    val ticker = Source.tick(initialDelay, interval, Tick)
    val numbers = 1 to numberOfMessages
    val rangeMessageSource = Source(numbers.map(message => s"Message $message"))

    //define a stream to bring it all together..
    val throttledStream = Source.fromGraph(GraphDSL.create() { implicit builder =>

      //1. create a Kamon counter so we can track number of messages produced
      val createCounter = Kamon.metrics.counter("throttledProducer-create-counter")

      //define a zip operation that expects a tuple with a Tick and a Message in it..
      //(Note that the operations must be added to the builder before they can be used)
      val zip = builder.add(Zip[Tick.type, String])

      //create a flow to extract the second element in the tuple (our message - we dont need the tick part after this stage)
      val messageExtractorFlow = builder.add(Flow[(Tick.type, String)].map(_._2))

      //2. create a flow to log performance information to Kamon and pass on the message object unmolested
      val statsDExporterFlow = builder.add(Flow[(String)].map{message => createCounter.increment(1); message})

      //import this so we can use the ~> syntax
      import GraphDSL.Implicits._

      //define the inputs for the zip function - it wont fire until something arrives at both inputs, so we are essentially
      //throttling the output of this stream
      ticker ~> zip.in0
      rangeMessageSource ~> zip.in1

      //send the output of our zip operation to a processing messageExtractorFlow that just allows us to take the second element of each Tuple, in our case
      //this is the message, we dont care about the Tick, it was just for timing and we can throw it away.
      //3. Then we route the output of the extractor to a flow that exports data to StatsD
      //then route that to the 'out' Sink as before.
      zip.out ~> messageExtractorFlow ~> statsDExporterFlow

      //4.  make sure we pass the right output to the SourceShape outlet
      SourceShape(statsDExporterFlow.out)
    })
    throttledStream
  }
}

The first and most obvious change is to import the Kamon dependency, import kamon.Kamon. The other changes in detail are:

  1. We create a named Kamon metrics counter. This needs to have a unique name that helps us and Kamon identify it. The counter ‘recording instrument’, as Kamon calls it, has two simple methods, increment and decrement. We’ll only be using increment.
  2. We create a graph flow that expects to receive a message of type String, increments the Kamon counter and then passes on the message it received, unchanged. The increment method can take any numeric parameter, or even none, but it seems best to be explicit and increment the counter by one for every message it receives.
  3. We bolt our new statsDExporterFlow onto the end of the existing stream graph so that it actually gets used.
  4. We change the SourceShape call to use the output of the statsDExporterFlow so that the output of our graph stream is exposed to the outside world as the output of our ThrottledProducer.

Updating the DelayingActor

The next step is to update our DelayingActor so that it increments a counter every time it processes a message. Again, lets take a look at the fully updated class and look at the details afterwards.

DelayingActor.scala

package com.datinko.streamstutorial.actors

import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnNext}
import akka.stream.actor.{OneByOneRequestStrategy, RequestStrategy, ActorSubscriber}
import com.typesafe.scalalogging.LazyLogging
import kamon.Kamon

/**
 * An actor that introduces a fixed delay when processing each message.
 */
//Actor Subscriber trait extension is needed so that this actor can be used as part of a stream
class DelayingActor(name: String, delay: Long) extends ActorSubscriber with LazyLogging {
  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  val actorName = name
  val consumeCounter = Kamon.metrics.counter("delayingactor-consumed-counter")

  def this(name: String) {
    this(name, 0)
  }

  override def receive: Receive = {
    case OnNext(msg: String) =>
      Thread.sleep(delay)
      logger.debug(s"Message in delaying actor sink ${self.path} '$actorName': $msg")
      consumeCounter.increment(1)
    case OnComplete =>
      logger.debug(s"Completed Messgae received in ${self.path} '$actorName'")
    case msg =>
      logger.debug(s"Unknown message $msg in $actorName: ")
  }
}

As before, we have some very minor changes to our DelayingActor to make it report metrics to Kamon.

  1. Add the Kamon dependency, import kamon.Kamon.
  2. Create a counter with a unique name so we can report the number of messages that have been consumed, val consumeCounter = Kamon.metrics.counter("delayingactor-consumed-counter")
  3. Increment the counter every time we actually consume a message from the stream, consumeCounter.increment(1)

Running the Application to Report Metrics

Because Kamon uses AspectJ to inject its monitoring and reporting code, we must run the application so that the AspectJ SBT plugin gets used. The simplest way of doing this is to execute sbt run from the command line. If you run the application from within your IDE you will see a large reminder that Kamon is not running correctly and no metrics are being recorded, a little something like this:

[ERROR] [12/31/2015 17:03:23.195] [main] [ModuleLoader(akka://kamon)] 

  ___                           _      ___   _    _                                 ___  ___ _            _
 / _ \                         | |    |_  | | |  | |                                |  \/  |(_)          (_)
/ /_\ \ ___  _ __    ___   ___ | |_     | | | |  | |  ___   __ _ __   __ ___  _ __  | .  . | _  ___  ___  _  _ __    __ _
|  _  |/ __|| '_ \  / _ \ / __|| __|    | | | |/\| | / _ \ / _` |\ \ / // _ \| '__| | |\/| || |/ __|/ __|| || '_ \  / _` |
| | | |\__ \| |_) ||  __/| (__ | |_ /\__/ / \  /\  /|  __/| (_| | \ V /|  __/| |    | |  | || |\__ \\__ \| || | | || (_| |
\_| |_/|___/| .__/  \___| \___| \__|\____/   \/  \/  \___| \__,_|  \_/  \___||_|    \_|  |_/|_||___/|___/|_||_| |_| \__, |
            | |                                                                                                      __/ |
            |_|                                                                                                     |___/

 It seems like your application was not started with the -javaagent:/path-to-aspectj-weaver.jar option but Kamon detected
 the following modules which require AspectJ to work properly:

      kamon-akka, kamon-scala

 If you need help on setting up the aspectj weaver go to http://kamon.io/introduction/get-started/ for more info. On the
 other hand, if you are sure that you do not need or do not want to use the weaver then you can disable this error message
 by changing the kamon.show-aspectj-missing-warning setting in your configuration file.

When everything is running correctly you should see:

[info] Running com.datinko.asgard.bifrost.Start
[info] [INFO] [12/31/2015 17:12:21.301] [main] [StatsDExtension(akka://Bifrost)] Starting the Kamon(StatsD) extension
[info] 17:12:24.204 [Bifrost-akka.actor.default-dispatcher-6] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 1
[info] 17:12:24.229 [Bifrost-akka.actor.default-dispatcher-5] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 2
[info] 17:12:24.239 [Bifrost-akka.actor.default-dispatcher-4] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 3
[info] 17:12:24.259 [Bifrost-akka.actor.default-dispatcher-6] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 4
[info] 17:12:24.294 [Bifrost-akka.actor.default-dispatcher-3] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 5
...

Updating Grafana to Show our Metrics

Now that our application is sending metrics to Graphite, we can customise our Grafana dashboard to show us some pretty charts of that data.

  1. Using your browser, navigate to http://192.168.99.100/ (or 127.0.0.1 on Linux) and this should show you the default Grafana dashboard.
  2. A random line chart will be shown at the bottom of the dashboard. Click on the title of the chart, ‘First Graph (click to edit)’ and click ‘Edit’ from the menu that appears.
  3. The display will change to show some edit controls.
  4. Click on the ‘Metrics’ tab and from the list shown in the bottom half of the screen click on the ‘select metric’ box. This will display a list of all sources of data that are available.
  5. Click the ‘stats’ entry.
  6. Click the ‘select metric’ title that appears in the next box. Click the ‘counters’ entry.
  7. Click the ‘select metric’ title that appears in the next box. Click the ‘streamstutorial’ entry. (If you called your application something else, then you should see it here).
  8. Click the ‘select metric’ title that appears in the next box. Click the entry that is shown (this is usually the hostname of your machine).
  9. Click the ‘select metric’ title that appears in the next box. Click the ‘counter’ entry.
  10. Click the ‘select metric’ title that appears in the next box. You should now see a list of the counters we defined: ‘throttledProducer-create-counter’ and ‘delayingactor-consumed-counter’. Choose ‘throttledProducer-create-counter’ for now.
  11. Click the ‘select metric’ title that appears in the next box. Click the ‘count’ entry.

All being well, you should now see a chart showing the rate at which our ThrottledProducer is creating messages and pushing them into the stream.

Following a similar process we can add a line to show the rate at which our Sink is consuming messages.

  1. Click the ‘Add Query’ button at the bottom right of the screen and follow steps 5 to 9 above.
  2. At step 10, choose ‘delayingactor-consumed-counter’
  3. Click the ‘select metric’ title that appears in the next box. Click the ‘count’ entry.

This will add a series line for our Sink so that we can see the Source and Sink message rates on the same chart. With the default settings you will get something looking like this (which doesn’t look quite right!):

At first glance it looks like our Sink is actually consuming twice as many messages as our Source. This is actually just a ‘Display Style’ setting that we can change:

  1. Click the ‘Display Styles’ tab.
  2. Ensure the ‘Stack’ option in the ‘Multiple Series’ area is unchecked.

Both series lines will now sit on top of each other. To prove that both lines are actually still present, click on the ‘Metrics’ tab and then click the eye icon next to each entry. This will toggle the visibility of each series. Despite Grafana’s annoying habit of changing the colour of a series when its visibility is toggled, we can prove to ourselves that both series are identical.

In other words, our ThrottledProducer (our Source) is creating and sending messages as fast as the DelayingActor (our Sink) is able to consume them. Believe it or not, this is back pressure in action.

Well, okay.. but maybe we’re not totally convinced? We’ll do some more experiments shortly.

Other posts in this series:

Part One
Part Two
Part Three
Part Four
Part Five
Code can be found on Github

Getting Started with Reactive Streams – Part 3

Creating a Reusable Sink

Now that we have a nice reusable, configurable Source object we could do with having a similarly reusable and configurable Sink object that helps us experiment with reactive streams. While we are doing that we’ll wire in some simple logging just so we can see what’s happening within our system.

Adding Dependencies for Logging

As mentioned earlier, we would like to make use of logging within our system to see what is really happening. This will be some pretty simple logging that we will expand upon in the future, but it will do for now. To be able to use logging, we need to add a couple of dependencies and a configuration file to our application.

Add the following dependencies to your build.sbt file.

build.sbt

/* dependencies */
libraryDependencies ++= Seq (

  ...

  // -- Logging --
  ,"ch.qos.logback" % "logback-classic" % "1.1.2"
  ,"com.typesafe.scala-logging" %% "scala-logging" % "3.1.0"

  ...

)

Because we are using Logback as our logging provider, we also need to add some configuration settings to our application to tell logback where to output its logs and what format to write log entries in.

Add a file at /src/main/resources/logback.xml with the following contents:

logback.xml

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <!-- path to your log file, where you want to store logs -->
        <file>/Users/yourusername/test.log</file>
        <append>false</append>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="debug">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="FILE" />
    </root>
</configuration>

We won’t dwell on this configuration file; it’s a pretty standard logback configuration that does everything we need it to do for now.

Say Hello to the DelayingActor

So far in this tutorial we have made very simple Sink objects that don’t do very much apart from discard the messages they receive. Akka actors are a really good way of making a chunk of configurable processing logic that can be reused in a range of stream graphs.

DelayingActor – Overview

To make a standard Akka actor able to work within a reactive stream and understand backpressure we need to make a few minor modifications. The idea of this DelayingActor is to add a fake processing delay to each message it receives. We’ll also take the opportunity to introduce some logging output so that we can log the processing of each message and see what is happening inside our running system.

The DelayingActor – Code

The code for the DelayingActor is as follows:

DelayingActor.scala

package com.datinko.streamstutorial.actors

import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnNext}
import akka.stream.actor.{OneByOneRequestStrategy, RequestStrategy, ActorSubscriber}
import com.typesafe.scalalogging.LazyLogging

/**
 * An actor that introduces a fixed delay when processing each message.
 */
//Actor Subscriber trait extension is needed so that this actor can be used as part of a stream
class DelayingActor(name: String, delay: Long) extends ActorSubscriber with LazyLogging {
  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  val actorName = name

  def this(name: String) {
    this(name, 0)
  }

  override def receive: Receive = {
    case OnNext(msg: String) =>
      Thread.sleep(delay)
      logger.debug(s"Message in delaying actor sink ${self.path} '$actorName': $msg")
    case OnComplete =>
      logger.debug(s"Completed Messgae received in ${self.path} '$actorName'")      
    case msg =>
      logger.debug(s"Unknown message $msg in $actorName: ")
  }
}

It’s worth discussing the minor changes we have had to make to ensure this actor can be used inside a reactive stream.

  • The LazyLogging trait gives us access to the logback logger inside our actor. The lazy logging trait only creates the logger when it is first called, which saves us some resources by ensuring we don’t spin up a logger unless we use it, at the cost of a small performance hit the first time it is used. Note that this is not needed for reactive streams; it’s just so we can see what is going on inside the actor in a very simple way.
  • To make the actor part of the reactive stream we need to extend ActorSubscriber. This gives us full control of stream backpressure and means that the actor will receive ActorSubscriberMessage.OnNext, ActorSubscriberMessage.OnComplete and ActorSubscriberMessage.OnError messages, as well as any other non-stream messages, just like any other actor. The one we are most interested in is the OnNext message, which is used to pass the next stream message to this actor.
  • When defining the actor as an ActorSubscriber we must also define a RequestStrategy so the actor is able to signal how it wants to control backpressure in the stream, or, in other words, what the actor does when it wants more or fewer messages to be sent to it. We have chosen the OneByOneRequestStrategy so that every time the actor has zero messages left to process it asks for one more (see the sketch below for an alternative).

For more information on the ActorSubscriber check out the documentation
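
OneByOneRequestStrategy is the simplest strategy but not the only one. As a rough, hedged sketch of how the request strategy alone changes the demand signalled upstream, here is a variant of the actor using akka-stream’s WatermarkRequestStrategy, which asks for messages in batches rather than one at a time. The class name and the watermark values are purely illustrative and are not part of the tutorial code.

package com.datinko.streamstutorial.actors

import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnNext}
import akka.stream.actor.{ActorSubscriber, RequestStrategy, WatermarkRequestStrategy}
import com.typesafe.scalalogging.LazyLogging

//identical to DelayingActor apart from the request strategy: once the number of
//outstanding (requested but unprocessed) messages drops below the low watermark,
//demand is topped back up to the high watermark in a single request
class BatchingDelayingActor(name: String, delay: Long) extends ActorSubscriber with LazyLogging {
  override protected def requestStrategy: RequestStrategy =
    WatermarkRequestStrategy(highWatermark = 10, lowWatermark = 5)

  override def receive: Receive = {
    case OnNext(msg: String) =>
      Thread.sleep(delay)
      logger.debug(s"Message in batching actor sink '$name': $msg")
    case OnComplete =>
      logger.debug(s"Completed message received in '$name'")
  }
}

Wired into the same Sink.actorSubscriber(...) call, this actor would request demand from upstream in small bursts instead of one message at a time.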

Putting our ThrottledProducer and DelayingActor Together

Now that we have some reusable, configurable components it would be useful to put them together in a couple of scenarios that allow us to explore backpressure in action. To achieve this we’ll make a simple Scenarios object with a function to set up and run each configuration we are interested in.

Scenarios.scala

package com.datinko.streamstutorial

import akka.actor.Props
import akka.stream.{ClosedShape}
import akka.stream.scaladsl.{Sink, GraphDSL, RunnableGraph}
import com.datinko.streamstutorial.actors.DelayingActor
import scala.concurrent.duration._

/**
 * A set of test scenarios to demonstrate Akka Stream back pressure in action.
 */
object Scenarios {

  def fastPublisherFastSubscriber() = {

    val theGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>

      val source = builder.add(ThrottledProducer.produceThrottled(1 second, 20 milliseconds, 20000, "fastProducer"))
      val fastSink = builder.add(Sink.actorSubscriber(Props(classOf[DelayingActor], "fastSink")))

      import GraphDSL.Implicits._

      source ~> fastSink

      ClosedShape
    })
    theGraph
  }
}

This function is very much like the SimpleStreams.printSimpleMessagesToConsole function we defined earlier. We create a RunnableGraph that contains both a source and a sink. The only area of interest is how we create the Sink so that it uses the DelayingActor we just defined.

The Sink.actorSubscriber(...) provides an easy way for us to use an ActorSubscriber as a sink in a stream graph.

To execute this from our Start.scala object (with its implicit actor system and implicit materializers) we use:

Scenarios.fastPublisherFastSubscriber().run()
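
For readers who have jumped straight to this part, the surrounding Start object looks roughly like the sketch below; the actor system name is illustrative and anything else set up in Part One is assumed to still be there.

package com.datinko.streamstutorial

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

object Start extends App {

  //.run() materializes the graph, so it needs an implicit materializer,
  //which in turn needs an implicit actor system
  implicit val system = ActorSystem("streamstutorial")
  implicit val materializer = ActorMaterializer()

  Scenarios.fastPublisherFastSubscriber().run()
}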

The output will be something similar to:

14:29:30.344 [Bifrost-akka.actor.default-dispatcher-6] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 1
14:29:30.353 [Bifrost-akka.actor.default-dispatcher-10] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 2
14:29:30.383 [Bifrost-akka.actor.default-dispatcher-6] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 3
14:29:30.393 [Bifrost-akka.actor.default-dispatcher-6] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 4
14:29:30.423 [Bifrost-akka.actor.default-dispatcher-10] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 5
14:29:30.433 [Bifrost-akka.actor.default-dispatcher-10] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 6
14:29:30.463 [Bifrost-akka.actor.default-dispatcher-10] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 7
14:29:30.473 [Bifrost-akka.actor.default-dispatcher-10] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 8

From the debug messages we have output to the log we can see that the materializer creates an actor to process the stream graph, and that actor has its own unique path in the akka actor system. As hoped, this gives us some basic confirmation that the Source and the Sink are publishing and receiving messages as expected. However, it would be nice to be able to see what’s happening to the message flow in a more visual way. We’ll cover that next.

Other posts in this series:

Part One
Part Two
Part Three
Part Four
Part Five
Code can be found on Github

Getting Started with Reactive Streams – Part 2

Stream Graphs

In our simple early example of a stream, we did everything in one function. This is rarely how we would do things in a production environment. Often we would want to split up the parts of our stream into components that we can reuse. Once we have these components we will probably want to wire those components together in a range of ways to get the stream we want. Stream Graphs give us a way to do this.

Splitting up our Sources, Flows and Sinks

Although we said that streams have two essential components for them to work, we failed to mention that there is one other type of component that makes it easier to decompose our streams into reusable chunks of functionality.

  • A Flow is something that has exactly one input and exactly one output. A flow is often used to modify the messages that pass through the stream. A stream can contain many flows and there is a range of common operations that can be used within a flow; you can also implement your own operations (see the quick sketch below).
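
As a quick, self-contained illustration of a flow in use (the object and value names here are purely illustrative and not part of the tutorial code):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}

object FlowExample extends App {
  implicit val system = ActorSystem("FlowExample")
  implicit val materializer = ActorMaterializer()

  //a flow has exactly one input and one output and transforms each message it sees
  val toUpper = Flow[String].map(_.toUpperCase)

  //flows sit between a source and a sink; runForeach attaches a simple printing sink
  Source(List("hello", "reactive", "streams"))
    .via(toUpper)
    .runForeach(println)
}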

Creating a Reusable Source

One of the main objectives we would like to achieve in this tutorial is to create a number of streams that we can use to experiment with backpressure and monitor to observe its effects. To do that we will need a Source object that we can reuse in many different scenarios with a range of different message production rates.

Say Hello to the ThrottledProducer

The first step to building such a system is to create a Source that produces the data we want at the rate we want. We then want to be able to wire that source into a range of other components that will consume the messages that are produced. We will call this source the ThrottledProducer. Bear with me; there may be a lot of typing before we see results.

The ThrottledProducer – Overview

In order to get a Source to produce messages at a rate that we can control, we need to create a Stream Graph and wrap it up as a Source that can be reused in another Stream Graph. The ThrottledProducer has the following components within it.

  • The Ticker is a built-in akka-stream component that produces simple messages at a defined rate. This is almost perfect for our needs, except that we want to control the content of the messages that our Source produces.
  • The NumberGenerator is simply a Scala Range that contains the number of messages we want. In our case, the content of each message is an actual number: message 1 contains the number 1, message 2 contains the number 2 and so on.
  • The Zip operation is a built-in akka-stream component and is key to controlling the rate at which we produce messages inside our source. The Zip operation waits until it has a value at both inputs before it produces an output, and that output takes the form of a tuple containing both inputs. In our example the NumberGenerator produces all the messages we want to output almost immediately, while the Ticker produces Tick messages at the controlled rate we specify. The messages from the NumberGenerator wait at one input of the Zip until a Tick arrives at the other input; when this happens the Zip outputs a tuple of the form [Tick, String] and sends it on to its output.
  • The ExtractFlow is a simple flow operation that extracts the String element of the Zip output and passes it on. It discards the Tick as it is not needed. We only needed the Tick to control the rate at which messages were produced.

The ThrottledProducer – Code

Enough theory, let’s take a look at the code.

ThrottledProducer.scala

package com.datinko.streamstutorial

import akka.stream.{SourceShape}
import akka.stream.scaladsl.{Flow, Zip, GraphDSL, Source}

import scala.concurrent.duration.FiniteDuration

/**
 * An Akka Streams Source helper that produces  messages at a defined rate.
 */
object ThrottledProducer {

  def produceThrottled(initialDelay: FiniteDuration, interval: FiniteDuration, numberOfMessages: Int, name: String) = {

    val ticker = Source.tick(initialDelay, interval, Tick)
    val numbers = 1 to numberOfMessages
    val rangeMessageSource = Source(numbers.map(message => s"Message $message"))

    //define a stream to bring it all together..
    val throttledStream = Source.fromGraph(GraphDSL.create() { implicit builder =>

      //define a zip operation that expects a tuple with a Tick and a Message in it..
      //(Note that the operations must be added to the builder before they can be used)
      val zip = builder.add(Zip[Tick.type, String])

      //create a flow to extract the second element in the tuple (our message - we dont need the tick part after this stage)
      val messageExtractorFlow = builder.add(Flow[(Tick.type, String)].map(_._2))

      //import this so we can use the ~> syntax
      import GraphDSL.Implicits._

      //define the inputs for the zip function - it wont fire until something arrives at both inputs, so we are essentially
      //throttling the output of this stream
      ticker ~> zip.in0
      rangeMessageSource ~> zip.in1

      //send the output of our zip operation to a processing messageExtractorFlow that just allows us to take the second element of each Tuple, in our case
      //this is the string message, we dont care about the Tick, it was just for timing and we can throw it away.
      //route that to the 'out' Sink.
      zip.out ~> messageExtractorFlow

      SourceShape(messageExtractorFlow.out)
    })
    throttledStream
  }
}

The whole purpose of this function is to create a Source stream, which we have called throttledStream, that can be included as part of another Stream Graph.

We start by defining our two Source objects, the ticker and the collection of number messages. The Source object has handy constructors that allow us to pass in many different types of collections that are automatically turned into valid sources. We’ll use these two sources a little later to feed into our Zip operation.
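
A few of those constructors, just as an illustrative aside (none of this needs adding to the tutorial code):

import scala.concurrent.duration._
import akka.stream.scaladsl.Source

object SourceExamples {
  val fromRange = Source(1 to 10)                           //every element of the range becomes a message
  val fromList  = Source(List("a", "b", "c"))               //the same works for other immutable collections
  val single    = Source.single("just one message")         //a source that emits exactly one element
  val ticks     = Source.tick(1.second, 500.millis, "tick") //emits "tick" every 500ms after a 1 second delay
}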

The definition of the Stream Graph is started with:

val throttledStream = Source.fromGraph(.......)

We signal our intention to create a Graph from scratch using:

GraphDSL.create() { implicit builder =>.......}

All GraphDSL operations need a reference to a builder object, which acts as a context for the Stream Graph. Sources, sinks, flows and operations are registered with the builder; once that is done they can be wired together into a stream definition.

The next steps are to create the operations we want within our graph. First we define the Zip operation, which expects to receive inputs of type Tick and String and will therefore produce a tuple with the same types of elements.

val zip = builder.add(Zip[Tick.type, String])

The next operation to add to the builder is the message extractor flow, which ensures that we only take the second element of each tuple and discard the Tick element that we no longer need. (The Flow defines the type of objects it expects to receive, while the map operation simply tells the flow to extract and pass on the second element of each message it receives.)

val messageExtractorFlow = builder.add(Flow[(Tick.type, String)].map(_._2))

ThrottledProducer – Building the Graph

Once we have defined all the sources, flows and operations that will be used in the graph, we need to wire those elements together to actually form the graph.

To make this easier the GraphDSL provides a number of implicits we can use. To access these we import them using:

import GraphDSL.Implicits._

The most obvious benefit of this is that it allows us to use the ~> operator to wire components together in the graph, like so:

ticker ~> zip.in0

rangeMessageSource ~> zip.in1

zip.out ~> messageExtractorFlow

(Note that this illustrates that the Zip has a number of inputs and a single output).

The final element inside the Graph definition is to indicate what kind of Graph component we are creating (Source, Sink or Flow).

SourceShape(messageExtractorFlow.out)

The SourceShape call indicates that this is a source component and we must tell the Graph which of its internal operations supplies the messages that will be exposed to consumers of this source.

(Note that there are SinkShape and FlowShape elements to support making Sinks and Flows).
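
For comparison, a reusable Flow component built with the GraphDSL would close with a FlowShape, naming both its inlet and its outlet. A minimal hedged sketch (the names here are illustrative and not part of the tutorial code):

import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL}

object ShoutingFlow {
  //a trivial graph-based flow that upper-cases every message passing through it
  val shoutingFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    val upperCase = builder.add(Flow[String].map(_.toUpperCase))

    //a FlowShape exposes one inlet and one outlet from the internal operations
    FlowShape(upperCase.in, upperCase.out)
  })
}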

Once we have built the graph for the throttledStream we return it so that it can be used when building another Graph.

Using the ThrottledProducer in a Graph

Now that we have built our ThrottledProducer we want to use it for something. Believe it or not, the cleanest way of showing the ThrottledProducer in action is to use it in another simple Graph and then run that graph.

If we add a new function to the SimpleStreams object we created earlier we get the following. This uses a lot of the concepts we’ve just covered in the ThrottledProducer to build a stream graph, except that this builds a runnable (also known as a ‘closed’) graph that is complete with a Source and a Sink.

SimpleStreams.scala

def throttledProducerToConsole() = {

    val theGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>

      val source = builder.add(ThrottledProducer.produceThrottled(1 second, 20 milliseconds, 20000, "fastProducer"))
      val printFlow = builder.add(Flow[(String)].map{println(_)})
      val sink = builder.add(Sink.ignore)

      import GraphDSL.Implicits._

      source ~> printFlow ~> sink

      ClosedShape
    })
    theGraph
  }

The difference to note about this function is that it creates a RunnableGraph, which indicates it should have both a Source and a Sink. This is matched by the use of ClosedShape at the end of the function rather than the SourceShape we used in the ThrottledProducer.

As we did previously, we define the components to be used in the graph. In this case we have our ThrottledProducer that is set to produce a message every 20 milliseconds after an initial delay of 1 second and it will produce 20000 messages.

To make it a little easier to see if anything is happening we have created a simple flow that prints every message it sees.

This is all capped off by the Sink.ignore that we have seen earlier. It gives the graph the sink it must have to be considered valid, but it simply discards every message it receives.

To run this graph we simply add the following to our Start.scala object.

SimpleStreams.throttledProducerToConsole().run()

When you execute this application, either from within your IDE or from the command line using sbt run you should see output like:

Message 1
Message 2
Message 3
Message 4
Message 5
...
Message 999

Still not the most impressive of demos, but now we have a very neat and reusable way of defining the major parts of any stream graph. Next we’ll move on to building some Sinks that can help us experiment with reactive streams.

Other posts in this series:

Part One
Part Two
Part Three
Part Four
Part Five
Code can be found on Github