-# resource_watcher.py
-
import logging
import threading
import queue
@@ -33,44 +31,44 @@ def __init__(self, k8s_client, topology_manager, event_logger):
        self.k8s_client = k8s_client
        self.topology = topology_manager
        self.event_logger = event_logger

        self.watch_threads: Dict[str, threading.Thread] = {}
        self.resource_versions: Dict[str, str] = {}

        self.stop_event = threading.Event()

        # Bounded queue to prevent unbounded growth
        self.event_queue = queue.Queue(maxsize=10000)

        self.processor_thread = None

        # Tuning parameters for batch refresh:
        self.REFRESH_EVENT_COUNT = 100   # e.g., refresh after 100 relevant events
        self.REFRESH_TIME_LIMIT = 30.0   # or 30 seconds

    def start(self):
        """Start the watch threads and event processor."""
        self.stop_event.clear()

        # Start consumer thread to process queued events
        self.processor_thread = threading.Thread(
            target=self._process_events,
            daemon=True,
            name="event-processor"
        )
        self.processor_thread.start()

        # Start watches for each resource type
        self._start_resource_watches()

    def stop(self):
        """Stop all watch threads and gracefully shut down the processor."""
        self.logger.info("Stopping resource watcher...")
        self.stop_event.set()

        # Put sentinel in the queue so _process_events will exit
        self.event_queue.put(None)

        # Join the processor thread
        if self.processor_thread:
            self.processor_thread.join(timeout=5)
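# Editor's aside (not part of this change): a minimal lifecycle sketch for the methods
# above. The class name is inferred from the file name and the collaborator names come
# from the constructor signature in this diff; the surrounding wiring is assumed for
# illustration only.
#
#     watcher = ResourceWatcher(k8s_client, topology_manager, event_logger)
#     watcher.start()   # spawns the event-processor thread, then one watch thread per kind
#     ...               # run until shutdown is requested
#     watcher.stop()    # sets stop_event, enqueues the None sentinel, joins the processor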
@@ -94,12 +92,12 @@ def _start_resource_watches(self):
            'Pod', 'Service', 'ConfigMap', 'Secret', 'PersistentVolumeClaim',
            'PersistentVolume', 'Node', 'Namespace', 'ServiceAccount', 'Endpoints'
        }

        # Start core resources first
        for kind in core_resources:
            self._start_watch("v1", kind)
            time.sleep(0.5)  # small delay to avoid spamming the API

        # Then watch API group resources
        for api_version, kind in self.k8s_client.get_api_resources():
            if kind not in core_resources:
@@ -109,14 +107,14 @@ def _start_resource_watches(self):
    def _start_watch(self, api_version: str, kind: str):
        """Start a watch thread for a specific resource type."""
        watch_key = f"{api_version}/{kind}"

        # Avoid duplicates
        if watch_key in self.watch_threads:
            if self.watch_threads[watch_key].is_alive():
                return
            else:
                self.logger.info(f"Restarting dead watch thread for {kind}")

        self.logger.info(f"Starting watch for {kind}")
        thread = threading.Thread(
            target=self._watch_resource,
@@ -142,16 +140,16 @@ def _watch_resource(self, api_version: str, kind: str):
        try:
            list_response = resource.get()
            resource_version = list_response.metadata.resourceVersion

            watch_iter = resource.watch(resource_version=resource_version)
            start_time = time.time()
            for event in watch_iter:
                if self.stop_event.is_set() or (time.time() - start_time > 3600):
                    break

                event_type = event['type']
                obj = event['object']

                # If queue is full, this blocks
                self.event_queue.put({
                    'type': event_type,
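# Editor's aside (not part of this change): event_queue is bounded (maxsize=10000), so the
# put() above applies back-pressure by blocking the watch thread whenever the processor
# falls behind. If dropping events were preferable to blocking, a timed put is one sketch
# of an alternative (the item name below stands in for the dict being built above):
#
#     try:
#         self.event_queue.put(item, timeout=1.0)
#     except queue.Full:
#         self.logger.warning(f"Event queue full; dropping {event_type} event")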
@@ -188,10 +186,10 @@ def _get_resource_info(self, obj) -> Dict[str, Any]:
        namespace = getattr(obj.metadata, 'namespace', '')
        name = getattr(obj.metadata, 'name', '')

        # Use the TopologyManager to get a stable ID (pre-existing function)
        stable_id = self.topology._get_stable_node_id(group, version, obj.kind, namespace, name)

        # Extract owners
        owners = []
        if getattr(obj.metadata, 'ownerReferences', None):
@@ -201,7 +199,7 @@ def _get_resource_info(self, obj) -> Dict[str, Any]:
                    'name': ref.name,
                    'uid': ref.uid
                })

        return {
            'kind': obj.kind,
            'group': group,
@@ -216,7 +214,7 @@ def _get_resource_info(self, obj) -> Dict[str, Any]:
    def _process_events(self):
        """
        Main loop: pop events from the queue, log them (with ID/UID/owners),
        and occasionally refresh topology.
        """
        self.logger.info("Event processing thread started.")
@@ -232,7 +230,7 @@ def _process_events(self):
                    event = self.event_queue.get(timeout=1.0)
                except queue.Empty:
                    continue

                if event is None:
                    # Sentinel for shutdown
                    break
@@ -264,7 +262,7 @@ def _process_events(self):
                        self.logger.debug(f"Triggering refresh_topology() after "
                                          f"{events_since_last_refresh} events or {int(elapsed)}s elapsed.")
                        self.topology.refresh_topology()

                        # Reset counters
                        last_refresh_time = now
                        events_since_last_refresh = 0
@@ -274,4 +272,4 @@ def _process_events(self):
                finally:
                    self.event_queue.task_done()

        self.logger.info("Event processing thread exiting.")
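As an editor's illustration (not part of the diff), the count-or-time refresh policy that
_process_events applies can be read as a small standalone predicate. The helper name and
defaults below are assumptions that mirror REFRESH_EVENT_COUNT and REFRESH_TIME_LIMIT
from __init__.

import time
from typing import Optional

def should_refresh(events_since_last_refresh: int,
                   last_refresh_time: float,
                   event_count_limit: int = 100,
                   time_limit_s: float = 30.0,
                   now: Optional[float] = None) -> bool:
    """Return True once either the event-count or the elapsed-time threshold is crossed."""
    now = time.time() if now is None else now
    elapsed = now - last_refresh_time
    return events_since_last_refresh >= event_count_limit or elapsed >= time_limit_s

# Example: only 40 events so far, but 45 seconds have elapsed, so a refresh is due.
assert should_refresh(40, last_refresh_time=0.0, now=45.0)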