Skip to content

Commit 84a5674

Browse files
📈 Report traffic dropped metrics to LAPI (#223)
* Initial implementation * fix * fixes * Fixes * xx * progress * xx * xx * xx * fix linter * Progress * Fixes * xx * xx * Remove trace logger * Last fix * fix lint * fix lint * fix lint --------- Co-authored-by: Max Lerebourg <maxlerebourg@gmail.com>
1 parent de7e382 commit 84a5674

File tree

7 files changed

+226
-52
lines changed

7 files changed

+226
-52
lines changed

README.md

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -308,12 +308,18 @@ make run
308308

309309
### Note
310310

311-
**/!\ Cache is shared by all services**
312-
_This means if an IP is banned, all services which are protected by an instance of the plugin will deny requests from that IP_
313-
Only one instance of the plugin is _possible_.
314-
315-
**/!\ Appsec maximum body limit is defaulted to 10MB**
316-
_By careful when you upgrade to >1.4.x_
311+
> [!IMPORTANT]
312+
> Some of the behaviours and configuration parameters are shared globally across *all* crowdsec middlewares even if you declare different middlewares with different settings.
313+
>
314+
> **Cache is shared by all services**: This means if an IP is banned, all services which are protected by an instance of the plugin will deny requests from that IP
315+
>
316+
> If you define different caches for different middlewares, only the first one to be instantiated will be bound to the crowdsec stream.
317+
>
318+
> Overall, this middleware is designed in such a way that **only one instance of the plugin is *possible*.** You can have multiple crowdsec middlewares in the same cluster, but their key parameters must be aligned (MetricsUpdateIntervalSeconds, CrowdsecMode, CrowdsecAppsecEnabled, etc.)
319+
320+
> [!WARNING]
321+
> **Appsec maximum body limit is defaulted to 10MB**
322+
> *Be careful when you upgrade to >1.4.x*
317323
318324
### Variables
319325

@@ -324,11 +330,16 @@ _By careful when you upgrade to >1.4.x_
324330
- LogLevel
325331
- string
326332
- default: `INFO`, expected values are: `INFO`, `DEBUG`, `ERROR`
327-
- Log are written to `stdout` / `stderr` of file if LogFilePath is provided
333+
- Logs are written to `stdout` / `stderr`, or to a file if LogFilePath is provided
328334
- LogFilePath
329335
- string
330336
- default: ""
331337
- File path to write logs to; it must be writable by Traefik. Log rotation may require a restart of Traefik
338+
- MetricsUpdateIntervalSeconds
339+
- int64
340+
- default: 600
341+
- Interval in seconds between metrics updates to Crowdsec
342+
- If set to zero or less, metrics collection is disabled
332343
- CrowdsecMode
333344
- string
334345
- default: `live`, expected values are: `none`, `live`, `stream`, `alone`, `appsec`
@@ -579,6 +590,7 @@ http:
579590
captchaGracePeriodSeconds: 1800
580591
captchaHTMLFilePath: /captcha.html
581592
banHTMLFilePath: /ban.html
593+
metricsUpdateIntervalSeconds: 600
582594
```
583595
584596
#### Fill variable with value of file

bouncer.go

Lines changed: 135 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ import (
1212
"io"
1313
"net/http"
1414
"net/url"
15+
"strconv"
1516
"strings"
17+
"sync/atomic"
1618
"text/template"
1719
"time"
1820

@@ -33,19 +35,40 @@ const (
3335
crowdsecLapiHeader = "X-Api-Key"
3436
crowdsecLapiRoute = "v1/decisions"
3537
crowdsecLapiStreamRoute = "v1/decisions/stream"
38+
crowdsecLapiMetricsRoute = "v1/usage-metrics"
3639
crowdsecCapiHost = "api.crowdsec.net"
3740
crowdsecCapiHeader = "Authorization"
3841
crowdsecCapiLoginRoute = "v2/watchers/login"
3942
crowdsecCapiStreamRoute = "v2/decisions/stream"
4043
cacheTimeoutKey = "updated"
4144
)
4245

46+
// ##############################################################
47+
// Important: traefik creates an instance of the bouncer per route.
48+
// We rely on globals (both here and in the memory cache) to share info between
49+
// routes. This means that some of the plugin's parameters will only work "once"
50+
// and will take the values of the first middleware that was instantiated even
51+
// if you have different middlewares with different parameters. This design
52+
// makes it impossible to have multiple crowdsec implementations per cluster (unless you have multiple traefik deployments in it). Parameters affected:
53+
// - updateInterval
54+
// - updateMaxFailure
55+
// - defaultDecisionTimeout
56+
// - redisUnreachableBlock
57+
// - appsecEnabled
58+
// - appsecHost
59+
// - metricsUpdateIntervalSeconds
60+
// - others...
61+
// ###################################
62+
4363
//nolint:gochecknoglobals
4464
var (
4565
isStartup = true
4666
isCrowdsecStreamHealthy = true
47-
updateFailure = 0
48-
ticker chan bool
67+
updateFailure int64
68+
streamTicker chan bool
69+
metricsTicker chan bool
70+
lastMetricsPush time.Time
71+
blockedRequests int64
4972
)
5073

5174
// CreateConfig creates the default plugin configuration.
@@ -75,7 +98,7 @@ type Bouncer struct {
7598
crowdsecPassword string
7699
crowdsecScenarios []string
77100
updateInterval int64
78-
updateMaxFailure int
101+
updateMaxFailure int64
79102
defaultDecisionTimeout int64
80103
remediationStatusCode int
81104
remediationCustomHeader string
@@ -93,6 +116,8 @@ type Bouncer struct {
93116
}
94117

95118
// New creates the crowdsec bouncer plugin.
119+
//
120+
//nolint:gocyclo
96121
func New(_ context.Context, next http.Handler, config *configuration.Config, name string) (http.Handler, error) {
97122
config.LogLevel = strings.ToUpper(config.LogLevel)
98123
log := logger.New(config.LogLevel, config.LogFilePath)
@@ -225,7 +250,7 @@ func New(_ context.Context, next http.Handler, config *configuration.Config, nam
225250
return nil, err
226251
}
227252

228-
if (config.CrowdsecMode == configuration.StreamMode || config.CrowdsecMode == configuration.AloneMode) && ticker == nil {
253+
if (config.CrowdsecMode == configuration.StreamMode || config.CrowdsecMode == configuration.AloneMode) && streamTicker == nil {
229254
if config.CrowdsecMode == configuration.AloneMode {
230255
if err := getToken(bouncer); err != nil {
231256
bouncer.log.Error("New:getToken " + err.Error())
@@ -234,10 +259,20 @@ func New(_ context.Context, next http.Handler, config *configuration.Config, nam
234259
}
235260
handleStreamTicker(bouncer)
236261
isStartup = false
237-
ticker = startTicker(config, log, func() {
262+
streamTicker = startTicker("stream", config.UpdateIntervalSeconds, log, func() {
238263
handleStreamTicker(bouncer)
239264
})
240265
}
266+
267+
// Start metrics ticker if not already running
268+
if metricsTicker == nil && config.MetricsUpdateIntervalSeconds > 0 {
269+
lastMetricsPush = time.Now() // Initialize lastMetricsPush when starting the metrics ticker
270+
handleMetricsTicker(bouncer)
271+
metricsTicker = startTicker("metrics", config.MetricsUpdateIntervalSeconds, log, func() {
272+
handleMetricsTicker(bouncer)
273+
})
274+
}
275+
241276
bouncer.log.Debug("New initialized mode:" + config.CrowdsecMode)
242277

243278
return bouncer, nil
@@ -353,6 +388,8 @@ type Login struct {
353388

354389
// To append headers, we need to call rw.WriteHeader after setting any header.
355390
func handleBanServeHTTP(bouncer *Bouncer, rw http.ResponseWriter) {
391+
atomic.AddInt64(&blockedRequests, 1)
392+
356393
if bouncer.remediationCustomHeader != "" {
357394
rw.Header().Set(bouncer.remediationCustomHeader, "ban")
358395
}
@@ -375,6 +412,7 @@ func handleRemediationServeHTTP(bouncer *Bouncer, remoteIP, remediation string,
375412
handleNextServeHTTP(bouncer, remoteIP, rw, req)
376413
return
377414
}
415+
atomic.AddInt64(&blockedRequests, 1) // If we serve a captcha that should count as a dropped request.
378416
bouncer.captchaClient.ServeHTTP(rw, req, remoteIP)
379417
return
380418
}
@@ -406,11 +444,17 @@ func handleStreamTicker(bouncer *Bouncer) {
406444
}
407445
}
408446

409-
func startTicker(config *configuration.Config, log *logger.Log, work func()) chan bool {
410-
ticker := time.NewTicker(time.Duration(config.UpdateIntervalSeconds) * time.Second)
447+
func handleMetricsTicker(bouncer *Bouncer) {
448+
if err := reportMetrics(bouncer); err != nil {
449+
bouncer.log.Error("handleMetricsTicker:reportMetrics " + err.Error())
450+
}
451+
}
452+
453+
func startTicker(name string, updateInterval int64, log *logger.Log, work func()) chan bool {
454+
ticker := time.NewTicker(time.Duration(updateInterval) * time.Second)
411455
stop := make(chan bool, 1)
412456
go func() {
413-
defer log.Debug("ticker:stopped")
457+
defer log.Debug(name + "_ticker:stopped")
414458
for {
415459
select {
416460
case <-ticker.C:
@@ -432,7 +476,7 @@ func handleNoStreamCache(bouncer *Bouncer, remoteIP string) (string, error) {
432476
Path: bouncer.crowdsecPath + crowdsecLapiRoute,
433477
RawQuery: fmt.Sprintf("ip=%v", remoteIP),
434478
}
435-
body, err := crowdsecQuery(bouncer, routeURL.String(), false)
479+
body, err := crowdsecQuery(bouncer, routeURL.String(), nil)
436480
if err != nil {
437481
return cache.BannedValue, err
438482
}
@@ -491,7 +535,16 @@ func getToken(bouncer *Bouncer) error {
491535
Host: bouncer.crowdsecHost,
492536
Path: crowdsecCapiLoginRoute,
493537
}
494-
body, err := crowdsecQuery(bouncer, loginURL.String(), true)
538+
539+
// Build the login-specific payload here, so crowdsecQuery only receives the request body
540+
loginData := []byte(fmt.Sprintf(
541+
`{"machine_id": "%v","password": "%v","scenarios": ["%v"]}`,
542+
bouncer.crowdsecMachineID,
543+
bouncer.crowdsecPassword,
544+
strings.Join(bouncer.crowdsecScenarios, `","`),
545+
))
546+
547+
body, err := crowdsecQuery(bouncer, loginURL.String(), loginData)
495548
if err != nil {
496549
return err
497550
}
@@ -528,7 +581,7 @@ func handleStreamCache(bouncer *Bouncer) error {
528581
Path: bouncer.crowdsecPath + bouncer.crowdsecStreamRoute,
529582
RawQuery: fmt.Sprintf("startup=%t", !isCrowdsecStreamHealthy || isStartup),
530583
}
531-
body, err := crowdsecQuery(bouncer, streamRouteURL.String(), false)
584+
body, err := crowdsecQuery(bouncer, streamRouteURL.String(), nil)
532585
if err != nil {
533586
return err
534587
}
@@ -559,15 +612,9 @@ func handleStreamCache(bouncer *Bouncer) error {
559612
return nil
560613
}
561614

562-
func crowdsecQuery(bouncer *Bouncer, stringURL string, isPost bool) ([]byte, error) {
615+
func crowdsecQuery(bouncer *Bouncer, stringURL string, data []byte) ([]byte, error) {
563616
var req *http.Request
564-
if isPost {
565-
data := []byte(fmt.Sprintf(
566-
`{"machine_id": "%v","password": "%v","scenarios": ["%v"]}`,
567-
bouncer.crowdsecMachineID,
568-
bouncer.crowdsecPassword,
569-
strings.Join(bouncer.crowdsecScenarios, `","`),
570-
))
617+
if len(data) > 0 {
571618
req, _ = http.NewRequest(http.MethodPost, stringURL, bytes.NewBuffer(data))
572619
} else {
573620
req, _ = http.NewRequest(http.MethodGet, stringURL, nil)
@@ -588,13 +635,16 @@ func crowdsecQuery(bouncer *Bouncer, stringURL string, isPost bool) ([]byte, err
588635
if errToken := getToken(bouncer); errToken != nil {
589636
return nil, fmt.Errorf("crowdsecQuery:renewToken url:%s %w", stringURL, errToken)
590637
}
591-
return crowdsecQuery(bouncer, stringURL, false)
638+
return crowdsecQuery(bouncer, stringURL, nil)
592639
}
593-
if res.StatusCode != http.StatusOK {
594-
return nil, fmt.Errorf("crowdsecQuery url:%s, statusCode:%d", stringURL, res.StatusCode)
640+
641+
// Check if the status code starts with 2
642+
statusStr := strconv.Itoa(res.StatusCode)
643+
if len(statusStr) < 1 || statusStr[0] != '2' {
644+
return nil, fmt.Errorf("crowdsecQuery method:%s url:%s, statusCode:%d (expected: 2xx)", req.Method, stringURL, res.StatusCode)
595645
}
596-
body, err := io.ReadAll(res.Body)
597646

647+
body, err := io.ReadAll(res.Body)
598648
if err != nil {
599649
return nil, fmt.Errorf("crowdsecQuery:readBody %w", err)
600650
}
@@ -664,3 +714,65 @@ func appsecQuery(bouncer *Bouncer, ip string, httpReq *http.Request) error {
664714
}
665715
return nil
666716
}
717+
718+
func reportMetrics(bouncer *Bouncer) error {
719+
now := time.Now()
720+
currentCount := atomic.LoadInt64(&blockedRequests)
721+
windowSizeSeconds := int(now.Sub(lastMetricsPush).Seconds())
722+
723+
bouncer.log.Debug(fmt.Sprintf("reportMetrics: blocked_requests=%d window_size=%ds", currentCount, windowSizeSeconds))
724+
725+
metrics := map[string]interface{}{
726+
"remediation_components": []map[string]interface{}{
727+
{
728+
"version": "1.X.X",
729+
"type": "bouncer",
730+
"name": "traefik_plugin",
731+
"metrics": []map[string]interface{}{
732+
{
733+
"items": []map[string]interface{}{
734+
{
735+
"name": "dropped",
736+
"value": currentCount,
737+
"unit": "request",
738+
"labels": map[string]string{
739+
"type": "traefik_plugin",
740+
},
741+
},
742+
},
743+
"meta": map[string]interface{}{
744+
"window_size_seconds": windowSizeSeconds,
745+
"utc_now_timestamp": now.Unix(),
746+
},
747+
},
748+
},
749+
"utc_startup_timestamp": time.Now().Unix(),
750+
"feature_flags": []string{},
751+
"os": map[string]string{
752+
"name": "unknown",
753+
"version": "unknown",
754+
},
755+
},
756+
},
757+
}
758+
759+
data, err := json.Marshal(metrics)
760+
if err != nil {
761+
return fmt.Errorf("reportMetrics:marshal %w", err)
762+
}
763+
764+
metricsURL := url.URL{
765+
Scheme: bouncer.crowdsecScheme,
766+
Host: bouncer.crowdsecHost,
767+
Path: bouncer.crowdsecPath + crowdsecLapiMetricsRoute,
768+
}
769+
770+
_, err = crowdsecQuery(bouncer, metricsURL.String(), data)
771+
if err != nil {
772+
return fmt.Errorf("reportMetrics:query %w", err)
773+
}
774+
775+
atomic.StoreInt64(&blockedRequests, 0)
776+
lastMetricsPush = now
777+
return nil
778+
}

bouncer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func Test_crowdsecQuery(t *testing.T) {
163163
type args struct {
164164
bouncer *Bouncer
165165
stringURL string
166-
isPost bool
166+
data []byte
167167
}
168168
tests := []struct {
169169
name string
@@ -175,7 +175,7 @@ func Test_crowdsecQuery(t *testing.T) {
175175
}
176176
for _, tt := range tests {
177177
t.Run(tt.name, func(t *testing.T) {
178-
got, err := crowdsecQuery(tt.args.bouncer, tt.args.stringURL, tt.args.isPost)
178+
got, err := crowdsecQuery(tt.args.bouncer, tt.args.stringURL, tt.args.data)
179179
if (err != nil) != tt.wantErr {
180180
t.Errorf("crowdsecQuery() error = %v, wantErr %v", err, tt.wantErr)
181181
return

0 commit comments

Comments
 (0)