Skip to content

Add scheduler for scan tasks #68

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ classifiers = [
description = "Control system agnostic framework for building Device support in Python that will work for both EPICS and Tango"
dependencies = [
"aioserial",
"apscheduler",
"numpy",
"pydantic",
"pvi~=0.10.0",
Expand Down
75 changes: 28 additions & 47 deletions src/fastcs/backend.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import asyncio
from collections import defaultdict
from collections.abc import Callable
from concurrent.futures import Future
from types import MethodType

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from softioc.asyncio_dispatcher import AsyncioDispatcher

from .attributes import AttrR, AttrW, Sender, Updater
Expand All @@ -21,7 +19,6 @@ def __init__(
self._controller = controller

self._initial_tasks = [controller.connect]
self._scan_tasks: list[Future] = []

asyncio.run_coroutine_threadsafe(
self._controller.initialise(), self._loop
Expand All @@ -30,6 +27,9 @@ def __init__(
self._mapping = Mapping(self._controller)
self._link_process_tasks()

self._scheduler = AsyncIOScheduler(event_loop=self._loop)
self._add_scan_tasks()

self._context = {
"dispatcher": self._dispatcher,
"controller": self._controller,
Expand All @@ -41,9 +41,14 @@ def _link_process_tasks(self):
_link_single_controller_put_tasks(single_mapping)
_link_attribute_sender_class(single_mapping)

def _add_scan_tasks(self):
for single_mapping in self._mapping.get_controller_mappings():
_add_scan_method_tasks(self._scheduler, single_mapping)
_add_attribute_updater_tasks(self._scheduler, single_mapping)

def run(self):
self._run_initial_tasks()
self._start_scan_tasks()
self._scheduler.start()

self._run()

Expand All @@ -52,12 +57,6 @@ def _run_initial_tasks(self):
future = asyncio.run_coroutine_threadsafe(task(), self._loop)
future.result()

def _start_scan_tasks(self):
scan_tasks = _get_scan_tasks(self._mapping)

for task in scan_tasks:
asyncio.run_coroutine_threadsafe(task(), self._loop)

def _run(self):
raise NotImplementedError("Specific Backend must implement _run")

Expand Down Expand Up @@ -98,36 +97,35 @@ async def callback(value):
return callback


def _get_scan_tasks(mapping: Mapping) -> list[Callable]:
scan_dict: dict[float, list[Callable]] = defaultdict(list)

for single_mapping in mapping.get_controller_mappings():
_add_scan_method_tasks(scan_dict, single_mapping)
_add_attribute_updater_tasks(scan_dict, single_mapping)

scan_tasks = _get_periodic_scan_tasks(scan_dict)
return scan_tasks


def _add_scan_method_tasks(
scan_dict: dict[float, list[Callable]], single_mapping: SingleMapping
):
def _add_scan_method_tasks(scheduler: AsyncIOScheduler, single_mapping: SingleMapping):
for method in single_mapping.scan_methods.values():
scan_dict[method.period].append(
MethodType(method.fn, single_mapping.controller)
path = single_mapping.controller.path
scheduler.add_job(
MethodType(method.fn, single_mapping.controller),
"interval",
seconds=method.period,
name=f"{'_'.join(path)}_{method.fn.__name__}"
if path
else method.fn.__name__,
)


def _add_attribute_updater_tasks(
scan_dict: dict[float, list[Callable]], single_mapping: SingleMapping
scheduler: AsyncIOScheduler, single_mapping: SingleMapping
):
for attribute in single_mapping.attributes.values():
for attr_name, attribute in single_mapping.attributes.items():
match attribute:
case AttrR(updater=Updater(update_period=update_period)) as attribute:
callback = _create_updater_callback(
attribute, single_mapping.controller
)
scan_dict[update_period].append(callback)
path = single_mapping.controller.path
scheduler.add_job(
callback,
"interval",
seconds=update_period,
name=f"{'_'.join(path)}_{attr_name}" if path else attr_name,
)


def _create_updater_callback(attribute, controller):
Expand All @@ -142,20 +140,3 @@ async def callback():
raise

return callback


def _get_periodic_scan_tasks(scan_dict: dict[float, list[Callable]]) -> list[Callable]:
periodic_scan_tasks: list[Callable] = []
for period, methods in scan_dict.items():
periodic_scan_tasks.append(_create_periodic_scan_task(period, methods))

return periodic_scan_tasks


def _create_periodic_scan_task(period, methods: list[Callable]) -> Callable:
async def scan_task() -> None:
while True:
await asyncio.gather(*[method() for method in methods])
await asyncio.sleep(period)

return scan_task