Skip to content

Commit ba6ad97

Browse files
authored
Merge pull request #7982 from guohelu/feature_template_webhook
feat: 任务模板支持webhook --story=126514126
2 parents 953fc36 + ee218ec commit ba6ad97

File tree

11 files changed

+142
-3
lines changed

11 files changed

+142
-3
lines changed

bin/pre_release

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ python manage.py createcachetable django_cache
44
python manage.py update_component_models
55
python manage.py update_variable_models
66
python manage.py sync_saas_apigw
7-
python manage.py register_bksops_notice
7+
python manage.py register_bksops_notice
8+
python manage.py sync_webhook_events ./config webhook_resources.yaml

config/default.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
"apigw_manager.apigw",
118118
"bk_notice_sdk",
119119
"bk_audit.contrib.bk_audit",
120+
"webhook",
120121
)
121122

122123
# 这里是默认的中间件,大部分情况下,不需要改动

config/webhook_resources.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
version: 1
2+
events:
3+
- code: task_failed
4+
name: 任务失败
5+
- code: task_finished
6+
name: 任务完成
Binary file not shown.

gcloud/constants.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,5 +259,14 @@ class GseAgentStatus(Enum):
259259
ONlINE = 1
260260

261261

262-
def get_default_scope():
263-
return {PROJECT: ["*"]}
262+
class WebhookScopeType(Enum):
263+
"""webhook作用域类型"""
264+
265+
TEMPLATE = "template"
266+
267+
268+
class WebhookEventType(Enum):
269+
"""webhook事件类型"""
270+
271+
TASK_FAILED = "task_failed"
272+
TASK_FINISHED = "task_finished"

gcloud/core/apis/drf/serilaziers/task_template.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class CreateTaskTemplateSerializer(BaseTaskTemplateSerializer):
7070
pipeline_tree = serializers.CharField()
7171
project = serializers.IntegerField(write_only=True)
7272
template_id = serializers.CharField(help_text="模板ID", source="id", read_only=True)
73+
webhook_configs = serializers.JSONField(help_text="webhook配置", required=False)
7374

7475
def validate_project(self, value):
7576
try:
@@ -113,6 +114,7 @@ class Meta:
113114
"pipeline_tree",
114115
"project",
115116
"template_id",
117+
"webhook_configs",
116118
]
117119

118120

gcloud/core/apis/drf/viewsets/task_template.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
from gcloud.tasktmpl3.signals import post_template_save_commit
5252
from gcloud.template_base.domains.template_manager import TemplateManager
5353
from gcloud.user_custom_config.constants import TASKTMPL_ORDERBY_OPTIONS
54+
from gcloud.utils.webhook import apply_webhook_configs, get_webhook_configs, clear_scope_webhooks
55+
from gcloud.constants import WebhookScopeType
5456

5557
logger = logging.getLogger("root")
5658
manager = TemplateManager(template_model_cls=TaskTemplate)
@@ -166,10 +168,12 @@ def list(self, request, *args, **kwargs):
166168
user_model.objects.get(username=request.user.username).tasktemplate_set.all().values_list("id", flat=True)
167169
)
168170
template_ids = [obj["id"] for obj in data]
171+
webhook_configs = get_webhook_configs(scope_code=template_ids)
169172
templates_labels = TemplateLabelRelation.objects.fetch_templates_labels(template_ids)
170173
for obj in data:
171174
obj["is_add"] = 1 if obj["id"] in collected_templates else 0
172175
obj["template_labels"] = templates_labels.get(obj["id"], [])
176+
obj["webhook_configs"] = webhook_configs.get(str(obj["id"]), {})
173177
return self.get_paginated_response(data) if page is not None else Response(data)
174178

