diff --git a/CHANGES.md b/CHANGES.md
index ec881c9..f743e51 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -23,3 +23,5 @@ https://supertask.readthedocs.io/
 
 - Refactored data model and CLI
 - Removed HTTP service
+- Added the ability to use Python files with inline-defined task metadata, e.g.
+  `supertask run my_task.py`
diff --git a/examples/contrib/cratedb_cleanup.py b/examples/contrib/cratedb_cleanup.py
new file mode 100644
index 0000000..7acfafd
--- /dev/null
+++ b/examples/contrib/cratedb_cleanup.py
@@ -0,0 +1,89 @@
+#!/usr/bin/env python3
+import logging
+import os
+import sys
+import typing as t
+
+# /// script
+# requires-python = ">=3.9"
+# dependencies = [
+#   "cratedb-toolkit",
+#   "sqlalchemy-cratedb",
+#   "tqdm",
+# ]
+# ///
+# /// task
+# cron = "*/5 * * * * * *"
+# [env]
+# DATABASE_URL = "crate://crate@localhost:4200/"
+# [options]
+# schemas = ["foo", "bar"]
+# table_prefixes = ["tmp_", "temp_"]
+# ///
+import sqlalchemy as sa
+from cratedb_toolkit.model import TableAddress
+
+logger = logging.getLogger(__name__)
+
+
+class DatabaseCleanupTask:
+    """
+    A task definition to clean up temporary tables in a database.
+    """
+
+    def __init__(self, schemas: t.List[str] = None, table_prefixes: t.List[str] = None):
+        self.schemas = schemas
+        self.table_prefixes = table_prefixes
+        database_url = os.getenv("DATABASE_URL")
+        if database_url is None:
+            raise ValueError("Database URL environment variable is not set: DATABASE_URL")
+        self.engine = sa.create_engine(database_url, echo=True)
+
+    def run(self) -> None:
+        """
+        Inquire relevant table addresses and clean up temporary tables.
+        """
+        with self.engine.connect() as conn:
+            for table in self.table_addresses:
+                sql = f"DROP TABLE IF EXISTS {table.fullname}"
+                logger.info(f"Dropping table {table.fullname}: {sql}")
+                conn.execute(sa.text(sql))
+
+    @property
+    def table_addresses(self) -> t.List[TableAddress]:
+        """
+        Table addresses selected by filter.
+
+        TODO: Elaborate with `include` vs. `exclude` selectors?
+        TODO: Q: How to make the current prefix match (`table_prefixes`) more advanced?
+                 A: Just use regexes, or provide other wildcard schemes?
+        TODO: Possibly refactor to stdlib or CrateDB Toolkit.
+        """
+        inspector = sa.inspect(self.engine)
+        bucket: t.List[TableAddress] = []
+        for schema in inspector.get_schema_names():
+            if schema in self.schemas:
+                tables = inspector.get_table_names(schema=schema)
+                for table in tables:
+                    for prefix in self.table_prefixes:
+                        if table.startswith(prefix):
+                            bucket.append(TableAddress(schema=schema, table=table))
+        return bucket
+
+
+def run(**kwargs):
+    logging.basicConfig(level=logging.INFO, stream=sys.stderr)
+    task = DatabaseCleanupTask(**kwargs)
+    task.run()
+
+
+if __name__ == "__main__":
+    """
+    crash -c "create table testdrive.tmp_foo (id int)"
+    export DATABASE_URL=crate://crate@localhost:4200/
+    python examples/contrib/cratedb_cleanup.py
+    """
+    run(
+        schemas=["testdrive"],
+        table_prefixes=["tmp_", "temp_"],
+    )
diff --git a/pyproject.toml b/pyproject.toml
index 731956e..6714a54 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -90,7 +90,7 @@ dependencies = [
   "icecream<3",
   "jinja2<4",
   "markupsafe<4",
-  "pueblo[fileio]",
+  "pueblo[fileio,sfa-full]",
   "pydantic>=2,<3",
   "python-dotenv[cli]<2",
   "python-multipart==0.0.20",
diff --git a/supertask/core.py b/supertask/core.py
index c90c4c8..aceab9b 100644
--- a/supertask/core.py
+++ b/supertask/core.py
@@ -12,6 +12,7 @@ from apscheduler.util import ref_to_obj
 from halo import Halo
 from icecream import ic
+from pueblo.sfa.core import ApplicationAddress, SingleFileApplication
 
 from supertask.model import JobStore, Settings, Task
 from supertask.store.cratedb import CrateDBSQLAlchemyJobStore
 
@@ -149,6 +150,15 @@ def run(self, *args, **kwargs):
                 func = ref_to_obj(step.run)
                 retval = func(*step.args, **step.kwargs)
                 logger.info(f"Result: {retval}")
+            elif step.uses == "python-file":
+                # TODO: Refactor into single-line invocation when possible.
+                address = ApplicationAddress.from_spec(step.run)
+                app = SingleFileApplication(address=address)
+                app.load_any()
+                app.import_module()
+                app._entrypoint = getattr(app._module, "run", None)
+                retval = app.run(*step.args, **step.kwargs)
+                logger.info(f"Result: {retval}")
             else:
                 raise RuntimeError(f"Unknown step type: {step.uses}")
 
diff --git a/supertask/model.py b/supertask/model.py
index bdf11f4..c9588e4 100644
--- a/supertask/model.py
+++ b/supertask/model.py
@@ -4,6 +4,7 @@
 import hashlib
 import json
 import logging
+import os
 import re
 import socket
 import typing as t
@@ -14,6 +15,8 @@
 from pueblo.io import to_io
 from pydantic import BaseModel, Field
 
+from supertask.util import read_inline_script_metadata
+
 logger = logging.getLogger(__name__)
 
 
@@ -118,7 +121,7 @@ class Step(BaseModel):
     uses: str
     run: str
     args: t.List[ScalarType] = Field(default_factory=list)
-    kwargs: t.Dict[str, ScalarType] = Field(default_factory=dict)
+    kwargs: t.Dict[str, t.Any] = Field(default_factory=dict)
     if_: bool = Field(alias="if", default=True)
 
 
@@ -135,15 +138,15 @@ class Timetable(BaseModel):
     Manage information about a whole timetable, including multiple task definitions.
     """
 
-    meta: t.Dict[str, t.Any]
-    tasks: t.List[Task]
+    meta: t.Dict[str, t.Any] = Field(default_factory=dict)
+    tasks: t.List[Task] = Field(default_factory=list)
 
     NAMESPACE_ATTRIBUTE: t.ClassVar = "namespace"
     SOURCE_ATTRIBUTE: t.ClassVar = "taskfile"
 
     def model_post_init(self, __context: t.Any) -> None:
         """
-        Adjust model after initialization.
+        Adjust the model after initialization.
         """
         # If the timetable file or resource does not provide a namespace identifier, provide an ephemeral one.
         if self.NAMESPACE_ATTRIBUTE not in self.meta or not self.meta[self.NAMESPACE_ATTRIBUTE]:
@@ -174,7 +177,7 @@ def digest(value):
     @classmethod
     def load(cls, taskfile: str):
         """
-        Load task definitions from file or resource.
+        Load task definitions from a file or resource.
         """
         logger.info(f"Loading task(s) from file. Source: {taskfile}")
 
@@ -184,12 +187,37 @@ def load(cls, taskfile: str):
         elif taskfile.endswith(".yaml") or taskfile.endswith(".yml"):
             # Use YAML 1.2 compliant loading, otherwise "on" will be translated to `True`, for example.
             data = yaml.load(f, Loader=yamlcore.CoreLoader)  # noqa: S506
+        elif taskfile.endswith(".py"):
+            return cls.from_python(taskfile)
         else:
             raise NotImplementedError(f"Task or timetable file type not supported: {taskfile}")
-
         data.setdefault("meta", {})
         data["meta"][cls.SOURCE_ATTRIBUTE] = taskfile
         return cls(**data)
 
+    @classmethod
+    def from_python(cls, pythonfile: str):
+        tt = cls()
+        pythonfile_path = Path(pythonfile)
+        tt.meta[cls.SOURCE_ATTRIBUTE] = pythonfile
+        task_data = read_inline_script_metadata("task", pythonfile_path.read_text())
+        os.environ.update(task_data.get("env", {}))
+        tt.tasks.append(
+            Task(
+                meta=TaskMetadata(id="python", name=pythonfile_path.stem, description="TODO", enabled=True),
+                on=Event(schedule=[ScheduleItem(cron=task_data["cron"])]),
+                steps=[
+                    Step(
+                        name=pythonfile_path.stem,
+                        uses="python-file",
+                        run=f"{pythonfile_path}:run",
+                        args=[],
+                        kwargs=task_data.get("options", {}),
+                    ),
+                ],
+            )
+        )
+        return tt
+
 
 class CronJob(BaseModel):
     """
diff --git a/supertask/util.py b/supertask/util.py
index 3d82835..072248e 100644
--- a/supertask/util.py
+++ b/supertask/util.py
@@ -1,7 +1,10 @@
 import logging
+import re
+import typing as t
 
 import colorlog
 from colorlog.escape_codes import escape_codes
+from pueblo.sfa.pep723 import PEP_723_REGEX
 
 
 def setup_logging(level=logging.INFO, debug: bool = False, width: int = 30):
@@ -20,3 +23,28 @@ def setup_logging(level=logging.INFO, debug: bool = False, width: int = 30):
     logging.getLogger("crate.client").setLevel(level)
     logging.getLogger("sqlalchemy_cratedb").setLevel(level)
     logging.getLogger("urllib3.connectionpool").setLevel(level)
+
+
+def read_inline_script_metadata(type_: str, script: str) -> t.Dict[str, t.Any]:
+    """
+    Reference implementation to read inline script metadata (PEP 723).
+
+    https://packaging.python.org/en/latest/specifications/inline-script-metadata/
+    https://peps.python.org/pep-0723/
+
+    TODO: Synchronize with `pueblo.sfa.pep723`.
+    """
+
+    name = type_ or "script"
+    matches = list(filter(lambda m: m.group("type") == name, re.finditer(PEP_723_REGEX, script)))
+    if len(matches) > 1:
+        raise ValueError(f"Multiple {name} blocks found")
+    if len(matches) == 1:
+        import tomli
+
+        content = "".join(
+            line[2:] if line.startswith("# ") else line[1:]
+            for line in matches[0].group("content").splitlines(keepends=True)
+        )
+        return tomli.loads(content)
+    return {}
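# Illustration (not part of the patch): how the new `read_inline_script_metadata()` helper is
# expected to resolve the `task` block of `examples/contrib/cratedb_cleanup.py` above.
# `Timetable.from_python()` then maps `cron` onto the schedule, merges `[env]` into `os.environ`,
# and forwards `[options]` as keyword arguments to the file's `run()` entrypoint.
from pathlib import Path

from supertask.util import read_inline_script_metadata

metadata = read_inline_script_metadata("task", Path("examples/contrib/cratedb_cleanup.py").read_text())
# Expected value of `metadata`, given the inline block shown in the patch:
# {
#     "cron": "*/5 * * * * * *",
#     "env": {"DATABASE_URL": "crate://crate@localhost:4200/"},
#     "options": {"schemas": ["foo", "bar"], "table_prefixes": ["tmp_", "temp_"]},
# }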
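# Illustration (not part of the patch): a minimal sketch of a Python task file that the new
# `supertask run my_task.py` code path should accept, based on `Timetable.from_python()` and the
# `examples/contrib/cratedb_cleanup.py` example above. The `# /// task` block uses the PEP 723
# fence syntax; `cron` is required by `from_python()`, while `[env]` and `[options]` are optional.
# The scheduler resolves the module-level `run()` entrypoint and passes `[options]` as keyword
# arguments. The file name `my_task.py` and the greeting logic are hypothetical.

# /// task
# cron = "*/10 * * * * * *"
# [options]
# name = "world"
# ///


def run(name: str = "anonymous"):
    print(f"Hello, {name}.")


if __name__ == "__main__":
    run(name="world")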