CLI for Indexing Data From Postgres to Elasticsearch


Elasticsearch is fantastic for indexing and filtering data. But hey, you have your data in a Postgres DB in production. How do you copy all that data from Postgres to Elasticsearch? Even better, how do you keep the two data stores in sync? Is it even possible?

I am going to answer these questions in this post. To start off, yes, it is indeed possible. We at appbase.io have made an awesome CLI tool called ABC, which will allow you to do this with a single command.

abc import --src_type=postgres --src_uri=<uri> <elasticsearch_uri>

That's it. Seriously, this is all you need to sync a Postgres database to an Elasticsearch index.

The Steps

The first step is to install ABC if you have not done so already. Go to the GitHub releases page for ABC and download the most recent version. It's a single no-dependency binary, so you can put it anywhere you like. We recommend putting it inside a PATH directory so that it can be accessed from anywhere in the terminal.
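On Linux, for example, the install can look like the following (a sketch only; the archive name and URL depend on the release version and your platform, so copy the real link from the releases page):

wget https://github.com/appbaseio/abc/releases/download/<version>/<abc-archive>.zip
unzip <abc-archive>.zip
chmod +x abc
sudo mv abc /usr/local/bin/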

Ensure that ABC is working by running the following command.

abc version

Now, let's take a Postgres database and sync it to an Elasticsearch index hosted on appbase.io.

First, we are going to create a database called 'users'.

CREATE USER test_user with password 'test_pass';
CREATE DATABASE users with OWNER test_user;
\c users;

Next, we are going to create a table called 'users' inside the database 'users'.

CREATE TABLE users (
    email varchar(500) NOT NULL,
    name varchar(100) NOT NULL,
    bio text,
    PRIMARY KEY(email)
);

After that, we will add some sample data to it.

insert into users values ('foo@bar.com', 'foo', 'bar');
insert into users values ('zak@barker.com', 'zak', 'ceo');

The table now looks as follows:
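Reconstructed from the two inserts above, running SELECT * FROM users; in psql should print something like this:

     email      | name | bio
----------------+------+-----
 foo@bar.com    | foo  | bar
 zak@barker.com | zak  | ceo
(2 rows)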

The Postgres test source is now complete. Its connection URI has the standard Postgres form (host and port depend on where your server runs):

postgresql://test_user:test_pass@<host>:<port>/users

Next, we are going to create the sink Elasticsearch index. We go to appbase.io and create a new app called abcpostgrestest. The complete URL to this index looks like the following, where USER and PASS are the app's credentials:

https://USER:PASS@scalr.api.appbase.io/abcpostgrestest

So now, we have both the source and the sink ready. It's time for some ABC magic. Using this source and sink, we can build the import command. It will be as follows:

abc import --src_type=postgres --src_uri="postgresql://test_user:test_pass@" "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest"

Once you run this command, it should finish after a short while with no errors. Now, if you visit the appbase.io dashboard, you can see that the data has been transferred to the target Elasticsearch index.
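If you prefer the terminal to the dashboard, a plain Elasticsearch search query against the app URL works just as well for spot-checking the transfer (substitute your actual credentials):

curl "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest/_search?pretty"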

Voila! Everything works. The data has been transferred to Elasticsearch without you writing a single line of code. Next, we will see how to make ABC listen for changes in the Postgres database.

Indexing Real-Time Data Changes From Postgres

If you are using Postgres as your production database system, chances are that your data is constantly changing. How do you keep the Elasticsearch index in sync with all those changes?

ABC has a nifty tail mode that allows synchronizing the Postgres database in real-time to an Elasticsearch index. It uses the replication slot feature of Postgres to be able to do this.

It can be enabled by passing a --tail switch.

abc import --tail --src_type=postgres --src_uri="postgresql://test_user:test_pass@" "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest"

Now, run the import command again with the tail option. You might see an error with this text:

must be superuser or replication role to use replication slots

Don't panic. This happens because replication is not set up on your Postgres database, which is required for the tailing feature. So, you will now have to make the database user a superuser. (Read up on ALTER ROLE.) First, open a psql shell as an administrative user; on a typical Linux install, that looks like this:

sudo -u postgres psql

After running the above command, you should be inside the psql shell. Now, run the following command (where test_user is your database username):

ALTER ROLE test_user WITH SUPERUSER;

Once that is done, you need to enable replication slots. Open postgresql.conf in a text editor and change wal_level and max_replication_slots as follows; logical decoding needs wal_level set to logical and at least one replication slot available. You will have to restart your Postgres server after this.

wal_level = logical
max_replication_slots = 1

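If you are not sure where postgresql.conf lives, run this inside psql to print its path:

SHOW config_file;

How you restart the server depends on your install; on a systemd-based Linux, it is typically:

sudo systemctl restart postgresql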
After this, you will actually create a replication slot. Go back into the psql shell, connect to the source database, and run the following commands.

\c users;
select * from pg_create_logical_replication_slot('users_replication_slot', 'test_decoding');
SELECT * FROM pg_replication_slots;

You should now see the database users listed in the replication slot's row. This means that the replication slot is properly set up.
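For reference, the relevant columns of that output should resemble the following (other columns omitted here; exact values will differ on your machine):

       slot_name        |    plugin     | slot_type | database | active
------------------------+---------------+-----------+----------+--------
 users_replication_slot | test_decoding | logical   | users    | f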

Now let's try the tail again.

abc import --tail --src_type=postgres --src_uri="postgresql://test_user:test_pass@" --replication_slot="users_replication_slot" "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest"

Run the import again. It should work this time. 

Also, notice that the import process won't exit when it's done this time. It will keep running and listening for changes. Let's add a new entry to the database and see if it shows up on the appbase.io dashboard.

\c users;
insert into users values ('tony@stark.com', 'tony stark', 'iron man');

And yes, it works. You should see a new entry in the app dashboard when you hit reload.

Try making some more changes, and all of them will be reflected on the appbase.io-based Elasticsearch cluster.
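For instance, an update flows through the same pipeline; running the following in the psql shell should change the corresponding document in the index shortly after:

\c users;
update users set bio = 'philanthropist' where email = 'tony@stark.com';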

Transforming Data Before Indexing Into Elasticsearch

There are times when you don't want the data to go as-is from the source to the sink. You might want to change the target type name (e.g., users to accounts), remove certain fields (e.g., bio), or create new fields. For all this, we have the transforms feature in ABC. It can be used via the transform_file parameter.

abc import --src_type=postgres --src_uri="postgresql://test_user:test_pass@" --transform_file="transform_file.js" "https://USER:PASS@scalr.api.appbase.io/abcpostgrestest"

The transform_file parameter takes the path to the transform file. That file, in turn, contains the JavaScript transforms that should be applied to the pipeline. Let's say the contents of transform_file.js are as follows.

t.Source("source", source, "/.*/")
 .Transform(omit({"fields":["bio"]}))
 .Save("sink", sink, "/.*/")

In the above transform, you can see that we omit the bio field during the data transfer. Now, when we run the new import command, the documents that arrive at the sink no longer contain the bio field.

More documentation on the transform file can be found on GitHub. It supports lots of inbuilt functions, like omit, and even supports running custom JavaScript code as a transform. It's a good idea to explore its documentation.
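As one more illustration, a transform that renames the name field to full_name in the index could look like this (a sketch based on the Transporter-style function set that ABC's transform docs describe; check those docs for the exact functions your version supports):

t.Source("source", source, "/.*/")
 .Transform(rename({"field_map": {"name": "full_name"}}))
 .Save("sink", sink, "/.*/")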

Further Reading

ABC's README is a good place to start if you want to learn more about the application. You can also have a look at the Postgres adaptor docs. Furthermore, you may star the repo on GitHub and watch it to stay tuned for updates.


Published at DZone with permission of Avi Aryan, DZone MVB. See the original article here.
