Skip to content

Commit 12e7313

Browse files
committed
DynamoDB: Add table loader for full-load operations
1 parent 31897f9 commit 12e7313

File tree

21 files changed

+892
-3
lines changed

21 files changed

+892
-3
lines changed

.github/workflows/dynamodb.yml

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
---
2+
name: "Tests: DynamoDB"
3+
4+
on:
5+
pull_request:
6+
branches: ~
7+
paths:
8+
- '.github/workflows/dynamodb.yml'
9+
- 'cratedb_toolkit/io/dynamodb/**'
10+
- 'pyproject.toml'
11+
push:
12+
branches: [ main ]
13+
paths:
14+
- '.github/workflows/dynamodb.yml'
15+
- 'cratedb_toolkit/io/dynamodb/**'
16+
- 'pyproject.toml'
17+
18+
# Allow job to be triggered manually.
19+
workflow_dispatch:
20+
21+
# Run job each second night after CrateDB nightly has been published.
22+
# The reason for running only every second night is that free capacity
23+
# for Codecov uploads is limited.
24+
schedule:
25+
- cron: '0 3 */2 * *'
26+
27+
# Cancel in-progress jobs when pushing to the same branch.
28+
concurrency:
29+
cancel-in-progress: true
30+
group: ${{ github.workflow }}-${{ github.ref }}
31+
32+
jobs:
33+
34+
tests:
35+
36+
runs-on: ${{ matrix.os }}
37+
strategy:
38+
fail-fast: false
39+
matrix:
40+
os: ["ubuntu-latest"]
41+
# TODO: yarl, dependency of influxio, is currently not available on Python 3.12.
42+
# https://github.com/aio-libs/yarl/pull/942
43+
python-version: ["3.8", "3.11"]
44+
localstack-version: ["3.6"]
45+
46+
env:
47+
OS: ${{ matrix.os }}
48+
PYTHON: ${{ matrix.python-version }}
49+
LOCALSTACK_VERSION: ${{ matrix.localstack-version }}
50+
# Do not tear down Testcontainers
51+
TC_KEEPALIVE: true
52+
53+
name: "
54+
Python ${{ matrix.python-version }},
55+
LocalStack ${{ matrix.localstack-version }},
56+
OS ${{ matrix.os }}
57+
"
58+
steps:
59+
60+
- name: Acquire sources
61+
uses: actions/checkout@v4
62+
63+
- name: Set up Python
64+
uses: actions/setup-python@v5
65+
with:
66+
python-version: ${{ matrix.python-version }}
67+
architecture: x64
68+
cache: 'pip'
69+
cache-dependency-path: 'pyproject.toml'
70+
71+
- name: Set up project
72+
run: |
73+
74+
# `setuptools 64.0.0` adds support for editable install hooks (PEP 660).
75+
# https://github.com/pypa/setuptools/blob/main/CHANGES.rst#v6400
76+
pip install "setuptools>=64" --upgrade
77+
78+
# Install package in editable mode.
79+
pip install --use-pep517 --prefer-binary --editable=.[dynamodb,test,develop]
80+
81+
- name: Run linter and software tests
82+
run: |
83+
pytest -m dynamodb
84+
85+
- name: Upload coverage to Codecov
86+
uses: codecov/codecov-action@v4
87+
env:
88+
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
89+
with:
90+
files: ./coverage.xml
91+
flags: dynamodb
92+
env_vars: OS,PYTHON
93+
name: codecov-umbrella
94+
fail_ci_if_error: false

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
- MongoDB: Add capability to give type hints and add transformations
1818
- Dependencies: Adjust code for lorrystream version 0.0.3
1919
- Dependencies: Update to lorrystream 0.0.4 and commons-codec 0.0.7
20+
- DynamoDB: Add table loader for full-load operations
2021

2122
## 2024/07/25 v0.0.16
2223
- `ctk load table`: Added support for MongoDB Change Streams

cratedb_toolkit/api/main.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,15 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
113113
source_url = resource.url
114114
target_url = self.address.dburi
115115
source_url_obj = URL(source_url)
116-
if source_url.startswith("influxdb"):
116+
if source_url.startswith("dynamodb"):
117+
from cratedb_toolkit.io.dynamodb.api import dynamodb_copy
118+
119+
if not dynamodb_copy(source_url, target_url, progress=True):
120+
msg = "Data loading failed"
121+
logger.error(msg)
122+
raise OperationFailed(msg)
123+
124+
elif source_url.startswith("influxdb"):
117125
from cratedb_toolkit.io.influxdb import influxdb_copy
118126

