How to create a serverless data pipeline with AWS Glue and Lambda

A serverless data pipeline is a data integration architecture that leverages serverless computing services to manage, process, and move data between different stages in a workflow without the need for provisioning or managing servers.

We’ll create a serverless data pipeline using a Lambda function and an AWS Glue job. First, let’s get the hang of these services. You may skip this part if you are already familiar with them.

Overview of services

Let's briefly discuss Lambda functions and AWS Glue jobs.

Lambda: AWS Lambda is a serverless compute service that allows us to run code without provisioning or managing servers. It follows the pay-as-you-go model and charges only for execution time.

AWS Glue: AWS Glue is a serverless data integration service that allows us to extract, transform, and load data as per our use case requirements. It offers features such as data crawling, data mapping, and data classification. In short, it is one serverless service that has all the required data integration features.

Designing the serverless data pipeline

There are multiple ways to design a serverless data pipeline. In this Answer, we’ll discuss a simple approach. The following is the high-level architecture diagram of the infrastructure we’ll create:

A serverless data pipeline using AWS Glue and a Lambda function

Create source and target S3 buckets

We’ll create two S3 buckets as our data source and target. Follow the steps below to create a source S3 bucket:

  • On the AWS Management Console, search “S3” and click “S3” from the search results.

  • Click the “Create bucket” button.

  • Enter a globally unique name for the S3 bucket.

  • Select “US East (N. Virginia) us-east-1” as the AWS region.

  • In the “Block Public Access settings for this bucket” section, uncheck the “Block all public access” option and check the acknowledgment box to accept the public access warning.

  • Scroll down to the end of the page and click the “Create bucket” button.

Repeat the steps to create the target S3 bucket. We can also create the source and target buckets using the AWS CLI. Before using the AWS CLI, make sure that the user credentials are configured using the command given below. Ensure you replace <ACCESS_KEY> and <SECRET_ACCESS_KEY> with your credentials.

aws configure set default.region us-east-1 && \
aws configure set aws_access_key_id <ACCESS_KEY> && \
aws configure set aws_secret_access_key <SECRET_ACCESS_KEY> && \
aws configure set default.output json

Run the commands given below to create the source and target buckets, respectively. Replace the placeholder values at the top of the script with your own bucket names and region.

# Set these placeholders to your own bucket names and region
source_bucket=<SOURCE_BUCKET_NAME>
destination_bucket=<DESTINATION_BUCKET_NAME>
region=us-east-1
# Create source bucket
aws s3api create-bucket --bucket "$source_bucket" --region "$region"
aws s3api put-public-access-block --bucket "$source_bucket" --public-access-block-configuration "BlockPublicAcls=false,IgnorePublicAcls=false,BlockPublicPolicy=false,RestrictPublicBuckets=false"
# Create destination bucket
aws s3api create-bucket --bucket "$destination_bucket" --region "$region"
aws s3api put-public-access-block --bucket "$destination_bucket" --public-access-block-configuration "BlockPublicAcls=false,IgnorePublicAcls=false,BlockPublicPolicy=false,RestrictPublicBuckets=false"
echo "Buckets created successfully!"

Create IAM role

We need an IAM role to give the Glue job access to the S3 buckets. Follow the steps below to create the IAM role:

  • On the AWS Management Console, search for “IAM” and select “IAM” from the search results.

  • Click “Roles” under the “Access management” heading in the left menu bar and click the “Create role” button.

  • For “Trusted entity type,” select “Custom trust policy” and enter the following trust policy in the editor:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "glue.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
  • Click the “Next” button.

  • On the “Add permissions” page, select the AmazonS3FullAccess and AWSGlueServiceRole policies.

  • Enter a name for the role and click the “Create role” button.

We have created a role that grants the Glue job the access it needs. We can also create the IAM role using the commands given below:

aws iam create-role --role-name GlueJobRole --assume-role-policy-document '{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "glue.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}'
aws iam attach-role-policy --role-name GlueJobRole --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
aws iam attach-role-policy --role-name GlueJobRole --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
echo "Glue Role created successfully!"

Create a Glue job

We’ll use the Visual ETL editor to create the ETL job. The job defines the source, target, and transformation. Our source and target will be S3 buckets, and we'll use a simple SQL query to define the transformation. Follow the steps below to create an ETL job:

  • On the AWS Management Console, search for "Glue" and select "AWS Glue". From the "Data Integration and ETL" section, select "Visual ETL".

  • In the editor, add S3 as the source. Enter the location of the source S3 bucket and select CSV as the data format.

  • Next, add the SQL Query transform and replace the default query with the one given below. Make sure to replace <SQL_ALIASES> with the SQL alias assigned to your input source.

select gameID, leagueID, season from <SQL_ALIASES>
  • Finally, add an S3 bucket as the target. Configure the location URI of the bucket and select the desired format and compression type for the transformed data.

After the configuration, name the job and click “Save.” Do not run it, as we'll run it through our Lambda function. We can also create the Glue job with the AWS CLI using the commands in the main.sh file below. The create-job command requires the S3 path of the ETL job's script; a sketch of what script.py might contain follows the main.sh commands. Upload the script to a bucket and replace <SCRIPT_BUCKET> in the “ScriptLocation” field with the name of that bucket.

main.sh
# Set the job name and look up the ARN of the Glue role created earlier
etl_job_name=<ETL_JOB_NAME>
role_arn=$(aws iam get-role --role-name "GlueJobRole" --query 'Role.Arn' --output text)
aws glue create-job \
  --name "$etl_job_name" \
  --role "$role_arn" \
  --command '{
    "Name": "glueetl",
    "ScriptLocation": "s3://<SCRIPT_BUCKET>/script.py"
  }'
echo "Glue job created successfully!"

Create a role for the Lambda function

Before creating the role, we must create a policy that grants the Lambda function permission to write logs and run the Glue job. Follow the steps below to create the policy:

  • On the AWS Management Console, search for “IAM” and select “IAM” from the search results.

  • Click “Policies” under the “Access management” heading in the left menu bar and click the “Create policy” button.

  • Select JSON in the policy editor and enter the following policy. Replace <ACCOUNT_ID> with your AWS account ID, and my-answer-function with the name you plan to give your Lambda function:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "logs:CreateLogGroup",
      "Resource": "arn:aws:logs:us-east-1:<ACCOUNT_ID>:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:us-east-1:<ACCOUNT_ID>:log-group:/aws/lambda/my-answer-function:*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "glue:StartJobRun",
        "glue:GetJobRuns"
      ],
      "Resource": "*"
    }
  ]
}
  • Enter the policy name and click “Create policy” to create the policy.

Follow the steps below to create a role for the Lambda function:

  • On the IAM Console, click “Roles” under the “Access management” heading in the left menu bar and click the “Create role” button.

  • Select “Lambda” as the “Service or use case”.

  • Click the “Next” button.

  • On the “Add permissions” page, select the policy we created for the Lambda function.

  • Enter a name for the role and click the “Create role” button.

We've successfully created a role for our Lambda function. We can also create the IAM role for the Lambda function using the commands given below:

aws iam create-role --role-name LambdaRole --assume-role-policy-document '{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}'
aws iam attach-role-policy --role-name LambdaRole --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
aws iam attach-role-policy --role-name LambdaRole --policy-arn arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess
echo "Glue Role created successfully!"

Create a Lambda function

This Lambda function will be triggered as soon as data is added to the source S3 bucket. Follow the steps below to create the Lambda function:

  • On the AWS Management Console, search for “Lambda” and select “Lambda” from the search results. Click the “Create function” button.

  • On the “Create function” page, select “Author from scratch.”

  • Enter the basic information, such as the name, runtime, and architecture. The function code given later in this Answer targets Python 3.12, so select that runtime.

  • Select the existing role we created in the previous step for our Lambda function.

  • Leave the remaining settings as they are and click the “Create function” button at the end of the page.

Add a trigger to the function

To invoke the function when an object is added to the bucket, we’ll add the trigger to the function. Follow the steps below to add a trigger:

  • On the main page of the Lambda function, click “Add Trigger”.

  • Select “S3” as the trigger source and choose the name of the source bucket.

  • Select "All object create events" as the event type and add the trigger.

Function code

Add the following function code.

import boto3
import json
glue_client = boto3.client('glue')
job_name = '<ETL_JOB_NAME>'

def is_job_running(glue_client, job_name):
    # Get the job runs for the specified job
    response = glue_client.get_job_runs(JobName=job_name)
    # Check if there are any running job runs
    running_job_runs = [run for run in response['JobRuns'] if run['JobRunState'] == 'RUNNING']
    return len(running_job_runs) > 0

def lambda_handler(event, context):
    if is_job_running(glue_client, job_name):
        print(f"The Glue job '{job_name}' is already running.")
        return {
            'statusCode': 200,
            'body': json.dumps('The Glue job is already running.')
        }
    else:
        # Start the Glue job run
        response = glue_client.start_job_run(JobName=job_name)
        print(f"Started the Glue job run with run ID: {response['JobRunId']}")
        return response

Here is a brief explanation of the code:

  • Line 3: Define a Glue client using the boto3 SDK.

  • Lines 6–11: The is_job_running function checks whether the job is already running to avoid exceeding the concurrent run limit.

  • Lines 13–24: Define the Lambda handler, which checks whether the job is already running. If not, it kicks off the Glue job.

We have added the function code to run the Glue job. Note that the Lambda function needs permission to start the job: the custom policy we attached to its role earlier already allows the glue:StartJobRun and glue:GetJobRuns actions, so if your role lacks them, edit the role and attach a policy that grants equivalent Glue permissions. We can also use the AWS CLI to create the Lambda function and trigger using the commands given below.

# Set the function name used below
lambda_function=<FUNCTION_NAME>
role_arn=$(aws iam get-role --role-name "LambdaRole" --query 'Role.Arn' --output text)
# Create the Lambda function (assumes the code in lambda_function.py is zipped into function.zip)
aws lambda create-function \
  --function-name $lambda_function \
  --runtime python3.12 \
  --region $region \
  --handler lambda_function.lambda_handler \
  --zip-file "fileb://function.zip" \
  --role $role_arn
# Add resource-based permissions to the Lambda function to allow S3 to invoke it
aws lambda add-permission \
  --function-name $lambda_function \
  --statement-id "AllowS3InvokeLambda" \
  --action "lambda:InvokeFunction" \
  --principal s3.amazonaws.com \
  --source-arn arn:aws:s3:::$source_bucket \
  --region $region
# Get the ARN of the Lambda function
lambda_function_arn=$(aws lambda list-functions --query "Functions[?FunctionName=='$lambda_function'].FunctionArn" --output text)
NOTIFICATION_CONFIGURATION='{"LambdaFunctionConfigurations": [{"Id": "1","LambdaFunctionArn": "'"$lambda_function_arn"'","Events": ["s3:ObjectCreated:*"]}]}'
# Create the trigger on the source bucket
aws s3api put-bucket-notification-configuration \
  --bucket $source_bucket \
  --notification-configuration "$NOTIFICATION_CONFIGURATION"
echo "Lambda function created successfully!"

Test the data pipeline

To test the pipeline, download the .csv file given in the widget below and upload it to the source S3 bucket.

"gameID","leagueID","season","date"
"81","4","2015",2015-08-08 15:45:00
"82","1","2015",2015-08-08 18:00:00
"83","3","2015",2015-08-08 18:00:00
"84","5","2015",2015-08-08 18:00:00
"85","1","2015",2015-08-08 18:00:00
"86","4","2015",2015-08-08 20:30:00
"87","2","2015",2015-08-09 16:30:00
"88","3","2015",2015-08-09 16:30:00
"89","5","2015",2015-08-09 19:00:00
"90","5","2015",2015-08-10 23:00:00
"91","1","2015",2015-08-14 22:45:00
"92","1","2015",2015-08-15 15:45:00
"93","2","2015",2015-08-15 18:00:00
"94","5","2015",2015-08-15 18:00:00
"95","4","2015",2015-08-15 18:00:00
"96","3","2015",2015-08-15 18:00:00
"97","1","2015",2015-08-15 18:00:00
"98","3","2015",2015-08-16 16:30:00
"99","2","2015",2015-08-16 19:00:00

Once you add the file to the source bucket, open the target bucket. You'll find the transformed data there. The snippet below shows the transformed data, written as JSON records:

{"gameID":"81","leagueID":"4","season":"2015"}
{"gameID":"82","leagueID":"1","season":"2015"}
{"gameID":"83","leagueID":"3","season":"2015"}
{"gameID":"84","leagueID":"5","season":"2015"}
{"gameID":"85","leagueID":"1","season":"2015"}
{"gameID":"86","leagueID":"4","season":"2015"}
{"gameID":"87","leagueID":"2","season":"2015"}
{"gameID":"88","leagueID":"3","season":"2015"}
{"gameID":"89","leagueID":"5","season":"2015"}
{"gameID":"90","leagueID":"5","season":"2015"}
{"gameID":"91","leagueID":"1","season":"2015"}
{"gameID":"92","leagueID":"1","season":"2015"}
{"gameID":"93","leagueID":"2","season":"2015"}
{"gameID":"94","leagueID":"5","season":"2015"}
{"gameID":"95","leagueID":"4","season":"2015"}
{"gameID":"96","leagueID":"3","season":"2015"}
{"gameID":"97","leagueID":"1","season":"2015"}
{"gameID":"98","leagueID":"3","season":"2015"}
{"gameID":"99","leagueID":"2","season":"2015"}
Transformed games.csv data

We can also upload the file to the S3 bucket using the commands given below:

aws s3 cp games.csv s3://$source_bucket
echo "Uploaded data to source s3 bucket!"
AWS CLI commands to upload a file to the S3 bucket
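
We can also verify the transformed output programmatically. The boto3 sketch below lists whatever the Glue job wrote to the target bucket and prints each object's contents; <TARGET_BUCKET> is a placeholder to replace with your target bucket's name.

import boto3

s3 = boto3.client('s3')
bucket = '<TARGET_BUCKET>'  # replace with your target bucket's name

# List the objects the Glue job wrote to the target bucket
response = s3.list_objects_v2(Bucket=bucket)
for obj in response.get('Contents', []):
    print(f"Found {obj['Key']} ({obj['Size']} bytes)")
    # Print the transformed JSON records
    body = s3.get_object(Bucket=bucket, Key=obj['Key'])['Body'].read()
    print(body.decode('utf-8'))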

We have successfully created a serverless data pipeline using AWS Glue and Lambda functions with S3 buckets as the source and target.

Monitor using CloudWatch

We can use CloudWatch to monitor the execution of our Lambda function. Follow the steps below to view the logs generated by the Lambda function:

  • On the AWS Management Console, search for “Lambda” and select “Lambda” from the search results to navigate to the Lambda console.

  • Click the name of your function to open the function dashboard.

  • Switch to the “Monitor” tab and click “View CloudWatch logs” to open the log group of the Lambda function.

  • Open the log stream to view the logs generated by the Lambda function.

We can use these logs to debug issues or to monitor the overall execution of our Lambda function.
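
If you prefer the SDK to the console, a small boto3 sketch like the one below pulls recent log events, assuming the default log group name /aws/lambda/<FUNCTION_NAME> (replace the placeholder with your function's name):

import boto3

logs = boto3.client('logs')
log_group = '/aws/lambda/<FUNCTION_NAME>'  # default log group for a Lambda function

# Fetch up to 50 recent log events from the function's log group
response = logs.filter_log_events(logGroupName=log_group, limit=50)
for event in response['events']:
    print(event['message'], end='')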

Conclusion

Serverless data pipelines offer several benefits and are changing how organizations handle data processing and integration. The key advantages are scalability and cost optimization: serverless architectures automatically scale with demand and charge only for execution time.
