Hacer un despliegue usando Airflow

| Última modificación: 9 de abril de 2024 | Tiempo de Lectura: 3 minutos

Algunos de nuestros reconocimientos:

Premios KeepCoding

En este artículo vamos a hacer un despliegue usando Airflow de modo local, es decir, trabajando desde un notebook propio, desde nuestro ordenador, etc.

Hacer un despliegue usando Airflow

El primer paso en este proceso de hacer un despliegue usando Airflow es instalar la propia librería de Apache Airflow:

#Hacer un despliegue usando Airflow
! pip install apache - airflow
Hacer un despliegue usando Airflow

Luego tendremos que hacer el llamado o inicializar a la base de datos que tiene internamente. Al final, por detrás, este modelo tiene una base de datos para gestionar todos los metadatos, ejecuciones, etc.:

#Hacer un despliegue usando Airflow
! airflow db init

Una vez nos aparezca el mensaje que diga “inicialization done”, vamos a importar los módulos necesarios para definir nuestro primer DAG:

#Hacer un despliegue usando Airflow
%mkdir /root/airflow/dags
#Hacer un despliegue usando Airflow
#%%writefile /root/airflow/dags/dag.py

from datetime import timedelta

#The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

#Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

A continuación, definiremos los argumentos con los que nuestro DAG va a ejecutarse definiendo el comportamiento que deseemos:

#Hacer un despliegue usando Airflow
%%writefile -a /root/airflow/dags/dag.py

#These args will get passed on to each operator
#Yoy can override them on a per - task basis during operator initialization default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['[email protected]'],
       'email_on_failure': False,
       'email_on_retry': False,
       'retries' : 1,
       'retry_delay': timedelta (minutes = 5),
       #  'queue': 'bash_queue',
       #  'pool': 'backfill',
       #  'priority_weight': 10,
       #  'end_date': datetime (2016, 1, 1),
       #  'wait_for_downstream': False,
       #  'dag': dag,
       #  'sla': timedelta (hours = 2),
       #  'execution_timeout': timedelta (seconds = 300),
       #  'on_failure_callback': some_function,
       #  'on_success_callback': some_other_function,
       #  'on_retry_callback': another function,
       #  'sla_miss_callback': yet_another_function,
       #  'trigger_rule': 'all_success',
}

Lo siguiente será crear nuestro objeto DAG, al que le iremos añadiendo tareas usando los operadores:

#Hacer un despliegue usando Airflow
%%writefile -a /root/airflow/dags/dag.py

dag = DAG (
       'basic - usage - tutorial',
       default_args = default_args,
       description = 'A simple tutorial DAG',
       schedule_interval = timedelta (days = 1),
       start_date = days_ago (2),
       tags = ['example'],
)
#Hacer un despliegue usando Airflow
%%writefile -a /root/airflow/dags/dag.py

t1 = BashOperator (
       task_id = 'print_date',
       bash_command = 'date',
       dag = dag,
)

t2 = BashOperator (
       task_id = 'sleep',

       depends_on_past = False,
       bash_command = 'sleep 5',
       retries = 3
       dag = dag,
)

Integración nativa con Jinja

Otra de las principales características a la hora de hacer un despliegue usando Airflow es la integración nativa con Jinja, lo que permite la parametrización de plantillas en tiempo de ejecución:

#Hacer un despliegue usando Airflow
##%% writefile -a /root/airflow/dags/dag.py

templated_command = " " "
{% for i in range (5) %}
       echo "{{ ds }}"
       echo "{{ macros.ds_add (ds, 7)}}"
       echo "{{ params.my_param }}"
{% endfor %}
" " "

t2 = BashOperator (
       task_id = 'templated',
       depends_on_past = False,
       bash_command = 'templated_command',
       params = {'my_param': 'Parameter I passed in'},
       dag = dag,

Una vez tenemos definidas las tareas que conforman nuestro DAG, lo siguiente será establecer el orden de ejecución entre ellas y definir sus dependencias. Por ejemplo:

#Hacer un despliegue usando Airflow
t1.set_downstream (t2)

#This mean that t2 will depend on t1 running successfully to run. It is equivalent to:
t2.set_upstream (t1)

#The bit shift operator can also be used to chain operations:
t1 >> t2

#And the upstream dependency with the bit shift operator:
t2 >> t1

#Chaining multiple dependencies becomes concise with the bit shift operator: 
t1 >> t2 >> t3

#A list of tasks can also be set as dependencies. These operations all have the same efect:
t1.set_downstream ([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

Para nuestro caso usaremos la siguiente definición:

#Hacer un despliegue usando Airflow
%%writefile -a /root/airflow/dags/dat.py

t1 >> [t2, t3]

Si hemos hecho todo correctamente en la definición, al ejecutar la siguiente celda no deberíamos recibir nada como salida:

#Hacer un despliegue usando Airflow
! python /root/airflow/dags/dag.py

Ahora que ya tenemos nuestro DAG listo, inicializamos Airflow para empezar a lanzar ejecuciones:

#Hacer un despliegue usando Airflow
#print the list of active DAGs
!airflow dags list

¿Te gustaría seguir aprendiendo sobre Big data?

Si tu meta es seguir aprendiendo acerca de cómo hacer un despliegue usando Airflow y quieres seguir formándote en alguna de las numerosas temáticas del mundo del Big Data, no puedes perderte el Big Data, Inteligencia Artificial & Machine Learning Full Stack Bootcamp. En esta formación intensiva y de alta calidad podrás, en pocos meses, transformarte en un gran profesional IT al aprender sobre despliegue de un modelo en streaming y muchos aspectos más que te darán la oportunidad de destacar en este demandado mercado laboral. ¡Solicita ahora mismo más información y da el salto que catapultará tu perfil profesional!

Sandra Navarro

Business Intelligence & Big Data Advisor & Coordinadora del Bootcamp en Data Science, Big Data & Machine Learning.

Posts más leídos

¡CONVOCATORIA ABIERTA!

Big Data, IA & Machine Learning

Full Stack Bootcamp

Clases en Directo | Profesores en Activo | Temario 100% actualizado