Concourse CI

I know ‘Docker and Flume with Grandad’ isn’t finished yet but I’ve been tinkering with Concourse CI recently and wanted to understand the bare basics.

As usual, I’ve written about the experience, and rather than wait until I’ve got it all formatted for publishing here, I thought I’d offer up a direct link to the GitHub README for those who are interested.

So, you can check out my notes on how to get Concourse CI up and running for a simple Java project over here (https://github.com/neildunlop/simplejava).

Docker & Flume with Grandad – Defining and Running our Flume Container

We have already seen how we write a Dockerfile that will create an image for us. This image can then be used to make instances of containers that run the image. We have successfully started the RabbitMQ container that will act as an input to our system, but this tutorial is called ‘Docker and Flume with Grandad‘ so we should build our Apache Flume image next and get a container up and running with that.

Note that this section is built upon the tutorial I found at (http://probablyfine.co.uk/2014/05/05/using-docker-with-apache-flume-1/), and the ‘probablyfine/flume‘ base image seems to be one of the more popular flume images available on Docker Hub. If you want more details check out the link.

To create a simple Flume image that we’ll build on and modify, do the following:

  1. Open a terminal
  2. Navigate to the ‘docker-poc’ folder we created earlier; it should already contain the ‘rabbit’ folder.
  3. Create a new folder to hold our Dockerfile and associated assets for our Apache Flume container with `mkdir flume`
  4. Navigate into this new folder with `cd flume`.
  5. Create an empty Dockerfile with `touch Dockerfile`.
  6. Edit the newly created file with `nano Dockerfile`.
  7. Add the following content to the Dockerfile we just created:
FROM probablyfine/flume

ADD flume.conf /var/tmp/flume.conf

EXPOSE 44444

ENTRYPOINT [ "flume-ng", "agent", "-c", "/opt/flume/conf", "-f", "/var/tmp/flume.conf", "-n", "docker", "-Dflume.root.logger=INFO,console" ]

Remember to save the file.

What does this file do?

Like the last Dockerfile we created, this file contains some commands that create the base of our image and perform some operations.

As before, the `FROM` command sets the base image that we want to use in our container. This is basically using the ‘probablyfine/flume‘ image as a starting point. You can always go and check out its contents. This particular image has a basic OS and all the software needed to get Flume working.

The `ADD` command is one we have not used before. It takes a named file from the same directory as the Dockerfile and copies it to a specified location and filename in the image. This is an easy way for us to add files to the image. I guess the command name gives that away! Since Flume uses a config file to tell it where data is coming from, how it should process that data and where it should send it, we will need to create this file before we build the image.

As before, the `EXPOSE` command makes a port, 44444 in this case, accessible to other docker containers and allows it to be mapped to a port on the host machine.

The `ENTRYPOINT` command allows us to pass commands to the running docker container. Crucially, it allows us to specify what program to use to run those commands. Docker has a default entrypoint of ‘/bin/sh -c’, which means that in our rabbitmq image Dockerfile, when we used ‘CMD rabbitmq-server start‘, what docker was doing behind the scenes was ‘/bin/sh -c "rabbitmq-server start"‘. Docker was passing our commands to the shell.

In this Dockerfile we actually want to pass commands to a different program called ‘flume-ng‘. ‘flume-ng‘ is the program we want to run; everything else is an argument passed to ‘flume-ng‘. Specifically, the arguments that Flume expects are: the configuration directory, the name of the configuration file and the name to be given to the running agent. (A Flume instance can run more than one data processing pipeline, and each pipeline is managed by an agent.)

So, if you want to run things on the command line inside your docker container, use CMD; if you want to pass commands to a specific program inside your docker container, use ENTRYPOINT. See http://blog.stapps.io/docker-cmd-vs-entrypoint/ for a good overview of CMD and ENTRYPOINT.
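The difference is easiest to see side by side. Here are the two forms together, purely for comparison (the ENTRYPOINT arguments are abbreviated here; a real Dockerfile would normally define just one main process):

```dockerfile
# Shell form: docker actually runs /bin/sh -c "rabbitmq-server start",
# so the shell is the program receiving our text.
CMD rabbitmq-server start

# Exec form: flume-ng itself is the program; every quoted string after it
# is passed to flume-ng as a separate argument, with no shell involved.
ENTRYPOINT [ "flume-ng", "agent", "-n", "docker" ]
```

The two can also be combined: ENTRYPOINT fixes the program and CMD supplies default arguments that `docker run` can override.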

These four commands pull an image from the Docker Repository, add a config file, expose a port to the outside world and run flume as a service. Voila, one flume server.

Creating our Flume config file

As noted above, our Dockerfile contains a command to ADD a file to the container. The file being added is ‘flume.conf’, and it is used to tell flume where it will get data from, how it will process that data and where it should send that data once it has been processed. To create this ‘flume.conf‘ file, perform the following steps:

  1. Navigate to ‘/docker-poc/flume’
  2. Create the file with `touch flume.conf`.
  3. Edit the file using `nano flume.conf`.
  4. Add the following to the file contents:
docker.sources = netcatSource
docker.channels = inMemoryChannel
docker.sinks = logSink

docker.sources.netcatSource.type = netcat
docker.sources.netcatSource.bind = 0.0.0.0
docker.sources.netcatSource.port = 44444
docker.sources.netcatSource.channels = inMemoryChannel

docker.channels.inMemoryChannel.type = memory
docker.channels.inMemoryChannel.capacity = 1000
docker.channels.inMemoryChannel.transactionCapacity = 100

docker.sinks.logSink.type = logger
docker.sinks.logSink.channel = inMemoryChannel

Remember to save the file.

Flume Configuration Overview

While it’s not strictly related to Docker, having a broad overview of how Flume is configured might help us to get things up and running. Flume has three main concepts:

Sources – Places where data comes into the system. Flume provides a large range of adapters to pull data in from external systems.
Channels – Pathways data travels down within the system, being processed by various components along the way.
Sinks – Places where data exits the system. Again, a large range of adapters exist to help connect to external systems.

A Flume instance can run more than one data processing pipeline. Each pipeline is managed by an ‘agent‘. Each agent has a name. When we launch Flume inside our docker container, one of the arguments we pass to Flume in our ENTRYPOINT arguments is the name we want to give the agent. In our case, the name of the agent is ‘docker’.

We use the same name as the starting element in the flume configuration. This is how each agent knows how to identify its configuration elements within the configuration file.
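As a sketch of that convention, one file could configure two agents side by side (the second agent, ‘backup’, is purely hypothetical here, just to show how the prefixes keep them separate):

```
# Keys for our agent all begin with its name, 'docker':
docker.sources = netcatSource

# A hypothetical second agent called 'backup' would keep its own keys:
backup.sources = someOtherSource
```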

In our configuration we setup a pretty simple pipeline that has a source to get data in, a channel to process that data and a sink to get the data out. The key elements in the configuration are:

  1. A netcatSource that uses a built-in adapter to read data from port 44444 and turn each line received into a Flume event.
  2. An inMemoryChannel that buffers the incoming events in memory, to give us some flexibility if events arrive faster than we can process them.
  3. A logSink, which just logs the events it receives as a form of output.

Firing up the Flume Container

Now that we have our Dockerfile in place we can actually execute it and get our container up and running. Assuming you are running on a Mac:

  1. Ensure you are running the `Docker Quickstart Terminal`.
  2. Navigate to the `docker-poc/flume` folder created earlier.
  3. This folder should contain the Dockerfile we just created.
  4. Type `docker build -t flume .` to use the dockerfile in the current directory to create a docker image with the name ‘flume’.
  5. The output should look something like:
Sending build context to Docker daemon 3.072 kB
Step 1 : FROM probablyfine/flume
 ---> dcc1d3a26f17
Step 2 : ADD flume.conf /var/tmp/flume.conf
 ---> f2081cfbe5d5
Removing intermediate container e41cd0d0cfa9
Step 3 : EXPOSE 44444
 ---> Running in 93f57649da85
 ---> 2154236de5e3
Removing intermediate container 93f57649da85
Step 4 : ENTRYPOINT flume-ng agent -c /opt/flume/conf -f /var/tmp/flume.conf -n docker -Dflume.root.logger=INFO,console
 ---> Running in 2cc534aaaece
 ---> 0521c0ecf2b0
Removing intermediate container 2cc534aaaece
Successfully built 0521c0ecf2b0

6. Type `docker images` to list all of the images that exist on the host machine. The output will look like:

REPOSITORY  TAG     IMAGE ID       CREATED         SIZE
flume       latest  0521c0ecf2b0   41 seconds ago  418.9 MB
rabbit      latest  68102a356d7d   20 hours ago    300.8 MB
...........

As you can see, the ‘flume’ image that we just built is shown at the top of the list along with the rabbit image we built earlier. Now that the image is in place we can use the image to create a running Docker container, which is an instance of the image.

7. To start a container using the `flume` image we have created, type:

docker run -e FLUME_AGENT_NAME=docker -e FLUME_CONF_FILE=/var/tmp/flume.conf -p 44444:44444 -t flume

The log output will be displayed on your terminal. Something similar to:

Info: Including Hive libraries found via () for Hive access
+ exec /opt/java/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/flume/conf:/opt/flume/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application -f /var/tmp/flume.conf -n docker
2016-05-25 09:07:31,016 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2016-05-25 09:07:31,024 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:/var/tmp/flume.conf
2016-05-25 09:07:31,034 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:logSink
2016-05-25 09:07:31,039 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)] Added sinks: logSink Agent: docker
2016-05-25 09:07:31,040 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:logSink
2016-05-25 09:07:31,071 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)] Post-validation flume configuration contains configuration for agents: [docker]
2016-05-25 09:07:31,071 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)] Creating channels
2016-05-25 09:07:31,083 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel inMemoryChannel type memory
2016-05-25 09:07:31,087 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)] Created channel inMemoryChannel
2016-05-25 09:07:31,088 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source netcatSource, type netcat
2016-05-25 09:07:31,111 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: logSink, type: logger
2016-05-25 09:07:31,127 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)] Channel inMemoryChannel connected to [netcatSource, logSink]
2016-05-25 09:07:31,145 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{netcatSource=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcatSource,state:IDLE} }} sinkRunners:{logSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@302477bc counterGroup:{ name:null counters:{} } }} channels:{inMemoryChannel=org.apache.flume.channel.MemoryChannel{name: inMemoryChannel}} }
2016-05-25 09:07:31,155 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel inMemoryChannel
2016-05-25 09:07:31,259 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: CHANNEL, name: inMemoryChannel: Successfully registered new MBean.
2016-05-25 09:07:31,259 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: inMemoryChannel started
2016-05-25 09:07:31,261 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink logSink
2016-05-25 09:07:31,262 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source netcatSource
2016-05-25 09:07:31,263 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:150)] Source starting
2016-05-25 09:07:31,271 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:44444]

8. Open a new terminal and type `echo foo bar baz | nc 192.168.99.100 44444`
9. You should see ‘OK‘ as a response.
10. Switch back to your original docker terminal and check the last line of the output. It should contain:

2016-05-25 09:14:43,009 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 66 6F 6F 20 62 61 72 20 62 61 7A foo bar baz }
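
A side note on that last line: the event body is printed as hex bytes followed by the text. You can reproduce the hex encoding locally with standard tools, no container required (a quick illustration, not part of the tutorial steps):

```shell
# Reproduce the hex body Flume's LoggerSink printed, using standard tools.
# (od prints lowercase hex; Flume logs it in uppercase.)
printf 'foo bar baz' | od -An -tx1 | tr -s ' ' | sed 's/^ *//; s/ *$//'
# -> 66 6f 6f 20 62 61 72 20 62 61 7a
```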

What did we just do?

Firing data into the input of our Flume container instance and seeing it appear in the output log proves that we have our flume container up and running. The container is listening on a defined port for input, processing any received messages and then outputting them to its defined output. We have another piece of our system up and running. Next we’ll move on to getting our last component, Kafka, into a container.

Docker & Flume with Grandad – Housekeeping

Some Basic Housekeeping

Now that we have our container up and we’ve had a bit of a tinker, we might want to stop the container. Running containers do consume system resources so it’s worth knowing how to stop them. As an additional step, it’s worth knowing how to remove containers once we are done with them. The idea is that it should be easy to recreate a container from an image, so you shouldn’t be wary of removing them.

Listing running containers

To list all of the currently running Docker containers use:

docker ps

The output will be similar to:

CONTAINER ID  IMAGE   COMMAND                 CREATED            STATUS            PORTS  NAMES
15cafdc4760c  rabbit  "/docker-entrypoint.s"  About an hour ago  Up About an hour  .....  big_meitner

(Note: I’ve omitted the ports information for the sake of brevity.)

Notice that the container has a CONTAINER ID and a NAME. Either of these can be used to specify the container when you want to perform operations on it. The names are supposed to be more memorable than IDs, but your mileage may vary!

Stopping a running container

Stopping a running container is easy, use:

docker stop <container_name>

So, in our case this would be:

docker stop big_meitner

If you enter this command you will get a response of ‘big_meitner’ when the container stops.

Starting a container

Clearly, when we have stopped a container we might want to start it up again to use it some more. You might be tempted to use ‘docker run ….‘ to get things fired up again. But you’d be wrong. ‘docker run‘ starts a new container which is totally separate from the instance you had running previously. The correct way to start up an existing container is:

docker start <container_name>

Obviously, to start a docker container, you need to know its name. The ‘docker ps‘ command only lists running containers and the whole point is that we want to start a container that is not running. To get a list of all containers, running or not, we can use:

docker ps -a

The output will have exactly the same format as the ‘docker ps‘ output; there will just be more entries.

CONTAINER ID  IMAGE    COMMAND                 CREATED            STATUS                     PORTS  NAMES
15cafdc4760c  rabbit   "/docker-entrypoint.s"  About an hour ago  Up About an hour           .....  big_meitner
b01d48197924  jenkins  "/bin/tini -- /usr/lo"  10 weeks ago       Exited (143) 10 weeks ago  .....  dreamy_wescoff

Deleting a container

If you’ve used your docker container for a while and changed a few things in the running container (more on that later) and got your container into an odd state, you can remove it with:

docker rm <container_name>

When the container is removed, the command will reply with <container_name> (where ‘container_name’ is replaced with the actual name of your container). If you get everything wrong, or you have made a change to your Dockerfile and want to make sure it takes effect, you can delete an existing container and recreate it from the image.

Listing images

We use our Dockerfile to create images, and we use images to create containers. To view a list of all existing local images that we can make containers from, use:

docker images

The output will be:

REPOSITORY TAG    IMAGE ID     CREATED     SIZE
rabbit     latest ceab5a1ef540 12 days ago 300.8 MB
...

Removing an image

Occasionally you will want to make a change to your Dockerfile and update the image that is used to build your containers. The easy way to remove an image is to use:

docker rmi <image_id>

If an image is being used by a container then you will not be able to remove the image. You will receive an error message similar to:

Failed to remove image (<image_id>): Error response from daemon: conflict: unable to delete <image_id> (must be forced) - image is being used by stopped container 05434d580fbd.

To force the removal of the image, even when stopped containers still reference it, you can use:

docker rmi -f <image_id>

The output will tell you which image layers were untagged and deleted as a result of the operation.

Docker & Flume with Grandad – Getting Started

Installing Docker

Installing Docker is an easy process. The easiest way, for Mac and Windows users at least, is to install ‘Docker Toolbox’ from https://www.docker.com/products/docker-toolbox. This will install all the tools necessary to get Docker up and running on your host machine.

Docker files, Docker images and Docker Hub

To define what we want to run inside our Docker container we use a simple text file called a Dockerfile. A Dockerfile is simply a list of commands that will be executed by Docker to build the image. These commands can perform a range of operations. They usually start by defining what base image we want to use for our container and then allow us to modify that image by adding software and making configuration changes. A base image is simply another docker image that has been defined by somebody else and that we can reuse. Docker has a whole host of these base images that can be reused by anyone, for any purpose.

These base images are shared on ‘Docker Hub‘ (https://hub.docker.com/) and can be downloaded by docker using the instructions in a Dockerfile. Once you have created a useful Docker image you can share it on Docker Hub for others to use, should you choose to. This makes it really easy for us to get a head start on our Docker use by building on images created by other people.
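As a purely illustrative sketch of the shape of a typical Dockerfile (the image and file names here are made up; we’ll write real ones shortly):

```dockerfile
FROM some/base-image              # start from somebody else's shared image
ADD myapp.conf /etc/myapp.conf    # copy a file from our folder into the image
EXPOSE 8080                       # make a port visible outside the container
CMD myapp --serve                 # what the container runs when it starts
```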

Setting up

OK, let’s get hands-on and start creating our first Docker image. Our first docker image will be an image of RabbitMQ that we can fire up; from our host machine we will then browse to the web admin UI built into Rabbit and send some messages. The image we are going to create makes use of an existing docker image for RabbitMQ. We will use the existing image as a base and make some very simple modifications so that we can use the RabbitMQ container in the way that we want.

We will start slow by getting a basic Docker container up and running. Once this works we’ll modify it a little to make it more useful. Along the way we’ll learn some basic Docker concepts and commands. We will also learn a little about RabbitMQ.

  1. Open a terminal
  2. Create a new folder called ‘docker-poc’ with `mkdir docker-poc`
  3. Navigate into the new directory with `cd docker-poc`
  4. Create a new folder to hold our Dockerfile and associated assets for our RabbitMQ container with `mkdir rabbit`
  5. Navigate into this new folder with `cd rabbit`.
  6. Create an empty Dockerfile with `touch Dockerfile`.
  7. Edit the newly created file with `nano Dockerfile`.
  8. Add the following content to the Dockerfile we just created:
FROM rabbitmq:3-management

EXPOSE 15672

CMD rabbitmq-server start

Save the file.

What does this file do?

As you can probably tell, this Dockerfile contains three commands. If you want to take a look at the official documentation for Dockerfile commands, head over to (https://docs.docker.com/engine/reference/builder/).

The `FROM` command sets the base image that we want to use in our container. This is basically using somebody else’s Dockerfile as a starting point. Dockerfiles are held in the public DockerHub repository (https://hub.docker.com). Each image in the DockerHub has a unique name. Images can also have tags that allow us to grab different versions of the same image, much like the concept of tags in source code repositories. In our case, we are using the `rabbitmq` image and we are specifically using the tagged version that contains the rabbitmq management plugin. You can see it in the repository at (https://hub.docker.com/_/rabbitmq/).

The `EXPOSE` command exposes a port on the container to the outside world. This means that port 15672 in our rabbitmq container will be accessible from the host machine. If we didn’t do this, even if we knew the IP address of the docker container, we wouldn’t be able to hit the port. Port 15672 is the default port used by the rabbitmq management plugin’s web user interface. We’ll connect to this using the web browser on our host machine to confirm that the container is up and running by viewing the rabbit management UI.

The `CMD` command runs the text that follows it on the command line of the running container. This is what our container will do by default when it starts up. In this case, we are running the rabbitmq-server as a service inside the container. Any normal command that can be used in a terminal can be placed here. It’s a good way of performing actions on your container just after it has started up.

These three commands pull an image from the Docker Repository, expose a port to the outside world and run rabbitmq as a service. Voila, one rabbitmq server.

Firing up the RabbitMQ Container

Now that we have our Dockerfile in place we can actually execute it and get our container up and running. Assuming you are running on a Mac:

  1. Open the Launcher and locate the `Docker Quickstart Terminal`.
  2. Click the icon. This will fire up a terminal that has Docker configured and running. (On a Mac your Docker instance will run inside a virtual machine. This will have an IP address shown when it starts up. This is usually 192.168.99.100.)
  3. Navigate to the `docker-poc/rabbit` folder created earlier.
  4. This folder should contain the Dockerfile we just created.
  5. Type `docker build -t rabbit .` to use the dockerfile in the current directory to create a docker image with the name ‘rabbit’. (-t tells docker we want to name the resulting image, and the ‘.’ is the build context, i.e. the directory containing the Dockerfile to build.)
  6. The output should look something like:
Sending build context to Docker daemon 2.048 kB
Step 1 : FROM rabbitmq:3-management
 ---> f35ba29f7d29
Step 2 : EXPOSE 15672
 ---> Running in f80cb02caf91
 ---> da643941535c
Removing intermediate container f80cb02caf91
Step 3 : CMD rabbitmq-server start
 ---> Running in c4c9d804b41f
 ---> 419f25976a1d
Removing intermediate container c4c9d804b41f
Successfully built 419f25976a1d

7. Type `docker images` to list all of the images that exist on the host machine. The output will look like:

REPOSITORY              TAG     IMAGE ID      CREATED        SIZE
rabbit                  latest  af5eb3695063  5 seconds ago  300.8 MB
flumecompose_frontend   latest  dba1f0ee100d  2 weeks ago    419.4 MB
flumecompose_backend    latest  467d59055ebc  2 weeks ago    339 MB
flumeexample2           latest  6f0089395980  3 weeks ago    419.4 MB
shedlabshelloworld_web  latest  6658786e8a22  4 weeks ago    707 MB
...........

As you can see, the ‘rabbit’ image that we just built is shown at the top of the list along with some other relevant details. This list shows all the docker images that are cached locally on the host machine. Having them cached locally means we don’t need to go out to the online Docker Hub if we want to use them. Now that the image is in place we can use it to create a running Docker container, which is an instance of the image.

8. To start a container using the `rabbit` image we have created, type `docker run -d -p 15672:15672 rabbit`.
9. The output will be a unique identifier for the running container, something like ‘b01d48197924d57cd24a8a02dbb49e37475bf3a80557361de3296ecc68df7258’.
10. Fire up your browser and navigate to http://192.168.99.100:15672
11. The RabbitMQ Admin UI login screen will appear.
12. Enter a username of ‘guest‘ and a password of ‘guest‘. The Rabbit admin UI will appear. This is being served up from within your docker container.

What did we just do?

We glossed over a few things in our instructions, particularly in step 8. Lets take a moment to drill into a little bit more detail. The command we used to start the container was:

docker run -d -p 15672:15672 rabbit

The ‘docker run‘ part simply tells docker to execute the run command. Fairly self-explanatory.

The next argument, ‘-d‘ tells docker that we want to run the container in a ‘detached’ mode. This simply means that we want it to fire up the container and run it in the background. Once it’s running we should get our command prompt back instead of just hanging while the container runs.

The next argument, ‘-p‘ tells docker that we want to map a port on the docker container to a port on the host machine. The first value, 15672, is the port on the container we want to expose. The second value after the ‘:’ is the port on the host computer we want to map it to. So, in short, when we try to get to port 15672 on the host machine it should actually be redirected to port 15672 on the running docker container.

The last argument is the name of the docker image we want to run in the new container.


NOTE: Exposing and Mapping Ports

You might have noticed that when we started the Docker container we mapped port 15672 on the container to port 15672 on the host. You might have also noticed that we used the ‘EXPOSE 15672‘ command in our Dockerfile earlier. So, why do we need to use the EXPOSE command in the Dockerfile and also map the ports when we start the container? It looks a little like these two commands do the same thing.

In short, the EXPOSE command tells Docker to expose a port to other docker containers. If you only use EXPOSE then the port will not be available outside of docker. If you use the ‘publish’ command line option (which is what ‘-p’ is) then the port is also accessible outside of docker.

There is also another version of publish using an uppercase P. For example:

docker run -d -P rabbit

This will run the docker container and automatically publish all ports, but you don’t get to pick what port number is used on the hosting machine. It’s usually a high, non-memorable number.

As an aside, if you publish a port but forget to use EXPOSE in your Dockerfile, Docker will do an implicit EXPOSE for you.
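Pulling those rules together into a quick cheat sheet (my summary of the behaviour described above; worth double-checking against the Docker documentation for your version):

```
EXPOSE only              port visible to other containers, not to the host
EXPOSE + -p 15672:15672  port also published to port 15672 on the host
-p without EXPOSE        docker adds an implicit EXPOSE, then publishes
-P                       publishes every EXPOSEd port on a high host port
```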


 

Docker and Flume with Grandad

Introduction

I recently attended an Apache Spark Workshop hosted by the clever guys at DataShed. Ed Thewlis, Managing Director of DataShed, friend and fellow data tinkerer, was running the show and invited me to come along and join in. My main objective was to spend a whole, uninterrupted afternoon getting hands-on with Spark and make it do something more interesting than the ubiquitous ‘Word Count’ example.

Spark is one of those technologies that I really love but I’ve not had the chance to do anything meaningful with it in production. I’ve built quite a few personal examples using vehicle and flight tracking data, but nothing that I would consider useful or really interesting. An afternoon with Ed and the team would be the perfect chance to get a no-nonsense look at Hadoop and Spark.

The workshop was, as hoped, very comprehensive and an absolute full throttle ride through building a Hadoop cluster with Spark sat on top to analyse vehicle sales data using a range of Spark features. The workshop made heavy use of Docker to setup and manage the cluster and associated environments.

During the course of the afternoon it became apparent that I didn’t know as much Docker as I thought and had to get a little assistance from my colleague and friend Ben Foster. As Ben was patiently helping me reconfigure Docker to get the best performance out of my slightly struggling baby Mac, Ed wandered over and with a chuckle quipped “Cor! It’s like coding with Grandad!” and to be honest, he wasn’t far wrong! The after-workshop drinks more than made up for his slight upon my technical honour, but he did have a point.

To fix this very clear gap in my skills I decided, as I usually do, to build ‘something’ with Docker and document the whole thing for three main reasons.

  1. Get some real world experience of Docker and fix all the problems encountered along the way.
  2. Document the whole thing so that I remember it better and can refer back to it later.
  3. Write it up so that I can share my learning with others in the same situation (that’s probably you!)

Overview

By the end of this series you will be able to spin up three Docker containers using Docker Compose to allow the containers to see one another and control the order that they start in. One container will be an instance of RabbitMQ that you can access from your machine. Another container will be an instance of Apache Flume that will buffer messages, translate messages from Rabbit to Kafka format and then forward them to Kafka. The last container will be a Kafka instance which will receive messages and allow you to view them from the terminal on your host machine. It will be possible to spin up all these related containers from a single command and they will all be appropriately configured and working.

The problem

I’ve recently been working on a Global Trading Platform for one of the UK’s leading bookmakers. The purpose of the system is two-fold: the first objective is to remove a rat’s nest of direct integrations between internal and external systems. The second objective is to create a high performance internal platform that seamlessly integrates data from a range of external systems, standardises that data and distributes it to a range of company systems around the globe. As the platform expands and we integrate more and more systems, the range of technologies that we need to communicate with increases.

The latest addition to the system required us to send the output of the system to another internal system that uses Kafka as its message bus technology. For now, that is our only requirement, get a message from Rabbit and dump it into Kafka in the order that it was received.

The solution

There are a number of ways of tackling this problem, including using Akka and Scala to receive messages from RabbitMQ, translate them and publish them to Kafka, but we’ll talk about that another time. A far simpler approach is to use Apache Flume to move messages from Rabbit to Kafka. We don’t want to do any message translation; we simply want to reliably move messages from one message queue to another.

If we take a look at the Apache Flume project (https://flume.apache.org/) we can see a quick description of what it does:

“Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.”

This seems to tick all the boxes for us, with ‘distributed’ and ‘reliable’ standing out. A little digging soon tells me that, via plugins, it can deal with pretty much any type of data, not just log data. Handily, there are plugins that allow us to consume data from Rabbit and publish it to Kafka.

Putting the pieces together

A ‘back of a fag packet’ sketch of what I want to achieve suggests that we need an instance of RabbitMQ exposing a queue that is connected to an instance of Flume, which pushes messages onto a topic on an instance of Kafka. That seems like a lot of stuff to install and configure. It would be useful if I could isolate all of it, blow it away and recreate it as needed. This sounds a lot like a job for Docker.

Docker

If we take a look at the Docker homepage (www.docker.com) we get this:

“Docker allows you to package an application with all of its dependencies into a standardized unit for software development.”

Which is slightly cryptic, but basically Docker allows you to host an entire server, and any software you want to install on that server, inside an isolated container. This container acts like a machine in its own right, with its own isolated processes and its own network connections. Docker allows us to stop, start, destroy and recreate that machine whenever we like. From your host machine, the Docker container appears just like another machine on the network.

Using Docker we have a nice way of making a totally recreatable instance of the system we are trying to build. Everything is held inside the container and we can blow it away if we get something wrong or no longer need it.

Docker Compose

We could put all of our applications, RabbitMQ, Flume and Kafka, into the same Docker container, but that feels a little messy. Helpfully, Docker has another component, Docker Compose (https://docs.docker.com/compose/), that helps us run multiple Docker containers together as one application. We will make use of Compose to keep things together.
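
To make this concrete, a compose file for our three containers might look something like the sketch below. Note that this is purely illustrative: the service layout and the image choices other than ‘probablyfine/flume’ are my assumptions here, and we will build the real files step by step in later posts.

```yaml
# docker-compose.yml - an illustrative sketch only, not the final file.
# The 'probablyfine/flume' base image is the one we use later; the other
# image names and settings are placeholder assumptions.
version: '2'
services:
  rabbit:
    image: rabbitmq:3-management   # RabbitMQ with the management UI
    ports:
      - "5672:5672"                # AMQP, reachable from the host machine
      - "15672:15672"              # management console
  flume:
    build: ./flume                 # our own Dockerfile, based on probablyfine/flume
    depends_on:
      - rabbit                     # Compose controls the start order
  kafka:
    image: your-kafka-image        # placeholder: pick a Kafka image from Docker Hub
    depends_on:
      - flume
```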

In the next post we’ll actually get started on implementing this stuff.

The Problem Isn’t Agile, it’s probably your team

I’ve recently seen a lot of posts about why Scrum/Kanban/Lean/Whatever does not work and how some of the ceremonies associated with these ways of working are evil and should be abandoned immediately.  It has struck me that many of these discussions are focused on how “Bad managers need Standups” and “Good programmers don’t need ceremonies” and many of these discussions turn into a critique of the process.  However, I don’t believe the problem lies with ‘the process’, whatever your process happens to be.

In my opinion, most processes are inherently tied to the type of people using them.  In what I would consider a terrible organization, a process like waterfall is used to provide ‘guide rails’, ‘training wheels’ or a ‘safety net’ for when staff don’t do their jobs properly.

Processes are used as a guarantee against poorly skilled or poorly committed staff.  They are almost used as a way of spotting when staff aren’t doing what they are told so that we can ‘correct’ their behavior.  In many ways, for this kind of organization, the process is a way of making sure the bare minimum of acceptable work gets done.

In some organizations whole teams of people exist just to manage and regulate the process.  They are simply there to ensure the process is followed.  It’s not an approach that I’m particularly fond of, but in a company where there is no trust between the people doing the work and the people who want the work done, it can be one way of getting the bare minimum amount of work done.  There will be no innovation and no initiative but, eventually, something will be produced.  It will probably be rubbish, and late, and costly, but we can all tick the boxes and praise the process for our forward movement.  And, crucially, nobody had to think!

The other extreme is where the staff are so engaged, so switched on and so skilled that they live and breathe the ‘inspect and adapt’ mantra.  They know where they need to get to and they know how they want to get there.  They collaborate closely with one another and make sure an honest and open discussion happens between all team members.  The team is probably quite small and alignment is easy, even when the whole team is moving fast.

For these teams, any process other than the lightest of touches is simply a hindrance.  They certainly don’t need managing and they certainly don’t need any ceremony heavy approaches to give a false feeling of alignment and forward motion.  For this team, forward motion happens all the time and they all know why.

The problem is not that the process or any of its associated ceremonies are right or wrong.  The problem usually comes down to the ‘wrong process for the team’.  Sensible organizations ask teams to determine their own rules of engagement and their own ways of working, within some sensible guidelines.  This can work quite well, but there is one additional problem.

A problem that emerges in bigger organizations is that teams are often a mix of differing skill levels, different levels of commitment and different levels of engagement.  The problem is that half the team need to be managed.  If they aren’t told to do anything then they won’t do anything; they will sit at their desks and happily fill the day with random things.  These guys love process: process tells them where the bare minimum effort line lies and how little they need to do.  It usually tells them how they need to do it too.

These guys are the equivalent of digital shelf stackers.  It’s just a job and when 5pm rolls around they are outta here.  For the other half of the team, process is the thing that slows them down: the pointless meetings, the needless ceremony and the condescending management input.  For them, it’s like training wheels on a superbike… pointless and humiliating.  The team will, at best, perform moderately well.  The fans of process will say that it’s down to the process that they are doing well.  The people that don’t like process will say it could be twice as good if the process was removed.  Sadly, if the team moves away from this finely balanced level of process they will probably fall into a pit of unproductivity.

If we kill off the process then the half of the team that needs process will suffer a drop in productivity.  If you expect them to think and use their initiative, they simply won’t.  You may get some of your more self-motivated staff spending time trying to mentor the process lovers in how to show more initiative and take more responsibility for their own workload.  This will probably kill their productivity too.  Welcome to a death slide into unproductivity.

If we add more process, then the more self-motivated half of the team will be crippled with irrelevant process, their time spent doing productive things will nose-dive and they will slowly start to care less.  In their view, what is the point of working hard when you are always dragging rocks along with you, only to then be told ‘you’re not following the process’?

Of course, the process-loving half of the team will now have much heavier management and be guided much more explicitly.  Their productivity is likely to go up… a little.  This approach means we’re going to need many more managers, usually people who can’t actually do anything apart from manage and report… you’ve probably got some of those people in a room somewhere, looking busy.  Now they have an ideal excuse to look even busier while not actually being part of the team.

So, the moral of the story is that your team members do not all have the same levels of skill, motivation or engagement, and because of that they will respond differently to different development methodologies.  You need the right methodology for the right team.  It’s how you execute your process that matters, not what that process is!

Getting Started with Reactive Streams – Part 5

Experiments with Backpressure

Finally! It’s been a long haul to get here, but we are finally in a position to conduct some experiments with backpressure.

The inspiration for this set of scenarios to demonstrate backpressure came from an article written by Jos Dirksen. You can check it out here.

Scenario – Fast Source, Slowing Sink

We have already seen a fast-producing Source and a fast-consuming Sink. Things get a little more interesting when we have a Sink that consumes messages more slowly over time. This is a classic situation where, normally, the Source would just keep pumping out messages and the Sink would be expected to buffer them until it could catch up. As we all know, this rarely happens. Usually the buffer fills and the Sink gets overwhelmed.

To conduct our experiment we need an actor that gets slower at processing messages over time. Introducing the SlowDownActor.

SlowDownActor.scala

package com.datinko.streamstutorial.actors

import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{RequestStrategy, OneByOneRequestStrategy, ActorSubscriber}
import com.typesafe.scalalogging.LazyLogging
import kamon.Kamon

/**
 * An actor that progressively slows as it processes messages.
 */
class SlowDownActor(name: String, delayPerMsg: Long, initialDelay: Long) extends ActorSubscriber with LazyLogging {
  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  // setup actorname to provided name for better tracing of stats
  val actorName = name
  val consumeCounter = Kamon.metrics.counter("slowdownactor-consumed-counter")

  // default delay is 0
  var delay = 0L

  def this(name: String) {
    this(name, 0, 0)
  }

  def this(name: String, delayPerMsg: Long) {
    this(name, delayPerMsg, 0)
  }

  override def receive: Receive = {

    case OnNext(msg: String) =>
      delay += delayPerMsg
      Thread.sleep(initialDelay + (delay / 1000), (delay % 1000).toInt)
      logger.debug(s"Message in slowdown actor sink ${self.path} '$actorName': $msg")
      consumeCounter.increment(1)
    case msg =>
      logger.debug(s"Unknown message $msg in $actorName: ")
  }
}
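
The interesting part of this actor is the delay arithmetic in receive. As a dependency-free sketch (the object and method names below are mine, not part of the tutorial code), the sleep per message grows linearly with the number of messages seen:

```scala
object DelayGrowth {
  // Mirrors the actor's arithmetic: `delay` grows by delayPerMsg for every
  // message, and the sleep is initialDelay plus delay/1000 milliseconds
  // (the delay % 1000 remainder goes into Thread.sleep's nanosecond argument).
  def sleepMillis(messagesSeen: Long, delayPerMsg: Long, initialDelay: Long): Long = {
    val delay = delayPerMsg * messagesSeen
    initialDelay + (delay / 1000)
  }
}
```

With delayPerMsg = 10, the first few hundred messages sleep for barely a millisecond, but by message 10,000 each message costs around 100 ms, which is exactly the gradual slow-down we want to observe in Grafana.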

Once we have this we can add a new scenario to our existing Scenarios.scala object.

Scenarios.scala

package com.datinko.streamstutorial

import akka.actor.Props
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Sink, GraphDSL, RunnableGraph}
import com.datinko.streamstutorial.actors.SlowDownActor
import scala.concurrent.duration._
/**
 * A set of test scenarios to demonstrate Akka Stream back pressure in action.  Metrics are exported
 * to StatsD by Kamon so they can be graphed in Grafana.
 */
object Scenarios {

   ... existing scenarios ...

  def fastPublisherSlowingSubscriber(implicit materializer: ActorMaterializer) = {

    val theGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[Unit] =>

      val source = builder.add(ThrottledProducer.produceThrottled(1 second, 30 milliseconds, 20000, "fastProducer"))
      val slowingSink = builder.add(Sink.actorSubscriber(Props(classOf[SlowDownActor], "slowingDownSink", 10l)))

      import GraphDSL.Implicits._

      source ~> slowingSink

      ClosedShape
    })
    theGraph
  }
}

If we execute this scenario in Start.scala with

Scenarios.fastPublisherSlowingSubscriber.run()

and take a look at Grafana (having updated our Sink metric to be ‘slowdownactor-consumed-counter’) we get:

This chart shows that as the Sink processes messages at an increasingly slow rate, the producer slows its production rate. This is because of the backpressure in the stream flowing from the Sink to the Source. As the Sink struggles to consume messages, it signals that it needs messages less often, which causes the Source to slow down. The message production rate is effectively controlled by the SlowDownActor (the Sink).

Scenario – Fast Source, Slowing Sink with Drop Buffer

Akka Streams has a built-in component, the buffer, that helps us deal with situations where the Source is going too fast for the Subscriber. This is useful in situations where the Source isn’t reactive and doesn’t respond to backpressure messages.
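
Before wiring a buffer into the graph it is worth pinning down what dropHead actually does. The class below is not Akka code, just a hypothetical sketch of the strategy’s semantics: when the buffer is full, the oldest element is discarded to make room for the newest, and the producer is never blocked.

```scala
import scala.collection.mutable

// Sketch of OverflowStrategy.dropHead semantics (not Akka code): a full
// buffer discards its oldest element to make room for the newest one.
final class DropHeadBuffer[A](capacity: Int) {
  private val queue = mutable.Queue.empty[A]

  def offer(element: A): Unit = {
    if (queue.size == capacity) queue.dequeue() // drop the oldest element
    queue.enqueue(element)                      // the producer never waits
  }

  def contents: List[A] = queue.toList
}
```

Offering the numbers 1 to 5 into a capacity-3 buffer leaves 3, 4 and 5: the two oldest messages are silently lost, which is exactly the trade-off this strategy makes.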

To illustrate the buffer in action, add the following scenario to Scenarios.scala:

Scenarios.scala

def fastPublisherSlowingSubscriberWithDropBuffer() = {

    val theGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[Unit] =>

      val source = builder.add(ThrottledProducer.produceThrottled(1 second, 30 milliseconds, 20000, "fastProducer"))
      val slowingSink = builder.add(Sink.actorSubscriber(Props(classOf[SlowDownActor], "slowingDownSink", 50l)))

      // create a buffer of 100 messages with an overflow strategy that
      // starts dropping the oldest messages when the sink gets too far
      // behind (requires `import akka.stream.scaladsl.Flow` and
      // `import akka.stream.OverflowStrategy` at the top of the file)
      val bufferFlow = Flow[String].buffer(100, OverflowStrategy.dropHead)

      import GraphDSL.Implicits._

      source ~> bufferFlow ~> slowingSink

      ClosedShape
    })
    theGraph
  }

Running this scenario results in the following chart in Grafana:

From this chart we can see that the Source essentially ignores the Sink as all of its messages are sent to the buffer. Because of the strategy used in the buffer it will never signal any backpressure, it will simply drop messages.

It is interesting to note that once the Source has produced all 20,000 messages it was configured to send, the Sink keeps running to empty the buffer of the remaining messages. Bear in mind that the buffer drops messages whenever it contains more than 100.

This is not exactly an ideal arrangement and we can improve upon it.

Scenario – Fast Source, Slowing Sink with Backpressure Buffer

Having a buffer that drops messages is less than ideal. In situations where we have a reactive Source that responds to backpressure signals but we still want a buffer, we can simply change the buffer’s OverflowStrategy to OverflowStrategy.backpressure. Doing so gives us the following chart in Grafana:

As we can see from this chart, the Source ignores the Sink until the buffer is full. Once the buffer is full, it signals backpressure and the Source slows its rate to meet the requirements of the Sink. Once the message run is complete, the producer stops and the Sink empties the buffer.
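
The contrast with the dropping buffer is easy to demonstrate outside Akka using a plain JDK bounded queue (a hypothetical sketch, not the real implementation): with backpressure semantics, a full buffer makes the producer wait instead of losing data, so everything the Source emits eventually reaches the Sink.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Sketch of OverflowStrategy.backpressure semantics: put() blocks the
// producing thread while the bounded buffer is full, so no message is
// ever dropped and ordering is preserved.
object BackpressureSketch {
  def run(messageCount: Int, capacity: Int): List[Int] = {
    val buffer = new ArrayBlockingQueue[Int](capacity)
    val producer = new Thread(() => (1 to messageCount).foreach(i => buffer.put(i)))
    producer.start()
    // the consumer drains the queue; take() waits for the producer as needed
    val received = List.fill(messageCount)(buffer.take())
    producer.join()
    received
  }
}
```

Even with a buffer of only two slots, every message arrives in order; the producer simply spends most of its time blocked, which is the ‘Source slows to the rate of the Sink’ effect we see in the chart.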

Scenario – Fast Source With Two Sinks, One Fast, One Slowing

So far we have only looked at stream graphs with one Source and one Sink. It would be interesting to look at what happens when we have two Sinks. This also gives us an opportunity to look at another component in the Akka Streams toolbox.

Let’s create another scenario to test this out:

Scenarios.scala

def fastPublisherFastSubscriberAndSlowingSubscriber() = {

    val theGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[Unit] =>

      val source = builder.add(ThrottledProducer.produceThrottled(1 second, 30 milliseconds, 9000, "fastProducer"))
      val fastSink = builder.add(Sink.actorSubscriber(Props(classOf[actors.DelayingActor], "fastSink")))
      val slowingSink = builder.add(Sink.actorSubscriber(Props(classOf[SlowDownActor], "slowingDownSink", 50l)))

      // create the broadcast component
      // (requires `import akka.stream.scaladsl.Broadcast`)
      val broadcast = builder.add(Broadcast[String](2))

      import GraphDSL.Implicits._

      source ~> broadcast.in
      broadcast.out(0) ~> fastSink
      broadcast.out(1) ~> slowingSink

      ClosedShape
    })
    theGraph
  }

The Broadcast component accepts one input and duplicates it to as many outputs as you wish. In this example we have wired up two outputs: one to our fast Sink and the other to our slowing Sink.
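
Broadcast itself is simple to sketch outside Akka (the helper below is hypothetical, not Akka code): every output receives its own copy of each element. The part the sketch cannot show is the demand handling: a real Broadcast only emits an element once all of its outputs have signalled demand, which is why the whole stream runs at the pace of the slowest Sink.

```scala
object BroadcastSketch {
  // Sketch of Broadcast semantics (not Akka code): every output port
  // receives its own copy of the full element stream.
  def broadcast[A](input: List[A], fanOut: Int): Vector[List[A]] =
    Vector.fill(fanOut)(input)
}
```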

Running this scenario gives us the following Grafana chart:

Looking closely we can see that the whole stream is limited to the rate of the slowest sink. This sink signals that it needs the producer to slow down, so the producer slows, which slows the rate at which it passes messages to the fast sink as well as the slow sink.

Scenario – Fast Source With Two Sinks, One Fast, One Slowing with a Dropping Buffer

Having a stream graph that is limited to the rate of the slowest sink is not ideal. One way of overcoming this limitation is to introduce a buffer between the source and the slow sink. Consider the following scenario.

Scenarios.scala

def fastPublisherFastSubscriberAndSlowingSubscriberWithDroppingBuffer() = {

    val theGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[Unit] =>

      val source = builder.add(ThrottledProducer.produceThrottled(1 second, 30 milliseconds, 9000, "fastProducer"))
      val fastSink = builder.add(Sink.actorSubscriber(Props(classOf[actors.DelayingActor], "fastSink")))
      val slowingSink = builder.add(Sink.actorSubscriber(Props(classOf[SlowDownActor], "slowingDownSink", 50l)))
      val broadcast = builder.add(Broadcast[String](2))

      val bufferFlow = Flow[String].buffer(300, OverflowStrategy.dropHead)

      import GraphDSL.Implicits._

      source ~> broadcast.in
      broadcast.out(0) ~> fastSink
      broadcast.out(1) ~> bufferFlow ~> slowingSink

      ClosedShape
    })
    theGraph
  }

Running this scenario we get the following output:

As we can see, the source and the fast sink continue at maximum speed until all the messages are produced. The slow sink proceeds at its own rate because the buffer ensures that it never gets overwhelmed. However, this is at the cost of dropping messages!

Scenario – Fast Source With Two Sinks, One Fast, One Slowing with a Backpressure Buffer

While buffering is useful when dealing with Sinks that process messages at different rates, dropping messages is far from ideal in most situations. If we use a bigger buffer with a backpressure overflow strategy, we can improve on our previous example.

Scenarios.scala

def fastPublisherFastSubscriberAndSlowingSubscriberWithBackpressureBuffer() = {

    val theGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[Unit] =>

      val source = builder.add(ThrottledProducer.produceThrottled(1 second, 30 milliseconds, 9000, "fastProducer"))
      val fastSink = builder.add(Sink.actorSubscriber(Props(classOf[actors.DelayingActor], "fastSink")))
      val slowingSink = builder.add(Sink.actorSubscriber(Props(classOf[SlowDownActor], "slowingDownSink", 50l)))
      val broadcast = builder.add(Broadcast[String](2))

      val bufferFlow = Flow[String].buffer(3500, OverflowStrategy.backpressure)

      import GraphDSL.Implicits._

      source ~> broadcast.in
      broadcast.out(0) ~> fastSink
      broadcast.out(1) ~> bufferFlow ~> slowingSink

      ClosedShape
    })
    theGraph
  }

This produces the following output:

In this scenario, the source and the fast sink process messages as fast as possible until the buffer for the slow sink fills to its limit. Once this happens, the buffer signals backpressure and the source slows to the rate of the slow sink.

Scenario – Fast Source With Two Sinks, One Fast, One Slowing with a Balancer

Buffering with backpressure is a good solution to our problem but there is another alternative which is a little more adaptive and ensures good utilisation of both subscribers.

The Balance component sends each message to whichever Sink currently has capacity to take it. As the slow subscriber slows, more messages are routed to the fast Sink, and eventually the majority of messages go to the fast Sink.
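
The effect is easy to simulate outside Akka. The sketch below is hypothetical (the cost parameters are arbitrary units, not taken from the scenario code): each message goes to whichever sink will be free soonest, which is essentially what Balance does by routing elements to whichever output currently has demand.

```scala
object BalanceSketch {
  // Assign each message to whichever sink is free soonest, mimicking how
  // Balance routes elements to whichever output currently signals demand.
  // fastCost and slowCost are per-message processing times in arbitrary units.
  def distribute(messages: Int, fastCost: Int, slowCost: Int): (Int, Int) = {
    var fastFreeAt, slowFreeAt = 0L
    var fastCount, slowCount = 0
    for (_ <- 1 to messages) {
      if (fastFreeAt <= slowFreeAt) { fastFreeAt += fastCost; fastCount += 1 }
      else { slowFreeAt += slowCost; slowCount += 1 }
    }
    (fastCount, slowCount)
  }
}
```

With one sink nine times slower than the other, the fast sink ends up taking roughly ninety percent of the messages, and the split adapts automatically as the slow sink’s cost grows.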

Scenarios.scala

def fastPublisherFastSubscriberAndSlowingSubscriberWithBalancer() = {

    val theGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[Unit] =>

      val source = builder.add(ThrottledProducer.produceThrottled(1 second, 30 milliseconds, 9000, "fastProducer"))
      val fastSink = builder.add(Sink.actorSubscriber(Props(classOf[actors.DelayingActor], "fastSink")))
      val slowingSink = builder.add(Sink.actorSubscriber(Props(classOf[SlowDownActor], "slowingDownSink", 50l)))
      // requires `import akka.stream.scaladsl.Balance`
      val balancer = builder.add(Balance[String](2))

      import GraphDSL.Implicits._

      source ~> balancer.in
      balancer.out(0) ~> fastSink
      balancer.out(1) ~> slowingSink

      ClosedShape
    })
    theGraph
  }

Running this scenario gives us the following output:

Although it’s fairly subtle on this graph, as the slow Sink gets slower the fast Sink picks up more of the messages. Throughout this scenario the Source never slows, because the balancer never signals any backpressure. It is worth noting that if the fast Sink did get overloaded, it would signal backpressure to the balancer, and the balancer would signal the Source, which would slow its rate of message production.

Conclusion

This has been a rather lengthy journey into reactive streams using akka-streams and the supporting tooling. Along the way we’ve learnt a lot about how to create stream graphs, how to monitor them and how to use Akka actors to encapsulate the functionality we need.

There are more components to explore in akka-streams, and more and more common tools are exposing reactive endpoints. For example, both RabbitMQ and Kafka have reactive implementations that allow us to propagate backpressure from our Akka streams to those components.

Reactive streams give us a really effective way of building flexible and adaptive message processing pipelines. Hopefully this tutorial has provided enough information for you to take a look for yourselves.

Other posts in this series:

Part One
Part Two
Part Three
Part Four
Part Five
Code can be found on Github