119127
http_scheme = "http://"

cratedb_toolkit/io/dynamodb/__init__.py

Whitespace-only changes.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import boto3
2+
from yarl import URL
3+
4+
5+
class DynamoDBAdapter:
    """
    Small boto3-based helper for reading from a DynamoDB table.

    Connection details are taken from a URL: the user/password parts become
    the AWS access key id / secret access key, the `region` query parameter
    becomes the region name, and the host/port select a custom endpoint.
    When the host is empty or the literal `aws`, no custom endpoint is set
    and boto3 resolves the endpoint on its own.
    """

    def __init__(self, dynamodb_url: "URL", echo: bool = False):
        """
        Create the boto3 session, resource and client from `dynamodb_url`.

        :param dynamodb_url: Parsed URL carrying credentials, host, port and
                             the `region` query parameter.
        :param echo: Accepted for interface compatibility; not used here.
        """
        self.session = boto3.Session(
            aws_access_key_id=dynamodb_url.user,
            aws_secret_access_key=dynamodb_url.password,
            region_name=dynamodb_url.query.get("region"),
        )
        endpoint_url = None
        host = dynamodb_url.host
        if host and host.lower() != "aws":
            # Custom endpoints (e.g. LocalStack) are addressed using plain HTTP.
            endpoint_url = f"http://{host}"
            # The port is `None` when the URL does not specify one; do not
            # render it as the literal string "None".
            if dynamodb_url.port is not None:
                endpoint_url += f":{dynamodb_url.port}"
        self.dynamodb_resource = self.session.resource("dynamodb", endpoint_url=endpoint_url)
        self.dynamodb_client = self.session.client("dynamodb", endpoint_url=endpoint_url)

    def scan(self, table_name: str):
        """
        Return all items from DynamoDB table.

        A single `Scan` call returns one page of results only, and signals
        more data through `LastEvaluatedKey`. This method follows that key
        until the table is exhausted, and returns one merged response:
        `Items` holds the items of all pages, `Count` / `ScannedCount` are
        summed up, and `LastEvaluatedKey` is removed. All other keys are
        taken from the first page.

        :param table_name: Name of the DynamoDB table to read.
        :return: Response dictionary in the shape of a `Scan` response.
        """
        scan_kwargs = {"TableName": table_name}
        merged = None
        while True:
            page = self.dynamodb_client.scan(**scan_kwargs)
            if merged is None:
                # Copy, so the response object handed out by the client
                # is not mutated while merging subsequent pages.
                merged = dict(page)
                merged["Items"] = list(page.get("Items", []))
            else:
                merged["Items"].extend(page.get("Items", []))
                for counter in ("Count", "ScannedCount"):
                    merged[counter] = merged.get(counter, 0) + page.get(counter, 0)
            last_key = page.get("LastEvaluatedKey")
            if not last_key:
                break
            scan_kwargs["ExclusiveStartKey"] = last_key
        merged.pop("LastEvaluatedKey", None)
        return merged

    def count_records(self, table_name: str):
        """
        Return the item count DynamoDB reports for the table.

        NOTE: This is the table's `item_count` metadata attribute, which
        DynamoDB refreshes only periodically, so treat it as an estimate.
        """
        table = self.dynamodb_resource.Table(table_name)
        return table.item_count

