Streaming DynamoDB Data into a Hudi Table: AWS Glue in Action
Rahul Kumar
5 min read Oct 14, 2024


Let’s get started.

Deploy the resources on AWS:

  1. Clone the repository: https://github.com/kumarahul98/ddb-glue-hudi-pipeline
  2. Deploy the SAM template
    1. cd aws-sam-template
    2. sam build && sam deploy --guided
  3. Go to the AWS S3 console and locate the bucket we just created.
  4. Upload the Glue script from the repo path aws-glue-job-script/glue-job-script.py to the S3 bucket (a scripted alternative is sketched below):

s3://bucket_name/scripts/glue-job-script.py
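
If you prefer to script this step instead of using the console, a minimal boto3 sketch is shown below; the bucket name is a placeholder for the bucket created by the SAM stack.

import boto3

# Placeholder: replace with the bucket name created by the SAM stack.
BUCKET_NAME = "your-sam-created-bucket"

s3 = boto3.client("s3")

# Upload the Glue job script to the scripts/ prefix the Glue job points at.
s3.upload_file(
    Filename="aws-glue-job-script/glue-job-script.py",
    Bucket=BUCKET_NAME,
    Key="scripts/glue-job-script.py",
)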


Before we start the Glue job, let's look at the Glue script:

Config section


db_name = "ddb_streaming_glue_database"
kinesis_table_name = "ddb_streaming_glue_table"
output_table_name = "ecom_ddb_data"
record_key = "PK_SK"
precomb = "PK_SK"

s3_bucket = args["S3_BUCKET_NAME"]
s3_path_hudi = f"s3a://{s3_bucket}/{output_table_name}/"
s3_path_spark = f"s3://{s3_bucket}/spark_checkpoints/{output_table_name}/"

method = "upsert"
table_type = "MERGE_ON_READ"
window_size = "10 seconds"
starting_position_of_kinesis_iterator = "trim_horizon"

connection_options = {
    "hoodie.datasource.hive_sync.database": db_name,
    "hoodie.table.name": output_table_name,
    "hoodie.datasource.write.storage.type": table_type,
    "hoodie.datasource.write.recordkey.field": record_key,
    "hoodie.datasource.write.operation": method,
    ...
}

Defines variables such as:

  • DynamoDB/Kinesis source database and table names.
  • S3 bucket paths for storing Hudi outputs and Spark checkpoints (the bucket name arrives as a job parameter; see the sketch after this list).
  • Hudi configuration options such as:
    • hoodie.datasource.hive_sync.database: Syncs the Hudi table with the specified Hive database.
    • hoodie.table.name: Name of the Hudi table where data will be written.
    • hoodie.datasource.write.storage.type: Uses “MERGE_ON_READ” for efficient data writing with support for updates.
    • hoodie.datasource.write.recordkey.field: Specifies “PK_SK” as the unique record key.
    • hoodie.datasource.write.operation: Performs an “upsert” to update existing records or insert new ones.
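
The S3 bucket name is not hard-coded; it arrives as a Glue job parameter (args["S3_BUCKET_NAME"]). A minimal sketch of how a Glue script typically resolves such parameters, assuming the SAM template passes an --S3_BUCKET_NAME argument to the job:

import sys

from awsglue.utils import getResolvedOptions

# Resolve job arguments; assumes the job defines JOB_NAME and S3_BUCKET_NAME.
args = getResolvedOptions(sys.argv, ["JOB_NAME", "S3_BUCKET_NAME"])

s3_bucket = args["S3_BUCKET_NAME"]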

Batch Processing Logic


# Convert the incoming Kinesis micro-batch into a Glue DynamicFrame
kinesis_dynamic_frame = DynamicFrame.fromDF(
    data_frame, glueContext, "from_kinesis_data_frame"
)

kinesis_spark_df = kinesis_dynamic_frame.toDF()

# Remove top-level nesting from the event: keep only the changed item
selected_fields = kinesis_spark_df.selectExpr("dynamodb.NewImage")

ecom_dynamic_frame = DynamicFrame.fromDF(selected_fields, glueContext, "dyf")

# Remove DynamoDB attribute-type nesting
unnest_ddb_json_df = ecom_dynamic_frame.unnest_ddb_json()

kinesis_spark_df = unnest_ddb_json_df.toDF()

# Concatenate PK and SK to get a unique record key column
df = kinesis_spark_df.withColumn(record_key, F.concat(F.col("PK"), F.col("SK")))

# Upsert the micro-batch into the Hudi table on S3
df.write.format("hudi").options(**connection_options).mode("append").save(
    s3_path_hudi
)

This code processes data from a Kinesis stream with the following steps:

a. Convert Kinesis data to Spark DataFrame: The Kinesis stream data (data_frame) is converted into a Spark DataFrame using AWS Glue’s DynamicFrame to make it compatible with Spark processing.
b. Select nested DynamoDB data: It selects the NewImage field from the dynamodb event, which contains the actual data changes.
c. Unnest the JSON structure: The nested DynamoDB JSON (attribute-type wrappers) is flattened with unnest_ddb_json() to make the records easier to process.
d. Create unique identifier: It concatenates the PK and SK fields to form a unique identifier for each record.
e. Write to Hudi: Finally, the batch is appended to the Hudi table on S3 using the connection_options defined earlier (how this per-batch function is wired to the stream is sketched below).
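
For context, this snippet sits inside a per-batch function of a Glue streaming job. The sketch below shows one common way to wire such a function to the Kinesis source table, reusing the variables from the config section; treat it as an illustration rather than the exact code in the repo.

# Read the Kinesis-backed Glue Data Catalog table as a streaming DataFrame.
source_df = glueContext.create_data_frame.from_catalog(
    database=db_name,
    table_name=kinesis_table_name,
    transformation_ctx="kinesis_source",
    additional_options={
        "startingPosition": starting_position_of_kinesis_iterator,
        "inferSchema": "true",
    },
)

def process_batch(data_frame, batch_id):
    if data_frame.count() > 0:
        # ... the transformations and Hudi write shown above ...
        pass

# Run process_batch on every micro-batch, checkpointing progress to S3.
glueContext.forEachBatch(
    frame=source_df,
    batch_function=process_batch,
    options={
        "windowSize": window_size,
        "checkpointLocation": s3_path_spark,
    },
)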

Start the Pipeline:

  • Move to the dummy-data-script folder, install the dependencies, and run the script (a sketch of the kind of item it writes follows this list).
    1. cd dummy-data-script
    2. pip3 install -r requirements.txt -t .
    3. python3 ecom-dummy-data.py
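
The details of ecom-dummy-data.py are in the repo; purely as an illustration of the kind of write it performs, here is a minimal boto3 sketch. The table name and any attributes beyond PK and SK are hypothetical.

import uuid

import boto3

# Hypothetical table name: use the DynamoDB table created by the SAM stack.
TABLE_NAME = "ecom-ddb-table"

table = boto3.resource("dynamodb").Table(TABLE_NAME)

# Items carry PK and SK attributes, which the Glue job later concatenates
# into the Hudi record key (PK_SK).
table.put_item(
    Item={
        "PK": f"ORDER#{uuid.uuid4()}",
        "SK": "METADATA",
        "status": "CREATED",  # illustrative attribute
    }
)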

Verify the data in the DynamoDB tables.

  • Go to the AWS Glue console and start the Glue job.

Note: we can also monitor logs under the Output logs section of the job run.

  • Now give it a couple of minutes and we should start seeing data in the S3 bucket.

Verify Data:

Now let's move to the Athena query editor, where you should see two tables:
  • *_rt: real-time view
  • *_ro: read-optimized view

Choosing between the RT and RO tables depends on your use case: the _rt view merges pending log files at query time to expose the latest data, while the _ro view reads only compacted base files and trades freshness for faster queries. Run the following query to verify the data:

SELECT * FROM "ddb_streaming_glue_database"."ecom_ddb_data_rt";
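
If you prefer to run the check programmatically, a minimal boto3 sketch of the same query is below; the query-results location is a placeholder for any S3 prefix that Athena can write to.

import boto3

athena = boto3.client("athena")

# Start the query; Athena writes results to the placeholder S3 location.
response = athena.start_query_execution(
    QueryString='SELECT * FROM "ecom_ddb_data_rt" LIMIT 10;',
    QueryExecutionContext={"Database": "ddb_streaming_glue_database"},
    ResultConfiguration={"OutputLocation": "s3://your-query-results-bucket/athena/"},
)

print(response["QueryExecutionId"])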


Conclusion

We built a serverless streaming pipeline using AWS services, including DynamoDB, Kinesis, and AWS Glue. By leveraging a serverless architecture, we eliminated the need for managing infrastructure, allowing us to focus on the core functionality of our data processing workflows.

Using Kinesis for real-time data capture and AWS Glue for seamless ETL operations, we created a scalable and efficient pipeline that automatically adjusts to varying data loads. This approach not only enhances our agility in responding to changing data requirements but also significantly reduces operational overhead.

However, it’s important to note that this pipeline is not production-ready. We can take this further by splitting different DynamoDB entities into separate tables, optimizing the processing and storage of data. This blog serves as a foundation for your journey into stream processing with AWS Glue. For just a couple of hundred dollars a month, you can establish a live processing pipeline that is serverless and reduces operational overhead.

The following resources were referenced:

  1. Streaming Ingestion from MongoDB Atlas into Hudi (GitHub)
  2. AWS Glue streaming ETL jobs (AWS Blog)