Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions metrics/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package metrics

import (
"container/list"
"fmt"
"regexp" // Add this import
"sync"
"sync/atomic"
"time"
Expand All @@ -15,9 +17,9 @@ import (
// ResponseMetric is a measurement related to http response.
type ResponseMetric interface {
// ObserveLatency observes latency.
ObserveLatency(url string, seconds float64)
ObserveLatency(method string, url string, seconds float64)
// ObserveFailure observes failure response.
ObserveFailure(url string, now time.Time, seconds float64, err error)
ObserveFailure(method string, url string, now time.Time, seconds float64, err error)
// ObserveReceivedBytes observes the bytes read from apiserver.
ObserveReceivedBytes(bytes int64)
// Gather returns the summary.
Expand All @@ -38,21 +40,34 @@ func NewResponseMetric() ResponseMetric {
}
}

// Aggregates for DELETE and PATCH methods
func normalizeURL(method string, url string) string {
if method != "DELETE" && method != "PATCH" {
return url
}
// Aggregated into https://api.../namespaces/{namespace}/{resourceType}/{name}?timeout=1m0s
re := regexp.MustCompile(`/([^/]+)/([^/?]+)(\?|$)`)

return re.ReplaceAllString(url, "/$1/{name}$3")
}

// ObserveLatency implements ResponseMetric.
func (m *responseMetricImpl) ObserveLatency(url string, seconds float64) {
func (m *responseMetricImpl) ObserveLatency(method string, url string, seconds float64) {
m.mu.Lock()
defer m.mu.Unlock()

l, ok := m.latenciesByURLs[url]
normalizedURL := normalizeURL(method, url)
key := fmt.Sprintf("%s %s", method, normalizedURL)
l, ok := m.latenciesByURLs[key]
if !ok {
m.latenciesByURLs[url] = list.New()
l = m.latenciesByURLs[url]
m.latenciesByURLs[key] = list.New()
l = m.latenciesByURLs[key]
}
l.PushBack(seconds)
}

// ObserveFailure implements ResponseMetric.
func (m *responseMetricImpl) ObserveFailure(url string, now time.Time, seconds float64, err error) {
func (m *responseMetricImpl) ObserveFailure(method string, url string, now time.Time, seconds float64, err error) {
if err == nil {
return
}
Expand All @@ -61,7 +76,7 @@ func (m *responseMetricImpl) ObserveFailure(url string, now time.Time, seconds f
defer m.mu.Unlock()

oerr := types.ResponseError{
URL: url,
URL: fmt.Sprintf("%s %s", method, normalizeURL(method, url)),
Timestamp: now,
Duration: seconds,
}
Expand Down
32 changes: 16 additions & 16 deletions metrics/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,105 +25,105 @@ func TestResponseMetric_ObserveFailure(t *testing.T) {

expectedErrors := []types.ResponseError{
{
URL: "0",
URL: "GET 0",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP,
Code: 429,
},
{
URL: "1",
URL: "GET 1",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP,
Code: 500,
},
{
URL: "2",
URL: "GET 2",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP,
Code: 504,
},
{
URL: "3",
URL: "GET 3",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP2Protocol,
Message: "http2: server sent GOAWAY and closed the connection; ErrCode=NO_ERROR, debug=",
},
{
URL: "4",
URL: "GET 4",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP2Protocol,
Message: "http2: server sent GOAWAY and closed the connection; ErrCode=PROTOCOL_ERROR, debug=",
},
{
URL: "5",
URL: "GET 5",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP2Protocol,
Message: "http2: client connection lost",
},
{
URL: "6",
URL: "GET 6",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP2Protocol,
Message: "http2: client connection lost",
},
{
URL: "7",
URL: "GET 7",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP2Protocol,
Message: http2.ErrCode(10).String(),
},
{
URL: "8",
URL: "GET 8",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeConnection,
Message: "net/http: TLS handshake timeout",
},
{
URL: "9",
URL: "GET 9",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeConnection,
Message: "net/http: TLS handshake timeout",
},
{
URL: "10",
URL: "GET 10",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeConnection,
Message: "context deadline exceeded",
},
{
URL: "11",
URL: "GET 11",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeConnection,
Message: syscall.ECONNRESET.Error(),
},
{
URL: "12",
URL: "GET 12",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeConnection,
Message: syscall.ECONNREFUSED.Error(),
},
{
URL: "13",
URL: "GET 13",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeConnection,
Message: io.ErrUnexpectedEOF.Error(),
},
{
URL: "14",
URL: "GET 14",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeUnknown,
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestResponseMetric_ObserveFailure(t *testing.T) {

m := NewResponseMetric()
for idx, err := range errs {
m.ObserveFailure(fmt.Sprintf("%d", idx), observedAt, dur.Seconds(), err)
m.ObserveFailure("GET", fmt.Sprintf("%d", idx), observedAt, dur.Seconds(), err)
}
errors := m.Gather().Errors
assert.Equal(t, expectedErrors, errors)
Expand Down
4 changes: 2 additions & 2 deletions request/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I

respMetric.ObserveReceivedBytes(bytes)
if err != nil {
respMetric.ObserveFailure(req.URL().String(), end, latency, err)
respMetric.ObserveFailure(req.Method(), req.URL().String(), end, latency, err)
klog.V(5).Infof("Request stream failed: %v", err)
return
}
respMetric.ObserveLatency(req.URL().String(), latency)
respMetric.ObserveLatency(req.Method(), req.URL().String(), latency)
}()
}
}(cli)
Expand Down
Loading