Skip to content

feat(metrics): support OpenMetrics from applications #7125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 174 additions & 17 deletions app/kuma-dp/pkg/dataplane/metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"fmt"
"io"
"math"
"mime"
"net"
"net/http"
"net/url"
Expand All @@ -30,6 +32,23 @@ var (
var (
	// Names of request headers related to Prometheus scraping.
	// NOTE(review): presumably these are the headers copied onto the
	// requests sent to scraped applications — confirm against passRequestHeaders.
	prometheusRequestHeaders = []string{"accept", "accept-encoding", "user-agent", "x-prometheus-scrape-timeout-seconds"}
	logger                   = core.Log.WithName("metrics-hijacker")

	// prometheusPriorityContentType lists the exposition formats from the
	// most preferred (index 0) to the least preferred.
	prometheusPriorityContentType = []expfmt.Format{
		expfmt.FmtOpenMetrics_1_0_0,
		expfmt.FmtOpenMetrics_0_0_1,
		expfmt.FmtText,
		expfmt.FmtUnknown,
	}

	// prometheusPriorityContentTypeLookup maps each format of
	// prometheusPriorityContentType to its index in that slice, so a
	// format's priority can be found without scanning the slice.
	// A lower value means a higher priority.
	prometheusPriorityContentTypeLookup = func(formats []expfmt.Format) map[expfmt.Format]int32 {
		rankByFormat := make(map[expfmt.Format]int32, len(formats))
		for rank := range formats {
			rankByFormat[formats[rank]] = int32(rank)
		}
		return rankByFormat
	}(prometheusPriorityContentType)
)

var _ component.Component = &Hijacker{}
Expand Down Expand Up @@ -153,44 +172,136 @@ func rewriteMetricsURL(address string, port uint32, path string, queryModifier Q
func (s *Hijacker) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
ctx := req.Context()
out := make(chan []byte, len(s.applicationsToScrape))
contentTypes := make(chan expfmt.Format, len(s.applicationsToScrape))
var wg sync.WaitGroup
done := make(chan []byte)
wg.Add(len(s.applicationsToScrape))
go func() {
wg.Wait()
close(done)
close(out)
close(contentTypes)
close(done)
}()

for _, app := range s.applicationsToScrape {
go func(app ApplicationToScrape) {
defer wg.Done()
out <- s.getStats(ctx, req, app)
content, contentType := s.getStats(ctx, req, app)
out <- content

// It's possible to track the highest priority content type seen so far,
// but that would require a mutex.
// It is simpler to compute it once, after all responses have been collected.
contentTypes <- contentType
}(app)
}

select {
case <-ctx.Done():
return
case <-done:
// default format returned by prometheus
writer.Header().Set("content-type", string(expfmt.FmtText))
for resp := range out {
if _, err := writer.Write(resp); err != nil {
logger.Error(err, "error while writing the response")
}
if _, err := writer.Write([]byte("\n")); err != nil {
logger.Error(err, "error while writing the response")
}
selectedCt := selectContentType(contentTypes, req.Header)
writer.Header().Set(hdrContentType, string(selectedCt))

// aggregate metrics of target applications and attempt to make them
// compatible with FmtOpenMetrics if it is the selected content type.
metrics := processMetrics(out, selectedCt)
if _, err := writer.Write(metrics); err != nil {
logger.Error(err, "error while writing the response")
}
}
}

func (s *Hijacker) getStats(ctx context.Context, initReq *http.Request, app ApplicationToScrape) []byte {
// processMetrics drains contents and merges every payload into a single
// response body.
//
// Each payload has its "# EOF" markers removed, because several payloads are
// concatenated into one response. The payloads are joined with newlines, the
// merged text is passed through processNewlineChars, and one trailing newline
// is appended. When contentType is one of the OpenMetrics formats, a final
// "# EOF" line is added after the merged metrics.
//
// The function returns once contents has been closed and fully consumed.
func processMetrics(contents <-chan []byte, contentType expfmt.Format) []byte {
	var merged bytes.Buffer
	for payload := range contents {
		// Drop per-application EOF markers; at most one is re-added below.
		merged.Write(bytes.ReplaceAll(payload, []byte("# EOF"), []byte("")))
		merged.WriteByte('\n')
	}

	body := append(processNewlineChars(merged.Bytes()), '\n')

	switch contentType {
	case expfmt.FmtOpenMetrics_1_0_0, expfmt.FmtOpenMetrics_0_0_1:
		// make metrics OpenMetrics compliant
		body = append(body, "# EOF\n"...)
	}

	return body
}

// processNewlineChars returns a copy of input in which every run of
// consecutive '\n' characters is collapsed into a single '\n' and leading and
// trailing white space is removed.
//
// The input slice is never modified. For empty (or nil) input, and for input
// consisting only of white space, the result has length zero; no byte that is
// absent from the input is ever emitted.
func processNewlineChars(input []byte) []byte {
	deduped := make([]byte, 0, len(input))

	for _, b := range input {
		// Skip a newline that directly follows one already kept.
		if b == '\n' && len(deduped) > 0 && deduped[len(deduped)-1] == '\n' {
			continue
		}
		deduped = append(deduped, b)
	}

	return bytes.TrimSpace(deduped)
}

// selectContentType picks the content type of the aggregated response.
//
// It drains contentTypes (one entry per scraped application) and keeps the
// format with the best rank in prometheusPriorityContentTypeLookup, where a
// lower rank means a higher priority. Formats missing from the lookup are
// ignored. When no application reported a usable format, the content type is
// negotiated from the scraper's request headers instead.
//
// The top-priority format (`application/openmetrics-text`) is deliberately
// not forced on every response, because:
//   - an application that does not support it may expose custom metrics that
//     are not compatible with it (more prone to failure);
//   - the user might be running an older Prometheus scraper.
//
// The function returns once contentTypes has been closed and fully consumed.
func selectContentType(contentTypes <-chan expfmt.Format, reqHeader http.Header) expfmt.Format {
	best := expfmt.FmtUnknown
	bestRank := int32(math.MaxInt32)

	for candidate := range contentTypes {
		rank, known := prometheusPriorityContentTypeLookup[candidate]
		if known && rank < bestRank {
			best, bestRank = candidate, rank
		}
	}

	if best != expfmt.FmtUnknown {
		return best
	}

	// No valid content type was returned by the target applications:
	// negotiate one based on the Accept header of the scraper.
	return expfmt.Negotiate(reqHeader)
}

func (s *Hijacker) getStats(ctx context.Context, initReq *http.Request, app ApplicationToScrape) ([]byte, expfmt.Format) {
req, err := http.NewRequest("GET", rewriteMetricsURL(app.Address, app.Port, app.Path, app.QueryModifier, initReq.URL), nil)
if err != nil {
logger.Error(err, "failed to create request")
return nil
return nil, ""
}
s.passRequestHeaders(req.Header, initReq.Header)
req = req.WithContext(ctx)
Expand All @@ -208,25 +319,27 @@ func (s *Hijacker) getStats(ctx context.Context, initReq *http.Request, app Appl
}
if err != nil {
logger.Error(err, "failed call", "name", app.Name, "path", app.Path, "port", app.Port)
return nil
return nil, ""
}

respContentType := responseFormat(resp.Header)

var bodyBytes []byte
if app.Mutator != nil {
buf := new(bytes.Buffer)
if err := app.Mutator(resp.Body, buf); err != nil {
logger.Error(err, "failed while mutating data", "name", app.Name, "path", app.Path, "port", app.Port)
return nil
return nil, ""
}
bodyBytes = buf.Bytes()
} else {
bodyBytes, err = io.ReadAll(resp.Body)
if err != nil {
logger.Error(err, "failed while writing", "name", app.Name, "path", app.Path, "port", app.Port)
return nil
return nil, ""
}
}
return bodyBytes
return bodyBytes, respContentType
}

func (s *Hijacker) passRequestHeaders(into http.Header, from http.Header) {
Expand All @@ -243,3 +356,47 @@ func (s *Hijacker) passRequestHeaders(into http.Header, from http.Header) {
// NeedLeaderElection always reports false for the Hijacker.
// NOTE(review): presumably part of the component.Component contract that
// Hijacker is asserted to satisfy — confirm against that interface.
func (s *Hijacker) NeedLeaderElection() bool {
return false
}

const (
// hdrContentType is the name of the HTTP header carrying the media type;
// it is read from application responses and set on the aggregated response.
hdrContentType = "Content-Type"
// textType is the media type that responseFormat maps to the Prometheus
// text format.
textType = "text/plain"
)

// responseFormat derives the exposition format from the Content-Type header
// of an HTTP response.
//
// expfmt.FmtUnknown is returned when the header cannot be parsed as a media
// type, or when the media type (or its parameters) matches none of the
// formats handled below.
func responseFormat(h http.Header) expfmt.Format {
	mediatype, params, err := mime.ParseMediaType(h.Get(hdrContentType))
	if err != nil {
		return expfmt.FmtUnknown
	}

	switch mediatype {
	case textType:
		// `text/plain` is treated as the Prometheus text format without
		// checking the version, as there are a few exporters which don't set
		// the version param in the content-type header. ex: Envoy
		return expfmt.FmtText

	case expfmt.ProtoType:
		// only delimited encoding is supported by prometheus scraper
		if params["proto"] == expfmt.ProtoProtocol && params["encoding"] == "delimited" {
			return expfmt.FmtProtoDelim
		}

	case expfmt.OpenMetricsType:
		switch params["version"] {
		case expfmt.OpenMetricsVersion_0_0_1:
			return expfmt.FmtOpenMetrics_0_0_1
		case expfmt.OpenMetricsVersion_1_0_0:
			return expfmt.FmtOpenMetrics_1_0_0
		}
	}

	return expfmt.FmtUnknown
}
Loading