
Hazelcast Jet Tutorial: Building Custom JDBC Sinks

In this tutorial, we are going to build a JDBC sink writing Stock updates to a relational database, but you can apply the same principles for building an arbitrary sink.

By Jaromir Hamala · Apr. 24, 18 · Tutorial · 6.3K Views


Hazelcast Jet supports writing into a number of third-party systems, including HDFS and Apache Kafka. But what if you want to write into a system of your own that Jet does not support out of the box? Starting with version 0.6, Jet offers a simple API for building custom sinks, and this tutorial will show you how to use it!


Basics

The basic building block is SinkBuilder. You obtain an instance via the factory method Sinks.builder(). The builder accepts a handful of functions that control the sink's behavior. The two most important are:

  1. The function you pass to the builder itself. It creates a context object, which Jet then passes to the onReceive() function.
  2. The onReceive() function. Jet calls it for each element the sink receives, passing both the element and the context object; this is where you write the element to your target system.

You can optionally register two more functions that control the lifecycle and batching behavior of the sink.
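Before looking at a real JDBC sink, the contract can be sketched with plain java.util.function types. This is a simplified stand-in for illustration only, not the actual Jet API:

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Simplified stand-in for the SinkBuilder contract (illustration only):
// createFn builds the context once, onReceiveFn is called per element
// with that context, and destroyFn releases the context at the end.
class MiniSink<C, T> {
    private final C context;
    private final BiConsumer<C, T> onReceiveFn;
    private final Consumer<C> destroyFn;

    MiniSink(Supplier<C> createFn, BiConsumer<C, T> onReceiveFn, Consumer<C> destroyFn) {
        this.context = createFn.get();      // called once per sink instance
        this.onReceiveFn = onReceiveFn;
        this.destroyFn = destroyFn;
    }

    void receive(T item) { onReceiveFn.accept(context, item); }  // called per element
    void destroy()       { destroyFn.accept(context); }          // called on completion
}
```

For example, a MiniSink<List<String>, String> with List::add as the receive function collects every element into one list per sink instance; in the real sink below, the context is a JDBC Connection instead of a list.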

A very simple Sink could look like this:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class JDBCSink {
    private static final String INSERT_QUERY = "insert into stock_updates (ts, price, symbol) values (?, ?, ?)";

    public static Sink newSink(String connectionUrl) {
        return Sinks.builder((unused) -> JDBCSink.openConnection(connectionUrl))
                .onReceiveFn(JDBCSink::insertUpdate)
                .destroyFn(JDBCSink::closeConnection)
                .build();
    }

    private static Connection openConnection(String connectionUrl) {
        try {
            return DriverManager.getConnection(connectionUrl);
        } catch (SQLException e) {
            throw new IllegalStateException("Cannot acquire a connection with URL '" + connectionUrl + "'", e);
        }
    }

    private static void closeConnection(Connection c) {
        try {
            c.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    private static void insertUpdate(Connection c, StockPriceUpdate i) {
        try (PreparedStatement ps = c.prepareStatement(INSERT_QUERY)) {
            ps.setLong(1, i.getTimestamp());
            ps.setLong(2, i.getPrice());
            ps.setString(3, i.getSymbol());

            ps.executeUpdate();
        } catch (SQLException e) {
            throw new IllegalStateException("Error while inserting " + i + " into database", e);
        }
    }
}

The implementation is rather simplistic, perhaps even naive, but it works! Jet calls the openConnection() function for each sink instance it creates; this function acquires a new JDBC connection. That connection is then passed to insertUpdate() along with each new item. SinkBuilder wires all these functions together and creates a regular sink from them.

One reason it can be so simple lies in the Jet threading model: a single sink instance is always used by a single thread, so you do not have to deal with concurrency.

This is how you could use your sink in a Jet Pipeline:

String connectionJdbcUrl = getJDBCUrlString();
Pipeline pipeline = Pipeline.create()
        .drawFrom(Sources.mapJournal(MAP_NAME, JournalInitialPosition.START_FROM_OLDEST))
        .map(Map.Entry::getValue)
        .drainTo(JDBCSink.newSink(connectionJdbcUrl))
        .getPipeline();
jetInstance.newJob(pipeline);

The pipeline reads from the IMap change journal, extracts just the value from each entry, and passes it to our sink. That's it! You can see it in action in this project.

Optimizations

While the code above works, it has several drawbacks. The big one is performance:

  1. It creates a new prepared statement for each element it receives. This is unnecessary, as prepared statements can be reused.
  2. The insertUpdate() function calls a blocking JDBC method for each element it receives. Again, this is not great performance-wise: it usually involves a network round trip to the database, which could easily become a bottleneck.

We can address both concerns with a rather simple code change:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class BetterJDBCSink {
    private static final String INSERT_QUERY = "insert into stock_updates (ts, price, symbol) values (?, ?, ?)";

    public static Sink newSink(String connectionUrl) {
        return Sinks.builder((unused) -> BetterJDBCSink.createStatement(connectionUrl))
                .onReceiveFn(BetterJDBCSink::insertUpdate)
                .destroyFn(BetterJDBCSink::cleanup)
                .flushFn(BetterJDBCSink::flush)
                .build();
    }

    private static PreparedStatement createStatement(String connectionUrl) {
        Connection connection = null;
        try {
            connection = DriverManager.getConnection(connectionUrl);
            return connection.prepareStatement(INSERT_QUERY);
        } catch (SQLException e) {
            closeSilently(connection);
            throw new IllegalStateException("Cannot acquire a connection with URL '" + connectionUrl + "'", e);
        }
    }

    private static void closeSilently(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                //ignored
            }
        }
    }

    private static void cleanup(PreparedStatement ps) {
        if (ps == null) {
            return;
        }
        // grab the connection before closing the statement; a closed
        // statement may refuse to hand it out
        Connection connection = null;
        try {
            connection = ps.getConnection();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        try {
            ps.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        closeSilently(connection);
    }

    private static void flush(PreparedStatement ps) {
        try {
            ps.executeBatch();
        } catch (SQLException e) {
            throw new IllegalStateException("Error while storing a batch into database", e);
        }
    }

    private static void insertUpdate(PreparedStatement ps, StockPriceUpdate i) {
        try {
            ps.setLong(1, i.getTimestamp());
            ps.setLong(2, i.getPrice());
            ps.setString(3, i.getSymbol());

            ps.addBatch();
        } catch (SQLException e) {
            throw new IllegalStateException("Error while inserting " + i + " into database", e);
        }
    }
}

As you can see, the basic structure is still the same: a SinkBuilder with a bunch of functions registered. There are two significant changes:

  1. The function we pass to the SinkBuilder no longer produces a Connection; it directly produces a PreparedStatement. This way, we can easily reuse the statement, as Jet will pass it to the insertUpdate() function.
  2. The insertUpdate() function does not execute the statement directly; it just adds to a batch. Batching is a feature JDBC provides, and a JDBC driver is free to optimize batched query execution. We have to tell the driver when it is a good time to actually execute the batched queries. To do so, we registered a new function with the SinkBuilder: flush(). Jet calls it when it is a good time to flush the batched records.

This optimized sink performs much better. It does not need to wait for a database round trip for each element; instead, it batches the elements and goes to the database only once in a while. When exactly? This is determined by Jet itself and depends on factors such as the incoming data rate: when traffic is low, Jet calls flush for each item, but as the rate of incoming elements increases, Jet calls flush less frequently and the batching effect kicks in.
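The payoff of the batch-then-flush contract can be illustrated outside Jet with a small, self-contained sketch (the names here are illustrative, not part of the Jet or JDBC API):

```java
import java.util.ArrayList;
import java.util.List;

// Toy sink mirroring the addBatch()/executeBatch() pattern: onReceive()
// only buffers in memory, and flush() pays the "round trip" cost once
// per batch instead of once per element.
class BufferedSink {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> written = new ArrayList<>();
    private int roundTrips = 0;

    void onReceive(String item) {
        buffer.add(item);           // cheap: no I/O here
    }

    void flush() {
        written.addAll(buffer);     // one simulated round trip for the whole batch
        buffer.clear();
        roundTrips++;
    }

    int writtenCount() { return written.size(); }
    int roundTrips()   { return roundTrips; }
}
```

Ten elements followed by one flush cost a single round trip here, where the naive sink would have paid ten.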

Wrapping Up

We built a completely custom sink in a few lines of code. It would require only minimal changes to, say, use JMS to write into a message broker instead. You can see the full source of this tutorial on GitHub.

Have a look at the Jet Reference Manual and some of the awesome demos the team has built!

Database Hazelcast

Published at DZone with permission of Jaromir Hamala, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
