Skip to content

Commit f6005e7

Browse files
[Core] Port 14266 ocean to use new bulk inserts route (#1742)
### **User description** # Description What - release bulk upserts to all ocean integrations Why - infra improvments How - remove the is saas if ## Type of change Please leave one option from the following and delete the rest: - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] New Integration (non-breaking change which adds a new integration) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] Non-breaking change (fix of existing functionality that will not change current behavior) - [ ] Documentation (added/updated documentation) <h4> All tests should be run against the port production environment(using a testing org). </h4> ### Core testing checklist - [ ] Integration able to create all default resources from scratch - [ ] Resync finishes successfully - [ ] Resync able to create entities - [ ] Resync able to update entities - [ ] Resync able to detect and delete entities - [ ] Scheduled resync able to abort existing resync and start a new one - [ ] Tested with at least 2 integrations from scratch - [ ] Tested with Kafka and Polling event listeners - [ ] Tested deletion of entities that don't pass the selector ### Integration testing checklist - [ ] Integration able to create all default resources from scratch - [ ] Resync able to create entities - [ ] Resync able to update entities - [ ] Resync able to detect and delete entities - [ ] Resync finishes successfully - [ ] If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the `examples` folder in the integration directory. - [ ] If resource kind is updated, run the integration with the example data and check if the expected result is achieved - [ ] If new resource kind is added or updated, validate that live-events for that resource are working as expected - [ ] Docs PR link [here](#) ### Preflight checklist - [ ] Handled rate limiting - [ ] Handled pagination - [ ] Implemented the code in async - [ ] Support Multi account ## Screenshots Include screenshots from your environment showing how the resources of the integration will look. ## API Documentation Provide links to the API documentation used for this integration. ___ ### **PR Type** Enhancement, Tests ___ ### **Description** - Refactored entity upsert batching to always use bulk route - Removed SaaS-specific conditional logic for bulk upserts - Unified batching logic for all environments - Improved error handling for batch upserts - Removed `bulk_upserts_enabled` feature flag from configuration and tests - Simplified configuration by eliminating obsolete flag - Updated tests to reflect new default behavior ___ ### **Changes walkthrough** 📝 <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Enhancement</strong></td><td><table> <tr> <td> <details> <summary><strong>entities.py</strong><dd><code>Always use bulk upsert batching and unify error handling</code>&nbsp; </dd></summary> <hr> port_ocean/clients/port/mixins/entities.py <li>Removed SaaS-only check for bulk upserts; now always uses bulk <br>batching<br> <li> Unified batching and error handling logic for all environments<br> <li> Improved fallback for 413 errors by retrying entities individually<br> <li> Cleaned up legacy conditional branches </details> </td> <td><a href="https://github.com/port-labs/ocean/pull/1742/files#diff-1cd3dc7778947127c84baab24278808bee9c4ded96b1c320e28ece0f4d55ba7f">+50/-56</a>&nbsp; </td> </tr> </table></td></tr><tr><td><strong>Configuration changes</strong></td><td><table> <tr> <td> <details> <summary><strong>settings.py</strong><dd><code>Remove bulk upserts feature flag from configuration</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> port_ocean/config/settings.py <li>Removed <code>bulk_upserts_enabled</code> feature flag from configuration<br> <li> Cleaned up related configuration code </details> </td> <td><a href="https://github.com/port-labs/ocean/pull/1742/files#diff-c4ab144128bdbbe50dd95b726ae9c2f4cbc7306025e07a9e1db3dff7c5d4b3bb">+0/-1</a>&nbsp; &nbsp; &nbsp; </td> </tr> </table></td></tr><tr><td><strong>Tests</strong></td><td><table> <tr> <td> <details> <summary><strong>test_sync_raw.py</strong><dd><code>Remove obsolete bulk upserts flag from tests</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> port_ocean/tests/core/handlers/mixins/test_sync_raw.py <li>Removed test configuration of <code>bulk_upserts_enabled</code> flag<br> <li> Updated tests to match new default bulk upsert behavior </details> </td> <td><a href="https://github.com/port-labs/ocean/pull/1742/files#diff-0530b15484dd5afce1e42bb43fe26cd56639f7302e997400c173a0df0a636eae">+0/-3</a>&nbsp; &nbsp; &nbsp; </td> </tr> </table></td></tr></tr></tbody></table> ___ > <details> <summary> Need help?</summary><li>Type <code>/help how to ...</code> in the comments thread for any questions about Qodo Merge usage.</li><li>Check out the <a href="https://qodo-merge-docs.qodo.ai/usage-guide/">documentation</a> for more information.</li></details> --------- Co-authored-by: Tom Tankilevitch <59158507+Tankilevitch@users.noreply.github.com>
1 parent 086a04c commit f6005e7

File tree

5 files changed

+56
-61
lines changed

5 files changed

+56
-61
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
77

88
<!-- towncrier release notes start -->
99

10+
## 0.24.3 (2025-06-08)
11+
12+
### Improvements
13+
- Using Port bulk upserts api in resyncs in all Ocean.
14+
1015
## 0.24.2 (2025-06-04)
1116

1217
### Improvements

port_ocean/clients/port/mixins/entities.py

Lines changed: 50 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -353,66 +353,60 @@ async def upsert_entities_in_batches(
353353
entities_results: list[tuple[bool, Entity]] = []
354354
blueprint = entities[0].blueprint
355355

356-
if ocean.app.is_saas():
357-
bulk_size = self.calculate_entities_batch_size(entities)
358-
bulks = [
359-
entities[i : i + bulk_size] for i in range(0, len(entities), bulk_size)
360-
]
361-
362-
bulk_results = await asyncio.gather(
363-
*(
364-
self.upsert_entities_bulk(
365-
blueprint,
366-
bulk,
367-
request_options,
368-
user_agent_type,
369-
should_raise=should_raise,
370-
)
371-
for bulk in bulks
372-
),
373-
return_exceptions=True,
374-
)
356+
bulk_size = self.calculate_entities_batch_size(entities)
357+
bulks = [
358+
entities[i : i + bulk_size] for i in range(0, len(entities), bulk_size)
359+
]
360+
361+
bulk_results = await asyncio.gather(
362+
*(
363+
self.upsert_entities_bulk(
364+
blueprint,
365+
bulk,
366+
request_options,
367+
user_agent_type,
368+
should_raise=should_raise,
369+
)
370+
for bulk in bulks
371+
),
372+
return_exceptions=True,
373+
)
375374

376-
for bulk, bulk_result in zip(bulks, bulk_results):
377-
if isinstance(bulk_result, httpx.HTTPStatusError) or isinstance(
378-
bulk_result, Exception
375+
for bulk, bulk_result in zip(bulks, bulk_results):
376+
if isinstance(bulk_result, httpx.HTTPStatusError) or isinstance(
377+
bulk_result, Exception
378+
):
379+
if should_raise:
380+
raise bulk_result
381+
# If should_raise is False, retry batch in sequential order as a fallback only for 413 errors
382+
if (
383+
isinstance(bulk_result, httpx.HTTPStatusError)
384+
and bulk_result.response.status_code == 413
379385
):
380-
if should_raise:
381-
raise bulk_result
382-
# If should_raise is False, retry batch in sequential order as a fallback only for 413 errors
386+
individual_upsert_results = (
387+
await self._upsert_entities_batch_individually(
388+
bulk, request_options, user_agent_type, should_raise
389+
)
390+
)
391+
entities_results.extend(individual_upsert_results)
392+
else:
393+
# For other errors, mark all entities in the batch as failed
394+
for entity in bulk:
395+
failed_result: tuple[bool, Entity] = (
396+
False,
397+
self._reduce_entity(entity),
398+
)
399+
entities_results.append(failed_result)
400+
elif isinstance(bulk_result, list):
401+
for status, entity in bulk_result:
383402
if (
384-
isinstance(bulk_result, httpx.HTTPStatusError)
385-
and bulk_result.response.status_code == 413
386-
):
387-
individual_upsert_results = (
388-
await self._upsert_entities_batch_individually(
389-
bulk, request_options, user_agent_type, should_raise
390-
)
403+
status is not None
404+
): # when using the search identifier we might not have an actual identifier
405+
bulk_result_tuple: tuple[bool, Entity] = (
406+
bool(status),
407+
entity,
391408
)
392-
entities_results.extend(individual_upsert_results)
393-
else:
394-
# For other errors, mark all entities in the batch as failed
395-
for entity in bulk:
396-
failed_result: tuple[bool, Entity] = (
397-
False,
398-
self._reduce_entity(entity),
399-
)
400-
entities_results.append(failed_result)
401-
elif isinstance(bulk_result, list):
402-
for status, entity in bulk_result:
403-
if (
404-
status is not None
405-
): # when using the search identifier we might not have an actual identifier
406-
bulk_result_tuple: tuple[bool, Entity] = (
407-
bool(status),
408-
entity,
409-
)
410-
entities_results.append(bulk_result_tuple)
411-
else:
412-
individual_upsert_results = await self._upsert_entities_batch_individually(
413-
entities, request_options, user_agent_type, should_raise
414-
)
415-
entities_results.extend(individual_upsert_results)
409+
entities_results.append(bulk_result_tuple)
416410

417411
return entities_results
418412

port_ocean/config/settings.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow):
108108

109109
upsert_entities_batch_max_length: int = 20
110110
upsert_entities_batch_max_size_in_bytes: int = 1024 * 1024
111-
bulk_upserts_enabled: bool = False
112111

113112
@validator("process_execution_mode")
114113
def validate_process_execution_mode(

port_ocean/tests/core/handlers/mixins/test_sync_raw.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ async def test_sync_raw_mixin_self_dependency(
9898
) -> None:
9999
mock_ocean.config.upsert_entities_batch_max_length = 20
100100
mock_ocean.config.upsert_entities_batch_max_size_in_bytes = 1024 * 1024
101-
mock_ocean.config.bulk_upserts_enabled = True
102101

103102
entities_params = [
104103
("entity_1", "service", {"service": "entity_1"}, True),
@@ -219,7 +218,6 @@ async def test_sync_raw_mixin_circular_dependency(
219218
) -> None:
220219
mock_ocean.config.upsert_entities_batch_max_length = 20
221220
mock_ocean.config.upsert_entities_batch_max_size_in_bytes = 1024 * 1024
222-
mock_ocean.config.bulk_upserts_enabled = True
223221

224222
entities_params = [
225223
("entity_1", "service", {"service": "entity_2"}, True),
@@ -359,7 +357,6 @@ async def test_sync_raw_mixin_dependency(
359357
) -> None:
360358
mock_ocean.config.upsert_entities_batch_max_length = 20
361359
mock_ocean.config.upsert_entities_batch_max_size_in_bytes = 1024 * 1024
362-
mock_ocean.config.bulk_upserts_enabled = True
363360

364361
entities_params = [
365362
("entity_1", "service", {"service": "entity_3"}, True),

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "port-ocean"
3-
version = "0.24.2"
3+
version = "0.24.3"
44
description = "Port Ocean is a CLI tool for managing your Port projects."
55
readme = "README.md"
66
homepage = "https://app.getport.io"

0 commit comments

Comments
 (0)