A complete sample project demonstrating how to run an Apache Flink job that:
- Consumes JSON events from a Kinesis stream emulated by LocalStack
- Deduplicates events by `id` and `event_timestamp`
- Upserts the deduplicated events into a DynamoDB Local table
- Architecture Overview
- Prerequisites
- Repository Layout
- .gitignore
- Building the Flink Job
- Running with Docker Compose
- Sending Test Data
- Inspecting DynamoDB Local
- Flink Job Pipeline
- Configuration Details
## Architecture Overview

```mermaid
flowchart LR
    subgraph Local Dev Environment
        A["send_to_kinesis.py (boto3)"] -->|put_record| B["Kinesis Stream (LocalStack)"]
        B -->|FlinkKinesisConsumer| C["Flink Job (JobManager & TaskManager)"]
        C -->|"DynamoDbClient.putItem"| D["DynamoDB Table (Local)"]
        E["init-dynamodb (AWS CLI)"] --> D
    end
```
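To show how these pieces meet inside the job itself, here is a minimal sketch of how `KinesisToDynamoDBJob` might wire the source, deduplication, and sink together. The helper classes (`JsonParser`, `DeduplicationFunction`, `DynamoDBUpsertSink`) are described under Flink Job Pipeline below; everything beyond the names shown in the diagram is an illustrative assumption, not the repository's exact code.

```java
// Illustrative wiring only; see "Configuration Details" for the CBOR and
// initial-position settings, which are omitted here for brevity.
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class KinesisToDynamoDBJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Point the consumer at the LocalStack endpoint from docker-compose.yml.
        Properties kinesisProps = new Properties();
        kinesisProps.setProperty(AWSConfigConstants.AWS_REGION, System.getenv("AWS_REGION"));
        kinesisProps.setProperty(AWSConfigConstants.AWS_ENDPOINT, System.getenv("KINESIS_ENDPOINT"));

        env.addSource(new FlinkKinesisConsumer<>("MyStream", new SimpleStringSchema(), kinesisProps))
           .flatMap(new JsonParser())            // raw JSON string -> Event
           .keyBy(event -> event.id)             // partition by id
           .process(new DeduplicationFunction()) // drop duplicate / stale events
           .addSink(new DynamoDBUpsertSink());   // upsert into DeduplicatedEvents

        env.execute("kinesis-to-dynamodb-dedup");
    }
}
```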
## Prerequisites

- Docker & Docker Compose (v3.8+)
- Java 11 JDK
- Maven 3.x
- Python 3.x + boto3

```bash
pip install -r requirements.txt
```
## Repository Layout

```
.
├── .gitignore
├── Dockerfile.flink-awscli          # Flink + AWS CLI
├── docker-compose.yml
├── pom.xml                          # Maven project
├── requirements.txt                 # Python deps
├── send_to_kinesis.py               # Producer script
├── src/main/java/org/example
│   └── KinesisToDynamoDBJob.java
└── job                              # Generated at runtime
    ├── my-job.jar
    └── .built.ok
```
## Building the Flink Job

Locally, or via the builder service:

```bash
mvn clean package
cp target/flink-job-1.0-jar-with-dependencies.jar job/my-job.jar
touch job/.built.ok
```
## Running with Docker Compose

```bash
podman compose up --build
```

This will:
- Build the Flink job jar
- Start LocalStack (Kinesis)
- Initialize the `MyStream` Kinesis stream
- Start DynamoDB Local
- Create the `DeduplicatedEvents` DynamoDB table
- Launch the Flink JobManager & TaskManager
- Submit the Flink job
## Sending Test Data

Save and run `send_to_kinesis.py`:
```python
#!/usr/bin/env python3
import boto3, json, time, uuid

def main():
    client = boto3.client(
        "kinesis",
        region_name="us-east-1",
        aws_access_key_id="fakeMyKeyId",
        aws_secret_access_key="fakeSecretAccessKey",
        endpoint_url="http://localhost:4566",
    )
    stream = "MyStream"
    recs = [
        {"id": "user1", "event_timestamp": int(time.time()), "action": "click"},
        {"id": "user2", "event_timestamp": int(time.time()), "action": "purchase"},
        {"id": str(uuid.uuid4()), "event_timestamp": int(time.time()), "action": "login"},
    ]
    for r in recs:
        resp = client.put_record(
            StreamName=stream,
            Data=json.dumps(r),
            PartitionKey=r["id"],
        )
        print(f"→ {r['id']} → shard {resp['ShardId']}")

if __name__ == "__main__":
    main()
```
```bash
python send_to_kinesis.py
```
## Inspecting DynamoDB Local

```bash
aws dynamodb list-tables \
  --endpoint-url http://localhost:8000 \
  --region us-east-1

aws dynamodb scan \
  --table-name DeduplicatedEvents \
  --endpoint-url http://localhost:8000 \
  --region us-east-1
```

You can also use NoSQL Workbench to browse the table.
## Flink Job Pipeline

```mermaid
flowchart TD
    A[Raw JSON from Kinesis]
    A --> B["JsonParser (FlatMap)"]
    B --> C["KeyBy(id)"]
    C --> D["DeduplicationFunction (KeyedProcessFunction)"]
    D --> E["DynamoDBUpsertSink (RichSinkFunction)"]
```
- JsonParser – parses each JSON string into an `Event(id, ts, payload)`
- KeyBy – partitions the stream by `id`
- DeduplicationFunction – forwards an event only if its `eventTs` is greater than the last timestamp seen for that key (see the sketch below)
- DynamoDBUpsertSink – upserts the event into DynamoDB
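As a concrete illustration of the deduplication step, here is a minimal sketch of a `DeduplicationFunction` built on keyed `ValueState`. The `Event` POJO fields (`id`, `eventTs`, `payload`) are assumptions based on the description above, not the repository's exact class.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Event is assumed to be a POJO with public fields: String id, long eventTs, String payload.
// Keyed by Event.id; keeps the highest timestamp seen per key and forwards an
// event only when it carries a newer timestamp.
public class DeduplicationFunction extends KeyedProcessFunction<String, Event, Event> {

    private transient ValueState<Long> lastSeenTs;

    @Override
    public void open(Configuration parameters) {
        lastSeenTs = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeenTs", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        Long last = lastSeenTs.value();
        if (last == null || event.eventTs > last) {
            lastSeenTs.update(event.eventTs);
            out.collect(event);   // first or newer event -> forward downstream
        }
        // otherwise: duplicate or stale event -> drop silently
    }
}
```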
## Configuration Details

- Disable CBOR (avoids LocalStack timestamp parsing issues):

  ```java
  System.setProperty("com.amazonaws.sdk.disableCbor", "true");
  System.setProperty("aws.cborEnabled", "false");
  ```

- Initial Position – use an ISO-8601 timestamp for `STREAM_INITIAL_TIMESTAMP` (see the consumer sketch after this list):

  ```java
  kinesisProps.setProperty(
      ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
      java.time.Instant.now().toString()
  );
  ```

- Environment Variables (in `docker-compose.yml`; the sink sketch after this list shows one way the job can consume them):

  ```yaml
  AWS_ACCESS_KEY_ID: fakeMyKeyId
  AWS_SECRET_ACCESS_KEY: fakeSecretAccessKey
  AWS_REGION: us-east-1
  KINESIS_ENDPOINT: http://localstack:4566
  DYNAMODB_ENDPOINT: http://dynamodb-local:8000
  JAVA_TOOL_OPTIONS: "-Dcom.amazonaws.sdk.disableCbor=true"
  AWS_CBOR_DISABLE: "true"
  ```
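Combining the first two items, the consumer setup inside the job might look roughly like the following sketch. The factory class is hypothetical; the property names come from the Flink Kinesis connector's `AWSConfigConstants` and `ConsumerConfigConstants`.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

// Hypothetical helper that builds the LocalStack-backed Kinesis source.
public final class KinesisConsumerFactory {

    public static FlinkKinesisConsumer<String> create() {
        // CBOR must be off before the consumer talks to LocalStack.
        System.setProperty("com.amazonaws.sdk.disableCbor", "true");
        System.setProperty("aws.cborEnabled", "false");

        Properties props = new Properties();
        props.setProperty(AWSConfigConstants.AWS_REGION, System.getenv("AWS_REGION"));
        props.setProperty(AWSConfigConstants.AWS_ENDPOINT, System.getenv("KINESIS_ENDPOINT"));

        // Start reading at "now", expressed as an ISO-8601 timestamp.
        props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
        props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
                java.time.Instant.now().toString());

        return new FlinkKinesisConsumer<>("MyStream", new SimpleStringSchema(), props);
    }
}
```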
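Similarly, here is a sketch of how the upsert sink could turn `DYNAMODB_ENDPOINT` and the fake credentials into `putItem` calls against `DeduplicatedEvents` (AWS SDK v2). The attribute names follow the JSON fields produced by `send_to_kinesis.py`, and `Event` is the same assumed POJO as above.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;

public class DynamoDBUpsertSink extends RichSinkFunction<Event> {

    private transient DynamoDbClient dynamo;

    @Override
    public void open(Configuration parameters) {
        // Point the SDK at DynamoDB Local instead of AWS.
        dynamo = DynamoDbClient.builder()
                .endpointOverride(URI.create(System.getenv("DYNAMODB_ENDPOINT")))
                .region(Region.of(System.getenv("AWS_REGION")))
                .credentialsProvider(StaticCredentialsProvider.create(
                        AwsBasicCredentials.create(
                                System.getenv("AWS_ACCESS_KEY_ID"),
                                System.getenv("AWS_SECRET_ACCESS_KEY"))))
                .build();
    }

    @Override
    public void invoke(Event event, Context context) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("id", AttributeValue.builder().s(event.id).build());
        item.put("event_timestamp", AttributeValue.builder().n(Long.toString(event.eventTs)).build());
        item.put("payload", AttributeValue.builder().s(event.payload).build());

        // putItem replaces any existing item with the same key, giving upsert semantics.
        dynamo.putItem(PutItemRequest.builder()
                .tableName("DeduplicatedEvents")
                .item(item)
                .build());
    }

    @Override
    public void close() {
        if (dynamo != null) {
            dynamo.close();
        }
    }
}
```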