Skip to content
This repository was archived by the owner on Mar 16, 2025. It is now read-only.

Commit 67b6d4d

Browse files
author
Michel Davit
authored
Graphite carbon support (#23)
* Stub for graphite carbon * Proto fo carbon client * Finalization of the carbon client * Add documentation * Fix typo in doc
1 parent 8f309ed commit 67b6d4d

File tree

9 files changed

+325
-3
lines changed

9 files changed

+325
-3
lines changed

README.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ The following implementations are supported:
1010

1111
* [datadog](#datadog) (via StatsD)
1212
* [dropwizard](#dropwizard)
13+
* [graphite](#graphite) (via Carbon)
1314
* [prometheus](#prometheus)
1415

1516
## Versions
@@ -230,6 +231,36 @@ import fr.davit.akka.http.metrics.dropwizard.marshalling.DropwizardMarshallers._
230231
val route = (get & path("metrics"))(metrics(registry))
231232
```
232233

234+
### [Graphite](https://graphiteapp.org/)
235+
236+
| metric | dropwizard |
237+
|--------------------|------------------------------|
238+
| requests | akka.http.requests |
239+
| active requests | akka.http.requests.active |
240+
| request sizes | akka.http.requests.bytes |
241+
| responses | akka.http.responses |
242+
| errors | akka.http.responses.errors |
243+
| durations | akka.http.responses.duration |
244+
| response sizes | akka.http.responses.bytes |
245+
| connections | akka.http.connections |
246+
| active connections | akka.http.connections.active |
247+
248+
Add to your `build.sbt`:
249+
250+
```scala
251+
libraryDependencies += "fr.davit" %% "akka-http-metrics-graphite" % <version>
252+
```
253+
254+
Create your carbon client and your registry
255+
256+
```scala
257+
import fr.davit.akka.http.metrics.graphite.{CarbonClient, GraphiteRegistry}
258+
259+
val carbonClient: CarbonClient = CarbonClient("hostname", 2003)
260+
261+
val registry = GraphiteRegistry(carbonClient)
262+
```
263+
233264
### [Prometheus](http://prometheus.io/)
234265

235266
| metric | prometheus |

akka-http-metrics-datadog/src/it/resources/logback-test.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
<configuration debug="false">
44
<appender name="file" class="ch.qos.logback.core.FileAppender">
5-
<file>./target/akka-http-metrics-dropwizard.test.log</file>
5+
<file>./target/akka-http-metrics-datadog.it-test.log</file>
66
<append>false</append>
77
<encoder>
88
<pattern>%date{HH:mm:ss} %-5level %logger{0} {%class %method} - %msg%n</pattern>

akka-http-metrics-dorpwizard/src/main/scala/fr/davit/akka/http/metrics/dropwizard/DropwizardRegistry.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import scala.concurrent.duration.FiniteDuration
77

88
object DropwizardRegistry {
99

10-
val AkkaPrefix = Seq("akka", "http")
10+
private val AkkaPrefix = Seq("akka", "http")
1111

1212
private implicit class RichMetricsRegistry(underlying: MetricRegistry) {
1313

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
akka {
2+
loggers = ["akka.event.slf4j.Slf4jLogger"]
3+
loglevel = "DEBUG"
4+
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
5+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
3+
<configuration debug="false">
4+
<appender name="file" class="ch.qos.logback.core.FileAppender">
5+
<file>./target/akka-http-metrics-graphite.it-test.log</file>
6+
<append>false</append>
7+
<encoder>
8+
<pattern>%date{HH:mm:ss} %-5level %logger{0} {%class %method} - %msg%n</pattern>
9+
</encoder>
10+
</appender>
11+
12+
<root level="DEBUG">
13+
<appender-ref ref="file"/>
14+
</root>
15+
</configuration>
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package fr.davit.akka.http.metrics.graphite
2+
3+
import java.net.InetSocketAddress
4+
import java.time.{Clock, Instant, ZoneId}
5+
6+
import akka.actor.{ActorRef, ActorSystem}
7+
import akka.http.scaladsl.model.StatusCodes
8+
import akka.io.{IO, Tcp}
9+
import akka.testkit.{TestActor, TestKit, TestProbe}
10+
import fr.davit.akka.http.metrics.core.HttpMetricsRegistry.{PathDimension, StatusGroupDimension}
11+
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
12+
13+
import scala.concurrent.duration._
14+
15+
class GraphiteRegistrySpec
16+
extends TestKit(ActorSystem("GraphiteRegistrySpec"))
17+
with FlatSpecLike
18+
with Matchers
19+
with BeforeAndAfterAll {
20+
21+
val dimensions = Seq(PathDimension("/api"), StatusGroupDimension(StatusCodes.OK))
22+
val timestamp = Instant.ofEpochSecond(1234)
23+
24+
def withFixture(test: (TestProbe, GraphiteRegistry) => Any) = {
25+
val carbon = TestProbe()
26+
val handler = TestProbe()
27+
carbon.send(IO(Tcp), Tcp.Bind(carbon.ref, new InetSocketAddress(0)))
28+
val port = carbon.expectMsgType[Tcp.Bound].localAddress.getPort
29+
val socket = carbon.sender()
30+
carbon.setAutoPilot(new TestActor.AutoPilot {
31+
override def run(sender: ActorRef, msg: Any): TestActor.AutoPilot = msg match {
32+
case _: Tcp.Connected =>
33+
sender ! Tcp.Register(handler.ref)
34+
TestActor.KeepRunning
35+
}
36+
})
37+
38+
val client = new CarbonClient("localhost", port) {
39+
override val clock: Clock = Clock.fixed(timestamp, ZoneId.systemDefault())
40+
}
41+
val registry = GraphiteRegistry(client)
42+
try {
43+
test(handler, registry)
44+
} finally {
45+
// client.close()
46+
socket ! Tcp.Unbind
47+
}
48+
}
49+
50+
override def afterAll(): Unit = {
51+
shutdown()
52+
super.afterAll()
53+
}
54+
55+
"GraphiteRegistry" should "send active datagrams to the carbon server" in withFixture { (carbon, registry) =>
56+
registry.active.inc()
57+
carbon.expectMsgType[Tcp.Received].data.utf8String shouldBe "akka.http.requests.active 1 1234\n"
58+
}
59+
60+
it should "send requests datagrams to the carbon server" in withFixture { (carbon, registry) =>
61+
registry.requests.inc()
62+
carbon.expectMsgType[Tcp.Received].data.utf8String shouldBe "akka.http.requests 1 1234\n"
63+
}
64+
65+
it should "send receivedBytes datagrams to the carbon server" in withFixture { (carbon, registry) =>
66+
registry.receivedBytes.update(3)
67+
carbon.expectMsgType[Tcp.Received].data.utf8String shouldBe "akka.http.requests.bytes 3 1234\n"
68+
69+
registry.receivedBytes.update(3, dimensions)
70+
carbon.expectMsgType[Tcp.Received].data.utf8String shouldBe "akka.http.requests.bytes;path=/api;status=2xx 3 1234\n"
71+
}
72+
73+
it should "send responses datagrams to the carbon server" in withFixture { (carbon, registry) =>
74+
registry.responses.inc()
75+
carbon.expectMsgType[Tcp.Received].data.utf8String shouldBe "akka.http.responses 1 1234\n"
76+
77+
registry.responses.inc(dimensions)
78+
carbon.expectMsgType[Tcp.Received].data.utf8String shouldBe "akka.http.responses;path=/api;status=2xx 1 1234\n"
79+
}
80+
81+
it should "send errors datagrams to the carbon server" in withFixture { (carbon, registry) =>
82+
registry.errors.inc()
83+
carbon.expectMsgType[Tcp.Received].data.utf8String shouldBe "akka.http.responses.errors 1 1234\n"
84+
85+
registry.errors.inc(dimensions)
86+
carbon
87+
.expectMsgType[Tcp.Received]
88+
.data
89+
.utf8String shouldBe "akka.http.responses.errors;path=/api;status=2xx 1 1234\n"
90+
}
91+
92+
it should "send duration datagrams to the carbon server" in withFixture { (carbon, registry) =>
93+
registry.duration.observe(3.seconds)
94+
carbon.expectMsgType[Tcp.Received].data.utf8String shouldBe "akka.http.responses.duration 3000 1234\n"
95+
96+
registry.duration.observe(3.seconds, dimensions)
97+
carbon
98+
.expectMsgType[Tcp.Received]
99+
.data
100+
.utf8String shouldBe "akka.http.responses.duration;path=/api;status=2xx 3000 1234\n"
101+
}
102+
103+
it should "send sentBytes datagrams to the carbon server" in withFixture { (carbon, registry) =>
104+
registry.sentBytes.update(3)
105+
carbon.expectMsgType[Tcp.Received].data.utf8String shouldBe "akka.http.responses.bytes 3 1234\n"
106+
107+
registry.sentBytes.update(3, dimensions)
108+
carbon.expectMsgType[Tcp.Received].data.utf8String shouldBe "akka.http.responses.bytes;path=/api;status=2xx 3 1234\n"
109+
}
110+
111+
it should "send connected datagrams to the carbon server" in withFixture { (carbon, registry) =>
112+
registry.connected.inc()
113+
carbon.expectMsgType[Tcp.Received].data.utf8String shouldBe "akka.http.connections.active 1 1234\n"
114+
}
115+
it should "send connections datagrams to the carbon server" in withFixture { (carbon, registry) =>
116+
registry.connections.inc()
117+
carbon.expectMsgType[Tcp.Received].data.utf8String shouldBe "akka.http.connections 1 1234\n"
118+
}
119+
120+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package fr.davit.akka.http.metrics.graphite
2+
3+
import java.time.{Clock, Instant}
4+
5+
import akka.NotUsed
6+
import akka.actor.ActorSystem
7+
import akka.event.Logging
8+
import akka.stream.scaladsl.{Flow, Keep, RestartFlow, Sink, Source, Tcp}
9+
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy, QueueOfferResult}
10+
import akka.util.ByteString
11+
12+
import scala.concurrent.Await
13+
import scala.concurrent.duration.Duration
14+
import scala.concurrent.duration._
15+
16+
object CarbonClient {
17+
18+
def apply(host: String, port: Int)(implicit system: ActorSystem): CarbonClient = new CarbonClient(host, port)
19+
}
20+
21+
class CarbonClient(host: String, port: Int)(implicit system: ActorSystem) extends AutoCloseable {
22+
23+
implicit private lazy val materializer: Materializer = ActorMaterializer()
24+
25+
private val logger = Logging(system.eventStream, classOf[CarbonClient])
26+
protected val clock: Clock = Clock.systemUTC()
27+
28+
private def serialize[T](name: String, value: T, ts: Instant): ByteString = {
29+
ByteString(s"$name $value ${ts.getEpochSecond}\n")
30+
}
31+
32+
// TODO read backoff from config
33+
private def connection: Flow[ByteString, ByteString, NotUsed] = RestartFlow.withBackoff(
34+
minBackoff = 3.seconds,
35+
maxBackoff = 30.seconds,
36+
randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly
37+
maxRestarts = -1 // keep retrying forever
38+
)(() => Tcp().outgoingConnection(host, port))
39+
40+
private val queue = Source.queue[ByteString](19, OverflowStrategy.dropHead)
41+
.via(connection)
42+
.toMat(Sink.ignore)(Keep.left)
43+
.run()
44+
45+
def publish[T](name: String, value: T, ts: Instant = Instant.now(clock)): Unit = {
46+
// it's reasonable to block until the message in enqueued
47+
Await.result(queue.offer(serialize(name, value, ts)), Duration.Inf) match {
48+
case QueueOfferResult.Enqueued => logger.debug("Metric {} enqueued", name)
49+
case QueueOfferResult.Dropped => logger.debug("Metric {} dropped", name)
50+
case QueueOfferResult.Failure(e) => logger.error(e, s"Failed publishing metric $name")
51+
case QueueOfferResult.QueueClosed => throw new Exception("Failed publishing metric to closed carbon client")
52+
}
53+
}
54+
55+
override def close(): Unit = {
56+
queue.complete()
57+
Await.result(queue.watchCompletion(), Duration.Inf)
58+
}
59+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package fr.davit.akka.http.metrics.graphite
2+
3+
import fr.davit.akka.http.metrics.core.{Counter, Dimension, Gauge, Histogram, HttpMetricsRegistry, Timer}
4+
5+
import scala.concurrent.duration.FiniteDuration
6+
7+
object GraphiteRegistry {
8+
9+
val AkkaPrefix = "akka.http."
10+
11+
private implicit class RichCarbonClient(client: CarbonClient) {
12+
13+
private def metricName(name: String, dimensions: Seq[Dimension]): String = {
14+
val tags = dimensions.map(d => d.key + "=" + d.value).toList
15+
(AkkaPrefix + name :: tags).mkString(";")
16+
}
17+
18+
def counter[T](name: String): Counter[T] = new Counter[T] {
19+
override def inc(dimensions: Seq[Dimension] = Seq.empty): Unit = {
20+
client.publish(metricName(name, dimensions), 1)
21+
}
22+
}
23+
24+
def gauge[T](name: String): Gauge[T] = new Gauge[T] {
25+
override def inc(dimensions: Seq[Dimension] = Seq.empty): Unit = {
26+
client.publish(metricName(name, dimensions), 1)
27+
}
28+
29+
override def dec(dimensions: Seq[Dimension] = Seq.empty): Unit = {
30+
client.publish(metricName(name, dimensions), -1)
31+
}
32+
}
33+
34+
def timer(name: String): Timer = new Timer {
35+
override def observe(duration: FiniteDuration, dimensions: Seq[Dimension] = Seq.empty): Unit = {
36+
client.publish(metricName(name, dimensions), duration.toMillis)
37+
}
38+
}
39+
40+
def histogram[T](name: String): Histogram[T] = new Histogram[T] {
41+
override def update(value: T, dimensions: Seq[Dimension] = Seq.empty): Unit = {
42+
client.publish(metricName(name, dimensions), value)
43+
}
44+
}
45+
}
46+
47+
def apply(client: CarbonClient): GraphiteRegistry = new GraphiteRegistry(client)
48+
}
49+
50+
class GraphiteRegistry(client: CarbonClient) extends HttpMetricsRegistry {
51+
52+
import GraphiteRegistry._
53+
54+
override def active: Gauge[Long] = client.gauge("requests.active")
55+
56+
override def requests: Counter[Long] = client.counter("requests")
57+
58+
override def receivedBytes: Histogram[Long] = client.histogram("requests.bytes")
59+
60+
override def responses: Counter[Long] = client.counter("responses")
61+
62+
override def errors: Counter[Long] = client.counter("responses.errors")
63+
64+
override def duration: Timer = client.timer("responses.duration")
65+
66+
override def sentBytes: Histogram[Long] = client.histogram("responses.bytes")
67+
68+
override def connected: Gauge[Long] = client.gauge("connections.active")
69+
70+
override def connections: Counter[Long] = client.counter("connections")
71+
}

build.sbt

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,13 @@ lazy val commonSettings = Defaults.itSettings ++ Seq(
2525
)
2626

2727
lazy val `akka-http-metrics` = (project in file("."))
28-
.aggregate(`akka-http-metrics-core`, `akka-http-metrics-datadog`, `akka-http-metrics-dropwizard`, `akka-http-metrics-prometheus`)
28+
.aggregate(
29+
`akka-http-metrics-core`,
30+
`akka-http-metrics-datadog`,
31+
`akka-http-metrics-graphite`,
32+
`akka-http-metrics-dropwizard`,
33+
`akka-http-metrics-prometheus`
34+
)
2935
.settings(commonSettings: _*)
3036
.settings(
3137
publishArtifact := false
@@ -81,6 +87,21 @@ lazy val `akka-http-metrics-dropwizard` = (project in file("akka-http-metrics-do
8187
)
8288
)
8389

90+
lazy val `akka-http-metrics-graphite` = (project in file("akka-http-metrics-graphite"))
91+
.configs(IntegrationTest)
92+
.dependsOn(`akka-http-metrics-core`)
93+
.settings(commonSettings: _*)
94+
.settings(
95+
libraryDependencies ++= Seq(
96+
Dependencies.Provided.akkaStream,
97+
Dependencies.Test.akkaHttpTestkit,
98+
Dependencies.Test.akkaStreamTestkit,
99+
Dependencies.Test.akkaSlf4j,
100+
Dependencies.Test.logback,
101+
Dependencies.Test.scalaTest
102+
),
103+
)
104+
84105
lazy val `akka-http-metrics-prometheus` = (project in file("akka-http-metrics-prometheus"))
85106
.configs(IntegrationTest)
86107
.dependsOn(`akka-http-metrics-core`)

0 commit comments

Comments
 (0)