Kafka JDBC Source Connector for Large Data

In this article, we are going to look at how we can use Kafka’s JDBC Source Connector to query a large table/view in batches, for both new and updated records, and publish them to a topic using an Avro schema.

By Ion Pascari · Nov. 29, 22 · Tutorial

Recently, I had a task to migrate some data from an old monolith’s Oracle database to a microservice with a PostgreSQL database. The problem was that the data needed for the migration lived in a parent table with around 2 million records and 150 columns, and on top of that, everything was exposed through a view with a payload column that aggregated data from various tables into XML. As you can imagine, the SELECT from that view was slow, and by slow, I mean insanely slow, which was not going to work very well for the connector. So, in this article, we’ll take a look at a similar, simplified use case and how we can deal with it.

Use Case

We have a course-catalog application with a PostgreSQL database that deals with instructors and their courses. Now we need to migrate some legacy instructors from another PostgreSQL database that will soon be decommissioned. So we have instructors-legacy-db and course-catalog-db. In our case, neither database is really overwhelmed with records, with only about 200 rows in instructors-legacy-db, but for the sake of the example, just imagine that instructors-legacy-db is that table with 2 million cumbersome records.

Right, here is the docker-compose.yml:

YAML
 
version: '3'
services:
  course-catalog-operational-db:
    image: postgres:13.3
    container_name: course-catalog-operational-db
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    environment:
      POSTGRES_PASSWORD: 123456
      POSTGRES_DB: course-catalog-db
    ports:
      - "5433:5432"
  instructors-legacy-db:
    image: postgres:13.3
    container_name: instructors-legacy-db
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    environment:
      POSTGRES_PASSWORD: 123456
      POSTGRES_DB: instructors-db
    ports:
      - "5434:5432"
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql


Here is the course-catalog-db/instructors table:

SQL
 
create table instructors
(
    id          integer   not null
        primary key,
    created_at  timestamp not null,
    updated_at  timestamp not null,
    description varchar(3000),
    name        varchar(255),
    summary     varchar(3000)
);


And here is the instructors-legacy-db/instructors table:

SQL
 
create table instructors
(
    id         integer   not null
        primary key,
    created_at timestamp not null,
    updated_at timestamp not null,
    first_name       varchar(255),
    last_name       varchar(255),
    title varchar(255)
);


Also, as you may have noticed, for the instructors-legacy-db container I’m using an init.sql script to create the table and do some inserts on startup, so we’ll have some data to play around with. There is nothing special in that script, just 200 randomly generated inserts; here are a few of them (the rest can be viewed in the repo):

SQL
 
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (0, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Liam', 'Martinez', 'Mr.');
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (1, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Thomas', 'Williams', 'Mr.');
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (2, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Mateo', 'Martinez', 'Mr.');
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (3, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Ciro', 'Smith', 'Mr.');


And here is the view that we are going to address:

SQL
 
create view instructor_aggregate_vw(id, created_at, updated_at, name) as
SELECT instructors.id,
       instructors.created_at,
       instructors.updated_at,
       (((instructors.title::text || ' '::text) || instructors.first_name::text) || ' '::text) ||
       instructors.last_name::text AS name
FROM instructors;


Okay, now that we have everything in place, how do we get our data queried and published?

Kafka Connect in Action

That’s right, we are going to use io.confluent.connect.jdbc.JdbcSourceConnector for that.

The Kafka Connect JDBC Source connector allows you to import data from any relational database with a JDBC driver into an Apache Kafka® topic. This connector can support a wide variety of databases.

But first, we need to set up our Kafka environment. For this, I am going to use the landoop/fast-data-dev Docker image, since it comes with almost everything properly configured: ZooKeeper, the Schema Registry, Kafka Connect, the broker, and a nice UI provided by Landoop for managing everything Kafka-related. Here is what we are going to add to our docker-compose.yml:

YAML
 
  course-catalog-kafka-cluster:
    container_name: course-catalog-kafka-cluster
    image: landoop/fast-data-dev
    environment:
      ADV_HOST: 127.0.0.1         
      RUNTESTS: 0                 
    ports:
      - 2181:2181                 
      - 3030:3030                 
      - 8081-8083:8081-8083       
      - 9581-9585:9581-9585       
      - 9092:9092                 
    volumes:
      # Specify an absolute path mapping
      - C:\Users\ionpa\Projects\course-catalog\infra:/my-data


Okay, docker-compose up, and our entire environment is up. You can go ahead and check the UI on http://localhost:3030/ and explore it a bit. 
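
Before moving on, it can help to confirm that the Connect worker is reachable and that the JDBC source connector plugin is actually installed in it. Below is a minimal Kotlin sketch of such a check (a convenience only, not something the setup requires), using the JDK 11+ java.net.http client against the standard Kafka Connect REST endpoint /connector-plugins.

Kotlin
 
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Hypothetical smoke test: ask the Connect worker which connector plugins it ships with
// and make sure the JDBC source connector is among them.
fun main() {
    val request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8083/connector-plugins"))
        .GET()
        .build()

    val body = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString())
        .body()

    check("io.confluent.connect.jdbc.JdbcSourceConnector" in body) {
        "JDBC source connector plugin not found on the Connect worker"
    }
    println("Kafka Connect is up and the JDBC source connector is available")
}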

Now, about the connector. Connectors can be added either through the UI on localhost, following a property-based or JSON-based configuration, or through the Kafka Connect REST API at http://localhost:8083/connectors (a POST with a JSON body containing your connector’s name and config, or a PUT to /connectors/<name>/config with just the config). A small Kotlin sketch of that REST call follows the property walkthrough below. Let’s take a look at the configuration that we are going to use for our connector:

Properties files
 
name=legacy.instructors.connector
topic.prefix=legacy-instructors
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
table.types=VIEW
connection.url=jdbc:postgresql://instructors-legacy-db:5432/instructors-db
connection.user=postgres
connection.password=123456
connection.attempts=100
connection.backoff.ms=300000
poll.interval.ms=100
transforms=AddNamespace,createKey,AddKeyNamespace
transforms.AddNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.AddNamespace.schema.name=inc.evil.coursecatalog.InstructorAggregate
transforms.AddKeyNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Key
transforms.AddKeyNamespace.schema.name=inc.evil.coursecatalog.Key
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
mode=timestamp+incrementing
timestamp.column.name=updated_at
timestamp.delay.interval.ms=3000
incrementing.column.name=id
numeric.mapping=best_fit
query=select * from(select * from instructor_aggregate_vw where "updated_at" < ? AND (("updated_at" = ? AND "id" > ?) OR "updated_at" > ?) order by "updated_at","id" ASC) limit 50 --


First of all, hats off to Confluent for doing such a great job of documenting everything. You can check the meaning of each property here. But anyway, I am going to give a short description of what we have here:

  • name – obviously, the connector’s name.
  • topic.prefix – because we are using a custom query, this is the name of the topic to which we are going to publish our records.
  • connector.class – the implementation of the connector that we are going to use; in our case, io.confluent.connect.jdbc.JdbcSourceConnector.
  • table.types – since we are going to query a custom view, the type is going to be VIEW.
  • connection.* – the connection-related properties: the connection URL, user, and password for our DB, plus some configuration for the number of connection attempts and the backoff.
  • poll.interval.ms – basically, how frequently the connector should poll the table for new records.
  • transforms.* – in my case, configuration properties related to converting/serializing both the payload and the key to Avro: setting the schema names and building the key from the id column.
  • mode – this is basically one of the most important properties, and it can have the following values:
    • bulk – queries the entire table every time
    • incrementing – detects new rows based on an id column
    • timestamp – detects new/updated rows based on an automatically updated timestamp column
    • timestamp+incrementing – considered to be the most robust and accurate mode, since it combines the two modes above; having both the timestamp column and the id column in place allows us to uniquely identify both new and updated rows
  • timestamp.* – defines the required column for the timestamp mode (in our case, updated_at) and the delay in milliseconds
  • incrementing.column.name – defines the required column for the incrementing mode, in our case id
  • numeric.mapping – decides how we are going to treat NUMERIC values; in our case, it is best_fit, which says that numeric columns should be cast to INT or FLOAT based on the column’s precision and scale
  • query – this is the most essential property with regard to this article, so let us dive a bit deeper here.
    • Basically, this property defines the query that is going to be used to address a table/view, and for our case, this should be enough, right?
    • SQL
       
      select * from instructor_aggregate_vw

      Well, not really. Since we want this query to poll in batches, it needs to be modified. Let’s say we want to query in batches of 50 records; this should be easy to implement with a simple LIMIT, like this:

    • SQL
       
      select * from instructor_aggregate_vw limit 50

      Yes and no: yes, that is correct SQL, but it won’t work for the Kafka connector. And it won’t work because the query has to be in a form to which a WHERE clause can be appended. Why is that? Because of our specified mode, the connector appends a WHERE clause based on the defined timestamp/incrementing columns, so the query, in the end, will look like this:

    • SQL
       
      select * from instructor_aggregate_vw WHERE "updated_at" < ? AND (("updated_at" = ? AND "id" > ?) OR "updated_at" > ?) ORDER BY "updated_at","id" ASC

      And if we keep the LIMIT 50 in our query, this breaks, because the appended WHERE clause would end up after the LIMIT, which is not valid in PostgreSQL. So how do we deal with this?

      The trick is to take everything into our own hands: comment out the WHERE clause generated by the connector, use a subquery to handle the WHERE clause ourselves, and do the LIMITing there, like this (pay attention to the trailing --):

    • SQL
       
      select * from(select * from instructor_aggregate_vw where "updated_at" < ? AND (("updated_at" = ? AND "id" > ?) OR "updated_at" > ?) order by "updated_at","id" ASC) limit 50 --

      A similar approach should work for any database; in Oracle, for example, you’d use WHERE rownum <= 50 instead of LIMIT.

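Since we mentioned that connectors can also be created through the Kafka Connect REST API, here is a minimal Kotlin sketch of that call (again using the JDK 11+ java.net.http client). Only a handful of the properties from the configuration above are included; the remaining ones, including the batched query, go into the same "config" object. Treat it as an illustration rather than a ready-made client.

Kotlin
 
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Registers the connector by POSTing its name and config to the Connect REST API.
fun main() {
    val body = """
        {
          "name": "legacy.instructors.connector",
          "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "topic.prefix": "legacy-instructors",
            "connection.url": "jdbc:postgresql://instructors-legacy-db:5432/instructors-db",
            "connection.user": "postgres",
            "connection.password": "123456",
            "mode": "timestamp+incrementing",
            "timestamp.column.name": "updated_at",
            "incrementing.column.name": "id"
          }
        }
    """.trimIndent()
    // The remaining properties shown above (table.types, transforms.*, the query, etc.)
    // would be added to the "config" object in exactly the same way.

    val request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8083/connectors"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build()

    val response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString())

    // 201 Created means the connector was registered; 409 Conflict means it already exists.
    println("${response.statusCode()}: ${response.body()}")
}
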
That should be it. If you create this connector, you won’t see much, since our view is very light on data and the streaming finishes really fast, but if you were to use a heavy view/table, you’d notice how the data appears in your topic in batches of 50 messages.

Consuming

If we want to go even further and consume this data into course-catalog-db/instructors, it is a matter of either defining a sink connector (since we don’t do any transformation) or using a simple consumer. In Spring Boot with Kotlin, a Kafka listener would look like this:

Kotlin
 
@KafkaListener(
    topics = ["\${kafka.consumer.legacyInstructorsTopic}"],
    containerFactory = KafkaConfiguration.LEGACY_INSTRUCTORS_CONTAINER_FACTORY
)
fun handle(
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) key: Key,
    @Payload instructorAggregate: InstructorAggregate
) {
    log.info("Received old instructor [${instructorAggregate}]")
    instructorService.upsert(instructorAggregate)
}

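The listener above references a container factory from a KafkaConfiguration class that is not shown in the article. Here is a minimal sketch of what such a factory could look like with Spring Kafka and Confluent’s Avro deserializer (the io.confluent:kafka-avro-serializer dependency is assumed to be on the classpath). The bean name, bootstrap/schema-registry addresses, and group id are assumptions; Key and InstructorAggregate are the classes generated from the Avro schemas, in the inc.evil.coursecatalog package that matches the schema names in the connector config.

Kotlin
 
import inc.evil.coursecatalog.InstructorAggregate
import inc.evil.coursecatalog.Key
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

@Configuration
class KafkaConfiguration {

    companion object {
        const val LEGACY_INSTRUCTORS_CONTAINER_FACTORY = "legacyInstructorsContainerFactory"
    }

    @Bean(LEGACY_INSTRUCTORS_CONTAINER_FACTORY)
    fun legacyInstructorsContainerFactory(): ConcurrentKafkaListenerContainerFactory<Key, InstructorAggregate> {
        // Assumed addresses; in a real application these would come from application.yml.
        val props = mapOf<String, Any>(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
            ConsumerConfig.GROUP_ID_CONFIG to "course-catalog",
            // Both the key and the value were published as Avro, so both use the Avro deserializer.
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to KafkaAvroDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to KafkaAvroDeserializer::class.java,
            // Deserialize into the generated Key/InstructorAggregate classes instead of GenericRecord.
            KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG to true,
            KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081"
        )
        val factory = ConcurrentKafkaListenerContainerFactory<Key, InstructorAggregate>()
        factory.setConsumerFactory(DefaultKafkaConsumerFactory<Key, InstructorAggregate>(props))
        return factory
    }
}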

For the Avro deserialization, I got the generated Avro schemas from the schema registry and saved them as .avsc files. Then, with the help of this plugin, id("com.github.davidmc24.gradle.plugin.avro"), configured like this:

Kotlin
 
tasks.withType<com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask> {
    source(file("${projectDir}\\src\\main\\resources\\avro"))
    setOutputDir(file("${projectDir}\\src\\main\\kotlin"))
}


I obtained the POJOs used in the listener.
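
The upsert itself is also not shown in the article; below is a possible shape for it, sketched with Spring’s JdbcTemplate and PostgreSQL’s ON CONFLICT clause against the course-catalog-db/instructors table. The InstructorService name comes from the listener, while the accessor names and timestamp types depend on the generated Avro classes (camelCase getters and java.time.Instant are assumed here), so treat it purely as an illustration.

Kotlin
 
import inc.evil.coursecatalog.InstructorAggregate
import java.sql.Timestamp
import org.springframework.jdbc.core.JdbcTemplate
import org.springframework.stereotype.Service

// Hypothetical service behind instructorService.upsert() from the listener above.
@Service
class InstructorService(private val jdbcTemplate: JdbcTemplate) {

    // Insert-or-update keyed by id, so re-polled or replayed records stay idempotent.
    fun upsert(instructor: InstructorAggregate) {
        jdbcTemplate.update(
            """
            INSERT INTO instructors (id, created_at, updated_at, name)
            VALUES (?, ?, ?, ?)
            ON CONFLICT (id) DO UPDATE
                SET updated_at = EXCLUDED.updated_at,
                    name       = EXCLUDED.name
            """.trimIndent(),
            instructor.id,
            // Assuming the Avro timestamp-millis fields were generated as java.time.Instant.
            Timestamp.from(instructor.createdAt),
            Timestamp.from(instructor.updatedAt),
            // Avro string fields may come back as CharSequence, hence the toString().
            instructor.name.toString()
        )
    }
}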

Conclusion

We’ve seen how a connector can be configured to work in batches and track both new and updated records in the most robust way. You can check all the mentioned code and files here.

That’s all folks, I hope that you’ve learned something. Happy coding!

Database Java Database Connectivity Relational database Connector (mathematics) Data (computing) kafka sql Data Types

Opinions expressed by DZone contributors are their own.
