In this topic, we will go through the process of building a very simple DAG by fetching a .csv file of the California housing dataset from GitHub and doing some very basic preprocessing on it.
The setup
To start off, we will follow the official documentation to perform the setup with Docker Compose. First, fetch the compose .yaml file via
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.0/docker-compose.yaml'
A few modifications, described below, turn it into the compose file that should actually be downloaded and used.
We changed the executor from CeleryExecutor to LocalExecutor for convenience and removed redis, since it is not needed for our purposes: the LocalExecutor runs everything on a single machine, while Celery can distribute tasks across multiple nodes, which is where a message broker such as redis comes in.
We also added a volume, - ${AIRFLOW_PROJ_DIR:-.}/dataset:/opt/airflow/dataset, so that the results of fetching the files can be seen locally (this is not good practice in general; we only did it to keep everything in the same directory).
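For orientation, the relevant fragment of the modified x-airflow-common block in docker-compose.yaml ends up looking roughly like this (a sketch based on the official 2.8.0 file; the surrounding keys are left untouched):
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.0}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor   # was CeleryExecutor
    # the AIRFLOW__CELERY__* variables and the redis service are removed
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - ${AIRFLOW_PROJ_DIR:-.}/dataset:/opt/airflow/dataset   # the added volume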
This is an extended Airflow compose since we want to use a few external packages, thus, the requirements.txt will be as follows:
numpy==1.24.2
pendulum
requests==2.28.2
pandas
And your Dockerfile in the same directory will look like this (note that the base image version should match the version of the compose file you downloaded):
FROM apache/airflow:2.8.0
COPY requirements.txt /requirements.txt
RUN pip install --user --upgrade pip
RUN pip install --no-cache-dir --user -r /requirements.txt
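One more thing to keep in mind: for the services to be built from this Dockerfile rather than pulled from Docker Hub, the image: line in the x-airflow-common block of docker-compose.yaml has to be commented out and replaced with a build directive, roughly:
x-airflow-common:
  &airflow-common
  # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.0}
  build: .
This is also why we pass --build when starting the services later.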
Next, you have to create the necessary directories inside the working directory and the .env file:
mkdir -p ./dags ./logs ./plugins ./config ./dataset
echo -e "AIRFLOW_UID=$(id -u)" > .env
Then, initialize the database and create the first user, with both the login and the password set to airflow, by running:
docker compose up airflow-init
At this point, before we actually bring the containers up, we are ready to create the DAG itself.
Creating the DAG
The full DAG file, stored in the created dags directory and named first_dag.py, looks as follows (we will go through its contents in a moment):
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python import PythonOperator
import requests
import csv
import os
import pandas as pd
dir_path = "/opt/airflow/dataset"

default_args = {
    'owner': "root",
    'retries': 5,
    'retry_delay': timedelta(minutes=2)
}

def converter(x):
    # Encode the categorical ocean_proximity values as integers
    if x == '<1H OCEAN':
        return 0
    elif x == 'INLAND':
        return 1
    elif x == 'NEAR OCEAN':
        return 2
    elif x == 'NEAR BAY':
        return 3
    else:
        return 4

def fetch_and_save_csv():
    url = "https://raw.githubusercontent.com/sonarsushant/California-House-Price-Prediction/master/housing.csv"
    try:
        response = requests.get(url)
        response.raise_for_status()
    except requests.exceptions.HTTPError as errh:
        print(f"HTTP Error: {errh}")
    except requests.exceptions.ConnectionError as errc:
        print(f"Error Connecting: {errc}")
    except requests.exceptions.Timeout as errt:
        print(f"Timeout Error: {errt}")
    except requests.exceptions.RequestException as err:
        print(f"Unknown Error: {err}")
    else:
        # Split content by line, then each line by comma
        content = response.content.decode('utf-8').split('\n')
        content = [item.split(',') for item in content]
        # Create the target directory if it does not exist yet
        try:
            os.mkdir(dir_path)
        except OSError:
            print(f'Creation of the directory {dir_path} failed')
        # Write the rows back out as a .csv file
        with open(f'{dir_path}/california_housing.csv', 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerows(content)

def preprocess():
    # Apply converter() to the ocean_proximity column while reading the file
    df = pd.read_csv(f'{dir_path}/california_housing.csv', converters={'ocean_proximity': converter})
    df.to_csv(f'{dir_path}/processed_california_housing.csv')

with DAG(
    dag_id='first_dag',
    default_args=default_args,
    description='Description',
    start_date=datetime(2023, 10, 15, 19),
    schedule_interval=timedelta(days=1),
    catchup=False
) as dag:
    fetch = PythonOperator(
        task_id='fetch_and_save_csv',
        python_callable=fetch_and_save_csv
    )
    preprocess_file = PythonOperator(
        task_id='preprocess',
        python_callable=preprocess
    )

    fetch >> preprocess_file
Now let's go through the file step by step. We start with the imports, and then we hard-code the dir_path variable so that the files end up in the mounted dataset directory and can be seen locally. We then define the default arguments for the DAG itself. The converter() function does a very simple transformation of the ocean_proximity feature, mapping each category to an integer.
After that, we define fetch_and_save_csv(), which fetches the .csv file from GitHub and will later become the first task in the DAG pipeline. We also define preprocess(), our second task in the DAG, which applies the converter() function to the dataset and saves the new file.
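To make the converters argument more concrete, here is a tiny, self-contained illustration (not part of the DAG) of what pd.read_csv does with it, using the converter() defined above: the function is applied to every value of the named column while the file is being parsed.
import io
import pandas as pd

sample = io.StringIO("ocean_proximity\nINLAND\nNEAR BAY\n")
df = pd.read_csv(sample, converters={'ocean_proximity': converter})
print(df['ocean_proximity'].tolist())   # [1, 3]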
Finally, we define the DAG itself, starting from the with DAG line: we pass the default arguments, use the PythonOperator twice to create the two tasks, and then chain them.
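The fetch >> preprocess_file line is simply the bitshift shorthand Airflow provides for declaring dependencies; the same ordering could equally be written with the explicit operator methods:
fetch.set_downstream(preprocess_file)
# or, equivalently
preprocess_file.set_upstream(fetch)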
Running the DAG
Now you can actually bring the services up (docker compose up --build), navigate to http://localhost:8080, log in, and trigger the DAG. This might take some time, but once the DAG has run, you will see two files in the dataset directory: california_housing.csv and processed_california_housing.csv. The former is the original fetched file, and the latter is the file after encoding the ocean_proximity feature.
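If you prefer the command line, a single task can also be exercised without waiting for the scheduler by running airflow tasks test inside one of the containers, for example (assuming the default airflow-webserver service name from the official compose file):
docker compose exec airflow-webserver airflow tasks test first_dag fetch_and_save_csv 2023-10-15
This runs the task in isolation and prints its logs to the terminal without recording the run in the database.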
Conclusion
As a result, you are now familiar with setting up the Airflow compose file, defining some preliminary variables, and making a very simple DAG.