The Prefect Pipeline Code for Building a Basic Data Lake

Earlier this month I wrote an article about building a basic data lake with Prefect and AWS, and promised a follow-up post sharing some of the code from the pipeline. This is that post.

First, a quick review: for our org, we set up a basic data lake using Prefect to copy data from AWS Aurora (MySQL) to S3. Prefect is a workflow orchestration engine: it schedules and runs tasks (i.e. pieces of code) and reports on their success or failure. Data pipelines can fail for any number of reasons (bad permissions, changes in the underlying data, running out of memory, and so on), so using an orchestrator such as Prefect means we are notified as soon as anything goes wrong. The pipeline is also easy to inspect; there's no need to SSH into a machine and trawl through log files to figure out what caused a failure.

In Prefect, each pipeline is a flow, and the flow performs multiple tasks, such as extracting, transforming, or loading the data. A flow's tasks form a directed acyclic graph (DAG), so a task that depends on others won't run unless its predecessors complete successfully. Although Prefect has a cloud offering, and a nice UI if you choose to self-host the engine, it's also possible to run a flow locally as a plain Python script rather than as part of an orchestrated workflow automation system.
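To make the DAG behaviour concrete, here is a toy runner in plain Python. It is not Prefect's implementation, just a sketch of the same semantics: tasks run in dependency order, and anything downstream of a failure is skipped. The names `run_dag`, `extract`, `transform` and `load` are illustrative, and `graphlib` needs Python 3.9 or later.

```python
from graphlib import TopologicalSorter  # Python 3.9+


def run_dag(tasks, dependencies):
    """Run tasks in dependency order, skipping any task whose upstream did not succeed.

    tasks: dict mapping task name -> zero-argument callable
    dependencies: dict mapping task name -> set of upstream task names
    Returns a dict mapping task name -> "success", "failed" or "skipped".
    """
    states = {}
    # static_order() yields every task only after all of its predecessors
    for name in TopologicalSorter(dependencies).static_order():
        if any(states[up] != "success" for up in dependencies.get(name, set())):
            states[name] = "skipped"
            continue
        try:
            tasks[name]()
            states[name] = "success"
        except Exception:
            states[name] = "failed"
    return states


def extract():
    raise RuntimeError("bad permissions")  # simulate a failing extract step


states = run_dag(
    tasks={"extract": extract, "transform": lambda: None, "load": lambda: None},
    dependencies={"extract": set(), "transform": {"extract"}, "load": {"transform"}},
)
print(states)  # {'extract': 'failed', 'transform': 'skipped', 'load': 'skipped'}
```

Because `transform` and `load` never run after the failed extract, nothing half-finished gets written to the lake.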

Prefect runs with a server (the orchestration engine) and agents, which execute the work. Agents are designed to ‘pull’: they poll the server for flow runs and pick one up when it becomes available. We chose Docker agents, which pull the flow's image from a Docker registry before running it in a Docker container. The advantage is that our code is neatly packaged in a Docker image, so we don't have to worry about dependencies when the flow runs. This did, however, cause some challenges when setting up the agent on AWS EC2.
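The pull model can be sketched with a standard-library queue standing in for the server. This is a toy under stated assumptions, not Prefect's agent code: `run_agent`, the queue, and the run labels are all hypothetical, and where the sketch calls `handle`, a real Docker agent would pull the image and start a container.

```python
import queue
import time


def run_agent(server_queue, handle, max_polls, poll_interval=1.0):
    """Toy pull-based agent: repeatedly ask the 'server' for a waiting flow run.

    The server never pushes work; the agent decides when to poll and picks up
    a run only when one is waiting. Returns the results of the runs it handled.
    """
    results = []
    for _ in range(max_polls):
        try:
            flow_run = server_queue.get_nowait()  # poll: is there work waiting?
        except queue.Empty:
            time.sleep(poll_interval)  # nothing waiting; back off, then poll again
            continue
        results.append(handle(flow_run))  # a Docker agent would start a container here
    return results


# The "server" has two flow runs queued; the agent pulls them on its own schedule.
server = queue.Queue()
server.put("extract-reports 2021-01-01")
server.put("extract-reports 2021-01-02")
handled = run_agent(server, handle=lambda run: "ran " + run, max_polls=3, poll_interval=0.0)
print(handled)  # ['ran extract-reports 2021-01-01', 'ran extract-reports 2021-01-02']
```

A practical consequence of pulling is that the agent only needs outbound access to the server, so the EC2 instance running it does not have to accept inbound connections from the orchestrator.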

Building out the Prefect Flow

Our data lake flow is simple: get the connection details for the database, get the data from the database, transform it, and finally write the data out to AWS S3.

Our basic flow looks like this:

Imports

```python
# imports needed for code examples
from datetime import datetime

from prefect import task, Flow, Parameter
from prefect.client.secrets import Secret
import pandas as pd
import pymysql
import boto3
```

The associated Python requirements are:

```
prefect<2  # the code uses the Prefect 1.x API (Flow, Parameter, Secret)
pandas
pymysql
boto3
```

DB Connection Task

When retrieving secrets, we took the approach of fetching them all at once and then passing them to the other tasks as parameters. However, Prefect runs Secret().get() as a task, so fetching them all up front was a little redundant. Our approach was to set up a task returning a dictionary of the secrets (don’t do this; just call Secret('secretname').get() where you need it):

```python
@task
def get_db_vars():
    """Get the secrets."""
    secret_db = {
        'db_host': Secret("DB_HOST").get(),
        'db_name': Secret("DB_NAME").get(),
        'db_user': Secret("DB_USER").get(),
        'db_pass': Secret("DB_PASS").get(),
    }
    return secret_db
```
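The recommended alternative, fetching each secret where it is used, might look like the sketch below. So that it runs without a Prefect backend, it swaps in a hypothetical `get_secret` helper that reads environment variables; in the real task you would call `Secret(name).get()` instead. `connection_kwargs` is likewise a name made up for this example.

```python
import os


def get_secret(name):
    """Hypothetical stand-in for Prefect's Secret(name).get().

    Reads the value from an environment variable so this sketch runs anywhere.
    """
    return os.environ[name]


def connection_kwargs():
    """Build pymysql.connect() keyword arguments, fetching each secret at the
    point of use instead of in a separate 'get all secrets' task."""
    return {
        "host": get_secret("DB_HOST"),
        "database": get_secret("DB_NAME"),
        "user": get_secret("DB_USER"),
        "password": get_secret("DB_PASS"),
    }


# Inside the extract task this would become:
#     connection = pymysql.connect(**connection_kwargs())
```

This keeps credentials out of task parameters entirely, which also means they are not passed between tasks as results.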

MySQL Extract Task

```python
@task
def get_data(db_host, db_name, db_user, db_pass, extract_date=None):
    """Extract one day of reports from MySQL into a DataFrame."""
    # connect to db
    connection = pymysql.connect(
        host=db_host,
        user=db_user,
        password=db_pass,
        database=db_name,
    )
    try:
        if extract_date is None:
            # get all of yesterday's data
            query = """
                SELECT * FROM reports
                WHERE date_start = SUBDATE(CURRENT_DATE, 1)
            """
            params = None
        else:
            # get data for a given day (YYYY-MM-DD), bound as a query
            # parameter rather than formatted into the SQL string
            query = "SELECT * FROM reports WHERE date_start = %s"
            params = (extract_date,)
        # read all the data you want into a dataframe
        report_df = pd.read_sql(query, connection, params=params)
    finally:
        connection.close()
    return report_df
```
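Binding the date as a parameter matters: formatted in unquoted, `date_start = 2021-01-01` is evaluated by MySQL as arithmetic (2019), not as a date. The self-contained sketch below shows the same extract logic using the standard library's `sqlite3` in place of MySQL. The table layout is a made-up assumption, and note that sqlite3 uses `?` placeholders where pymysql uses `%s`.

```python
import sqlite3
from datetime import date, timedelta


def extract_reports(connection, extract_date=None):
    """Return rows from `reports` for one day, defaulting to yesterday.

    The date is bound as a query parameter, never formatted into the SQL.
    """
    if extract_date is None:
        # yesterday according to this machine's local clock
        extract_date = (date.today() - timedelta(days=1)).isoformat()
    cursor = connection.execute(
        "SELECT id, date_start FROM reports WHERE date_start = ? ORDER BY id",
        (extract_date,),
    )
    return cursor.fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE reports (id INTEGER, date_start TEXT)")
conn.executemany(
    "INSERT INTO reports VALUES (?, ?)",
    [(1, "2021-01-01"), (2, "2021-01-01"), (3, "2021-01-02")],
)
rows = extract_reports(conn, "2021-01-01")
print(rows)  # [(1, '2021-01-01'), (2, '2021-01-01')]
```

Computing "yesterday" in Python uses the worker's local date, whereas MySQL's `CURRENT_DATE` uses the database session's time zone; if the two differ, pick one place to define the boundary.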

Transformation Task

```python
@task
def transform_data(report_df):
    """Convert data to decoded format."""
    if len(report_df) > 0:
        # decode each value (decode is one of our own helpers, omitted here)
        report_df['decoded'] = report_df['encoded'].map(decode)
        # drop the encoded column
        report_df = report_df.drop(['encoded'], axis=1)
        # finally, let's turn the dates into proper dates
        # (convert_date is another omitted helper)
        report_df['date_start'] = report_df['date_start'].map(convert_date)
    return report_df
```
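The real `decode` and `convert_date` are specific to our data and aren't shown. Purely as a hypothetical illustration of the kind of function `.map()` expects (one value in, one value out), here are stand-ins that assume base64-encoded UTF-8 text and `YYYY-MM-DD` date strings.

```python
import base64
from datetime import date, datetime


def decode(value):
    """Hypothetical decode helper: assumes base64-encoded UTF-8 text."""
    return base64.b64decode(value).decode("utf-8")


def convert_date(value):
    """Hypothetical convert_date helper: assumes 'YYYY-MM-DD' strings.

    Values that are already dates (or datetimes) pass through unchanged.
    """
    if isinstance(value, date):
        return value
    return datetime.strptime(value, "%Y-%m-%d").date()


print(decode("aGVsbG8="))  # hello
print(convert_date("2021-01-01"))  # 2021-01-01
```

Returning real `date` objects matters later: the write task calls `.strftime()` on the first `date_start` value to name the partition.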

Formatted Data Output Task

  1. Our output format is Parquet.
  2. We ‘partition’ our data in S3 by using a filename that includes a partition name and value. So, if we were writing today.parquet, we’d instead write it in the following file location:
    s3://my-bucket/data/tablename/partition=value/today.parquet
    e.g. s3://datalake/data/reports/date=20210101/today.parquet

This layout is later used by AWS Glue to catalog the data: when its crawler is pointed at the data folder, it creates a table called ‘reports’ partitioned by ‘date’. A partition is to a data lake what an index is to a database: it helps the system find the data you are after without scanning everything.
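Here is a small stand-alone sketch of that naming scheme, plus the reverse step that makes partitions useful: reading `name=value` folders back out of a key so that whole partitions can be skipped without opening any files. The function names are hypothetical, and the key format mirrors the write task that follows.

```python
from datetime import date, datetime


def report_key(report_date, written_at):
    """Hive-style key: data/reports/date=YYYYMMDD/<timestamp>_reports.parquet."""
    partition = "date=" + report_date.strftime("%Y%m%d")
    filename = written_at.strftime("%Y%m%d-%H%M%S") + "_reports.parquet"
    return "/".join(["data", "reports", partition, filename])


def partition_values(key):
    """Parse the name=value folder segments of a key into a dict."""
    folders = key.split("/")[:-1]  # drop the file name
    return dict(segment.split("=", 1) for segment in folders if "=" in segment)


def prune(keys, **wanted):
    """Keep only keys whose partition values all match, without reading any file."""
    return [
        key
        for key in keys
        if all(partition_values(key).get(name) == value for name, value in wanted.items())
    ]


keys = [
    report_key(date(2021, 1, 1), datetime(2021, 1, 2, 3, 4, 5)),
    report_key(date(2021, 1, 2), datetime(2021, 1, 3, 3, 4, 5)),
]
print(keys[0])  # data/reports/date=20210101/20210102-030405_reports.parquet
print(prune(keys, date="20210102"))  # ['data/reports/date=20210102/20210103-030405_reports.parquet']
```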

```python
@task
def write_transformed_data(report_df, bucket_name='mah-bucket'):
    """Write transformed data to prod bucket."""
    # default to today's date
    prefix_date = datetime.now()
    if len(report_df) > 0:
        # use the first record's date for naming instead
        prefix_date = report_df['date_start'].iloc[0]
    prefix_date = prefix_date.strftime('%Y%m%d')
    prefix = 'data/reports/date=' + prefix_date
    now = datetime.now().strftime('%Y%m%d-%H%M%S')
    filename = prefix + '/' + now + '_reports.parquet'
    print('Writing filename', filename)
    # write to s3, fetching the credentials secret once
    aws_credentials = Secret("AWS_CREDENTIALS").get()
    session = boto3.Session(
        aws_access_key_id=aws_credentials['ACCESS_KEY'],
        aws_secret_access_key=aws_credentials['SECRET_ACCESS_KEY'],
    )
    s3_client = session.client('s3')
    # dataframe_to_s3 is one of our own helpers (omitted here)
    dataframe_to_s3(s3_client, report_df,
                    bucket_name, filepath=filename,
                    format='parquet')
```
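`dataframe_to_s3` isn't shown in this post. One plausible shape for it, offered as a hypothetical reconstruction that only matches the call signature used above, serializes the DataFrame in memory and uploads the bytes with the S3 client's `put_object`. `DataFrame.to_parquet` needs a Parquet engine such as pyarrow or fastparquet installed.

```python
import io


def dataframe_to_s3(s3_client, df, bucket_name, filepath, format="parquet"):
    """Hypothetical helper: serialize a DataFrame in memory and upload it to S3.

    s3_client is a boto3 S3 client; df is a pandas DataFrame.
    """
    if format != "parquet":
        raise ValueError(f"unsupported format: {format!r}")
    buffer = io.BytesIO()
    # to_parquet accepts a file-like object; it needs pyarrow or fastparquet
    df.to_parquet(buffer, index=False)
    # upload the serialized bytes to s3://<bucket_name>/<filepath>
    s3_client.put_object(Bucket=bucket_name, Key=filepath, Body=buffer.getvalue())
```

Building the file in memory avoids temporary files on the agent; for very large frames, boto3's `upload_fileobj` (which handles multipart uploads) is an alternative to `put_object`.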

Final Flow

```python
with Flow('Extract reports to S3') as flow:
    # parameters for backfill
    end_date = Parameter('end_date',
                         default=str(datetime.now().date()))
    # actual flow
    secrets = get_db_vars()
    data = get_data(db_host=secrets['db_host'],
                    db_name=secrets['db_name'],
                    db_user=secrets['db_user'],
                    db_pass=secrets['db_pass'],
                    extract_date=end_date)
    # write_raw_data(data)
    transformed = transform_data(data)
    write_transformed_data(transformed)

flow.run()
```
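The `end_date` parameter is what makes backfills possible. A minimal sketch of driving one, assuming you want every day in a range: generate the dates as `YYYY-MM-DD` strings (the same format as the parameter's default) and pass each one in via `flow.run(parameters=...)`, which Prefect 1.x supports. `backfill_dates` is a hypothetical helper, and the `flow.run` call is left as a comment so the sketch runs on its own.

```python
from datetime import date, timedelta


def backfill_dates(start, end):
    """Every day from start to end inclusive, as YYYY-MM-DD strings."""
    days = (end - start).days
    return [(start + timedelta(days=offset)).isoformat() for offset in range(days + 1)]


for day in backfill_dates(date(2021, 1, 1), date(2021, 1, 3)):
    print(day)
    # With the flow defined above (Prefect 1.x):
    # flow.run(parameters={"end_date": day})
```

Running one flow per day keeps each run writing to exactly one `date=` partition, matching how the write task names its output.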

And that’s it. You could copy and paste the code from here and it would almost work: I’ve omitted a few helper functions, such as decode and convert_date (arbitrary transformations we do in our pipeline) and dataframe_to_s3.

Good luck!