175179
@swagger_auto_schema(
@@ -196,11 +200,13 @@ def list_with_top_collection(self, request, *args, **kwargs):
196200
# 注入权限
197201
data = self.injection_auth_actions(request, serializer.data, serializer.instance)
198202
template_ids = [obj["id"] for obj in data]
203+
webhook_configs = get_webhook_configs(scope_code=template_ids)
199204
templates_labels = TemplateLabelRelation.objects.fetch_templates_labels(template_ids)
200205
for obj in data:
201206
obj["template_labels"] = templates_labels.get(obj["id"], [])
202207
obj["is_collected"] = 1 if obj["id"] in collection_template_ids else 0
203208
obj["collection_id"] = collection_template_map.get(obj["id"], -1)
209+
obj["webhook_configs"] = webhook_configs.get(str(obj["id"]), {})
204210
return self.get_paginated_response(data) if page is not None else Response(data)
205211

206212
def retrieve(self, request, *args, **kwargs):
@@ -209,6 +215,8 @@ def retrieve(self, request, *args, **kwargs):
209215
data = self.injection_auth_actions(request, serializer.data, instance)
210216
labels = TemplateLabelRelation.objects.fetch_templates_labels([instance.id]).get(instance.id, [])
211217
data["template_labels"] = [label["label_id"] for label in labels]
218+
webhook_configs = get_webhook_configs(scope_code=[str(instance.id)])
219+
data["webhook_configs"] = webhook_configs.get(str(instance.id), {})
212220
bk_audit_add_event(
213221
username=request.user.username,
214222
action_id=IAMMeta.FLOW_VIEW_ACTION,
@@ -224,6 +232,7 @@ def create(self, request, *args, **kwargs):
224232
creator = request.user.username
225233
pipeline_tree = json.loads(serializer.validated_data.pop("pipeline_tree"))
226234
description = serializer.validated_data.pop("description", "")
235+
webhook_configs = serializer.validated_data.pop("webhook_configs", {})
227236
with transaction.atomic():
228237
result = manager.create_pipeline(
229238
name=name, creator=creator, pipeline_tree=pipeline_tree, description=description
@@ -237,6 +246,14 @@ def create(self, request, *args, **kwargs):
237246
serializer.validated_data["pipeline_template_id"] = result["data"].template_id
238247
template_labels = serializer.validated_data.pop("template_labels")
239248
self.perform_create(serializer)
249+
if webhook_configs:
250+
apply_result = apply_webhook_configs(webhook_configs, str(serializer.instance.id))
251+
if not apply_result["result"]:
252+
message = apply_result["message"]
253+
logger.error(message)
254+
return Response(
255+
{"detail": ErrorDetail(message, err_code.REQUEST_PARAM_INVALID.code)}, exception=True
256+
)
240257
self._sync_template_lables(serializer.instance.id, template_labels)
241258
headers = self.get_success_headers(serializer.data)
242259
# 发送信号
@@ -277,6 +294,7 @@ def update(self, request, *args, **kwargs):
277294
editor = request.user.username
278295
pipeline_tree = json.loads(serializer.validated_data.pop("pipeline_tree"))
279296
description = serializer.validated_data.pop("description", "")
297+
webhook_config = serializer.validated_data.pop("webhook_config", {})
280298
with transaction.atomic():
281299
result = manager.update_pipeline(
282300
pipeline_template=template.pipeline_template,
@@ -293,6 +311,17 @@ def update(self, request, *args, **kwargs):
293311

294312
serializer.validated_data["pipeline_template"] = template.pipeline_template
295313
template_labels = serializer.validated_data.pop("template_labels")
314+
if webhook_config:
315+
apply_result = apply_webhook_configs(webhook_config, str(serializer.instance.id))
316+
if not apply_result["result"]:
317+
message = apply_result["message"]
318+
logger.error(message)
319+
return Response(
320+
{"detail": ErrorDetail(message, err_code.REQUEST_PARAM_INVALID.code)}, exception=True
321+
)
322+
elif not webhook_config and get_webhook_configs([str(serializer.instance.id)]):
323+
clear_scope_webhooks(WebhookScopeType.TEMPLATE.value, [str(serializer.instance.id)])
324+
296325
self.perform_update(serializer)
297326
self._sync_template_lables(serializer.instance.id, template_labels)
298327
# 发送信号
@@ -332,6 +361,11 @@ def destroy(self, request, *args, **kwargs):
332361
relation_queryset = TemplateRelationship.objects.filter(ancestor_template_id=pipeline_template_id)
333362
for relation in relation_queryset:
334363
relation.templatescheme_set.clear()
364+
clear_result = clear_scope_webhooks(WebhookScopeType.TEMPLATE.value, [template.id])
365+
if not clear_result["result"]:
366+
message = clear_result["message"]
367+
logger.error(message)
368+
return Response({"detail": ErrorDetail(message, err_code.REQUEST_PARAM_INVALID.code)}, exception=True)
335369
# 删除流程模板
336370
template.is_deleted = True
337371
template.save()

gcloud/taskflow3/signals/handlers.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
TimeoutNodeConfig,
4343
)
4444
from gcloud.taskflow3.signals import taskflow_finished, taskflow_revoked
45+
from gcloud.constants import WebhookScopeType, WebhookEventType
46+
from webhook.signals import event_broadcast_signal
4547

4648
logger = logging.getLogger("celery")
4749

@@ -83,6 +85,22 @@ def _send_node_fail_message(node_id, pipeline_id):
8385
logger.exception("pipeline_fail_handler[taskflow_id=%s] task delay error: %s" % (taskflow.id, e))
8486

8587

88+
def send_task_message(pipeline_id, msg_type):
89+
try:
90+
taskflow = TaskFlowInstance.objects.get(pipeline_instance__instance_id=pipeline_id)
91+
# broadcast events through webhooks
92+
event = WebhookEventType.TASK_FAILED.value if msg_type == ATOM_FAILED else WebhookEventType.TASK_FINISHED.value
93+
scopes = [(WebhookScopeType.TEMPLATE.value, str(taskflow.template_id))]
94+
event_broadcast_signal.send(
95+
sender=event, scopes=scopes, extra_info={"taskflow_id": taskflow.id, "event": event}
96+
)
97+
98+
except Exception as e:
99+
logger.exception(f"[send_task_message] task() send message({msg_type}) error: {e}")
100+
else:
101+
logger.info(f"[send_task_message] task() send message({msg_type}) success")
102+
103+
86104
def _check_and_callback(task, *args, **kwargs):
87105
record = TaskCallBackRecord.objects.filter(task_id=task.id).first()
88106
if not record:
@@ -185,6 +203,7 @@ def bamboo_engine_eri_post_set_state_handler(sender, node_id, to_state, version,
185203
return
186204

187205
_send_node_fail_message(node_id=node_id, pipeline_id=root_id)
206+
send_task_message(pipeline_id=root_id, msg_type=ATOM_FAILED)
188207

189208
elif to_state == bamboo_engine_states.REVOKED and node_id == root_id:
190209
try:
@@ -198,6 +217,7 @@ def bamboo_engine_eri_post_set_state_handler(sender, node_id, to_state, version,
198217
pipeline_end.send(sender=Pipeline, root_pipeline_id=root_id)
199218
except Exception:
200219
logger.exception("pipeline_end send error")
220+
send_task_message(pipeline_id=root_id, msg_type=TASK_FINISHED)
201221

202222
try:
203223
_node_timeout_info_update(settings.redis_inst, to_state, node_id, version)

gcloud/template_base/apis/drf/viewsets/template.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
from gcloud.template_base.apis.drf.permission import CommonTemplatePermission, ProjectTemplatePermission
2525
from gcloud.template_base.apis.drf.serilaziers.template import BatchDeleteSerialzer, TemplateIdsSerializer
2626
from gcloud.template_base.domains.template_manager import TemplateManager
27+
from gcloud.utils.webhook import clear_scope_webhooks
28+
from gcloud.constants import WebhookScopeType
2729

2830
logger = logging.getLogger("root")
2931

@@ -39,6 +41,9 @@ def batch_delete(self, request, *args, **kwargs):
3941
body_serializer = self.template_ids_serializer(data=data)
4042
body_serializer.is_valid(raise_exception=True)
4143
template_ids = body_serializer.validated_data.get("template_ids")
44+
clear_result = clear_scope_webhooks(WebhookScopeType.TEMPLATE.value, template_ids)
45+
if not clear_result["result"]:
46+
raise APIException(f'[batch_delete] clear_webhooks False: {clear_result["message"]}')
4247

4348
manager = TemplateManager(template_model_cls=self.tmpl_model)
4449
result = manager.batch_delete(template_ids)

gcloud/utils/webhook.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
4+
Edition) available.
5+
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
6+
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://opensource.org/licenses/MIT
9+
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
specific language governing permissions and limitations under the License.
12+
"""
13+
14+
import logging
15+
16+
from webhook.api import apply_scope_subscriptions, apply_scope_webhooks
17+
from gcloud.constants import WebhookScopeType, WebhookEventType
18+
from webhook.api import clear_webhooks, get_scope_webhook_retry_policy
19+
20+
logger = logging.getLogger("root")
21+
22+
23+
def get_webhook_configs(scope_code: list):
24+
try:
25+
webhooks = get_scope_webhook_retry_policy(scope_type=WebhookScopeType.TEMPLATE.value, scope_code=scope_code)
26+
except Exception as e:
27+
logger.exception(f"get_scope_webhooks error: {e}")
28+
return {"result": False, "message": f"Failed to get webhook configs: {e}", "data": {}, "code": "500"}
29+
30+
return webhooks
31+
32+
33+
def clear_scope_webhooks(scope_type: str, scope_code: list):
34+
try:
35+
clear_webhooks(scope_type=scope_type, scope_code=scope_code)
36+
except Exception as e:
37+
logger.exception(f"clear_webhooks error: {e}")
38+
return {"result": False, "message": f"Failed to clear webhooks: {e}", "data": {}, "code": "500"}
39+
40+
return {"result": True, "message": "success", "data": {}, "code": "0"}
41+
42+
43+
def apply_webhook_configs(webhook_configs, scope_code):
44+
webhook_code = f"template_{scope_code}_webhook"
45+
webhook_name = f"template_{scope_code}_webhook"
46+
webhook_configs.update({"code": webhook_code, "name": webhook_name})
47+
scope_type = WebhookScopeType.TEMPLATE.value
48+
subscription_configs = {webhook_code: [WebhookEventType.TASK_FAILED.value, WebhookEventType.TASK_FINISHED.value]}
49+
try:
50+
apply_scope_webhooks(scope_type=scope_type, scope_code=scope_code, webhooks=[webhook_configs])
51+
apply_scope_subscriptions(
52+
scope_type=scope_type, scope_code=scope_code, subscription_configs=subscription_configs
53+
)
54+
except Exception as e:
55+
logger.exception(f"apply_webhook_configs error: {e}")
56+
return {"result": False, "message": f"Failed to apply webhook configs: {e}", "data": {}, "code": "500"}
57+
58+
return {"result": True, "message": "success", "data": {}, "code": "0"}

0 commit comments

Comments
 (0)