Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app_desc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ modules:
plan: 4C1G5R
replicas: 2
cworker:
command: python manage.py celery worker -Q pipeline_additional_task,pipeline_additional_task_priority,node_auto_retry,timeout_node_execute,timeout_nodes_record,task_callback -n common_worker@%h -P threads -c 6 -l info
command: celery worker -A blueapps.core.celery -P threads -Q pipeline_additional_task,pipeline_additional_task_priority,node_auto_retry,timeout_node_execute,timeout_nodes_record,task_callback -n common_worker@%h -c 6 -l info
plan: 4C1G5R
replicas: 2
er-e:
Expand Down
10 changes: 10 additions & 0 deletions gcloud/core/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ def propagate_attributes(attributes: dict):
provider.add_span_processor(AttributeInjectionSpanProcessor(attributes))


def append_attributes(attributes: dict):
"""追加属性到span上

:param attributes: 需要追加的属性
"""
current_span = trace.get_current_span()
for key, value in attributes.items():
current_span.set_attribute(f"bk_sops.{key}", value)


@contextmanager
def start_trace(span_name: str, propagate: bool = False, **attributes):
"""Start a trace
Expand Down
5 changes: 5 additions & 0 deletions gcloud/periodictask/signals/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from gcloud.constants import PROJECT, TaskCreateMethod
from gcloud.core.models import EngineConfig
from gcloud.core.trace import CallFrom, append_attributes, propagate_attributes
from gcloud.periodictask.models import PeriodicTaskHistory
from gcloud.shortcuts.message import send_periodic_task_message
from gcloud.taskflow3.domains.auto_retry import AutoRetryNodeStrategyCreator
Expand All @@ -45,6 +46,10 @@ def pre_periodic_task_start_handler(sender, periodic_task, pipeline_instance, **
engine_ver=periodic_task.extra_info.get("engine_ver", EngineConfig.ENGINE_VER_V1),
)

# 设置传播到后台所有Span的公共属性
propagate_attributes({"project_id": task.project.id, "call_from": CallFrom.BACKEND.value, "task_id": task.id})
append_attributes({"executor": task.executor})

task.record_and_get_executor_proxy(task.executor or periodic_task.creator)

# crete auto retry strategy
Expand Down
Loading