How Data Engineering Shapes Modern Healthcare Research
Let’s get started.
Deploy the resources on AWS:
- Clone the repository: https://github.com/kumarahul98/ddb-glue-hudi-pipeline
- Deploy the SAM template:
  - cd aws-sam-template
  - sam build && sam deploy --guided
- Go to the AWS S3 console and locate the bucket we just created.
- Upload the Glue job script from the repo (aws-glue-job-script/glue-job-script.py) to the bucket at s3://bucket_name/scripts/glue-job-script.py (a scripted alternative to the console upload is sketched below).
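If you prefer to script that upload instead of using the console, a minimal boto3 sketch could look like the following; the bucket name is a placeholder for whatever the SAM stack actually created.

# Minimal sketch: upload the Glue job script without the console.
# BUCKET_NAME is a placeholder -- substitute the bucket created by the SAM stack.
import boto3

BUCKET_NAME = "bucket_name"
s3 = boto3.client("s3")
s3.upload_file(
    Filename="aws-glue-job-script/glue-job-script.py",
    Bucket=BUCKET_NAME,
    Key="scripts/glue-job-script.py",
)
print(f"Uploaded to s3://{BUCKET_NAME}/scripts/glue-job-script.py")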

Before we start the Glue job, let's look at the Glue script:
Config section
db_name = "ddb_streaming_glue_database"
kinesis_table_name = "ddb_streaming_glue_table"
output_table_name = "ecom_ddb_data"
record_key = "PK_SK"
precomb = "PK_SK"
s3_bucket = args["S3_BUCKET_NAME"]
s3_path_hudi = f"s3a://{s3_bucket}/{output_table_name}/"
s3_path_spark = f"s3://{s3_bucket}/spark_checkpoints/{output_table_name}/"
method = "upsert"
table_type = "MERGE_ON_READ"
window_size = "10 seconds"
starting_position_of_kinesis_iterator = "trim_horizon"
connection_options = {
    "hoodie.datasource.hive_sync.database": db_name,
    "hoodie.table.name": output_table_name,
    "hoodie.datasource.write.storage.type": table_type,
    "hoodie.datasource.write.recordkey.field": record_key,
    "hoodie.datasource.write.operation": method,
    ...
}
This config section defines variables such as:
- DynamoDB/Kinesis source database and table names.
- S3 bucket paths for storing Hudi outputs and Spark checkpoints.
- Hudi configuration options such as:
- hoodie.datasource.hive_sync.database: Syncs the Hudi table with the specified Hive database.
- hoodie.table.name: Name of the Hudi table where data will be written.
- hoodie.datasource.write.storage.type: Uses “MERGE_ON_READ” for efficient data writing with support for updates.
- hoodie.datasource.write.recordkey.field: Specifies “PK_SK” as the unique record key.
- hoodie.datasource.write.operation: Performs an “upsert” to update existing records or insert new ones.
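The args dictionary used by s3_bucket = args["S3_BUCKET_NAME"] is the standard Glue mechanism for receiving job parameters. A minimal sketch of how it is typically populated at the top of a Glue script, assuming the job was created with an --S3_BUCKET_NAME job parameter:

# Minimal sketch: resolve job parameters and set up the Glue/Spark contexts.
# Assumes the job was created with an --S3_BUCKET_NAME job parameter.
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME", "S3_BUCKET_NAME"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session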
Batch Processing Logic
kinesis_dynamic_frame = DynamicFrame.fromDF(
    data_frame, glueContext, "from_kinesis_data_frame"
)
kinesis_spark_df = kinesis_dynamic_frame.toDF()

# Remove top-level nesting from the event
selected_fields = kinesis_spark_df.selectExpr("dynamodb.NewImage")
ecom_dynamic_frame = DynamicFrame.fromDF(selected_fields, glueContext, "dyf")

# Remove DynamoDB nesting
unnest_ddb_json_df = ecom_dynamic_frame.unnest_ddb_json()
kinesis_spark_df = unnest_ddb_json_df.toDF()

# Concatenate PK and SK to get a unique id column
df = kinesis_spark_df.withColumn(record_key, F.concat(F.col("PK"), F.col("SK")))

df.write.format("hudi").options(**connection_options).mode("append").save(
    s3_path_hudi
)
This code processes each micro-batch of data from the Kinesis stream with the following steps:
a. Convert Kinesis data to a Spark DataFrame: The Kinesis stream data (data_frame) is wrapped in an AWS Glue DynamicFrame and converted back to a Spark DataFrame so it is compatible with Spark processing.
b. Select the nested DynamoDB data: It selects the NewImage field from the dynamodb event, which contains the state of the item after the change.
c. Unnest the JSON structure: The nested DynamoDB JSON is flattened with unnest_ddb_json() to strip the DynamoDB attribute nesting and make the data easier to process.
d. Create a unique identifier: It concatenates the PK and SK fields into the PK_SK column, which serves as the Hudi record key.
e. Write to the Hudi table: Finally, the DataFrame is appended to the Hudi table at the configured S3 path using the connection options defined earlier.
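This batch logic typically runs inside a function that Glue streaming invokes for every micro-batch; data_frame above is the micro-batch DataFrame handed to that function. A minimal sketch of the wiring, assuming the Kinesis-backed catalog table from the config section and a hypothetical processBatch function wrapping the code above:

# Minimal sketch of the streaming wiring (assumed, not taken verbatim from the repo).
# Reads the Kinesis-backed catalog table and hands each micro-batch to processBatch.
kinesis_data_frame = glueContext.create_data_frame.from_catalog(
    database=db_name,
    table_name=kinesis_table_name,
    additional_options={
        "startingPosition": starting_position_of_kinesis_iterator,
        "inferSchema": "true",
    },
)

glueContext.forEachBatch(
    frame=kinesis_data_frame,
    batch_function=processBatch,  # hypothetical wrapper around the code shown above
    options={
        "windowSize": window_size,
        "checkpointLocation": s3_path_spark,
    },
)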
Start the Pipeline:
- Move to the dummy-data-script folder, install the dependencies, and run the script:
  - cd dummy-data-script
  - pip3 install -r requirements.txt -t .
  - python3 ecom-dummy-data.py
- Verify the data in the DynamoDB tables (a programmatic check is sketched below).
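A minimal boto3 sketch for that check; the table name is a placeholder, since the real name comes from the SAM template.

# Minimal sketch: confirm items are landing in DynamoDB.
# "ecom-table" is a placeholder -- use the table name created by the SAM stack.
import boto3

dynamodb = boto3.client("dynamodb")
response = dynamodb.scan(TableName="ecom-table", Limit=5)
for item in response.get("Items", []):
    print(item.get("PK"), item.get("SK"))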

- Go to the AWS Glue console and start the Glue job.

Note: we can also monitor logs under the job's output logs section.
- Give it a couple of minutes and we should start seeing data in the S3 bucket (a quick programmatic check is sketched below).
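One quick programmatic check is to list the Hudi table's prefix in S3, assuming the same bucket and output table name as in the Glue script config:

# Minimal sketch: list objects under the Hudi table prefix to confirm output.
# "bucket_name" is a placeholder for the bucket created by the SAM stack.
import boto3

s3 = boto3.client("s3")
response = s3.list_objects_v2(
    Bucket="bucket_name",
    Prefix="ecom_ddb_data/",
    MaxKeys=10,
)
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])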

Verify Data:
Now let's move to the Athena query editor, where you should see two tables:
- *_rt: real-time
- *_ro: read-optimized
The choice between the RT and RO tables depends on your use case: the real-time table merges base files with the latest log files at query time, giving fresher results at a higher query cost, while the read-optimized table reads only the compacted base files, trading freshness for faster queries. Run the following query to verify the data:
SELECT * FROM "ddb_streaming_glue_database"."ecom_ddb_data_rt";
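If you would rather verify from code than from the Athena console, a minimal boto3 sketch can run the same query; the result output location is a placeholder you would point at a bucket you own.

# Minimal sketch: run the verification query through the Athena API.
# The OutputLocation is a placeholder -- point it at a bucket you own.
import time
import boto3

athena = boto3.client("athena")
query = 'SELECT * FROM "ddb_streaming_glue_database"."ecom_ddb_data_rt" LIMIT 10'

execution = athena.start_query_execution(
    QueryString=query,
    ResultConfiguration={"OutputLocation": "s3://bucket_name/athena-results/"},
)
query_id = execution["QueryExecutionId"]

# Poll until the query finishes.
while True:
    status = athena.get_query_execution(QueryExecutionId=query_id)
    state = status["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    print(f"Fetched {len(rows) - 1} data rows")  # the first row is the header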

Conclusion
We built a serverless streaming pipeline using AWS services, including DynamoDB, Kinesis, and AWS Glue. By leveraging a serverless architecture, we eliminated the need for managing infrastructure, allowing us to focus on the core functionality of our data processing workflows.
Using Kinesis for real-time data capture and AWS Glue for seamless ETL operations, we created a scalable and efficient pipeline that automatically adjusts to varying data loads. This approach not only enhances our agility in responding to changing data requirements but also significantly reduces operational overhead.
However, it’s important to note that this pipeline is not production-ready. We can take this further by splitting different DynamoDB entities into separate tables, optimizing the processing and storage of data. This blog serves as a foundation for your journey into stream processing with AWS Glue. For just a couple of hundred dollars a month, you can establish a live processing pipeline that is serverless and reduces operational overhead.