Skip to content

Commit ba7b584

Browse files
authored
Merge pull request #7975 from dengyh/refactor/trace_improve
refactor: 增加支持继承属性的trace装饰器和上下文管理器 --story=126421543
2 parents d166368 + aeced1f commit ba7b584

File tree

4 files changed

+126
-14
lines changed

4 files changed

+126
-14
lines changed

gcloud/core/trace.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
4+
Edition) available.
5+
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
6+
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://opensource.org/licenses/MIT
9+
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
specific language governing permissions and limitations under the License.
12+
"""
13+
import copy
14+
from contextlib import contextmanager
15+
from functools import wraps
16+
17+
from opentelemetry import trace
18+
from opentelemetry.sdk.trace.export import SpanProcessor
19+
from opentelemetry.trace import SpanKind
20+
21+
22+
class AttributeInjectionSpanProcessor(SpanProcessor):
23+
"""Span处理器,用于在Span开始时设置属性"""
24+
25+
def __init__(self, attributes):
26+
self.attributes = attributes
27+
28+
def on_start(self, span: trace.Span, parent_context):
29+
if not isinstance(span, trace.Span):
30+
return
31+
32+
for key, value in self.attributes.items():
33+
span.set_attribute(key, value)
34+
35+
def on_end(self, span: trace.Span):
36+
# Implement custom logic if needed on span end
37+
pass
38+
39+
40+
def propagate_attributes(attributes: dict):
41+
"""把attributes设置到span上,并继承到后面所有span
42+
43+
:param attributes: 默认属性
44+
"""
45+
46+
provider = trace.get_tracer_provider()
47+
48+
# Add a span processor that sets attributes on every new span
49+
provider.add_span_processor(AttributeInjectionSpanProcessor(attributes))
50+
51+
52+
@contextmanager
53+
def start_trace(span_name: str, propagate: bool = False, **attributes):
54+
"""Start a trace
55+
56+
:param span_name: 自定义Span名称
57+
:param propagate: 是否需要传播
58+
:param attributes: 需要跟span增加的属性, 默认为空
59+
:yield: 当前上下文的Span
60+
"""
61+
tracer = trace.get_tracer(__name__)
62+
63+
span_attributes = {f"bk_sops.{key}": value for key, value in attributes.items()}
64+
65+
# 设置需要传播的属性
66+
if propagate:
67+
propagate_attributes(span_attributes)
68+
69+
with tracer.start_as_current_span(span_name, kind=SpanKind.SERVER) as span:
70+
# 如果不进行传播,则在当前span手动配置需要添加的属性
71+
for attr_key, attr_value in span_attributes.items():
72+
span.set_attribute(attr_key, attr_value)
73+
74+
yield span
75+
76+
77+
def trace_view(propagate: bool = True, attr_keys=None, **default_attributes):
78+
"""用来装饰view的trace装饰器
79+
80+
:param propagate: 是否需要传播
81+
:param attr_keys: 需要从request和url中获取的属性
82+
:param default_attributes: 默认属性
83+
:return: view_func
84+
"""
85+
attr_keys = attr_keys or []
86+
87+
def decorator(view_func):
88+
@wraps(view_func)
89+
def _wrapped_view(request, *args, **kwargs):
90+
attributes = copy.deepcopy(default_attributes)
91+
92+
for attr_key in attr_keys:
93+
# 需要的属性只要在kwargs, request.GET, request.POST中就可以
94+
for scope in (kwargs, request.GET, request.POST):
95+
if attr_key in scope:
96+
attributes[attr_key] = kwargs[attr_key]
97+
break
98+
99+
with start_trace(view_func.__name__, propagate, **attributes):
100+
return view_func(request, *args, **kwargs)
101+
102+
return _wrapped_view
103+
104+
return decorator

gcloud/taskflow3/apis/django/api.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from gcloud.contrib.operate_record.constants import OperateType, RecordType
3737
from gcloud.contrib.operate_record.decorators import record_operation
3838
from gcloud.core.models import EngineConfig
39+
from gcloud.core.trace import trace_view
3940
from gcloud.iam_auth import IAMMeta
4041
from gcloud.iam_auth.intercept import iam_intercept
4142
from gcloud.iam_auth.view_interceptors.taskflow import (
@@ -252,6 +253,7 @@ def get_job_instance_log(request, biz_cc_id):
252253

253254
@require_POST
254255
@request_validate(TaskActionValidator)
256+
@trace_view(attr_keys=["project_id"], call_from="web")
255257
@iam_intercept(TaskActionInterceptor())
256258
@record_operation(RecordType.task.name, OperateType.task_action.name)
257259
def task_action(request, action, project_id):
@@ -280,6 +282,7 @@ def task_action(request, action, project_id):
280282

281283
@swagger_auto_schema(methods=["POST"], auto_schema=AnnotationAutoSchema)
282284
@request_validate(NodesActionValidator)
285+
@trace_view(attr_keys=["project_id"], call_from="web")
283286
@iam_intercept(NodesActionInterceptor())
284287
@api_view(["POST"])
285288
@record_operation(RecordType.task.name, OperateType.nodes_action.name)

gcloud/taskflow3/domains/dispatchers/node.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
from bamboo_engine import states as bamboo_engine_states
2121
from bamboo_engine.eri import ContextValueType
2222
from django.utils.translation import ugettext_lazy as _
23-
from opentelemetry import trace
2423
from pipeline.component_framework.library import ComponentLibrary
2524
from pipeline.engine import api as pipeline_api
2625
from pipeline.engine import exceptions as pipeline_exceptions
@@ -34,6 +33,7 @@
3433

3534
from engine_pickle_obj.context import SystemObject
3635
from gcloud import err_code
36+
from gcloud.core.trace import start_trace
3737
from gcloud.project_constants.domains.context import get_project_constants_context
3838
from gcloud.taskflow3.utils import format_pipeline_status
3939
from gcloud.tasktmpl3.domains.constants import preview_node_inputs
@@ -73,12 +73,14 @@ def dispatch(self, command: str, operator: str, **kwargs) -> dict:
7373
if command not in self.NODE_COMMANDS:
7474
return {"result": False, "message": "task command is invalid", "code": err_code.INVALID_OPERATION.code}
7575

76-
with trace.get_tracer(__name__).start_as_current_span("node_operate") as span:
77-
span.set_attribute("bk_sops.task_id", self.taskflow_id)
78-
span.set_attribute("bk_sops.node_id", self.node_id)
79-
span.set_attribute("bk_sops.engine_ver", self.engine_ver)
80-
span.set_attribute("bk_sops.node_command", command)
81-
76+
with start_trace(
77+
span_name="node_operate",
78+
propagate=False,
79+
task_id=self.taskflow_id,
80+
node_id=self.node_id,
81+
engine_ver=self.engine_ver,
82+
node_command=command,
83+
):
8284
return getattr(self, "{}_v{}".format(command, self.engine_ver))(operator=operator, **kwargs)
8385

8486
@ensure_return_is_dict

gcloud/taskflow3/domains/dispatchers/task.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
from django.db import transaction
2626
from django.utils import timezone
2727
from django.utils.translation import ugettext_lazy as _
28-
from opentelemetry import trace
2928
from pipeline.core.data.var import Variable
3029
from pipeline.engine import api as pipeline_api
3130
from pipeline.engine import exceptions as pipeline_exceptions
@@ -39,6 +38,7 @@
3938

4039
from engine_pickle_obj.context import SystemObject
4140
from gcloud import err_code
41+
from gcloud.core.trace import start_trace
4242
from gcloud.project_constants.domains.context import get_project_constants_context
4343
from gcloud.taskflow3.domains.context import TaskContext
4444
from gcloud.taskflow3.signals import pre_taskflow_start, taskflow_started
@@ -94,12 +94,15 @@ def dispatch(self, command: str, operator: str) -> dict:
9494
if command in self.OPERATION_TYPE_COMMANDS and not self.pipeline_instance.is_started:
9595
return {"result": False, "message": "task not started", "code": err_code.INVALID_OPERATION.code}
9696

97-
with trace.get_tracer(__name__).start_as_current_span("task_operate") as span:
98-
span.set_attribute("bk_sops.task_id", self.taskflow_id)
99-
span.set_attribute("bk_sops.pipeline_id", self.pipeline_instance.instance_id)
100-
span.set_attribute("bk_sops.engine_ver", self.engine_ver)
101-
span.set_attribute("bk_sops.task_command", command)
102-
97+
with start_trace(
98+
span_name="task_operate",
99+
propagate=False,
100+
task_id=self.taskflow_id,
101+
pipeline_id=self.pipeline_instance.instance_id,
102+
executor=operator,
103+
engine_ver=self.engine_ver,
104+
task_command=command,
105+
):
103106
return getattr(self, "{}_v{}".format(command, self.engine_ver))(operator)
104107

105108
def start_v1(self, executor: str) -> dict:

0 commit comments

Comments
 (0)