
# Building a DynamoDB to Athena data pipeline with AWS Glue and CDK

Five months ago, we set out to build a proper data pipeline for moving our data from our DynamoDB database to Athena, so that we could query it using a BI tool called Metabase. This was quite a journey, with a number of twists and turns. Something that seemed very easy on the whiteboard took a lot of effort to actually get right.

Building out ETL (extract, transform, and load) pipelines in 2023 is still just as difficult as it was in the early 2000s, and in some ways it is even harder. Back then I would use Microsoft DTS (Data Transformation Services), which was later replaced by SSIS (SQL Server Integration Services). These were powerful GUI-based applications that let you set up how you wanted to migrate your data, transform it, and then load it into its final location.

Many things have changed since then, and one of the big ones is the proliferation of IaC, or Infrastructure as Code. As we set forth on this mission, we wanted our data pipelines to be built the same way our software is: deployed to dev, staging, and then production, easy to test, and written like any other code.

# The Problem

In addition to running Commandeer, I am also the CTO of Tumble, which is the third laundry product I have built over the past 8 years. Over the last year, we have seen a good amount of growth. Usually, when that starts happening, the board and other execs want more reporting, and this time was no different. We built our system on DynamoDB, a NoSQL database. We had set up DynamoDB Streams so that every time a record was written to a DynamoDB table, it would invoke a Lambda that took that record and wrote it to an S3 data lake as an individual JSON file. We then created some Athena tables on top of this data, which gave us a very basic SQL-queryable system. When we were small, this got the job done: we could query transaction history, or look up users and their account balances.

But as we grew, we needed more analytics on our data, more queries, and an actual BI tool. What we had built had a few problems. Firstly, saving every record as an individual JSON file makes querying, and most importantly joining tables, really slow. Storing the data as Parquet would not have fixed this on its own, because it would still be one small file per record. Even with proper partitioning, queries joining transactions to users or accounts sometimes took minutes.

Secondly, all our data in DynamoDB uses camelCase attribute names. For instance, the User table has firstName and lastName. When this data was saved to S3 and then crawled for Athena, those columns became firstname and lastname. This may not seem like a big deal, but to a data engineer or business analyst, it is like nails on a chalkboard. Because Athena lowercases column names, every camelCase name became one long word. So we really wanted our column names converted to snake_case, giving us first_name and last_name.
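The rename we wanted is mechanical enough to sketch. The `camelToSnake` helper below is hypothetical (not lifted from our codebase) and assumes plain ASCII camelCase names; it splits before an uppercase letter that follows a lowercase letter or digit, so a run of capitals such as `ID` stays together. It is shown next to the plain lowercasing our crawled Athena columns effectively got.

```typescript
// Hypothetical helper: convert a camelCase DynamoDB attribute name to snake_case.
// Assumes ASCII camelCase input such as "firstName" or "isPhoneNumberVerified".
function camelToSnake(name: string): string {
  // Insert "_" between a lowercase letter/digit and the uppercase letter after it,
  // then lowercase everything.
  return name.replace(/([a-z0-9])([A-Z])/g, '$1_$2').toLowerCase();
}

// What the crawled Athena table effectively gave us: lowercased, no separator.
function athenaLowercased(name: string): string {
  return name.toLowerCase();
}

const columns = ['firstName', 'lastName', 'isPhoneNumberVerified', 'createdAt'];
// Each entry is [what Athena showed us, what we wanted].
const renamed = columns.map((c) => [athenaLowercased(c), camelToSnake(c)]);
```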

Lastly, every column was typed as a string in our Athena database, so as our queries became more complex, we were casting pennies from strings to ints, and string dates to proper date types. This quickly became untenable, so we needed to come up with a new strategy.

# The Requirements

We named a few of our requirements above:

  • store data in bulk Parquet files to make querying much, much faster
  • use snake_case for column names
  • store all our data in the correct data types, mostly string, date, and int

We also wanted a few more things:

  • data evolution: if we add a new column to a table in DynamoDB, we want it to be brought into our data lake fairly simply
  • have all of the code for our pipeline in GitHub, preferably written with CDK
  • build out a medallion architecture, with a bronze, silver, and then gold database

The first three requirements were going to be fairly easy. The last three would present some challenges, and would require us to use AWS Glue to do all the heavy lifting of the pipeline. This was going to be a real test of whether the AWS tooling could handle a truly hardcore data pipeline. I have been in the data space for a long time, and have researched and set up a number of off-the-shelf tools, including Dataiku, Databricks, and an ETL tool by Tableau, and even looked at just piping the data into BigQuery. But all of them seemed to require very specialized work, and none of them would sit nicely in our repo, run on our CI/CD pipeline, and deploy out to our dev, staging, and prod environments as simply as deploying a new API Gateway endpoint backed by a Lambda. And something about firing up EC2 boxes to solve this problem bothered me.

What I really wanted was a serverless data pipeline, and AWS seemed to have all the tools necessary to accomplish this. This is not to say that a tool like Databricks is not needed; it is still the best way to run your notebooks and to prototype rapidly. If you are a big data company, you may also want a tool like this, since software development is not your main focus, and building things in a dev/stg/prod way might not matter to you at all. But from what I have seen, if you build the whole pipeline within such a tool, you will need people on your team who are experts in the product. It feels like the perfect tool for data scientists to explore data, build out complex pipelines, and much more, but we are a software company first and foremost, and a TypeScript and Node organization at that, so just getting us comfortable with using some Python for the jobs was stretch enough.

As for the medallion setup, we could have used a Kinesis stream, or even the DynamoDB Streams and Lambda approach we had used in the past, to build out a full CDC (change data capture) system. For many of our tables, however, this is not really needed, and I felt that getting the data out of DynamoDB, cleaned, and quickly queryable with correct data types from our BI tool was more important than the future CDC needs of the business. In a follow-up article, we may discuss how certain tables in our system do in fact benefit from CDC, and lay out how we decided to build that portion of the system.

# The Solution

With our problem in hand and our requirements laid out, we set out to build the perfect pipeline. Our DynamoDB is set up with multiple tables. Because we are a laundry company, we have tables like Facility, Machine, and MachinesTransactions, as well as more generic tables like User and UserRole. As we said, we want to take this data and first dump it to S3 to create a bronze data layer. This would simply be a complete daily snapshot of DynamoDB. It would not be transformed at all, though two columns would be added to each record: an as_of_date containing the YYYY-MM-DD date of the run, and then the type of bronze. All the other columns would be exactly as they are in DynamoDB.
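A minimal sketch of the bronze step's only transformation, assuming a record is a plain key/value object: stamp it with as_of_date (the UTC run date as YYYY-MM-DD) and compute the folder it lands in. The helper names are hypothetical, the folder layout follows the `<table>/YYYY-MM-DD` convention from the workflow walkthrough, and the second added column (the bronze type) is left out because its contents are not spelled out here.

```typescript
type DynamoRecord = Record<string, unknown>;

// Format a run date as YYYY-MM-DD in UTC, the format used for as_of_date.
function toAsOfDate(runDate: Date): string {
  return runDate.toISOString().slice(0, 10);
}

// Bronze keeps every attribute untouched and only adds as_of_date.
function toBronzeRecord(record: DynamoRecord, runDate: Date): DynamoRecord {
  return { ...record, as_of_date: toAsOfDate(runDate) };
}

// S3 key prefix for one table's snapshot on a given day, e.g. "user/2023-01-31/".
function bronzePrefix(bucketFolder: string, runDate: Date): string {
  return `${bucketFolder}/${toAsOfDate(runDate)}/`;
}

// A 3:00 AM UTC run on January 31, 2023.
const runDate = new Date(Date.UTC(2023, 0, 31, 3, 0, 0));
const bronze = toBronzeRecord({ id: '1', firstName: 'Bob' }, runDate);
```

Using UTC for the date keeps the folder name independent of the time zone of whatever machine runs the job.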

We would then have a process to move this bronze data to the silver layer. This step removes columns that hold object data. For instance, for caching purposes we sometimes store a facility object within the machine table, alongside the facilityId. This makes certain business logic easier in our API, but isn't necessary in a data lake, since in our BI tool we will just do joins and want to keep the data in third normal form (3NF). So we remove those columns, and then rename all columns from camelCase to snake_case for better readability. Lastly, the step creates the columns with the proper data types. So, dates and numbers all around!
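Our real silver job is Python on Glue, but the three steps are easy to sketch in TypeScript. This assumes a hypothetical `ColumnMapping` list like the explicit mappings our config carries. In this sketch, only mapped columns survive, which has the same effect as explicitly removing embedded objects such as a cached account. Each mapped column is renamed and coerced to its target type, and values that fail to parse become null. That null-on-failure behavior is a choice of the sketch, not a claim about Glue.

```typescript
type TargetType = 'string' | 'int' | 'boolean' | 'timestamp';

// One explicit mapping: bronze (camelCase) name -> silver (snake_case) name and type.
interface ColumnMapping {
  source: string;
  target: string;
  targetType: TargetType;
}

type Row = Record<string, unknown>;

// Coerce a raw bronze value to the silver column type; unparseable values become null.
function coerce(value: unknown, type: TargetType): unknown {
  if (value === null || value === undefined || value === '') return null;
  switch (type) {
    case 'string':
      return String(value);
    case 'int': {
      const n = parseInt(String(value), 10);
      return isNaN(n) ? null : n;
    }
    case 'boolean':
      if (value === true || value === 'true') return true;
      if (value === false || value === 'false') return false;
      return null;
    case 'timestamp': {
      const d = new Date(String(value));
      return isNaN(d.getTime()) ? null : d;
    }
    default:
      return null;
  }
}

// Only mapped columns are kept, renamed and typed; as_of_date carries over unchanged.
function toSilverRow(bronze: Row, mappings: ColumnMapping[]): Row {
  const silver: Row = {};
  for (const m of mappings) {
    silver[m.target] = coerce(bronze[m.source], m.targetType);
  }
  silver['as_of_date'] = bronze['as_of_date'];
  return silver;
}

const mappings: ColumnMapping[] = [
  { source: 'id', target: 'id', targetType: 'string' },
  { source: 'firstName', target: 'first_name', targetType: 'string' },
  { source: 'amount', target: 'amount', targetType: 'int' },
  { source: 'isActive', target: 'is_active', targetType: 'boolean' },
  { source: 'createdAt', target: 'created_at', targetType: 'timestamp' },
];

const silverRow = toSilverRow(
  {
    id: '1', firstName: 'Bob', amount: '1250', isActive: true,
    createdAt: '2023-01-30T12:00:00Z', account: { balance: 1250 }, as_of_date: '2023-01-31',
  },
  mappings,
);
```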

This silver data layer also has the as_of_date column. This means that whether you query silver or bronze, you see the same records repeated day after day. So if I were in the user table as '1 Bob Wall', and our data pipeline had run for the last 7 days, you would see 7 records for me. This is somewhat of a CDC system in and of itself, although if my name changed 4 times on day 3, you would still only see 7 records, so it would miss the intermediate changes. But if you need to do things like calculate revenue per machine per day, this setup lets you use the number of machines that existed on the day you are calculating. So if 7 days ago you had 10 machines, and today you have 17, you can easily do the calculation based on the timestamp of the transaction.
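Because every silver row carries its as_of_date, a point-in-time question becomes a filter on that column. Here is a sketch that uses hypothetical in-memory rows in place of the silver machine table, reusing the 10-then-17 machine example. In practice this would be a SQL query in Athena.

```typescript
interface MachineSnapshotRow {
  id: string;
  as_of_date: string; // YYYY-MM-DD
}

// Number of machines present in the snapshot taken on a given day.
function machineCountOn(rows: MachineSnapshotRow[], asOfDate: string): number {
  return rows.filter((r) => r.as_of_date === asOfDate).length;
}

// Build a fake snapshot of `count` machines for one day.
function snapshot(count: number, asOfDate: string): MachineSnapshotRow[] {
  const rows: MachineSnapshotRow[] = [];
  for (let i = 1; i <= count; i++) rows.push({ id: `m${i}`, as_of_date: asOfDate });
  return rows;
}

// 10 machines a week ago, 17 today.
const silverMachines = snapshot(10, '2023-01-24').concat(snapshot(17, '2023-01-31'));

// Revenue per machine uses the machine count from the transaction's own day.
function revenuePerMachine(revenueCents: number, txTimestamp: string): number {
  const count = machineCountOn(silverMachines, txTimestamp.slice(0, 10));
  if (count === 0) throw new Error(`No snapshot for ${txTimestamp}`);
  return revenueCents / count;
}
```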

The last phase is the gold layer, which simply takes the latest day's worth of silver and puts it into gold. So if you had queried the gold machine table 7 days ago, you would have seen only 10 records, and if you query it today, you will see 17.
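The gold step boils down to keeping only the newest snapshot. Here is a sketch over in-memory rows. It assumes as_of_date is stored as YYYY-MM-DD, so plain string comparison orders the dates correctly. The real job reads silver and overwrites the gold table with the result.

```typescript
interface SilverRow {
  id: string;
  as_of_date: string; // YYYY-MM-DD sorts lexicographically in date order
}

// Keep only the rows from the most recent as_of_date present in silver.
function latestSnapshot<T extends SilverRow>(rows: T[]): T[] {
  if (rows.length === 0) return [];
  let maxDate = rows[0].as_of_date;
  for (const r of rows) {
    if (r.as_of_date > maxDate) maxDate = r.as_of_date;
  }
  return rows.filter((r) => r.as_of_date === maxDate);
}

const silver: SilverRow[] = [
  { id: 'm1', as_of_date: '2023-01-30' },
  { id: 'm2', as_of_date: '2023-01-30' },
  { id: 'm1', as_of_date: '2023-01-31' },
  { id: 'm2', as_of_date: '2023-01-31' },
  { id: 'm3', as_of_date: '2023-01-31' },
];
const gold = latestSnapshot(silver);
```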

The medallion architecture works nicely for our needs, and provides a clear-cut sequence of steps to accomplish this.

# The Implementation

To build this out, we would be using a number of services from AWS.

  • DynamoDB - our OLTP NoSQL database that backs our backend
  • S3 - where we store the Parquet data files for the bronze, silver, and gold layers of our data lake
  • Athena - the data lake query engine. It lets you define relational tables on top of a group of S3 files, and then query them like a normal relational database.
  • IAM Role - the role needed to run our Glue tasks
  • Glue Workflow - a workflow of steps that takes our data from DynamoDB to our gold layer. It is made up of a number of components:
    • Glue Trigger - we have two types of triggers in our setup. A SCHEDULED trigger is the first step in our workflow and kicks off the first job in our process. CONDITIONAL triggers connect the subsequent jobs and crawlers.
    • Glue Job - a Python script that either moves data from DynamoDB to S3, runs data quality checks on the data, or moves data from one S3 layer to another
    • Glue Crawler - crawlers can be run against an S3 data source to create tables, in the Glue Data Catalog that Athena queries, based on the data they crawl. We attempted to do our first step without a crawler, but jobs alone cannot handle data evolution very easily, while with a crawler it is literally one checkbox, or one CDK config setting, away from complete data evolution.

All of the above items ended up going into our CDK system. We ended up with 22 workflows for our 22 tables in DynamoDB that we wanted to have in our data lake.

The flow was exactly the same for each workflow. In fact, we needed just 4 Python files to drive all 22 workflows. Below is the exact breakdown of the workflow, using our User table as the example:

Glue Workflow

Glue Workflow 2

  1. 'Run Job DynamoDB User Bronze' Trigger - Scheduled to run at 3:00 AM UTC. This is the initial trigger that kicks off a workflow. We have them all kick off at the same time, but if you have a really large set of data, you may want to stagger them.
    1. This executes our 'DynamoDB User Bronze' job. This job runs a script that grabs all the data from our User table in DynamoDB and puts it into our tumble-dev-analytics-bronze bucket, in the /users/YYYY-MM-DD folder.
  2. 'Run Crawler DynamoDB User Bronze' trigger is fired when the previous job completes.
    1. This starts our 'DynamoDB User Bronze' crawler, which crawls the above folder and adds the data, and any new columns, to our tumble_analytics_bronze.user table in Athena.
  3. 'Run DQ Job DynamoDB User Bronze' trigger occurs after this.
    1. It kicks off our 'DynamoDB User Bronze DQ' job, which ensures that the data in our bronze user table looks correct. We check certain KPI fields, like id and email. The job also checks how much the data has changed from the previous day, and fails if that change falls outside configurable thresholds. All of these results are written to their own S3 bucket, and a data_quality database is set up in Athena so we can easily query them. This is a crucial part of the medallion process: if we don't have clean data at any of the stages, we would rather fail the process and go investigate than march on and shove bad data into the next layer.
  4. 'Run Job DynamoDB User Silver' is the next trigger.
    1. It starts our 'DynamoDB User Silver' job, which runs our script for the column mappings and data type coercion, and then creates the Athena table with this data. Because we do explicit mappings in this step, we do not need a crawler here. At the bronze step, new columns may come in, so we needed one there. This is a nuance of the system, and could be handled in a few ways.
  5. 'Run DQ Job DynamoDB User Silver' is then triggered.
    1. This kicks off the 'DynamoDB User Silver DQ' job. The beauty of the codebase is that the file run for this step is exactly the same as the one for bronze, so not only are we using just one Python file for DQ across all 3 layers, but also across every workflow. That is 66 separate jobs all using the exact same piece of code. So this process scales really well: adding a new table to our flow is as simple as adding one record to our config json file.
  6. 'Run Job DynamoDB User Gold' is then triggered upon successful completion of the previous step.
    1. This starts our 'DynamoDB User Gold' job, which runs our gold script. It grabs the newest set of data in our silver lake, by the max as_of_date, and writes it to our gold data lake, overwriting the previous data. So now we have a totally fresh and clean set of data in gold.
  7. 'Run DQ Job DynamoDB User Gold' is then triggered.
    1. This fires off the 'DynamoDB User Gold DQ' job, which again uses the same DQ script and validates our gold data in the same way we validated our bronze and silver data.
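Our DQ job is Python, but a TypeScript sketch makes its two checks concrete. It assumes rows are in memory and treats a KPI field as failing when it is null, undefined, or an empty string. It also fails when today's row count moves more than a configurable fraction away from yesterday's. The names (`runDqCheck`, `maxChangeRatio`) and the 20% default are hypothetical, since the thresholds are only described as configurable.

```typescript
type Row = Record<string, unknown>;

interface DqResult {
  passed: boolean;
  missingKpiCounts: Record<string, number>; // KPI field -> rows where it is missing
  rowCountChange: number;                   // fractional change vs. the previous day
}

function isMissing(v: unknown): boolean {
  return v === null || v === undefined || v === '';
}

function runDqCheck(
  today: Row[],
  previousDayCount: number,
  kpiFields: string[],
  maxChangeRatio = 0.2,
): DqResult {
  const missingKpiCounts: Record<string, number> = {};
  for (const field of kpiFields) {
    missingKpiCounts[field] = today.filter((r) => isMissing(r[field])).length;
  }
  // On the first run there is no previous day to compare against.
  const rowCountChange =
    previousDayCount === 0 ? 0 : Math.abs(today.length - previousDayCount) / previousDayCount;

  const kpiOk = kpiFields.every((f) => missingKpiCounts[f] === 0);
  return { passed: kpiOk && rowCountChange <= maxChangeRatio, missingKpiCounts, rowCountChange };
}

// KPI fields arrive from the config as a comma-separated string.
const kpiFields = 'id,as_of_date,email,isActive'.split(',');
const rows: Row[] = [
  { id: '1', as_of_date: '2023-01-31', email: 'a@example.com', isActive: true },
  { id: '2', as_of_date: '2023-01-31', email: '', isActive: false },
];
const result = runDqCheck(rows, 2, kpiFields);
```

In the real job, a failed result would make the job itself fail, so the next conditional trigger never fires and bad data stays out of the next layer.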

As you can see, this setup has a lot of moving parts. The beauty of a Glue workflow is that you can string them all together. To take inventory, for 1 workflow we have:

  • 7 triggers - 1 scheduled trigger to start the workflow, and 6 conditional triggers for the next steps
  • 1 Crawler - used to crawl bronze
  • 6 Jobs - 3 for setting up the bronze, silver, and gold medallion layers, and 3 to DQ (data quality) check the created layers.

So that is 14 separate resources per workflow, or 308 in total for our 22 workflows. While this could be done in the AWS console, properly maintaining it would be a nightmare. This is where the CDK, and putting it all into code, becomes indispensable.

# Enter the CDK

For those of you unfamiliar with the CDK, it is AWS's Cloud Development Kit. The native way to deploy infrastructure on AWS has always been CloudFormation, using either JSON or YAML. While this works, it is very cumbersome for building out large amounts of infrastructure, and developers have long wanted a tool that lets them write real code to do it. Before the CDK came along, you had several choices. You could write vanilla CloudFormation in JSON or YAML, or use a third-party tool like Terraform, Serverless Framework, or Pulumi. Serverless Framework compiles down to CloudFormation, while Terraform and Pulumi call the AWS APIs directly. But they all had certain issues. Firstly, most of them were still just configuration files (YAML, JSON, or HCL), which as a developer is way better than, say, XML, but still very limiting compared to plain old code. In fact, there are tons of resources out there showing creative ways people have tried to make their YAML act like code.

But AWS has been making great strides in the open-source world, and has released the CDK, which lets you define infrastructure for all of their services in TypeScript (as well as other languages). It still compiles down to CloudFormation. As an early adopter of serverless systems, I have used all the other tools, and the CDK has nailed it in so many ways. It is the only tool I will now use for deploying to AWS. It makes trivial many things that used to be a headache, and most importantly, I no longer need to combine tools like Terraform and Serverless Framework. For instance, you can NOT create long-lived resources like S3 buckets and DynamoDB tables with Serverless Framework. It will work the first time, but then your deploys will start bombing out because the resource already exists. Their tool wants you to avoid long-lived resources, but because S3 and DynamoDB are for data storage, the buckets and tables obviously need to be long-lived. So we had to run Terraform for these, and then use Serverless Framework to deploy our API Gateway, AppSync, Lambdas, and other items that we wanted destroyed and recreated every time. Long story short, it was a real pain in the ass.

# Structuring our CDK code base

With the tool chosen, it was time for the actual implementation. We were already using the CDK for our backend, so we had several CloudFormation stacks out in the wild: two stacks for our DynamoDB tables, and stacks for our IoT setup, API Gateway, EventBridge Rules (CRON), migrations, SQS, and Step Functions.

When you build out your stacks, you want to architect them carefully, because some things, like a Lambda, are basically stateless: if you want to deploy a new version, you could destroy your stack, delete your Lambda, redeploy, and be back as good as new. But if you delete a stack containing an S3 bucket, thankfully the bucket won't be destroyed (CDK-defined buckets are retained by default, and CloudFormation cannot delete a bucket that still contains objects). Now, however, you can't just recreate your stack; you first need to import that existing bucket into the stack. The other thing is that CloudFormation caps the number of resources a single stack can contain (currently 500). For instance, we have two DynamoDB stacks because we have a lot of tables, and our deployments started blowing up after we hit the limit on our first stack. This is an important consideration when designing your stacks, because you always want to future-proof your code base.
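To make the limit concrete for the Glue pieces in this post, here is a back-of-the-envelope sketch. It assumes CloudFormation's per-stack quota of 500 resources, the value at the time of writing. Per table, it counts six jobs and a crawler in the jobs stack, plus a workflow and seven triggers in the workflows stack. It ignores fixed resources such as the IAM role, the alerting Lambda, and the EventBridge rule, and any extra resources a higher-level construct might generate.

```typescript
// Assumed CloudFormation per-stack resource quota.
const MAX_RESOURCES_PER_STACK = 500;

// Per-table resource counts for the Glue stacks described in this post.
const perTable = {
  jobsAndCrawlerStack: 6 + 1, // 3 layer jobs + 3 DQ jobs, 1 bronze crawler
  workflowStack: 1 + 7,       // 1 workflow, 1 scheduled + 6 conditional triggers
};

function resourcesFor(tables: number, perTableCount: number, fixed = 0): number {
  return fixed + tables * perTableCount;
}

// Largest number of tables a stack can hold before hitting the quota.
function maxTables(perTableCount: number, fixed = 0): number {
  return Math.floor((MAX_RESOURCES_PER_STACK - fixed) / perTableCount);
}

const tables = 22;
const stack2 = resourcesFor(tables, perTable.jobsAndCrawlerStack); // 154
const stack3 = resourcesFor(tables, perTable.workflowStack);       // 176
// If everything lived in one stack, it would top out at 33 tables.
const combinedLimit = maxTables(perTable.jobsAndCrawlerStack + perTable.workflowStack);
```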

With all that said, how should we structure our stacks for Glue? We ended up with 3 stacks.

CloudFormation Stacks

# Stack 1 - S3 buckets and assets and Athena databases

Our first stack creates the infrastructure around Glue. This is a good time to bring up naming conventions as well. You will settle on different conventions for different pieces of your system. For instance, S3 bucket names are global, so we have always found it important to put the environment directly into the name. I'll explain this a bit more below. For something like a DynamoDB table, on the other hand, we just call it User, not UserDev. It is a delicate balancing act, but a general rule of thumb is that if the service is global, like S3 or IAM, baking in the environment will end up being quite helpful.

Back to our project: we needed to create 6 buckets for this process. The first bucket holds the Python scripts used by our jobs. It is called tumble-dev-gluescript-job in our dev environment, for instance. We then run 3 BucketDeployment commands to copy our Python scripts from our code into it, where our jobs will later use them.

The next bucket is tumble-dev-glue-assets. This is what our workflows and jobs write to for their under-the-hood work.

The last 4 buckets control our medallion setup.

  • tumble-dev-analytics-bronze
  • tumble-dev-analytics-silver
  • tumble-dev-analytics-gold
  • tumble-dev-data-quality
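The convention is easy to centralize in a helper so every stack derives bucket names the same way. This is a sketch assuming the `tumble-<env>-<purpose>` pattern of the six buckets above. The function names and the validation are illustrative, not our actual code. The validation is a simplified version of S3's rules: 3 to 63 characters, lowercase letters, digits, and hyphens, starting and ending with a letter or digit.

```typescript
type Env = 'dev' | 'stg' | 'prod';

// Purposes of the six Glue-related buckets.
const GLUE_BUCKET_PURPOSES = [
  'gluescript-job',
  'glue-assets',
  'analytics-bronze',
  'analytics-silver',
  'analytics-gold',
  'data-quality',
];

// Simplified S3 bucket name check (S3 also allows dots, which we don't use).
const BUCKET_NAME = /^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$/;

// Bake the environment into every globally-scoped bucket name.
function bucketName(env: Env, purpose: string): string {
  const name = `tumble-${env}-${purpose}`;
  if (!BUCKET_NAME.test(name)) throw new Error(`Invalid bucket name: ${name}`);
  return name;
}

const devBuckets = GLUE_BUCKET_PURPOSES.map((p) => bucketName('dev', p));
```

Failing at synth time on a bad name is cheaper than discovering it halfway through a CloudFormation deploy.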

Below, you can see a nice example of our bucket structure, and the actual parquet user data in bronze.

S3 Buckets

Now that we have our buckets created, we also create our Athena databases.


The four databases are for our bronze, silver, and gold data, plus a summary database for our data quality results. You can also see a bucket called tumble-dev-athena in the image above. We actually created this in a prior stack, since we were already using Athena, so I didn't include it in the list. Because Athena is serverless, it needs an S3 location to write its query results (along with metadata about each query) to, and this bucket serves that purpose.

# Stack 2 - Jobs, Crawlers, IAM Role, and EventBridge Rule Lambda

In stack 2, we start piecing together our workflow, or at least parts of it. The first thing we need is a proper IAM role that allows our jobs to do their work. If you have dealt with AWS, you already know that the ARN is everything. In this case, our Glue role needs to be able to read from and write to our S3 buckets, and access data in DynamoDB. Setting up proper roles is a long topic, but the CDK does make it quite easy.
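For reference, here is roughly what such permissions look like as an IAM policy document. It is built in plain TypeScript so it can be inspected without deploying. This is a hypothetical sketch, not our exact role: the bucket names follow the convention above, and the region and account ID are placeholders. The action list is a reasonable starting point for jobs that scan DynamoDB and read and write S3. A real Glue role typically also attaches the AWS-managed `AWSGlueServiceRole` policy.

```typescript
interface PolicyStatement {
  Effect: 'Allow' | 'Deny';
  Action: string[];
  Resource: string[];
}

interface PolicyDocument {
  Version: '2012-10-17';
  Statement: PolicyStatement[];
}

function glueJobPolicy(
  env: string,
  region: string,
  accountId: string,
  dynamoTables: string[],
): PolicyDocument {
  const buckets = ['gluescript-job', 'glue-assets', 'analytics-bronze',
    'analytics-silver', 'analytics-gold', 'data-quality'].map((p) => `tumble-${env}-${p}`);

  return {
    Version: '2012-10-17',
    Statement: [
      {
        // Bucket-level permissions (listing) apply to the bucket ARN itself...
        Effect: 'Allow',
        Action: ['s3:ListBucket'],
        Resource: buckets.map((b) => `arn:aws:s3:::${b}`),
      },
      {
        // ...while object-level permissions need the "/*" object ARN. DeleteObject is
        // included because overwriting the gold layer removes the previous files.
        Effect: 'Allow',
        Action: ['s3:GetObject', 's3:PutObject', 's3:DeleteObject'],
        Resource: buckets.map((b) => `arn:aws:s3:::${b}/*`),
      },
      {
        // Read-only access to the source tables for the full-table snapshot.
        Effect: 'Allow',
        Action: ['dynamodb:DescribeTable', 'dynamodb:Scan'],
        Resource: dynamoTables.map((t) => `arn:aws:dynamodb:${region}:${accountId}:table/${t}`),
      },
    ],
  };
}

const policy = glueJobPolicy('dev', 'us-east-1', '123456789012', ['User', 'Machine']);
```

The split between bucket ARNs and object ARNs is an easy one to get wrong, and it tends to surface as AccessDenied errors deep inside a job run.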

We also set up a Lambda triggered by an EventBridge rule. Anytime a job or crawler in the workflow changes state, the rule invokes the Lambda, which posts to Slack when something fails. You can see the code for it below. This is a really powerful way to handle alerting.

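import * as eventbridge from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';

// Start Notification systems
// (lambda.GlueNotificationHandler is our own construct wrapping the alert Lambda)
const glueAlertLambda = new lambda.GlueNotificationHandler(this, extraProps);
new eventbridge.Rule(this, 'GlueNotificationHandlerRule', {
  enabled: true,
  description: 'GlueNotificationHandler',
  ruleName: `tumble-${envName.valueAsString}-glue-alert-notification`,
  eventPattern: {
    // Match state-change events that Glue emits for jobs and crawlers
    source: ['aws.glue'],
    detailType: ['Glue Job State Change', 'Glue Crawler State Change'],
  },
  // Invoke the alert Lambda for every matching event
  targets: [new targets.LambdaFunction(glueAlertLambda.instance)],
});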

If you check out the code in our Lambda, you can see it is pretty simple: we check for a FAILED or TIMEOUT state, and if we find one, send a Slack message to our tumble-dev room.

import { Handler } from 'aws-lambda';
import { ConfigApi } from '@/handlers/_base/config';
import { LoggerService } from '@/handlers/_base/services/logger/loggerService';
import { SlackService } from '@/handlers/_base/services/slack';

const process: Handler = async (event, context) => {
  LoggerService.debug('glueNotificationHandler/handler', { event, context });

  const config = ConfigApi.loadByEnvironment();
  const slackService = new SlackService(config);
  // Job events report states such as FAILED and TIMEOUT; crawler events can use
  // mixed case (e.g. "Failed"), so normalize before comparing.
  const jobState = event?.detail?.state?.toUpperCase();
  const jobName = event?.detail?.jobName || event?.detail?.crawlerName;

  // send a slack notification if the job or crawler failed or timed out
  if (jobState && ['FAILED', 'TIMEOUT'].indexOf(jobState) !== -1) {
    const message = `${jobName} Workflow ${jobState} - ${JSON.stringify(event)}`;
    await slackService.sendMessage(slackService._eventRoom, message);
  }
};

export { process };
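Pulling the filtering logic out of the handler makes it easy to unit test without Slack or AWS. The sketch below assumes the EventBridge event shapes for Glue. Job events carry `detail.jobName` with upper-case states such as `FAILED` and `TIMEOUT`. Crawler events carry `detail.crawlerName`, and in the examples AWS documents they use mixed-case states such as `Failed`, hence the normalization. `GlueStateEvent` and `alertMessage` are hypothetical names.

```typescript
// Minimal shape of the Glue state-change events we care about (other fields omitted).
interface GlueStateEvent {
  'detail-type': string;
  detail?: {
    state?: string;
    jobName?: string;
    crawlerName?: string;
  };
}

const ALERT_STATES = ['FAILED', 'TIMEOUT'];

// Returns the Slack message to send, or null when no alert is needed.
function alertMessage(event: GlueStateEvent): string | null {
  const state = (event.detail?.state ?? '').toUpperCase();
  if (ALERT_STATES.indexOf(state) === -1) return null;
  const name = event.detail?.jobName || event.detail?.crawlerName || 'unknown';
  return `${name} Workflow ${state} - ${JSON.stringify(event)}`;
}

const jobFailed: GlueStateEvent = {
  'detail-type': 'Glue Job State Change',
  detail: { jobName: 'DynamoDB User Bronze', state: 'FAILED' },
};
const crawlerFailed: GlueStateEvent = {
  'detail-type': 'Glue Crawler State Change',
  detail: { crawlerName: 'DynamoDB User Bronze Crawler', state: 'Failed' },
};
const jobSucceeded: GlueStateEvent = {
  'detail-type': 'Glue Job State Change',
  detail: { jobName: 'DynamoDB User Silver', state: 'SUCCEEDED' },
};
```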

Visibility into the system is quite important, and for years now, I have used Slack as our main channel for receiving errors. People often say this doesn't scale well, but I have found no better way to get developers to solve issues: repeated errors blowing up your Slack will make everyone fix things.

Now that we have our security and notifications set up, it is time to set up our jobs and crawlers. As you recall, we have 6 jobs and 1 crawler for each of our 22 tables. Writing all of this out by hand would be a nightmare, and maintaining it equally so. Thankfully, we were able to store the job definitions for all 22 tables in one file. Below are the job props for our User table.

{
  scopeId: 'User',
  jobName: 'DynamoDB User',
  bucketFolder: 'user',
  tableName: 'user',
  dynamoTableName: 'User',
  columnsToRemove: 'roles,machineTransactions,account',
  dataTypes: 'shouldReceiveNotification:bool,email_verified:bool',
  bronzeWorkerType: 'G.1X',
  bronzeDqWorkerType: 'G.1X',
  bronzeKpiFields: 'id,as_of_date,email,isActive',
  bronzeDateTimeFields: 'createdAt,as_of_date',
  silverWorkerType: 'G.1X',
  silverDqWorkerType: 'G.1X',
  silverKpiFields: 'id,as_of_date,email,is_active',
  silverDateTimeFields: 'created_at,as_of_date',
  goldWorkerType: 'G.1X',
  goldDqWorkerType: 'G.1X',
  goldKpiFields: 'id,as_of_date,email,is_active',
  goldDateTimeFields: 'created_at,as_of_date',
  applyMapping: 'id:string:id:string-email:string:email:string-firstName:string:first_name:string-lastName:string:last_name:string-isOnline:string:is_online:string-picture:string:picture:string-phoneNumber:string:phone_number:string-identity:string:identity:string-isActive:boolean:is_active:boolean-integrationlatency:string:integration_latency:int-amount:string:amount:int-isPhoneNumberVerified:boolean:is_phone_number_verified:boolean-shouldReceiveNotification:boolean:should_receive_notification:boolean-createdAt:string:created_at:timestamp-createdBy:string:created_by:string-updatedAt:string:updated_at:timestamp-updatedBy:string:updated_by:string-deletedAt:string:deleted_at:timestamp-deletedBy:string:deleted_by:string',
}
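The applyMapping string packs one mapping per `-`-separated entry. Each entry has the form `sourceName:sourceType:targetName:targetType`, the same four-part tuple that Glue's ApplyMapping transform takes. Below is a sketch of turning it back into tuples. The `parseApplyMapping` name is hypothetical, since our real parsing happens in the Python job. Note that this encoding assumes no column name contains a hyphen or a colon.

```typescript
// [source column, source type, target column, target type]
type MappingTuple = [string, string, string, string];

function parseApplyMapping(spec: string): MappingTuple[] {
  return spec
    .split('-')
    .filter((entry) => entry.length > 0)
    .map((entry) => {
      const parts = entry.split(':');
      if (parts.length !== 4) {
        throw new Error(`Expected source:type:target:type, got "${entry}"`);
      }
      return [parts[0], parts[1], parts[2], parts[3]] as MappingTuple;
    });
}

const spec =
  'id:string:id:string-firstName:string:first_name:string-' +
  'amount:string:amount:int-createdAt:string:created_at:timestamp';
const mappings = parseApplyMapping(spec);
```

Entries like `amount:string:amount:int` and `createdAt:string:created_at:timestamp` are where the string-to-int and string-to-timestamp coercion from our third requirement actually happens.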

This gives us all the information we need to create our 3 medallion layer jobs, our 3 DQ jobs, and our Crawler. We simply loop through all these job props, and create the jobs, crawlers, and DQ checks. Below, you can see a code snippet for creating the bronze job, bronze DQ check, and the bronze crawler.

// Create the bronze job
new bronzeJob.Bronze(this, {
  workerType: jobProp.bronzeWorkerType,
});

// Create the bronze crawler
new bronzeJob.Crawler(this, jobParam);

// Create the bronze data quality job
new dqJob.DQCheck(this, {
  scopeId: `${jobProp.scopeId}Bronze`,
  jobName: `${jobProp.jobName} Bronze`,
  stepName: 'bronze',
  kpiFields: jobProp.bronzeKpiFields,
  dateTimeFields: jobProp.bronzeDateTimeFields,
  workerType: jobProp.bronzeDqWorkerType,
});

This has made the system very scalable: to add another table to our pipeline, we just need to write a json record like the one above, and on our next push of code to dev, the workflow will exist. We still get all the benefits of configuration, but the brains of our code base are in beautiful, wonderful TypeScript!
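The loop over job props is mostly field selection: for each layer, it picks that layer's settings out of the single per-table record. The sketch below shows this for the DQ jobs, using a hypothetical `JobProps` type that covers only the fields used here. In the real stack, each resulting object would be passed to a construct like the `dqJob.DQCheck` call above. Mapping it over all 22 records yields the 66 DQ job definitions.

```typescript
type Layer = 'bronze' | 'silver' | 'gold';

// Subset of the per-table config record.
interface JobProps {
  scopeId: string;
  jobName: string;
  bronzeDqWorkerType: string; bronzeKpiFields: string; bronzeDateTimeFields: string;
  silverDqWorkerType: string; silverKpiFields: string; silverDateTimeFields: string;
  goldDqWorkerType: string;   goldKpiFields: string;   goldDateTimeFields: string;
}

interface DqJobConfig {
  scopeId: string;
  jobName: string;
  stepName: Layer;
  kpiFields: string;
  dateTimeFields: string;
  workerType: string;
}

const LAYERS: Layer[] = ['bronze', 'silver', 'gold'];

function capitalize(s: string): string {
  return s.charAt(0).toUpperCase() + s.slice(1);
}

// One DQ job config per layer, picking the "<layer>..." fields from the record.
function dqJobConfigs(p: JobProps): DqJobConfig[] {
  return LAYERS.map((layer) => ({
    scopeId: `${p.scopeId}${capitalize(layer)}`,
    jobName: `${p.jobName} ${capitalize(layer)}`,
    stepName: layer,
    kpiFields: p[`${layer}KpiFields` as keyof JobProps],
    dateTimeFields: p[`${layer}DateTimeFields` as keyof JobProps],
    workerType: p[`${layer}DqWorkerType` as keyof JobProps],
  }));
}

const user: JobProps = {
  scopeId: 'User', jobName: 'DynamoDB User',
  bronzeDqWorkerType: 'G.1X', bronzeKpiFields: 'id,as_of_date,email,isActive',
  bronzeDateTimeFields: 'createdAt,as_of_date',
  silverDqWorkerType: 'G.1X', silverKpiFields: 'id,as_of_date,email,is_active',
  silverDateTimeFields: 'created_at,as_of_date',
  goldDqWorkerType: 'G.1X', goldKpiFields: 'id,as_of_date,email,is_active',
  goldDateTimeFields: 'created_at,as_of_date',
};
const dqConfigs = dqJobConfigs(user);
```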

# Stack 3 - Workflows and Triggers

Now comes the part where we stitch it all together. We need to create a workflow, then 1 scheduled trigger to start the process, and then 6 conditional triggers that connect the jobs and crawlers. Here you can see the workflow creation.

import * as glue from 'aws-cdk-lib/aws-glue';

//Create the Workflow
const cfnWorkflow = new glue.CfnWorkflow(this, `${jobProp.tableName}workflow`, {
  description: `${jobProp.tableName} Etl pipeline`,
  name: `${jobProp.dynamoTableName} Workflow`,
});

That is very simple. Next we create the trigger that runs every day at 3 AM UTC; you can also see that staging runs at 4 AM UTC. This is because we run dev and staging in the same AWS account, but in different regions. Because Glue is fully managed, there is a limit on what they call DPUs (Data Processing Units): you can only have 100 DPUs running at a time. This is why, as the number of workflows grows, you might need to stagger them as well. Otherwise, you will get jobs erroring out because of this limit.
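The DPU arithmetic is worth doing before it bites. A G.1X worker counts as 1 DPU and a G.2X worker as 2, so a job's footprint is roughly its worker count times that figure. Since the jobs inside one workflow run one after another, a workflow's peak is about one job's worth of DPUs. The sketch below greedily packs workflows into start-time waves that stay under a cap. It uses the 100-DPU figure quoted above as the cap. The 10 workers per job is a hypothetical number, since our config only sets the worker type. The sketch also assumes each wave finishes before the next one starts.

```typescript
// DPUs per worker for the worker types used in this post.
const DPU_PER_WORKER: Record<string, number | undefined> = { 'G.1X': 1, 'G.2X': 2 };

interface WorkflowLoad {
  name: string;
  workerType: string;
  workersPerJob: number; // hypothetical; not part of the config shown above
}

// Jobs in a workflow run sequentially, so its peak is one job's DPUs.
function peakDpus(w: WorkflowLoad): number {
  const perWorker = DPU_PER_WORKER[w.workerType];
  if (perWorker === undefined) throw new Error(`Unknown worker type ${w.workerType}`);
  return perWorker * w.workersPerJob;
}

// Greedily pack workflows into waves whose combined peak stays within the cap.
function planWaves(workflows: WorkflowLoad[], dpuCap: number): WorkflowLoad[][] {
  const waves: WorkflowLoad[][] = [];
  let current: WorkflowLoad[] = [];
  let used = 0;
  for (const w of workflows) {
    const need = peakDpus(w);
    if (need > dpuCap) throw new Error(`${w.name} alone exceeds the DPU cap`);
    if (used + need > dpuCap) {
      waves.push(current);
      current = [];
      used = 0;
    }
    current.push(w);
    used += need;
  }
  if (current.length > 0) waves.push(current);
  return waves;
}

const workflows: WorkflowLoad[] = [];
for (let i = 0; i < 22; i++) {
  workflows.push({ name: `Table${i} Workflow`, workerType: 'G.1X', workersPerJob: 10 });
}
// Each wave could then get its own start hour, e.g. 3:00, 4:00 and 5:00 UTC.
const waves = planWaves(workflows, 100);
```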

import { Schedule } from 'aws-cdk-lib/aws-events';

//Create the Bronze Scheduled Trigger at 3AM for dev,prod and 4AM for stg
const glue_trigger_workflow = new glue.CfnTrigger(this, `glue-trigger-${jobProp.tableName}-bronze`, {
  name: `Run Job ${jobProp.jobName} Bronze`,
  description: 'This starts the bronze job.',
  actions: [{
    jobName: `${jobProp.jobName} Bronze`,
  }],
  type: 'SCHEDULED',
  schedule: Schedule.cron({ hour: (envName.valueAsString == 'stg') ? '4' : '3', minute: '0' }).expressionString,
  startOnCreation: true,
});
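The schedule string Glue receives uses the six-field AWS cron format, `cron(minutes hours day-of-month month day-of-week year)`, where one of the two day fields must be `?`. One caveat, assuming `envName` is a `CfnParameter`: its `valueAsString` is an unresolved token at synth time, so a comparison like `== 'stg'` cannot see the real value. Resolving the environment as a plain string, for example from CDK context via `this.node.tryGetContext`, keeps the decision in ordinary TypeScript. A sketch with hypothetical names:

```typescript
type Env = 'dev' | 'stg' | 'prod';

// Start hour (UTC) per environment: stg is staggered an hour after dev and prod.
const START_HOUR_UTC: Record<Env, number> = { dev: 3, stg: 4, prod: 3 };

// Daily AWS cron expression: minute, hour, every day of month, every month,
// "?" for day-of-week (required when day-of-month is "*"), every year.
function dailyCron(hourUtc: number, minute = 0): string {
  if (hourUtc < 0 || hourUtc > 23 || minute < 0 || minute > 59) {
    throw new Error(`Invalid time ${hourUtc}:${minute}`);
  }
  return `cron(${minute} ${hourUtc} * * ? *)`;
}

function bronzeSchedule(env: Env): string {
  return dailyCron(START_HOUR_UTC[env]);
}
```

For dev this yields `cron(0 3 * * ? *)`, which should be the same string `Schedule.cron({ hour: '3', minute: '0' }).expressionString` produces.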


You can see that this trigger is of type SCHEDULED, and its action is to call the bronze job. When this job completes, as we have already discussed, we want to kick off our crawler. This is done via a CONDITIONAL trigger.

//Create the Bronze Crawler trigger
const glue_trigger_crawler_bronze = new glue.CfnTrigger(this, `glue-trigger_bronze_crawler_${jobProp.tableName}`, {
  name: `Run Crawler ${jobProp.jobName} Bronze`,
  actions: [{
    crawlerName: `${jobProp.jobName} Bronze Crawler`,
  }],
  // Fire only once the bronze job has succeeded
  predicate: {
    conditions: [{
      logicalOperator: 'EQUALS',
      jobName: `${jobProp.jobName} Bronze`,
      state: 'SUCCEEDED',
    }],
    logical: 'ANY',
  },
  type: 'CONDITIONAL',
  startOnCreation: true,
});

You can see that the action is to start the bronze crawler, that the type is CONDITIONAL, and that the condition, in the predicate section, is that the bronze job has succeeded.

This continues on for our DQ checks and the silver and gold layers.
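Because every workflow has the same seven steps, the remaining triggers can be generated from an ordered list rather than written out by hand. This sketch produces plain objects in the shape of `glue.CfnTrigger` props (`name`, `type`, `workflowName`, `actions`, `predicate`). The local interfaces only mirror the fields used here. The trigger, job, and crawler names follow the snippets and workflow walkthrough above. `workflowName` is how a trigger is attached to its workflow. Steps that follow the crawler wait on `crawlerName`/`crawlState` rather than `jobName`/`state`. Passing each object to `new glue.CfnTrigger(this, id, props)` is left to the stack.

```typescript
type Step = { kind: 'job' | 'crawler'; name: string; triggerName: string };

interface TriggerAction { jobName?: string; crawlerName?: string }
interface TriggerCondition {
  logicalOperator: 'EQUALS';
  jobName?: string;
  state?: 'SUCCEEDED';
  crawlerName?: string;
  crawlState?: 'SUCCEEDED';
}
interface TriggerProps {
  name: string;
  workflowName: string;
  type: 'SCHEDULED' | 'CONDITIONAL';
  actions: TriggerAction[];
  schedule?: string;
  predicate?: { conditions: TriggerCondition[]; logical: 'ANY' | 'AND' };
  startOnCreation: boolean;
}

// The seven steps of one workflow, in execution order.
function workflowSteps(jobName: string): Step[] {
  const job = (n: string, t: string): Step => ({ kind: 'job', name: n, triggerName: t });
  return [
    job(`${jobName} Bronze`, `Run Job ${jobName} Bronze`),
    { kind: 'crawler', name: `${jobName} Bronze Crawler`, triggerName: `Run Crawler ${jobName} Bronze` },
    job(`${jobName} Bronze DQ`, `Run DQ Job ${jobName} Bronze`),
    job(`${jobName} Silver`, `Run Job ${jobName} Silver`),
    job(`${jobName} Silver DQ`, `Run DQ Job ${jobName} Silver`),
    job(`${jobName} Gold`, `Run Job ${jobName} Gold`),
    job(`${jobName} Gold DQ`, `Run DQ Job ${jobName} Gold`),
  ];
}

function actionFor(step: Step): TriggerAction {
  return step.kind === 'job' ? { jobName: step.name } : { crawlerName: step.name };
}

// "Previous step succeeded", expressed differently for jobs and crawlers.
function succeeded(step: Step): TriggerCondition {
  return step.kind === 'job'
    ? { logicalOperator: 'EQUALS', jobName: step.name, state: 'SUCCEEDED' }
    : { logicalOperator: 'EQUALS', crawlerName: step.name, crawlState: 'SUCCEEDED' };
}

// First trigger is SCHEDULED; each later one waits on the step before it.
function buildTriggers(jobName: string, workflowName: string, schedule: string): TriggerProps[] {
  const steps = workflowSteps(jobName);
  return steps.map((step, i): TriggerProps => {
    if (i === 0) {
      return {
        name: step.triggerName, workflowName, type: 'SCHEDULED', schedule,
        actions: [actionFor(step)], startOnCreation: true,
      };
    }
    return {
      name: step.triggerName, workflowName, type: 'CONDITIONAL',
      actions: [actionFor(step)],
      predicate: { conditions: [succeeded(steps[i - 1])], logical: 'ANY' },
      startOnCreation: true,
    };
  });
}

const triggers = buildTriggers('DynamoDB User', 'User Workflow', 'cron(0 3 * * ? *)');
```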

# Monitoring

Once this code is deployed to AWS, it is pretty hands-off. Every morning our data feed runs, and we get notified if something fails. Below, you can see our User workflow running for the last 4 days on dev, chugging right along.

Workflow Runs

You can also query this data in our data quality Athena database, to check specific columns, or overall job runs.

Athena Query

And you will also receive errors in Slack. For instance, here is an example where our DQ check throws an error because the phone_number for a user is not found.

Slack Alert

# Conclusion

Four months ago, we set out to create an ETL pipeline to move our data from the NoSQL DynamoDB realm to an Athena SQL-based system. It ended up being a lot more difficult than it looked on the whiteboard that fateful day in October. But we are very happy with the results, and feel that we now have a system that can scale with our company as we continue to grow. And for us, most importantly, any of our backend developers can contribute to our data pipeline without even having to leave our backend repository.

Last update: February 1, 2023 16:15