Table of contents
- Link to future weeks
- Brainstorm: Practices I'm pulling in from past software projects
- Notes: Docker and Postgres
Link to future weeks
Brainstorm: Practices I'm pulling in from past software projects
Take notes in Markdown
After I complete a unit of work, I take notes in a format that allows me to show code and Markdown text in the same document via a GitHub gist, Jupyter Notebooks, or Quarto.
Google Colab is a great place to take notes because Colab notebooks are free (you can even access free GPUs if necessary and linked to your Google account).
“Try three before Slack”
When I taught elementary and middle school, I had a rule in my classroom called “try three before me”. This meant that during work time, students should try three sources of information before they asked me a question. I coached students in how to do this before the rule went into effect.
Here is a checklist I use before I ask questions in the DE Slack or StackOverflow:
- Look through the FAQ
- Search the DataTalksClub DE Zoomcamp Slack channel
- Copy the error to my clipboard and Google it
- Google the error with terms like “reddit” or “stackoverflow” to find results from these particular sites
- Re-read materials or re-watch videos
Ask the best possible question
The good thing about “trying three before Slack” is that you’ll gain clarity on the best possible question to ask.
Here is how I ask questions in StackOverflow and Slack to provide context and reduce back-and-forths:
“I am trying to accomplish ___ (very specific step). I expected ___ but got ___ (error message or issue). I tried to resolve the error by ___, ___, and ___ (write a quick list of the three steps you took to solve your problem). Can someone explain how to ___ (if necessary, explain the information you need to move forward)."
Have a growth mindset
In my experience, schools and businesses incentivize getting things “right” right away. People who take on challenging work or actually take time to grapple with concepts are not rewarded. This actually goes against all research we have about learning. In order to learn, you need to engage in tasks that provide a bit of struggle. As you struggle, new connections form in your brain - this is how we learn.
Notes: Docker and Postgres
Introduction to Docker
Why Docker?
The Docker website says it best:
“A container is a standard unit of software that packages up code and all its dependencies so the application runs quickly and reliably from one computing environment to another. A Docker container image is a lightweight, standalone, executable package of software that includes everything needed to run an application: code, runtime, system tools, system libraries and settings.”
Containers are very useful to DEs because they allow us to bundle computing environments, dependencies, and other important elements into tidy packages that can be run by anyone (no matter which operating system they are using or packages they have already installed).
Docker containers are also STATELESS. This means that when you start and stop a container, you lose any changes. When we create more sophisticated Docker instructions, we will mount volumes.
Our first Dockerfile and Python pipeline
A data pipeline is a process of moving data from one place (the source) to a destination (such as a data warehouse).
What is a Dockerfile?
A Dockerfile
is a text document that contains all of the commands to
assemble an image. Docker’s Dockerfile
Reference is a
helpful guide to understanding these files.
Dockerfile Syntax
Instructions are written in uppercase and specific arguments are lowercase.
INSTRUCTION arguments
Our first Dockerfile
FROM python:3.9
RUN pip install pandas
WORKDIR /app
COPY pipeline.py pipeline.py
ENTRYPOINT [ "python", "pipeline.py" ]
Understanding each instruction
INSTRUCTION | argument | What’s happening? |
---|---|---|
FROM | python:3.9 | Establishes a base image |
RUN | pip install pandas | Runs PIP to install Pandas |
WORKDIR | /app | Changes the current director to /app |
COPY | pipeline.py pipeline.py | Copies pipeline.py from our local host to app/pipeline.py in the container |
ENTRYPOINT | “python”, “pipeline.py” | Runs the python command to start the Python interpreter and runs pipeline.py |
Our simple Python pipeline (pipeline.py)
import sys
import pandas as pandas
print(sys.argv)
day = sys.argv[1]
# Fancy stuff with pandas
print(f'Job finished successfully for day = {day}')
Ingesting NY Taxi Data to Postgres
Key packages to install
# You might need to use pip3 instead of pip (depends on your environment)
pip install pgcli
pip install notebook
pip install SQLAlchemy
pip install psycopg2-binary # You might not need this - I had to install it
Useful documentation
Docker commands to spin up database
docker run -it \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="root" \
-e POSTGRES_DB="ny_taxi" \
-v /home/candace/Desktop/Code/zoomcamp/docker_test/ny_taxi_postgres_data \
-p 5432:5432 \
postgres:13
Using pgcli to connect to postgres
pgcli -h localhost -p 5432 -u root -d ny_taxi
CSV link
Link to CSV backup:
Jupyter Notebook (upload_date.ipynb)
# Import Pandas with the alias pd
import pandas as pd
# Check Pandas version
pd.__version__
# Create a Pandas dataframe with the first 100 rows of the csv
# See Pandas documentation or YouTube videos for info about dataframes
df = pd.read_csv('yellow_tripdata_2021-01.csv', nrows=100)
# Show dataframe
df
# Print schema
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))
# Change schema so that the pickup and dropoff datetimes are modeled as DATETIME instead of TEXT in the database schema
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
# Import create_engine from SQLalchemy
# I had to import psycopg2 but you might not have to
from sqlalchemy import create_engine
import psycopg2
# Create a SQLalchemy engine using the local Docker container server we have running at port 5432
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
# Connect to engine
engine.connect()
# Create a dataframe iterator that will iterate over the csv in chunks of 100,000
# See Corey Schafer video above for info about iterators
df_iter = pd.read_csv('yellow_tripdata_2021-01.csv', iterator=True, chunksize=100_000)
# Import time module so we can calculate how long each iteration takes
from time import time
# Establish a while loop
while True:
# Capture the start time
t_start = time()
# This is an iterable - it's moving to the next iteration
df = next(df_iter)
# Change schema to reflect datetime (vs. text)
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
#Use the 'to_sql' method - connect via the engine. If a record already exists, append it
df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
# Capture end time of iteration
t_end = time()
# Print how long that iteration took
print('inserted another chunk...took %.3f second' % (t_end - t_start))
# Note - The code will end with an exception
Connecting pgAdmin and Postgres
Note: This section was particularly hard for me because I did not have the right argument behind the -v flag. I consulted the FAQ and used a volume name instead of a path:
-v ny_taxi_postgres_data:/var/lib/postgresql/data
Docker Networks
We need to use a Docker network so that one container that is running pgAdmin can communicate with another container that has the database (with a mounted volume).
Create a network
docker network create pg-network
Run Postgres
##Network
docker run -it \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="root" \
-e POSTGRES_DB="ny_taxi" \
-v /home/candace/Desktop/Code/zoomcamp/docker_test/ny_taxi_postgres_data \
-p 5432:5432 \
--network=pg-network \
--name pg-database \
postgres:13
Run pgAdmin
docker run -it \
-e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
-e PGADMIN_DEFAULT_PASSWORD="root" \
-p 8080:80 \
--network=pg-network \
--name pgadmin2 \
dpage/pgadmin4
Putting the ingestion script into Docker
Converting the Jupyter notebook to a Python script
Jupyter notebooks are useful because they help us:
- Create a computational narrative that others can understand and modify independently
- Document our actions and learning with both code snippets and text
- Run pieces of a script or a program line by line (this helps with troubleshooting)
We used a Jupyter notebook to write and use a Python script. A script is a set of instructions for an external system (a script can be deleted without changing or disabling the target system). Scripts are an important part of data pipelines because we need sets of instructions to manipulate data (including downloading, ingesting, etc).
Now, we need to take the instructions in the Jupyter notebook, copy-paste them into a python file (ours will be called ingest_data.py) and clean up the instructions so a Python interpreter can read them.
Parametrizing the script with argparse
After creating the ingest_data.py file, copying the Jupyter code blocks over, and cleaning up the file (see video), we need to use argparse.
Why argparse?
When we created our Jupyter notebook, we were making it for a very specific use case. We used these arguments to specify the name of the file ('yellow_tripdata_2021-01.csv'), postgres-related information for the engine (username, password, port, and name - 'postgresql://root:root@localhost:5432/ny_taxi'), and other information.
To make our script more extendable, we need to make it so that many of these pieces of information are variables that a user can pass through when they run the script. Python's argparse module allows us to communite with the user - the script will use argparse to know which arguments to take in from the user and where to pass them into the script. This is a great primer on argparse from the perspective of a data scientist.
This is what our script looks like after pulling it from our Jupyter notebook into a .py file, cleaning it up, and using argparse:
import pandas as pd
from sqlalchemy import create_engine
import psycopg2
from time import time
import argparse
import os
# Main function - ingests csv data to postgres db
def main(params):
# define parameters
user = params.user
password = params.password
host = params.host
port = params.port
db = params.db
table_name = params.table_name
url = params.url
csv_name = 'output.csv.gz'
# download the csv
os.system(f"wget {url} -O {csv_name}")
# prepare data for ingestion
engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')
df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100_000)
df = next(df_iter)
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')
df.to_sql(name=table_name, con=engine, if_exists='append')
# while loop for data ingestion
while True:
t_start = time()
df = next(df_iter)
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
df.to_sql(name=table_name, con=engine, if_exists='append')
t_end = time()
print('inserted another chunk...took %.3f second' % (t_end - t_start))
# main function
if __name__ == "__main__":
# parse arguments - user, password, host, port, database name , table name, url of the csv
parser = argparse.ArgumentParser()
parser.add_argument('--user', help='username for postgres')
parser.add_argument('--password', help='password for postgres')
parser.add_argument('--host', help='host for postgres')
parser.add_argument('--port', help='port for postgres')
parser.add_argument('--db', help='database name for postgres')
parser.add_argument('--table_name', help='name of the table where we will write the results to')
parser.add_argument('--url', help='url of the csv file')
args = parser.parse_args()
main(args)
Since this script is a Python program, we had to use 'if name == "main'. Check out Corey Schafer's YouTube video about 'if name == "main' for more information about it.
To test the script, we have to drop our existing table (via pgadmin) and run this in a terminal:
URL="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"
python ingest_data.py \
--user=root \
--password=root \
--host=localhost \
--port=5432 \
--db=ny_taxi \
--table_name=yellow_taxi_trips \
--url=${URL}