Commit 4602090
[Core] Integrations monitoring data structure (#1624)
# Description

What:
1. Updated the structure of the metrics event we currently send.
2. Updated the places where metrics are collected and sent.

Example JSON (a consumer-side sketch follows the checklist below):

```
{
  "integration_type": "gitlab",
  "integration_identifier": "gitlab-local",
  "integration_version": "0.2.62",
  "ocean_version": "0.22.5",
  "kind_identifier": "group-with-members-0",
  "kind": "group-with-members",
  "event_id": "bf1926ab-ba16-468c-850a-0380530a06a8",
  "sync_state": "data ingested",
  "_id": "integration_RFSJKG22HJKGDF",
  "metrics": {
    "phase": {
      "resync": {
        "duration_seconds": 8.380515999999943,
        "success": 1.0
      },
      "load": {
        "object_count_type": {
          "skipped": { "object_count": 0.0 },
          "failed": { "object_count": 2.0 },
          "loaded": { "object_count": 0.0 }
        }
      },
      "extract": {
        "object_count_type": {
          "raw_extracted": { "object_count": 2.0 }
        }
      },
      "transform": {
        "object_count_type": {
          "transformed": { "object_count": 2.0 },
          "filtered_out": { "object_count": 0.0 },
          "failed": { "object_count": 0.0 }
        }
      }
    }
  }
}
```

Why: to support showing integration visibility in Port's UI.

## Type of change

Please leave one option from the following and delete the rest:

- [x] Bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not change current behavior)
- [ ] Documentation (added/updated documentation)

<h4>All tests should be run against the Port production environment (using a testing org).</h4>

### Core testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector
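The nested `metrics.phase.<phase>.object_count_type.<result>.object_count` layout in the example above lends itself to simple post-processing. Below is a minimal, hypothetical consumer-side sketch (plain Python, not part of Ocean) that flattens those counters per phase; the field names are taken from the sample payload.

```python
# Hypothetical consumer-side helper (not part of Ocean): flattens the
# metrics.phase.<phase>.object_count_type.<result>.object_count counters
# from a metrics event shaped like the example above.
def summarize_phases(event: dict) -> dict[str, dict[str, float]]:
    summary: dict[str, dict[str, float]] = {}
    for phase, phase_data in event.get("metrics", {}).get("phase", {}).items():
        counts = phase_data.get("object_count_type")
        if not counts:
            # e.g. the "resync" phase carries duration_seconds/success instead
            continue
        summary[phase] = {
            result: data.get("object_count", 0.0) for result, data in counts.items()
        }
    return summary


sample_event = {  # trimmed version of the example payload above
    "kind": "group-with-members",
    "metrics": {
        "phase": {
            "resync": {"duration_seconds": 8.38, "success": 1.0},
            "load": {"object_count_type": {"failed": {"object_count": 2.0}}},
            "extract": {"object_count_type": {"raw_extracted": {"object_count": 2.0}}},
        }
    },
}
print(summarize_phases(sample_event))
# {'load': {'failed': 2.0}, 'extract': {'raw_extracted': 2.0}}
```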
1 parent 1f51ea6 commit 4602090

File tree

7 files changed (+184, -25 lines changed)

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
@@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 <!-- towncrier release notes start -->
+## 0.22.9 (2025-05-18)
+
+### Improvements
+- Enhanced Ocean metrics event structure for better data organization and analysis
+- Expanded metrics collection points to provide more comprehensive monitoring capabilities
+
 ## 0.22.8 (2025-05-15)
 
 ### Improvements

port_ocean/clients/port/mixins/entities.py

Lines changed: 23 additions & 1 deletion
@@ -5,7 +5,7 @@
 
 import httpx
 from loguru import logger
-
+from port_ocean.context.ocean import ocean
 from port_ocean.clients.port.authentication import PortAuthentication
 from port_ocean.clients.port.types import RequestOptions, UserAgentType
 from port_ocean.clients.port.utils import (
@@ -15,6 +15,8 @@
 from port_ocean.core.models import Entity, PortAPIErrorMessage
 from starlette import status
 
+from port_ocean.helpers.metric.metric import MetricPhase, MetricType
+
 
 class EntityClientMixin:
     def __init__(self, auth: PortAuthentication, client: httpx.AsyncClient):
@@ -81,6 +83,15 @@ async def upsert_entity(
                 f"blueprint: {entity.blueprint}"
             )
             result = response.json()
+            ocean.metrics.inc_metric(
+                name=MetricType.OBJECT_COUNT_NAME,
+                labels=[
+                    ocean.metrics.current_resource_kind(),
+                    MetricPhase.LOAD,
+                    MetricPhase.LoadResult.FAILED,
+                ],
+                value=1,
+            )
 
             if (
                 response.status_code == status.HTTP_404_NOT_FOUND
@@ -89,6 +100,17 @@ async def upsert_entity(
             ):
                 # Return false to differentiate from `result_entity.is_using_search_identifier`
                 return False
+        else:
+            ocean.metrics.inc_metric(
+                name=MetricType.OBJECT_COUNT_NAME,
+                labels=[
+                    ocean.metrics.current_resource_kind(),
+                    MetricPhase.LOAD,
+                    MetricPhase.LoadResult.LOADED,
+                ],
+                value=1,
+            )
+
         handle_port_status_code(response, should_raise)
         result = response.json()

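For context, the diff above counts every upsert as either `loaded` or `failed` under the `load` phase. The following standalone sketch reproduces that counting pattern with `prometheus_client` used directly (the diff below shows Ocean's `get_metric` returning a prometheus `Gauge`); the metric and label names mirror the registry entry changed in `metric.py`, but the helper itself is illustrative, not Ocean code.

```python
# Illustrative only: the same load-result counting done with prometheus_client
# directly instead of ocean.metrics.inc_metric().
from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()
object_count = Gauge(
    "object_count",
    "object_count description",
    ["kind", "phase", "object_count_type"],  # ordered labels, as in the registry change
    registry=registry,
)


def record_upsert(kind: str, succeeded: bool) -> None:
    # One object per upsert attempt, bucketed by outcome.
    outcome = "loaded" if succeeded else "failed"
    object_count.labels(kind, "load", outcome).inc(1)


record_upsert("group-with-members-0", succeeded=False)
print(registry.get_sample_value(
    "object_count",
    {"kind": "group-with-members-0", "phase": "load", "object_count_type": "failed"},
))  # 1.0
```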
port_ocean/core/handlers/entities_state_applier/port/applier.py

Lines changed: 1 addition & 1 deletion
@@ -103,7 +103,7 @@ async def delete_diff(
             and deletion_rate <= entity_deletion_threshold
         ):
             await self._safe_delete(diff.deleted, kept_entities, user_agent_type)
-            ocean.metrics.set_metric(
+            ocean.metrics.inc_metric(
                 name=MetricType.DELETION_COUNT_NAME,
                 labels=[ocean.metrics.current_resource_kind(), MetricPhase.DELETE],
                 value=len(diff.deleted),

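The one-line change above (`set_metric` to `inc_metric`) matters because a Gauge's `set()` overwrites the previous value while `inc()` accumulates, so deletions reported by successive batches add up over a resync. A small sketch of the difference, again using `prometheus_client` directly (illustrative, not Ocean code):

```python
from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()
deletion_count = Gauge(
    "deletion_count", "deletion_count description", ["kind", "phase"], registry=registry
)
labeled = deletion_count.labels("group-with-members-0", "delete")

labeled.set(3)
labeled.set(2)  # set() keeps only the last batch -> 2.0

labeled.inc(3)
labeled.inc(2)  # inc() accumulates on top -> 2.0 + 3 + 2 = 7.0

print(registry.get_sample_value(
    "deletion_count", {"kind": "group-with-members-0", "phase": "delete"}
))  # 7.0
```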
port_ocean/core/integrations/mixins/sync_raw.py

Lines changed: 37 additions & 12 deletions
@@ -31,7 +31,7 @@
 )
 from port_ocean.core.utils.utils import resolve_entities_diff, zip_and_sum, gather_and_split_errors_from_results
 from port_ocean.exceptions.core import OceanAbortException
-from port_ocean.helpers.metric.metric import MetricType, MetricPhase
+from port_ocean.helpers.metric.metric import SyncState, MetricType, MetricPhase
 from port_ocean.helpers.metric.utils import TimeMetric
 
 SEND_RAW_DATA_EXAMPLES_AMOUNT = 5
@@ -238,11 +238,21 @@ async def _register_resource_raw(
                 if changed_entities:
                     logger.info("Upserting changed entities", changed_entities=len(changed_entities),
                         total_entities=len(objects_diff[0].entity_selector_diff.passed))
+                    ocean.metrics.inc_metric(
+                        name=MetricType.OBJECT_COUNT_NAME,
+                        labels=[ocean.metrics.current_resource_kind(), MetricPhase.LOAD, MetricPhase.LoadResult.SKIPPED],
+                        value=len(objects_diff[0].entity_selector_diff.passed) - len(changed_entities)
+                    )
                     await self.entities_state_applier.upsert(
                         changed_entities, user_agent_type
                     )
                 else:
                     logger.info("Entities in batch didn't changed since last sync, skipping", total_entities=len(objects_diff[0].entity_selector_diff.passed))
+                    ocean.metrics.inc_metric(
+                        name=MetricType.OBJECT_COUNT_NAME,
+                        labels=[ocean.metrics.current_resource_kind(), MetricPhase.LOAD, MetricPhase.LoadResult.SKIPPED],
+                        value=len(objects_diff[0].entity_selector_diff.passed)
+                    )
                 modified_objects = [ocean.port_client._reduce_entity(entity) for entity in objects_diff[0].entity_selector_diff.passed]
             except Exception as e:
                 logger.warning(f"Failed to resolve batch entities with Port, falling back to upserting all entities: {str(e)}")
@@ -335,6 +345,7 @@ async def _register_in_batches(
                 passed_entities.extend(calculation_result.entity_selector_diff.passed)
                 number_of_transformed_entities += calculation_result.number_of_transformed_entities
             except* OceanAbortException as error:
+                ocean.metrics.sync_state = SyncState.FAILED
                 errors.append(error)
 
         logger.info(
@@ -347,18 +358,30 @@ async def _register_in_batches(
             value=int(not errors)
         )
 
-        ocean.metrics.set_metric(
+        ocean.metrics.inc_metric(
             name=MetricType.OBJECT_COUNT_NAME,
-            labels=[ocean.metrics.current_resource_kind(), MetricPhase.EXTRACT],
+            labels=[ocean.metrics.current_resource_kind(), MetricPhase.EXTRACT, MetricPhase.ExtractResult.EXTRACTED],
             value=number_of_raw_results
         )
 
-        ocean.metrics.set_metric(
+        ocean.metrics.inc_metric(
             name=MetricType.OBJECT_COUNT_NAME,
-            labels=[ocean.metrics.current_resource_kind(), MetricPhase.TRANSFORM],
+            labels=[ocean.metrics.current_resource_kind(), MetricPhase.TRANSFORM, MetricPhase.TransformResult.TRANSFORMED],
             value=number_of_transformed_entities
         )
 
+        ocean.metrics.inc_metric(
+            name=MetricType.OBJECT_COUNT_NAME,
+            labels=[ocean.metrics.current_resource_kind(), MetricPhase.TRANSFORM, MetricPhase.TransformResult.FILTERED_OUT],
+            value=number_of_raw_results - number_of_transformed_entities
+        )
+
+        ocean.metrics.inc_metric(
+            name=MetricType.OBJECT_COUNT_NAME,
+            labels=[ocean.metrics.current_resource_kind(), MetricPhase.TRANSFORM, MetricPhase.TransformResult.FAILED],
+            value=len(errors)
+        )
+
         return passed_entities, errors
 
     async def register_raw(
@@ -558,7 +581,6 @@ async def sort_and_upsert_failed_entities(self,user_agent_type: UserAgentType)->
             for entity in event.entity_topological_sorter.get_entities(False):
                 await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)
 
-
     @TimeMetric(MetricPhase.RESYNC)
     async def sync_raw_all(
         self,
@@ -584,12 +606,16 @@ async def sync_raw_all(
             EventType.RESYNC,
             trigger_type=trigger_type,
         ):
+            ocean.metrics.event_id = event.id
+
            # If a resync is triggered due to a mappings change, we want to make sure that we have the updated version
            # rather than the old cache
            app_config = await self.port_app_config_handler.get_port_app_config(
                use_cache=False
            )
            logger.info(f"Resync will use the following mappings: {app_config.dict()}")
+            ocean.metrics.initialize_metrics([f"{resource.kind}-{index}" for index, resource in enumerate(app_config.resources)])
+            await ocean.metrics.flush()
 
            # Execute resync_start hooks
            for resync_start_fn in self.event_strategy["resync_start"]:
@@ -616,20 +642,19 @@ async def sync_raw_all(
                # config as we might have multiple resources in the same event
                async with resource_context(resource,index):
                    resource_kind_id = f"{resource.kind}-{index}"
+                    ocean.metrics.sync_state = SyncState.SYNCING
+                    await ocean.metrics.flush(kind=resource_kind_id)
+
                    task = asyncio.create_task(
                        self._register_in_batches(resource, user_agent_type)
                    )
 
                    event.on_abort(lambda: task.cancel())
                    kind_results: tuple[list[Entity], list[Exception]] = await task
-                    ocean.metrics.set_metric(
-                        name=MetricType.OBJECT_COUNT_NAME,
-                        labels=[ocean.metrics.current_resource_kind(), MetricPhase.LOAD],
-                        value=len(kind_results[0])
-                    )
 
                    creation_results.append(kind_results)
-
+                    if ocean.metrics.sync_state != SyncState.FAILED:
+                        ocean.metrics.sync_state = SyncState.COMPLETED
                    await ocean.metrics.flush(kind=resource_kind_id)
 
            await self.sort_and_upsert_failed_entities(user_agent_type)

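The transform-phase accounting in `_register_in_batches` above derives `filtered_out` from the other two counters (raw results that were extracted but not transformed), while `failed` is the number of aborted batches collected in `errors`. A tiny pure-Python sketch of that arithmetic, with hypothetical numbers:

```python
# Hypothetical numbers; mirrors the arithmetic behind the transform-phase counters above.
number_of_raw_results = 10          # objects extracted from the third-party API
number_of_transformed_entities = 7  # entities produced by the transform step
errors: list[Exception] = [Exception("aborted batch")]

transform_counts = {
    "transformed": number_of_transformed_entities,
    "filtered_out": number_of_raw_results - number_of_transformed_entities,
    "failed": len(errors),
}
print(transform_counts)  # {'transformed': 7, 'filtered_out': 3, 'failed': 1}
```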
port_ocean/helpers/metric/metric.py

Lines changed: 115 additions & 9 deletions
@@ -22,6 +22,19 @@ class MetricPhase:
     RESYNC = "resync"
     DELETE = "delete"
 
+    class TransformResult:
+        TRANSFORMED = "transformed"
+        FILTERED_OUT = "filtered_out"
+        FAILED = "failed"
+
+    class LoadResult:
+        LOADED = "loaded"
+        FAILED = "failed"
+        SKIPPED = "skipped"
+
+    class ExtractResult:
+        EXTRACTED = "raw_extracted"
+
 
 class MetricType:
     # Define metric names as constants
@@ -33,6 +46,13 @@ class MetricType:
     DELETION_COUNT_NAME = "deletion_count"
 
 
+class SyncState:
+    SYNCING = "syncing"
+    COMPLETED = "completed"
+    QUEUED = "queued"
+    FAILED = "failed"
+
+
 # Registry for core and custom metrics
 _metrics_registry: Dict[str, Tuple[str, str, List[str]]] = {
     MetricType.DURATION_NAME: (
@@ -43,7 +63,7 @@ class MetricType:
     MetricType.OBJECT_COUNT_NAME: (
         MetricType.OBJECT_COUNT_NAME,
         "object_count description",
-        ["kind", "phase"],
+        ["kind", "phase", "object_count_type"],
     ),
     MetricType.ERROR_COUNT_NAME: (
         MetricType.ERROR_COUNT_NAME,
@@ -86,6 +106,9 @@ def set(self, *args: Any) -> None:
     def labels(self, *args: Any) -> None:
         return None
 
+    def inc(self, *args: Any) -> None:
+        return None
+
 
 class Metrics:
     def __init__(
@@ -100,6 +123,24 @@ def __init__(
         self.load_metrics()
         self._integration_version: Optional[str] = None
         self._ocean_version: Optional[str] = None
+        self.event_id = ""
+        self.sync_state = SyncState.QUEUED
+
+    @property
+    def event_id(self) -> str:
+        return self._event_id
+
+    @event_id.setter
+    def event_id(self, value: str) -> None:
+        self._event_id = value
+
+    @property
+    def sync_state(self) -> str:
+        return self._sync_state
+
+    @sync_state.setter
+    def sync_state(self, value: str) -> None:
+        self._sync_state = value
 
     @property
     def integration_version(self) -> str:
@@ -139,6 +180,19 @@ def get_metric(self, name: str, labels: list[str]) -> Gauge | EmptyMetric:
             return EmptyMetric()
         return metrics.labels(*labels)
 
+    def inc_metric(self, name: str, labels: list[str], value: float) -> None:
+        """Increment a metric value in a single method call.
+
+        Args:
+            name (str): The metric name to inc.
+            labels (list[str]): The labels to apply to the metric.
+            value (float): The value to inc.
+        """
+        if not self.enabled:
+            return None
+
+        self.get_metric(name, labels).inc(value)
+
     def set_metric(self, name: str, labels: list[str], value: float) -> None:
         """Set a metric value in a single method call.
 
@@ -152,6 +206,49 @@ def set_metric(self, name: str, labels: list[str], value: float) -> None:
 
         self.get_metric(name, labels).set(value)
 
+    def initialize_metrics(self, kind_blockes: list[str]) -> None:
+        for kind in kind_blockes:
+            self.set_metric(MetricType.SUCCESS_NAME, [kind, MetricPhase.RESYNC], 0)
+            self.set_metric(MetricType.DURATION_NAME, [kind, MetricPhase.RESYNC], 0)
+
+            self.set_metric(
+                MetricType.OBJECT_COUNT_NAME,
+                [kind, MetricPhase.EXTRACT, MetricPhase.ExtractResult.EXTRACTED],
+                0,
+            )
+
+            self.set_metric(
+                MetricType.OBJECT_COUNT_NAME,
+                [kind, MetricPhase.TRANSFORM, MetricPhase.TransformResult.TRANSFORMED],
+                0,
+            )
+            self.set_metric(
+                MetricType.OBJECT_COUNT_NAME,
+                [kind, MetricPhase.TRANSFORM, MetricPhase.TransformResult.FILTERED_OUT],
+                0,
+            )
+            self.set_metric(
+                MetricType.OBJECT_COUNT_NAME,
+                [kind, MetricPhase.TRANSFORM, MetricPhase.TransformResult.FAILED],
+                0,
+            )
+
+            self.set_metric(
+                MetricType.OBJECT_COUNT_NAME,
+                [kind, MetricPhase.LOAD, MetricPhase.LoadResult.LOADED],
+                0,
+            )
+            self.set_metric(
+                MetricType.OBJECT_COUNT_NAME,
+                [kind, MetricPhase.LOAD, MetricPhase.LoadResult.FAILED],
+                0,
+            )
+            self.set_metric(
+                MetricType.OBJECT_COUNT_NAME,
+                [kind, MetricPhase.LOAD, MetricPhase.LoadResult.SKIPPED],
+                0,
+            )
+
     def create_mertic_router(self) -> APIRouter:
         if not self.enabled:
             return APIRouter()
@@ -201,14 +298,21 @@ async def flush(
             if kind and sample.labels.get("kind") != kind:
                 continue
 
-            # Create nested dictionary structure based on labels
-            for key, value in sample.labels.items():
-                if key not in current_level:
-                    current_level[key] = {}
-                current_level = current_level[key]
-                if value not in current_level:
-                    current_level[value] = {}
-                current_level = current_level[value]
+            # Get the ordered labels from the registry
+            ordered_labels = _metrics_registry.get(
+                sample.name, (None, None, [])
+            )[2]
+
+            # Create nested dictionary structure based on ordered labels
+            for label_name in ordered_labels:
+                if label_name in sample.labels:
+                    value = sample.labels[label_name]
+                    if label_name not in current_level:
+                        current_level[label_name] = {}
+                    current_level = current_level[label_name]
+                    if value not in current_level:
+                        current_level[value] = {}
+                    current_level = current_level[value]
 
             current_level[sample.name] = sample.value
 
@@ -228,6 +332,8 @@ async def flush(
                 "ocean_version": self.ocean_version,
                 "kind_identifier": kind_key,
                 "kind": "-".join(kind_key.split("-")[:-1]),
+                "event_id": self.event_id,
+                "sync_state": self.sync_state,
                 "metrics": metrics,
            }
            logger.info(f"Sending metrics to webhook {kind_key}: {event}")

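The reworked `flush()` above nests each sample under its label values in the order declared in `_metrics_registry` (`kind`, then `phase`, then `object_count_type`), which is what produces the `metrics.phase.load.object_count_type.failed.object_count` shape in the event payload. A standalone sketch of that nesting step (illustrative; the sample objects are simplified stand-ins, and `kind` is omitted because it selects the top-level bucket):

```python
# Standalone sketch of the ordered-label nesting used by flush() above.
ordered_labels = ["phase", "object_count_type"]  # per-metric order from the registry

samples = [
    {"name": "object_count", "labels": {"phase": "load", "object_count_type": "failed"}, "value": 2.0},
    {"name": "object_count", "labels": {"phase": "extract", "object_count_type": "raw_extracted"}, "value": 2.0},
]

metrics: dict = {}
for sample in samples:
    current_level = metrics
    for label_name in ordered_labels:
        if label_name in sample["labels"]:
            value = sample["labels"][label_name]
            current_level = current_level.setdefault(label_name, {}).setdefault(value, {})
    current_level[sample["name"]] = sample["value"]

print(metrics)
# {'phase': {'load': {'object_count_type': {'failed': {'object_count': 2.0}}},
#            'extract': {'object_count_type': {'raw_extracted': {'object_count': 2.0}}}}}
```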
port_ocean/helpers/metric/utils.py

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ async def wrapper(*args: Any, **kwargs: dict[Any, Any]) -> Any:
             res = await func(*args, **kwargs)
             end = time.monotonic()
             duration = end - start
-            ocean.metrics.set_metric(
+            ocean.metrics.inc_metric(
                 name=MetricType.DURATION_NAME,
                 labels=[ocean.metrics.current_resource_kind(), phase],
                 value=duration,

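`TimeMetric` above wraps an async function, measures its wall-clock duration with `time.monotonic()`, and now increments the duration metric rather than setting it. A self-contained sketch of the same decorator pattern (illustrative; it prints instead of calling Ocean's metrics API):

```python
import asyncio
import functools
import time
from typing import Any, Callable


def time_metric(phase: str) -> Callable:
    """Decorator sketch mirroring TimeMetric: measure an async call's duration."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.monotonic()
            res = await func(*args, **kwargs)
            duration = time.monotonic() - start
            # In Ocean this is where inc_metric(name=DURATION_NAME, ...) is called.
            print(f"phase={phase} duration_seconds={duration:.3f}")
            return res
        return wrapper
    return decorator


@time_metric("resync")
async def fake_resync() -> str:
    await asyncio.sleep(0.1)
    return "done"


asyncio.run(fake_resync())
```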
0 commit comments