
Conversation

dennis-bilson-port
Member

@dennis-bilson-port dennis-bilson-port commented Aug 21, 2025

User description

Description

What

  • Defer real-time quota controller initialization in integrations/gcp/main.py to avoid pre-fork gRPC usage.

Why

  • Multi-process mode uses fork; gRPC (google-cloud clients) is not fork-safe. Initializing gRPC in the parent before forking leads to deadlocks and the warning: “Other threads are currently calling into gRPC, skipping fork() handlers.”

How

  • Replace the on_start call to resolve_request_controllers() with conservative defaults derived from search_all_resources_per_minute_quota (use 80% as effective rate).
  • No changes to resync/search codepaths; gRPC clients are still instantiated within child execution where it’s safe.
  • Backward-compatible; behavior remains configurable via integration settings; eliminates fork+gRPC deadlock in multi-process mode.
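The defaults-based setup described above can be sketched as follows. This is an illustrative reconstruction from the file walkthrough, not the PR's exact code: `compute_effective_quota` is a hypothetical helper name, and the integration's own `PersistentAsyncLimiter` is stood in for by the computed rate plus a plain `asyncio.BoundedSemaphore`:

```python
import asyncio

DEFAULT_SEARCH_QUOTA = 400  # fallback when the config key is absent


def compute_effective_quota(config: dict) -> int:
    """Derive a conservative effective rate: 80% of the configured
    per-minute search quota, never below 1."""
    default_quota = int(
        config.get("search_all_resources_per_minute_quota", DEFAULT_SEARCH_QUOTA)
    )
    return max(int(default_quota * 0.8), 1)


async def setup_real_time_request_controllers(
    config: dict,
) -> tuple[int, asyncio.BoundedSemaphore]:
    # No gRPC calls here, so this is safe to run in the parent before fork;
    # the real gRPC clients are only created inside the child processes.
    effective_quota = compute_effective_quota(config)
    semaphore = asyncio.BoundedSemaphore(effective_quota)
    return effective_quota, semaphore
```

With the default of 400, the effective rate comes out to 320 requests per minute.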

Type of change

  • Bug fix (non-breaking change which fixes an issue)

All tests should be run against the Port production environment (using a testing org).

Core testing checklist

  • Integration able to create all default resources from scratch
  • Resync finishes successfully
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Scheduled resync able to abort existing resync and start a new one
  • Tested with at least 2 integrations from scratch
  • Tested with Kafka and Polling event listeners
  • Tested deletion of entities that don't pass the selector

Integration testing checklist

  • Integration able to create all default resources from scratch
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Resync finishes successfully
  • If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the examples folder in the integration directory.
  • If resource kind is updated, run the integration with the example data and check if the expected result is achieved
  • If new resource kind is added or updated, validate that live-events for that resource are working as expected
  • Docs PR link here

Preflight checklist

  • Handled rate limiting
  • Handled pagination
  • Implemented the code in async
  • Support Multi account

Screenshots

Include screenshots from your environment showing how the resources of the integration will look.

API Documentation

Provide links to the API documentation used for this integration.


PR Type

Bug fix


Description

  • Fix gRPC fork deadlock in GCP integration multi-process mode

  • Replace dynamic quota discovery with conservative defaults

  • Defer gRPC initialization to child processes

  • Maintain backward compatibility with configurable settings


Diagram Walkthrough

flowchart LR
  A["Parent Process"] --> B["Fork Child Processes"]
  B --> C["Child Process gRPC Init"]
  D["Old: Pre-fork gRPC"] --> E["Deadlock Risk"]
  A --> F["New: Conservative Defaults"]
  F --> C

File Walkthrough

Relevant files
Bug fix
main.py
Defer gRPC initialization with conservative defaults         

integrations/gcp/main.py

  • Replace dynamic gRPC quota discovery with conservative defaults
  • Calculate effective quota as 80% of configured search quota
  • Initialize rate limiter and semaphore with static values
  • Fix typo in comment (ROJECT -> PROJECT)
+11/-7   
Documentation
CHANGELOG.md
Add changelog entry for gRPC fork fix                                       

integrations/gcp/CHANGELOG.md

  • Add changelog entry for version 0.1.176
  • Document fix for multi-process resync hang issue
+8/-0     
Configuration changes
pyproject.toml
Version bump to 0.1.176                                                                   

integrations/gcp/pyproject.toml

  • Bump version from 0.1.175 to 0.1.176
+1/-1     

@dennis-bilson-port dennis-bilson-port self-assigned this Aug 21, 2025
@dennis-bilson-port dennis-bilson-port marked this pull request as ready for review August 21, 2025 12:23
@dennis-bilson-port dennis-bilson-port requested a review from a team as a code owner August 21, 2025 12:23

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 2 🔵🔵⚪⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Concurrency Limit Choice

The semaphore is set to the same value as the per-minute rate (effective_quota). If tasks are bursty, this could allow a large concurrent fan-out that exceeds backend limits despite rate limiting. Consider capping concurrency separately (e.g., small fixed number) or tying it to per-second rate.

PROJECT_V3_GET_REQUESTS_BOUNDED_SEMAPHORE = asyncio.BoundedSemaphore(
    effective_quota
)
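One way to decouple the two values, as this note suggests: keep the per-minute rate derived from the quota, but cap in-flight concurrency at a small fixed number. The cap of 20 below is an illustrative choice, not taken from the PR:

```python
import asyncio

MAX_CONCURRENT_REQUESTS = 20  # illustrative fixed cap, not from the PR


def bounded_concurrency(effective_quota: int) -> int:
    """Cap concurrency independently of the per-minute rate, so a
    bursty fan-out cannot exceed what the backend tolerates even
    while the rate limiter is being respected over the minute."""
    return min(effective_quota, MAX_CONCURRENT_REQUESTS)


# The semaphore would then be sized from the cap, not the raw rate:
# PROJECT_V3_GET_REQUESTS_BOUNDED_SEMAPHORE = asyncio.BoundedSemaphore(
#     bounded_concurrency(effective_quota)
# )
```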
Config Robustness

search_all_resources_per_minute_quota is cast to int without validation. Non-integer or negative values in config could cause ValueError or ineffective limits. Consider safe parsing, lower bounds, and logging when defaults are applied.

default_quota = int(
    ocean.integration_config.get("search_all_resources_per_minute_quota", 400)
)
effective_quota = max(int(default_quota * 0.8), 1)
Background Threshold Scaling

BACKGROUND_TASK_THRESHOLD scales with max_rate*10. With high quotas this might spawn excessive background tasks. Validate the multiplier or clamp to a sane maximum to avoid memory/CPU spikes.

BACKGROUND_TASK_THRESHOLD = float(
    PROJECT_V3_GET_REQUESTS_RATE_LIMITER.max_rate * 10
)
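A clamped variant of the threshold, per the note above; the ceiling of 1000 is an assumed sane maximum, not a value from the PR:

```python
BACKGROUND_TASK_CEILING = 1000.0  # assumed sane maximum, not from the PR


def background_task_threshold(max_rate: float) -> float:
    """Scale the threshold with the limiter's rate, but clamp it so
    high quotas cannot spawn an unbounded number of background tasks."""
    return min(max_rate * 10, BACKGROUND_TASK_CEILING)
```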


qodo-merge-pro bot commented Aug 21, 2025


PR Code Suggestions ✨

Explore these optional code suggestions:

High-level
Preserve dynamic quota discovery

Hard-coding rate limits from a static 80% of search quota may misalign real-time
limits with actual per-project/topic quotas and regional throttles, risking
either under-utilization or renewed throttling. Consider deferring only
gRPC-dependent discovery while still computing accurate limits post-fork in
child processes (or lazily on first use) to preserve correctness without
pre-fork gRPC initialization.

Examples:

integrations/gcp/main.py [102-120]
async def setup_real_time_request_controllers() -> None:
    global PROJECT_V3_GET_REQUESTS_RATE_LIMITER
    global PROJECT_V3_GET_REQUESTS_BOUNDED_SEMAPHORE
    global BACKGROUND_TASK_THRESHOLD
    if not ocean.event_listener_type == "ONCE":
        default_quota = int(
            ocean.integration_config.get("search_all_resources_per_minute_quota", 400)
        )
        effective_quota = max(int(default_quota * 0.8), 1)


 ... (clipped 9 lines)

Solution Walkthrough:

Before:

async def setup_real_time_request_controllers() -> None:
    # This runs pre-fork in the parent process
    global PROJECT_V3_GET_REQUESTS_RATE_LIMITER
    global PROJECT_V3_GET_REQUESTS_BOUNDED_SEMAPHORE

    # Uses a static, conservative default to avoid gRPC calls
    default_quota = ocean.integration_config.get(...)
    effective_quota = int(default_quota * 0.8)

    PROJECT_V3_GET_REQUESTS_RATE_LIMITER = PersistentAsyncLimiter.get_limiter(max_rate=effective_quota)
    PROJECT_V3_GET_REQUESTS_BOUNDED_SEMAPHORE = asyncio.BoundedSemaphore(effective_quota)

After:

# In main.py
PROJECT_V3_GET_REQUESTS_RATE_LIMITER = None
PROJECT_V3_GET_REQUESTS_BOUNDED_SEMAPHORE = None
_limiter_lock = asyncio.Lock()

async def get_or_init_rate_limiters():
    # This function would be called post-fork from within a child process
    global PROJECT_V3_GET_REQUESTS_RATE_LIMITER
    global PROJECT_V3_GET_REQUESTS_BOUNDED_SEMAPHORE
    async with _limiter_lock:
        if PROJECT_V3_GET_REQUESTS_RATE_LIMITER is None:
            # Perform dynamic gRPC-based discovery here, now that it's safe
            limiter, semaphore = await resolve_request_controllers(...)
            PROJECT_V3_GET_REQUESTS_RATE_LIMITER = limiter
            PROJECT_V3_GET_REQUESTS_BOUNDED_SEMAPHORE = semaphore
    return PROJECT_V3_GET_REQUESTS_RATE_LIMITER, PROJECT_V3_GET_REQUESTS_BOUNDED_SEMAPHORE
Suggestion importance[1-10]: 8


Why: This is a significant architectural suggestion that correctly identifies a functional regression (loss of dynamic quota accuracy) introduced by the fix and proposes a valid alternative that solves the gRPC fork-safety issue while preserving the original, more accurate behavior.

Impact: Medium
Possible issue
Clamp and validate quota values

Guard against zero or negative quotas and mismatched limiter/semaphore
capacities. Also ensure a sensible upper bound to avoid excessive parallelism if
configuration is set too high. Clamp the effective quota to a safe min/max and
reuse that value consistently.

integrations/gcp/main.py [107-120]

-default_quota = int(
-    ocean.integration_config.get("search_all_resources_per_minute_quota", 400)
-)
-effective_quota = max(int(default_quota * 0.8), 1)
+default_quota_raw = ocean.integration_config.get("search_all_resources_per_minute_quota", 400)
+try:
+    default_quota = int(default_quota_raw)
+except (TypeError, ValueError):
+    default_quota = 400
+# Clamp to [1, 10_000] to avoid zero/negative and runaway parallelism
+effective_quota = max(min(int(default_quota * 0.8), 10_000), 1)
 
 PROJECT_V3_GET_REQUESTS_RATE_LIMITER = PersistentAsyncLimiter.get_limiter(
     max_rate=effective_quota
 )
 PROJECT_V3_GET_REQUESTS_BOUNDED_SEMAPHORE = asyncio.BoundedSemaphore(
     effective_quota
 )
-BACKGROUND_TASK_THRESHOLD = float(
-    PROJECT_V3_GET_REQUESTS_RATE_LIMITER.max_rate * 10
-)
+BACKGROUND_TASK_THRESHOLD = float(effective_quota * 10)
Suggestion importance[1-10]: 7


Why: The suggestion improves robustness by adding a try-except block to handle invalid quota configurations and introduces a sensible upper bound to prevent excessive parallelism.

Impact: Medium


@melodyogonna melodyogonna left a comment


LGTM

Comment on lines 107 to 116
default_quota = int(
    ocean.integration_config.get("search_all_resources_per_minute_quota", 400)
)
effective_quota = max(int(default_quota * 0.8), 1)

PROJECT_V3_GET_REQUESTS_RATE_LIMITER = PersistentAsyncLimiter.get_limiter(
    max_rate=effective_quota
)
PROJECT_V3_GET_REQUESTS_BOUNDED_SEMAPHORE = asyncio.BoundedSemaphore(
    effective_quota

Nice work! However, I think changing this to always use the static quota declared in the env might not align well with the GCP integration. In general we use the dynamic quota for GCP and only fall back to this config value.

I would like us to focus on making the integration use a different gRPC instance if possible, or, rather than setting the global persistent limiter on start, defer it until the first live event comes in and it is needed, though I still think we might run into the same problems again.

Let's try to have a separate gRPC instance make the quota call that happens on the main process for live events; that way we don't have to fork across processes, which should be safe. I don't think there is an issue across sub-processes, so that might solve it.

Let me know your thoughts

Member Author

@dennis-bilson-port dennis-bilson-port Aug 22, 2025


This is sound advice, @oiadebayo.
My proposed resolution is to make the initial call to the quota API using the Ocean http_sync_client in the main process, and let resyncs and live events keep using the gRPC clients in the forked processes as before.
The issue doesn't occur during resyncs or live-event ingestion.

Sounds good?
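A rough sketch of that proposal: fetch the quota over plain HTTP in the parent (no gRPC, so fork-safe), then fall back to the configured default if the lookup fails. The payload shape and the `quota_from_response` helper below are hypothetical illustrations, not the real Service Usage API schema or the PR's code:

```python
FALLBACK_QUOTA = 400  # configured default used when the lookup fails


def quota_from_response(payload: dict, fallback: int = FALLBACK_QUOTA) -> int:
    """Extract an effective limit from a quota API response, falling
    back to the configured default on any missing or malformed field.
    The payload shape here is illustrative, not the real API schema."""
    try:
        return int(payload["quota"]["limit"])
    except (KeyError, TypeError, ValueError):
        return fallback


# Pre-fork, in the parent process: resolve the number over plain HTTP
# (e.g. via the Ocean sync client) and hand it to the limiter setup.
# Resyncs and live events keep using gRPC inside the forked children,
# where the PR has already established it is safe.
```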

@github-actions github-actions bot added size/M and removed size/S labels Aug 22, 2025