Skip to content

Commit cca24d8

Browse files
committed
misc improvements
1 parent 0b3a661 commit cca24d8

File tree

2 files changed

+30
-8
lines changed

2 files changed

+30
-8
lines changed

rpc/service.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ class DataLoader(Enum):
7171

7272

7373
class WorkloadLoader(BaseWorkloadLoader):
74-
def __init__(self) -> None:
75-
self._workload = Workload.empty()
74+
def __init__(self, _flags) -> None:
75+
self._workload = Workload.empty(_flags)
7676

7777
def add_task_graph(self, task_graph: TaskGraph):
7878
self._workload.add_task_graph(task_graph)
@@ -249,7 +249,7 @@ async def RegisterFramework(self, request, context):
249249
name=f"WorkerPool_{parsed_uri.netloc}",
250250
_logger=self._logger,
251251
)
252-
self._workload_loader = WorkloadLoader()
252+
self._workload_loader = WorkloadLoader(FLAGS)
253253

254254
# Enable orchestrated mode
255255
FLAGS.orchestrated = True
@@ -298,6 +298,14 @@ async def DeregisterFramework(self, request, context):
298298
async def RegisterDriver(self, request, context):
299299
stime = self.__stime()
300300

301+
if not self.__worker_registered():
302+
msg = f"[{stime}] Failed to register driver (id={request.id}) because no worker has been registered yet."
303+
self._logger.error(msg)
304+
return erdos_scheduler_pb2.RegisterDriverResponse(
305+
success=False,
306+
message=msg,
307+
)
308+
301309
if request.id in self._registered_app_drivers:
302310
msg = f"[{stime}] Driver with id '{request.id}' is already registered"
303311
self._logger.error(msg)
@@ -337,6 +345,14 @@ async def DeregisterDriver(self, request, context):
337345
task_graph_name = self._registered_app_drivers[request.id]
338346
del self._registered_app_drivers[request.id]
339347

348+
# Log stats
349+
log_stats_event = Event(
350+
event_type=EventType.LOG_STATS,
351+
time=stime,
352+
)
353+
with self._lock:
354+
self._simulator._event_queue.add_event(log_stats_event)
355+
340356
msg = f"[{stime}] Successfully de-registered driver with id {request.id} for task graph {task_graph_name}"
341357
self._logger.info(msg)
342358
return erdos_scheduler_pb2.DeregisterDriverResponse(

simulator.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,6 +1138,7 @@ def __handle_task_cancellation(self, event: Event) -> None:
11381138
f"{event.task.timestamp},{event.task.id},{event.task.task_graph},"
11391139
f"{event.task.slowest_execution_strategy.runtime.time}"
11401140
)
1141+
self.log_stats(event.time)
11411142

11421143
# If the task already had a placement, we remove the placement from our queue.
11431144
if event.task.id in self._future_placement_events:
@@ -1255,8 +1256,12 @@ def __handle_task_finished(self, event: Event) -> None:
12551256
f"{task_graph.deadline.to(EventTime.Unit.US).time},"
12561257
f"{tardiness.to(EventTime.Unit.US).time}"
12571258
)
1259+
12581260
if task_graph.deadline < event.time:
12591261
self._missed_task_graph_deadlines += 1
1262+
1263+
self.log_stats(event.time)
1264+
12601265
self._logger.info(
12611266
"[%s] Finished the TaskGraph %s with a deadline %s at the "
12621267
"completion of the task %s with a tardiness of %s.",
@@ -1764,8 +1769,7 @@ def __handle_event(self, event: Event) -> bool:
17641769
self.__handle_scheduler_finish(event)
17651770
elif event.event_type == EventType.SIMULATOR_END:
17661771
# End of the simulator loop.
1767-
assert event.time == self._simulator_time
1768-
self.log_stats()
1772+
self.log_stats(event.time)
17691773
self._csv_logger.debug(
17701774
f"{event.time.time},SIMULATOR_END",
17711775
)
@@ -1787,7 +1791,9 @@ def __step(self, step_size: EventTime = EventTime(1, EventTime.Unit.US)) -> None
17871791
the clock (in us).
17881792
"""
17891793
if step_size < EventTime.zero():
1790-
raise ValueError(f"Simulator cannot step backwards {step_size}")
1794+
raise ValueError(
1795+
f"[{self._simulator_time}] Simulator cannot step backwards {step_size}"
1796+
)
17911797

17921798
# Step the simulator for the required steps and construct TASK_FINISHED events
17931799
# for any tasks that were able to complete their execution.
@@ -2156,9 +2162,9 @@ def __log_utilization(self, sim_time: EventTime):
21562162
f"{worker_pool_resources.get_available_quantity(resource)}"
21572163
)
21582164

2159-
def log_stats(self):
2165+
def log_stats(self, sim_time: EventTime):
21602166
self._csv_logger.debug(
2161-
f"{self._simulator_time.time},LOG_STATS,{self._finished_tasks},"
2167+
f"{sim_time.time},LOG_STATS,{self._finished_tasks},"
21622168
f"{self._cancelled_tasks},{self._missed_task_deadlines},"
21632169
f"{self._finished_task_graphs},"
21642170
f"{len(self._workload.get_cancelled_task_graphs())},"

0 commit comments

Comments
 (0)