Apache Airflow has become the de facto library for pipeline orchestration in the Python ecosystem. It has gained popularity, in contrast to similar solutions, due to its simplicity and extensibility. In this article, I will attempt to outline its main concepts and give you a clear understanding of when and how to use it.
Why and when should I consider Airflow?
Imagine that you want to build a machine learning pipeline that consists of several steps such as:
- Read an image dataset from cloud-based storage
- Process the images
- Train a deep learning model with the downloaded images
- Upload the trained model to the cloud
- Deploy the model
How would you schedule and automate this workflow? Cron jobs are a simple solution but they come with many problems. Most importantly, they won't allow you to scale effectively. On the other hand, Airflow offers the ability to schedule and scale complex pipelines easily. It also enables you to automatically re-run them after failure, manage their dependencies, and monitor them using logs and dashboards.
Before we build the aforementioned pipeline, let's understand the basic concepts of Apache Airflow.
What is Airflow?
Apache Airflow is a tool for authoring, scheduling, and monitoring pipelines. As a result, it is an ideal solution for ETL and MLOps use cases. Example use cases include:
- Extracting data from many sources, aggregating them, transforming them, and storing them in a data warehouse
- Extracting insights from data and displaying them in an analytics dashboard
- Training, validating, and deploying machine learning models
Key components
When installing Airflow in its default edition, you will see four different components.
- Webserver: The webserver is Airflow's user interface (UI), which allows you to interact with it without the need for a CLI or an API. From there you can execute and monitor pipelines, create connections with external systems, inspect their datasets, and much more.
- Executor: Executors are the mechanism by which pipelines run. There are many different types that run pipelines locally, on a single machine, or in a distributed fashion. A few examples are LocalExecutor, SequentialExecutor, CeleryExecutor and KubernetesExecutor.
- Scheduler: The scheduler is responsible for executing different tasks at the right time, re-running pipelines, backfilling data, ensuring task completion, and so on.
- PostgreSQL: A database where all pipeline metadata is stored. This is typically a Postgres database, but other SQL databases are supported too.
The easiest way to install Airflow is using docker compose. You can download the official docker compose file from here:
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
Note that Airflow also resides on PyPI and can be installed using pip.
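A minimal installation could look like this (in practice, the official docs also recommend pinning versions with a constraints file):
$ pip install apache-airflow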
Basic concepts of Airflow
In order to get started with Airflow, you need to be familiar with its main concepts, which can be a little tricky. So let's try to demystify them.
DAGs
All pipelines are defined as directed acyclic graphs (DAGs). Any time we execute a DAG, an individual run is created. Each DAG run is separate from the others and holds a status regarding its execution stage. This means that the same DAG can be executed many times in parallel.
To instantiate a DAG, you can use the DAG constructor or a context manager, as follows:
from datetime import datetime, timedelta

from airflow import DAG

with DAG(
    "mlops",
    default_args={
        "retries": 1,
    },
    schedule=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
) as dag:
    ...
The context manager accepts some global variables regarding the DAG and some default arguments. The default arguments are passed into all tasks and can be overridden on a per-task basis. The complete list of parameters can be found in the official docs.
In this example, we define that the DAG will start on 1/1/2023 and will be executed once a day. The retries argument ensures that it will be re-run once after a possible failure.
Tasks
Each node of the DAG represents a task, meaning an individual piece of code. Each task may have some upstream and downstream dependencies. These dependencies express how tasks are related to each other and in which order they should be executed. Whenever a new DAG run is initialized, all tasks are initialized as task instances. This means that each task instance is a specific run for the given task.
Operators
Operators can be viewed as templates for predefined tasks, because they encapsulate boilerplate code and abstract much of their logic. Some common operators are BashOperator, PythonOperator, MySqlOperator, and S3FileTransformOperator. As you can tell, operators help you define tasks that follow a specific pattern. For example, the MySqlOperator creates a task to execute a SQL query and the BashOperator executes a bash script.
Operators are defined inside the DAG context manager as below. The following code creates two tasks, one to execute a bash command and one to execute a MySQL query.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator

with DAG(
    "tutorial"
) as dag:
    task1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    task2 = MySqlOperator(
        task_id="load_table",
        sql="/scripts/load_table.sql",
    )
Task dependencies
To form the DAG's structure, we need to define dependencies between tasks. One way is to use the >> symbol, as shown below:
task1 >> task2 >> task3
Note that one task may have multiple dependencies:
task1 >> [task2, task3]
The other way is through the set_downstream and set_upstream functions:
t1.set_downstream([t2, t3])
XComs
XComs, or cross communications, are responsible for communication between tasks. XCom objects can push or pull data between tasks. More specifically, they push data into the metadata database, from where other tasks can pull it. That's why there is a limit to the amount of data that can be passed through them. However, if you need to transfer large data, you can use a suitable external data store such as object storage or a NoSQL database.
Take a look at the following code. The two tasks communicate via XComs using the ti argument (short for task instance). The train_model task pushes the model_path into the metadata database, which is then pulled by the deploy_model task.
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    'mlops_dag',
)

def train_model(ti):
    # train_and_save_model() stands for your own training code.
    model_path = train_and_save_model()
    ti.xcom_push(key='model_path', value=model_path)

def deploy_model(ti):
    model_path = ti.xcom_pull(key='model_path', task_ids='train_model')
    deploy_trained_model(model_path)

train_model_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)

deploy_model_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag
)

train_model_task >> deploy_model_task
Taskflow
The Taskflow API is an easy way to define a task using the Python decorator @task. If all of the task's logic can be written in Python, then a simple annotation can define a new task. Taskflow automatically manages dependencies and communications between tasks.
Using the Taskflow API, we can also initialize a DAG with the @dag decorator. Here is an example:
from datetime import datetime

from airflow.decorators import dag, task

@dag(
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily'
)
def mlops():
    @task
    def load_data():
        ...
        return df

    @task
    def preprocessing(data):
        ...
        return data

    @task
    def fit(data):
        return None

    df = load_data()
    data = preprocessing(df)
    model = fit(data)

dag = mlops()
Note that dependencies between tasks are implied through the function arguments. Here we have a simple chaining order, but things can get much more complex. The Taskflow API also solves the communication problem between tasks, so there is limited need to use XComs.
Scheduling
Scheduling jobs is one of the core features of Airflow. This can be done using the schedule_interval argument, which receives a cron expression, a datetime.timedelta object, or a predefined preset such as @hourly, @daily and so on. A more flexible approach is to use the recently added timetables, which let you define custom schedules using Python.
Here is an example of how to use the schedule_interval argument. The DAG below will be executed daily.
from datetime import datetime

from airflow.decorators import dag

@dag(
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
)
def my_dag():
    pass
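The schedule_interval argument also accepts a plain cron expression instead of a preset. As a minimal sketch (reusing the imports above; the chosen time is arbitrary), the following DAG would run every day at 06:30:
@dag(
    start_date=datetime(2023, 1, 1),
    schedule_interval='30 6 * * *',
    catchup=False
)
def my_cron_dag():
    pass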
Two very important concepts you need to understand regarding scheduling are backfilling and catchup.
Once we define a DAG, we set a start date and a schedule interval. If catchup=True, Airflow will create DAG runs for all schedule intervals from the start date until the current date. If catchup=False, Airflow will only schedule runs from the current date onwards.
Backfilling extends this idea by enabling us to create past runs from the CLI, regardless of the value of the catchup parameter:
$ airflow backfill -s <START_DATE> -e <END_DATE> <DAG_NAME>
Connections and Hooks
Airflow provides an easy way to configure connections with external systems or services. Connections can be created using the UI, as environment variables, or through a config file. They usually require a URL, authentication info, and a unique id. Hooks are an API that abstracts communication with these external systems. For example, we can define a PostgreSQL connection through the UI.
We can then use the PostgresHook to establish the connection and execute our queries:
from airflow.providers.postgres.hooks.postgres import PostgresHook

pg_hook = PostgresHook(postgres_conn_id='postgres_custom')
conn = pg_hook.get_conn()
cursor = conn.cursor()
cursor.execute('create table _mytable (ModelID int, ModelName varchar(255))')
cursor.close()
conn.close()
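Because connections can also be created as environment variables, the same postgres_custom connection could alternatively be declared before starting Airflow using the AIRFLOW_CONN_ prefix (a minimal sketch; the URI credentials below are placeholders):
$ export AIRFLOW_CONN_POSTGRES_CUSTOM='postgres://user:password@localhost:5432/mydb'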
Advanced concepts
To keep this tutorial as self-contained as possible, I need to mention a few more advanced concepts. I won't go into many details for each of them, but I highly urge you to check them out if you want to master Airflow.
- Branching: Branching allows you to divide a task into many different tasks, either for conditioning your workflow or for splitting the processing. The most common way is the BranchPythonOperator (see the sketch after this list).
- Task Groups: Task Groups help you organize your tasks into a single unit. It's a great tool to simplify your graph view and for repeating patterns.
- Dynamic DAGs: DAGs and tasks can also be built in a dynamic way. Since Airflow 2.3, DAGs and tasks can be created at runtime, which is ideal for parallel and input-dependent tasks. Jinja templates are also supported by Airflow and are a very useful addition to dynamic DAGs.
- Unit tests and logging: Airflow has dedicated functionality for running unit tests and logging information.
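To make branching a bit more concrete, here is a minimal sketch of a BranchPythonOperator that picks one of two downstream tasks at runtime (the DAG name, task ids, and choice logic are purely illustrative; EmptyOperator stands in for real work):
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator

def choose_branch():
    # Return the task_id of the branch that should run for this DAG run.
    return 'small_batch' if datetime.now().weekday() < 5 else 'full_batch'

with DAG('branching_example', start_date=datetime(2023, 1, 1), schedule_interval='@daily', catchup=False) as dag:
    branch = BranchPythonOperator(task_id='branch', python_callable=choose_branch)
    small_batch = EmptyOperator(task_id='small_batch')
    full_batch = EmptyOperator(task_id='full_batch')

    # Only the task whose id is returned by choose_branch runs; the other is skipped.
    branch >> [small_batch, full_batch]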
Airflow best practices
Before we see a hands-on example, let's discuss a few best practices that most practitioners use.
- Idempotency: DAGs and tasks should be idempotent. Re-executing the same DAG run with the same inputs should always have the same effect as executing it once.
- Atomicity: Tasks should be atomic. Each task should be responsible for a single operation and be independent from the other tasks.
- Incremental filtering: Each DAG run should process only a batch of data, supporting incremental extracting and loading. That way, possible failures won't affect the entire dataset.
- Top-level code: Top-level code should be avoided if it's not for creating operators or DAGs, because it will affect performance and loading time. All code should be inside tasks, including imports, database access, and heavy computations (see the sketch after this list).
- Complexity: DAGs should be kept as simple as possible, because high complexity may impact performance or scheduling.
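As an illustration of the top-level code guideline, here is a minimal sketch that keeps a heavy import and the data access inside the task callable instead of at module level (the file path and names are hypothetical):
from airflow.decorators import task

@task
def load_table():
    # The heavy import and the file access run only when the task executes,
    # not every time the scheduler parses the DAG file.
    import pandas as pd

    df = pd.read_parquet('/data/mytable.parquet')
    return len(df)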
Example of an Airflow pipeline
To demonstrate all of the aforementioned concepts, let's go back to the example workflow mentioned at the beginning of this article. We will develop a pipeline that trains a model and deploys it to Kubernetes. More specifically, the DAG will consist of five tasks:
- Read images from an AWS S3 bucket
- Preprocess the images using PyTorch
- Fine-tune a ResNet model with the downloaded images
- Upload the model to S3
- Deploy the model in a Kubernetes cluster
Note that I won't include all of the specific details and the required code, only the parts that are related to Airflow.
First, let's start by defining the DAG. As you can see, the pipeline will execute once a day. In case of failure, there will be a single retry after one hour. Moreover, there will be no catchup, even though the pipeline is supposed to start two days ago.
import datetime

import airflow.utils.dates
from airflow import DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'retries': 1,
    'retry_delay': datetime.timedelta(hours=1),
}

dag = DAG(
    'resnet_model',
    default_args=default_args,
    description='A simple DAG to demonstrate Airflow with PyTorch and Kubernetes',
    schedule_interval='@daily',
    catchup=False
)
The first task is responsible for reading the images from AWS S3. To accomplish that, we can use the S3Hook. We first define the reading functionality in a function and then the corresponding PythonOperator. Note that here I use the default AWS connection, but in most cases you will need to define your own.
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def read_images_from_s3(**kwargs):
    s3_conn = S3Hook(aws_conn_id='aws_default')
    images = []
    for obj in s3_conn.get_bucket('mybucket').objects.all():
        images.append(obj.key)
    kwargs['ti'].xcom_push(key='images', value=images)

read_images = PythonOperator(
    task_id='read_images',
    python_callable=read_images_from_s3,
    provide_context=True,
    dag=dag
)
Next in line are the transform and fit functions. I won't include them here in their entirety, because they are mostly standard PyTorch code.
def preprocess_images(**kwargs):
    images = kwargs['ti'].xcom_pull(task_ids='read_images', key='images')
    # ... standard PyTorch preprocessing code producing train_images ...
    kwargs['ti'].xcom_push(key='train_images', value=train_images)

def fit_model(**kwargs):
    train_images = kwargs['ti'].xcom_pull(task_ids='preprocess', key='train_images')
    # ... standard PyTorch fine-tuning code producing model ...
    torch.save(model, 'trained_model.pt')

preprocess = PythonOperator(
    task_id='preprocess',
    python_callable=preprocess_images,
    provide_context=True,
    dag=dag
)

fit_model = PythonOperator(
    task_id='fit_model',
    python_callable=fit_model,
    provide_context=True,
    dag=dag
)
Once the model is trained, we need to upload it to S3 so that we can load it and serve requests. This can be done using the S3FileTransformOperator, which reads from the local file system and uploads the file to S3.
from airflow.operators.s3_file_transform_operator import S3FileTransformOperator

upload_model = S3FileTransformOperator(
    task_id='upload_model',
    source_base_path='.',
    source_key='trained_model.pt',
    dest_s3_bucket='my-model-bucket',
    dest_s3_key='trained_model.pt',
    dag=dag
)
The final step is to create a Kubernetes pod and serve the model. The best way to achieve that is by using the KubernetesPodOperator. Assuming that we have a deployment script that handles the model loading and serving (which we won't analyze here), we can do something like the following:
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

deploy_model = KubernetesPodOperator(
    namespace='default',
    image='myimage:latest',
    name='deploy-model',
    task_id='deploy_model',
    cmds=['python', 'deploy.py'],
    arguments=[model],
    is_delete_operator_pod=True,
    hostnetwork=False,
    dag=dag
)
The KubernetesPodOperator uses the Kubernetes API to launch a pod in a Kubernetes cluster and execute the deployment script.
Once we have defined all the tasks, we simply need to create their dependencies to form the DAG. This is as simple as:
read_images >> preprocess >> fit_model >> upload_model >> deploy_model
And that's all. This DAG will be initialized by Airflow and can be monitored through the UI. The scheduler will be responsible for executing the tasks in the correct order and at the right time.
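If you don't want to wait for the next scheduled run, you can also trigger the DAG manually from the Airflow 2.x CLI:
$ airflow dags trigger resnet_model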
Conclusion
Apache Airflow is a great data engineering tool, in my honest opinion. Sure, it has some shortcomings, but it is also very flexible and scalable. If you want to dive deeper, I have two resources to suggest:
- A course by IBM on Coursera: ETL and Data Pipelines with Shell, Airflow and Kafka. By the way, the entire data engineering certification by IBM is pretty great.
- The Data Engineering with AWS Nanodegree from AWS on Udacity. The 4th module in particular focuses heavily on Airflow.
Let us know if you'd like to see more tutorials on popular data engineering libraries. If you're new to AI Summer, don't forget to follow us on Twitter or LinkedIn to stay updated with our latest articles.
* Disclosure: Please note that some of the links above might be affiliate links, and at no additional cost to you, we will earn a commission if you decide to make a purchase after clicking through.