72
72
}
73
73
"""
74
74
75
+
75
76
class LynkContext :
76
77
def __init__ (self , api_url , token , prod_id , prod , env_id , env , ver_id , ver ):
77
78
self .api_url = api_url or INTERLYNK_API_URL
@@ -91,20 +92,20 @@ def validate(self):
91
92
self .data = self ._fetch_context ()
92
93
if not self .data or self .data .get ('errors' ):
93
94
print ("Error getting Interlynk data" )
94
- print ("Possible problems: invalid security token, stale pylynk or invalid INTERLYNK_API_URL" )
95
+ print (
96
+ "Possible problems: invalid security token, stale pylynk or invalid INTERLYNK_API_URL" )
95
97
return False
96
98
97
99
if (self .prod or self .prod_id ) and not self .resolve_prod ():
98
100
self .prod = self .prod_id = None
99
101
print ('Product not found' )
100
102
return False
101
103
102
- if self .prod and not self .resolve_env (): # There must be a default env
104
+ if self .prod and not self .resolve_env (): # There must be a default env
103
105
self .env = self .env_id = None
104
106
print ('Environment not found' )
105
107
return False
106
108
107
-
108
109
if (self .ver or self .ver_id ) and not self .resolve_ver ():
109
110
self .ver = self .ver_id = None
110
111
print ('Version not found' )
@@ -132,11 +133,15 @@ def _fetch_context(self):
132
133
133
134
def resolve_prod (self ):
134
135
if not self .prod_id :
135
- nodes = self .data .get ('data' , {}).get ('organization' , {}).get ('productNodes' , {}).get ('products' , [])
136
- self .prod_id = next ((node ['id' ] for node in nodes if node ['name' ] == self .prod ), None )
136
+ nodes = self .data .get ('data' , {}).get ('organization' , {}).get (
137
+ 'productNodes' , {}).get ('products' , [])
138
+ self .prod_id = next (
139
+ (node ['id' ] for node in nodes if node ['name' ] == self .prod ), None )
137
140
if not self .prod :
138
- nodes = self .data .get ('data' , {}).get ('organization' , {}).get ('productNodes' , {}).get ('products' , [])
139
- self .prod = next ((node ['name' ] for node in nodes if node ['id' ] == self .prod_id ), None )
141
+ nodes = self .data .get ('data' , {}).get ('organization' , {}).get (
142
+ 'productNodes' , {}).get ('products' , [])
143
+ self .prod = next (
144
+ (node ['name' ] for node in nodes if node ['id' ] == self .prod_id ), None )
140
145
return self .prod and self .prod_id
141
146
142
147
def resolve_env (self ):
@@ -150,7 +155,7 @@ def resolve_env(self):
150
155
for product in self .data .get ('data' , {}).get ('organization' , {}).get ('productNodes' , {}).get ('products' , []):
151
156
if product ['id' ] == self .prod_id :
152
157
self .env = next ((env_node ['id' ] for env_node in product .get ('environments' , [])
153
- if env_node .get ('id' ) == self .env_id ), None )
158
+ if env_node .get ('id' ) == self .env_id ), None )
154
159
return self .env and self .env_id
155
160
156
161
def resolve_ver (self ):
@@ -160,17 +165,17 @@ def resolve_ver(self):
160
165
if product ['id' ] == self .prod_id :
161
166
for env in product ['environments' ]:
162
167
if env ['id' ] == self .env_id :
163
- for ver in env ['versions' ]:
164
- if ver ['primaryComponent' ]['version' ] == self .ver :
165
- self .ver_id = ver ['id' ]
168
+ for ver in env ['versions' ]:
169
+ if ver ['primaryComponent' ]['version' ] == self .ver :
170
+ self .ver_id = ver ['id' ]
166
171
if not self .ver :
167
172
for product in self .data .get ('data' , {}).get ('organization' , {}).get ('productNodes' , {}).get ('products' , []):
168
173
if product ['id' ] == self .prod_id :
169
174
for env in product ['environments' ]:
170
175
if env ['id' ] == self .env_id :
171
- for ver in env ['versions' ]:
172
- if ver ['id' ] == self .ver_id :
173
- self .ver = ver ['primaryComponent' ]['version' ]
176
+ for ver in env ['versions' ]:
177
+ if ver ['id' ] == self .ver_id :
178
+ self .ver = ver ['primaryComponent' ]['version' ]
174
179
return self .ver and self .ver_id
175
180
176
181
def __repr__ (self ):
@@ -189,7 +194,8 @@ def prods(self):
189
194
prod_nodes = self .data ['data' ]['organization' ]['productNodes' ]['products' ]
190
195
prod_list = []
191
196
for prod in prod_nodes :
192
- versions = sum (len (env ['versions' ]) for env in prod ['environments' ])
197
+ versions = sum (len (env ['versions' ])
198
+ for env in prod ['environments' ])
193
199
prod_list .append ({
194
200
'name' : prod ['name' ],
195
201
'updatedAt' : prod ['updatedAt' ],
@@ -206,7 +212,7 @@ def versions(self):
206
212
207
213
def download (self ):
208
214
logging .debug ("Downloading SBOM for environment ID %s, sbom ID %s" ,
209
- self .env_id , self .ver_id )
215
+ self .env_id , self .ver_id )
210
216
211
217
variables = {
212
218
"envId" : self .env_id ,
@@ -220,9 +226,10 @@ def download(self):
220
226
}
221
227
222
228
response = requests .post (self .api_url ,
223
- headers = {"Authorization" : "Bearer " + self .token },
224
- json = request_data ,
225
- timeout = INTERLYNK_API_TIMEOUT )
229
+ headers = {
230
+ "Authorization" : "Bearer " + self .token },
231
+ json = request_data ,
232
+ timeout = INTERLYNK_API_TIMEOUT )
226
233
227
234
if response .status_code == 200 :
228
235
try :
@@ -246,7 +253,7 @@ def download(self):
246
253
logging .error ("Failed to parse JSON response." )
247
254
else :
248
255
logging .error ("Failed to send GraphQL request. Status code: %s" ,
249
- response .status_code )
256
+ response .status_code )
250
257
251
258
def upload (self , sbom_file ):
252
259
if os .path .isfile (sbom_file ) is False :
@@ -264,31 +271,32 @@ def upload(self, sbom_file):
264
271
logging .debug ("Uploading SBOM to product ID %s" , self .prod_id )
265
272
266
273
headers = {
267
- "Authorization" : "Bearer " + self .token
274
+ "Authorization" : "Bearer " + self .token
268
275
}
269
276
270
277
operations = json .dumps ({
271
- "query" : QUERY_SBOM_UPLOAD ,
272
- "variables" : {"doc" : None , "projectId" : self .env_id }
278
+ "query" : QUERY_SBOM_UPLOAD ,
279
+ "variables" : {"doc" : None , "projectId" : self .env_id }
273
280
})
274
281
map_data = json .dumps ({"0" : ["variables.doc" ]})
275
282
276
283
form_data = {
277
- "operations" : operations ,
278
- "map" : map_data
284
+ "operations" : operations ,
285
+ "map" : map_data
279
286
}
280
287
281
288
try :
282
289
with open (sbom_file , 'rb' ) as sbom :
283
290
files_map = {'0' : sbom }
284
291
response = requests .post (self .api_url ,
285
- headers = headers ,
286
- data = form_data ,
287
- files = files_map ,
288
- timeout = INTERLYNK_API_TIMEOUT )
292
+ headers = headers ,
293
+ data = form_data ,
294
+ files = files_map ,
295
+ timeout = INTERLYNK_API_TIMEOUT )
289
296
if response .status_code == 200 :
290
297
resp_json = response .json ()
291
- errors = resp_json .get ('errors' )
298
+ errors = resp_json .get ('data' , {}).get (
299
+ 'sbomUpload' , {}).get ('errors' )
292
300
if errors is not None and errors != '[]' :
293
301
print (f"Error uploading sbom: { errors } " )
294
302
return 1
@@ -301,4 +309,3 @@ def upload(self, sbom_file):
301
309
except FileNotFoundError as ex :
302
310
logging .error ("FileNotFoundError: %s" , ex )
303
311
return 1
304
-
0 commit comments