Skip to content

Commit 139a7a8

Browse files
authored
[Core] Added ingest url support (#1972)
### **User description** # Description What - Added ingest url support Why - To be able to send data to the ingest url How - new env variable ## Type of change Please leave one option from the following and delete the rest: - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] New Integration (non-breaking change which adds a new integration) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] Non-breaking change (fix of existing functionality that will not change current behavior) - [ ] Documentation (added/updated documentation) <h4> All tests should be run against the port production environment(using a testing org). </h4> ### Core testing checklist - [ ] Integration able to create all default resources from scratch - [ ] Resync finishes successfully - [ ] Resync able to create entities - [ ] Resync able to update entities - [ ] Resync able to detect and delete entities - [ ] Scheduled resync able to abort existing resync and start a new one - [ ] Tested with at least 2 integrations from scratch - [ ] Tested with Kafka and Polling event listeners - [ ] Tested deletion of entities that don't pass the selector ___ ### **PR Type** Enhancement ___ ### **Description** - Added ingest URL support for data ingestion - Modified Port client to accept ingest URL parameter - Updated raw data posting to use ingest endpoint - Added ingest URL configuration with default value ___ ### Diagram Walkthrough ```mermaid flowchart LR A["Configuration"] -- "adds ingest_url" --> B["Port Settings"] B -- "passes to" --> C["Port Client"] C -- "initializes" --> D["Port Authentication"] D -- "provides URL for" --> E["Raw Data Ingestion"] ``` <details> <summary><h3> File Walkthrough</h3></summary> <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Enhancement</strong></td><td><table> <tr> <td> <details> <summary><strong>authentication.py</strong><dd><code>Add ingest URL parameter to authentication</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> port_ocean/clients/port/authentication.py <ul><li>Added <code>ingest_url</code> parameter to constructor<br> <li> Stored ingest URL as instance variable</ul> </details> </td> <td><a href="https://github.com/port-labs/ocean/pull/1972/files#diff-8f8323c5413065b6eb1248e6236a4cf41985610d82b71b405b3a72ab35b44fda">+2/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td> <details> <summary><strong>client.py</strong><dd><code>Update PortClient to support ingest URL</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> port_ocean/clients/port/client.py <ul><li>Added <code>ingest_url</code> parameter to PortClient constructor<br> <li> Passed ingest URL to PortAuthentication initialization<br> <li> Minor import reordering</ul> </details> </td> <td><a href="https://github.com/port-labs/ocean/pull/1972/files#diff-ad4a0acea99647c007146b6c19e22ecd670838144cf5c58f3b08a0996190b82c">+5/-2</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td> <details> <summary><strong>integrations.py</strong><dd><code>Use ingest URL for raw data posting</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> port_ocean/clients/port/mixins/integrations.py <ul><li>Changed raw data POST endpoint to use <code>ingest_url</code><br> <li> Updated logging from debug to info level</ul> </details> </td> <td><a href="https://github.com/port-labs/ocean/pull/1972/files#diff-6fa1fdcc86d6cf46d57bc513204cd5ffeafede20d1a079786705644c4937a521">+3/-3</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td> <details> <summary><strong>sync_raw.py</strong><dd><code>Add logging for lakehouse feature flags</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> port_ocean/core/integrations/mixins/sync_raw.py - Added info logging for lakehouse data flags </details> </td> <td><a href="https://github.com/port-labs/ocean/pull/1972/files#diff-7087c0350a44c64c6ca1e4333b34e158b354bdd15932e51860efaddda535cddc">+1/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td> <details> <summary><strong>ocean.py</strong><dd><code>Initialize PortClient with ingest URL</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> port_ocean/ocean.py <ul><li>Passed ingest URL to PortClient initialization<br> <li> Minor import reordering and formatting</ul> </details> </td> <td><a href="https://github.com/port-labs/ocean/pull/1972/files#diff-c8b6a91b406bdc777f220ab2fcead0b5027c8817c7c2e2e7adb06c192cc95ad6">+11/-12</a>&nbsp; </td> </tr> </table></td></tr><tr><td><strong>Configuration changes</strong></td><td><table> <tr> <td> <details> <summary><strong>settings.py</strong><dd><code>Add ingest URL configuration setting</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> port_ocean/config/settings.py <ul><li>Added <code>ingest_url</code> field to PortSettings<br> <li> Set default value to "https://ingest.getport.io"<br> <li> Minor import cleanup</ul> </details> </td> <td><a href="https://github.com/port-labs/ocean/pull/1972/files#diff-c4ab144128bdbbe50dd95b726ae9c2f4cbc7306025e07a9e1db3dff7c5d4b3bb">+1/-1</a>&nbsp; &nbsp; &nbsp; </td> </tr> </table></td></tr></tr></tbody></table> </details> ___
1 parent 9b6b56c commit 139a7a8

File tree

11 files changed

+90
-58
lines changed

11 files changed

+90
-58
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

88
<!-- towncrier release notes start -->
9+
## 0.27.10 (2025-08-24)
10+
11+
### Improvements
12+
13+
- Added ingest url support
14+
915
## 0.27.9 (2025-08-20)
1016

1117
### Improvements

port_ocean/clients/port/authentication.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def __init__(
3535
integration_identifier: str,
3636
integration_type: str,
3737
integration_version: str,
38+
ingest_url: str,
3839
):
3940
self.client = client
4041
self.api_url = api_url
@@ -43,6 +44,7 @@ def __init__(
4344
self.integration_identifier = integration_identifier
4445
self.integration_type = integration_type
4546
self.integration_version = integration_version
47+
self.ingest_url = ingest_url
4648
self.last_token_object: TokenResponse | None = None
4749

4850
async def _get_token(self, client_id: str, client_secret: str) -> TokenResponse:

port_ocean/clients/port/client.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Any
2+
13
from loguru import logger
24

35
from port_ocean.clients.port.authentication import PortAuthentication
@@ -10,11 +12,10 @@
1012
KafkaCreds,
1113
)
1214
from port_ocean.clients.port.utils import (
13-
handle_port_status_code,
1415
get_internal_http_client,
16+
handle_port_status_code,
1517
)
1618
from port_ocean.exceptions.clients import KafkaCredentialsNotFound
17-
from typing import Any
1819

1920

2021
class PortClient(
@@ -32,6 +33,7 @@ def __init__(
3233
integration_identifier: str,
3334
integration_type: str,
3435
integration_version: str,
36+
ingest_url: str,
3537
):
3638
self.api_url = f"{base_url}/v1"
3739
self.client = get_internal_http_client(self)
@@ -43,6 +45,7 @@ def __init__(
4345
integration_identifier,
4446
integration_type,
4547
integration_version,
48+
ingest_url,
4649
)
4750
EntityClientMixin.__init__(self, self.auth, self.client)
4851
IntegrationClientMixin.__init__(

port_ocean/clients/port/mixins/integrations.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ async def post_integration_raw_data(
296296
logger.debug("starting POST raw data request", raw_data=raw_data)
297297
headers = await self.auth.headers()
298298
response = await self.client.post(
299-
f"{self.auth.api_url}/lakehouse/integration/{self.integration_identifier}/sync/{sync_id}/kind/{kind}/items",
299+
f"{self.auth.ingest_url}/lakehouse/integration-type/{self.auth.integration_type}/integration/{self.integration_identifier}/sync/{sync_id}/kind/{kind}/items",
300300
headers=headers,
301301
json={
302302
"items": raw_data,

port_ocean/config/settings.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
from port_ocean.config.base import BaseOceanModel, BaseOceanSettings
1111
from port_ocean.core.event_listener import EventListenerSettingsType
12-
1312
from port_ocean.core.models import (
1413
CachingStorageMode,
1514
CreatePortResourcesOrigin,
@@ -47,6 +46,7 @@ class PortSettings(BaseOceanModel, extra=Extra.allow):
4746
client_secret: str = Field(..., sensitive=True)
4847
base_url: AnyHttpUrl = parse_obj_as(AnyHttpUrl, "https://api.getport.io")
4948
port_app_config_cache_ttl: int = 60
49+
ingest_url: AnyHttpUrl = parse_obj_as(AnyHttpUrl, "https://ingest.getport.io")
5050

5151

5252
class IntegrationSettings(BaseOceanModel, extra=Extra.allow):

port_ocean/ocean.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,18 @@
11
import asyncio
22
import sys
3-
from contextlib import asynccontextmanager
43
import threading
4+
from contextlib import asynccontextmanager
55
from typing import Any, AsyncIterator, Callable, Dict, Type
66

7-
from port_ocean.cache.base import CacheProvider
8-
from port_ocean.cache.disk import DiskCacheProvider
9-
from port_ocean.cache.memory import InMemoryCacheProvider
10-
from port_ocean.core.models import ProcessExecutionMode
11-
import port_ocean.helpers.metric.metric
12-
13-
from fastapi import FastAPI, APIRouter
14-
7+
from fastapi import APIRouter, FastAPI
158
from loguru import logger
169
from pydantic import BaseModel
1710
from starlette.types import Receive, Scope, Send
1811

12+
import port_ocean.helpers.metric.metric
13+
from port_ocean.cache.base import CacheProvider
14+
from port_ocean.cache.disk import DiskCacheProvider
15+
from port_ocean.cache.memory import InMemoryCacheProvider
1916
from port_ocean.clients.port.client import PortClient
2017
from port_ocean.config.settings import (
2118
IntegrationConfiguration,
@@ -26,16 +23,17 @@
2623
ocean,
2724
)
2825
from port_ocean.core.handlers.resync_state_updater import ResyncStateUpdater
26+
from port_ocean.core.handlers.webhook.processor_manager import (
27+
LiveEventsProcessorManager,
28+
)
2929
from port_ocean.core.integrations.base import BaseIntegration
30+
from port_ocean.core.models import ProcessExecutionMode
3031
from port_ocean.log.sensetive import sensitive_log_filter
3132
from port_ocean.middlewares import request_handler
3233
from port_ocean.utils.misc import IntegrationStateStatus
3334
from port_ocean.utils.repeat import repeat_every
3435
from port_ocean.utils.signal import signal_handler
3536
from port_ocean.version import __integration_version__
36-
from port_ocean.core.handlers.webhook.processor_manager import (
37-
LiveEventsProcessorManager,
38-
)
3937

4038

4139
class Ocean:
@@ -69,6 +67,7 @@ def __init__(
6967
integration_identifier=self.config.integration.identifier,
7068
integration_type=self.config.integration.type,
7169
integration_version=__integration_version__,
70+
ingest_url=self.config.port.ingest_url,
7271
)
7372
self.cache_provider: CacheProvider = self._get_caching_provider()
7473
self.process_execution_mode: ProcessExecutionMode = (

port_ocean/tests/core/conftest.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,13 @@ async def post(url: str, *args: Any, **kwargs: Any) -> Response:
9696
@pytest.fixture
9797
def mock_port_client(mock_http_client: MagicMock) -> PortClient:
9898
mock_port_client = PortClient(
99-
MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock()
99+
MagicMock(),
100+
MagicMock(),
101+
MagicMock(),
102+
MagicMock(),
103+
MagicMock(),
104+
MagicMock(),
105+
MagicMock(),
100106
)
101107
mock_port_client.auth = AsyncMock()
102108
mock_port_client.auth.headers = AsyncMock(

port_ocean/tests/core/handlers/mixins/test_live_events.py

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
from typing import Any
2-
from httpx import Response
3-
import pytest
42
from unittest.mock import AsyncMock, MagicMock, patch
3+
4+
import pytest
5+
from httpx import Response
6+
57
from port_ocean.clients.port.client import PortClient
68
from port_ocean.clients.port.types import UserAgentType
79
from port_ocean.context.ocean import PortOceanContext
@@ -11,8 +13,6 @@
1113
from port_ocean.core.handlers.entity_processor.jq_entity_processor import (
1214
JQEntityProcessor,
1315
)
14-
from port_ocean.core.handlers.webhook.webhook_event import WebhookEventRawResults
15-
from port_ocean.core.integrations.mixins.live_events import LiveEventsMixin
1616
from port_ocean.core.handlers.port_app_config.models import (
1717
EntityMapping,
1818
MappingsConfig,
@@ -21,6 +21,8 @@
2121
ResourceConfig,
2222
Selector,
2323
)
24+
from port_ocean.core.handlers.webhook.webhook_event import WebhookEventRawResults
25+
from port_ocean.core.integrations.mixins.live_events import LiveEventsMixin
2426
from port_ocean.core.models import Entity
2527
from port_ocean.core.ocean_types import CalculationResult, EntitySelectorDiff
2628
from port_ocean.ocean import Ocean
@@ -253,7 +255,13 @@ def mock_port_app_config_handler(
253255
@pytest.fixture
254256
def mock_port_client(mock_http_client: MagicMock) -> PortClient:
255257
mock_port_client = PortClient(
256-
MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock()
258+
MagicMock(),
259+
MagicMock(),
260+
MagicMock(),
261+
MagicMock(),
262+
MagicMock(),
263+
MagicMock(),
264+
MagicMock(),
257265
)
258266
mock_port_client.auth = AsyncMock()
259267
mock_port_client.auth.headers = AsyncMock(
@@ -340,10 +348,11 @@ async def test_parse_raw_event_results_to_entities_creation(
340348
calculation_result
341349
)
342350

343-
entities_to_create, entities_to_delete = (
344-
await mock_live_events_mixin._parse_raw_event_results_to_entities(
345-
[one_webhook_event_raw_results_for_creation]
346-
)
351+
(
352+
entities_to_create,
353+
entities_to_delete,
354+
) = await mock_live_events_mixin._parse_raw_event_results_to_entities(
355+
[one_webhook_event_raw_results_for_creation]
347356
)
348357

349358
assert entities_to_create == [entity]
@@ -367,10 +376,11 @@ async def test_parse_raw_event_results_to_entities_deletion(
367376
calculation_result
368377
)
369378

370-
entities_to_create, entities_to_delete = (
371-
await mock_live_events_mixin._parse_raw_event_results_to_entities(
372-
[one_webhook_event_raw_results_for_deletion]
373-
)
379+
(
380+
entities_to_create,
381+
entities_to_delete,
382+
) = await mock_live_events_mixin._parse_raw_event_results_to_entities(
383+
[one_webhook_event_raw_results_for_deletion]
374384
)
375385

376386
assert entities_to_create == []

port_ocean/tests/core/handlers/webhook/test_processor_manager.py

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,16 @@
1-
import pytest
2-
from port_ocean.core.handlers.webhook.processor_manager import (
3-
LiveEventsProcessorManager,
4-
)
5-
from port_ocean.core.handlers.webhook.abstract_webhook_processor import (
6-
AbstractWebhookProcessor,
7-
)
8-
from port_ocean.core.handlers.webhook.webhook_event import (
9-
EventHeaders,
10-
WebhookEvent,
11-
WebhookEventRawResults,
12-
EventPayload,
13-
)
14-
from fastapi import APIRouter
15-
from port_ocean.core.integrations.mixins.handler import HandlerMixin
16-
from port_ocean.utils.signal import SignalHandler
17-
from typing import Dict, Any
181
import asyncio
2+
from typing import Any, Dict
3+
from unittest.mock import AsyncMock, MagicMock, patch
4+
5+
import pytest
6+
from fastapi import APIRouter, FastAPI
197
from fastapi.testclient import TestClient
20-
from fastapi import FastAPI
21-
from port_ocean.context.ocean import PortOceanContext
22-
from unittest.mock import AsyncMock
23-
from port_ocean.context.event import EventContext, event_context, EventType
24-
from port_ocean.context.ocean import ocean
25-
from unittest.mock import MagicMock, patch
268
from httpx import Response
27-
from port_ocean.clients.port.client import PortClient
9+
2810
from port_ocean import Ocean
29-
from port_ocean.core.integrations.base import BaseIntegration
11+
from port_ocean.clients.port.client import PortClient
12+
from port_ocean.context.event import EventContext, EventType, event_context
13+
from port_ocean.context.ocean import PortOceanContext, ocean
3014
from port_ocean.core.handlers.port_app_config.models import (
3115
EntityMapping,
3216
MappingsConfig,
@@ -35,13 +19,28 @@
3519
ResourceConfig,
3620
Selector,
3721
)
22+
from port_ocean.core.handlers.queue import LocalQueue
23+
from port_ocean.core.handlers.webhook.abstract_webhook_processor import (
24+
AbstractWebhookProcessor,
25+
)
26+
from port_ocean.core.handlers.webhook.processor_manager import (
27+
LiveEventsProcessorManager,
28+
)
29+
from port_ocean.core.handlers.webhook.webhook_event import (
30+
EventHeaders,
31+
EventPayload,
32+
WebhookEvent,
33+
WebhookEventRawResults,
34+
)
35+
from port_ocean.core.integrations.base import BaseIntegration
36+
from port_ocean.core.integrations.mixins.handler import HandlerMixin
3837
from port_ocean.core.integrations.mixins.live_events import LiveEventsMixin
3938
from port_ocean.core.models import Entity
4039
from port_ocean.exceptions.webhook_processor import (
4140
RetryableError,
4241
WebhookEventNotSupportedError,
4342
)
44-
from port_ocean.core.handlers.queue import LocalQueue
43+
from port_ocean.utils.signal import SignalHandler
4544

4645

4746
class MockProcessor(AbstractWebhookProcessor):
@@ -275,7 +274,13 @@ async def post(url: str, *args: Any, **kwargs: Any) -> Response:
275274
@pytest.fixture
276275
def mock_port_client(mock_http_client: MagicMock) -> PortClient:
277276
mock_port_client = PortClient(
278-
MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock()
277+
MagicMock(),
278+
MagicMock(),
279+
MagicMock(),
280+
MagicMock(),
281+
MagicMock(),
282+
MagicMock(),
283+
MagicMock(),
279284
)
280285
mock_port_client.auth = AsyncMock()
281286
mock_port_client.auth.headers = AsyncMock(

port_ocean/tests/helpers/port_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ def get_port_client_for_integration(
1818
integration_identifier=integration_identifier,
1919
integration_type=integration_type,
2020
integration_version=integration_version,
21+
ingest_url="https://ingest.getport.io",
2122
)

0 commit comments

Comments
 (0)