2
2
import pandas as pd
3
3
import numpy as np
4
4
5
+ from spaceone .core .auth .jwt .jwt_util import JWTUtil
5
6
from spaceone .core .manager import BaseManager
6
7
from spaceone .core .connector .space_connector import SpaceConnector
7
8
from spaceone .statistics .error import *
20
21
21
22
22
23
class ResourceManager (BaseManager ):
23
- def stat (self , aggregate : list , page : dict ) -> dict :
24
- results = self ._execute_aggregate_operations (aggregate )
24
+ def stat (self , aggregate : list , page : dict , domain_id : str = None ) -> dict :
25
+ results = self ._execute_aggregate_operations (aggregate , domain_id )
25
26
return self ._page (page , results )
26
27
27
- def _execute_aggregate_operations (self , aggregate ):
28
+ def _execute_aggregate_operations (self , aggregate : list , domain_id : str = None ):
28
29
df = None
29
30
30
31
if "query" not in aggregate [0 ]:
31
32
raise ERROR_REQUIRED_QUERY_OPERATION ()
32
33
33
34
for stage in aggregate :
34
35
if "query" in stage :
35
- df = self ._query (stage ["query" ])
36
+ df = self ._query (stage ["query" ], domain_id = domain_id )
36
37
37
38
elif "join" in stage :
38
- df = self ._join (stage ["join" ], df )
39
+ df = self ._join (stage ["join" ], df , domain_id )
39
40
40
41
elif "concat" in stage :
41
- df = self ._concat (stage ["concat" ], df )
42
+ df = self ._concat (stage ["concat" ], df , domain_id )
42
43
43
44
elif "sort" in stage :
44
45
df = self ._sort (stage ["sort" ], df )
@@ -122,8 +123,8 @@ def _sort(options, base_df):
122
123
123
124
return base_df
124
125
125
- def _concat (self , options , base_df ):
126
- concat_df = self ._query (options , operator = "join" )
126
+ def _concat (self , options , base_df , domain_id ):
127
+ concat_df = self ._query (options , operator = "join" , domain_id = domain_id )
127
128
128
129
try :
129
130
base_df = pd .concat ([base_df , concat_df ], ignore_index = True )
@@ -152,15 +153,15 @@ def _generate_empty_data(query):
152
153
153
154
return pd .DataFrame (empty_data )
154
155
155
- def _join (self , options , base_df ):
156
+ def _join (self , options , base_df , domain_id ):
156
157
if "type" in options and options ["type" ] not in _JOIN_TYPE_MAP :
157
158
raise ERROR_INVALID_PARAMETER_TYPE (
158
159
key = "aggregate.join.type" , type = list (_JOIN_TYPE_MAP .keys ())
159
160
)
160
161
161
162
join_keys = options .get ("keys" )
162
163
join_type = options .get ("type" , "LEFT" )
163
- join_df = self ._query (options , operator = "join" )
164
+ join_df = self ._query (options , operator = "join" , domain_id = domain_id )
164
165
165
166
try :
166
167
if join_keys :
@@ -185,7 +186,7 @@ def _join(self, options, base_df):
185
186
186
187
return base_df
187
188
188
- def _query (self , options , operator = "query" ):
189
+ def _query (self , options , operator = "query" , domain_id = None ):
189
190
resource_type = options .get ("resource_type" )
190
191
query = options .get ("query" )
191
192
extend_data = options .get ("extend_data" , {})
@@ -199,12 +200,21 @@ def _query(self, options, operator="query"):
199
200
service , resource = self ._parse_resource_type (resource_type )
200
201
201
202
try :
203
+ token = self .transaction .get_meta ("token" )
204
+ token_type = JWTUtil .get_value_from_token (token , "typ" )
205
+
202
206
connector : SpaceConnector = self .locator .get_connector (
203
207
"SpaceConnector" , service = service
204
208
)
205
209
206
210
_LOGGER .debug (f"[_query] stat resource: { resource_type } .stat" )
207
- response = connector .dispatch (f"{ resource } .stat" , {"query" : query })
211
+ if token_type == "SYSTEM_TOKEN" :
212
+ response = connector .dispatch (
213
+ f"{ resource } .stat" , {"query" : query }, x_domain_id = domain_id
214
+ )
215
+ else :
216
+ response = connector .dispatch (f"{ resource } .stat" , {"query" : query })
217
+
208
218
results = response .get ("results" , [])
209
219
210
220
if len (results ) > 0 and not isinstance (results [0 ], dict ):
0 commit comments