
Commit 48b3944

Author: Tomás Link (committed)
Add PipelineConfig, PipelineFactory, DagFactory
1 parent 3f26f2b commit 48b3944

9 files changed: +465 −2 lines changed

src/gfw/common/beam/pipeline/dag/factory.py

Lines changed: 120 additions & 0 deletions
@@ -0,0 +1,120 @@
"""Factories for building Apache Beam DAGs with BigQuery integration.

This module defines abstract base classes for DAG factories that produce
Apache Beam pipelines, including support for creating BigQuery read/write
clients and helpers with optional mocking capabilities.

Classes:
    DagFactory: Abstract base class providing BigQuery client factories and
        requiring a build_dag method.

    LinearDagFactory: Extends DagFactory for linear pipelines composed of
        sources, core, optional side inputs, and sinks.
"""

from abc import ABC, abstractmethod
from functools import partial
from typing import Callable, Optional, Tuple

from apache_beam import PTransform

from gfw.common.beam.pipeline.dag import Dag, LinearDag
from gfw.common.beam.transforms import WriteToPartitionedBigQuery
from gfw.common.bigquery.helper import BigQueryHelper
from gfw.common.pipeline.config import PipelineConfig


class DagFactory(ABC):
    """Abstract base class for DAG factories producing Apache Beam pipelines.

    Provides factory properties for BigQuery read/write clients and helpers.
    """

    def __init__(self, config: PipelineConfig) -> None:
        self.config = config

    @property
    def write_to_bigquery_factory(self) -> Callable[..., WriteToPartitionedBigQuery]:
        """Returns a factory for WriteToPartitionedBigQuery clients.

        Uses mocked clients if configured.
        """
        return WriteToPartitionedBigQuery.get_client_factory(mocked=self.config.mock_bq_clients)

    @property
    def bigquery_helper_factory(self) -> Callable[..., BigQueryHelper]:
        """Returns a factory for BigQueryHelper instances.

        Returns:
            Callable that creates BigQueryHelper instances with
            the appropriate client factory.
        """
        client_factory = BigQueryHelper.get_client_factory(mocked=self.config.mock_bq_clients)
        return partial(BigQueryHelper, client_factory=client_factory)

    @abstractmethod
    def build_dag(self) -> Dag:
        """Builds the DAG.

        Must be implemented in subclasses.

        Returns:
            A Dag instance assembled from the factory's components.
        """
        pass


class LinearDagFactory(DagFactory, ABC):
    """Base class for linear DAG factories that assemble sources, core, side inputs, and sinks."""

    @property
    @abstractmethod
    def sources(self) -> Tuple[PTransform, ...]:
        """Returns the source PTransforms for the LinearDag.

        Returns:
            Tuple of PTransforms serving as data sources.
        """
        pass

    @property
    @abstractmethod
    def core(self) -> PTransform:
        """Returns the core PTransform that processes data in the LinearDag.

        Returns:
            The core processing PTransform.
        """
        pass

    @property
    def side_inputs(self) -> Optional[PTransform]:
        """Returns optional side inputs PTransform for the LinearDag.

        Returns:
            A PTransform for side inputs or None if not used.
        """
        return None

    @property
    @abstractmethod
    def sinks(self) -> Tuple[PTransform, ...]:
        """Returns the sink PTransforms for the LinearDag.

        Returns:
            Tuple of PTransforms serving as data sinks.
        """
        pass

    def build_dag(self) -> LinearDag:
        """Builds a LinearDag instance from the configured pipeline parts.

        Returns:
            A LinearDag composed of sources, core, side inputs, and sinks.
        """
        return LinearDag(
            sources=tuple(self.sources),
            core=self.core,
            side_inputs=self.side_inputs,
            sinks=tuple(self.sinks),
        )
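
For orientation, a minimal sketch of what a concrete subclass could look like. The class name, transforms, and paths are hypothetical placeholders and not part of this commit; the sketch only assumes the LinearDagFactory interface added above.

from typing import Tuple

import apache_beam as beam
from apache_beam import PTransform

from gfw.common.beam.pipeline.dag.factory import LinearDagFactory


class WordCountDagFactory(LinearDagFactory):
    """Hypothetical linear DAG: read lines, count distinct lines, write results."""

    @property
    def sources(self) -> Tuple[PTransform, ...]:
        # One or more source transforms; LinearDag receives them as a tuple.
        return (beam.io.ReadFromText("gs://my-bucket/input.txt"),)

    @property
    def core(self) -> PTransform:
        # Core processing step; side_inputs is left as the default None.
        return beam.combiners.Count.PerElement()

    @property
    def sinks(self) -> Tuple[PTransform, ...]:
        return (beam.io.WriteToText("gs://my-bucket/counts"),)


# dag = WordCountDagFactory(config).build_dag()  # config: a concrete PipelineConfig
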
Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
"""Factory for constructing Beam pipelines from configuration and DAG factories.

This module defines the PipelineFactory class, which builds a fully configured
Pipeline instance from a given PipelineConfig and DagFactory.
"""

from gfw.common.beam.pipeline import Pipeline
from gfw.common.beam.pipeline.dag.factory import DagFactory
from gfw.common.pipeline.config import PipelineConfig


class PipelineFactory:
    """Builds a Beam Pipeline from a configuration object and a DAG factory.

    Attributes:
        config:
            Configuration for the pipeline, including version and CLI arguments.

        dag_factory:
            Factory that produces the pipeline's DAG.

        name:
            Optional name for the pipeline.
    """

    def __init__(
        self,
        config: PipelineConfig,
        dag_factory: DagFactory,
        name: str = "",
    ) -> None:
        """Initializes the factory with config, DAG factory, and optional name.

        Args:
            config:
                The pipeline configuration.

            dag_factory:
                Factory that provides the pipeline DAG.

            name:
                Optional name for the pipeline.
        """
        self.config = config
        self.dag_factory = dag_factory
        self.name = name

    def build_pipeline(self) -> Pipeline:
        """Constructs and returns a fully configured Pipeline instance.

        Returns:
            A pipeline with DAG, version, name, and CLI arguments.
        """
        return Pipeline(
            name=self.name,
            version=self.config.version,
            dag=self.dag_factory.build_dag(),
            unparsed_args=self.config.unknown_unparsed_args,
            **self.config.unknown_parsed_args,
        )
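
A rough usage sketch, reusing the hypothetical WordCountDagFactory from above and a MyPipelineConfig like the one sketched after the config.py diff below; only PipelineFactory and build_pipeline() themselves come from this commit.

config = MyPipelineConfig(date_range=("2024-01-01", "2024-01-31"), mock_bq_clients=True)

factory = PipelineFactory(
    config=config,
    dag_factory=WordCountDagFactory(config),
    name="word-count",
)
pipeline = factory.build_pipeline()  # Pipeline wired with DAG, version, name, and CLI args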

src/gfw/common/bigquery/helper.py

Lines changed: 10 additions & 2 deletions
@@ -3,7 +3,7 @@
 import logging

 from dataclasses import dataclass
-from functools import cached_property, partial
+from functools import cached_property
 from pathlib import Path
 from typing import Any, Callable, Dict, Iterator, List, Optional, Union
 from unittest import mock
@@ -118,8 +118,16 @@ def mocked(cls, **kwargs: Any) -> "BigQueryHelper":
     @classmethod
     def get_client_factory(cls, mocked: bool = False) -> Callable[..., bigquery.client.Client]:
         """Returns a factory for bigquery.Client objects."""
+
+        def mock_client_factory(*args: Any, **kwargs: Any) -> mock.Mock:
+            # Take the project from kwargs, defaulting to None; without it the
+            # mocked client would have no project set, which callers rely on.
+            project = kwargs.get("project", None)
+            client = mock.create_autospec(bigquery.Client, project=project, instance=True)
+            return client
+
         if mocked:
-            return partial(mock.create_autospec, bigquery.client.Client, instance=True)
+            return mock_client_factory

         return bigquery.client.Client
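
A short sketch of how the new mocked factory is meant to be used; it assumes only the get_client_factory API shown above, and the project id is a placeholder.

from gfw.common.bigquery.helper import BigQueryHelper

# With mocking enabled, the factory produces autospec'd clients.
client_factory = BigQueryHelper.get_client_factory(mocked=True)
client = client_factory(project="my-project")
print(client.project)  # the mocked client carries the project it was created with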

src/gfw/common/pipeline/__init__.py

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
"""Configuration models and utilities for building data pipelines.

The `pipeline` package defines configuration models and utilities
for building and managing data processing pipelines.

Modules:
    config: Contains classes and helpers for parsing and managing
        pipeline configuration, including date ranges and
        unknown arguments from the CLI.

This package is typically used by pipeline factories and runners
to extract configuration parameters, validate inputs, and
initialize pipeline components with consistent settings.
"""

src/gfw/common/pipeline/config.py

Lines changed: 110 additions & 0 deletions
@@ -0,0 +1,110 @@
"""This module defines the `PipelineConfig` class used to configure data pipeline executions.

It includes:
- A dataclass `PipelineConfig` that stores date ranges and any unknown arguments.
- Automatic parsing of date strings into `datetime.date` objects.
- Utility methods to convert from a `SimpleNamespace` or to a dictionary.
- A custom exception `PipelineConfigError` for handling invalid configuration inputs.

Intended for use in CLI-based or programmatic pipeline setups where date ranges
and additional arguments need to be passed and validated.
"""

from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass, field
from datetime import date
from functools import cached_property
from types import SimpleNamespace
from typing import Any


ERROR_DATE = "Dates must be in ISO format. Got: {}."


class PipelineConfigError(Exception):
    """Custom exception for pipeline configuration errors."""

    pass


@dataclass
class PipelineConfig(ABC):
    """Configuration object for data pipeline execution.

    Args:
        date_range:
            Tuple of start and end dates in ISO format (YYYY-MM-DD).

        mock_bq_clients:
            If True, all BigQuery interactions will be mocked.

        unknown_parsed_args:
            Parsed CLI or config arguments not explicitly defined in the config.

        unknown_unparsed_args:
            Raw unparsed CLI arguments.
    """

    date_range: tuple[str, str]
    mock_bq_clients: bool = False
    unknown_parsed_args: dict[str, Any] = field(default_factory=dict)
    unknown_unparsed_args: tuple[str, ...] = ()

    @classmethod
    def from_namespace(cls, ns: SimpleNamespace) -> "PipelineConfig":
        """Creates a PipelineConfig instance from a SimpleNamespace.

        Args:
            ns: Namespace containing attributes matching PipelineConfig fields.

        Returns:
            A new PipelineConfig instance.
        """
        return cls(**vars(ns))

    @cached_property
    def parsed_date_range(self) -> tuple[date, date]:
        """Returns the parsed start and end dates as `datetime.date` objects.

        Raises:
            PipelineConfigError: If any of the dates are not in valid ISO format.

        Returns:
            A tuple containing start and end dates as `date` objects.
        """
        try:
            start_str, end_str = self.date_range
            return (date.fromisoformat(start_str), date.fromisoformat(end_str))
        except ValueError as e:
            raise PipelineConfigError(ERROR_DATE.format(self.date_range)) from e

    @property
    def start_date(self) -> date:
        """Returns the start date of the configured range.

        Returns:
            A `date` object representing the start of the range.
        """
        return self.parsed_date_range[0]

    @property
    def end_date(self) -> date:
        """Returns the end date of the configured range.

        Returns:
            A `date` object representing the end of the range.
        """
        return self.parsed_date_range[1]

    @property
    @abstractmethod
    def version(self) -> str:
        """Returns the version of the pipeline."""

    def to_dict(self) -> dict[str, Any]:
        """Converts the PipelineConfig to a dictionary.

        Returns:
            A dictionary representation of the configuration.
        """
        return asdict(self)
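
For illustration, a concrete subclass might look like the sketch below; the commit only ships the abstract base, so the class name and version string are hypothetical.

from dataclasses import dataclass

from gfw.common.pipeline.config import PipelineConfig


@dataclass
class MyPipelineConfig(PipelineConfig):
    """Hypothetical concrete config supplying the required version."""

    @property
    def version(self) -> str:
        return "0.1.0"


config = MyPipelineConfig(date_range=("2024-01-01", "2024-01-31"), mock_bq_clients=True)
print(config.start_date, config.end_date)   # parsed lazily into datetime.date objects
print(config.to_dict()["date_range"])       # ('2024-01-01', '2024-01-31')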

tests/beam/pipeline/dag/__init__.py

Whitespace-only changes.

0 commit comments
