Commit c602312

IdansPort, matan84 and shalev007 authored
[Core] feat: implement core change for group queue (#1936)
### **User description**

# Description

## Why

**Problem:** GitHub webhook events for the same resource (such as a PR) were getting processed out of order when multiple workers ran concurrently. This broke temporal consistency: a "PR closed" event could be processed before "PR updated", leading to stale data.

**Solution:** Group events by resource ID and ensure only one worker processes events for any given resource at a time, while still allowing parallel processing across different resources.

## What

Three core changes:

- **Group-based queuing:** Added `GroupQueue`, which partitions events by `group_id` and locks groups during processing.
- **Resource identification:** Created `group_selector.py` to extract consistent resource IDs from GitHub webhook payloads.
- **Multi-worker coordination:** Modified the processor manager to spawn multiple workers that respect group locks.

## How

- **Resource identification:** Extract consistent IDs from GitHub events (e.g., `pull_request-123`, `issue-456`) to group related events together.
- **Group queue:** A queue implementation that enforces sequential processing within groups while allowing concurrent processing across groups. Workers lock a group on `get()` and unlock it on `commit()`.
- **Worker pool:** Multiple workers per webhook path, each pulling from the shared group queue. The integration automatically chooses group-aware or simple processing based on the worker count configuration.

**Result:** Events for the same resource are processed in order; events for different resources are processed in parallel.
## Type of change

Please leave one option from the following and delete the rest:

- [x] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [x] Non-breaking change (fix of existing functionality that will not change current behavior)
- [ ] Documentation (added/updated documentation)

<h4> All tests should be run against the port production environment (using a testing org). </h4>

### Core testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector

### Integration testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the `examples` folder in the integration directory.
- [ ] If resource kind is updated, run the integration with the example data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that live-events for that resource are working as expected
- [ ] Docs PR link [here](#)

### Preflight checklist

- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account

## Screenshots

Overview of the flow:

<img width="841" height="760" alt="image" src="https://github.com/user-attachments/assets/ad8b39db-530f-4e43-9e82-66f41dbce66b" />

Include screenshots from your environment showing how the resources of the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.

___

### **PR Type**

Enhancement

___

### **Description**

- Implement group-based queue for ordered event processing
- Add multi-worker support with group locking mechanism
- Ensure sequential processing within resource groups
- Enable parallel processing across different resource groups

___

### Diagram Walkthrough

```mermaid
flowchart LR
  A["Webhook Events"] --> B["GroupQueue"]
  B --> C["Group Selector"]
  C --> D["Resource Groups"]
  D --> E["Worker Pool"]
  E --> F["Sequential Processing per Group"]
  E --> G["Parallel Processing across Groups"]
```

### File Walkthrough

| Category | File | Change | Diff |
| --- | --- | --- | --- |
| Configuration changes | `settings.py` | Add event workers configuration setting | [+2/-0](https://github.com/port-labs/ocean/pull/1936/files#diff-c4ab144128bdbbe50dd95b726ae9c2f4cbc7306025e07a9e1db3dff7c5d4b3bb) |
| Enhancement | `__init__.py` | Export GroupQueue class | [+2/-1](https://github.com/port-labs/ocean/pull/1936/files#diff-c9170904db583a716fd64819353c6319e176ef9dff07e4355d491a410627efa5) |
| Enhancement | `abstract_queue.py` | Add size method to queue interface | [+8/-0](https://github.com/port-labs/ocean/pull/1936/files#diff-c5f159796d2303aac77a0311758cafb17185c6b2ac983d3d6519754bb163ae5f) |
| Enhancement | `group_queue.py` | Implement group-based queue with worker coordination | [+156/-0](https://github.com/port-labs/ocean/pull/1936/files#diff-8d8b3ee228422ff5eb26b7221858028a7bf52eb77712ffd81cf13daf7f7e619b) |
| Enhancement | `local_queue.py` | Implement size method for local queue | [+3/-0](https://github.com/port-labs/ocean/pull/1936/files#diff-334e57aa010cbd80d458171ffd86093c6076ef97648f4d26ad8db9ea05b133e7) |
| Enhancement | `processor_manager.py` | Replace single processor with multi-worker system | [+68/-70](https://github.com/port-labs/ocean/pull/1936/files#diff-58a2a8bcf5e453cdb2b298a8edcb716c3c3454bb37eed3d6fb3c0c88b50e8b6d) |
| Enhancement | `webhook_event.py` | Add group_id field to webhook events | [+2/-0](https://github.com/port-labs/ocean/pull/1936/files#diff-b731c7033797cd3ff458c104fde99743a6ec09baa1c2da550975f1e383c3143e) |
| Tests | `test_group_queue.py` | Comprehensive test suite for group queue | [+580/-0](https://github.com/port-labs/ocean/pull/1936/files#diff-c6c81784c82c06f64808e8e6b211361fa4e24c31de973c7c9bc7ac544bbb1f26) |
| Documentation | `CHANGELOG.md` | Document parallel queue implementation improvement | [+5/-0](https://github.com/port-labs/ocean/pull/1936/files#diff-06572a96a58dc510037d5efa622f9bec8519bc1beab13c9f251e97e657a9d4ed) |
| Miscellaneous | `pyproject.toml` | Bump version to 0.26.2 | [+1/-1](https://github.com/port-labs/ocean/pull/1936/files#diff-50c86b7ed8ac2cf95bd48334961bf0530cdc77b5a56f852c5c61b89d735fd711) |

---------

Co-authored-by: Matan <51418643+matan84@users.noreply.github.com>
Co-authored-by: Shalev Avhar <51760613+shalev007@users.noreply.github.com>
1 parent 805b3c9 commit c602312

File tree

10 files changed: +950 additions, −80 deletions


CHANGELOG.md

Lines changed: 16 additions & 0 deletions
```diff
@@ -6,6 +6,22 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

 <!-- towncrier release notes start -->
+
+## 0.27.0 (2025-08-03)
+
+### Improvements
+
+- Enhanced webhook event processing with GroupQueue implementation
+  - Introduced GroupQueue to ensure exclusive processing per group while allowing parallel processing across different groups
+  - Multiple workers can now process webhook events from different groups concurrently, improving throughput
+  - FIFO ordering is maintained within each group to preserve event sequence integrity
+  - Added automatic lock timeout mechanism to recover from frozen or hung workers
+  - Implemented context-based group tracking using ContextVar for cleaner worker-to-group association
+- Performance optimizations
+  - Configurable number of workers per webhook path (event_workers_count)
+  - Reduced contention by allowing concurrent processing of independent groups
+  - Improved resource cleanup and state management after processing
+
 ## 0.26.3 (2025-08-04)

 ### Bug Fixes
```
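The context-based group tracking mentioned in the changelog relies on each asyncio task seeing its own value of a `ContextVar`, which is how a worker can remember "its" currently locked group without threading it through every call. A minimal standalone illustration (not Ocean code; names are illustrative):

```python
import asyncio
from contextvars import ContextVar

# Each asyncio task runs in a copy of the context it was created in, so
# set() inside one task does not clobber the value seen by other tasks.
current_group: ContextVar[str] = ContextVar("current_group", default="<none>")


async def worker(group: str) -> str:
    current_group.set(group)
    await asyncio.sleep(0)  # yield; the other workers run in the meantime
    return current_group.get()  # still this task's own value


async def run_workers() -> list[str]:
    return list(await asyncio.gather(*(worker(g) for g in ("a", "b", "c"))))
```

Each worker gets back exactly the group it set, even though all three tasks interleave on one event loop — the isolation GroupQueue's `_current_group` variable depends on.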

port_ocean/config/settings.py

Lines changed: 2 additions & 0 deletions
```diff
@@ -9,6 +9,7 @@
 from port_ocean.config.base import BaseOceanModel, BaseOceanSettings
 from port_ocean.core.event_listener import EventListenerSettingsType
+
 from port_ocean.core.models import (
     CachingStorageMode,
     CreatePortResourcesOrigin,
@@ -88,6 +89,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow):
     event_listener: EventListenerSettingsType = Field(
         default=cast(EventListenerSettingsType, {"type": "POLLING"})
     )
+    event_workers_count: int = 1
     # If an identifier or type is not provided, it will be generated based on the integration name
     integration: IntegrationSettings = Field(
         default_factory=lambda: IntegrationSettings(type="", identifier="")
```
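The diff only adds `event_workers_count: int = 1` to the settings model, so a single worker remains the default. As a hedged sketch of how a deployment might override it — the environment-variable name below is an assumption based on Ocean's usual `OCEAN__`-prefixed convention, not something shown in this PR:

```python
# Hypothetical resolution of the new setting from an environment mapping.
# The real implementation is pydantic-based; only the default of 1 is
# taken from the diff, the variable name is an assumed convention.
def resolve_event_workers_count(env: dict[str, str]) -> int:
    return int(env.get("OCEAN__EVENT_WORKERS_COUNT", "1"))
```

With the default of 1 the processor manager can keep the simple (ungrouped) path; any value above 1 would trigger the group-aware processing described in the PR.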
port_ocean/core/handlers/queue/__init__.py

Lines changed: 2 additions & 1 deletion

```diff
@@ -1,4 +1,5 @@
 from .abstract_queue import AbstractQueue
 from .local_queue import LocalQueue
+from .group_queue import GroupQueue

-__all__ = ["AbstractQueue", "LocalQueue"]
+__all__ = ["AbstractQueue", "LocalQueue", "GroupQueue"]
```

port_ocean/core/handlers/queue/abstract_queue.py

Lines changed: 8 additions & 0 deletions
```diff
@@ -7,6 +7,9 @@
 class AbstractQueue(ABC, Generic[T]):
     """Abstract interface for queues"""

+    def __init__(self, name: str | None = None):
+        pass
+
     @abstractmethod
     async def put(self, item: T) -> None:
         """Put an item into the queue"""
@@ -22,6 +25,11 @@ async def teardown(self) -> None:
         """Wait for all items to be processed"""
         pass

+    @abstractmethod
+    async def size(self) -> int:
+        """Size of the queue"""
+        pass
+
     @abstractmethod
     async def commit(self) -> None:
         """Mark item as processed"""
```
port_ocean/core/handlers/queue/group_queue.py

Lines changed: 138 additions & 0 deletions

```python
import asyncio
from collections import defaultdict, deque
import time
from typing import Deque, Dict, Optional, Set, TypeVar, Any
from contextvars import ContextVar

from loguru import logger

from .abstract_queue import AbstractQueue

T = TypeVar("T")
MaybeStr = str | None

_NO_GROUP = object()
_current_group: ContextVar[Any] = ContextVar("current_group", default=_NO_GROUP)


class GroupQueue(AbstractQueue[T]):
    """Queue with exclusive processing per group."""

    def __init__(
        self,
        group_key: MaybeStr = None,
        name: MaybeStr = None,
        lock_timeout: float = 300,
    ):
        super().__init__(name)
        self.group_key = group_key
        self._queues: Dict[MaybeStr, Deque[T]] = defaultdict(deque)
        self._locked: Set[MaybeStr] = set()
        self._queue_not_empty = asyncio.Condition()
        self.lock_timeout = lock_timeout
        self._lock_timestamps: Dict[MaybeStr, float] = {}
        self._timeout_task: Optional[asyncio.Task[None]] = None

    async def _background_timeout_check(self) -> None:
        """Periodically release locks that have timed out."""
        while True:
            try:
                await asyncio.sleep(self.lock_timeout / 4)
                async with self._queue_not_empty:
                    await self._release_expired_locks()
            except asyncio.CancelledError:
                break

    def _extract_group_key(self, item: T) -> MaybeStr:
        """Extract the group key from an item."""
        if self.group_key is None:
            return None
        if not hasattr(item, self.group_key):
            raise ValueError(
                f"Item {item!r} lacks attribute '{self.group_key}' required for grouping"
            )
        return getattr(item, self.group_key)

    async def put(self, item: T) -> None:
        """Add item to its group's queue."""
        group_key = self._extract_group_key(item)
        async with self._queue_not_empty:
            self._queues[group_key].append(item)
            self._queue_not_empty.notify_all()

    async def _release_expired_locks(self) -> None:
        """Release locks that have exceeded the timeout."""
        now = time.time()
        expired_groups = []

        for group, timestamp in list(self._lock_timestamps.items()):
            if now - timestamp > self.lock_timeout:
                expired_groups.append(group)
                logger.warning(f"Releasing expired lock for group {group}")
                self._locked.discard(group)
                del self._lock_timestamps[group]

        if expired_groups:
            self._queue_not_empty.notify_all()

    async def get(self) -> T:
        """Get the next item from an unlocked group, locking that group."""
        if self._timeout_task is None or self._timeout_task.done():
            self._timeout_task = asyncio.create_task(self._background_timeout_check())

        async with self._queue_not_empty:
            while True:
                await self._release_expired_locks()

                for group, queue in self._queues.items():
                    if queue and group not in self._locked:
                        self._locked.add(group)
                        self._lock_timestamps[group] = time.time()
                        _current_group.set(group)
                        return queue[0]

                await self._queue_not_empty.wait()

    async def commit(self) -> None:
        """Remove the current item and unlock its group."""
        group = _current_group.get()
        if group is _NO_GROUP:
            logger.warning("commit() called without active get()")
            return

        async with self._queue_not_empty:
            queue = self._queues.get(group)
            if queue:
                queue.popleft()
                if not queue:
                    del self._queues[group]

            self._locked.discard(group)
            self._lock_timestamps.pop(group, None)
            _current_group.set(_NO_GROUP)
            self._queue_not_empty.notify_all()

    async def teardown(self) -> None:
        """Wait until all queues are empty and no groups are locked."""
        async with self._queue_not_empty:
            while any(self._queues.values()) or self._locked:
                await self._queue_not_empty.wait()

        if self._timeout_task and not self._timeout_task.done():
            self._timeout_task.cancel()
            try:
                await self._timeout_task
            except asyncio.CancelledError:
                pass

    async def size(self) -> int:
        """Return total number of items across all groups."""
        async with self._queue_not_empty:
            return sum(len(queue) for queue in self._queues.values())

    async def force_unlock_all(self) -> None:
        """Force unlock all groups."""
        async with self._queue_not_empty:
            self._locked.clear()
            self._lock_timestamps.clear()
            self._queue_not_empty.notify_all()
```
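The locking protocol in GroupQueue can be exercised with a condensed, dependency-free sketch (no loguru, no lock timeout, and `commit()` takes the item explicitly instead of the ContextVar bookkeeping) to demonstrate the guarantee the PR describes: FIFO within a group, parallelism across groups.

```python
import asyncio
from collections import defaultdict, deque
from dataclasses import dataclass


@dataclass
class Event:
    group_id: str
    seq: int


class MiniGroupQueue:
    """Simplified stand-in for GroupQueue, for demonstration only."""

    def __init__(self) -> None:
        self._queues: dict[str, deque[Event]] = defaultdict(deque)
        self._locked: set[str] = set()
        self._cond = asyncio.Condition()

    async def put(self, item: Event) -> None:
        async with self._cond:
            self._queues[item.group_id].append(item)
            self._cond.notify_all()

    async def get(self) -> Event:
        async with self._cond:
            while True:
                for group, q in self._queues.items():
                    if q and group not in self._locked:
                        self._locked.add(group)  # lock group until commit()
                        return q[0]
                await self._cond.wait()

    async def commit(self, item: Event) -> None:
        async with self._cond:
            self._queues[item.group_id].popleft()
            self._locked.discard(item.group_id)
            self._cond.notify_all()


async def demo() -> dict[str, list[int]]:
    queue = MiniGroupQueue()
    for group in ("pr-1", "pr-2"):
        for seq in range(3):
            await queue.put(Event(group, seq))

    seen: dict[str, list[int]] = defaultdict(list)

    async def worker() -> None:
        for _ in range(3):
            item = await queue.get()
            await asyncio.sleep(0)  # simulate processing, allow interleaving
            seen[item.group_id].append(item.seq)
            await queue.commit(item)

    await asyncio.gather(worker(), worker())
    return dict(seen)
```

However the two workers interleave, each group's events come out in sequence order, because a group stays locked from `get()` until `commit()` and items are popped FIFO.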

port_ocean/core/handlers/queue/local_queue.py

Lines changed: 3 additions & 0 deletions
```diff
@@ -23,3 +23,6 @@ async def teardown(self) -> None:

     async def commit(self) -> None:
         self._queue.task_done()
+
+    async def size(self) -> int:
+        return self._queue.qsize()
```
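LocalQueue delegates `size()` to `asyncio.Queue.qsize()`, whose semantics are worth noting: the count drops when an item is retrieved with `get()`, not when it is later acknowledged with `task_done()`. A quick illustration:

```python
import asyncio


async def qsize_demo() -> tuple[int, int]:
    q: asyncio.Queue[int] = asyncio.Queue()
    for i in range(3):
        await q.put(i)
    before = q.qsize()  # counts items currently enqueued
    await q.get()       # qsize() drops here...
    q.task_done()       # ...not here; task_done() only feeds join()
    return before, q.qsize()
```

So a LocalQueue's `size()` can report 0 while an item is still being processed — unlike GroupQueue's `size()`, which keeps an item counted until `commit()` removes it.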
