Skip to content

Commit d39005f

Browse files
committed
refactor manager
1 parent 47bb58d commit d39005f

File tree

6 files changed

+1105
-92
lines changed

6 files changed

+1105
-92
lines changed

hitless/example_hooks.go

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
package hitless
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
"time"
8+
9+
"github.com/redis/go-redis/v9/internal"
10+
)
11+
12+
// MetricsHook collects metrics about notification processing.
13+
type MetricsHook struct {
14+
NotificationCounts map[string]int64
15+
ProcessingTimes map[string]time.Duration
16+
ErrorCounts map[string]int64
17+
}
18+
19+
// NewMetricsHook creates a new metrics collection hook.
20+
func NewMetricsHook() *MetricsHook {
21+
return &MetricsHook{
22+
NotificationCounts: make(map[string]int64),
23+
ProcessingTimes: make(map[string]time.Duration),
24+
ErrorCounts: make(map[string]int64),
25+
}
26+
}
27+
28+
// PreHook records the start time for processing metrics.
29+
func (mh *MetricsHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
30+
mh.NotificationCounts[notificationType]++
31+
32+
// Store start time in context for duration calculation
33+
startTime := time.Now()
34+
ctx = context.WithValue(ctx, "start_time", startTime)
35+
36+
return notification, true
37+
}
38+
39+
// PostHook records processing completion and any errors.
40+
func (mh *MetricsHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) {
41+
// Calculate processing duration
42+
if startTime, ok := ctx.Value("start_time").(time.Time); ok {
43+
duration := time.Since(startTime)
44+
mh.ProcessingTimes[notificationType] = duration
45+
}
46+
47+
// Record errors
48+
if result != nil {
49+
mh.ErrorCounts[notificationType]++
50+
}
51+
}
52+
53+
// GetMetrics returns a summary of collected metrics.
54+
func (mh *MetricsHook) GetMetrics() map[string]interface{} {
55+
return map[string]interface{}{
56+
"notification_counts": mh.NotificationCounts,
57+
"processing_times": mh.ProcessingTimes,
58+
"error_counts": mh.ErrorCounts,
59+
}
60+
}
61+
62+
// EndpointRewriteHook rewrites endpoints based on configured rules.
63+
type EndpointRewriteHook struct {
64+
RewriteRules map[string]string // old -> new endpoint mappings
65+
}
66+
67+
// NewEndpointRewriteHook creates a new endpoint rewrite hook.
68+
func NewEndpointRewriteHook(rules map[string]string) *EndpointRewriteHook {
69+
return &EndpointRewriteHook{
70+
RewriteRules: rules,
71+
}
72+
}
73+
74+
// PreHook rewrites endpoints in MOVING notifications based on configured rules.
75+
func (erh *EndpointRewriteHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
76+
if notificationType == NotificationMoving && len(notification) > 3 {
77+
if endpoint, ok := notification[3].(string); ok {
78+
if newEndpoint, exists := erh.RewriteRules[endpoint]; exists {
79+
// Create a copy of the notification with rewritten endpoint
80+
modifiedNotification := make([]interface{}, len(notification))
81+
copy(modifiedNotification, notification)
82+
modifiedNotification[3] = newEndpoint
83+
84+
internal.Logger.Printf(ctx, "hitless: rewriting endpoint %s -> %s", endpoint, newEndpoint)
85+
return modifiedNotification, true
86+
}
87+
}
88+
}
89+
return notification, true
90+
}
91+
92+
// PostHook does nothing for endpoint rewrite hook.
93+
func (erh *EndpointRewriteHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) {
94+
// No post-processing needed
95+
}
96+
97+
// ThrottleHook limits the rate of notification processing.
98+
type ThrottleHook struct {
99+
MaxNotificationsPerSecond int
100+
lastNotificationTime time.Time
101+
notificationCount int
102+
}
103+
104+
// NewThrottleHook creates a new throttling hook.
105+
func NewThrottleHook(maxPerSecond int) *ThrottleHook {
106+
return &ThrottleHook{
107+
MaxNotificationsPerSecond: maxPerSecond,
108+
lastNotificationTime: time.Now(),
109+
}
110+
}
111+
112+
// PreHook implements rate limiting for notifications.
113+
func (th *ThrottleHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
114+
now := time.Now()
115+
116+
// Reset counter if more than a second has passed
117+
if now.Sub(th.lastNotificationTime) >= time.Second {
118+
th.notificationCount = 0
119+
th.lastNotificationTime = now
120+
}
121+
122+
// Check if we've exceeded the rate limit
123+
if th.notificationCount >= th.MaxNotificationsPerSecond {
124+
internal.Logger.Printf(ctx, "hitless: throttling %s notification (rate limit: %d/sec)",
125+
notificationType, th.MaxNotificationsPerSecond)
126+
return notification, false // Skip processing
127+
}
128+
129+
th.notificationCount++
130+
return notification, true
131+
}
132+
133+
// PostHook does nothing for throttle hook.
134+
func (th *ThrottleHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) {
135+
// No post-processing needed
136+
}
137+
138+
// ValidationHook validates notification format and content.
139+
type ValidationHook struct {
140+
StrictMode bool
141+
}
142+
143+
// NewValidationHook creates a new validation hook.
144+
func NewValidationHook(strictMode bool) *ValidationHook {
145+
return &ValidationHook{
146+
StrictMode: strictMode,
147+
}
148+
}
149+
150+
// PreHook validates notification format and content.
151+
func (vh *ValidationHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
152+
switch notificationType {
153+
case NotificationMoving:
154+
if len(notification) < 3 {
155+
internal.Logger.Printf(ctx, "hitless: invalid MOVING notification - insufficient fields")
156+
return notification, false
157+
}
158+
159+
// Validate sequence ID
160+
if seqIDStr, ok := notification[1].(string); ok {
161+
if seqIDStr == "" {
162+
internal.Logger.Printf(ctx, "hitless: invalid MOVING notification - empty sequence ID")
163+
return notification, false
164+
}
165+
} else {
166+
internal.Logger.Printf(ctx, "hitless: invalid MOVING notification - sequence ID not a string")
167+
return notification, false
168+
}
169+
170+
// Validate timeout
171+
if timeStr, ok := notification[2].(string); ok {
172+
if timeStr == "" || timeStr == "0" {
173+
internal.Logger.Printf(ctx, "hitless: invalid MOVING notification - invalid timeout")
174+
return notification, false
175+
}
176+
} else {
177+
internal.Logger.Printf(ctx, "hitless: invalid MOVING notification - timeout not a string")
178+
return notification, false
179+
}
180+
181+
// In strict mode, validate endpoint format
182+
if vh.StrictMode && len(notification) > 3 {
183+
if endpoint, ok := notification[3].(string); ok && endpoint != "" {
184+
if !strings.Contains(endpoint, ":") {
185+
internal.Logger.Printf(ctx, "hitless: invalid MOVING notification - malformed endpoint: %s", endpoint)
186+
return notification, false
187+
}
188+
}
189+
}
190+
191+
case NotificationMigrating, NotificationMigrated, NotificationFailingOver, NotificationFailedOver:
192+
if len(notification) < 2 {
193+
internal.Logger.Printf(ctx, "hitless: invalid %s notification - insufficient fields", notificationType)
194+
return notification, false
195+
}
196+
}
197+
198+
return notification, true
199+
}
200+
201+
// PostHook does nothing for validation hook.
202+
func (vh *ValidationHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) {
203+
// No post-processing needed
204+
}
205+
206+
// ExampleUsage demonstrates how to use the hooks and notification types with HitlessManager.
207+
func ExampleUsage() {
208+
// This is just an example - in real usage, you'd have actual client and config
209+
fmt.Println("Example of using hooks and notification types with HitlessManager:")
210+
fmt.Println()
211+
212+
fmt.Println("1. Using notification type constants:")
213+
fmt.Printf(" MOVING: %s\n", NotificationMoving)
214+
fmt.Printf(" MIGRATING: %s\n", NotificationMigrating)
215+
fmt.Printf(" MIGRATED: %s\n", NotificationMigrated)
216+
fmt.Printf(" FAILING_OVER: %s\n", NotificationFailingOver)
217+
fmt.Printf(" FAILED_OVER: %s\n", NotificationFailedOver)
218+
fmt.Println()
219+
220+
fmt.Println("2. Using notification type sets:")
221+
fmt.Println(" // Register handlers for all notification types")
222+
fmt.Println(" manager.RegisterSelectiveHandlers(AllNotificationTypes())")
223+
fmt.Println()
224+
fmt.Println(" // Register handlers only for MOVING notifications")
225+
fmt.Println(" manager.RegisterSelectiveHandlers(MovingOnlyNotifications())")
226+
fmt.Println()
227+
fmt.Println(" // Register handlers only for migration-related notifications")
228+
fmt.Println(" manager.RegisterSelectiveHandlers(MigrationNotifications())")
229+
fmt.Println()
230+
fmt.Println(" // Register handlers only for failover-related notifications")
231+
fmt.Println(" manager.RegisterSelectiveHandlers(FailoverNotifications())")
232+
fmt.Println()
233+
fmt.Println(" // Register handlers for custom set of notifications")
234+
fmt.Println(" customTypes := NewNotificationTypeSet(NotificationMoving, NotificationMigrated)")
235+
fmt.Println(" manager.RegisterSelectiveHandlers(customTypes)")
236+
fmt.Println()
237+
238+
fmt.Println("3. Create hooks:")
239+
fmt.Println(" metricsHook := NewMetricsHook()")
240+
fmt.Println(" rewriteHook := NewEndpointRewriteHook(map[string]string{")
241+
fmt.Println(" \"old-redis:6379\": \"new-redis:6379\",")
242+
fmt.Println(" })")
243+
fmt.Println(" throttleHook := NewThrottleHook(10) // 10 notifications per second")
244+
fmt.Println(" validationHook := NewValidationHook(true) // strict mode")
245+
fmt.Println()
246+
247+
fmt.Println("4. Add hooks to manager:")
248+
fmt.Println(" manager.AddHook(validationHook) // Validate first")
249+
fmt.Println(" manager.AddHook(throttleHook) // Then throttle")
250+
fmt.Println(" manager.AddHook(rewriteHook) // Then rewrite")
251+
fmt.Println(" manager.AddHook(metricsHook) // Finally collect metrics")
252+
fmt.Println()
253+
254+
fmt.Println("5. Hooks will be called in order for each notification:")
255+
fmt.Println(" - ValidationHook validates the notification")
256+
fmt.Println(" - ThrottleHook may skip processing if rate limit exceeded")
257+
fmt.Println(" - EndpointRewriteHook may modify the endpoint")
258+
fmt.Println(" - MetricsHook collects processing statistics")
259+
fmt.Println()
260+
261+
fmt.Println("6. Remove hooks when no longer needed:")
262+
fmt.Println(" manager.RemoveHook(throttleHook)")
263+
}

0 commit comments

Comments
 (0)