Skip to content

Commit 7f7f96a

Browse files
feat(metrics): support OpenMetrics from applications (#7125)
Signed-off-by: AyushSenapati <a.p.senapati008@gmail.com>
1 parent c3b98e7 commit 7f7f96a

File tree

8 files changed

+681
-17
lines changed

8 files changed

+681
-17
lines changed

app/kuma-dp/pkg/dataplane/metrics/server.go

Lines changed: 175 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"context"
66
"fmt"
77
"io"
8+
"math"
9+
"mime"
810
"net"
911
"net/http"
1012
"net/url"
@@ -28,6 +30,23 @@ var (
2830
var (
2931
prometheusRequestHeaders = []string{"accept", "accept-encoding", "user-agent", "x-prometheus-scrape-timeout-seconds"}
3032
logger = core.Log.WithName("metrics-hijacker")
33+
34+
// holds prometheus content types in order of priority.
35+
prometheusPriorityContentType = []expfmt.Format{
36+
expfmt.FmtOpenMetrics_1_0_0,
37+
expfmt.FmtOpenMetrics_0_0_1,
38+
expfmt.FmtText,
39+
expfmt.FmtUnknown,
40+
}
41+
42+
// Reverse mapping of prometheusPriorityContentType for faster lookup.
43+
prometheusPriorityContentTypeLookup = func(expformats []expfmt.Format) map[expfmt.Format]int32 {
44+
reverseMapping := map[expfmt.Format]int32{}
45+
for priority, format := range expformats {
46+
reverseMapping[format] = int32(priority)
47+
}
48+
return reverseMapping
49+
}(prometheusPriorityContentType)
3150
)
3251

3352
var _ component.Component = &Hijacker{}
@@ -151,44 +170,135 @@ func rewriteMetricsURL(address string, port uint32, path string, queryModifier Q
151170
func (s *Hijacker) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
152171
ctx := req.Context()
153172
out := make(chan []byte, len(s.applicationsToScrape))
173+
contentTypes := make(chan expfmt.Format, len(s.applicationsToScrape))
154174
var wg sync.WaitGroup
155175
done := make(chan []byte)
156176
wg.Add(len(s.applicationsToScrape))
157177
go func() {
158178
wg.Wait()
159-
close(done)
160179
close(out)
180+
close(contentTypes)
181+
close(done)
161182
}()
162183

163184
for _, app := range s.applicationsToScrape {
164185
go func(app ApplicationToScrape) {
165186
defer wg.Done()
166-
out <- s.getStats(ctx, req, app)
187+
content, contentType := s.getStats(ctx, req, app)
188+
out <- content
189+
190+
// It's possible to track the highest priority content type seen,
191+
// but that would require mutex.
192+
// I would prefer to calculate it later at one go
193+
contentTypes <- contentType
167194
}(app)
168195
}
169196

170197
select {
171198
case <-ctx.Done():
172199
return
173200
case <-done:
174-
// default format returned by prometheus
175-
writer.Header().Set("content-type", string(expfmt.FmtText))
176-
for resp := range out {
177-
if _, err := writer.Write(resp); err != nil {
178-
logger.Error(err, "error while writing the response")
179-
}
180-
if _, err := writer.Write([]byte("\n")); err != nil {
181-
logger.Error(err, "error while writing the response")
182-
}
201+
selectedCt := selectContentType(contentTypes, req.Header)
202+
writer.Header().Set(hdrContentType, string(selectedCt))
203+
204+
// aggregate metrics of target applications and attempt to make them
205+
// compatible with FmtOpenMetrics if it is the selected content type.
206+
metrics := processMetrics(out, selectedCt)
207+
if _, err := writer.Write(metrics); err != nil {
208+
logger.Error(err, "error while writing the response")
183209
}
184210
}
185211
}
186212

187-
func (s *Hijacker) getStats(ctx context.Context, initReq *http.Request, app ApplicationToScrape) []byte {
213+
func processMetrics(contents <-chan []byte, contentType expfmt.Format) []byte {
214+
buf := new(bytes.Buffer)
215+
216+
for metrics := range contents {
217+
// remove the EOF marker from the metrics, because we are
218+
// merging multiple metrics into one response.
219+
metrics = bytes.ReplaceAll(metrics, []byte("# EOF"), []byte(""))
220+
221+
buf.Write(metrics)
222+
buf.Write([]byte("\n"))
223+
}
224+
225+
processedMetrics := append(processNewlineChars(buf.Bytes()), '\n')
226+
buf.Reset()
227+
buf.Write(processedMetrics)
228+
229+
if contentType == expfmt.FmtOpenMetrics_1_0_0 || contentType == expfmt.FmtOpenMetrics_0_0_1 {
230+
// make metrics OpenMetrics compliant
231+
buf.Write([]byte("# EOF\n"))
232+
}
233+
234+
return buf.Bytes()
235+
}
236+
237+
// processNewlineChars takes byte data and returns a new byte slice
238+
// after trimming and deduplicating the newline characters.
239+
func processNewlineChars(input []byte) []byte {
240+
var deduped []byte
241+
242+
if len(input) == 0 {
243+
return nil
244+
}
245+
last := input[0]
246+
247+
for i := 1; i < len(input); i++ {
248+
if last == '\n' && input[i] == last {
249+
continue
250+
}
251+
deduped = append(deduped, last)
252+
last = input[i]
253+
}
254+
deduped = append(deduped, last)
255+
256+
return bytes.TrimSpace(deduped)
257+
}
258+
259+
// selectContentType selects the highest priority content type supported by the applications.
260+
// If no valid content type is returned by the applications, it negotiates content type based
261+
// on Accept header of the scraper.
262+
func selectContentType(contentTypes <-chan expfmt.Format, reqHeader http.Header) expfmt.Format {
263+
// Tracks highest negotiated content type priority.
264+
// Lower number means higher priority
265+
//
266+
// We can not simply use the highest priority content type i.e. `application/openmetrics-text`
267+
// and try to mutate the metrics to make it compatible with this type,
268+
// because:
269+
// - if the application is not supporting this type,
270+
// custom metrics might not be compatible (more prone to failure).
271+
// - the user might be using older prom scraper.
272+
//
273+
// So it's better to choose the highest negotiated content type between the
274+
// target apps and the scraper.
275+
var ctPriority int32 = math.MaxInt32
276+
ct := expfmt.FmtUnknown
277+
for contentType := range contentTypes {
278+
priority, valid := prometheusPriorityContentTypeLookup[contentType]
279+
if !valid {
280+
continue
281+
}
282+
if priority < ctPriority {
283+
ctPriority = priority
284+
ct = contentType
285+
}
286+
}
287+
288+
// If no valid content type is returned by the target applications,
289+
// negotitate content type based on Accept header of the scraper.
290+
if ct == expfmt.FmtUnknown {
291+
ct = expfmt.Negotiate(reqHeader)
292+
}
293+
294+
return ct
295+
}
296+
297+
func (s *Hijacker) getStats(ctx context.Context, initReq *http.Request, app ApplicationToScrape) ([]byte, expfmt.Format) {
188298
req, err := http.NewRequest("GET", rewriteMetricsURL(app.Address, app.Port, app.Path, app.QueryModifier, initReq.URL), nil)
189299
if err != nil {
190300
logger.Error(err, "failed to create request")
191-
return nil
301+
return nil, ""
192302
}
193303
s.passRequestHeaders(req.Header, initReq.Header)
194304
req = req.WithContext(ctx)
@@ -206,25 +316,27 @@ func (s *Hijacker) getStats(ctx context.Context, initReq *http.Request, app Appl
206316
}
207317
if err != nil {
208318
logger.Error(err, "failed call", "name", app.Name, "path", app.Path, "port", app.Port)
209-
return nil
319+
return nil, ""
210320
}
211321

322+
respContentType := responseFormat(resp.Header)
323+
212324
var bodyBytes []byte
213325
if app.Mutator != nil {
214326
buf := new(bytes.Buffer)
215327
if err := app.Mutator(resp.Body, buf); err != nil {
216328
logger.Error(err, "failed while mutating data", "name", app.Name, "path", app.Path, "port", app.Port)
217-
return nil
329+
return nil, ""
218330
}
219331
bodyBytes = buf.Bytes()
220332
} else {
221333
bodyBytes, err = io.ReadAll(resp.Body)
222334
if err != nil {
223335
logger.Error(err, "failed while writing", "name", app.Name, "path", app.Path, "port", app.Port)
224-
return nil
336+
return nil, ""
225337
}
226338
}
227-
return bodyBytes
339+
return bodyBytes, respContentType
228340
}
229341

230342
func (s *Hijacker) passRequestHeaders(into http.Header, from http.Header) {
@@ -241,3 +353,49 @@ func (s *Hijacker) passRequestHeaders(into http.Header, from http.Header) {
241353
func (s *Hijacker) NeedLeaderElection() bool {
242354
return false
243355
}
356+
357+
const (
358+
hdrContentType = "Content-Type"
359+
textType = "text/plain"
360+
)
361+
362+
// responseFormat extracts the correct format from a HTTP response header.
363+
// If no matching format can be found FormatUnknown is returned.
364+
func responseFormat(h http.Header) expfmt.Format {
365+
ct := h.Get(hdrContentType)
366+
367+
mediatype, params, err := mime.ParseMediaType(ct)
368+
if err != nil {
369+
return expfmt.FmtUnknown
370+
}
371+
372+
version := params["version"]
373+
374+
switch mediatype {
375+
case expfmt.ProtoType:
376+
p := params["proto"]
377+
e := params["encoding"]
378+
// only delimited encoding is supported by prometheus scraper
379+
if p == expfmt.ProtoProtocol && e == "delimited" {
380+
return expfmt.FmtProtoDelim
381+
}
382+
383+
// if mediatype is `text/plain`, return Prometheus text format
384+
// without checking the version, as there are few exporters
385+
// which don't set the version param in the content-type header. ex: Envoy
386+
case textType:
387+
return expfmt.FmtText
388+
389+
// if mediatype is OpenMetricsType, return FmtUnknown for any version
390+
// other than "0.0.1", "1.0.0" and "".
391+
case expfmt.OpenMetricsType:
392+
if version == expfmt.OpenMetricsVersion_0_0_1 || version == "" {
393+
return expfmt.FmtOpenMetrics_0_0_1
394+
}
395+
if version == expfmt.OpenMetricsVersion_1_0_0 {
396+
return expfmt.FmtOpenMetrics_1_0_0
397+
}
398+
}
399+
400+
return expfmt.FmtUnknown
401+
}

0 commit comments

Comments
 (0)