
Part 2: Airflow DAGs for Migrating PostgreSQL Data to Distributed SQL

In this post we are going to build a simple Airflow DAG that detects new records that have been inserted into PostgreSQL and migrates them to YugabyteDB.


Welcome to part two of our series on how to integrate Apache Airflow and YugabyteDB. In part one we showed you how to get Airflow configured to use YugabyteDB as a backend. In this second post we’ll show you how to build an Airflow workflow that migrates data between PostgreSQL and YugabyteDB.

What’s YugabyteDB? It is an open source, high-performance distributed SQL database built on a scalable and fault-tolerant design inspired by Google Spanner. Yugabyte’s SQL API (YSQL) is PostgreSQL wire compatible.

About the Demo

In this post we are going to build a simple Airflow DAG – or Directed Acyclic Graph – that detects new records that have been inserted into PostgreSQL and migrates them to YugabyteDB. In a subsequent post we’ll dive deeper into DAGs and create more complex YugabyteDB workflows.

We’ll cover the following steps in this post:

  • Install PostgreSQL
  • Configure GCP firewall rules
  • Configure Airflow database connections
  • Create an Airflow task file
  • Run the task
  • Monitor and verify the results

Prerequisites

Below is the environment that we’ll be using for the purposes of this blog.

Note: For the purposes of this demo, we aren’t creating a particularly secure deployment; instead, we are focusing on demonstrating how to wire everything up with the least amount of fuss. In a production deployment you’ll want to enforce additional security measures throughout the stack.

Step 1: Deploy a VM for PostgreSQL

For the purposes of this demo, I have specified the following configuration for the Google Compute Engine VM that will host my PostgreSQL server (an equivalent gcloud command is sketched after the list).

  • Name: postgresqlvm
  • Machine Type: n1-standard-2 (2vCPU, 7.5 GB memory)
  • OS: Ubuntu 18.04 LTS
  • Disk: 100 GB
  • Firewall: Allow HTTP/HTTPS traffic
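
If you prefer the command line to the Cloud Console, the VM above can be created with a roughly equivalent gcloud command. This is only a minimal sketch (add --project and --zone as needed); the http-server and https-server tags are what the “Allow HTTP/HTTPS traffic” checkboxes apply behind the scenes:

# Create the PostgreSQL VM described above (project/zone flags omitted)
$ gcloud compute instances create postgresqlvm \
    --machine-type=n1-standard-2 \
    --image-family=ubuntu-1804-lts \
    --image-project=ubuntu-os-cloud \
    --boot-disk-size=100GB \
    --tags=http-server,https-server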

Step 2: Install PostgreSQL on the VM

Install PostgreSQL

To install PostgreSQL on the postgresqlvm VM, run the following commands:

$ cd ~/
$ sudo apt-get install postgresql-contrib


Verify that PostgreSQL is installed by running the following command:

$ psql --version
psql (PostgreSQL) 10.12 (Ubuntu 10.12-0ubuntu0.18.04.1)


Configure PostgreSQL for remote access

By default, PostgreSQL doesn’t allow remote connections. In order for Airflow to communicate with PostgreSQL, we’ll need to change this setting.

To enable remote connections we’ll need to make a few tweaks to the pg_hba.conf file using the following steps:

$ cd /etc/postgresql/10/main/
$ sudo vim pg_hba.conf


Scroll down to the bottom of the file and add the following lines:

# IPv4 remote connections:
host    all    all    0.0.0.0/0    md5


Next, edit the postgresql.conf file.

$ sudo vim postgresql.conf


Scroll down to the line that begins with #listen_addresses = 'localhost' in the Connections and Authentication section.

Uncomment the line and replace localhost with *

listen_addresses = '*'


Finally, restart PostgreSQL.

$ sudo /etc/init.d/postgresql restart


Download the sample Northwind database and data

There are DDL and INSERT scripts we’ll need to download onto our PostgreSQL VM so we can build our demo.

$ wget https://raw.githubusercontent.com/yugabyte/yugabyte-db/master/sample/northwind_ddl.sql
$ wget https://raw.githubusercontent.com/yugabyte/yugabyte-db/master/sample/northwind_data.sql


Create a password for the default PostgreSQL user

For Apache Airflow to be able to connect to the PostgreSQL database, we need to create a password for the default postgres user, which has none out of the box. Either execute the following commands in your shell or use your favorite GUI tool.

Log into PostgreSQL.

$ sudo -u postgres psql


Assign postgres a password:

postgres=# ALTER USER postgres WITH PASSWORD 'password';
ALTER ROLE


Create the Northwind sample database

Create a database called northwind that we’ll be using for our demo.

postgres=# CREATE DATABASE northwind;
postgres=# \c northwind;


Create the Northwind objects and load the data

northwind=# \i /home/jimmy/northwind_ddl.sql
northwind=# \i /home/jimmy/northwind_data.sql


You should now see 14 tables loaded with data.
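
If you want to confirm the load before moving on, a quick optional sanity check from the same psql session is to list the tables and spot-check a count:

northwind=# \dt
northwind=# SELECT COUNT(*) FROM products;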

At this point we have PostgreSQL running on a Google Compute Engine virtual machine with remote connections enabled and the Northwind sample database built.

Step 3: Install Northwind on YugabyteDB

Log into YugabyteDB

$ kubectl --namespace yb-demo exec -it yb-tserver-0 -- /home/yugabyte/bin/ysqlsh -h yb-tserver-0


Create the Northwind sample database

Create a database called northwind that we’ll be using for our demo.

yugabyte=# CREATE DATABASE northwind;
yugabyte=# \c northwind;


Create the Northwind objects and load the data

northwind=# \i /home/yugabyte/share/northwind_ddl.sql
northwind=# \i /home/yugabyte/share/northwind_data.sql


You should now see 14 tables populated with data.

Step 4: Setup Ingress and Egress Firewall Rules for PostgreSQL and YugabyteDB

The next two steps involve opening up ingress and egress points for PostgreSQL and YugabyteDB in GCP. This can be accomplished in the same way we illustrated in step 2 of part one, where we configured the Airflow networking rules. Inside the GCP Console, navigate to VPC Network > Firewall Rules. (Equivalent gcloud commands are sketched after the rule definitions below.)

PostgreSQL – 2 rules

  • Names: postgres-ingress and postgres-egress
  • Direction of Traffic: ingress and egress
  • Targets: All instances on the network
  • Source & Destination IP ranges: 0.0.0.0/0, or restrict to <External GCP IP of Airflow VM>/32
  • Protocols and ports: tcp 5432

YugabyteDB – 2 rules

  • Names: yugabyte-ingress and yugabyte-egress
  • Direction of Traffic: ingress and egress
  • Targets: All instances on the network
  • Source & Destination IP ranges: 0.0.0.0/0, or restrict to <External GCP IP of Airflow VM>/32
  • Protocols and ports: tcp 5433
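
If you’d rather script this than click through the console, a minimal sketch of the two PostgreSQL rules with the gcloud CLI is shown below. The YugabyteDB pair is identical except for the yugabyte-* names and port 5433; substitute the Airflow VM’s external IP, or use 0.0.0.0/0 if you are keeping the demo wide open:

# Allow Airflow to reach PostgreSQL on 5432 (ingress) and the reverse path (egress)
$ gcloud compute firewall-rules create postgres-ingress \
    --direction=INGRESS --action=ALLOW --rules=tcp:5432 \
    --source-ranges=<External GCP IP of Airflow VM>/32

$ gcloud compute firewall-rules create postgres-egress \
    --direction=EGRESS --action=ALLOW --rules=tcp:5432 \
    --destination-ranges=<External GCP IP of Airflow VM>/32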

Step 5: Add Airflow Connections to Postgres and YugabyteDB

To add the connection configurations that Apache Airflow will use to connect to the PostgreSQL and YugabyteDB databases, go to Admin > Connections in the Airflow UI. (The same connections can also be created with the Airflow CLI, as sketched after the connection details below.)

Select Create.

Add an airflow_postgres connection with the following configuration:

  • Conn Id: airflow_postgres
  • Conn Type: Postgres
  • Host: <postgresqlvm’s External IP>
  • Schema: northwind
  • Login: postgres
  • Password: password
  • Port: 5432

Repeat the process to add an airflow_yugabyte connection with the following configuration:

  • Conn Id: airflow_yugabyte
  • Conn Type: Postgres
  • Host: <yugabytedbgke’s External IP>
  • Schema: northwind
  • Login: yugabyte
  • Password: password
  • Port: 5433
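
As an alternative to the UI, the connections can be defined with the Airflow CLI. This is a minimal sketch using the Airflow 1.10.x syntax that matches the setup from part one; in Airflow 2.x the subcommand is airflow connections add with dashed flag names:

# Define the PostgreSQL connection from the command line
$ airflow connections --add \
    --conn_id airflow_postgres \
    --conn_type postgres \
    --conn_host <postgresqlvm's External IP> \
    --conn_schema northwind \
    --conn_login postgres \
    --conn_password password \
    --conn_port 5432

Repeat with the airflow_yugabyte values (the YugabyteDB host, login yugabyte, and port 5433).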

Step 6: Create an Apache Airflow Task File to Migrate Data

Airflow task files are written in Python and need to be placed in ${AIRFLOW_HOME}/dags. Create a Python file called db_migration.py by running the following commands:

$ mkdir ${AIRFLOW_HOME}/dags && cd ${AIRFLOW_HOME}/dags
$ touch db_migration.py
$ vim db_migration.py


Add the following code to the db_migration.py file:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook

dag_params = {
    'dag_id': 'postgres_yugabyte_migration',
    'start_date': datetime(2020, 4, 20),
    'schedule_interval': timedelta(seconds=60)
}

with DAG(**dag_params) as dag:
    # Source (PostgreSQL) and destination (YugabyteDB) connections
    src = PostgresHook(postgres_conn_id='airflow_postgres')
    dest = PostgresHook(postgres_conn_id='airflow_yugabyte')

    src_conn = src.get_conn()
    cursor = src_conn.cursor()
    dest_conn = dest.get_conn()
    dest_cursor = dest_conn.cursor()

    # Copy any products not yet present in YugabyteDB
    dest_cursor.execute("SELECT MAX(product_id) FROM products;")
    product_id = dest_cursor.fetchone()[0]
    if product_id is None:
        product_id = 0
    cursor.execute("SELECT * FROM products WHERE product_id > %s", [product_id])
    dest.insert_rows(table="products", rows=cursor)

    # Copy any orders not yet present in YugabyteDB
    dest_cursor.execute("SELECT MAX(order_id) FROM orders;")
    order_id = dest_cursor.fetchone()[0]
    if order_id is None:
        order_id = 0
    cursor.execute("SELECT * FROM orders WHERE order_id > %s", [order_id])
    dest.insert_rows(table="orders", rows=cursor)


The DAG above finds the maximum product_id and order_id values already present in YugabyteDB, then copies over any rows from the PostgreSQL products and orders tables whose IDs are greater than those maximums. The job is scheduled to run every 60 seconds, starting from the start_date defined in dag_params.
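
Before handing the file over to the scheduler, it can be worth confirming that it parses cleanly. A quick, optional check (Airflow 1.10.x CLI; in 2.x the second command is airflow dags list):

# Check for syntax/import errors, then confirm the DAG is registered
$ python ${AIRFLOW_HOME}/dags/db_migration.py
$ airflow list_dags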

Step 7: Verifying and Scheduling the Task Using the Airflow Web UI

It might take a minute or two to populate, but the task should now be available under the DAGs tab.

Manually execute the task by clicking on the execute button as shown below:

You can also verify that the task has been executed by looking at the log files located at ${AIRFLOW_HOME}/logs/scheduler/latest/db_migration.py.log

Finally, enable the one-minute schedule of the postgres_yugabyte_migration task by toggling the On button as shown below:
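
If you prefer the command line, the DAG can also be unpaused and triggered with the Airflow CLI (again Airflow 1.10.x syntax; in 2.x these become airflow dags unpause and airflow dags trigger):

$ airflow unpause postgres_yugabyte_migration
$ airflow trigger_dag postgres_yugabyte_migration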

Step 8: Verifying Updates and Monitoring DAGs

Verify the number of rows in the products and orders tables in both PostgreSQL and YugabyteDB with a simple SELECT COUNT(*) (sample queries follow the counts below). You should see the following:

  • Products: 77 rows
  • Orders: 830 rows
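
The counts can be checked with the same two queries on either side, run in psql against PostgreSQL and in ysqlsh against YugabyteDB:

northwind=# SELECT COUNT(*) FROM products;
northwind=# SELECT COUNT(*) FROM orders;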

Log into the PostgreSQL database and insert two additional rows into each of the products and orders tables.

INSERT INTO
    public.products (product_id, product_name, supplier_id, category_id, quantity_per_unit, unit_price, units_in_stock, units_on_order, reorder_level, discontinued)
VALUES
    (80, 'Salty Dog Chips', 2, 2, '6 - 12 oz bags', 22, 3, 50, 20, 0),
    (81, 'Neals Dog Treats', 2, 2, '24 - 8 oz bags', 17, 4, 100, 20, 0);

INSERT INTO
    public.orders (order_id, customer_id, employee_id, order_date, required_date, shipped_date, ship_via, freight, ship_name, ship_address, ship_city, ship_region, ship_postal_code, ship_country)
VALUES
    (12002, 'BLONP', 4, '1996-11-22', '1996-12-20', '1996-12-02', 3, 131.69999695, 'Blondel père et fils', '24, place Kléber', 'Strasbourg', NULL, '67000', 'France'),
    (12003, 'LEHMS', 8, '1997-05-12', '1997-06-09', '1997-05-14', 2, 27.94000053, 'Lehmanns Marktstand', 'Magazinweg 7', 'Frankfurt a.M.', NULL, '60528', 'Germany');


After the next scheduled run of the task, you can query the products and orders tables in YugabyteDB and verify that they have been updated with the new records.

  • Products: 79 rows
  • Orders: 832 rows

As the DAG continues to run, you can insert additional data on the PostgreSQL side, have Airflow move the data to YugabyteDB, and track the runs in the Airflow UI by going to Browse > Dag Runs.

What’s Next?

That’s it! If you have worked through the steps in part one and part two (this post) of this series, you now have the following deployed:

  • Airflow running on a Google Compute Engine VM with a YugabyteDB backend running on Google Kubernetes Engine
  • PostgreSQL running on a Google Compute Engine VM connected to Airflow
  • A DAG that runs every minute that detects updates to PostgreSQL and migrates them to YugabyteDB

We are just getting started! Stay tuned for an upcoming blog post where we will dive deeper into creating more complex DAGs that integrate Airflow, YugabyteDB, and a variety of other cloud native technologies.

Topics:
apache airflow, distributed sql, distributed sql databases, postgres database, postgresql, yugabyte, yugabyte db, yugabytedb

Published at DZone with permission of Jimmy Guerrero. See the original article here.

Opinions expressed by DZone contributors are their own.
