The Prefect Pipeline Code for Building a Basic Data Lake

Building out the Prefect Flow

Imports

# imports needed for code examples
from datetime import datetime
from prefect import task, Flow, Parameter
from prefect.client.secrets import Secret
import pandas as pd
import pymysql
import boto3

The packages you'll need installed for this are prefect, pandas, pymysql and boto3 (plus pyarrow or fastparquet if you want pandas to write Parquet files).

DB Connection Task

@task
def get_db_vars():
    """Get the secrets."""
    secret_db = {
        'db_host': Secret("DB_HOST").get(),
        'db_name': Secret("DB_NAME").get(),
        'db_user': Secret("DB_USER").get(),
        'db_pass': Secret("DB_PASS").get()}
    return secret_db
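
These secrets need to be available wherever the flow runs. With Prefect Cloud you'd register them in the UI; for a purely local run, one option is Prefect's local secrets in ~/.prefect/config.toml. A minimal sketch with placeholder values:

[context.secrets]
DB_HOST = "localhost"
DB_NAME = "reports_db"
DB_USER = "reporting_user"
DB_PASS = "not-a-real-password"

Each secret can equally be supplied as an environment variable of the form PREFECT__CONTEXT__SECRETS__DB_HOST. The AWS_CREDENTIALS secret used later on is expected to be a mapping with ACCESS_KEY and SECRET_ACCESS_KEY keys.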

MySQL Extract Task

@task
def get_data(db_host, db_name, db_user, db_pass, extract_date=None):
    """Extract a day's worth of report rows into a dataframe."""
    # connect to db
    connection = pymysql.connect(
        host=db_host,
        user=db_user,
        password=db_pass,
        db=db_name
    )
    if extract_date is None:
        # get all of yesterday's data
        query = '''SELECT * FROM reports
                   WHERE date_start = SUBDATE(CURRENT_DATE, 1)'''
    else:
        # get data for a given day
        query = '''SELECT * FROM reports
                   WHERE date_start = '{extract_date}'
                '''.format(extract_date=extract_date)
    # read all the data you want into a dataframe
    report_df = pd.read_sql(query, connection)
    return report_df
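
One thing worth noting: the backfill branch formats extract_date directly into the SQL string. That's fine for a value you control via a Prefect Parameter, but if you'd rather let the driver handle quoting, pandas can pass parameters through to pymysql. A minimal sketch of that variant, using the same table and connection as above:

# parameterised backfill query: pymysql uses %s placeholders
query = 'SELECT * FROM reports WHERE date_start = %s'
report_df = pd.read_sql(query, connection, params=[extract_date])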

Transformation Task

@task
def transform_data(report_df):
    """Convert data to decoded format."""
    if len(report_df) > 0:
        # iterate through and decode
        report_df['decoded'] = report_df['encoded'].map(decode)
        # drop the encoded column
        report_df = report_df.drop(['encoded'], axis=1)
        # finally, let's turn the dates into proper dates
        report_df['date_start'] = \
            report_df['date_start'].map(convert_date)
    return report_df
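
decode and convert_date are small helpers specific to this dataset, and their definitions aren't shown here. Purely to illustrate their shape, a hypothetical pair might look like this (the base64 encoding and the date format are assumptions for the example, not what the real reports table uses):

import base64
from datetime import datetime

def decode(value):
    # hypothetical: assume the 'encoded' column holds base64-encoded UTF-8 text
    return base64.b64decode(value).decode('utf-8')

def convert_date(value):
    # hypothetical: assume dates arrive as 'YYYY-MM-DD' strings
    return datetime.strptime(value, '%Y-%m-%d').date()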

Formatted Data Output Task

  1. Our output format is Parquet.
  2. We ‘partition’ our data in S3 by using a filename that includes a partition name and value. So, instead of writing today.parquet at the top of the bucket, we write it to a location like:
    s3://my-bucket/data/tablename/partition=value/today.parquet
    e.g. s3://datalake/data/reports/date=20210101/today.parquet

@task
def write_transformed_data(report_df,
                           bucket_name='mah-bucket'):
    """Write transformed data to prod bucket."""
    # get today's date
    prefix_date = datetime.now()
    if len(report_df) > 0:
        # use the first record for naming instead
        prefix_date = report_df['date_start'].iloc[0]
    prefix_date = prefix_date.strftime('%Y%m%d')
    prefix = 'data/reports/date=' + prefix_date
    now = datetime.now().strftime('%Y%m%d-%H%M%S')
    filename = prefix + '/' + now + '_reports.parquet'
    print('Writing filename', filename)
    # write to s3 using the credentials stored in the AWS_CREDENTIALS secret
    session = boto3.Session(
        aws_access_key_id=Secret("AWS_CREDENTIALS").get()['ACCESS_KEY'],
        aws_secret_access_key=Secret("AWS_CREDENTIALS").get()['SECRET_ACCESS_KEY'])
    s3_client = session.client('s3')
    dataframe_to_s3(s3_client, report_df,
                    bucket_name, filepath=filename,
                    format='parquet')
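
dataframe_to_s3 is another small helper that isn't part of the snippet above. A minimal sketch of what it could look like, assuming pyarrow (or fastparquet) is installed so pandas can serialise to Parquet:

import io

def dataframe_to_s3(s3_client, df, bucket_name, filepath, format='parquet'):
    """Serialise a dataframe to an in-memory Parquet file and upload it to S3."""
    if format != 'parquet':
        raise ValueError('only parquet is supported in this sketch')
    buffer = io.BytesIO()
    df.to_parquet(buffer, index=False)
    buffer.seek(0)
    s3_client.put_object(Bucket=bucket_name, Key=filepath, Body=buffer.getvalue())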

Final Flow

with Flow('Extract reports to S3') as flow:
    # parameters for backfill
    end_date = Parameter('end_date',
                         default=str(datetime.now().date()))
    # actual flow
    secrets = get_db_vars()
    data = get_data(db_host=secrets['db_host'],
                    db_name=secrets['db_name'],
                    db_user=secrets['db_user'],
                    db_pass=secrets['db_pass'],
                    extract_date=end_date)
    # write_raw_data(data)
    transformed = transform_data(data)
    write_transformed_data(transformed)

flow.run()
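
Because end_date is a Prefect Parameter, the same flow can backfill an arbitrary day by overriding it at run time:

# re-run the flow for a specific historical day
flow.run(parameters={'end_date': '2021-01-01'})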
