Kamon – Visualising what’s happening
In their own words,
“Kamon is a reactive-friendly toolkit for monitoring applications that run on top of the Java Virtual Machine. We are specially enthusiastic to applications built with the Typesafe Reactive Platform (using Scala, Akka, Spray and/or Play!) but as long as you are on the JVM, Kamon can help get the monitoring information you need from your application.”
This sounds like a great way of monitoring our application and seeing what is really happening when we run our stream graphs, but to be able to visualise the information that Kamon exposes we’ll need to add a few more components to the mix.
Kamon gathers metrics about the performance and health of our application and regularly exports them to a backend that can store and aggregate the data. Other tools can then render automatically updating charts based on that data. A very common collection of components for this job is Kamon, StatsD, Graphite and Grafana.

- Kamon is a library that uses AspectJ to hook into the method calls made by your ActorSystem and record events of different types. There is a range of actor system metrics that are gathered automatically, and you can also add your own metrics to be recorded as the application runs. The default configuration works pretty well, but it takes a little digging to get everything up and running, mostly because the library is evolving quickly.
- StatsD is a network daemon that runs on the Node.js platform and listens for information like counters and timers sent over UDP. StatsD then sends its information to be aggregated by one of a range of pluggable backend services; in our case, that backend is Graphite.
- Grafana is a frontend dashboard that provides a very attractive and flexible way of presenting the information aggregated by Graphite. All of this updates in real time.
Because getting all these components up and running can be challenging, we are going to use a pre-configured Docker container where all the hard work has already been done. This builds on the Kamon Docker image that the Kamon team have already created. The even simpler image we will use is based on a tutorial by Nepomuk Seiler; see his post if you want more details.
Adding Dependencies for Kamon
To be able to use Kamon, we need to add a few dependencies, some AspectJ configuration, an SBT plugin and a configuration file to our application.
Add the following dependencies and AspectJ configuration to your build.sbt file.
build.sbt
scalaVersion := "2.11.7"

... existing version numbers ...
val kamonVersion = "0.5.2"

/* dependencies */
libraryDependencies ++= Seq (
  ... existing dependencies ...
  // -- kamon monitoring dependencies --
  ,"io.kamon" %% "kamon-core" % kamonVersion
  ,"io.kamon" %% "kamon-scala" % kamonVersion
  ,"io.kamon" %% "kamon-akka" % kamonVersion
  ,"io.kamon" %% "kamon-statsd" % kamonVersion
  ,"io.kamon" %% "kamon-log-reporter" % kamonVersion
  ,"io.kamon" %% "kamon-system-metrics" % kamonVersion
  ,"org.aspectj" % "aspectjweaver" % "1.8.5"
  ... existing dependencies ...
)

//configure the AspectJ plugin to enable Kamon monitoring
aspectjSettings

javaOptions <++= AspectjKeys.weaverOptions in Aspectj

fork in run := true
Kamon depends on AspectJ to inject its monitoring code in all the right places in our Akka actor system. To make sure this can happen we need to add a build plugin to our plugins.sbt file.
Ensure that your /project/plugins.sbt looks like this:
logLevel := Level.Warn
// The Typesafe repository
resolvers += Resolver.typesafeRepo("releases")
addSbtPlugin("com.typesafe.sbt" % "sbt-aspectj" % "0.9.4")
Configuring Kamon
Once we have all the dependencies and SBT plugins configured we need to add some more configuration to our application to tell Kamon what to monitor and where to send its data.
Add the following content to /src/main/resources/application.conf
application.conf
akka {
  loglevel = INFO
  extensions = ["kamon.akka.Akka", "kamon.statsd.StatsD"]
}

# Kamon Metrics
# ~~~~~~~~~~~~~
kamon {
  metric {
    # Time interval for collecting all metrics and sending the snapshots to all subscribed actors.
    tick-interval = 1 seconds

    # Disables a big error message that will typically be logged if your application wasn't started
    # with the -javaagent:/path-to-aspectj-weaver.jar option. If you are only using KamonStandalone
    # it might be ok for you to turn this error off.
    disable-aspectj-weaver-missing-error = false

    # Specify if entities that do not match any include/exclude filter should be tracked.
    track-unmatched-entities = yes

    filters {
      akka-actor {
        includes = ["*/user/*"]
        excludes = ["*/system/**", "*/user/IO-**", "*kamon*"]
      }
      akka-router {
        includes = ["*/user/*"]
        excludes = []
      }
      akka-dispatcher {
        includes = ["*/user/*"]
        excludes = []
      }
      trace {
        includes = ["**"]
        excludes = []
      }
    }
  }

  # Controls whether the AspectJ Weaver missing warning should be displayed if any Kamon module requiring
  # AspectJ is found in the classpath but the application is started without the AspectJ Weaver.
  show-aspectj-missing-warning = yes

  statsd {
    # Hostname and port on which your StatsD instance is running. Remember that StatsD packets are sent
    # using UDP; Kamon will not warn you about unreachable hosts or closed ports, your data will simply
    # go nowhere. If you're running Docker on Linux this will probably be 127.0.0.1; if you're running
    # Docker Toolbox on Windows or OSX, like me, it will probably be 192.168.99.100.
    hostname = "192.168.99.100"
    port = 8125

    # Interval between metrics data flushes to StatsD. Its value must be equal to or greater than the
    # kamon.metric.tick-interval setting.
    flush-interval = 1 seconds

    # Max packet size for UDP metrics data sent to StatsD.
    max-packet-size = 1024 bytes

    # Subscription patterns used to select which metrics will be pushed to StatsD. Note that metrics
    # collection for your desired entities must first be activated under the kamon.metric.filters settings.
    subscriptions {
      histogram       = [ "**" ]
      min-max-counter = [ "**" ]
      gauge           = [ "**" ]
      counter         = [ "**" ]
      trace           = [ "**" ]
      trace-segment   = [ "**" ]
      akka-actor      = [ "**" ]
      akka-dispatcher = [ "**" ]
      akka-router     = [ "**" ]
      system-metric   = [ "**" ]
      http-server     = [ "**" ]
    }

    # FQCN of the kamon.statsd.MetricKeyGenerator implementation to be instantiated and used for assigning
    # metric names. The implementation must have a single-parameter constructor accepting a com.typesafe.config.Config.
    metric-key-generator = kamon.statsd.SimpleMetricKeyGenerator

    simple-metric-key-generator {
      # Application prefix for all metrics pushed to StatsD. The default namespacing scheme for metrics
      # follows this pattern:
      #   application.host.entity.entity-name.metric-name
      application = "streamstutorial"

      # Includes the hostname in the generated metric names. When set to false, the scheme for the metrics
      # will look as follows:
      #   application.entity.entity-name.metric-name
      include-hostname = true

      # Allows users to override the hostname reported by Kamon. When changed, the scheme for the metrics
      # will have the following pattern:
      #   application.hostname-override-value.entity.entity-name.metric-name
      hostname-override = none

      # When the sections that make up the metric names contain special characters like dots (very common in
      # dispatcher names) or forward slashes (all actor metrics), those values need to be sanitized before
      # being sent to StatsD with one of the following strategies:
      #   - normalize: changes ': ' to '-' and ' ', '/' and '.' to '_'.
      #   - percent-encode: percent encodes the section of the metric name. Note that StatsD doesn't support
      #     percent-encoded metric names; this option is only useful with the Kamon docker image, which has a
      #     patched version of StatsD, or with your own customised version of StatsD that supports it.
      metric-name-normalization-strategy = normalize
    }
  }

  # Modules can be disabled at startup using yes/no arguments.
  modules {
    kamon-log-reporter.auto-start = no
    kamon-system-metrics.auto-start = no
    # It seems you need to leave this set to 'no' or Kamon double-reports all StatsD metrics.
    kamon-statsd.auto-start = no
    kamon-akka.auto-start = yes
  }
}
This is a fairly standard setup, lifted straight from the Kamon website. The only point to note is the statsd hostname: if you’re running Docker on Linux it is likely to be 127.0.0.1, and if you’re running Docker Toolbox on Windows or Mac OSX it is likely to be 192.168.99.100.
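If you want to switch the hostname per machine without editing application.conf, one option (a sketch that relies on standard Typesafe Config behaviour, where JVM system properties override values from application.conf) is to pass it to the forked run JVM as a system property in build.sbt:
javaOptions in run += "-Dkamon.statsd.hostname=127.0.0.1"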
Getting the StatsD/Graphite/Grafana Docker Image Running
I’m assuming you have some understanding of Docker and its ability to provide lightweight, fully configured images of servers and applications, so I won’t delve into too much detail here. To get our chosen Docker image running, start the Docker Quickstart Terminal if you’re on Windows or OSX; if you’re on Linux you can enter commands directly on the command line. Once it’s running, enter the following command:
docker run -p 80:80 -p 8125:8125/udp -p 8126:8126 -p 8083:8083 -p 8086:8086 -p 8084:8084 --name kamon-grafana-dashboard muuki88/grafana_graphite:latest
This gets our image running and maps all the ports on the running image to the same ports on our local machine, so as far as local applications are concerned, StatsD, Graphite and Grafana are running locally.
(If you have not run this image before, the image will be pulled from the Docker repository and this may take a little time).
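If you want to confirm that the StatsD daemon inside the container is accepting UDP traffic, you can hand-send a single counter packet from the command line. The metric name here is arbitrary, the exact flags vary slightly between netcat variants, and you should use 127.0.0.1 instead of 192.168.99.100 on Linux:
echo "sanitycheck.ping:1|c" | nc -u -w1 192.168.99.100 8125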
Once the image has been pulled you should be able to open your browser and navigate to:
http://192.168.99.100/
(or http://127.0.0.1/ if you’re running Linux).
If everything worked, you should see something similar to this:

Updating our Application to Report to Kamon
Now that we have our Docker container running, we need to make a few minor changes to get our stream graph application to report useful metrics.
Although Kamon is able to expose some standard metrics such as ‘Mailbox Size’ and ‘Time Spent in Mailbox’, I wasn’t able to interpret them usefully, so I decided to do something a little more direct and implement my own metrics.
Updating the ThrottledProducer
The first step is to update our ThrottledProducer so that it increments a counter every time it produces a message. Let’s take a look at the fully updated class and then go through the details.
ThrottledProducer.scala
import akka.stream.SourceShape
import akka.stream.scaladsl.{Flow, GraphDSL, Source, Zip}
import com.datinko.streamstutorial.Tick
import kamon.Kamon

import scala.concurrent.duration.FiniteDuration

/**
 * An Akka Streams Source helper that produces messages at a defined rate.
 */
object ThrottledProducer {

  def produceThrottled(initialDelay: FiniteDuration, interval: FiniteDuration, numberOfMessages: Int, name: String) = {

    val ticker = Source.tick(initialDelay, interval, Tick)
    val numbers = 1 to numberOfMessages
    val rangeMessageSource = Source(numbers.map(message => s"Message $message"))

    //define a stream to bring it all together..
    val throttledStream = Source.fromGraph(GraphDSL.create() { implicit builder =>

      //1. create a Kamon counter so we can track the number of messages produced
      val createCounter = Kamon.metrics.counter("throttledProducer-create-counter")

      //define a zip operation that expects a tuple with a Tick and a Message in it..
      //(note that operations must be added to the builder before they can be used)
      val zip = builder.add(Zip[Tick.type, String])

      //create a flow to extract the second element of the tuple (our message - we don't need the tick part after this stage)
      val messageExtractorFlow = builder.add(Flow[(Tick.type, String)].map(_._2))

      //2. create a flow that logs performance information to Kamon and passes on the message object unmolested
      val statsDExporterFlow = builder.add(Flow[String].map { message => createCounter.increment(1); message })

      //import this so we can use the ~> syntax
      import GraphDSL.Implicits._

      //define the inputs for the zip operation - it won't fire until something arrives at both inputs,
      //so we are essentially throttling the output of this stream
      ticker ~> zip.in0
      rangeMessageSource ~> zip.in1

      //send the output of the zip operation to the messageExtractorFlow, which just takes the second element of each
      //tuple - in our case the message; we don't care about the Tick, it was only for timing and we can throw it away.
      //3. then route the output of the extractor to the flow that exports data to StatsD
      zip.out ~> messageExtractorFlow ~> statsDExporterFlow

      //4. make sure we pass the right output to the SourceShape outlet
      SourceShape(statsDExporterFlow.out)
    })

    throttledStream
  }
}
The first and most obvious change is the new import, import kamon.Kamon. The other changes in detail are:
- We create a named Kamon metrics counter. It needs a unique name that helps us and Kamon identify it. The counter ‘recording instrument’, as Kamon calls it, has two simple methods, increment and decrement; we’ll only be using increment. (There is a short standalone sketch of the counter API after this list.)
- We create a graph flow that expects to receive a message of type String, increments the Kamon counter and then passes on the message it received, unchanged. The increment method can take any numeric parameter, or none at all, but it seems best to be explicit and increment the counter by one for every message the flow receives.
- We bolt our new statsDExporterFlow onto the end of the existing stream graph so that it actually gets used.
- We change the SourceShape call to use the output of the statsDExporterFlow so that the output of our graph stream is exposed to the outside world as the output of our ThrottledProducer.
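To make those counter calls a little more concrete, here is a minimal standalone sketch of the Kamon 0.5.x counter API. The counter name is just an example, and in 0.5.x Kamon.start() must have been called once at application startup before any instruments are created:
import kamon.Kamon

//start the metrics subsystem - normally done once in your application's entry point
Kamon.start()

//create (or look up) a counter by name - this name is what you will later see in Grafana
val exampleCounter = Kamon.metrics.counter("example-counter")

exampleCounter.increment()  //increment by one
exampleCounter.increment(5) //or by an explicit amount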
Updating the DelayingActor
The next step is to update our DelayingActor so that it increments a counter every time it processes a message. Again, let’s take a look at the fully updated class and then go through the details.
DelayingActor.scala
package com.datinko.streamstutorial.actors

import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnNext}
import akka.stream.actor.{ActorSubscriber, OneByOneRequestStrategy, RequestStrategy}
import com.typesafe.scalalogging.LazyLogging
import kamon.Kamon

/**
 * An actor that introduces a fixed delay when processing each message.
 */
//extending the ActorSubscriber trait is needed so that this actor can be used as part of a stream
class DelayingActor(name: String, delay: Long) extends ActorSubscriber with LazyLogging {

  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  val actorName = name
  val consumeCounter = Kamon.metrics.counter("delayingactor-consumed-counter")

  def this(name: String) {
    this(name, 0)
  }

  override def receive: Receive = {

    case OnNext(msg: String) =>
      Thread.sleep(delay)
      logger.debug(s"Message in delaying actor sink ${self.path} '$actorName': $msg")
      consumeCounter.increment(1)

    case OnComplete =>
      logger.debug(s"Completed message received in ${self.path} '$actorName'")

    case msg =>
      logger.debug(s"Unknown message $msg in $actorName")
  }
}
As before, we only need some very minor changes to make our DelayingActor report metrics to Kamon (a sketch of how this actor can be wired into a stream as a Sink follows this list):
- Add the Kamon dependency, import kamon.Kamon.
- Create a counter with a unique name so we can report the number of messages that have been consumed, val consumeCounter = Kamon.metrics.counter("delayingactor-consumed-counter").
- Increment the counter every time we actually consume a message from the stream, consumeCounter.increment(1).
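For context, here is a minimal sketch of how this actor can be attached to the ThrottledProducer as a Sink using Sink.actorSubscriber. This is an assumed wiring rather than the tutorial’s actual Start object; the system name, message count and delay are illustrative, and the imports assume the package layout used in this post:
import akka.actor.{ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.datinko.streamstutorial.actors.DelayingActor
//plus an import for ThrottledProducer from wherever it lives in your project
import kamon.Kamon
import scala.concurrent.duration._

object Start extends App {

  //start Kamon before the ActorSystem so the instrumented actors are picked up
  Kamon.start()

  implicit val system = ActorSystem("streamstutorial")
  implicit val materializer = ActorMaterializer()

  //a throttled source that emits a message every 20 milliseconds
  val fastSource = ThrottledProducer.produceThrottled(1.second, 20.milliseconds, 20000, "fastProducer")

  //wrap the DelayingActor in a Sink so it can terminate the stream
  val delayingSink = Sink.actorSubscriber[String](Props(new DelayingActor("fastSink", 20)))

  fastSource.runWith(delayingSink)
}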
Running the Application to Report Metrics
Because Kamon uses AspectJ to inject its monitoring and reporting code, we must run the application in a way that uses the AspectJ SBT plugin. The simplest way of doing this is to execute sbt run from the command line. If you run the application from within your IDE you will see a large reminder that Kamon is not running correctly and that no metrics are being recorded, a little something like this:
[ERROR] [12/31/2015 17:03:23.195] [main] [ModuleLoader(akka://kamon)]
___ _ ___ _ _ ___ ___ _ _
/ _ \ | | |_ | | | | | | \/ |(_) (_)
/ /_\ \ ___ _ __ ___ ___ | |_ | | | | | | ___ __ _ __ __ ___ _ __ | . . | _ ___ ___ _ _ __ __ _
| _ |/ __|| '_ \ / _ \ / __|| __| | | | |/\| | / _ \ / _` |\ \ / // _ \| '__| | |\/| || |/ __|/ __|| || '_ \ / _` |
| | | |\__ \| |_) || __/| (__ | |_ /\__/ / \ /\ /| __/| (_| | \ V /| __/| | | | | || |\__ \\__ \| || | | || (_| |
\_| |_/|___/| .__/ \___| \___| \__|\____/ \/ \/ \___| \__,_| \_/ \___||_| \_| |_/|_||___/|___/|_||_| |_| \__, |
| | __/ |
|_| |___/
It seems like your application was not started with the -javaagent:/path-to-aspectj-weaver.jar option but Kamon detected
the following modules which require AspectJ to work properly:
kamon-akka, kamon-scala
If you need help on setting up the aspectj weaver go to http://kamon.io/introduction/get-started/ for more info. On the
other hand, if you are sure that you do not need or do not want to use the weaver then you can disable this error message
by changing the kamon.show-aspectj-missing-warning setting in your configuration file.
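If you do need to launch the application outside sbt (from a packaged jar, for example), the AspectJ weaver has to be attached by hand using the -javaagent option the message mentions. A sketch, with placeholder paths and main class that you would adjust for your own build:
java -javaagent:/path/to/aspectjweaver-1.8.5.jar -cp your-app.jar your.main.StartClass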
When everything is running correctly you should see:
[info] Running com.datinko.asgard.bifrost.Start
[info] [INFO] [12/31/2015 17:12:21.301] [main] [StatsDExtension(akka://Bifrost)] Starting the Kamon(StatsD) extension
[info] 17:12:24.204 [Bifrost-akka.actor.default-dispatcher-6] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 1
[info] 17:12:24.229 [Bifrost-akka.actor.default-dispatcher-5] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 2
[info] 17:12:24.239 [Bifrost-akka.actor.default-dispatcher-4] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 3
[info] 17:12:24.259 [Bifrost-akka.actor.default-dispatcher-6] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 4
[info] 17:12:24.294 [Bifrost-akka.actor.default-dispatcher-3] DEBUG c.d.a.b.t.actors.DelayingActor - Message in delaying actor sink akka://Bifrost/user/StreamSupervisor-0/flow-0-1-actorSubscriberSink 'fastSink': Message 5
...
Updating Grafana to Show our Metrics
Now that our application is sending metrics to Graphite, we can customise our Grafana dashboard to show us some pretty charts of that data.
1. Using your browser, navigate to http://192.168.99.100/ (or 127.0.0.1 on Linux); this should show you the default Grafana dashboard.
2. A random line chart will be shown at the bottom of the dashboard. Click on the title of the chart, ‘First Graph (click to edit)’, and click ‘Edit’ from the menu that appears.
3. The display will change to show some edit controls.
4. Click on the ‘Metrics’ tab and, from the list shown in the bottom half of the screen, click the ‘select metric’ box. This will display a list of all the sources of data that are available.
5. Click the ‘stats’ entry.
6. Click the ‘select metric’ title that appears in the next box. Click the ‘counters’ entry.
7. Click the ‘select metric’ title that appears in the next box. Click the ‘streamstutorial’ entry. (If you called your application something else, you should see that name here.)
8. Click the ‘select metric’ title that appears in the next box. Click the entry that is shown (this is usually the hostname of your machine).
9. Click the ‘select metric’ title that appears in the next box. Click the ‘counter’ entry.
10. Click the ‘select metric’ title that appears in the next box. You should now see a list of the counters we defined: ‘throttledProducer-create-counter’ and ‘delayingactor-consumed-counter’. Choose ‘throttledProducer-create-counter’ for now.
11. Click the ‘select metric’ title that appears in the next box. Click the ‘count’ entry.
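Reading those selections as a single Graphite series path, the metric you have just built up looks something like the following (the hostname segment will differ on your machine):
stats.counters.streamstutorial.your-hostname.counter.throttledProducer-create-counter.count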
All being well, you should now see a chart showing the rate at which our ThrottledProducer is creating messages and pushing them into the stream.

Following a similar process we can add a line to show the rate at which our Sink is consuming messages.
- Click the ‘Add Query’ button at the bottom right of the screen and follow steps 5 to 9 above.
- At step 10, choose ‘delayingactor-consumed-counter’.
- Click the ‘select metric’ title that appears in the next box. Click the ‘count’ entry.
This will add a series line for our Sink so that we can see the Source and Sink message rates on the same chart. With the default settings you will get something looking like this (which doesn’t look quite right!):

At first glance it looks like our Sink is consuming twice as many messages as our Source. In fact this is just a ‘Display Styles’ setting that we can change:
- Click the ‘Display Styles’ tab.
- Ensure the ‘Stack’ option in the ‘Multiple Series’ area is unchecked.

Both series lines will now sit on top of each other. To prove that both lines are actually still present, click on the ‘Metrics’ tab and then click the eye icon next to each entry. This toggles the visibility of each series. Despite Grafana’s annoying habit of changing the colour of a series when its visibility is toggled, we can prove to ourselves that both series are identical.
In other words, our ThrottledProducer (our Source) is creating and sending messages as fast as the DelayingActor (our Sink) is able to consume them. Believe it or not, this is back pressure in action.
Well, okay.. but maybe we’re not totally convinced? We’ll do some more experiments shortly.
Other posts in this series:
Part One
Part Two
Part Three
Part Four
Part Five
Code can be found on Github