File tree Expand file tree Collapse file tree 1 file changed +17
-1
lines changed Expand file tree Collapse file tree 1 file changed +17
-1
lines changed Original file line number Diff line number Diff line change 6
6
from airflow .exceptions import AirflowClusterPolicyViolation
7
7
8
8
ALLOWED_OWNERS = "team_contacts"
9
+ ALLOWED_TAGS = "airflow_tags_allowed_list"
9
10
10
11
11
12
def dag_policy (dag ):
@@ -26,6 +27,11 @@ def dag_policy(dag):
26
27
else dag .dagrun_timeout
27
28
)
28
29
30
+ # Set tasks retries max to 3
31
+ retries = dag .default_args .get ("retries" , False )
32
+ if retries and retries > 3 :
33
+ dag .default_args ["retries" ] = 3
34
+
29
35
# Check if owner exists
30
36
owner = dag .default_args .get ("owner" , "" )
31
37
owner_dag_list = owner .split ("," )
@@ -49,8 +55,18 @@ def dag_policy(dag):
49
55
)
50
56
51
57
# Check if dag has tags
52
- if not dag .tags :
58
+ tags = dag .tags
59
+ if not tags :
53
60
raise AirflowClusterPolicyViolation (
54
61
f"DAG has no tags. At least one tag required."
55
62
)
63
+
64
+ # Check if tag is allowed
65
+ tag_allowed_list = yaml .safe_load (Variable .get (ALLOWED_TAGS ))
66
+ if not all (item in tag_allowed_list for item in tags ):
67
+ raise AirflowClusterPolicyViolation (
68
+ f"One of tags(s) { tags } not in Airflow Variable { ALLOWED_TAGS } "
69
+ )
70
+
71
+
56
72
# EOF
You can’t perform that action at this time.
0 commit comments