9
9
from loguru import logger
10
10
from port_ocean .clients .port .types import UserAgentType
11
11
from port_ocean .context .event import TriggerType , event_context , EventType , event
12
+ from port_ocean .context .metric_resource import metric_resource_context
12
13
from port_ocean .context .ocean import ocean
13
14
from port_ocean .context .resource import resource_context
14
15
from port_ocean .context import resource
33
34
)
34
35
from port_ocean .core .utils .utils import resolve_entities_diff , zip_and_sum , gather_and_split_errors_from_results
35
36
from port_ocean .exceptions .core import OceanAbortException
36
- from port_ocean .helpers .metric .metric import SyncState , MetricType , MetricPhase
37
- from port_ocean .helpers .metric .utils import TimeMetric
37
+ from port_ocean .helpers .metric .metric import MetricResourceKind , SyncState , MetricType , MetricPhase
38
+ from port_ocean .helpers .metric .utils import TimeMetric , TimeMetricWithResourceKind
38
39
from port_ocean .utils .ipc import FileIPC
39
40
40
41
SEND_RAW_DATA_EXAMPLES_AMOUNT = 5
@@ -249,9 +250,16 @@ async def _register_resource_raw(
249
250
labels = [ocean .metrics .current_resource_kind (), MetricPhase .LOAD , MetricPhase .LoadResult .SKIPPED ],
250
251
value = len (objects_diff [0 ].entity_selector_diff .passed ) - len (changed_entities )
251
252
)
252
- await self .entities_state_applier .upsert (
253
+ upserted_entities = await self .entities_state_applier .upsert (
253
254
changed_entities , user_agent_type
254
255
)
256
+
257
+ ocean .metrics .set_metric (
258
+ name = MetricType .OBJECT_COUNT_NAME ,
259
+ labels = [ocean .metrics .current_resource_kind (), MetricPhase .LOAD , MetricPhase .LoadResult .LOADED ],
260
+ value = len (upserted_entities )
261
+ )
262
+
255
263
else :
256
264
logger .info ("Entities in batch didn't changed since last sync, skipping" , total_entities = len (objects_diff [0 ].entity_selector_diff .passed ))
257
265
ocean .metrics .inc_metric (
@@ -265,6 +273,11 @@ async def _register_resource_raw(
265
273
modified_objects = await self .entities_state_applier .upsert (
266
274
objects_diff [0 ].entity_selector_diff .passed , user_agent_type
267
275
)
276
+ ocean .metrics .set_metric (
277
+ name = MetricType .OBJECT_COUNT_NAME ,
278
+ labels = [ocean .metrics .current_resource_kind (), MetricPhase .LOAD , MetricPhase .LoadResult .LOADED ],
279
+ value = len (upserted_entities )
280
+ )
268
281
else :
269
282
modified_objects = await self .entities_state_applier .upsert (
270
283
objects_diff [0 ].entity_selector_diff .passed , user_agent_type
@@ -633,27 +646,101 @@ async def _process_resource(self,resource: ResourceConfig, index: int, user_agen
633
646
async with resource_context (resource ,index ):
634
647
resource_kind_id = f"{ resource .kind } -{ index } "
635
648
ocean .metrics .sync_state = SyncState .SYNCING
649
+
636
650
task = asyncio .create_task (
637
651
self ._register_in_batches (resource , user_agent_type )
638
652
)
639
653
event .on_abort (lambda : task .cancel ())
640
654
kind_results : tuple [list [Entity ], list [Exception ]] = await task
641
- ocean .metrics .set_metric (
642
- name = MetricType .OBJECT_COUNT_NAME ,
643
- labels = [ocean .metrics .current_resource_kind (), MetricPhase .LOAD , MetricPhase .LoadResult .LOADED ],
644
- value = len (kind_results [0 ])
645
- )
646
655
647
656
if ocean .metrics .sync_state != SyncState .FAILED :
648
657
ocean .metrics .sync_state = SyncState .COMPLETED
649
658
650
659
await ocean .metrics .send_metrics_to_webhook (
651
660
kind = resource_kind_id
652
661
)
653
- # await ocean.metrics.report_kind_sync_metrics(kind=resource_kind_id) # TODO: uncomment this when end points are ready
662
+ await ocean .metrics .report_kind_sync_metrics (kind = resource_kind_id , blueprint = resource . port . entity . mappings . blueprint )
654
663
655
664
return kind_results
656
665
666
+ @TimeMetricWithResourceKind (MetricPhase .RESYNC )
667
+ async def resync_reconciliation (
668
+ self ,
669
+ creation_results : list [tuple [list [Entity ], list [Exception ]]],
670
+ did_fetched_current_state : bool ,
671
+ user_agent_type : UserAgentType ,
672
+ app_config : Any ,
673
+ silent : bool = True ,
674
+ ) -> None :
675
+ """Handle the reconciliation phase of the resync process.
676
+
677
+ This method handles:
678
+ 1. Sorting and upserting failed entities
679
+ 2. Checking if current state was fetched
680
+ 3. Calculating resync diff
681
+ 4. Handling errors
682
+ 5. Deleting entities that are no longer needed
683
+ 6. Executing resync complete hooks
684
+
685
+ Args:
686
+ creation_results (list[tuple[list[Entity], list[Exception]]]): Results from entity creation
687
+ did_fetched_current_state (bool): Whether the current state was successfully fetched
688
+ user_agent_type (UserAgentType): The type of user agent
689
+ app_config (Any): The application configuration
690
+ silent (bool): Whether to raise exceptions or handle them silently
691
+
692
+ """
693
+ await self .sort_and_upsert_failed_entities (user_agent_type )
694
+
695
+ if not did_fetched_current_state :
696
+ logger .warning (
697
+ "Due to an error before the resync, the previous state of entities at Port is unknown."
698
+ " Skipping delete phase due to unknown initial state."
699
+ )
700
+ return False
701
+
702
+ logger .info ("Starting resync diff calculation" )
703
+ generated_entities , errors = zip_and_sum (creation_results ) or [
704
+ [],
705
+ [],
706
+ ]
707
+
708
+ if errors :
709
+ message = f"Resync failed with { len (errors )} errors, skipping delete phase due to incomplete state"
710
+ error_group = ExceptionGroup (
711
+ message ,
712
+ errors ,
713
+ )
714
+ if not silent :
715
+ raise error_group
716
+
717
+ logger .error (message , exc_info = error_group )
718
+ return False
719
+
720
+ logger .info (
721
+ f"Running resync diff calculation, number of entities created during sync: { len (generated_entities )} "
722
+ )
723
+ entities_at_port = await ocean .port_client .search_entities (
724
+ user_agent_type
725
+ )
726
+
727
+ await self .entities_state_applier .delete_diff (
728
+ {"before" : entities_at_port , "after" : generated_entities },
729
+ user_agent_type , app_config .get_entity_deletion_threshold ()
730
+ )
731
+
732
+ logger .info ("Resync finished successfully" )
733
+
734
+ # Execute resync_complete hooks
735
+ if "resync_complete" in self .event_strategy :
736
+ logger .info ("Executing resync_complete hooks" )
737
+
738
+ for resync_complete_fn in self .event_strategy ["resync_complete" ]:
739
+ await resync_complete_fn ()
740
+
741
+ logger .info ("Finished executing resync_complete hooks" )
742
+
743
+
657
744
@TimeMetric (MetricPhase .RESYNC )
658
745
async def sync_raw_all (
659
746
self ,
@@ -689,8 +776,9 @@ async def sync_raw_all(
689
776
logger .info (f"Resync will use the following mappings: { app_config .dict ()} " )
690
777
691
778
kinds = [f"{ resource .kind } -{ index } " for index , resource in enumerate (app_config .resources )]
779
+ blueprints = [resource .port .entity .mappings .blueprint for resource in app_config .resources ]
692
780
ocean .metrics .initialize_metrics (kinds )
693
- # await ocean.metrics.report_sync_metrics(kinds=kinds) # TODO: uncomment this when end points are ready
781
+ await ocean .metrics .report_sync_metrics (kinds = kinds , blueprints = blueprints )
694
782
695
783
# Clear cache
696
784
await ocean .app .cache_provider .clear ()
@@ -716,65 +804,20 @@ async def sync_raw_all(
716
804
multiprocessing .set_start_method ('fork' , True )
717
805
try :
718
806
for index ,resource in enumerate (app_config .resources ):
719
-
720
807
logger .info (f"Starting processing resource { resource .kind } with index { index } " )
721
-
722
808
creation_results .append (await self .process_resource (resource ,index ,user_agent_type ))
723
-
724
- await self .sort_and_upsert_failed_entities (user_agent_type )
725
-
726
809
except asyncio .CancelledError as e :
727
810
logger .warning ("Resync aborted successfully, skipping delete phase. This leads to an incomplete state" )
728
811
raise
729
812
else :
730
- if not did_fetched_current_state :
731
- logger .warning (
732
- "Due to an error before the resync, the previous state of entities at Port is unknown."
733
- " Skipping delete phase due to unknown initial state."
734
- )
735
- return
736
-
737
- logger .info ("Starting resync diff calculation" )
738
- generated_entities , errors = zip_and_sum (creation_results ) or [
739
- [],
740
- [],
741
- ]
742
-
743
- if errors :
744
- message = f"Resync failed with { len (errors )} errors, skipping delete phase due to incomplete state"
745
- error_group = ExceptionGroup (
746
- message ,
747
- errors ,
748
- )
749
- if not silent :
750
- raise error_group
751
-
752
- logger .error (message , exc_info = error_group )
753
- return False
754
- else :
755
- logger .info (
756
- f"Running resync diff calculation, number of entities created during sync: { len (generated_entities )} "
757
- )
758
- entities_at_port = await ocean .port_client .search_entities (
759
- user_agent_type
760
- )
761
- await self .entities_state_applier .delete_diff (
762
- {"before" : entities_at_port , "after" : generated_entities },
763
- user_agent_type , app_config .get_entity_deletion_threshold ()
764
- )
765
-
766
- logger .info ("Resync finished successfully" )
767
-
768
- # Execute resync_complete hooks
769
- if "resync_complete" in self .event_strategy :
770
- logger .info ("Executing resync_complete hooks" )
771
-
772
- for resync_complete_fn in self .event_strategy ["resync_complete" ]:
773
- await resync_complete_fn ()
774
-
775
- logger .info ("Finished executing resync_complete hooks" )
776
-
777
- return True
813
+ await self .resync_reconciliation (
814
+ creation_results ,
815
+ did_fetched_current_state ,
816
+ user_agent_type ,
817
+ app_config ,
818
+ silent
819
+ )
820
+ await ocean .metrics .report_sync_metrics (kinds = [MetricResourceKind .RECONCILIATION ])
778
821
finally :
779
822
await ocean .app .cache_provider .clear ()
780
823
if ocean .app .process_execution_mode == ProcessExecutionMode .multi_process :
0 commit comments