
Running Apache Airflow DAG with Docker

In this article, we are going to run a sample dynamic DAG using Docker. Before that, let's get a quick idea of Airflow and some of its terms.

What is Airflow?

Airflow is a workflow engine that is responsible for managing and scheduling jobs and data pipelines. It ensures that jobs are ordered correctly based on their dependencies, and it also manages resource allocation and failures.

Before going forward, let's get familiar with the terms:

  • Task or Operator: A defined unit of work.
  • Task instance: An individual run of a single task. Its state can be running, success, failed, skipped, or up for retry.
  • DAG (Directed Acyclic Graph): A set of tasks with an execution order.
  • DAG Run: An individual run of a DAG.
  • Web Server: The Airflow UI. It also lets us manage users, roles, and different configurations for the Airflow setup.
  • Scheduler: Schedules the jobs, or orchestrates the tasks. It uses the DAG objects to decide which tasks need to be run, when, and where.
  • Executor: Executes the tasks. There are different types of executors (see the example after this list):
      • Sequential: Runs one task instance at a time.
      • Local: Runs tasks by spawning processes in a controlled fashion in different modes.
      • Celery: An asynchronous task queue/job queue based on distributed message passing. For the CeleryExecutor, one needs to set up a queue (Redis, RabbitMQ, or any other broker supported by Celery) on which all the running Celery workers keep polling for new tasks to run.
      • Kubernetes: Provides a way to run Airflow tasks on Kubernetes; it launches a new pod for each task.
  • Metadata Database: Stores the Airflow states. Airflow uses SQLAlchemy, an Object Relational Mapper (ORM) written in Python, to connect to the metadata database.
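
As an aside (this is an illustration only, not something the setup below changes), the executor, like most Airflow settings, can be selected through an AIRFLOW__{SECTION}__{KEY} environment variable; a minimal sketch:

Shell

# Hypothetical example, not part of this article's setup: Airflow reads its
# configuration from AIRFLOW__{SECTION}__{KEY} environment variables, so the
# executor could be switched like this before starting Airflow.
export AIRFLOW__CORE__EXECUTOR=LocalExecutor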

Now that we are familiar with the terms, let's get started.
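
Before creating the individual files, here is a minimal sketch of the project layout the steps below assume; the folder names are taken from the volume mounts in the docker-compose file further down:

Shell

# Assumed project layout (derived from the compose file's volume mounts):
#   dags/              <- DAG definitions (hello-airflow.py)
#   scripts/           <- airflow-entrypoint.sh
#   airflow-logs/      <- task logs written by the containers
#   .env               <- Airflow environment variables
#   docker-compose.yml
mkdir -p dags scripts airflow-logs
touch .env docker-compose.yml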

Any time you run into a permission issue, execute:

Shell

chmod 777 <file>



Let's create a scripts folder and add a script called airflow-entrypoint.sh for upgrading the metadata database and starting the webserver.

scripts/airflow-entrypoint.sh

Shell

#!/usr/bin/env bash
# Apply metadata database migrations, then start the webserver.
airflow upgradedb
airflow webserver



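Since the compose file below uses this script as the webserver container's entrypoint, Docker needs it to be executable. Assuming you are in the project root, one way to set that is:

Shell

# Make the entrypoint script executable so the webserver container can run it.
chmod +x scripts/airflow-entrypoint.sh
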
Let's now create the docker-compose file:

docker-compose.yml

YAML

version: "2.1"

services:
  # Metadata database for Airflow.
  postgres:
    image: postgres:12
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      # Expose Postgres on host port 5433 to avoid clashing with a local install.
      - "5433:5432"

  scheduler:
    image: apache/airflow
    restart: always
    depends_on:
      - postgres
      - webserver
    env_file:
      - .env
    ports:
      - "8793:8793"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./airflow-logs:/opt/airflow/logs
    command: scheduler
    healthcheck:
      # AIRFLOW_HOME in the apache/airflow image is /opt/airflow, so the pid file lives there.
      test: ["CMD-SHELL", "[ -f /opt/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

  webserver:
    image: apache/airflow
    hostname: webserver
    restart: always
    depends_on:
      - postgres
    env_file:
      - .env
    volumes:
      - ./dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
      - ./airflow-logs:/opt/airflow/logs
    ports:
      - "8080:8080"
    # Runs scripts/airflow-entrypoint.sh (upgradedb + webserver) on start.
    entrypoint: ./scripts/airflow-entrypoint.sh
    healthcheck:
      test: ["CMD-SHELL", "[ -f /opt/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 32


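Because the compose file maps Postgres to host port 5433, you can optionally inspect the metadata database from the host. A quick check, assuming the psql client is installed locally, looks like this:

Shell

# Connect to the Airflow metadata database exposed on host port 5433
# (user, password, and database name are "airflow", as set in the compose file).
psql -h localhost -p 5433 -U airflow -d airflow
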
Now, create a .env file for environment variables.

.env

Plain Text

AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgres+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW__CORE__FERNET_KEY=81HqDtbqAywKSOumSha3BhWNOdQ26slT6K0YaZeZyPs=
AIRFLOW_CONN_METADATA_DB=postgres+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW_VAR__METADATA_DB_SCHEMA=airflow
AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC=10



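The Fernet key above is only a sample; you will likely want to generate your own. One way to do it, assuming the cryptography package is installed on your machine, is:

Shell

# Generate a fresh Fernet key and paste it into AIRFLOW__CORE__FERNET_KEY.
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
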
Finally, create a folder called dags and, inside it, a file called hello-airflow.py:

dags/hello-airflow.py

Python

import logging
from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils import dates

logging.basicConfig(format="%(name)s-%(levelname)s-%(asctime)s-%(message)s", level=logging.INFO)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def create_dag(dag_id):
    default_args = {
        "owner": "jyoti",
        "description": "DAG to explain airflow concepts",
        "depends_on_past": False,
        "start_date": dates.days_ago(1),
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
        "provide_context": True,
    }

    new_dag = DAG(
        dag_id,
        default_args=default_args,
        schedule_interval=timedelta(minutes=5),
    )

    def task_1(**kwargs):
        # The return value is automatically pushed to XCom.
        logger.info('=====Executing Task 1=============')
        return kwargs['message']

    def task_2(**kwargs):
        # Pull the value returned by Task_1 via the task instance (XCom).
        logger.info('=====Executing Task 2=============')
        task_instance = kwargs['ti']
        result = task_instance.xcom_pull(key=None, task_ids='Task_1')
        logger.info('Extracted the value from task 1')
        logger.info(result)

    with new_dag:
        task1 = PythonOperator(task_id='Task_1',
                               python_callable=task_1,
                               op_kwargs={'message': 'hello airflow'},
                               provide_context=True)

        task2 = PythonOperator(task_id='Task_2',
                               python_callable=task_2,
                               op_kwargs=None,
                               provide_context=True)

        # Task_2 runs only after Task_1 completes.
        task2.set_upstream(task1)

        return new_dag


dag_id = "hello_airflow1"
# Register the DAG in the module's global namespace so the scheduler picks it up.
globals()[dag_id] = create_dag(dag_id)



We have created two tasks (operators) here. Task 2 will execute when task 1 completes, as defined by task2.set_upstream(task1).

Task 1 pushes data to task 2; for that, we just need to set provide_context=True. The data is pulled in task 2 using the task instance and the task id:

task_instance.xcom_pull(key=None, task_ids='Task_1')

That is all. Now run docker-compose -f docker-compose.yml up and access Airflow at http://0.0.0.0:8080.
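
If the UI does not come up, a couple of standard docker-compose commands are handy for troubleshooting; for example:

Shell

# List the postgres, webserver, and scheduler containers and their health status.
docker-compose ps

# Tail the webserver logs if port 8080 is not responding.
docker-compose logs -f webserver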



Thanks for reading!

