Skip to content

Commit 0ee87a8

Browse files
Fix timeout on async locks
1 parent 7cca67f commit 0ee87a8

File tree

3 files changed

+13
-2
lines changed

3 files changed

+13
-2
lines changed

nebula/core/aggregation/aggregator.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import logging
23
from abc import ABC, abstractmethod
34
from functools import partial
@@ -208,6 +209,10 @@ async def get_aggregation(self):
208209
await self._aggregation_done_lock.acquire_async(timeout=timeout)
209210
except TimeoutError:
210211
logging.exception("🔄 get_aggregation | Timeout reached for aggregation")
212+
except asyncio.CancelledError:
213+
logging.exception("🔄 get_aggregation | Lock acquisition was cancelled")
214+
except Exception as e:
215+
logging.exception(f"🔄 get_aggregation | Error acquiring lock: {e}")
211216
finally:
212217
await self._aggregation_done_lock.release_async()
213218

nebula/core/utils/locker.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,13 @@ async def acquire_async(self, *args, **kwargs):
7070
)
7171
else:
7272
logging.debug(f"🔒 Acquiring async lock [{self._name}] from {filename}:{lineno}")
73-
await self._lock.acquire(*args, **kwargs)
73+
if "timeout" in kwargs:
74+
try:
75+
await asyncio.wait_for(self._lock.acquire(), timeout=kwargs["timeout"])
76+
except Exception as e:
77+
raise e
78+
else:
79+
await self._lock.acquire()
7480

7581
async def release_async(self, *args, **kwargs):
7682
caller = inspect.stack()[1]

nebula/frontend/app.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import argparse
22
import asyncio
33
import datetime
4-
from dotenv import load_dotenv
54
import io
65
import json
76
import logging
@@ -12,6 +11,7 @@
1211
from urllib.parse import urlencode
1312

1413
import requests
14+
from dotenv import load_dotenv
1515

1616
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
1717
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".."))

0 commit comments

Comments
 (0)