Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/types/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (

// ResponseError is the record about that error.
type ResponseError struct {
Method string `json:"method"`
// URL indicates target resource.
URL string `json:"url"`
// Timestamp indicates when this error was received.
Expand Down
17 changes: 10 additions & 7 deletions metrics/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package metrics

import (
"container/list"
"fmt"
"sync"
"sync/atomic"
"time"
Expand All @@ -15,9 +16,9 @@ import (
// ResponseMetric is a measurement related to http response.
type ResponseMetric interface {
// ObserveLatency observes latency.
ObserveLatency(url string, seconds float64)
ObserveLatency(method string, url string, seconds float64)
// ObserveFailure observes failure response.
ObserveFailure(url string, now time.Time, seconds float64, err error)
ObserveFailure(method string, url string, now time.Time, seconds float64, err error)
// ObserveReceivedBytes observes the bytes read from apiserver.
ObserveReceivedBytes(bytes int64)
// Gather returns the summary.
Expand All @@ -39,20 +40,21 @@ func NewResponseMetric() ResponseMetric {
}

// ObserveLatency implements ResponseMetric.
func (m *responseMetricImpl) ObserveLatency(url string, seconds float64) {
func (m *responseMetricImpl) ObserveLatency(method string, url string, seconds float64) {
m.mu.Lock()
defer m.mu.Unlock()

l, ok := m.latenciesByURLs[url]
key := fmt.Sprintf("%s %s", method, url)
l, ok := m.latenciesByURLs[key]
if !ok {
m.latenciesByURLs[url] = list.New()
l = m.latenciesByURLs[url]
m.latenciesByURLs[key] = list.New()
l = m.latenciesByURLs[key]
}
l.PushBack(seconds)
}

// ObserveFailure implements ResponseMetric.
func (m *responseMetricImpl) ObserveFailure(url string, now time.Time, seconds float64, err error) {
func (m *responseMetricImpl) ObserveFailure(method string, url string, now time.Time, seconds float64, err error) {
if err == nil {
return
}
Expand All @@ -61,6 +63,7 @@ func (m *responseMetricImpl) ObserveFailure(url string, now time.Time, seconds f
defer m.mu.Unlock()

oerr := types.ResponseError{
Method: method,
URL: url,
Timestamp: now,
Duration: seconds,
Expand Down
17 changes: 16 additions & 1 deletion metrics/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,104 +25,119 @@ func TestResponseMetric_ObserveFailure(t *testing.T) {

expectedErrors := []types.ResponseError{
{
Method: "GET",
URL: "0",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP,
Code: 429,
},
{
Method: "GET",
URL: "1",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP,
Code: 500,
},
{
Method: "GET",
URL: "2",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP,
Code: 504,
},
{
Method: "GET",
URL: "3",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP2Protocol,
Message: "http2: server sent GOAWAY and closed the connection; ErrCode=NO_ERROR, debug=",
},
{
Method: "GET",
URL: "4",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP2Protocol,
Message: "http2: server sent GOAWAY and closed the connection; ErrCode=PROTOCOL_ERROR, debug=",
},
{
Method: "GET",
URL: "5",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP2Protocol,
Message: "http2: client connection lost",
},
{
Method: "GET",
URL: "6",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP2Protocol,
Message: "http2: client connection lost",
},
{
Method: "GET",
URL: "7",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeHTTP2Protocol,
Message: http2.ErrCode(10).String(),
},
{
Method: "GET",
URL: "8",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeConnection,
Message: "net/http: TLS handshake timeout",
},
{
Method: "GET",
URL: "9",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeConnection,
Message: "net/http: TLS handshake timeout",
},
{
Method: "GET",
URL: "10",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeConnection,
Message: "context deadline exceeded",
},
{
Method: "GET",
URL: "11",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeConnection,
Message: syscall.ECONNRESET.Error(),
},
{
Method: "GET",
URL: "12",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeConnection,
Message: syscall.ECONNREFUSED.Error(),
},
{
Method: "GET",
URL: "13",
Timestamp: observedAt,
Duration: dur.Seconds(),
Type: types.ResponseErrorTypeConnection,
Message: io.ErrUnexpectedEOF.Error(),
},
{
Method: "GET",
URL: "14",
Timestamp: observedAt,
Duration: dur.Seconds(),
Expand Down Expand Up @@ -166,7 +181,7 @@ func TestResponseMetric_ObserveFailure(t *testing.T) {

m := NewResponseMetric()
for idx, err := range errs {
m.ObserveFailure(fmt.Sprintf("%d", idx), observedAt, dur.Seconds(), err)
m.ObserveFailure("GET", fmt.Sprintf("%d", idx), observedAt, dur.Seconds(), err)
}
errors := m.Gather().Errors
assert.Equal(t, expectedErrors, errors)
Expand Down
19 changes: 19 additions & 0 deletions request/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"path"
"reflect"
"time"
_ "unsafe" // unsafe to use internal function from client-go
Expand All @@ -19,6 +21,7 @@ import (
type Requester interface {
Method() string
URL() *url.URL
MaskedURL() *url.URL
Timeout(time.Duration)
Do(context.Context) (bytes int64, err error)
}
Expand All @@ -36,6 +39,22 @@ func (reqr *BaseRequester) URL() *url.URL {
return reqr.req.URL()
}

// MaskedURL returns a masked URL for DELETE and PATCH methods to enable aggregation in metrics
func (reqr *BaseRequester) MaskedURL() *url.URL {
originalURL := reqr.req.URL()

// Aggregates for DELETE and PATCH methods, replaces the last path segment
// for DELETE and PATCH requests so they can be aggregated (e.g. in metrics)
if reqr.method == http.MethodDelete || reqr.method == http.MethodPatch {
if u, err := url.Parse(originalURL.String()); err == nil {
u.Path = path.Join(path.Dir(u.Path), ":name")
return u // String() will keep ":name" as-is
}
}

return originalURL
}

func (reqr *BaseRequester) Timeout(timeout time.Duration) {
reqr.req.Timeout(timeout)
}
Expand Down
4 changes: 2 additions & 2 deletions request/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I

respMetric.ObserveReceivedBytes(bytes)
if err != nil {
respMetric.ObserveFailure(req.URL().String(), end, latency, err)
respMetric.ObserveFailure(req.Method(), req.MaskedURL().String(), end, latency, err)
klog.V(5).Infof("Request stream failed: %v", err)
return
}
respMetric.ObserveLatency(req.URL().String(), latency)
respMetric.ObserveLatency(req.Method(), req.MaskedURL().String(), latency)
}()
}
}(cli)
Expand Down
Loading