
Running Apache Airflow DAG with Docker

By Jyoti Sachdeva · Aug. 07, 20 · Tutorial

In this article, we are going to run a sample dynamic DAG using Docker. Before that, let's get a quick idea of Airflow and some of its terms.

What is Airflow?

Airflow is a workflow engine that manages and schedules jobs and data pipelines. It ensures that jobs run in the correct order based on their dependencies, and it also manages resource allocation and failures.

Before going forward, let's get familiar with the terms; a short sketch tying them together follows this list:

  • Task or Operator: A defined unit of work.
  • Task instance: An individual run of a single task. Its state can be running, success, failed, skipped, or up for retry.
  • DAG (Directed Acyclic Graph): A set of tasks with an execution order.
  • DAG Run: An individual run of a DAG.
  • Web Server: The Airflow UI. It also lets us manage users, roles, and different configurations for the Airflow setup.
  • Scheduler: Schedules the jobs and orchestrates the tasks. It uses the DAG objects to decide which tasks need to run, when, and where.
  • Executor: Executes the tasks. There are different types of executors:
      • Sequential: Runs one task instance at a time.
      • Local: Runs tasks by spawning processes in a controlled fashion in different modes.
      • Celery: An asynchronous task queue/job queue based on distributed message passing. For the CeleryExecutor, one needs to set up a queue (Redis, RabbitMQ, or any other task broker supported by Celery) that the running Celery workers keep polling for new tasks to run.
      • Kubernetes: Provides a way to run Airflow tasks on Kubernetes; it launches a new pod for each task.
  • Metadata Database: Stores the Airflow state. Airflow uses SQLAlchemy and its Object Relational Mapping (ORM), written in Python, to connect to the metadata database.
Now that we are familiar with the terms, let's get started.

Any time you run into a permission issue, execute:

Shell

chmod 777 <file>

Let's create a scripts folder and create a script called airflow-entrypoint.sh for upgrading the database and starting the webserver.

scripts/airflow-entrypoint.sh

Shell

#!/usr/bin/env bash
airflow upgradedb
airflow webserver

Let's now create the docker-compose file:

docker-compose.yml

YAML

version: "2.1"

services:
  postgres:
    image: postgres:12
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5433:5432"

  scheduler:
    image: apache/airflow
    restart: always
    depends_on:
      - postgres
      - webserver
    env_file:
      - .env
    ports:
      - "8793:8793"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./airflow-logs:/opt/airflow/logs
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

  webserver:
    image: apache/airflow
    hostname: webserver
    restart: always
    depends_on:
      - postgres
    env_file:
      - .env
    volumes:
      - ./dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
      - ./airflow-logs:/opt/airflow/logs
    ports:
      - "8080:8080"
    entrypoint: ./scripts/airflow-entrypoint.sh
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 32

Now, create a .env file for environment variables.

.env

Plain Text

AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgres+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW__CORE__FERNET_KEY=81HqDtbqAywKSOumSha3BhWNOdQ26slT6K0YaZeZyPs=
AIRFLOW_CONN_METADATA_DB=postgres+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW_VAR__METADATA_DB_SCHEMA=airflow
AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC=10

Finally, create a folder called dags and a file hello-airflow.py inside it.

hello-airflow.py

Python

import codecs
import logging
from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils import dates

logging.basicConfig(format="%(name)s-%(levelname)s-%(asctime)s-%(message)s", level=logging.INFO)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def create_dag(dag_id):
    # Default arguments applied to every task in this DAG.
    default_args = {
        "owner": "jyoti",
        "description": "DAG to explain airflow concepts",
        "depends_on_past": False,
        "start_date": dates.days_ago(1),
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
        "provide_context": True,
    }

    # The DAG is scheduled to run every five minutes.
    new_dag = DAG(
        dag_id,
        default_args=default_args,
        schedule_interval=timedelta(minutes=5),
    )

    def task_1(**kwargs):
        # The returned value is automatically pushed to XCom.
        logger.info('=====Executing Task 1=============')
        return kwargs['message']

    def task_2(**kwargs):
        # Pull the value that task 1 returned, via the task instance.
        logger.info('=====Executing Task 2=============')
        task_instance = kwargs['ti']
        result = task_instance.xcom_pull(key=None, task_ids='Task_1')
        logger.info('Extracted the value from task 1')
        logger.info(result)

    with new_dag:
        task1 = PythonOperator(task_id='Task_1',
                               python_callable=task_1,
                               op_kwargs={'message': 'hello airflow'},
                               provide_context=True)

        task2 = PythonOperator(task_id='Task_2',
                               python_callable=task_2,
                               op_kwargs=None,
                               provide_context=True)
        # Task 2 runs only after task 1 completes.
        task2.set_upstream(task1)
        return new_dag


dag_id = "hello_airflow1"
globals()[dag_id] = create_dag(dag_id)


We have created two tasks (operators) here. Task 2 will execute once task 1 completes; this ordering is defined by task2.set_upstream(task1) (equivalently, task1 >> task2).

Task 1 pushes data to task 2; for that, we just need to set provide_context=True. The data is pulled in task 2 using the task instance and the task ID, as shown in the line below and in the sketch that follows it.

task_instance.xcom_pull(key=None, task_ids='Task_1')
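
When the callable of a PythonOperator returns a value, Airflow stores it as an XCom under the default key return_value, which is what the key=None pull above retrieves. If you prefer to push under an explicit key, the task instance also exposes xcom_push. Below is a minimal sketch of that variant, not part of the article's DAG; the DAG ID, task IDs, and the key name are hypothetical, and it assumes the same Airflow 1.10-style PythonOperator with provide_context=True used above.

Python

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils import dates


def push_value(**kwargs):
    # Store a value under an explicit key instead of relying on the return value.
    kwargs['ti'].xcom_push(key='greeting', value='hello airflow')


def pull_value(**kwargs):
    # Retrieve it by key and by the ID of the task that pushed it.
    print(kwargs['ti'].xcom_pull(key='greeting', task_ids='Push_Task'))


with DAG('xcom_demo', start_date=dates.days_ago(1), schedule_interval=None) as dag:
    push = PythonOperator(task_id='Push_Task', python_callable=push_value, provide_context=True)
    pull = PythonOperator(task_id='Pull_Task', python_callable=pull_value, provide_context=True)
    push >> pull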

That is all. Now run docker-compose -f docker-compose.yml up and access Airflow at http://0.0.0.0:8080.
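
Because the DAG is built by a factory function, the same create_dag can also generate several DAGs at once, which is what makes this pattern "dynamic." A minimal sketch, assuming the create_dag function from hello-airflow.py above (the count of three and the naming scheme are just for illustration):

Python

# Sketch: register several DAGs produced by the same factory function.
# Airflow's scheduler discovers any DAG object placed in the module's globals().
for i in range(3):
    dag_id = "hello_airflow_{}".format(i)
    globals()[dag_id] = create_dag(dag_id)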



Thanks for reading!


Published at DZone with permission of Jyoti Sachdeva.

Opinions expressed by DZone contributors are their own.
