Part 2: Airflow DAGs for Migrating PostgreSQL Data to Distributed SQL

In this post we are going to build a simple Airflow DAG that detects new records that have been inserted into PostgreSQL and migrates them to YugabyteDB.

By Jimmy Guerrero · Jun. 03, 20 · Tutorial


Welcome to part two of our series on how to integrate Apache Airflow and YugabyteDB. In part one we showed you how to get Airflow configured to use YugabyteDB as a backend. In this second post we’ll show you how to build an Airflow workflow that will migrate data between PostgreSQL and YugabyteDB.

What’s YugabyteDB? It is an open source, high-performance distributed SQL database built on a scalable and fault-tolerant design inspired by Google Spanner. Yugabyte’s SQL API (YSQL) is PostgreSQL wire compatible.
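
Because YSQL speaks the PostgreSQL wire protocol, standard PostgreSQL client tooling can connect to YugabyteDB directly. As a quick illustration (the host below is a placeholder, not part of this setup), a plain psql client can point at the YSQL port:

# Hypothetical host; YSQL listens on port 5433 by default
$ psql -h <yugabytedb-host> -p 5433 -U yugabyte -d yugabyte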

About the Demo

In this post we are going to build a simple Airflow DAG, or Directed Acyclic Graph, that detects new records inserted into PostgreSQL and migrates them to YugabyteDB. In a subsequent post we’ll dive deeper into DAGs and create more complex YugabyteDB workflows.

We’ll cover the following steps in this post:

  • Install PostgreSQL
  • Configure GCP firewall rules
  • Configure Airflow database connections
  • Create an Airflow task file
  • Run the task
  • Monitor and verify the results

Prerequisites

Below is the environment that we’ll be using for the purposes of this blog.

  • YugabyteDB – version 2.1.6
  • Apache Airflow – version 1.10.10
  • PostgreSQL – version 10.12
  • A Google Cloud Platform account

Note: For the purposes of this demo, we aren’t creating a particularly secure deployment; instead, we are focusing on demonstrating how to wire everything up with the least amount of fuss. In a production deployment you’ll want to enforce additional security measures throughout the stack.

Step 1: Deploy a VM for PostgreSQL

For the purposes of this demo, I have specified the following configuration for the Google Compute Engine VM that will be hosting my PostgreSQL server.

  • Name: postgresqlvm
  • Machine Type: n1-standard-2 (2 vCPUs, 7.5 GB memory)
  • OS: Ubuntu 18.04 LTS
  • Disk: 100 GB
  • Firewall: Allow HTTP/HTTPS traffic
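
If you prefer the gcloud CLI to the console, a VM matching this configuration can be sketched roughly as follows; the image family, image project, and network tags are assumptions for “Ubuntu 18.04 LTS” and the HTTP/HTTPS firewall checkboxes, so verify them against your own project before running:

# Sketch only; image family/project and tags are assumed, adjust for your project
$ gcloud compute instances create postgresqlvm \
    --machine-type=n1-standard-2 \
    --image-family=ubuntu-1804-lts \
    --image-project=ubuntu-os-cloud \
    --boot-disk-size=100GB \
    --tags=http-server,https-server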

Step 2: Install PostgreSQL on the VM

Install PostgreSQL

To install PostgreSQL on the postgresqlvm VM, run the following commands:

$ cd ~/
$ sudo apt-get install postgresql-contrib

Verify that PostgreSQL is installed by running the following command:

$ psql --version
psql (PostgreSQL) 10.12 (Ubuntu 10.12-0ubuntu0.18.04.1)


Configure PostgreSQL for remote access

By default, PostgreSQL doesn’t allow remote connections. In order for Airflow to communicate with PostgreSQL, we’ll need to change this setting.

To enable remote connections, we’ll need to make a few tweaks to the pg_hba.conf and postgresql.conf files. Start by opening pg_hba.conf:

$ cd /etc/postgresql/10/main/
$ sudo vim pg_hba.conf


Scroll down to the bottom of the file and add the following lines:

# IPv4 remote connections:
host    all    all    0.0.0.0/0    md5


Next, edit the postgresql.conf file.

$ sudo vim postgresql.conf


Scroll down to the line that begins with #listen_addresses = 'localhost' in the Connections and Authentication section.

Uncomment the line and replace localhost with * so that it reads:

listen_addresses = '*'


Finally, restart PostgreSQL.

$ sudo /etc/init.d/postgresql restart


Download the sample Northwind database and data

There are DDL and INSERT scripts we’ll need to download onto our PostgreSQL VM so we can build our demo.

$ wget https://raw.githubusercontent.com/yugabyte/yugabyte-db/master/sample/northwind_ddl.sql
$ wget https://raw.githubusercontent.com/yugabyte/yugabyte-db/master/sample/northwind_data.sql


Create a password for the default PostgreSQL user

For Apache Airflow to be able to connect to the PostgreSQL database, we need to create a password for the default postgres user, which has none by default. Either execute the following script in your shell or use your favorite GUI tool.

Log into PostgreSQL.

$ sudo -u postgres psql


Assign postgres a password:

postgres=# ALTER USER postgres WITH PASSWORD 'password';
ALTER ROLE


Create the Northwind sample database

Create a database called northwind that we’ll be using for our demo.

postgres=# CREATE DATABASE northwind;
postgres=# \c northwind;


Create the Northwind objects and load the data

northwind=# \i /home/jimmy/northwind_ddl.sql
northwind=# \i /home/jimmy/northwind_data.sql


You should now see 14 tables loaded with data.

At this point we have PostgreSQL running on a Google Compute Engine virtual machine with remote connections enabled and the Northwind sample database built.

Step 3: Install Northwind on YugabyteDB

Log into YugabyteDB

$ kubectl --namespace yb-demo exec -it yb-tserver-0 -- /home/yugabyte/bin/ysqlsh -h yb-tserver-0


Create the Northwind sample database

Create a database called northwind that we’ll be using for our demo.

yugabyte=# CREATE DATABASE northwind;
yugabyte=# \c northwind;


Create the Northwind objects and load the data

northwind=# \i /home/yugabyte/share/northwind_ddl.sql
northwind=# \i /home/yugabyte/share/northwind_data.sql


You should now see 14 tables populated with data.

Step 4: Set Up Ingress and Egress Firewall Rules for PostgreSQL and YugabyteDB

This step involves opening up ingress and egress points for PostgreSQL and YugabyteDB in GCP. This can be accomplished in the same way we illustrated in part 1, step 2, where we configured the Airflow networking rules. Inside the GCP Console, navigate to VPC Network > Firewall Rules; a gcloud sketch follows the two rule lists below.

PostgreSQL – 2 rules

  • Names: postgres-ingress and postgres-egress
  • Direction of Traffic: ingress and egress
  • Targets: All instances on the network
  • Source IP ranges: 0.0.0.0/0 or <External GCP IP of Airflow VM>/32
  • Protocols and ports: tcp 5432

YugabyteDB – 2 rules

  • Names: yugabyte-ingress and yugabyte-egress
  • Direction of Traffic: ingress and egress
  • Targets: All instances on the network
  • Source & Destination IP ranges: 0.0.0.0/0 or <External GCP IP of Airflow VM>/32
  • Protocols and ports: tcp 5433
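
If you would rather script these rules than click through the console, here is a minimal gcloud sketch for the postgres-ingress rule, assuming the rules live on the default VPC network; the egress rules are analogous (using --destination-ranges), as are the YugabyteDB rules on port 5433:

# Sketch: assumes the default network; substitute the Airflow VM's external IP
$ gcloud compute firewall-rules create postgres-ingress \
    --direction=INGRESS \
    --action=ALLOW \
    --rules=tcp:5432 \
    --source-ranges=<External GCP IP of Airflow VM>/32 \
    --network=default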

Step 5: Add Airflow Connections to Postgres and YugabyteDB

To add the connection configuration that Apache Airflow will use to connect to the PostgreSQL and YugabyteDB databases, go to Admin > Connections in the Airflow UI.

Select Create.

Add an airflow_postgres connection with the following configuration:

  • Conn Id: airflow_postgres
  • Conn Type: Postgres
  • Host: <postgresqlvm’s External IP>
  • Schema: northwind
  • Login: postgres
  • Password: password
  • Port: 5432

Repeat the process to add an airflow_yugabyte connection with the following configuration:

  • Conn Id: airflow_yugabyte
  • Conn Type: Postgres
  • Host: <yugabytedbgke’s External IP>
  • Schema: northwind
  • Login: yugabyte
  • Password: password
  • Port: 5433
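
If you prefer the command line to the UI, the same two connections can be added with the Airflow 1.10.x CLI; a sketch (the flag names below are what that CLI version expects, so double-check with airflow connections --help before relying on them):

# Sketch for Airflow 1.10.x; verify flags with: airflow connections --help
$ airflow connections --add --conn_id airflow_postgres --conn_type postgres \
    --conn_host <postgresqlvm's External IP> --conn_schema northwind \
    --conn_login postgres --conn_password password --conn_port 5432
$ airflow connections --add --conn_id airflow_yugabyte --conn_type postgres \
    --conn_host <yugabytedbgke's External IP> --conn_schema northwind \
    --conn_login yugabyte --conn_password password --conn_port 5433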

Step 6: Create an Apache Airflow Task File to Migrate Data

Airflow task files are written in Python and need to be placed in ${AIRFLOW_HOME}/dags. Create a Python file called db_migration.py by running the following commands:

$ mkdir ${AIRFLOW_HOME}/dags && cd ${AIRFLOW_HOME}/dags
$ touch db_migration.py
$ vim db_migration.py


Add the following code to the db_migration.py file:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook

dag_params = {
    'dag_id': 'postgres_yugabyte_migration',
    'start_date': datetime(2020, 4, 20),
    'schedule_interval': timedelta(seconds=60)
}

with DAG(**dag_params) as dag:
    # Hooks for the source (PostgreSQL) and destination (YugabyteDB) connections
    src = PostgresHook(postgres_conn_id='airflow_postgres')
    dest = PostgresHook(postgres_conn_id='airflow_yugabyte')
    src_conn = src.get_conn()
    cursor = src_conn.cursor()
    dest_conn = dest.get_conn()
    dest_cursor = dest_conn.cursor()

    # Find the highest product_id already in YugabyteDB (0 if the table is empty)
    dest_cursor.execute("SELECT MAX(product_id) FROM products;")
    product_id = dest_cursor.fetchone()[0]
    if product_id is None:
        product_id = 0
    # Copy any newer products rows from PostgreSQL into YugabyteDB
    cursor.execute("SELECT * FROM products WHERE product_id > %s", [product_id])
    dest.insert_rows(table="products", rows=cursor)

    # Repeat the same pattern for the orders table
    dest_cursor.execute("SELECT MAX(order_id) FROM orders;")
    order_id = dest_cursor.fetchone()[0]
    if order_id is None:
        order_id = 0
    cursor.execute("SELECT * FROM orders WHERE order_id > %s", [order_id])
    dest.insert_rows(table="orders", rows=cursor)


The DAG above looks up the maximum product_id and order_id already present in YugabyteDB, then copies over any rows from the PostgreSQL products and orders tables whose IDs are greater than those maximums. The job is scheduled to run every minute, starting from the start_date defined in the DAG.
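
Before heading to the web UI, you can optionally confirm that the scheduler will pick the new file up. With the Airflow CLI from part one available on the Airflow VM, a quick check looks like this:

# The postgres_yugabyte_migration dag_id should appear once the file has been parsed
$ airflow list_dags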

Step 7: Verifying and Scheduling the Task Using the Airflow Web UI

It might take a minute or two to populate, but the task should now be available under the DAGs tab.

Manually execute the task by clicking on the execute button as shown below:

You can also verify that the task has been executed by looking at the log files located at ${AIRFLOW_HOME}/logs/scheduler/latest/db_migration.py.log.

Finally, enable the one-minute schedule of the postgres_yugabyte_migration task by toggling the On button as shown below:
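
If you prefer the command line for these two actions, the Airflow 1.10.x CLI offers equivalents; a sketch:

# Enable the schedule (same effect as the On toggle)
$ airflow unpause postgres_yugabyte_migration
# Kick off a manual run (same effect as the execute button)
$ airflow trigger_dag postgres_yugabyte_migration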

Step 8: Verifying Updates and Monitoring DAGs

Verify the number of rows in the products and orders tables in both PostgreSQL and YugabyteDB with a simple SELECT COUNT(*). You should see the following:

  • Products: 77 rows
  • Orders: 830 rows
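
For example, the counts can be checked from the shell with the same entry points used earlier; a sketch, assuming ysqlsh accepts the usual psql-style -d and -c flags:

# Row counts on the PostgreSQL side
$ sudo -u postgres psql -d northwind -c "SELECT COUNT(*) FROM products;"
$ sudo -u postgres psql -d northwind -c "SELECT COUNT(*) FROM orders;"
# Row counts on the YugabyteDB side
$ kubectl --namespace yb-demo exec -it yb-tserver-0 -- \
    /home/yugabyte/bin/ysqlsh -h yb-tserver-0 -d northwind -c "SELECT COUNT(*) FROM products;"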

Log into the PostgreSQL database and insert two additional rows into each of the products and orders tables.

INSERT INTO
   public.products (product_id, product_name, supplier_id, category_id, quantity_per_unit, unit_price, units_in_stock, units_on_order, reorder_level, discontinued)
VALUES
  (80, 'Salty Dog Chips', 2, 2, '6 - 12 oz bags', 22, 3, 50, 20, 0),
  (81, 'Neals Dog Treats', 2, 2, '24 - 8 oz bags', 17, 4, 100, 20, 0);

INSERT INTO
   public.orders (order_id, customer_id, employee_id, order_date, required_date, shipped_date, ship_via, freight, ship_name, ship_address, ship_city, ship_region, ship_postal_code, ship_country)
VALUES
  (12002, 'BLONP', 4, '1996-11-22', '1996-12-20', '1996-12-02', 3, 131.69999695, 'Blondel père et fils', '24, place Kléber', 'Strasbourg', NULL, '67000', 'France'),
  (12003, 'LEHMS', 8, '1997-05-12', '1997-06-09', '1997-05-14', 2, 27.94000053, 'Lehmanns Marktstand', 'Magazinweg 7', 'Frankfurt a.M.', NULL, '60528', 'Germany');


After the next scheduled run of the task, you can query YugabyteDB and verify that both the products and orders tables have been updated with the new records.

  • Products: 79 rows
  • Orders: 832 rows

As the DAG continues to run, you can insert additional data on the PostgreSQL side, have Airflow move the data to YugabyteDB, and track the runs in the Airflow UI by going to Browse > DAG Runs.

What’s Next?

That’s it! If you have worked through the steps in part one and part two (this post) of this series, you now have the following deployed:

  • Airflow running on a Google Compute Engine VM with a YugabyteDB backend running on Google Kubernetes Engine
  • PostgreSQL running on a Google Compute Engine VM connected to Airflow
  • A DAG that runs every minute that detects updates to PostgreSQL and migrates them to YugabyteDB

We are just getting started! Stay tuned for an upcoming blog post where we will dive deeper into creating more complex DAGs that integrate Airflow, YugabyteDB, and a variety of other cloud native technologies.


Published at DZone with permission of Jimmy Guerrero. See the original article here.

Opinions expressed by DZone contributors are their own.
