DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports Events Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Partner Zones AWS Cloud
by AWS Developer Relations
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Partner Zones
AWS Cloud
by AWS Developer Relations
  1. DZone
  2. Data Engineering
  3. Databases
  4. Implementing Producer / Consumer using SynchronousQueue

Implementing Producer / Consumer using SynchronousQueue

Andriy Redko user avatar by
Andriy Redko
·
Jan. 02, 13 · Interview
Like (0)
Save
Tweet
Share
9.23K Views

Join the DZone community and get the full member experience.

Join For Free

Among plenty of useful classes which Java provides for concurrency support, there is one I would like to talk about: SynchronousQueue. In particular, I would like to walk through Producer / Consumer implementation using handy SynchronousQueue as an exchange mechanism.

It might not sound clear why to use this type of queue for producer / consumer communication unless we look under the hood of SynchronousQueue implementation. It turns out that it's not really a queue as we used to think about queues. The analogy would be just a collection containing at most one element.

Why it's useful? Well, there are several reasons. From producer's point of view, only one element (or message) could be stored into the queue. In order to proceed with the next element (or message), the producer should wait till consumer consumes the one currently in the queue. From consumer's point of view, it just polls the queue for next element (or message) available. Quite simple, but the great benefit is: producer cannot send messages faster than consumer can process them.

Here is one of the use cases I encountered recently: compare two database tables (possibly just huge) and detect if those contain different data or data is the same (copy). The SynchronousQueue is quite a handy tool for this problem: it allows to handle each table in own thread as well as compensate the possible timeouts / latency while reading from two different databases.

Let's start by defining our compare function which accepts source and destination data sources as well as a table name (to compare). I am using quite useful JdbcTemplate class from Spring framework as it extremely well abstract all the boring details dealing with connections and prepared statements.

public boolean compare( final DataSource source, final DataSource destination, final String table )  {
    final JdbcTemplate from = new  JdbcTemplate( source );
    final JdbcTemplate to = new JdbcTemplate( destination );
}

Before doing any actual data comparison, it's a good idea to compare table's row count of the source and destination databases:

if( from.queryForLong("SELECT count(1) FROM " + table ) != to.queryForLong("SELECT count(1) FROM " + table ) ) {
    return false;
}

Now, at least knowing that table contains same number of rows in both databases, we can start with data comparison. The algorithm is very simple:

  • create a separate thread for source (producer) and destination (consumer) databases
  • producer thread reads single row from the table and puts it into the SynchronousQueue
  • consumer thread also reads single row from the table, then asks queue for the available row to compare (waits if necessary) and lastly compare two result sets

Using another great part Java concurrent utilities for thread pooling, let's define a thread pool with fixed amount of threads (2).

final ExecutorService executor = Executors.newFixedThreadPool( 2 );
final SynchronousQueue< List< ? > > resultSets = new SynchronousQueue< List< ? > >();        

Following the described algorithm, the producer functionality could be represented as a single callable:

Callable< Void > producer = new Callable< Void >() {
    @Override
    public Void call() throws Exception {
        from.query( "SELECT * FROM " + table,
            new RowCallbackHandler() {
                @Override
                public void processRow(ResultSet rs) throws SQLException {
                    try {                   
                        List< ? > row = ...; // convert ResultSet to List
                        if( !resultSets.offer( row, 2, TimeUnit.MINUTES ) ) {
                            throw new SQLException( "Having more data but consumer has already completed" );
                        }
                    } catch( InterruptedException ex ) {
                        throw new SQLException( "Having more data but producer has been interrupted" );
                    }
                }
            }
        );

        return  null;
    }
};

The code is a bit verbose due to Java syntax but it doesn't do much actually. Every result set read from the table producer converts to a list (implementation has been omitted as it's a boilerplate) and puts in a queue (offer). If queue is not empty, producer is blocked waiting for consumer to finish his work. The consumer, respectively, could be represented as a following callable:

Callable< Void > consumer = new Callable< Void >() {
    @Override
    public Void call() throws Exception {
        to.query( "SELECT * FROM " + table,
            new RowCallbackHandler() {
                @Override
                public void processRow(ResultSet rs) throws SQLException {
                    try {
                        List< ? > source = resultSets.poll( 2, TimeUnit.MINUTES );
                        if( source == null ) {
                            throw new SQLException( "Having more data but producer has already completed" );
                        }                                     
 
                        List< ? > destination = ...; // convert ResultSet to List
                        if( !source.equals( destination ) ) {
                            throw new SQLException( "Row data is not the same" );
                        }
                    } catch ( InterruptedException ex ) {
                        throw new SQLException( "Having more data but consumer has been interrupted" );
                    }
                }
            }
        );
                    
        return  null;
    }
};

The consumer does a reverse operation on the queue: instead of putting data it pulls it (poll) from the queue. If queue is empty, consumer is blocked waiting for producer to publish next row. The part which is left is only submitting those callables for execution. Any exception returned by the Future's get method indicates that table doesn't contain the same data (or there are issue with getting data from database):

List< Future< Void > > futures = executor.invokeAll( Arrays.asList( producer, consumer ) );
for( final Future< Void > future: futures ) {
    future.get( 5, TimeUnit.MINUTES );
}

That's basically all for today ... and this year. Happy New Year to everyone!


 

producer consumer Database

Published at DZone with permission of Andriy Redko, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Popular on DZone

  • DevOps vs Agile: Which Approach Will Win the Battle for Efficiency?
  • HTTP vs Messaging for Microservices Communications
  • Fargate vs. Lambda: The Battle of the Future
  • Use Golang for Data Processing With Amazon Kinesis and AWS Lambda

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 600 Park Offices Drive
  • Suite 300
  • Durham, NC 27709
  • support@dzone.com
  • +1 (919) 678-0300

Let's be friends: