This project provides an example of how to use the StreamSets Platform SDK to start Job Template instances based on runtime parameters passed in an API request, with additional parameter values retrieved from a configuration database. After Jobs complete, Job metrics are captured and written to the database.
A REST API service wrapper is provided for integration flexibility. In this version, Jobs are assumed to be batch Jobs and metrics are gathered after the Jobs complete. One could enhance this project to capture metrics for streaming Jobs as well.
Here is an overview of the process:
There are many scenarios where a StreamSets Job could be launched using this Job Template Service. For example:

- An Analyst could click a button in a custom UI (not provided here) that makes REST API calls to the Job-Template-Service to launch a Job.
- A GitHub Action or other CI/CD automation system could start a Job using REST API calls to the Job-Template-Service, or, if Python bindings are available, could directly call the Python Job Template Runner script.

- A subset of the Job's runtime parameters will be passed in by the caller; we refer to these as the "dynamic" runtime parameters. Additional pipeline and connection parameters will be retrieved from a configuration store; we refer to these as the "static" runtime parameters.
- The configuration store used in this example is a Postgres database.
- The service will dynamically select the Job Template to be launched based on rules applied to the request's `source-type` and `target-type` values.
- The service creates and starts Job Template Instance(s) that StreamSets Control Hub schedules on engines.
- The service spawns a new thread for each Job Template Instance; each thread waits until its instance completes, then gathers the instance's metrics and inserts them into a database table (a sketch of this pattern follows below).
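To make that last step concrete, here is a minimal sketch of the thread-per-instance pattern, assuming hypothetical `streamsets_manager` and `database_manager` objects that stand in for this project's `StreamSetsManager` and `DatabaseManager` classes (described below). It is an outline of the idea, not the project's actual code:

```python
import threading

def run_and_record(job_instance, streamsets_manager, database_manager):
    """Wait for one Job Template Instance to finish, then persist its metrics.

    The method names on the two manager objects are illustrative stand-ins
    for the real methods in streamsets_manager.py and database_manager.py.
    """
    streamsets_manager.wait_for_completion(job_instance)       # block until the instance completes (or times out)
    metrics = streamsets_manager.get_metrics(job_instance)     # gather the completed instance's metrics
    database_manager.write_job_metrics(job_instance, metrics)  # insert the metrics into the database table

def launch_instances(job_instances, streamsets_manager, database_manager):
    """Spawn one worker thread per Job Template Instance."""
    for job_instance in job_instances:
        threading.Thread(
            target=run_and_record,
            args=(job_instance, streamsets_manager, database_manager),
            daemon=True
        ).start()
```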
- A PostgreSQL database
- Python 3.8+
- Psycopg - PostgreSQL database adapter for Python
- Flask - Python web application framework
- StreamSets Platform SDK for Python v6.0.1+
- StreamSets Platform API Credentials for a user with permissions to start Jobs
- The REST API is implemented using Flask in the file `job_template_service.py`.
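As a rough illustration of that wiring, a Flask endpoint for this service could look like the sketch below. The endpoint path, port, and payload keys come from the sections that follow; the response body and the keyword arguments passed to `run_job_template` are assumptions, not the project's exact code.

```python
from flask import Flask, jsonify, request

from job_template_runner import run_job_template  # the project's Job Template Runner (signature assumed)

app = Flask(__name__)

@app.route('/streamsets/job-template-runner', methods=['POST'])
def job_template_runner_endpoint():
    """Accept a job-template-runner request and hand it off to the runner."""
    args = request.get_json(force=True)
    try:
        run_job_template(
            user_id=args['user-id'],
            user_run_id=args['user-run-id'],
            source_type=args['source-type'],
            target_type=args['target-type'],
            runtime_parameters=args['runtime-parameters'])
    except KeyError as missing:
        return jsonify({'status': 'ERROR', 'message': f'Missing argument: {missing}'}), 400
    return jsonify({'status': 'OK'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8888)
```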
- Assume we want to create and run an instance of a Job Template named `REST API to Google Cloud Storage` that expects a set of runtime parameters. In this case, we expect the user to pass in values for the `HTTP_URL`, `CITY`, `GCS_BUCKET`, and `GCS_FOLDER` parameters, with the static parameters `HTTP_METHOD`, `HTTP_MODE`, and `GCS_CONNECTION` retrieved from the configuration database (a sketch of how the two sets of parameters might be merged appears after the example call below).
- The Job Template Service API would be called with a POST request to the endpoint `/streamsets/job-template-runner` with a JSON payload that passes in the following args: `user-id`, `user-run-id`, `source-type`, `target-type`, and `runtime-parameters`.
An example call might look like this:
$ curl -X POST \
"http://sequoia.onefoursix.com:8888/streamsets/job-template-runner" \
-H "content-type: application/json" \
-d '{
"user-id": "mark@onefoursix.com",
"user-run-id": "run-123",
"source-type": "http",
"target-type": "gcs",
"runtime-parameters": [
{
"HTTP_URL": "http://api.openweathermap.org/data/2.5/weather",
"CITY": "London",
"GCS_BUCKET": "146_gcs_bucket",
"GCS_FOLDER": "UK"
}
]
}'
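As mentioned above, the service combines the "dynamic" parameters from the request with the "static" parameters stored in the configuration database before starting each instance. Here is a hedged sketch of that merge; the helper name and the precedence rule are assumptions, not the project's exact behavior:

```python
def merge_runtime_parameters(static_parameters, dynamic_parameters):
    """Combine static parameters from the configuration database with the
    dynamic parameters passed in the API request. Here the caller's dynamic
    values take precedence on any key collision (an assumption)."""
    return {**static_parameters, **dynamic_parameters}

# For the request above, the merged parameters for the instance would include
# HTTP_URL, CITY, GCS_BUCKET and GCS_FOLDER from the caller, plus HTTP_METHOD,
# HTTP_MODE and GCS_CONNECTION retrieved from the database.
```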
-
One can use any arbitrary string value for the
user-run-id
, which can be used subsequently to identify the Job metrics associated with the run. -
- The Job Template to be run is selected dynamically by the Job Template Runner script, based on the `source-type` and `target-type`. In this example, the template is selected by looking up the source and target types in the `ingestion_pattern` table and then selecting the matching Job Template. See the `get_job_template` method in the file `database_manager.py` for details; a sketch of such a lookup follows below.
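Such a lookup could be written roughly as follows, assuming psycopg2 and the `streamsets` schema created by `sql/postgres-tables.sql` (see the setup steps below). This is a sketch, not a copy of the project's `get_job_template` method:

```python
def get_job_template(connection, source_type, target_type):
    """Return the job_template row mapped to the given source and target types.

    `connection` is assumed to be an open psycopg2 connection to the
    configuration database.
    """
    query = """
        select t.*
          from streamsets.ingestion_pattern p
          join streamsets.ingestion_pattern_job_template_relationship r
            on r.ingestion_pattern_id = p.ingestion_pattern_id
          join streamsets.job_template t
            on t.job_template_id = r.job_template_id
         where p.source = %s
           and p.destination = %s
    """
    with connection.cursor() as cursor:
        cursor.execute(query, (source_type, target_type))
        return cursor.fetchone()
```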
- The REST API endpoint calls the `run_job_template` method in the file `job_template_runner.py`.
- All interaction with the StreamSets Platform is managed by the class `StreamSetsManager` in the file `streamsets_manager.py`; a sketch of the core SDK calls follows below.
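For orientation, the core SDK interaction generally follows the pattern sketched below, based on the StreamSets Platform SDK's documented `ControlHub` and `start_job_template` usage; treat it as an outline rather than the actual contents of `streamsets_manager.py`:

```python
from streamsets.sdk import ControlHub

# Credentials would come from streamsets.ini (see the setup steps below).
sch = ControlHub(credential_id='<YOUR CRED ID>', token='<YOUR CRED TOKEN>')

# Look up the Job Template by the Control Hub Job ID stored in the job_template table.
job_template = sch.jobs.get(job_id='<sch_job_template_id>')

# Start one instance per set of runtime parameters; each dict would hold the
# merged static and dynamic parameters for one instance.
job_template_instances = sch.start_job_template(
    job_template,
    runtime_parameters=[{'HTTP_URL': '...', 'CITY': 'London'}])
```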
- All interaction with the database is managed by the class `DatabaseManager` in the file `database_manager.py`.
- Clone this project to your local machine
- Create the PostgreSQL user, schema, and tables by executing the `sql/postgres-tables.sql` script against your PostgreSQL database. That will create these four tables in a `streamsets` schema:
  - `job_template`
  - `ingestion_pattern`
  - `ingestion_pattern_job_template_relationship`
  - `job_instance`
- Create a file named `database.ini` at the root of your local project directory with the following entries, containing the Postgres connection properties, including the user and password just created:
[postgresql]
host=localhost
port=5432
database=postgres
user=streamsets
password=streamsets
- Create a file named `streamsets.ini` at the root of your local project directory with the following entries:
[streamsets]
cred_id=<YOUR CRED ID>
cred_token=<YOUR CRED TOKEN>
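Both files are plain INI files, so they can be read with Python's standard configparser; the snippet below is a sketch of that pattern and not necessarily the exact code used by the project's modules.

```python
from configparser import ConfigParser

def read_config(filename, section):
    """Return the key/value pairs from one section of an INI file."""
    parser = ConfigParser()
    parser.read(filename)
    return dict(parser[section])

db_config = read_config('database.ini', 'postgresql')      # host, port, database, user, password
sch_config = read_config('streamsets.ini', 'streamsets')   # cred_id, cred_token
```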
- Create or select an existing parameterized pipeline and Job Template in Control Hub. For example, I'll use a Job Template named `REST API to Google Cloud Storage`.
- Insert a record into the `ingestion_pattern` table to map source and target types to a pattern name. For example, the entry for my `REST API to Google Cloud Storage` Job Template looks like this:
insert into streamsets.ingestion_pattern (
pattern_name,
source,
destination,
create_timestamp
)
values('http-to-gcs', 'http','gcs', CURRENT_TIMESTAMP);
- Insert a record into the `job_template` table that includes the StreamSets Job Template ID, as well as any static parameters and connections for the source and destination. For example, for the `REST API to Google Cloud Storage` Job Template I will insert this record:
insert into streamsets.job_template(
sch_job_template_id,
delete_after_completion,
source_runtime_parameters,
destination_runtime_parameters,
source_connection_info,
destination_connection_info,
create_timestamp
) values (
'c09f728a-2a73-4c7e-b735-2512039a9e6b:8030c2e9-1a39-11ec-a5fe-97c8d4369386',
false,
'{"HTTP_MODE": "POLLING", "HTTP_METHOD": "GET"}',
'{}',
'{}',
'{"GCS_CONNECTION" : "9c960db9-7904-47c4-bbc8-4c95dcf9c959:8030c2e9-1a39-11ec-a5fe-97c8d4369386"}',
CURRENT_TIMESTAMP
);
- Insert a record into the `ingestion_pattern_job_template_relationship` table to join the pattern to the template, like this:
insert into streamsets.ingestion_pattern_job_template_relationship (
ingestion_pattern_id,
job_template_id,
schedule
) select p.ingestion_pattern_id, t.job_template_id, '{}'
from streamsets.ingestion_pattern p,
streamsets.job_template t
where p.source = 'http'
and p.destination = 'gcs'
and t.sch_job_template_id = 'c09f728a-2a73-4c7e-b735-2512039a9e6b:8030c2e9-1a39-11ec-a5fe-97c8d4369386';
- Edit the value set in this line in the file `python/job_template_service.py` to specify where the application's log will be written. All modules share this log, so, for example, if there are permission issues writing to the database tables, error messages should appear in this log:

log_file = '/tmp/streamsets-job-template-service.log'
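One straightforward way for every module to share that log file is to configure Python's logging module once at startup, as sketched below; this illustrates the idea and may differ from how the project's code actually sets up its logger.

```python
import logging

log_file = '/tmp/streamsets-job-template-service.log'

# Configure the root logger once; loggers created elsewhere with
# logging.getLogger(__name__) will then write to the same file.
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format='%(asctime)s %(name)s %(levelname)s %(message)s')

logging.getLogger(__name__).info('Job Template Service starting')
```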
- Edit the value set in this line in the file `python/streamsets_manager.py`; this value sets the maximum time the app will wait for a Job to complete before getting its metrics, and Jobs that take longer than this will be considered to have failed:

max_wait_time_for_job_seconds = 4 * 60 * 60  # four hours
- The Service's port number is currently hardcoded to port `8888` in the file `job_template_service.py`; feel free to change that.
To easily test the service, change to the project's `python` directory and start the service in the foreground in its own terminal session using a command like this:
$ python job_template_service.py
You should see brief startup output. Tail the app's log to follow the service's messages as it runs.
In a new terminal session, call the service like this, referencing a `user-id`, `user-run-id`, `source-type`, `target-type`, and any needed runtime parameters. For example, I'll launch two instances of the `REST API to Google Cloud Storage` Job Template by including two sets of runtime parameters:
$ curl -X POST \
"http://sequoia.onefoursix.com:8888/streamsets/job-template-runner" \
-H "content-type: application/json" \
-d '{
"user-id": "mark@onefoursix.com",
"user-run-id": "run-123",
"source-type": "http",
"target-type": "gcs",
"runtime-parameters": [
{
"HTTP_URL": "http://api.openweathermap.org/data/2.5/weather",
"CITY": "London",
"GCS_BUCKET": "146_gcs_bucket",
"GCS_FOLDER": "UK"
},
{
"HTTP_URL": "http://api.openweathermap.org/data/2.5/weather",
"CITY": "Phoenix",
"GCS_BUCKET": "146_gcs_bucket",
"GCS_FOLDER": "US"
}
]
}'
If all goes well, the service should return an OK status, and in Control Hub you should see two Job Template Instances running.
Look at each instance's details page in Control Hub to confirm the full set of runtime parameters used, including both the static and dynamic values.
Once the instances complete, you should see their metrics in the `streamsets.job_instance` table. I'll force-quit one of the Job instances to confirm the service distinguishes between instances that terminate normally and those that do not:
SQL> select * from streamsets.job_instance;
Note the `user_id` and `user_run_id` fields added to the `job_instance` table, which allow the user to correlate their request with the subsequently written metrics.
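Because the rows carry those fields, a caller can pull the metrics for a specific run directly from the table; here is a small sketch using psycopg2 and the connection properties from the `database.ini` example above.

```python
import psycopg2

# Connection properties match the database.ini example shown earlier.
connection = psycopg2.connect(
    host='localhost', port=5432, dbname='postgres',
    user='streamsets', password='streamsets')

# Fetch the metrics rows written for one specific run.
with connection.cursor() as cursor:
    cursor.execute(
        'select * from streamsets.job_instance where user_run_id = %s',
        ('run-123',))
    for row in cursor.fetchall():
        print(row)
```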