A complete Apache Airflow tutorial: building data pipelines with Python


Apache Airflow has become the de facto library for pipeline orchestration in the Python ecosystem. It has gained popularity, compared to similar solutions, thanks to its simplicity and extensibility. In this article, I will attempt to outline its main concepts and give you a clear understanding of when and how to use it.

Why and when should I consider Airflow?

Imagine that you want to build a machine learning pipeline that consists of several steps such as:

  1. Read an image dataset from cloud-based storage

  2. Process the images

  3. Train a deep learning model with the downloaded images

  4. Upload the trained model to the cloud

  5. Deploy the model

How would you schedule and automate this workflow? Cron jobs are a simple solution but they come with many problems. Most importantly, they won't allow you to scale effectively. Airflow, on the other hand, offers the ability to schedule and scale complex pipelines easily. It also lets you automatically re-run them after failure, manage their dependencies, and monitor them using logs and dashboards.

Before we build the aforementioned pipeline, let's understand the basic concepts of Apache Airflow.

What is Airflow?

Apache Airflow is a tool for authoring, scheduling, and monitoring pipelines. As a result, it is an ideal solution for ETL and MLOps use cases. Example use cases include:

  • Extracting data from many sources, aggregating it, transforming it, and storing it in a data warehouse

  • Extracting insights from data and displaying them in an analytics dashboard

  • Training, validating, and deploying machine learning models

Key components

When installing Airflow in its default configuration, you will see four different components:

  1. Webserver: The webserver is Airflow's user interface (UI), which allows you to interact with it without the need for a CLI or an API. From there you can execute and monitor pipelines, create connections to external systems, inspect datasets, and much more.

  2. Executor: Executors are the mechanism by which pipelines run. There are many different types that run pipelines locally, on a single machine, or in a distributed fashion. A few examples are LocalExecutor, SequentialExecutor, CeleryExecutor and KubernetesExecutor.

  3. Scheduler: The scheduler is responsible for executing the different tasks at the right time, re-running pipelines, backfilling data, ensuring task completion, and so on.

  4. Metadata database: A database where all pipeline metadata is stored. This is typically Postgres, but other SQL databases are supported too.




[Image: airflow-ui]

The easiest way to install Airflow is with Docker Compose. You can download the official docker-compose file from here:

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'

Note that Airflow is also available on PyPI and can be installed with pip.
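As a minimal sketch of a pip-based local setup (the version pin is just an example; in practice you will want to install with the constraints file from the official docs):

$ pip install "apache-airflow==2.5.1"
$ airflow db init          # initialize the metadata database
$ airflow webserver --port 8080
$ airflow scheduler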

Basic concepts of Airflow

In order to get started with Airflow, you need to be familiar with its main concepts, which can be a little tricky. So let's try to demystify them.

DAGs

All pipelines are defined as directed acyclic graphs (DAGs). Any time we execute a DAG, an individual run is created. Each DAG run is separate from the others and holds a status regarding the execution stage of the DAG. This means that the same DAG can be executed many times in parallel.




[Image: dag-example]

To instantiate a DAG, you can use the DAG constructor directly or a context manager, as follows:

from datetime import datetime, timedelta

from airflow import DAG

with DAG(
    "mlops",
    default_args={
        "retries": 1,
    },
    schedule=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
) as dag:
    ...

The context manager accepts some global arguments regarding the DAG and some default arguments. The default arguments are passed to all tasks and can be overridden on a per-task basis. The complete list of parameters can be found in the official docs.

In this example, we define that the DAG will start on 1/1/2023 and will be executed once a day. The retries argument ensures that it will be re-run once after a possible failure.
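To illustrate the per-task override mentioned above, here is a small sketch (the task itself is hypothetical): a single operator can set its own retries, and that value takes precedence over default_args.

from airflow.operators.bash import BashOperator

# placed inside the `with DAG(...)` block above; default_args sets retries=1,
# but this task will retry 3 times
flaky_download = BashOperator(
    task_id="flaky_download",
    bash_command="curl -O https://example.com/data.csv",
    retries=3,
)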

Tasks

Each node of the DAG represents a task, meaning an individual piece of code. Each task may have some upstream and downstream dependencies. These dependencies express how tasks are related to each other and in which order they should be executed. Whenever a new DAG run is initialized, all tasks are initialized as task instances. This means that each task instance is a specific run of the given task.




[Image: complex-dag]

Operators

Operators can be thought of as templates for predefined tasks, because they encapsulate boilerplate code and abstract much of their logic. Some common operators are BashOperator, PythonOperator, MySqlOperator, and S3FileTransformOperator. As you can tell, operators help you define tasks that follow a specific pattern. For example, the MySqlOperator creates a task that executes a SQL query and the BashOperator executes a bash script.

Operators are defined inside the DAG context manager as below. The following code creates two tasks, one to execute a bash command and one to execute a MySQL query.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator

with DAG(
    "tutorial"
) as dag:
    task1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    task2 = MySqlOperator(
        task_id="load_table",
        sql="/scripts/load_table.sql",
    )

Task dependencies

To form the DAG's structure, we need to define dependencies between the tasks. One way is to use the >> operator, as shown below:

task1 >> task2 >> task3

Note that one task may have multiple dependencies:

task1 >> [task2, task3]

The other way is through the set_downstream and set_upstream functions:

t1.set_downstream([t2, t3])

XComs

XComs, or cross-communications, are responsible for communication between tasks. XCom objects can push or pull data between tasks. More specifically, they push data into the metadata database, from where other tasks can pull it. That's why there is a limit to the amount of data that can be passed through them. However, if you need to transfer large data, you can use suitable external data storage such as object storage or NoSQL databases.




[Image: xcoms]

Take a look at the following code. The two tasks communicate via XComs using the ti argument (short for task instance). The train_model task pushes the model_path into the metadata database, which is then pulled by the deploy_model task.

from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    'mlops_dag',
)

def train_model(ti):
    model_path = train_and_save_model()
    ti.xcom_push(key='model_path', value=model_path)

def deploy_model(ti):
    model_path = ti.xcom_pull(key='model_path', task_ids='train_model')
    deploy_trained_model(model_path)

train_model_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)

deploy_model_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag
)

train_model_task >> deploy_model_task

TaskFlow

The TaskFlow API is an easy way to define a task using the Python decorator @task. If all the task's logic can be written in Python, then a simple annotation is enough to define a new task. TaskFlow automatically manages dependencies and communication between tasks.

Using the TaskFlow API, we can also initialize a DAG with the @dag decorator. Here is an example:

from datetime import datetime

from airflow.decorators import dag, task

@dag(
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily'
)
def mlops():

    @task
    def load_data():
        ...
        return df

    @task
    def preprocessing(data):
        ...
        return data

    @task
    def fit(data):
        ...
        return None

    df = load_data()
    data = preprocessing(df)
    model = fit(data)

dag = mlops()

Note that dependencies between tasks are implied through the function arguments. Here we have a simple chaining order, but things can get much more complex. The TaskFlow API also solves the communication problem between tasks, so there is limited need to use XComs directly.

Scheduling

Scheduling jobs is one of the core features of Airflow. This can be done using the schedule_interval argument, which accepts a cron expression, a datetime.timedelta object, or a predefined preset such as @hourly, @daily and so on. A more flexible approach is to use the recently added timetables, which let you define custom schedules using Python.

Here is an example of how to use the schedule_interval argument. The DAG below will be executed daily.

@dag(
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
)
def my_dag():
    pass
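Cron expressions work the same way. As a quick, hypothetical illustration, the following DAG would run every weekday at 6:00 in the morning:

@dag(
    start_date=datetime(2023, 1, 1),
    schedule_interval='0 6 * * 1-5',  # minute, hour, day of month, month, day of week (Mon-Fri)
    catchup=False
)
def weekday_dag():
    pass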

Two very important concepts you need to understand regarding scheduling are backfilling and catchup.

Once we define a DAG, we set a start date and a schedule interval. If catchup=True, Airflow will create DAG runs for all schedule intervals from the start date until the current date. If catchup=False, Airflow will schedule runs only from the current date onwards.

Backfilling extends this idea by enabling us to create past runs from the CLI, regardless of the value of the catchup parameter:

$ airflow dags backfill -s <START_DATE> -e <END_DATE> <DAG_NAME>




[Image: dag-runs]

Connections and Hooks

Airflow provides an easy way to configure connections with external systems or services. Connections can be created using the UI, as environment variables, or through a config file. They usually require a URL, authentication info, and a unique id. Hooks are APIs that abstract the communication with these external systems. For example, we can define a PostgreSQL connection through the UI as follows:




[Image: connections]

We can then use the PostgresHook to establish the connection and execute our queries:

from airflow.providers.postgres.hooks.postgres import PostgresHook

pg_hook = PostgresHook(postgres_conn_id='postgres_custom')
conn = pg_hook.get_conn()
cursor = conn.cursor()
cursor.execute('create table _mytable (ModelID int, ModelName varchar(255))')
cursor.close()
conn.close()
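The same connection could also be declared as an environment variable: Airflow picks up any variable named AIRFLOW_CONN_<CONN_ID> whose value is a connection URI. A sketch with placeholder credentials:

$ export AIRFLOW_CONN_POSTGRES_CUSTOM='postgres://myuser:mypassword@dbhost:5432/mydb'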

Advanced concepts

To keep this tutorial as self-contained as possible, I want to mention a few more advanced concepts. I won't go into much detail for each of them, but I highly urge you to check them out if you want to master Airflow.

  • Branching: Branching allows you to split a task into many different tasks, either for conditioning your workflow or for splitting the processing. The most common way is the BranchPythonOperator (see the sketch after this list).

  • Task Groups: Task Groups help you organize your tasks into a single unit. It's a great tool to simplify your graph view and for repeating patterns.

  • Dynamic DAGs: DAGs and tasks can also be constructed in a dynamic way. Since Airflow 2.3, DAGs and tasks can be created at runtime, which is ideal for parallel and input-dependent tasks. Jinja templates are also supported by Airflow and are a very useful addition to dynamic DAGs.

  • Unit tests and logging: Airflow has dedicated functionality for running unit tests and logging information.
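To give a flavor of branching, here is a minimal sketch (the task ids and the decision logic are made up): the callable returns the task_id of the branch that should run, and the other branch is skipped.

from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator

# defined inside a DAG (or @dag-decorated function)
def choose_branch(**kwargs):
    # return the task_id (or list of task_ids) that should run next
    use_small = True  # placeholder decision logic
    return 'train_small_model' if use_small else 'train_large_model'

branch = BranchPythonOperator(
    task_id='branch',
    python_callable=choose_branch,
)

train_small_model = EmptyOperator(task_id='train_small_model')
train_large_model = EmptyOperator(task_id='train_large_model')

branch >> [train_small_model, train_large_model]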

Airflow best practices

Before we see a hands-on example, let's discuss a few best practices that most practitioners follow.

  1. Idempotency: DAGs and tasks should be idempotent. Re-executing the same DAG run with the same inputs should always have the same effect as executing it once.

  2. Atomicity: Tasks should be atomic. Each task should be responsible for a single operation and independent from the other tasks.

  3. Incremental filtering: Each DAG run should process only a batch of data, supporting incremental extracting and loading. That way, possible failures won't affect the entire dataset (see the sketch after this list).

  4. Top-level code: Top-level code should be avoided unless it creates operators or DAGs, because it affects performance and DAG loading time. All heavy code should live inside tasks, including expensive imports, database access, and heavy computations.

  5. Complexity: DAGs should be kept as simple as possible, because high complexity may impact performance or scheduling.
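To illustrate incremental filtering, here is a minimal sketch using Airflow's built-in Jinja templating (the table names and connection id are hypothetical): each run only touches the rows of its own logical date.

from airflow.providers.postgres.operators.postgres import PostgresOperator

# {{ ds }} is rendered by Airflow to the run's logical date (YYYY-MM-DD),
# so each DAG run extracts only its own daily batch
extract_daily_batch = PostgresOperator(
    task_id='extract_daily_batch',
    postgres_conn_id='postgres_custom',
    sql="INSERT INTO events_staging SELECT * FROM events WHERE event_date = '{{ ds }}'",
)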

Example of an Airflow pipeline

To demonstrate all of the aforementioned concepts, let's go back to the example workflow mentioned at the beginning of this article. We will develop a pipeline that trains a model and deploys it in Kubernetes. More specifically, the DAG will consist of five tasks:

  1. Read the images from an AWS S3 bucket

  2. Preprocess the images using PyTorch

  3. Fine-tune a ResNet model with the downloaded images

  4. Upload the model to S3

  5. Deploy the model in a Kubernetes cluster

Note that I won't include all the specific details and the necessary code, only the parts that are related to Airflow.

First, let's start by defining the DAG. As you can see, the pipeline will execute once a day. In case of failure, there will be a single retry after one hour. Moreover, there will be no catchup, even though the pipeline is supposed to have started two days in the past.

import datetime

import airflow.utils.dates
from airflow import DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'retries': 1,
    'retry_delay': datetime.timedelta(hours=1),
}

dag = DAG(
    'resnet_model',
    default_args=default_args,
    description='A simple DAG to demonstrate Airflow with PyTorch and Kubernetes',
    schedule_interval='@daily',
    catchup=False
)

The first task is responsible for reading the images from AWS S3. To accomplish that, we can use the S3Hook. We first define the reading functionality in a function and then the corresponding PythonOperator. Note that here I use the default AWS connection, but in most cases you will need to define your own.

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def read_images_from_s3(**kwargs):
    s3_conn = S3Hook(aws_conn_id='aws_default')
    images = []
    for obj in s3_conn.get_bucket('mybucket').objects.all():
        images.append(obj.key)
    kwargs['ti'].xcom_push(key='images', value=images)

read_images = PythonOperator(
    task_id='read_images',
    python_callable=read_images_from_s3,
    provide_context=True,
    dag=dag
)

Next in line are the transform and fit functions. I won't include them here in their entirety, because they are mostly standard PyTorch code.

import torch

def preprocess_images(**kwargs):
    images = kwargs['ti'].xcom_pull(task_ids='read_images', key='images')
    # ... standard PyTorch preprocessing producing train_images ...
    kwargs['ti'].xcom_push(key='train_images', value=train_images)

def fit_model(**kwargs):
    train_images = kwargs['ti'].xcom_pull(task_ids='preprocess', key='train_images')
    # ... standard PyTorch fine-tuning producing model ...
    torch.save(model, 'trained_model.pt')

preprocess = PythonOperator(
    task_id='preprocess',
    python_callable=preprocess_images,
    provide_context=True,
    dag=dag
)

fit_model = PythonOperator(
    task_id='fit_model',
    python_callable=fit_model,
    provide_context=True,
    dag=dag
)

Once the model is trained, we need to upload it to S3 so we can load it and serve requests. One way to do this is with the LocalFilesystemToS3Operator from the Amazon provider, which uploads a file from the local file system to S3.

from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

upload_model = LocalFilesystemToS3Operator(
    task_id='upload_model',
    filename='trained_model.pt',
    dest_key='trained_model.pt',
    dest_bucket='my-model-bucket',
    replace=True,
    dag=dag
)

The final step is to create a Kubernetes pod and serve the model. The best way to achieve that is by using the KubernetesPodOperator. Assuming that we have a deployment script that handles the model loading and serving (which we won't analyze here), we can do something like the following:

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

deploy_model = KubernetesPodOperator(
    namespace='default',
    image='myimage:latest',
    name='deploy-model',
    task_id='deploy_model',
    cmds=['python', 'deploy.py'],
    arguments=['trained_model.pt'],  # model artifact passed to the deployment script
    is_delete_operator_pod=True,
    hostnetwork=False,
    dag=dag
)

The KubernetesPodOperator uses the Kubernetes API to launch a pod in a Kubernetes cluster and execute the deployment script.

Once we have defined all the tasks, we simply need to declare their dependencies to form the DAG. This is as simple as:

read_images >> preprocess >> fit_model >> upload_model >> deploy_model

And that's all. This DAG will be initialized by Airflow and can be monitored through the UI. The scheduler will be responsible for executing the tasks in the correct order and at the proper time.

Conclusion

Apache Airflow is a great data engineering tool, in my honest opinion. Sure, it has some shortcomings, but it is also very flexible and scalable. If you want to dive deeper, I have two resources to suggest:

  1. A course by IBM on Coursera: ETL and Data Pipelines with Shell, Airflow and Kafka. By the way, the entire data engineering certification by IBM is pretty great.

  2. The Data Engineering with AWS Nanodegree from AWS on Udacity. The 4th module in particular focuses heavily on Airflow.

Let us know if you'd like to see more tutorials on popular data engineering libraries. If you're new to AI Summer, don't forget to follow us on Twitter or LinkedIn to stay updated with our latest articles.

Deep Learning in Production Book

Learn how to build, train, deploy, scale and maintain deep learning models. Understand ML infrastructure and MLOps using hands-on examples.

Learn more

* Disclosure: Please note that some of the links above might be affiliate links, and at no additional cost to you, we will earn a commission if you decide to make a purchase after clicking through.


