Skip to content

Commit 8522c7d

Browse files
improve error management in async functions
1 parent 9c87b20 commit 8522c7d

File tree

4 files changed

+74
-48
lines changed

4 files changed

+74
-48
lines changed

nebula/core/engine.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,11 @@ async def shutdown(self):
873873
# Wait for tasks to complete naturally with shorter timeout
874874
try:
875875
await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=3)
876+
except asyncio.CancelledError:
877+
logging.warning(
878+
"Timeout reached during task cleanup (CancelledError); proceeding with shutdown anyway."
879+
)
880+
# Do not re-raise, just continue
876881
except TimeoutError:
877882
logging.warning("Some tasks did not complete in time, forcing cancellation...")
878883
for task in tasks:
@@ -881,9 +886,13 @@ async def shutdown(self):
881886
# Wait a bit more for cancellations to take effect
882887
try:
883888
await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=2)
889+
except asyncio.CancelledError:
890+
logging.warning(
891+
"Timeout reached during forced cancellation (CancelledError); proceeding with shutdown anyway."
892+
)
893+
# Do not re-raise, just continue
884894
except TimeoutError:
885895
logging.warning("Some tasks still not responding to cancellation")
886-
887896
# Final aggressive cleanup - cancel all remaining tasks
888897
remaining_tasks = [
889898
t for t in asyncio.all_tasks() if t is not asyncio.current_task() and not t.done()
@@ -894,8 +903,17 @@ async def shutdown(self):
894903
task.cancel()
895904
try:
896905
await asyncio.wait_for(asyncio.gather(*remaining_tasks, return_exceptions=True), timeout=1)
906+
except asyncio.CancelledError:
907+
logging.warning(
908+
"Timeout reached during final forced cancellation (CancelledError); proceeding with shutdown anyway."
909+
)
910+
# Do not re-raise, just continue
897911
except TimeoutError:
898912
logging.exception("Some tasks still not responding to forced cancellation")
913+
# Proceed anyway after all cancellation attempts
914+
logging.warning("Proceeding with shutdown even if some tasks are still pending/cancelled.")
915+
else:
916+
logging.info("No remaining tasks to clean up.")
899917

900918
logging.info("✅ Engine shutdown complete")
901919

nebula/core/network/connection.py

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -169,19 +169,23 @@ async def _monitor_inactivity(self):
169169
periodically checking if the last activity exceeds the inactivity threshold.
170170
If inactive, marks the connection as inactive and logs a warning.
171171
"""
172-
while await self.is_running():
173-
if self.direct:
174-
break
175-
await asyncio.sleep(self.INACTIVITY_DAEMON_SLEEP_TIME)
176-
async with self._activity_lock:
177-
time_since_last = time.time() - self._last_activity
178-
if time_since_last > self.INACTIVITY_TIMER:
179-
if not self._inactivity:
180-
self._inactivity = True
181-
logging.info(f"[{self}] Connection marked as inactive.")
182-
else:
183-
if self._inactivity:
184-
self._inactivity = False
172+
try:
173+
while await self.is_running():
174+
if self.direct:
175+
break
176+
await asyncio.sleep(self.INACTIVITY_DAEMON_SLEEP_TIME)
177+
async with self._activity_lock:
178+
time_since_last = time.time() - self._last_activity
179+
if time_since_last > self.INACTIVITY_TIMER:
180+
if not self._inactivity:
181+
self._inactivity = True
182+
logging.info(f"[{self}] Connection marked as inactive.")
183+
else:
184+
if self._inactivity:
185+
self._inactivity = False
186+
except asyncio.CancelledError:
187+
logging.info("_monitor_inactivity cancelled during shutdown.")
188+
return
185189

186190
def get_federated_round(self):
187191
return self.federated_round
@@ -482,21 +486,19 @@ async def handle_incoming_message(self) -> None:
482486
try:
483487
while await self.is_running():
484488
if self.pending_messages_queue.full():
485-
await asyncio.sleep(0.1) # Wait a bit if the queue is full to create backpressure
489+
await asyncio.sleep(0.1)
486490
continue
487491
header = await self._read_exactly(self.HEADER_SIZE)
488492
message_id, chunk_index, is_last_chunk = self._parse_header(header)
489-
490493
chunk_data = await self._read_chunk(reusable_buffer)
491494
await self._update_activity()
492495
self._store_chunk(message_id, chunk_index, chunk_data, is_last_chunk)
493-
# logging.debug(f"Received chunk {chunk_index} of message {message_id.hex()} | size: {len(chunk_data)} bytes")
494-
# Active connection without fails
495496
self.incompleted_reconnections = 0
496497
if is_last_chunk:
497498
await self._process_complete_message(message_id)
498-
except asyncio.CancelledError as e:
499-
logging.exception(f"Message handling cancelled: {e}")
499+
except asyncio.CancelledError:
500+
logging.info("handle_incoming_message cancelled during shutdown.")
501+
return
500502
except ConnectionError as e:
501503
logging.exception(f"Connection closed while reading: {e}")
502504
except Exception as e:
@@ -678,27 +680,23 @@ def _decompress(self, data: bytes, compression: str) -> bytes | None:
678680
async def process_message_queue(self) -> None:
679681
"""
680682
Continuously processes messages from the pending queue.
681-
682-
Behavior:
683-
- Retrieves messages from the queue one by one.
684-
- Delegates the message to the appropriate handler based on its type.
685-
- Ensures the queue is marked as processed.
686-
687-
Notes:
688-
Runs indefinitely unless externally cancelled or stopped.
689683
"""
690-
while await self.is_running():
691-
try:
692-
if self.pending_messages_queue is None:
693-
logging.error("Pending messages queue is not initialized")
694-
return
695-
data_type_prefix, message = await self.pending_messages_queue.get()
696-
await self._handle_message(data_type_prefix, message)
697-
self.pending_messages_queue.task_done()
698-
except Exception as e:
699-
logging.exception(f"Error processing message queue: {e}")
700-
finally:
701-
await asyncio.sleep(0)
684+
try:
685+
while await self.is_running():
686+
try:
687+
if self.pending_messages_queue is None:
688+
logging.error("Pending messages queue is not initialized")
689+
return
690+
data_type_prefix, message = await self.pending_messages_queue.get()
691+
await self._handle_message(data_type_prefix, message)
692+
self.pending_messages_queue.task_done()
693+
except Exception as e:
694+
logging.exception(f"Error processing message queue: {e}")
695+
finally:
696+
await asyncio.sleep(0)
697+
except asyncio.CancelledError:
698+
logging.info("process_message_queue cancelled during shutdown.")
699+
return
702700

703701
async def _handle_message(self, data_type_prefix: bytes, message: bytes) -> None:
704702
"""

nebula/core/network/forwarder.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,17 @@ async def run_forwarder(self):
7575
if self.config.participant["scenario_args"]["federation"] == "CFL":
7676
logging.info("🔁 Federation is CFL. Forwarder is disabled...")
7777
return
78-
while await self.is_running():
79-
# logging.debug(f"🔁 Pending messages: {self.pending_messages.qsize()}")
80-
start_time = time.time()
81-
await self.pending_messages_lock.acquire_async()
82-
await self.process_pending_messages(messages_left=self.number_forwarded_messages)
83-
await self.pending_messages_lock.release_async()
84-
sleep_time = max(0, self.interval - (time.time() - start_time))
85-
await asyncio.sleep(sleep_time)
78+
try:
79+
while await self.is_running():
80+
start_time = time.time()
81+
await self.pending_messages_lock.acquire_async()
82+
await self.process_pending_messages(messages_left=self.number_forwarded_messages)
83+
await self.pending_messages_lock.release_async()
84+
sleep_time = max(0, self.interval - (time.time() - start_time))
85+
await asyncio.sleep(sleep_time)
86+
except asyncio.CancelledError:
87+
logging.info("run_forwarder cancelled during shutdown.")
88+
return
8689

8790
async def stop(self):
8891
self._running.clear()

nebula/core/node.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,13 @@ def randomize_value(value, variability):
235235
if node.cm is not None:
236236
await node.cm.network_wait()
237237

238+
# Ensure shutdown is always called and awaited before main() returns
239+
if hasattr(node, "shutdown") and callable(node.shutdown):
240+
logging.info("Calling node.shutdown() for final cleanup and Docker removal...")
241+
await node.shutdown()
242+
else:
243+
logging.warning("Node does not have a shutdown() method; skipping explicit shutdown.")
244+
238245

239246
if __name__ == "__main__":
240247
config_path = str(sys.argv[1])

0 commit comments

Comments
 (0)