The Prefect Pipeline Code for Building a Basic Data Lake

Kat Hempstalk
5 min readAug 18, 2021

Earlier this month I wrote an article about building a basic data lake with Prefect and AWS, and promised a follow- up post where I’d share some of the code from the pipeline. This is that post.

First, a quick review: for our org, we set up a basic data lake using Prefect to assist in copying the data from AWS Aurora (MySQL) to S3. Prefect is a workflow orchestration engine, meaning that it can be used to schedule and run tasks (i.e. pieces of code) and report on their success or failure. Since data pipelines can easily fail thanks to any number of things — bad permissions, changes in the underlying data, lack of memory, etc — using an orchestrator such as Prefect allows us to be notified easily if anything goes wrong. The pipeline can also be inspected easily, no need to ssh into a machine and trawl through log files to figure out what’s caused it to fail.

In Prefect, each pipeline is a flow, and the flow performs multiple tasks — like extracting the data, transforming the data, or loading the data. The flow is a directed acyclic graph (DAG), meaning that tasks that are dependent on others won’t be run unless their predecessors complete successfully. Although Prefect has a cloud offering, and a nice UI if you choose to self-host the engine, it’s also possible to run the flow as a python script locally, rather than as part of an orchestrated workflow automation system.

Prefect is run using a server — the orchestration engine — and agents, which work on the tasks. The agents are designed to ‘pull’ — they will poll the server for tasks, and pick one up when it becomes available. We chose to use Docker agents, which pull the flow from a Docker repository before running it on a Docker virtual machine. The advantage with doing so is that our code is all neatly packaged up in a docker image and we don’t have to worry about dependencies when the task runs. This did cause some challenges, however, with setting up the agent on AWS EC2.

Building out the Prefect Flow

In Prefect, you construct a series of tasks into a flow. Each task is really just a python function with a decorator to recognise it as a task and ensure Prefect monitors it correctly. The flow then strings together multiple tasks and runs it as a complete program.

Our flow to our data lake is simple: we need to get the connection details for the database, get the data from the database, transform it, and finally write the data out to AWS S3.

Our basic flow looks like this:

Imports

Here are the imports we used (our complete code is part of a proprietary system, so no GitHub repository link is available, sorry):

# imports needed for code examples
from prefect import task, Flow, Parameter
from prefect.client.secrets import Secret
import pandas as pd
import pymysql
import boto3

The associated python requirements are:

prefect
pandas
pymysql
boto3

DB Connection Task

The first task is to get the secrets from the Prefect Secrets store ready for passing into the other tasks. It’s a typical pattern when dealing with cloud systems (e.g. Prefect, GitHub Actions) to get passwords from their secrets store instead of hard coding it or storing it in a risky way. When running Prefect locally, secrets were provided via the TOML format Prefect config file (~/.prefect/config.toml). In cloud, they are stored in a Hashicorp Vault and entered via the Prefect cloud UI.

When retrieving values we took the approach of getting all the secrets at once, then passing them along to the other tasks as parameters. However, Prefect runs Secret().get() as a task so trying to get them all at once was a little redundant. Our approach was to set up a task returning a dictionary of the secrets (don’t do this, just do Secret(‘secretname’).get() where you need it):

@task
def get_db_vars():
"""Get the secrets."""
secret_db = {
'db_host': Secret("DB_HOST").get(),
'db_name': Secret("DB_NAME").get(),
'db_user': Secret("DB_USER").get(),
'db_pass': Secret("DB_PASS").get()}
return secret_db

MySQL Extract Task

After getting the secrets, our first real pipeline task is a MySQL extraction: we log into our database, grab a day’s worth of data, and return it as a pandas dataframe. Note the only thing that is ‘Prefect’ syntax here is the decorator for the function denoting it as a ‘task’.

@task
def get_data(db_host, db_name, db_user, db_pass, extract_date=None):
# connect to db
connection = pymysql.connect(
host=db_host,
user=db_user,
password=db_pass,
db=db_name
)
if extract_date is None:
# get all of yesterday’s data
query = '
SELECT * FROM reports
WHERE date_start = SUBDATE(CURRENT_DATE, 1)'
else:
# get data for a given day
query = 'SELECT * FROM reports
WHERE
date_start ={extract_date}'.format(extract_date=extract_date)
#read all the data you want into a dataframe
report_df = pd.read_sql(query, connection)
return report_df

Transformation Task

Ah, the bit where the magic happens. Again, this task is relatively straightforward: it takes in the pandas dataframe, deserialises the data and makes sure dates and values are in the right format, and then returns it back to the main flow. Again, the only thing prefect-like about this is the task decorator for the function.

@task
def transform_data(report_df):
"""Convert data to decoded format."""
if len(report_df) > 0:
# iterate through and decode
report_df['decoded'] = report_df['encoded'].map(decode)
# drop the encoded column
report_df = report_df.drop(['encoded'], axis=1)
# finally, lets turn the dates into proper dates
report_df['date_start'] \=
report_df['date_start'].map(convert_date)
return report_df

Formatted Data Output Task

The final step in our flow: outputting the transformed data frame to S3. The key things to note are:

  1. Our output format is parquet
  2. We ‘partition’ our data in S3 by using a filename that includes a partition name and value. So, if we were writing today.parquet, we’d instead write it in the following file location:
    s3://my-bucket/data/tablename/partition=value/today.parquet
    e.g. s3://datalake/data/reports/date=20210101/today.parquet

This is later used by Glue to catalog the data, and when it is pointed at the data folder it will create a table called ‘reports’ with a partition scheme of ‘date’. A partition is to data lakes what an index is to databases — it helps the system find the data you are after more efficiently.

@task
def write_transformed_data(report_df,
bucket_name='mah-bucket'):
"""Write transformed data to prod bucket."""
# get today's date
prefix_date = datetime.now()
if len(report_df) > 0:
# use the first record for naming instead
prefix_date = report_df['date_start'].iloc[0]
prefix_date = prefix_date.strftime('%Y%m%d')
prefix = 'data/reports/date=' + prefix_date
now = datetime.now().strftime('%Y%m%d-%H%M%S')
filename = prefix + '/' + now + '_reports.parquet'
print('Writing filename', filename)
# write to s3
session = boto3.Session(aws_access_key_id=Secret(
"AWS_CREDENTIALS").get()['ACCESS_KEY'],
aws_secret_access_key=Secret(
"AWS_CREDENTIALS").get()['SECRET_ACCESS_KEY'])
s3_client = session.client('s3')
dataframe_to_s3(s3_client, report_df,
bucket_name, filepath=filename,
format='parquet')

Final Flow

These three tasks make up our final flow, which is basically a standard ETL (Extract, Transform, Load) data pipeline task.

with Flow('Extract reports to S3') as flow:
# parameters for backfill
end_date = Parameter('end_date',
default=str(datetime.now().date()))
# actual flow
secrets = get_db_vars()
data = get_data(db_host=secrets['db_host'],
db_name=secrets['db_name'],
db_user=secrets['db_user'],
db_pass=secrets['db_pass'],
extract_date=end_date)
# write_raw_data(data)
transformed = transform_data(data)
write_transformed_data(transformed)
flow.run()

And, that’s it. You could cut and paste the code out of here and it would almost work: I’ve omitted some of the functions such as convert_date which are arbitrary transformations we do in our pipeline.

Good luck!

--

--

Kat Hempstalk

Machine Learning, AI and Data Expert. By day I work training machines to think, by night I plot to take over the world. All views expressed here are my own.