cratedb_toolkit/io/dynamodb/api.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import logging
2+
3+
from cratedb_toolkit.io.dynamodb.copy import DynamoDBFullLoad
4+
5+
logger = logging.getLogger(__name__)
6+
7+
8+
def dynamodb_copy(source_url, target_url, progress: bool = False):
    """
    Run a full-load copy of a DynamoDB table into a CrateDB table.

    Synopsis
    --------
    export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
    ctk load table dynamodb://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/ProductCatalog?region=eu-central-1

    Parameters
    ----------
    source_url: DynamoDB URL, handed to `DynamoDBFullLoad` as `dynamodb_url`.
    target_url: CrateDB URL, handed to `DynamoDBFullLoad` as `cratedb_url`.
    progress:   Forwarded to `DynamoDBFullLoad` unchanged.

    Returns
    -------
    Always `True`. Failures surface as exceptions raised by the loader.

    Resources
    ---------
    https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/programming-with-python.html

    Backlog
    -------
    Currently, it is not directly possible to address DynamoDB tables by ARN, i.e. for using a different AccountID.
    Example ARN: arn:aws:dynamodb:us-east-1:000000000000:table/ProductCatalog
    - https://github.com/boto/boto3/issues/2658
    - https://stackoverflow.com/questions/71019941/how-to-point-to-the-arn-of-a-dynamodb-table-instead-of-using-the-name-when-using
    - https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/configure-cross-account-access-to-amazon-dynamodb.html
    """
    logger.info("Invoking DynamoDBFullLoad")
    loader_options = {
        "dynamodb_url": source_url,
        "cratedb_url": target_url,
        "progress": progress,
    }
    DynamoDBFullLoad(**loader_options).start()
    return True
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# DynamoDB Backlog
2+
3+
## Iteration +1
4+
- Pagination / Batch Getting.
5+
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/programming-with-python.html#programming-with-python-pagination
6+
7+
- Use `batch_get_item`.
8+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/batch_get_item.html
9+
10+
- Scan by query instead of full.
11+
12+
13+
## Iteration +2
14+
15+
### Resumption on errors?
16+
Another variant to scan the table, probably for resuming on errors?
17+
```python
18+
key = None
19+
while True:
20+
if key is None:
21+
response = table.scan()
22+
else:
23+
response = table.scan(ExclusiveStartKey=key)
24+
key = response.get("LastEvaluatedKey", None)
25+
```
26+
27+
### Item transformations?
28+
That's another item transformation idea picked up from an example program.
29+
Please advise if this is sensible in all situations, or if it's just a
30+
special case.
31+
32+
```python
33+
if 'id' in item and not isinstance(item['id'], str):
34+
item['id'] = str(item['id'])
35+
```

cratedb_toolkit/io/dynamodb/copy.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# ruff: noqa: S608
2+
import logging
3+
import typing as t
4+
5+
import sqlalchemy as sa
6+
from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB
7+
from tqdm import tqdm
8+
from yarl import URL
9+
10+
from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter
11+
from cratedb_toolkit.model import DatabaseAddress
12+
from cratedb_toolkit.util import DatabaseAdapter
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
class DynamoDBFullLoad:
    """
    Copy DynamoDB table into CrateDB table.

    The DynamoDB table name is taken from the path of the source URL. The
    CrateDB table is derived from the target URL, and created on demand.
    """

    def __init__(
        self,
        dynamodb_url: str,
        cratedb_url: str,
        progress: bool = False,
    ):
        """
        :param dynamodb_url: Source URL; its path (without leading slash) is the table name.
        :param cratedb_url: Target URL, decoded into an SQLAlchemy URL and a table address.
        :param progress: Whether to display a progress bar while copying.
        """
        cratedb_address = DatabaseAddress.from_string(cratedb_url)
        cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
        cratedb_table = cratedb_table_address.fullname

        self.dynamodb_url = URL(dynamodb_url)
        self.dynamodb_adapter = DynamoDBAdapter(self.dynamodb_url)
        self.dynamodb_table = self.dynamodb_url.path.lstrip("/")
        self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False)
        self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)
        self.translator = DynamoDBCrateDBTranslator(table_name=self.cratedb_table)

        self.progress = progress

    def start(self):
        """
        Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB.

        Statements failing with `ProgrammingError` are logged and skipped, so
        a single bad record does not abort the whole transfer.
        """
        records_in = self.dynamodb_adapter.count_records(self.dynamodb_table)
        logger.info(f"Source: DynamoDB table={self.dynamodb_table} count={records_in}")
        with self.cratedb_adapter.engine.connect() as connection:
            if not self.cratedb_adapter.table_exists(self.cratedb_table):
                connection.execute(sa.text(self.translator.sql_ddl))
                connection.commit()
            records_target = self.cratedb_adapter.count_records(self.cratedb_table)
            logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
            result = self.dynamodb_adapter.scan(table_name=self.dynamodb_table)
            records_out = 0
            # Honor the `progress` flag, and use the context manager so the
            # progress bar is closed even when an unexpected error escapes.
            with tqdm(total=records_in, disable=not self.progress) as progress_bar:
                for sql in self.items_to_sql(result["Items"]):
                    if sql:
                        try:
                            connection.execute(sa.text(sql))
                            records_out += 1
                        except sa.exc.ProgrammingError as ex:
                            logger.warning(f"Running query failed: {ex}")
                    progress_bar.update()
            connection.commit()
            logger.info(f"Number of records written: {records_out}")
            # Only claim that nothing was copied when that is actually true.
            # The source count may not be exact, so comparing it with the
            # number of written records would produce false alarms.
            if records_out == 0:
                logger.warning("No data has been copied")

    def items_to_sql(self, items):
        """
        Convert data for record items to INSERT statements.

        Lazily yields one statement per item, as produced by the translator.
        """
        for item in items:
            yield self.translator.to_sql(item)
