
Commit da6f11c
Author: Tomás Link
Add ReadFromJson and WriteToJson
1 parent: 33ad297

4 files changed: +215 -0
File: gfw/common/beam/transforms/read_from_json.py
Lines changed: 80 additions & 0 deletions
"""Module with reusable source PTransform for reading JSON inputs."""

from pathlib import Path
from typing import Any, Callable, Optional, Union

import apache_beam as beam

from apache_beam.pvalue import PCollection

from gfw.common.io import json_load


class ReadFromJson(beam.PTransform):
    """Beam transform to read a PCollection from a JSON file.

    This transform loads a local JSON or JSONLines file eagerly (outside the pipeline),
    then injects the resulting records into the pipeline using `beam.Create`.

    Useful for testing, prototyping, or controlled ingestion.

    Args:
        input_file:
            Path to the local file to read.

        coder:
            Callable to apply to each decoded record. Defaults to `dict`.

        lines:
            If True, interprets the input as newline-delimited JSON (JSONLines).

        create_kwargs:
            Optional dictionary of keyword arguments to pass to `beam.Create`.
            Use this to control serialization, type hints, etc.

        **kwargs:
            Additional keyword arguments passed to the base PTransform class.

    Raises:
        ValueError:
            If the input file does not exist at pipeline construction time.

    Example:
        >>> with beam.Pipeline() as p:
        ...     pcoll = p | ReadFromJson("data/input.json", lines=True)
        ...     pcoll | beam.Map(print)
    """

    def __init__(
        self,
        input_file: Union[str, Path],
        coder: Callable = dict,
        lines: bool = False,
        create_kwargs: Optional[dict] = None,
        **kwargs: Any,
    ) -> None:
        """Builds a ReadFromJson instance."""
        super().__init__(**kwargs)
        self._input_file = Path(input_file)
        self._coder = coder
        self._lines = lines
        self._create_kwargs = create_kwargs or {}

    def expand(self, p: PCollection) -> PCollection:
        """Apply transform to pipeline 'p': create PCollection from loaded JSON data."""
        # Why not use beam.io.ReadFromJson instead of (beam.Create + json_load)?
        # Because ReadFromJson returns BeamSchema objects, not plain dicts,
        # and requires conversion like: dict(x._asdict()).
        # inputs = (
        #     p
        #     | beam.io.ReadFromJson(str(input_file), lines=False, convert_dates=False)
        #     | beam.Map(lambda x: dict(x._asdict())).with_output_types(Message)
        # )
        # In our case, json_load + beam.Create gives us full control over parsing
        # and works better for small, local test/config files where eager loading is acceptable.

        if not self._input_file.exists():
            raise ValueError(f"Input file does not exist: {self._input_file}")

        data = json_load(self._input_file, lines=self._lines, coder=self._coder)
        return p | beam.Create(data, **self._create_kwargs).with_output_types(self._coder)
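
The `json_load` helper imported from `gfw.common.io` is not part of this diff. For context, here is a minimal hypothetical sketch of what it presumably does, inferred only from how `ReadFromJson` calls it (`json_load(path, lines=..., coder=...)`); the actual implementation may differ:

# Hypothetical sketch of gfw.common.io.json_load, not the actual implementation.
import json
from pathlib import Path
from typing import Any, Callable, List, Union


def json_load(path: Union[str, Path], lines: bool = False, coder: Callable = dict) -> List[Any]:
    """Loads a JSON (or JSONLines) file and applies `coder` to each decoded record."""
    with open(path) as f:
        if lines:
            # JSONLines: one JSON object per line; skip empty lines.
            return [coder(json.loads(line)) for line in f if line.strip()]
        data = json.load(f)
        # A plain JSON file may contain a list of records or a single object.
        return [coder(record) for record in (data if isinstance(data, list) else [data])]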
File: gfw/common/beam/transforms/write_to_json.py
Lines changed: 81 additions & 0 deletions
"""Module with reusable PTransforms for writing output PCollections."""

import json

from datetime import datetime
from pathlib import Path
from typing import Any

import apache_beam as beam

from apache_beam.pvalue import PCollection


class WriteToJson(beam.PTransform):
    """Beam transform to write a PCollection as JSON.

    Args:
        output_dir:
            Output directory.

        output_prefix:
            Prefix to use in the output filename(s).

        **kwargs:
            Additional keyword arguments passed to the base PTransform class.
    """

    WORKDIR_DEFAULT = "workdir"

    def __init__(
        self, output_dir: str = WORKDIR_DEFAULT, output_prefix: str = "", **kwargs: Any
    ) -> None:
        """Builds a WriteToJson instance."""
        super().__init__(**kwargs)
        self._output_dir = Path(output_dir)

        time = datetime.now().isoformat(timespec="seconds").replace("-", "").replace(":", "")
        self._output_prefix = f"beam-{output_prefix}-{time}"

        self._prefix = self._output_dir.joinpath(self._output_prefix).as_posix()
        self._shard_name_template = ""
        self._suffix = ".json"

        # This is what beam.io.WriteToText does to construct the output path.
        self.path = Path("".join([self._prefix, self._shard_name_template, self._suffix]))

    def expand(self, pcoll: PCollection) -> PCollection:
        """Writes the input PCollection to a JSON file."""
        # Why not use beam.io.WriteToJson?
        # WriteToJson has issues writing to local files: it raises a ValueError
        # when the path does not point to a GCS location. It works when used together
        # with `ReadFromBigQuery` and a GCS location is specified there, which makes it
        # unreliable for local development or testing.
        #
        # Additionally, it internally relies on `pandas.DataFrame.to_json`, which introduces
        # extra dependencies and may not preserve the original structure of dict-like records.
        # https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToJson
        #
        # Example usage:
        #     from apache_beam.io.fileio import default_file_naming
        #
        #     file_naming = default_file_naming(prefix=self._output_prefix, suffix=".json")
        #     return pcoll | beam.io.WriteToJson(
        #         self._output_dir.as_posix(),
        #         file_naming=file_naming,
        #         lines=True,
        #         indent=4,
        #     )
        #
        # For these reasons, we use `WriteToText` + `json.dumps`, which is lightweight,
        # predictable, and preserves control over formatting and encoding.
        # https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
        return pcoll | "WriteToJson" >> (
            beam.Map(json.dumps)
            | beam.io.WriteToText(
                self._prefix,
                shard_name_template=self._shard_name_template,
                file_name_suffix=self._suffix,
            )
        )
Tests for ReadFromJson
Lines changed: 31 additions & 0 deletions
from pathlib import Path

import pytest
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

from gfw.common.beam.transforms.read_from_json import ReadFromJson
from gfw.common.io import json_save


@pytest.mark.parametrize(
    "lines",
    [True, False],
    ids=["json-lines-true", "json-lines-false"],
)
def test_read_from_json_variants(tmp_path, lines):
    path = tmp_path / ("data.jsonl" if lines else "data.json")
    input_data = [{"x": 1}, {"x": 2}]
    json_save(path, input_data, lines=lines)

    with TestPipeline() as p:
        output = p | ReadFromJson(path, lines=lines)
        assert_that(output, equal_to(input_data))


def test_raises_if_file_does_not_exist():
    path = Path("/this/does/not/exist.json")

    with pytest.raises(ValueError, match="Input file does not exist"):
        with TestPipeline() as p:
            _ = p | ReadFromJson(path)
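
`json_save`, used above to build the test fixture, also lives in `gfw.common.io` and is not shown in this diff. A minimal hypothetical sketch of its presumed behavior as the counterpart of `json_load`:

# Hypothetical sketch of gfw.common.io.json_save, not the actual implementation.
import json
from pathlib import Path
from typing import Any, List, Union


def json_save(path: Union[str, Path], data: List[Any], lines: bool = False) -> None:
    """Writes records as a single JSON array, or one object per line when lines=True."""
    with open(path, "w") as f:
        if lines:
            f.write("\n".join(json.dumps(record) for record in data))
        else:
            json.dump(data, f)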
Tests for WriteToJson
Lines changed: 23 additions & 0 deletions
from pathlib import Path

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline as _TestPipeline

from gfw.common.beam.transforms.write_to_json import WriteToJson
from gfw.common.io import json_load


def test_write_json(tmp_path):
    transform = WriteToJson(output_dir=tmp_path, output_prefix="test")

    messages = [{"x": 1}, {"x": 2}]

    with _TestPipeline() as p:
        inputs = p | beam.Create(messages)
        inputs | transform

    output_file = transform.path
    assert Path(output_file).is_file()

    output_messages = json_load(output_file, lines=True)
    assert len(output_messages) == len(messages)
