Unlocking the Power of Postgres Listen/Notify: Building a Scalable Solution With Spring Boot Integration

Unleash the potential of Postgres Listen/Notify paired with Spring Boot for streamlined and scalable real-time communication.

By Jasmin Tankić · Dec. 09, 23 · Tutorial

In this article, we will examine Postgres' "Listen/Notify" functionality and try to answer the following simple questions:

  1. Why leverage Postgres as a message broker?
  2. When is it most beneficial to do so?
  3. How can you seamlessly integrate it with Spring Boot?

This article dives into strategies for building scalable solutions, prioritizing not just the efficiency of your system but also the integrity of your data. Serving as a fundamental guide, it aims to provide a comprehensive understanding of Postgres Listen/Notify, encouraging you to implement your distinctive use cases.

When and Why To Use Postgres as a Message Broker

Before diving further into the subject, let's establish some foundational clarifications. 

While Postgres's LISTEN/NOTIFY feature is powerful for real-time communication within a database, it is not a direct alternative to or replacement for full-featured messaging solutions like RabbitMQ, Apache Kafka, IBM MQ, Redis Pub/Sub, Azure Event Hubs, and others.

Unlike dedicated message queue systems, LISTEN/NOTIFY is specifically tailored to database-centric scenarios and may lack certain advanced features such as distributed data storage, fault tolerance, and the ability to handle massive-scale data streaming. 

So, Why Use It?

  1. Listen/Notify is already part of the Postgres database by default.
  2. Setting it up is simpler compared to configuring and managing a separate Apache Kafka cluster, for example.
  3. Listen/Notify in Postgres reduces application overhead by eliminating the need for an additional layer of complexity and external dependency.
  4. It reduces infrastructure complexity and improves resource efficiency: with no separate messaging infrastructure to run, Listen/Notify is a more straightforward solution for scenarios where simplicity and minimal setup are priorities.

And When To Use It

Listen/Notify works well for handling tasks when all the necessary information is already in the database. It sends messages to background processes, letting them know there's something new to work on.

These are some basic use cases where Listen/Notify might be a wise choice:

  1. When there is a need to send real-time notifications upon specific database events.
  2. When background processes need to be triggered immediately upon changes in the database.
  3. When building event-driven architectures.
  4. When building a scalable publish/subscribe mechanism where multiple components can subscribe to specific events.
  5. When aiming to simplify your architecture and reduce dependencies by handling messaging directly within PostgreSQL instead of relying on external message brokers.

How To Get the Most Out of It and a Spring Boot Implementation Example

To get the most out of this feature, we will need to use it in combination with some other useful features that Postgres offers.

Scenario Description

Let's set up a simple real-life scenario where we want to take advantage of the listen/notify feature.

In this use case, we aim to provide a straightforward REST endpoint, allowing clients to seamlessly dispatch SMS messages to customers. 

The endpoint accepts a list of JSON objects, each containing essential details such as the phone number and the message content.

Upon a successful client request to this endpoint, our goal is to efficiently submit all these objects into a queue and initiate a background process. This process involves invoking an additional SMS API responsible for managing the delivery of the SMS messages.
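The request payload described above can be modeled in a few lines. This is only a sketch under stated assumptions: the names SmsRequest, SmsRequests, and validOnly are hypothetical, Java 17 is assumed, and the 255-character limit is chosen to match the VARCHAR(255) columns of the queue table used in this article.

```java
import java.util.List;

/** One element of the request body: a phone number plus the text to send. */
record SmsRequest(String phoneNumber, String content) {

    // Assumed limit, chosen to fit the VARCHAR(255) columns of the queue table.
    static final int MAX_LENGTH = 255;

    /** Returns true when both fields are present and short enough to be stored. */
    boolean isValid() {
        return phoneNumber != null && !phoneNumber.isBlank() && phoneNumber.length() <= MAX_LENGTH
                && content != null && !content.isBlank() && content.length() <= MAX_LENGTH;
    }
}

final class SmsRequests {
    private SmsRequests() {}

    /** Keeps only the requests that can be stored; invalid ones are simply dropped here for brevity. */
    static List<SmsRequest> validOnly(List<SmsRequest> requests) {
        return requests.stream().filter(SmsRequest::isValid).toList();
    }
}
```

A real endpoint would more likely reject invalid entries with an error response than drop them silently; the filter just keeps the sketch short.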

How To Handle This Use Case

As already mentioned, to get the most out of the listen/notify feature, we will combine it with some other Postgres features to implement this properly.

Database

To begin, let's create a straightforward internal queue table. This table will serve as the repository for all SMS messages awaiting processing and transmission by our Spring Boot application.

SQL
 
CREATE TABLE NOTIFICATIONS_QUEUE
(
    ID                         BIGSERIAL    PRIMARY KEY,
    PHONE_NUMBER               VARCHAR(255),
    CONTENT                    VARCHAR(255),
    STATUS                     VARCHAR(255),
    CREATED_AT                 TIMESTAMP    NOT NULL DEFAULT NOW()
);


Now, let's create a Postgres trigger function.

SQL
 
CREATE OR REPLACE FUNCTION notify_new_notification_in_queue()
RETURNS TRIGGER AS $$
BEGIN
  IF TG_OP = 'INSERT' THEN
    PERFORM pg_notify('notifications', NEW.id::text);
  ELSIF TG_OP = 'UPDATE' THEN
    PERFORM pg_notify('notifications', NEW.id::text);
  END IF;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;


CREATE TRIGGER notification_added_in_queue
    AFTER INSERT OR UPDATE ON notifications_queue
    FOR EACH ROW
    EXECUTE FUNCTION notify_new_notification_in_queue();


This code sets up Postgres to send an alert whenever a record is inserted into or updated in the NOTIFICATIONS_QUEUE table.

Simply put, whenever a new record is added, or an existing one is updated in this table, the trigger fires a notification. The notification is sent via the pg_notify function on the notifications channel, and its payload is the ID of the inserted or updated record.

With this, we are done with the database changes, so let's jump into the Spring Boot code.

Spring Boot Implementation

The Spring Boot implementation is quite straightforward.

First, let's create a NotificationsListenerConfiguration class.

Java
 
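import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

@Configuration
@EnableAsync // needed for @Async on the listener to take effect, unless it is enabled elsewhere
@Slf4j
@RequiredArgsConstructor
public class NotificationsListenerConfiguration {
    private final NotificationListener notificationListener;

    // Starts the listener once the application context is ready.
    @Bean
    CommandLineRunner startListener(NotificationHandler handler) {
        return (args) -> {
            log.info("Starting to watch for new notifications in the queue...");
            notificationListener.listenForNotifications(handler);
        };
    }
}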


Next, let's follow up with the actual NotificationListener class:

Java
 
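import java.sql.Connection;
import java.sql.Statement;
import java.util.function.Consumer;
import javax.sql.DataSource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RequiredArgsConstructor
public class NotificationListener {
    // How long a single getNotifications call waits, in milliseconds.
    @Value("${notificationListenerTimeout:10000}")
    private int listenerTimeout;

    private static final String NOTIFICATIONS_CHANNEL = "notifications";
    private final DataSource dataSource;
    private final NotificationQueueDatabaseService queueDatabaseService;

    @Async
    public void listenForNotifications(Consumer<PGNotification> consumer) {
        // Outer loop: (re)connect whenever the connection is lost.
        while (true) {
            try (Connection c = dataSource.getConnection();
                 Statement statement = c.createStatement()) {
                PGConnection pgconn = c.unwrap(PGConnection.class);
                statement.execute("LISTEN " + NOTIFICATIONS_CHANNEL);
                log.info("Connection established: Listening for notifications on channel: [{}]", NOTIFICATIONS_CHANNEL);
                // Notifications are not stored for sessions that were not listening,
                // so announce the rows that are still waiting in the queue.
                queueDatabaseService.notifyAboutRemainingNotificationsInQueue();
                log.info("Notified about hanging notifications in the queue...");
                // Inner loop: wait for notifications and hand each one to the consumer.
                while (true) {
                    PGNotification[] nts = pgconn.getNotifications(listenerTimeout);
                    if (nts == null) {
                        continue;
                    }
                    for (PGNotification nt : nts) {
                        consumer.accept(nt);
                    }
                }
            } catch (Exception e) {
                log.warn("Error occurred while listening for notifications, attempting to reconnect...", e);
            }
        }
    }
}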


This code defines a NotificationListener class that asynchronously listens for notifications on a Postgres channel named "notifications". If an error occurs while listening for notifications, it logs a warning, attempts to reconnect, and continues listening.
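One thing the loop above does not do is pause between reconnection attempts, so a database outage would make it retry in a tight loop. A minimal sketch of capped exponential backoff is shown here; the class name ReconnectBackoff and both delay constants are assumptions for illustration, not part of the original implementation.

```java
final class ReconnectBackoff {
    private ReconnectBackoff() {}

    static final long BASE_DELAY_MILLIS = 500;    // assumed starting delay
    static final long MAX_DELAY_MILLIS = 30_000;  // assumed upper bound

    /** Delay before reconnect attempt number {@code attempt} (0-based): doubles each time, then stays at the cap. */
    static long delayMillis(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        // Limit the shift so the doubled value can never overflow a long.
        int shift = Math.min(attempt, 16);
        return Math.min(MAX_DELAY_MILLIS, BASE_DELAY_MILLIS << shift);
    }
}
```

In the listener, this could be used by calling Thread.sleep(ReconnectBackoff.delayMillis(attempt++)) inside the catch block and resetting attempt to 0 after a successful LISTEN.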

Now, we need just one more class to get this up and running.

Java
 
import java.util.function.Consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.postgresql.PGNotification;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@Slf4j
@RequiredArgsConstructor
public class NotificationHandler implements Consumer<PGNotification> {
    private final NotificationQueueDatabaseService queueDatabaseService;
    private final NotificationService notificationService;

    @Override
    @Transactional
    public void accept(PGNotification pgNotification) {
        log.info("Notification with id: [{}] received...", pgNotification.getParameter());
        Long notificationId = getNotificationIdFromPgNotificationParameter(pgNotification.getParameter());

        if (notificationId == null) {
            return;
        }

        // The lock lives until this transaction ends; a null result is treated as "not obtained".
        boolean lockObtained = Boolean.TRUE.equals(queueDatabaseService.obtainLockForNotification(notificationId));

        if (!lockObtained) {
            log.info("Notification with id: [{}] is already being processed...", pgNotification.getParameter());
            return;
        }

        DBNotificationQueue notification;

        try {
            notification = queueDatabaseService.findUnlockedNotificationById(notificationId);
        } catch (Exception exception) {
            // Without the record there is nothing to send, so stop here.
            log.error("Could not load notification with id: [{}]", notificationId, exception);
            return;
        }

        if (notification == null) {
            return;
        }

        try {
            notificationService.sendNotification(notification);
        } catch (Exception exception) {
            //handle send notification exception
        } finally {
            //delete notification from the queue or update status
        }
    }

    // Parses the notification payload (the record ID); returns null if it is not a number.
    private Long getNotificationIdFromPgNotificationParameter(String pgNotificationParameter) {
        try {
            return Long.parseLong(pgNotificationParameter);
        } catch (Exception exception) {
            log.error("Error occurred while parsing notification id from pgNotificationParameter : [{}]", pgNotificationParameter, exception);
            return null;
        }
    }
}


Please note that this code is intentionally simplified to provide you with a basic understanding of how a Notification Handler should be structured. In essence, whenever a new record is added or updated in the NOTIFICATIONS_QUEUE table, this method captures the change and initiates the processing sequence. 
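The handler leaves open what happens to a record after the send attempt. Since the trigger fires on updates as well as inserts, updating a record's status produces another notification for the same ID, so the handler needs a way to recognize records it has already dealt with. The sketch below shows one possible approach; the status values and the names QueueStatus and SendOutcome are hypothetical, as the article does not define what the STATUS column holds.

```java
import java.util.function.Consumer;

/** Hypothetical values for the STATUS column; the article does not define them. */
enum QueueStatus { PENDING, SENT, FAILED }

final class SendOutcome {
    private SendOutcome() {}

    /** Only records still waiting are sent; this skips notifications caused by status updates. */
    static boolean shouldProcess(QueueStatus current) {
        return current == QueueStatus.PENDING;
    }

    /** Runs the sender and maps success or failure to the status that should be stored afterwards. */
    static <T> QueueStatus sendAndResolve(T notification, Consumer<T> sender) {
        try {
            sender.accept(notification);
            return QueueStatus.SENT;
        } catch (RuntimeException e) {
            return QueueStatus.FAILED;
        }
    }
}
```

Whether a failed record should be retried, and how many times, is a separate design decision that this sketch does not cover.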

How To Scale Properly

To enhance the scalability of this code, we've used a built-in Postgres feature known as advisory locks.

Examining the database method queueDatabaseService.obtainLockForNotification(notificationId), you'll find it structured like this:

Java
 
@Query(value = "SELECT pg_try_advisory_xact_lock(?1)", nativeQuery = true)
Boolean obtainLockForNotification(Long id);


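This SQL statement calls pg_try_advisory_xact_lock, which attempts to acquire a transaction-level advisory lock keyed on the value passed to the method. The call does not wait: it returns true if the lock was acquired and false if another session already holds it. The lock is released automatically when the transaction ends.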

Advisory Lock

An advisory lock is a mechanism in PostgreSQL that allows applications to coordinate and synchronize activities, providing a way to control access to shared resources. In simpler terms, this statement is trying to obtain a specific type of lock to manage concurrent access to certain operations in a transaction.

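Combined with the @Transactional annotation on the accept method, this ensures that, for as long as the transaction is open, a given notification is processed by only one instance of the application. This precaution is crucial; without it, there is a risk of sending duplicate notifications to customers, as multiple instances might attempt to handle the same notification concurrently. Because the lock is released when the transaction ends, the handler should also delete the record or update its status before committing, so that a later notification for the same ID is not processed again.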
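The try-lock behavior can be illustrated without a database. The class below is an in-memory analogy only, not how Postgres implements advisory locks: the name TryLockRegistry is hypothetical, release is explicit here whereas Postgres releases a transaction-level lock on commit or rollback, and the sketch ignores the fact that a single Postgres session may acquire the same advisory lock more than once.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for try-lock semantics: one holder per key, and callers never wait. */
final class TryLockRegistry {
    private final Set<Long> held = ConcurrentHashMap.newKeySet();

    /** Returns true if the caller obtained the lock, false if it is already held. */
    boolean tryLock(long key) {
        // Set.add is atomic here and returns false when the key is already present.
        return held.add(key);
    }

    /** Frees the key so that a later caller can obtain it. */
    void release(long key) {
        held.remove(key);
    }
}
```

If two application instances receive the same notification ID, only the first call to tryLock for that ID returns true; the second returns false immediately, which corresponds to the "already being processed" branch in the handler.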

Conclusion

This article highlights the simplicity and effectiveness of Postgres "Listen/Notify" for real-time database communication, demonstrated through a practical Spring Boot example. I hope this encourages developers to consider and implement this approach in their projects for streamlined solutions.