76+
77+
78+
class DynamoDBCrateDBTranslator(DynamoCDCTranslatorCrateDB):
    """
    Translator variant used for full-load operations.

    It reuses `DynamoCDCTranslatorCrateDB`, but renders plain INSERT
    statements for table items instead of processing CDC event records.
    """

    @property
    def sql_ddl(self):
        """
        Define SQL DDL statement for creating the target table in CrateDB.

        The table has a single column, named by `DATA_COLUMN`, of type `OBJECT(DYNAMIC)`.
        """
        table, column = self.table_name, self.DATA_COLUMN
        return f"CREATE TABLE IF NOT EXISTS {table} ({column} OBJECT(DYNAMIC));"

    def to_sql(self, record: t.Dict[str, t.Any]) -> str:
        """
        Produce an INSERT SQL statement from a single DynamoDB item.

        The item is converted by `image_to_values`, and the result is placed
        into the statement as a quoted literal.
        """
        # NOTE(review): The value is interpolated into the SQL text as-is.
        #               Confirm that `image_to_values` takes care of escaping.
        table, column = self.table_name, self.DATA_COLUMN
        return f"INSERT INTO {table} ({column}) VALUES ('{self.image_to_values(record)}');"

    @staticmethod
    def quote_table_name(name: str):
        """
        Return the table name unchanged.
        """
        # TODO @ Upstream: Quoting table names should be the responsibility of the caller.
        return name

cratedb_toolkit/io/processor/kinesis_lambda.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import logging
3535
import os
3636
import sys
37+
import typing as t
3738

3839
import sqlalchemy as sa
3940
from commons_codec.exception import UnknownOperationError
@@ -77,6 +78,7 @@
7778

7879
# TODO: Automatically create destination table.
7980
# TODO: Propagate mapping definitions and other settings.
81+
cdc: t.Union[DMSTranslatorCrateDB, DynamoCDCTranslatorCrateDB]
8082
if MESSAGE_FORMAT == "dms":
8183
cdc = DMSTranslatorCrateDB(column_types=column_types)
8284
elif MESSAGE_FORMAT == "dynamodb":
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#
2+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
3+
# not use this file except in compliance with the License. You may obtain
4+
# a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
10+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
11+
# License for the specific language governing permissions and limitations
12+
# under the License.
13+
import os
14+
15+
from testcontainers.localstack import LocalStackContainer
16+
17+
from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer
18+
19+
20+
class LocalStackContainerWithKeepalive(KeepaliveContainer, LocalStackContainer):
    """
    A Testcontainer for LocalStack with improved configurability.

    `LOCALSTACK_VERSION` selects the LocalStack image tag, which is useful
    when used within a test matrix. Its default value is `latest`.

    `TC_KEEPALIVE` is meant to signal not to shut down the container after
    running the test cases, in order to speed up subsequent invocations.
    NOTE(review): Presumably evaluated by the `KeepaliveContainer` mixin.
    """

    # Read once, when the class is defined. The environment variable needs
    # to be set before this module is imported in order to take effect.
    LOCALSTACK_VERSION = os.getenv("LOCALSTACK_VERSION", "latest")

    def __init__(self, image: str = f"localstack/localstack:{LOCALSTACK_VERSION}", **kwargs) -> None:
        """
        :param image: Container image to run; defaults to the configured LocalStack version.
        :param kwargs: Passed through to the parent container classes.
        """
        super().__init__(image=image, **kwargs)
        # Use a fixed container name.
        self.with_name("testcontainers-localstack")

0 commit comments

Comments
 (0)