Commit 1088ebf

ivankalinovski and Ivan Kalinovski authored
[Core] Yield items to parse (#1973)
### **User description**
# Description

What - Move the items to parse logic to be yielded from the generator to support ocean's logic.

Why - Causes OOM

How - get items to parse when the batches are yielded.

## Type of change

Please leave one option from the following and delete the rest:

- [x] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not change current behavior)
- [ ] Documentation (added/updated documentation)

<h4> All tests should be run against the port production environment (using a testing org). </h4>

### Core testing checklist

- [x] Integration able to create all default resources from scratch
- [x] Resync finishes successfully
- [x] Resync able to create entities
- [x] Resync able to update entities
- [x] Resync able to detect and delete entities
- [x] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector

### Integration testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the `examples` folder in the integration directory.
- [ ] If resource kind is updated, run the integration with the example data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that live-events for that resource are working as expected
- [ ] Docs PR link [here](#)

### Preflight checklist

- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account

## Screenshots

Include screenshots from your environment showing how the resources of the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.

___

### **PR Type**
Enhancement

___

### **Description**
- Add configuration option to yield items to parse in batches
- Move items parsing logic from entity processor to generator wrapper
- Support configurable batch size for yielded items

___

### Diagram Walkthrough

```mermaid
flowchart LR
  A["Configuration"] --> B["Generator Wrapper"]
  B --> C["Items Parsing Logic"]
  C --> D["Batch Processing"]
  D --> E["Yielded Results"]
```

<details> <summary><h3> File Walkthrough</h3></summary>

**Configuration changes**

- **settings.py** (port_ocean/config/settings.py): Add yield items configuration options. [+2/-0](https://github.com/port-labs/ocean/pull/1973/files#diff-c4ab144128bdbbe50dd95b726ae9c2f4cbc7306025e07a9e1db3dff7c5d4b3bb)
  - Add `yield_items_to_parse` boolean configuration flag
  - Add `yield_items_to_parse_batch_size` integer configuration with default value 10

**Enhancement**

- **jq_entity_processor.py** (port_ocean/core/handlers/entity_processor/jq_entity_processor.py): Conditionally disable items parsing in entity processor. [+10/-9](https://github.com/port-labs/ocean/pull/1973/files#diff-28d9b2ac4a86bc467437b181a2035101bdf8c0f308cc7b73652857fe462667ae)
  - Wrap existing items parsing logic with configuration check
  - Skip items parsing when `yield_items_to_parse` is enabled
- **utils.py** (port_ocean/core/integrations/mixins/utils.py): Implement yielding items to parse with batching. [+27/-2](https://github.com/port-labs/ocean/pull/1973/files#diff-7459ee67d5bdb7c3d4f122b015af9402d100aa32eee18c957c195867caae82d0)
  - Add imports for JQEntityProcessor and ResourceConfig
  - Implement items parsing logic in generator wrapper when yielding enabled
  - Add batch processing for yielded items with configurable batch size
  - Handle error cases for invalid items parsing results

</details>

___

---------

Co-authored-by: Ivan Kalinovski <ivankalinovski@ivan.k-laptop>
1 parent 7b794a5 commit 1088ebf

6 files changed: +47 -15 lines changed

CHANGELOG.md

Lines changed: 7 additions & 1 deletion
@@ -6,7 +6,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

 <!-- towncrier release notes start -->
-## 0.27.1 (2025-08-07)
+## 0.27.2 (2025-08-07)
+
+### Bug Fixes
+
+- Move the items to parse logic to be yielded from the generator to support ocean's logic.
+
+## 0.27.1 (2025-08-10)

 ### Bug Fixes

port_ocean/config/settings.py

Lines changed: 2 additions & 0 deletions
@@ -111,6 +111,8 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow):
     upsert_entities_batch_max_length: int = 20
     upsert_entities_batch_max_size_in_bytes: int = 1024 * 1024
     lakehouse_enabled: bool = False
+    yield_items_to_parse: bool = False
+    yield_items_to_parse_batch_size: int = 10

     @validator("process_execution_mode")
     def validate_process_execution_mode(
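The two new fields interact with the batching loop added in `utils.py`: that loop slices with `raw_data[:batch_size]` and stops only when the remainder is empty, so a batch size of 0 would never consume any items. The following is a minimal sketch of a guard against that, assuming a stand-alone dataclass. The class `YieldSettings` and its validation are hypothetical and not part of Ocean, which declares these fields directly on `IntegrationConfiguration`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class YieldSettings:
    """Hypothetical stand-in for the two fields added to IntegrationConfiguration."""

    yield_items_to_parse: bool = False
    yield_items_to_parse_batch_size: int = 10

    def __post_init__(self) -> None:
        # Assumption: reject sizes below 1, because slicing with a batch
        # size of 0 never removes anything from the list being batched.
        if self.yield_items_to_parse_batch_size < 1:
            raise ValueError("yield_items_to_parse_batch_size must be >= 1")


# Defaults match the diff: the feature is off and the batch size is 10.
defaults = YieldSettings()
enabled = YieldSettings(yield_items_to_parse=True, yield_items_to_parse_batch_size=50)
```

Whether Ocean should validate the value this way is a design choice left to the maintainers; the sketch only shows where such a check could live.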

port_ocean/core/handlers/entity_processor/jq_entity_processor.py

Lines changed: 10 additions & 9 deletions
@@ -246,15 +246,16 @@ async def _calculate_entity(
         parse_all: bool = False,
     ) -> tuple[list[MappedEntity], list[Exception]]:
         raw_data = [data.copy()]
-        if items_to_parse:
-            items = await self._search(data, items_to_parse)
-            if not isinstance(items, list):
-                logger.warning(
-                    f"Failed to parse items for JQ expression {items_to_parse}, Expected list but got {type(items)}."
-                    f" Skipping..."
-                )
-                return [], []
-            raw_data = [{"item": item, **data} for item in items]
+        if not ocean.config.yield_items_to_parse:
+            if items_to_parse:
+                items = await self._search(data, items_to_parse)
+                if not isinstance(items, list):
+                    logger.warning(
+                        f"Failed to parse items for JQ expression {items_to_parse}, Expected list but got {type(items)}."
+                        f" Skipping..."
+                    )
+                    return [], []
+                raw_data = [{"item": item, **data} for item in items]

         entities, errors = await gather_and_split_errors_from_results(
             [
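To make the expansion in this hunk concrete, here is a small sketch of what `[{"item": item, **data} for item in items]` produces. The helper `expand_items` and the sample record are hypothetical; in Ocean the items come from evaluating the `items_to_parse` JQ expression, which is not reproduced here.

```python
from typing import Any


def expand_items(data: dict[str, Any], items: list[Any]) -> list[dict[str, Any]]:
    """Mirror the diff's expansion: one record per item, each carrying the parent's keys."""
    return [{"item": item, **data} for item in items]


# Hypothetical raw record with two nested items.
repo = {"name": "ocean", "files": [{"path": "a.py"}, {"path": "b.py"}]}
expanded = expand_items(repo, repo["files"])

for record in expanded:
    # Every expanded record still exposes the full parent "files" list.
    print(record["item"]["path"], len(record["files"]))
```

Each parent record turns into as many records as it has items, and every one of them repeats all parent keys. Building that list for a whole page at once is presumably what the batching in `utils.py` is meant to avoid. Note also that because `**data` is unpacked after `"item"`, a parent record that already has an `item` key would overwrite the parsed item.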

port_ocean/core/integrations/mixins/sync_raw.py

Lines changed: 1 addition & 1 deletion
@@ -116,7 +116,7 @@ async def _execute_resync_tasks(
                 logger.info(
                     f"Found async generator function for {resource_config.kind} name: {task.__qualname__}"
                 )
-                results.append(resync_generator_wrapper(task, resource_config.kind))
+                results.append(resync_generator_wrapper(task, resource_config.kind,resource_config.port.items_to_parse))
             else:
                 logger.info(
                     f"Found sync function for {resource_config.kind} name: {task.__qualname__}"

port_ocean/core/integrations/mixins/utils.py

Lines changed: 26 additions & 3 deletions
@@ -1,11 +1,13 @@
 from contextlib import contextmanager
-from typing import Awaitable, Generator, Callable
+from typing import Awaitable, Generator, Callable, cast

 from loguru import logger

 import asyncio
 import multiprocessing

+from port_ocean.core.handlers.entity_processor.jq_entity_processor import JQEntityProcessor
+from port_ocean.core.handlers.port_app_config.models import ResourceConfig
 from port_ocean.core.ocean_types import (
     ASYNC_GENERATOR_RESYNC_TYPE,
     RAW_RESULT,
@@ -49,7 +51,7 @@ async def resync_function_wrapper(


 async def resync_generator_wrapper(
-    fn: Callable[[str], ASYNC_GENERATOR_RESYNC_TYPE], kind: str
+    fn: Callable[[str], ASYNC_GENERATOR_RESYNC_TYPE], kind: str, items_to_parse: str | None = None
 ) -> ASYNC_GENERATOR_RESYNC_TYPE:
     generator = fn(kind)
     errors = []
@@ -58,7 +60,28 @@ async def resync_generator_wrapper(
             try:
                 with resync_error_handling():
                     result = await anext(generator)
-                    yield validate_result(result)
+                    if not ocean.config.yield_items_to_parse:
+                        yield validate_result(result)
+                    else:
+                        batch_size = ocean.config.yield_items_to_parse_batch_size
+                        if items_to_parse:
+                            for data in result:
+                                items = await cast(JQEntityProcessor, ocean.app.integration.entity_processor)._search(data, items_to_parse)
+                                if not isinstance(items, list):
+                                    logger.warning(
+                                        f"Failed to parse items for JQ expression {items_to_parse}, Expected list but got {type(items)}."
+                                        f" Skipping..."
+                                    )
+                                    yield []
+                                raw_data = [{"item": item, **data} for item in items]
+                                while True:
+                                    raw_data_batch = raw_data[:batch_size]
+                                    yield raw_data_batch
+                                    raw_data = raw_data[batch_size:]
+                                    if len(raw_data) == 0:
+                                        break
+                        else:
+                            yield validate_result(result)
             except OceanAbortException as error:
                 errors.append(error)
                 ocean.metrics.inc_metric(
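Two details of the added loop are worth noting. As far as the diff shows, `yield []` is not followed by `continue`, so the comprehension on the next line still runs on the non-list value, and the `while True` loop yields one empty batch when `items` is an empty list. Below is a minimal sketch of the same batching idea that avoids both. The names `yield_items_in_batches`, `fake_pages`, and the synchronous `search` callable are hypothetical stand-ins; Ocean awaits `JQEntityProcessor._search` instead.

```python
import asyncio
from typing import Any, AsyncIterator, Callable


async def yield_items_in_batches(
    pages: AsyncIterator[list[dict[str, Any]]],
    search: Callable[[dict[str, Any]], Any],
    batch_size: int,
) -> AsyncIterator[list[dict[str, Any]]]:
    """Expand each record's items and yield them in fixed-size batches."""
    async for page in pages:
        for data in page:
            items = search(data)
            if not isinstance(items, list):
                # Skip this record explicitly so the expansion below
                # never runs on a non-list value.
                continue
            raw_data = [{"item": item, **data} for item in items]
            # range() produces no indices for an empty list, so no empty
            # batch is emitted; a step of 0 raises ValueError immediately.
            for start in range(0, len(raw_data), batch_size):
                yield raw_data[start : start + batch_size]


async def fake_pages() -> AsyncIterator[list[dict[str, Any]]]:
    # Hypothetical data standing in for an integration's resync generator.
    yield [{"id": 1, "files": ["a", "b", "c"]}, {"id": 2, "files": None}]
    yield [{"id": 3, "files": []}]


async def collect() -> list[list[dict[str, Any]]]:
    source = yield_items_in_batches(fake_pages(), lambda d: d.get("files"), 2)
    return [batch async for batch in source]


batches = asyncio.run(collect())
print([len(batch) for batch in batches])  # prints [2, 1]
```

Only the record with three items produces output here, split into batches of two and one; the record whose search result is `None` and the record with an empty list contribute nothing.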

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "port-ocean"
-version = "0.27.1"
+version = "0.27.2"
 description = "Port Ocean is a CLI tool for managing your Port projects."
 readme = "README.md"
 homepage = "https://app.getport.io"
