Skip to content

Commit be40d66

Browse files
authored
Merge pull request #795 from zalando-incubator/nakadi-filter-support
Add support for filtering nakadi subscriptions
2 parents 30dcc24 + 5418c64 commit be40d66

File tree

4 files changed

+322
-90
lines changed

4 files changed

+322
-90
lines changed

README.md

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -751,12 +751,12 @@ spec:
751751
- type: External
752752
external:
753753
metric:
754-
name: my-nakadi-consumer
755-
selector:
756-
matchLabels:
757-
type: nakadi
758-
subscription-id: "708095f6-cece-4d02-840e-ee488d710b29"
759-
metric-type: "consumer-lag-seconds|unconsumed-events"
754+
name: my-nakadi-consumer
755+
selector:
756+
matchLabels:
757+
type: nakadi
758+
subscription-id: "708095f6-cece-4d02-840e-ee488d710b29"
759+
metric-type: "consumer-lag-seconds|unconsumed-events"
760760
target:
761761
# value is compatible with the consumer-lag-seconds metric type.
762762
# It describes the amount of consumer lag in seconds before scaling
@@ -805,6 +805,27 @@ with more consumers.
805805
For this case you should also account for the average time for processing an
806806
event when defining the target.
807807

808+
Alternative to defining `subscription-id` you can also filter based on
809+
`owning_application`, `event-types` and `consumer-group`:
810+
811+
```yaml
812+
metrics:
813+
- type: External
814+
external:
815+
metric:
816+
name: my-nakadi-consumer
817+
selector:
818+
matchLabels:
819+
type: nakadi
820+
owning-application: "example-app"
821+
# comma separated list of event types
822+
event-types: "example-event-type,example-event-type2"
823+
consumer-group: "abcd1234"
824+
metric-type: "consumer-lag-seconds|unconsumed-events"
825+
```
826+
827+
This is useful in dynamic environments where the subscription ID might not be
828+
known before deployment time (e.g. because it's created by the same deployment).
808829

809830
## HTTP Collector
810831

pkg/collector/nakadi_collector.go

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package collector
33
import (
44
"context"
55
"fmt"
6+
"strings"
67
"time"
78

89
"github.com/zalando-incubator/kube-metrics-adapter/pkg/nakadi"
@@ -17,6 +18,9 @@ const (
1718
// subscriptions.
1819
NakadiMetricType = "nakadi"
1920
nakadiSubscriptionIDKey = "subscription-id"
21+
nakadiOwningApplicationKey = "owning-application"
22+
nakadiConsumerGroupKey = "consumer-group"
23+
nakadiEventTypesKey = "event-types"
2024
nakadiMetricTypeKey = "metric-type"
2125
nakadiMetricTypeConsumerLagSeconds = "consumer-lag-seconds"
2226
nakadiMetricTypeUnconsumedEvents = "unconsumed-events"
@@ -43,26 +47,21 @@ func (c *NakadiCollectorPlugin) NewCollector(ctx context.Context, hpa *autoscali
4347
// NakadiCollector defines a collector that is able to collect metrics from
4448
// Nakadi.
4549
type NakadiCollector struct {
46-
nakadi nakadi.Nakadi
47-
interval time.Duration
48-
subscriptionID string
49-
nakadiMetricType string
50-
metric autoscalingv2.MetricIdentifier
51-
metricType autoscalingv2.MetricSourceType
52-
namespace string
50+
nakadi nakadi.Nakadi
51+
interval time.Duration
52+
subscriptionFilter *nakadi.SubscriptionFilter
53+
nakadiMetricType string
54+
metric autoscalingv2.MetricIdentifier
55+
metricType autoscalingv2.MetricSourceType
56+
namespace string
5357
}
5458

5559
// NewNakadiCollector initializes a new NakadiCollector.
56-
func NewNakadiCollector(_ context.Context, nakadi nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) {
60+
func NewNakadiCollector(_ context.Context, nakadiClient nakadi.Nakadi, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*NakadiCollector, error) {
5761
if config.Metric.Selector == nil {
5862
return nil, fmt.Errorf("selector for nakadi is not specified")
5963
}
6064

61-
subscriptionID, ok := config.Config[nakadiSubscriptionIDKey]
62-
if !ok {
63-
return nil, fmt.Errorf("subscription-id not specified on metric")
64-
}
65-
6665
metricType, ok := config.Config[nakadiMetricTypeKey]
6766
if !ok {
6867
return nil, fmt.Errorf("metric-type not specified on metric")
@@ -72,14 +71,40 @@ func NewNakadiCollector(_ context.Context, nakadi nakadi.Nakadi, hpa *autoscalin
7271
return nil, fmt.Errorf("metric-type must be either '%s' or '%s', was '%s'", nakadiMetricTypeConsumerLagSeconds, nakadiMetricTypeUnconsumedEvents, metricType)
7372
}
7473

74+
// Either subscription-id or filtering via owning-application,
75+
// event-types, and consumer-group is supported. If all are defined
76+
// then only subscription-id is used and the rest of the fields are
77+
// ignored.
78+
subscriptionFilter := &nakadi.SubscriptionFilter{}
79+
if subscriptionID, ok := config.Config[nakadiSubscriptionIDKey]; ok {
80+
subscriptionFilter.SubscriptionID = subscriptionID
81+
}
82+
83+
if owningApplication, ok := config.Config[nakadiOwningApplicationKey]; ok {
84+
subscriptionFilter.OwningApplication = owningApplication
85+
}
86+
87+
if nakadiEventTypes, ok := config.Config[nakadiEventTypesKey]; ok {
88+
eventTypes := strings.Split(nakadiEventTypes, ",")
89+
subscriptionFilter.EventTypes = eventTypes
90+
}
91+
92+
if consumerGroup, ok := config.Config[nakadiConsumerGroupKey]; ok {
93+
subscriptionFilter.ConsumerGroup = consumerGroup
94+
}
95+
96+
if subscriptionFilter.SubscriptionID == "" && (subscriptionFilter.OwningApplication == "" || len(subscriptionFilter.EventTypes) == 0 || subscriptionFilter.ConsumerGroup == "") {
97+
return nil, fmt.Errorf("either subscription-id or all of [owning-application, event-types, consumer-group] must be specified on the metric")
98+
}
99+
75100
return &NakadiCollector{
76-
nakadi: nakadi,
77-
interval: interval,
78-
subscriptionID: subscriptionID,
79-
nakadiMetricType: metricType,
80-
metric: config.Metric,
81-
metricType: config.Type,
82-
namespace: hpa.Namespace,
101+
nakadi: nakadiClient,
102+
interval: interval,
103+
subscriptionFilter: subscriptionFilter,
104+
nakadiMetricType: metricType,
105+
metric: config.Metric,
106+
metricType: config.Type,
107+
namespace: hpa.Namespace,
83108
}, nil
84109
}
85110

@@ -89,12 +114,12 @@ func (c *NakadiCollector) GetMetrics(ctx context.Context) ([]CollectedMetric, er
89114
var err error
90115
switch c.nakadiMetricType {
91116
case nakadiMetricTypeConsumerLagSeconds:
92-
value, err = c.nakadi.ConsumerLagSeconds(ctx, c.subscriptionID)
117+
value, err = c.nakadi.ConsumerLagSeconds(ctx, c.subscriptionFilter)
93118
if err != nil {
94119
return nil, err
95120
}
96121
case nakadiMetricTypeUnconsumedEvents:
97-
value, err = c.nakadi.UnconsumedEvents(ctx, c.subscriptionID)
122+
value, err = c.nakadi.UnconsumedEvents(ctx, c.subscriptionFilter)
98123
if err != nil {
99124
return nil, err
100125
}

pkg/nakadi/nakadi.go

Lines changed: 140 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import (
1212

1313
// Nakadi defines an interface for talking to the Nakadi API.
1414
type Nakadi interface {
15-
ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error)
16-
UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error)
15+
ConsumerLagSeconds(ctx context.Context, filter *SubscriptionFilter) (int64, error)
16+
UnconsumedEvents(ctx context.Context, filter *SubscriptionFilter) (int64, error)
1717
}
1818

1919
// Client defines client for interfacing with the Nakadi API.
@@ -30,8 +30,8 @@ func NewNakadiClient(nakadiEndpoint string, client *http.Client) *Client {
3030
}
3131
}
3232

33-
func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string) (int64, error) {
34-
stats, err := c.stats(ctx, subscriptionID)
33+
func (c *Client) ConsumerLagSeconds(ctx context.Context, filter *SubscriptionFilter) (int64, error) {
34+
stats, err := c.stats(ctx, filter)
3535
if err != nil {
3636
return 0, err
3737
}
@@ -46,8 +46,8 @@ func (c *Client) ConsumerLagSeconds(ctx context.Context, subscriptionID string)
4646
return maxConsumerLagSeconds, nil
4747
}
4848

49-
func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (int64, error) {
50-
stats, err := c.stats(ctx, subscriptionID)
49+
func (c *Client) UnconsumedEvents(ctx context.Context, filter *SubscriptionFilter) (int64, error) {
50+
stats, err := c.stats(ctx, filter)
5151
if err != nil {
5252
return 0, err
5353
}
@@ -62,6 +62,90 @@ func (c *Client) UnconsumedEvents(ctx context.Context, subscriptionID string) (i
6262
return unconsumedEvents, nil
6363
}
6464

65+
type SubscriptionFilter struct {
66+
SubscriptionID string
67+
OwningApplication string
68+
EventTypes []string
69+
ConsumerGroup string
70+
}
71+
72+
func (c *Client) subscriptions(ctx context.Context, filter *SubscriptionFilter, href string) ([]string, error) {
73+
endpoint, err := url.Parse(c.nakadiEndpoint)
74+
if err != nil {
75+
return nil, err
76+
}
77+
78+
if href != "" {
79+
endpoint, err = url.Parse(c.nakadiEndpoint + href)
80+
if err != nil {
81+
return nil, fmt.Errorf("[nakadi subscriptions] failed to parse URL with href: %w", err)
82+
}
83+
} else {
84+
endpoint.Path = "/subscriptions"
85+
q := endpoint.Query()
86+
if filter.OwningApplication != "" {
87+
q.Set("owning_application", filter.OwningApplication)
88+
}
89+
for _, eventType := range filter.EventTypes {
90+
q.Add("event_type", eventType)
91+
}
92+
if filter.ConsumerGroup != "" {
93+
q.Set("consumer_group", filter.ConsumerGroup)
94+
}
95+
endpoint.RawQuery = q.Encode()
96+
}
97+
98+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil)
99+
if err != nil {
100+
return nil, fmt.Errorf("[nakadi subscriptions] failed to create request: %w", err)
101+
}
102+
103+
resp, err := c.http.Do(req)
104+
if err != nil {
105+
return nil, fmt.Errorf("[nakadi subscriptions] failed to make request: %w", err)
106+
}
107+
defer resp.Body.Close()
108+
109+
d, err := io.ReadAll(resp.Body)
110+
if err != nil {
111+
return nil, err
112+
}
113+
114+
if resp.StatusCode != http.StatusOK {
115+
return nil, fmt.Errorf("[nakadi subscriptions] unexpected response code: %d (%s)", resp.StatusCode, string(d))
116+
}
117+
118+
var subscriptionsResp struct {
119+
Items []struct {
120+
ID string `json:"id"`
121+
}
122+
Links struct {
123+
Next struct {
124+
Href string `json:"href"`
125+
} `json:"next"`
126+
} `json:"_links"`
127+
}
128+
err = json.Unmarshal(d, &subscriptionsResp)
129+
if err != nil {
130+
return nil, err
131+
}
132+
133+
var subscriptions []string
134+
for _, item := range subscriptionsResp.Items {
135+
subscriptions = append(subscriptions, item.ID)
136+
}
137+
138+
if subscriptionsResp.Links.Next.Href != "" {
139+
nextSubscriptions, err := c.subscriptions(ctx, nil, subscriptionsResp.Links.Next.Href)
140+
if err != nil {
141+
return nil, fmt.Errorf("[nakadi subscriptions] failed to get next subscriptions: %w", err)
142+
}
143+
subscriptions = append(subscriptions, nextSubscriptions...)
144+
}
145+
146+
return subscriptions, nil
147+
}
148+
65149
type statsResp struct {
66150
Items []statsEventType `json:"items"`
67151
}
@@ -80,45 +164,68 @@ type statsPartition struct {
80164
AssignmentType string `json:"assignment_type"`
81165
}
82166

83-
// stats returns the Nakadi stats for a given subscription ID.
167+
// stats returns the Nakadi stats for a given a subscription filter which can
168+
// include the subscription ID or a filter combination of [owning-applicaiton,
169+
// event-types, consumer-group]..
84170
//
85171
// https://nakadi.io/manual.html#/subscriptions/subscription_id/stats_get
86-
func (c *Client) stats(ctx context.Context, subscriptionID string) ([]statsEventType, error) {
172+
func (c *Client) stats(ctx context.Context, filter *SubscriptionFilter) ([]statsEventType, error) {
173+
var subscriptionIDs []string
174+
if filter.SubscriptionID == "" {
175+
subscriptions, err := c.subscriptions(ctx, filter, "")
176+
if err != nil {
177+
return nil, fmt.Errorf("[nakadi stats] failed to get subscriptions: %w", err)
178+
}
179+
subscriptionIDs = subscriptions
180+
} else {
181+
subscriptionIDs = []string{filter.SubscriptionID}
182+
}
183+
87184
endpoint, err := url.Parse(c.nakadiEndpoint)
88185
if err != nil {
89-
return nil, err
186+
return nil, fmt.Errorf("[nakadi stats] failed to parse URL %q: %w", c.nakadiEndpoint, err)
90187
}
91188

92-
endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID)
189+
var stats []statsEventType
190+
for _, subscriptionID := range subscriptionIDs {
191+
endpoint.Path = fmt.Sprintf("/subscriptions/%s/stats", subscriptionID)
93192

94-
q := endpoint.Query()
95-
q.Set("show_time_lag", "true")
96-
endpoint.RawQuery = q.Encode()
193+
q := endpoint.Query()
194+
q.Set("show_time_lag", "true")
195+
endpoint.RawQuery = q.Encode()
97196

98-
resp, err := c.http.Get(endpoint.String())
99-
if err != nil {
100-
return nil, err
101-
}
102-
defer resp.Body.Close()
197+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil)
198+
if err != nil {
199+
return nil, fmt.Errorf("[nakadi stats] failed to create request: %w", err)
200+
}
103201

104-
d, err := io.ReadAll(resp.Body)
105-
if err != nil {
106-
return nil, err
107-
}
202+
resp, err := c.http.Do(req)
203+
if err != nil {
204+
return nil, fmt.Errorf("[nakadi stats] failed to make request: %w", err)
205+
}
206+
defer resp.Body.Close()
108207

109-
if resp.StatusCode != http.StatusOK {
110-
return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d))
111-
}
208+
d, err := io.ReadAll(resp.Body)
209+
if err != nil {
210+
return nil, err
211+
}
112212

113-
var result statsResp
114-
err = json.Unmarshal(d, &result)
115-
if err != nil {
116-
return nil, err
117-
}
213+
if resp.StatusCode != http.StatusOK {
214+
return nil, fmt.Errorf("[nakadi stats] unexpected response code: %d (%s)", resp.StatusCode, string(d))
215+
}
216+
217+
var result statsResp
218+
err = json.Unmarshal(d, &result)
219+
if err != nil {
220+
return nil, err
221+
}
222+
223+
if len(result.Items) == 0 {
224+
return nil, errors.New("[nakadi stats] expected at least 1 event-type, 0 returned")
225+
}
118226

119-
if len(result.Items) == 0 {
120-
return nil, errors.New("expected at least 1 event-type, 0 returned")
227+
stats = append(stats, result.Items...)
121228
}
122229

123-
return result.Items, nil
230+
return stats, nil
124231
}

0 commit comments

Comments
 (0)