Over a million developers have joined DZone.

Data Import Handler – How to import data from SQL databases (part 2)

· Java Zone

What every Java engineer should know about microservices: Reactive Microservices Architecture.  Brought to you in partnership with Lightbend.

In the first part we were able to index the information contained in the database. In the second part we will try to extend the functionality by adding incremental imports.

There was a little over 1 million documents, and the import took less than half an hour. In principle, there could we end the issue of data import, but imagine that we would like this data to be indexed on an ongoing basis, as far as they change in the source. I won’t be, of course, true RTS (real time search) – there will be interval between the change in the data and time they will be indexed in the search system, but let’s assume that update every hour is sufficient. The first thing we must do in order to implement incremental indexing the database preparation.

Database preparation

Incremental indexing needs to obtain information from the database – what documents have changed since the last indexation. If we are lucky, such data is available – if you are unlucky, you must modify the existing database structure. Depending on the database structure we have several options. In our practice we used most often:

adding an additional column with the exact date of last modification, which were automatically updated (eg trigger or default/update it in mysql), or (worse solution) manually (by application)
create a queue of orders – to write (eg with a trigger) revised identifiers of the documents in a separate table

In both solutions we need to pay attention to the changes of all entities that are included in the document.

Returning to our example from the first part of the article (Polish wikipedia, imported into a PostgreSQL database, the mediawiki application tables), our structure looks like this:

Table “page“:

page_idintegernot null default nextval(‘page_page_id_seq’::regclass)
page_titletextnot null
page_counterbigintnot null default 0
page_is_redirectsmallintnot null default 0
page_is_newsmallintnot null default 0
page_randomnumeric(15,14)not null default random()
page_touched timestamp with time zone 
page_latest integernot null
page_lenintegernot null

Table: “revision“:

rev_idintegernot null default nextval('revision_rev_id_seq'::regclass)
rev_userintegernot null
rev_user_texttextnot null
rev_timestamptimestamp with time zonenot null
rev_minor_editsmallintnot null default 0
rev_deletedsmallintnot null default 0

Table: “pagecontent“:

old_idintegernot null default nextval('text_old_id_seq'::regclass)

The first table contains a column “page_touched” Second: “rev_timestamp” what appears to be exactly what we need: the date of modification. The third table does not have such a field, but contains the texts for a specific version of the page – these texts do not change over time – when a user modifies a page, there is only the new version.

Let us recall the definition of the source from the first part of the article:

  <dataSource driver="org.postgresql.Driver"
     password="secret" />
    <entity name="page" query="SELECT page_id, page_title from page">
      <field column="page_id" name="id" />
      <field column="page_title" name="name" />
      <entity name="revision" query="select rev_id from revision where rev_page=${page.page_id}">
        <entity name="pagecontent" query="select old_text from pagecontent where old_id=${revision.rev_id}">
          <field column="old_text" name="text" />

What we need to do is add the definitions of queries used in incremental indexing. Nothing could be simpler:

 <dataSource driver="org.postgresql.Driver" url="jdbc:postgresql://localhost:5432/wikipedia" user="wikipedia" password="secret" />
  <entity name="page" query="SELECT page_id, page_title from page" deltaQuery="select page_id from page where page_touched > '${dataimporter.last_index_time}'" deltaImportQuery="SELECT page_id, page_title from page where page_id=${dataimporter.delta.page_id}">
   <field column="page_id" name="id" />
   <field column="page_title" name="name" />
   <entity name="revision" query="select rev_id from revision where rev_page=${page.page_id}" deltaQuery="select rev_id from revision where rev_timestamp > '${dataimporter.last_index_time}'" parentDeltaQuery="select page_id from page where page_id=${revision.rev_page}">
    <entity name="pagecontent" query="select old_text from pagecontent where old_id=${revision.rev_id}" deltaQuery="select old_id from pagecontent where old_id < 0">
     <field column="old_text" name="text" />


Well – there are easier things:)

Comparing those two files the only difference that we see is an additional definitions of two attributes:

  • deltaQuery – query responsible for returning the IDs of those records that have changed since the last crawl (full or incremental) – the last crawl time is provided by DIH in the variable: ${dataimporter.last_index_time}. This query is used by Solr to find those records that have changed.
  • deltaImportQuery – query requesting data for a given record identified by ID that is avaiable as a DIH variable: ${dataimport.delta.id}.
  • parentDeltaQuery – query requesting data for the parent entity record. With these queries Solr is able to retrieve all the data that make up the document, regardless of the entity from which they originate. This is necessary because the indexing engine is not possible to modify the indexed data – so we need to index the entire document, regardless of the fact that some data has not changed.

In our example we do not remove the documents. Therefore, we eliminated the problem of taking deleted documents into account and of course the process of documents deleting from the index. If this functionality will be needed, we can use the process described above (with the queue of orders) and add another attribute. In the DIH configuration we would use the attribute:

  • deletedPkQuery – provides identifiers of deleted items.

In the next part of the article we will also sort out issues of cooperation with the database, we will try revise our database integration and do it in a slightly different way.


Microservices for Java, explained. Revitalize your legacy systems (and your career) with Reactive Microservices Architecture, a free O'Reilly book. Brought to you in partnership with Lightbend.


Published at DZone with permission of Marek Rogoziński, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

The best of DZone straight to your inbox.

Please provide a valid email address.

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}