@@ -175,7 +175,7 @@ def test_service():
175
175
actual_task_ids .add (placement .task_id )
176
176
assert actual_task_ids == {2 }
177
177
178
- # Register the second (correct) TaskGraph, wont be able to run due to inadequate resources
178
+ # Attempt to register the second TaskGraph, wont be able to run due to inadequate resources
179
179
request = erdos_scheduler_pb2 .RegisterTaskGraphRequest (
180
180
id = "task-graph-1" ,
181
181
name = "TPCH Query 4 50 200" ,
@@ -189,10 +189,33 @@ def test_service():
189
189
],
190
190
)
191
191
response = stub .RegisterTaskGraph (request )
192
+ assert (
193
+ not response .success
194
+ and re .search (
195
+ r"The worker Pool cannot accomodate the task graph 'task-graph-1'" ,
196
+ response .message ,
197
+ )
198
+ and response .num_executors == 0
199
+ )
200
+
201
+ # Register the third TaskGraph, will run but will get cancelled due to deadline miss
202
+ request = erdos_scheduler_pb2 .RegisterTaskGraphRequest (
203
+ id = "task-graph-2" ,
204
+ name = "TPCH Query 4 50 50" ,
205
+ timestamp = 1234567890 ,
206
+ dependencies = [
207
+ {"key" : {"id" : 0 , "name" : "stage 0" }, "children_ids" : [2 ]},
208
+ {"key" : {"id" : 1 , "name" : "stage 1" }, "children_ids" : [2 ]},
209
+ {"key" : {"id" : 2 , "name" : "stage 2" }, "children_ids" : [3 ]},
210
+ {"key" : {"id" : 3 , "name" : "stage 3" }, "children_ids" : [4 ]},
211
+ {"key" : {"id" : 4 , "name" : "stage 4" }, "children_ids" : []},
212
+ ],
213
+ )
214
+ response = stub .RegisterTaskGraph (request )
192
215
assert (
193
216
response .success
194
217
and re .search (
195
- r"Registered task graph 'task-graph-1 ' successfully" ,
218
+ r"Registered task graph 'task-graph-2 ' successfully" ,
196
219
response .message ,
197
220
)
198
221
and response .num_executors == 10
@@ -203,33 +226,33 @@ def test_service():
203
226
204
227
# Mark the environment as ready
205
228
request = erdos_scheduler_pb2 .RegisterEnvironmentReadyRequest (
206
- id = "task-graph-1 " ,
229
+ id = "task-graph-2 " ,
207
230
num_executors = 10 ,
208
231
timestamp = 1234567890 ,
209
232
)
210
233
response = stub .RegisterEnvironmentReady (request )
211
234
assert response .success and re .search (
212
- r"Successfully marked environment as ready for task graph 'Q4\[task-graph-1 \]@1'" ,
235
+ r"Successfully marked environment as ready for task graph 'Q4\[task-graph-2 \]@1'" ,
213
236
response .message ,
214
237
)
215
238
216
239
# Wait for 10s to get the placements for the second task graph
217
240
time .sleep (10 )
218
241
219
- # Get placements for the task, none should be placed since worker has inadequate resources
242
+ # Get placements for the taskgraph 3, one of first two root vertices should be placed since there are resources
220
243
request = erdos_scheduler_pb2 .GetPlacementsRequest (
221
244
timestamp = 1234567890 ,
222
- id = "task-graph-1 " ,
245
+ id = "task-graph-2 " ,
223
246
)
224
247
response = stub .GetPlacements (request )
225
248
assert response .success
226
249
actual_task_ids = set ()
227
250
for placement in response .placements :
228
251
assert (
229
- placement .worker_id == "1234" and placement .application_id == "task-graph-1 "
252
+ placement .worker_id == "1234" and placement .application_id == "task-graph-2 "
230
253
)
231
254
actual_task_ids .add (placement .task_id )
232
- assert len ( actual_task_ids ) == 0
255
+ assert actual_task_ids == { 1 }
233
256
234
257
# Wait for 100 more seconds and request placements again
235
258
time .sleep (100 )
@@ -244,21 +267,33 @@ def test_service():
244
267
# Wait for 2 seconds to allow scheduler to process task completion and run scheduler
245
268
time .sleep (2 )
246
269
247
- # Get placements for the task, entire taskgraph should be cancelled
270
+ # Get placements for the task, entire taskgraph should be cancelled since deadline missed
271
+ # Other root vertex (0) will be cancelled first. Then the subsequent vertices.
272
+ # NOTE: The simulator will return all current placements for a taskgraph (including
273
+ # those already sent by the service) until the task is marked as finished. Spark will ignore it.
274
+ # In this scenario of task-graph-2, placements has two values- Task 0 in cancelled state and
275
+ # Task 1 in running state. The service will return both of them.
248
276
request = erdos_scheduler_pb2 .GetPlacementsRequest (
249
277
timestamp = 1234567890 ,
250
- id = "task-graph-1 " ,
278
+ id = "task-graph-2 " ,
251
279
)
252
280
response = stub .GetPlacements (request )
253
281
print (response )
254
282
assert response .success
255
283
actual_task_ids = set ()
256
284
for placement in response .placements :
257
- assert (
258
- placement .worker_id == "None"
259
- and placement .application_id == "task-graph-1"
260
- and placement .cancelled == True
261
- )
285
+ if placement .task_id == 0 :
286
+ assert (
287
+ placement .worker_id == "None"
288
+ and placement .application_id == "task-graph-2"
289
+ and placement .cancelled == True
290
+ )
291
+ if placement .task_id == 1 :
292
+ assert (
293
+ placement .worker_id == "1234"
294
+ and placement .application_id == "task-graph-2"
295
+ and placement .cancelled == False
296
+ )
262
297
actual_task_ids .add (placement .task_id )
263
298
assert actual_task_ids == {0 , 1 }
264
299
0 commit comments