Running Apache Airflow DAG with Docker

By Jyoti Sachdeva · Aug. 07, 20 · Tutorial

In this article, we are going to run a sample dynamic DAG using Docker. Before that, let's get a quick overview of Airflow and some of its terms.

What is Airflow?

Airflow is a workflow engine responsible for managing and scheduling jobs and data pipelines. It ensures that jobs run in the correct order based on their dependencies, and it also manages resource allocation and failure handling.

Before going forward, let's get familiar with the terms (a short code sketch after this list shows how they map to a DAG definition):

  • Task or Operator: A defined unit of work.
  • Task instance: An individual run of a single task. Its state can be running, success, failed, skipped, or up for retry.
  • DAG (Directed Acyclic Graph): A set of tasks with an execution order.
  • DAG Run: An individual run of a DAG.
  • Web Server: The Airflow UI. It also allows us to manage users, roles, and different configurations for the Airflow setup.
  • Scheduler: Schedules the jobs, or orchestrates the tasks. It uses the DAG definitions to decide which tasks need to run, when, and where.
  • Executor: Executes the tasks. There are different types of executors:
      • Sequential: Runs one task instance at a time.
      • Local: Runs tasks by spawning processes in a controlled fashion in different modes.
      • Celery: An asynchronous task queue/job queue based on distributed message passing. For the CeleryExecutor, one needs to set up a queue (Redis, RabbitMQ, or any other task broker supported by Celery) on which all the Celery workers keep polling for new tasks to run.
      • Kubernetes: Provides a way to run Airflow tasks on Kubernetes; it launches a new pod for each task.
  • Metadata Database: Stores the Airflow state. Airflow uses SQLAlchemy, an Object Relational Mapper (ORM) written in Python, to connect to the metadata database.
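
To make these terms concrete, here is a minimal, illustrative sketch of how they map to code. It is not part of the setup we build below; it just uses the same classic Airflow 1.10-style imports as the DAG later in this article, and the names terms_demo and say_hello are made up.

Python

# A DAG is a set of tasks with an execution order; each operator instance is a task.
from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils import dates


def say_hello():
    return "hello"  # the return value is pushed to XCom automatically


# Every scheduled run of this DAG is a "DAG run"; every run of a task within it
# is a "task instance".
with DAG("terms_demo",
         start_date=dates.days_ago(1),
         schedule_interval=timedelta(hours=1)) as dag:
    hello_task = PythonOperator(task_id="say_hello", python_callable=say_hello)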

Now that we are familiar with the terms, let's get started.

Any time you run into a permission issue, execute:

Shell

chmod 777 <file>

Let's create a scripts folder and, inside it, a script called airflow-entrypoint.sh that upgrades the metadata database (airflow upgradedb) and starts the webserver.

scripts/airflow-entrypoint.sh

Shell

#!/usr/bin/env bash
airflow upgradedb
airflow webserver

Let's now create the docker-compose file:

docker-compose.yml

YAML

version: "2.1"
services:
  postgres:
    image: postgres:12
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5433:5432"
  scheduler:
    image: apache/airflow
    restart: always
    depends_on:
      - postgres
      - webserver
    env_file:
      - .env
    ports:
      - "8793:8793"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./airflow-logs:/opt/airflow/logs
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
  webserver:
    image: apache/airflow
    hostname: webserver
    restart: always
    depends_on:
      - postgres
    env_file:
      - .env
    volumes:
      - ./dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
      - ./airflow-logs:/opt/airflow/logs
    ports:
      - "8080:8080"
    entrypoint: ./scripts/airflow-entrypoint.sh
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 32

Now, create a .env file for environment variables.

.env

Plain Text

AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgres+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW__CORE__FERNET_KEY=81HqDtbqAywKSOumSha3BhWNOdQ26slT6K0YaZeZyPs=
AIRFLOW_CONN_METADATA_DB=postgres+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW_VAR__METADATA_DB_SCHEMA=airflow
AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC=10
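
The Fernet key above is only a sample value for this demo. If you want to generate your own and paste it into .env, a minimal sketch (not from the original article) using the cryptography package, which Airflow pulls in as a dependency:

Python

# Generate a value for AIRFLOW__CORE__FERNET_KEY
# (sketch; assumes the cryptography package is installed).
from cryptography.fernet import Fernet

print(Fernet.generate_key().decode())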



Finally, create a folder called dags and, inside it, a file called hello-airflow.py.

hello-airflow.py

Python

import logging
from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils import dates

logging.basicConfig(format="%(name)s-%(levelname)s-%(asctime)s-%(message)s", level=logging.INFO)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def create_dag(dag_id):
    default_args = {
        "owner": "jyoti",
        "description": "DAG to explain airflow concepts",
        "depends_on_past": False,
        "start_date": dates.days_ago(1),
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
        "provide_context": True,
    }

    new_dag = DAG(
        dag_id,
        default_args=default_args,
        schedule_interval=timedelta(minutes=5),
    )

    def task_1(**kwargs):
        logger.info('=====Executing Task 1=============')
        # The return value is automatically pushed to XCom for downstream tasks.
        return kwargs['message']

    def task_2(**kwargs):
        logger.info('=====Executing Task 2=============')
        # Pull the value task_1 returned, using the task instance (ti) from the context.
        task_instance = kwargs['ti']
        result = task_instance.xcom_pull(key=None, task_ids='Task_1')
        logger.info('Extracted the value from task 1')
        logger.info(result)

    with new_dag:
        task1 = PythonOperator(task_id='Task_1',
                               python_callable=task_1,
                               op_kwargs={'message': 'hello airflow'},
                               provide_context=True)

        task2 = PythonOperator(task_id='Task_2',
                               python_callable=task_2,
                               op_kwargs=None,
                               provide_context=True)

        # Task 2 runs only after Task 1 completes.
        task2.set_upstream(task1)
        return new_dag


dag_id = "hello_airflow1"
globals()[dag_id] = create_dag(dag_id)



We have created two tasks (operators) here. Task 2 will execute only after task 1 completes, which is what task2.set_upstream(task1) defines.
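
If you are more used to Airflow's bitshift syntax, the same dependency can be declared either way; the one-liner below is an equivalent alternative, not part of the original DAG:

Python

# Equivalent ways to declare the dependency: Task 2 runs downstream of Task 1.
task2.set_upstream(task1)   # as used in hello-airflow.py
task1 >> task2              # bitshift form, does the same thing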

Task 1 passes data to task 2: the value returned by task_1 is automatically pushed to XCom, and setting provide_context=True gives the callables access to the task instance (ti) through kwargs. Task 2 then pulls the value using the task instance and the task id:

task_instance.xcom_pull(key=None, task_ids='Task_1')
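
If you prefer an explicit handoff rather than relying on the return value, the same exchange can be written with xcom_push and xcom_pull on the task instance. A minimal sketch, assuming the same Airflow 1.10 PythonOperator setup (with provide_context=True) as above; the key name 'message' is made up here:

Python

# Sketch (not from the original article): explicit XCom push/pull between callables.
def task_1(**kwargs):
    # Push a value under an explicit key instead of relying on the return value.
    kwargs['ti'].xcom_push(key='message', value='hello airflow')


def task_2(**kwargs):
    # Pull the value pushed by Task_1 using the same key.
    message = kwargs['ti'].xcom_pull(key='message', task_ids='Task_1')
    print(message)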

That is all. Now run docker-compose -f docker-compose.yml up and access the Airflow UI at http://0.0.0.0:8080.



Thanks for reading!

Topics: Task (computing) · Apache Airflow · Docker (software)

Published at DZone with permission of Jyoti Sachdeva. See the original article here.

Opinions expressed by DZone contributors are their own.
