Kafka JDBC Source Connector for Large Data

In this article, we are going to look at how we can use Kafka’s JDBC Source Connector to query a large table or view in batches, for both new records and updated ones, and publish them to a topic using an Avro schema.

By Ion Pascari · Nov. 29, 22 · Tutorial

Recently, I had a task to migrate some data from an old monolith’s Oracle database to a microservice with a PostgreSQL database. The problem was that the data needed for migration had a parent table with around 2 million records and 150 columns, and on top of that, everything was brought into a view with a payload column aggregating data from various tables as XML. As you can imagine, the SELECT from that view was pretty slow, and by pretty, I mean insanely slow, which was not going to work very well for the connector. So, in this article, we’ll take a look at a similar, simplified use case and how we can deal with it.

Use Case

We have a course-catalogue application with a PostgreSQL database that deals with instructors and their courses. Now we need to migrate some legacy instructors from another PostgreSQL database that is soon going to be decommissioned. So we have instructors-legacy-db and course-catalog-db. In our case, neither database is really overwhelmed with records (instructors-legacy-db holds only about 200 of them), but for the sake of the example, just imagine that instructors-legacy-db is that table with 2 million cumbersome records.

Right, here is the docker-compose.yml:

YAML
 
version: '3'
services:
  course-catalog-operational-db:
    image: postgres:13.3
    container_name: course-catalog-operational-db
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    environment:
      POSTGRES_PASSWORD: 123456
      POSTGRES_DB: course-catalog-db
    ports:
      - "5433:5432"
  instructors-legacy-db:
    image: postgres:13.3
    container_name: instructors-legacy-db
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    environment:
      POSTGRES_PASSWORD: 123456
      POSTGRES_DB: instructors-db
    ports:
      - "5434:5432"
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql


Here is the course-catalog-db/instructors table:

SQL
 
create table instructors
(
    id          integer   not null
        primary key,
    created_at  timestamp not null,
    updated_at  timestamp not null,
    description varchar(3000),
    name        varchar(255),
    summary     varchar(3000)
);


And here is the instructors-legacy-db/instructors table:

SQL
 
create table instructors
(
    id         integer   not null
        primary key,
    created_at timestamp not null,
    updated_at timestamp not null,
    first_name varchar(255),
    last_name  varchar(255),
    title      varchar(255)
);


Also, as you may have noticed, for the instructors-legacy-db container I’m using an init.sql script to create the table and do some inserts on startup, so we’ll have some data to play around with. There is nothing special in that script, just 200 randomly generated inserts; here are some of them (the rest can be viewed in the repo):

SQL
 
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (0, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Liam', 'Martinez', 'Mr.');
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (1, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Thomas', 'Williams', 'Mr.');
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (2, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Mateo', 'Martinez', 'Mr.');
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (3, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Ciro', 'Smith', 'Mr.');


And here is the view that we are going to address:

SQL
 
create view instructor_aggregate_vw(id, created_at, updated_at, name) as
SELECT instructors.id,
       instructors.created_at,
       instructors.updated_at,
       (((instructors.title::text || ' '::text) || instructors.first_name::text) || ' '::text) ||
       instructors.last_name::text AS name
FROM instructors;


Okay, now that we have everything in place, how do we get our data queried and published?

Kafka Connect in Action

That’s right, we are going to use io.confluent.connect.jdbc.JdbcSourceConnector for that.

The Kafka Connect JDBC Source connector allows you to import data from any relational database with a JDBC driver into an Apache Kafka® topic. This connector can support a wide variety of databases.

But first, we need to set up our Kafka environment. For this, I am going to use the landoop/fast-data-dev Docker image, since it comes with almost everything properly configured, starting from ZooKeeper, the schema registry, Kafka Connect, and the broker and ending with a nice UI provided by Landoop for managing everything Kafka-related. Here is what we are going to add to our docker-compose.yml:

YAML
 
  course-catalog-kafka-cluster:
    container_name: course-catalog-kafka-cluster
    image: landoop/fast-data-dev
    environment:
      ADV_HOST: 127.0.0.1         
      RUNTESTS: 0                 
    ports:
      - 2181:2181                 
      - 3030:3030                 
      - 8081-8083:8081-8083       
      - 9581-9585:9581-9585       
      - 9092:9092                 
    volumes:
      # Specify an absolute path mapping
      - C:\Users\ionpa\Projects\course-catalog\infra:/my-data


Okay, docker-compose up, and our entire environment is up. You can go ahead and check the UI on http://localhost:3030/ and explore it a bit. 

Now, about the connector. Connectors can be added either through the UI at http://localhost:3030/, using a property-based or JSON-based configuration, or by calling the Kafka Connect REST API (e.g., a POST to http://localhost:8083/connectors) with a JSON body containing your connector’s config (a sketch of such a body follows the properties below). Let’s take a look at the configuration that we are going to use for our connector:

Properties files
 
name=legacy.instructors.connector
topic.prefix=legacy-instructors
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
table.types=VIEW
connection.url=jdbc:postgresql://instructors-legacy-db:5432/instructors-db
connection.user=postgres
connection.password=123456
connection.attempts=100
connection.backoff.ms=300000
poll.interval.ms=100
transforms=AddNamespace,createKey,AddKeyNamespace
transforms.AddNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.AddNamespace.schema.name=inc.evil.coursecatalog.InstructorAggregate
transforms.AddKeyNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Key
transforms.AddKeyNamespace.schema.name=inc.evil.coursecatalog.Key
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
mode=timestamp+incrementing
timestamp.column.name=updated_at
timestamp.delay.interval.ms=3000
incrementing.column.name=id
numeric.mapping=best_fit
query=select * from(select * from instructor_aggregate_vw where "updated_at" < ? AND (("updated_at" = ? AND "id" > ?) OR "updated_at" > ?) order by "updated_at","id" ASC) limit 50 --
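
By the way, if you prefer the JSON-based route mentioned above, the same configuration can be wrapped in a JSON body and posted to the Kafka Connect REST API. Here is a minimal sketch of such a body; the values simply mirror the properties above, and you may need to adjust the endpoint and host to your setup:

JSON
 
{
  "name": "legacy.instructors.connector",
  "config": {
    "topic.prefix": "legacy-instructors",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "table.types": "VIEW",
    "connection.url": "jdbc:postgresql://instructors-legacy-db:5432/instructors-db",
    "connection.user": "postgres",
    "connection.password": "123456",
    "connection.attempts": "100",
    "connection.backoff.ms": "300000",
    "poll.interval.ms": "100",
    "transforms": "AddNamespace,createKey,AddKeyNamespace",
    "transforms.AddNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
    "transforms.AddNamespace.schema.name": "inc.evil.coursecatalog.InstructorAggregate",
    "transforms.AddKeyNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
    "transforms.AddKeyNamespace.schema.name": "inc.evil.coursecatalog.Key",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "timestamp.delay.interval.ms": "3000",
    "incrementing.column.name": "id",
    "numeric.mapping": "best_fit",
    "query": "select * from(select * from instructor_aggregate_vw where \"updated_at\" < ? AND ((\"updated_at\" = ? AND \"id\" > ?) OR \"updated_at\" > ?) order by \"updated_at\",\"id\" ASC) limit 50 --"
  }
}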


First of all, hats off to Confluent for doing such a great job of documenting everything. You can check the meaning of each property here. But anyway, I am going to give a short description of what we have here:

  • name – obviously, the connector’s name.
  • topic.prefix – because we are using a custom query, this is the name of the topic that we are going to publish our records to.
  • connector.class – the implementation of the connector that we are going to use; in our case, it is io.confluent.connect.jdbc.JdbcSourceConnector.
  • table.types – since we are going to query a custom view, the type is going to be VIEW.
  • connection.* - connection-related properties: the connection URL, user, and password for our DB, plus some configuration for the number of attempts and the backoff.
  • poll.interval.ms – it is basically how frequently the connector should poll the table for new records.
  • transforms.* - in my case, single message transforms that set the Avro schema names (namespaces) for the payload and the key and build the record key from the id column.
  • mode – this is basically one of the most important properties, and it can have the following values:
    • bulk – queries the entire table every time
    • incrementing – will detect new rows based on an id column
    • timestamp – will detect new/updated rows based on an automatically updated timestamp column
    • timestamp+incrementing – considered to be the most robust and accurate mode, since it combines 2 modes mentioned above, and having both the timestamp column and id column in place allows us to identify both the new and updated rows uniquely
  • timestamp.* - defines the required column and the delay in milliseconds for the timestamp mode; in our case, updated_at.
  • incrementing.column.name – defines the required column for the incrementing mode, in our case id
  • numeric.mapping – decides how we are going to treat NUMERIC values; in our case, it is best_fit, which means that numeric columns should be cast to INT or FLOAT based upon the column’s precision and scale.
  • query – this is the most essential property for this article, so let us dive a bit deeper here.
    • Basically, this property defines the query that is going to be used against the table/view, and for our case, this should be enough, right?
    • SQL
       
      select * from instructor_aggregate_vw

      Well, not really. Since we want this query to poll in batches, it needs to be modified. Let’s say we want to query in batches of 50 records; this should be easily implemented with a simple LIMIT, like this:

    • SQL
       
      select * from instructor_aggregate_vw limit 50

      Yes and no: yes, that is correct SQL, but it won’t work for the Kafka connector. And it won’t work because the query has to be in a form where a WHERE clause can be appended to it. Why is that? Because of our specified mode, the connector will append a WHERE clause based on our defined timestamp/incrementing columns, and the query, in the end, will look like this:

    • SQL
       
      select * from instructor_aggregate_vw WHERE "updated_at" < ? AND (("updated_at" = ? AND "id" > ?) OR "updated_at" > ?) ORDER BY "updated_at","id" ASC

      And if we add the LIMIT 50, that will break, because the WHERE clause appended by the connector would end up after the LIMIT, which is invalid in PostgreSQL. So how do we deal with this?

      The trick is to take everything into our own hands: comment out the WHERE clause generated by the connector, handle that WHERE clause ourselves inside a subquery, and apply the LIMIT like this (pay attention to the trailing --):

    • SQL
       
      select * from(select * from instructor_aggregate_vw where "updated_at" < ? AND (("updated_at" = ? AND "id" > ?) OR "updated_at" > ?) order by "updated_at","id" ASC) limit 50 --

      And this trick should work for any database; in Oracle, for example, you’d use a WHERE rownum <= 50 instead of the LIMIT (see the sketch right after this list).
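
For reference, here is a hedged sketch of how the same trick might look in Oracle syntax; it is not part of the original setup, and the identifier quoting/casing may need adjusting for your actual schema:

SQL
 
select * from (select * from instructor_aggregate_vw where "updated_at" < ? AND (("updated_at" = ? AND "id" > ?) OR "updated_at" > ?) order by "updated_at","id") where rownum <= 50 --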

This should be it. If you create this connector, you won’t see much, since our view is very light on the data that it provides and the streaming happens really fast; but if you were to use a heavy view/table, you’d notice how data appears in your topic in batches of 50 messages.

Consuming

If we want to go even further and consume this data into course-catalog-db/instructors, it is simply a matter of either defining a sink connector (since we don’t do any transformation) or using a simple consumer. In Spring Boot with Kotlin, a Kafka listener would look like this (a sketch of the sink connector alternative follows the listener):

Kotlin
 
@KafkaListener(
    topics = ["\${kafka.consumer.legacyInstructorsTopic}"],
    containerFactory = KafkaConfiguration.LEGACY_INSTRUCTORS_CONTAINER_FACTORY
)
fun handle(
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) key: Key,
    @Payload instructorAggregate: InstructorAggregate
) {
    log.info("Received old instructor [${instructorAggregate}]")
    instructorService.upsert(instructorAggregate)
}
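
As for the sink connector alternative mentioned above, here is only a hedged sketch of what such a config might look like; the connector name is made up, and it assumes the legacy columns (id, created_at, updated_at, name) line up with the target instructors table and that the Connect worker already uses Avro converters:

Properties files
 
name=legacy.instructors.sink.connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=legacy-instructors
connection.url=jdbc:postgresql://course-catalog-operational-db:5432/course-catalog-db
connection.user=postgres
connection.password=123456
insert.mode=upsert
pk.mode=record_key
pk.fields=id
table.name.format=instructors
auto.create=false
auto.evolve=false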


For the Avro deserialization, I got the generated Avro schemas from the schema registry here and saved them as .avsc files. And with the help of this plugin, id("com.github.davidmc24.gradle.plugin.avro"), configured like this:

Kotlin
 
tasks.withType<com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask> {
    source(file("${projectDir}\\src\\main\\resources\\avro"))
    setOutputDir(file("${projectDir}\\src\\main\\kotlin"))
}


I obtained the POJOs used in the listener.
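
One piece not shown here is the listener container factory referenced by the @KafkaListener above. Below is a minimal sketch of what it could look like, assuming the generated Avro classes Key and InstructorAggregate and the default local ports exposed by the fast-data-dev container; the bean name value and group id are illustrative, not taken from the original project:

Kotlin
 
import inc.evil.coursecatalog.InstructorAggregate
import inc.evil.coursecatalog.Key
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

@Configuration
class KafkaConfiguration {

    companion object {
        // Referenced from the @KafkaListener above; the actual value is illustrative.
        const val LEGACY_INSTRUCTORS_CONTAINER_FACTORY = "legacyInstructorsContainerFactory"
    }

    @Bean(LEGACY_INSTRUCTORS_CONTAINER_FACTORY)
    fun legacyInstructorsContainerFactory(): ConcurrentKafkaListenerContainerFactory<Key, InstructorAggregate> {
        // Avro deserializers backed by the schema registry; specific.avro.reader=true
        // makes the deserializer return the generated classes instead of GenericRecord.
        val props = mapOf<String, Any>(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
            ConsumerConfig.GROUP_ID_CONFIG to "course-catalog", // assumption
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to KafkaAvroDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to KafkaAvroDeserializer::class.java,
            "schema.registry.url" to "http://127.0.0.1:8081",
            "specific.avro.reader" to true
        )
        val factory = ConcurrentKafkaListenerContainerFactory<Key, InstructorAggregate>()
        factory.setConsumerFactory(DefaultKafkaConsumerFactory<Key, InstructorAggregate>(props))
        return factory
    }
}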

Conclusion

We’ve seen how a connector can be configured to work in batches and track both new and updated records in the most robust way. You can check all the mentioned code and files here.

That’s all, folks. I hope you’ve learned something. Happy coding!
