CLI for Indexing Data From Postgres to Elasticsearch

How do you copy all data from Postgres to Elastic? Even better, how do you keep both the data stores in sync? Is it even possible?

By Avi Aryan · Feb. 05, 18 · Tutorial


Elasticsearch is fantastic for indexing and filtering data. But hey, you have your data in a Postgres DB in production. How do you copy all of that data from Postgres to Elasticsearch? Even better, how do you keep both data stores in sync? Is it even possible?

I am going to answer these questions in this post. To start off, yes, it is indeed possible. We at appbase.io have made an awesome CLI tool called ABC, which will allow you to do this with a single command.

abc import --src_type=postgres --src_uri=<uri> <elasticsearch_uri>

That's it. Seriously, this is all you need to sync a Postgres database to an Elasticsearch index.

The Steps

The first step is to install ABC if you have not done so already. Go to the GitHub releases page for ABC and download the most recent version. It's a single no-dependency binary, so put it anywhere you like. We recommend putting it inside a PATH directory so that it can be accessed from anywhere in the terminal.
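For example, on a 64-bit Linux machine the setup could look roughly like this (the archive name below is hypothetical; use whatever file you actually downloaded for your platform):

# after downloading the release for your platform from the GitHub releases page:
unzip abc-linux-amd64.zip     # hypothetical archive name, adjust to the file you downloaded
chmod +x abc                  # make the binary executable
sudo mv abc /usr/local/bin/   # put it on the PATH so 'abc' works from any directory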

Ensure that ABC is working by running the following command.

abc version

Now, let's take a Postgres database and sync it to an Elasticsearch index hosted on appbase.io.

First, we are going to create a database called 'users'.

CREATE USER test_user with password 'test_pass';
CREATE DATABASE users with OWNER test_user;
\c users;

Next, we are going to create a table called 'users' inside the database 'users'.

CREATE TABLE users (
    email varchar(500) NOT NULL,
    name varchar(100) NOT NULL,
    bio text,
    PRIMARY KEY(email)
);

After that, we will add some sample data to it.

insert into users values ('foo@bar.com', 'foo', 'bar');
insert into users values ('zak@barker.com', 'zak', 'ceo');

The table now looks as follows:

     email      | name | bio
----------------+------+-----
 foo@bar.com    | foo  | bar
 zak@barker.com | zak  | ceo

The Postgres test source is now complete. Its URL is:

postgresql://test_user:test_pass@127.0.0.1:5432/users
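If you want to double-check that this URI works before running the import, you can pass it straight to psql as an optional sanity check:

# connect with the same URI ABC will use and confirm the rows are there
psql "postgresql://test_user:test_pass@127.0.0.1:5432/users" -c "SELECT * FROM users;"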

Next, we are going to create the sink Elasticsearch index. We go to appbase.io and create a new app called abcpostgrestest. The complete URL to this index looks like the following.

https://USER:PASS@scalr.api.appbase.io/abcpostgrestest

So now we have both the source and the sink ready. It's time for some ABC magic. Using this source and sink, we can build the import command. It looks as follows:

abc import --src_type=postgres --src_uri="postgresql://test_user:test_pass@127.0.0.1:5432/users" "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest"

Run this command, and it should finish after a short while with no errors. Now, if you visit the appbase.io dashboard, you can see that the data has been transferred to the target Elasticsearch index.
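If you prefer the terminal to the dashboard, you can also query the index directly, since appbase.io exposes the standard Elasticsearch API (substitute your actual credentials):

# fetch the indexed documents from the sink
curl "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest/_search?pretty"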

Voila. Everything works. The data has been transferred to Elasticsearch without writing a single line of code. Next, we will see how to make ABC listen for changes in the Postgres database.

Indexing Real-Time Data Changes From Postgres

If you are using Postgres as your production database system, chances are good that your data is constantly changing. How do you keep the Elasticsearch index in sync with all those changes?

ABC has a nifty tail mode that synchronizes the Postgres database to an Elasticsearch index in real time. It uses the replication slot feature of Postgres to do this.

It can be enabled by passing a --tail switch.

abc import --tail --src_type=postgres --src_uri="postgresql://test_user:test_pass@127.0.0.1:5432/users" "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest"

Run the import command again, this time with the --tail option. You might see an error with this text:

must be superuser or replication role to use replication slots

Don't panic. This happens because replication is not set up on your Postgres database, which is required for the tailing feature. You will now have to make the database user a superuser. (Read up on ALTER ROLE.)

psql

After running the above command, you should be inside the psql shell. Now, run the following command (where 'test_user' is your database username).

psql> ALTER ROLE test_user WITH SUPERUSER;

Once that is done, you need to enable replication slots. Open postgresql.conf in a text editor and change wal_level and max_replication_slots as follows. This enables replication slot support on the server; you will have to restart your Postgres server afterwards for the change to take effect.

wal_level=logical
max_replication_slots=1
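How you restart Postgres depends on how it was installed; on a typical systemd-based Linux box it would be something like the following (adjust for your platform and data directory):

# restart the server so the new wal_level and max_replication_slots take effect
sudo systemctl restart postgresql
# or, using pg_ctl directly (point -D at your actual data directory):
# pg_ctl restart -D /usr/local/var/postgres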

After this, you will actually create a replication slot. Go back into the psql shell, connect to the source database, and run the following commands.

\c users;
select * from pg_create_logical_replication_slot('users_replication_slot', 'test_decoding');
SELECT * FROM pg_replication_slots;
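The second query should return a row along these lines (columns abbreviated here; the exact output varies by Postgres version):

        slot_name        |    plugin     | slot_type | database | active
-------------------------+---------------+-----------+----------+--------
 users_replication_slot  | test_decoding | logical   | users    | f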

You should now see the database users listed in the replication slot row. This means that the replication slot is properly set up.

Now let's try the tail again.

abc import --tail --src_type=postgres --src_uri="postgresql://test_user:test_pass@127.0.0.1:5432/users" --replication_slot="users_replication_slot" "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest"

Run the import again. It should work this time. 

Also, notice that the import process won't exit when it is finished now. It keeps running and listens for changes. Let's add a new entry to the database and see if it is reflected on the appbase.io dashboard.

\c users;
insert into users values ('tony@stark.com', 'tony stark', 'iron man');

And yes, it works. You should see a new entry in the app dashboard when you hit reload.

Try making some more changes, and all of them will be reflected on the appbase.io-based Elasticsearch cluster.
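Updates should flow through in the same way as inserts, since they travel down the same logical decoding stream; for example, you could try something like this and watch the corresponding document change in the index:

\c users;
update users set bio = 'angel investor' where email = 'zak@barker.com';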

Transforming Data Before Indexing Into Elasticsearch

There are times when you don't want the data to go as-is from the source to the sink. You might want to change the target type name (e.g., users to accounts), remove certain fields (e.g., bio), or create new fields. For all this, we have the transforms feature in ABC. It is used via the transform_file parameter.

abc import --src_type=postgres --src_uri="postgresql://test_user:test_pass@127.0.0.1:5432/users" --transform_file="transform_file.js" "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest"

The transform_file parameter takes the path to the transform file. That file, in turn, contains the JavaScript transforms that should be applied to the pipeline. Let's say the contents of transform_file.js are as follows.

t.Source("source", source, "/.*/")
 .Transform(omit({"fields":["bio"]}))
 .Save("sink", sink, "/.*/")

In the above transform, you can see that we are going to omit the bio field from the data transfer. Now, when we run the new import command, the bio field is omitted from the documents by the time they reach the sink. More documentation on transform files can be found on GitHub. ABC supports lots of inbuilt functions like omit and even supports running custom JavaScript code as a transform. It's a good idea to explore its documentation.
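As another illustration, a whitelist-style transform that keeps only the listed fields would look roughly like the sketch below, assuming the inbuilt pick function (the counterpart of omit) is available in your ABC version; check the transforms documentation to confirm the exact function names.

t.Source("source", source, "/.*/")
 .Transform(pick({"fields":["email", "name"]}))  // keep only email and name, drop everything else
 .Save("sink", sink, "/.*/")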

Further Reading

ABC's README is a good place to start if you want to learn more about the application. You can also have a look at the Postgres adaptor docs. Furthermore, you may star the repo on GitHub and watch it to stay tuned for updates.

Data (computing) PostgreSQL Elasticsearch Command-line interface Database

Published at DZone with permission of Avi Aryan, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
