A Generic and Concurrent Object Pool
- The pool must let clients use an object if any is available.
- It must reuse the objects once they are returned to the pool by a client.
- If required, it must be able to create more objects to satisfy growing demands of the client.
- It must provide a proper shutdown mechanism, such that on shutdown no memory leaks occur.
package com.test.pool;

/**
 * Represents a cached pool of objects.
 *
 * @author Swaranga
 *
 * @param <T> the type of object to pool.
 */
public interface Pool<T> {
    /**
     * Returns an instance from the pool.
     * The call may be a blocking one or a non-blocking one
     * and that is determined by the internal implementation.
     *
     * If the call is a blocking call,
     * the call returns immediately with a valid object
     * if available, else the thread is made to wait
     * until an object becomes available.
     * In case of a blocking call,
     * it is advised that clients react
     * to {@link InterruptedException} which might be thrown
     * when the thread waits for an object to become available.
     *
     * If the call is a non-blocking one,
     * the call returns immediately irrespective of
     * whether an object is available or not.
     * If any object is available the call returns it
     * else the call returns <code>null</code>.
     *
     * The validity of the objects is determined using the
     * {@link Validator} interface, such that
     * an object <code>o</code> is valid if
     * <code>Validator.isValid(o) == true</code>.
     *
     * @return T one of the pooled objects.
     */
    T get();

    /**
     * Releases the object and puts it back to the pool.
     *
     * The mechanism of putting the object back to the pool is
     * generally asynchronous,
     * however future implementations might differ.
     *
     * @param t the object to return to the pool
     */
    void release(T t);

    /**
     * Shuts down the pool. In essence this call will not
     * accept any more requests
     * and will release all resources.
     * Releasing resources is done
     * via the <code>invalidate()</code>
     * method of the {@link Validator} interface.
     */
    void shutdown();
}
package com.test.pool;

/**
 * Represents an abstract pool, that defines the procedure
 * of returning an object to the pool.
 *
 * @author Swaranga
 *
 * @param <T> the type of pooled objects.
 */
abstract class AbstractPool<T> implements Pool<T> {
    /**
     * Returns the object to the pool.
     * The method first validates the object if it is
     * re-usable and then returns it to the pool.
     *
     * If the object validation fails,
     * some implementations
     * will try to create a new one
     * and put it into the pool; however
     * this behaviour is subject to change
     * from implementation to implementation
     */
    @Override
    public final void release(T t) {
        if (isValid(t)) {
            returnToPool(t);
        } else {
            handleInvalidReturn(t);
        }
    }

    protected abstract void handleInvalidReturn(T t);

    protected abstract void returnToPool(T t);

    protected abstract boolean isValid(T t);
}
// Excerpt: Validator is declared as a nested interface inside Pool,
// which is why it carries the static modifier.

/**
 * Represents the functionality to
 * validate an object of the pool
 * and to subsequently perform cleanup activities.
 *
 * @author Swaranga
 *
 * @param <T> the type of objects to validate and cleanup.
 */
public static interface Validator<T> {
    /**
     * Checks whether the object is valid.
     *
     * @param t the object to check.
     *
     * @return <code>true</code>
     * if the object is valid else <code>false</code>.
     */
    public boolean isValid(T t);

    /**
     * Performs any cleanup activities
     * before discarding the object.
     * For example before discarding
     * database connection objects,
     * the pool will want to close the connections.
     * This is done via the
     * <code>invalidate()</code> method.
     *
     * @param t the object to cleanup
     */
    public void invalidate(T t);
}
package com.test.pool;

/**
 * Represents a cached pool of objects.
 *
 * @author Swaranga
 *
 * @param <T> the type of object to pool.
 */
public interface Pool<T> {
    /**
     * Returns an instance from the pool.
     * The call may be a blocking one or a non-blocking one
     * and that is determined by the internal implementation.
     *
     * If the call is a blocking call,
     * the call returns immediately with a valid object
     * if available, else the thread is made to wait
     * until an object becomes available.
     * In case of a blocking call,
     * it is advised that clients react
     * to {@link InterruptedException} which might be thrown
     * when the thread waits for an object to become available.
     *
     * If the call is a non-blocking one,
     * the call returns immediately irrespective of
     * whether an object is available or not.
     * If any object is available the call returns it
     * else the call returns <code>null</code>.
     *
     * The validity of the objects is determined using the
     * {@link Validator} interface, such that
     * an object <code>o</code> is valid if
     * <code>Validator.isValid(o) == true</code>.
     *
     * @return T one of the pooled objects.
     */
    T get();

    /**
     * Releases the object and puts it back to the pool.
     *
     * The mechanism of putting the object back to the pool is
     * generally asynchronous,
     * however future implementations might differ.
     *
     * @param t the object to return to the pool
     */
    void release(T t);

    /**
     * Shuts down the pool. In essence this call will not
     * accept any more requests
     * and will release all resources.
     * Releasing resources is done
     * via the <code>invalidate()</code>
     * method of the {@link Validator} interface.
     */
    void shutdown();

    /**
     * Represents the functionality to
     * validate an object of the pool
     * and to subsequently perform cleanup activities.
     *
     * @author Swaranga
     *
     * @param <T> the type of objects to validate and cleanup.
     */
    public static interface Validator<T> {
        /**
         * Checks whether the object is valid.
         *
         * @param t the object to check.
         *
         * @return <code>true</code>
         * if the object is valid else <code>false</code>.
         */
        public boolean isValid(T t);

        /**
         * Performs any cleanup activities
         * before discarding the object.
         * For example before discarding
         * database connection objects,
         * the pool will want to close the connections.
         * This is done via the
         * <code>invalidate()</code> method.
         *
         * @param t the object to cleanup
         */
        public void invalidate(T t);
    }
}
package com.test.pool;

/**
 * Represents the mechanism to create
 * new objects to be used in an object pool.
 *
 * @author Swaranga
 *
 * @param <T> the type of object to create.
 */
public interface ObjectFactory<T> {
    /**
     * Returns a new instance of an object of type T.
     *
     * @return T a new instance of the object of type T
     */
    public abstract T createNew();
}
Hence the BlockingPool interface declaration is as follows:
package com.test.pool;

import java.util.concurrent.TimeUnit;

/**
 * Represents a pool of objects that makes the
 * requesting threads wait if no object is available.
 *
 * @author Swaranga
 *
 * @param <T> the type of objects to pool.
 */
public interface BlockingPool<T> extends Pool<T> {
    /**
     * Returns an instance of type T from the pool.
     *
     * The call is a blocking call,
     * and client threads are made to wait
     * indefinitely until an object is available.
     * The call implements a fairness algorithm
     * that ensures that a FCFS service is implemented.
     *
     * Clients are advised to react to InterruptedException.
     * If the thread is interrupted while waiting
     * for an object to become available,
     * the current implementations
     * set the interrupted state of the thread
     * to <code>true</code> and return null.
     * However this is subject to change
     * from implementation to implementation.
     *
     * @return T an instance of the Object
     * of type T from the pool.
     */
    T get();

    /**
     * Returns an instance of type T from the pool,
     * waiting up to the
     * specified wait time if necessary
     * for an object to become available.
     *
     * The call is a blocking call,
     * and client threads are made to wait
     * until an object is available
     * or until the timeout occurs.
     * The call implements a fairness algorithm
     * that ensures that a FCFS service is implemented.
     *
     * Clients are advised to react to InterruptedException.
     * If the thread is interrupted while waiting
     * for an object to become available,
     * the current implementations
     * set the interrupted state of the thread
     * to <code>true</code> and return null.
     * However this is subject to change
     * from implementation to implementation.
     *
     * @param time amount of time to wait before giving up,
     * in units of <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining
     * how to interpret the
     * <tt>time</tt> parameter
     *
     * @return T an instance of the Object
     * of type T from the pool.
     *
     * @throws InterruptedException
     * if interrupted while waiting
     */
    T get(long time, TimeUnit unit) throws InterruptedException;
}
package com.test.pool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public final class BoundedBlockingPool<T> extends AbstractPool<T> implements BlockingPool<T> {
    private final int size;
    private final BlockingQueue<T> objects;
    private final Validator<T> validator;
    private final ObjectFactory<T> objectFactory;
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private volatile boolean shutdownCalled;

    public BoundedBlockingPool(int size, Validator<T> validator, ObjectFactory<T> objectFactory) {
        super();
        this.objectFactory = objectFactory;
        this.size = size;
        this.validator = validator;
        objects = new LinkedBlockingQueue<T>(size);
        initializeObjects();
        shutdownCalled = false;
    }

    @Override
    public T get(long timeOut, TimeUnit unit) {
        if (!shutdownCalled) {
            T t = null;
            try {
                // Waits up to the timeout; yields null if nothing became available.
                t = objects.poll(timeOut, unit);
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and fall through to return null.
                Thread.currentThread().interrupt();
            }
            return t;
        }
        throw new IllegalStateException("Object pool is already shutdown");
    }

    @Override
    public T get() {
        if (!shutdownCalled) {
            T t = null;
            try {
                // Blocks until an object is available.
                t = objects.take();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            return t;
        }
        throw new IllegalStateException("Object pool is already shutdown");
    }

    @Override
    public void shutdown() {
        shutdownCalled = true;
        // Interrupts any ObjectReturner tasks that are still waiting.
        executor.shutdownNow();
        clearResources();
    }

    private void clearResources() {
        for (T t : objects) {
            validator.invalidate(t);
        }
    }

    @Override
    protected void returnToPool(T t) {
        if (validator.isValid(t)) {
            // Hand the insertion to the executor so the caller never blocks.
            executor.submit(new ObjectReturner<T>(objects, t));
        }
    }

    @Override
    protected void handleInvalidReturn(T t) {
        // Invalid objects are dropped; no replacement is created.
    }

    @Override
    protected boolean isValid(T t) {
        return validator.isValid(t);
    }

    private void initializeObjects() {
        for (int i = 0; i < size; i++) {
            objects.add(objectFactory.createNew());
        }
    }

    private class ObjectReturner<E> implements Callable<Void> {
        private final BlockingQueue<E> queue;
        private final E e;

        public ObjectReturner(BlockingQueue<E> queue, E e) {
            this.queue = queue;
            this.e = e;
        }

        public Void call() {
            try {
                queue.put(e);
            } catch (InterruptedException ie) {
                // Interrupted, for example by shutdownNow(): restore the flag
                // and give up. Retrying put() with the flag still set would
                // throw again immediately and spin forever.
                Thread.currentThread().interrupt();
            }
            return null;
        }
    }
}
The above is a very basic object pool backed internally by a
LinkedBlockingQueue. The only method of interest is the returnToPool()
method. Since the internal storage is a blocking queue, if we tried to
put the returned element directly into the LinkedBlockingQueue, it might
block the client if the queue is full. But we do not want a client of an
object pool to block just for a mundane task like returning an object to
the pool. So we wrap the actual insertion into the LinkedBlockingQueue
in an asynchronous task and submit it to an Executor instance, so that
the client thread can return immediately.
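To make the difference concrete, here is a minimal sketch of the same idea outside the pool. It assumes a one-slot LinkedBlockingQueue that is already full; the class and method names (AsyncReturnDemo, deferredPutCompletes) are made up for this illustration and are not part of the pool code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; not part of the pool code.
class AsyncReturnDemo {

    /**
     * Fills a one-slot queue, hands a second element to an executor,
     * then frees the slot. Returns true if the deferred put completed.
     */
    static boolean deferredPutCompletes() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            queue.put("first"); // the queue is now full

            // A direct queue.put("second") here would block this thread.
            // submit() returns at once; the worker thread does the waiting.
            Future<Void> pending = executor.submit(() -> {
                queue.put("second");
                return null;
            });

            queue.take();                     // free the slot
            pending.get(5, TimeUnit.SECONDS); // the worker's put can now finish
            return "second".equals(queue.peek());
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException ex) {
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("deferred put completed: " + deferredPutCompletes());
    }
}
```

The trade-off of this approach is that a returned object may not be visible in the queue the instant release() returns, since the insertion happens on another thread.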
Now we will use the above object pool in our code. We will use the
object pool to pool some database connection objects. Hence we will need
a Validator to validate our database connection objects.
Our JDBCConnectionValidator will look like this:
package com.test;

import java.sql.Connection;
import java.sql.SQLException;

import com.test.pool.Pool.Validator;

public final class JDBCConnectionValidator implements Validator<Connection> {
    public boolean isValid(Connection con) {
        if (con == null) {
            return false;
        }
        try {
            return !con.isClosed();
        } catch (SQLException se) {
            return false;
        }
    }

    public void invalidate(Connection con) {
        try {
            con.close();
        } catch (SQLException se) {
            // Ignored: the connection is being discarded anyway.
        }
    }
}
And our JDBCConnectionFactory, which enables the object pool to create new objects, is as follows:
package com.test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import com.test.pool.ObjectFactory;

public class JDBCConnectionFactory implements ObjectFactory<Connection> {
    private String connectionURL;
    private String userName;
    private String password;

    public JDBCConnectionFactory(String driver, String connectionURL, String userName, String password) {
        super();
        try {
            Class.forName(driver);
        } catch (ClassNotFoundException ce) {
            throw new IllegalArgumentException("Unable to find driver in classpath", ce);
        }
        this.connectionURL = connectionURL;
        this.userName = userName;
        this.password = password;
    }

    public Connection createNew() {
        try {
            return DriverManager.getConnection(connectionURL, userName, password);
        } catch (SQLException se) {
            throw new IllegalArgumentException("Unable to create new connection", se);
        }
    }
}
Now we create a JDBC object pool using the above Validator and ObjectFactory:
package com.test;

import java.sql.Connection;

import com.test.pool.BoundedBlockingPool;
import com.test.pool.Pool;

public class Main {
    public static void main(String[] args) {
        Pool<Connection> pool = new BoundedBlockingPool<Connection>(
            10,
            new JDBCConnectionValidator(),
            // Placeholders for driver class, connection URL, user name and password.
            new JDBCConnectionFactory("", "", "", "")
        );
        // do whatever you like
    }
}
As a bonus for reading the entire post, I will provide another implementation of the Pool interface that is essentially a non-blocking object pool. The only difference between this implementation and the previous one is that it does not block the client if no element is available; it returns null instead. Here it goes:
package com.test.pool;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

public class BoundedPool<T> extends AbstractPool<T> {
    private final int size;
    private final Queue<T> objects;
    private final Validator<T> validator;
    private final ObjectFactory<T> objectFactory;
    private final Semaphore permits;
    private volatile boolean shutdownCalled;

    public BoundedPool(int size, Validator<T> validator, ObjectFactory<T> objectFactory) {
        super();
        this.objectFactory = objectFactory;
        this.size = size;
        this.validator = validator;
        // Thread-safe queue: get() and release() may be called concurrently.
        objects = new ConcurrentLinkedQueue<T>();
        // One permit per pooled object.
        permits = new Semaphore(size);
        initializeObjects();
        shutdownCalled = false;
    }

    @Override
    public T get() {
        T t = null;
        if (!shutdownCalled) {
            // tryAcquire() never waits; without a permit we return null.
            if (permits.tryAcquire()) {
                t = objects.poll();
            }
        } else {
            throw new IllegalStateException("Object pool already shutdown");
        }
        return t;
    }

    @Override
    public void shutdown() {
        shutdownCalled = true;
        clearResources();
    }

    private void clearResources() {
        for (T t : objects) {
            validator.invalidate(t);
        }
    }

    @Override
    protected void returnToPool(T t) {
        boolean added = objects.add(t);
        if (added) {
            permits.release();
        }
    }

    @Override
    protected void handleInvalidReturn(T t) {
        // Invalid objects are dropped; no replacement is created.
    }

    @Override
    protected boolean isValid(T t) {
        return validator.isValid(t);
    }

    private void initializeObjects() {
        for (int i = 0; i < size; i++) {
            objects.add(objectFactory.createNew());
        }
    }
}
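The non-blocking behaviour rests on Semaphore.tryAcquire(), which reports failure immediately instead of waiting for a permit. The following is only a stripped-down sketch of that permit idea, assuming one permit per stored item; PermitGuardedBox and its methods are hypothetical names invented for the illustration.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

// Hypothetical holder used only to illustrate permit-guarded access.
class PermitGuardedBox<T> {
    private final Queue<T> items = new ConcurrentLinkedQueue<>();
    private final Semaphore permits = new Semaphore(0);

    /** Stores the item first, then publishes one permit for it. */
    void put(T item) {
        items.add(item);
        permits.release();
    }

    /** Returns an item, or null straight away if no permit is free. */
    T tryGet() {
        return permits.tryAcquire() ? items.poll() : null;
    }

    public static void main(String[] args) {
        PermitGuardedBox<String> box = new PermitGuardedBox<>();
        box.put("a");
        System.out.println(box.tryGet()); // an item was available
        System.out.println(box.tryGet()); // nothing left, so null
    }
}
```

Adding the item before releasing the permit matters: a thread that wins a permit can then rely on an item already being in the queue.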
Considering we are now two implementations strong, it is better to let users create our pools via a factory with meaningful method names. Here is the factory:
package com.test.pool;

import com.test.pool.Pool.Validator;

/**
 * Factory and utility methods for
 * {@link Pool} and {@link BlockingPool} classes
 * defined in this package.
 * This class supports the following kinds of methods:
 *
 * <ul>
 * <li> Method that creates and returns a default non-blocking
 * implementation of the {@link Pool} interface.
 * </li>
 *
 * <li> Method that creates and returns a
 * default implementation of
 * the {@link BlockingPool} interface.
 * </li>
 * </ul>
 *
 * @author Swaranga
 */
public final class PoolFactory {
    private PoolFactory() {
    }

    /**
     * Creates and returns a new object pool,
     * that is an implementation of the {@link BlockingPool},
     * whose size is limited by
     * the <tt>size</tt> parameter.
     *
     * @param size the number of objects in the pool.
     * @param factory the factory to create new objects.
     * @param validator the validator to
     * validate the re-usability of returned objects.
     *
     * @return a blocking object pool
     * bounded by <tt>size</tt>
     */
    public static <T> Pool<T> newBoundedBlockingPool(
            int size, ObjectFactory<T> factory, Validator<T> validator) {
        return new BoundedBlockingPool<T>(size, validator, factory);
    }

    /**
     * Creates and returns a new object pool,
     * that is an implementation of the {@link Pool}
     * whose size is limited
     * by the <tt>size</tt> parameter.
     *
     * @param size the number of objects in the pool.
     * @param factory the factory to create new objects.
     * @param validator the validator to validate
     * the re-usability of returned objects.
     *
     * @return an object pool bounded by <tt>size</tt>
     */
    public static <T> Pool<T> newBoundedNonBlockingPool(
            int size, ObjectFactory<T> factory, Validator<T> validator) {
        return new BoundedPool<T>(size, validator, factory);
    }
}
Thus our clients now can create object pools in a more readable manner:
package com.test;

import java.sql.Connection;

import com.test.pool.Pool;
import com.test.pool.PoolFactory;

public class Main {
    public static void main(String[] args) {
        Pool<Connection> pool = PoolFactory.newBoundedBlockingPool(
            10,
            new JDBCConnectionFactory("", "", "", ""),
            new JDBCConnectionValidator());
        // do whatever you like
    }
}
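As for what "do whatever you like" might look like, one common pattern is to release the borrowed object in a finally block so it goes back to the pool even when the work throws. The sketch below is self-contained and therefore uses a hypothetical MiniPool stand-in with only get() and release(), plus a toy single-threaded pool of StringBuilder objects; none of these names come from the pool package above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

// Hypothetical stand-in exposing only the two methods the sketch needs.
interface MiniPool<T> {
    T get();
    void release(T t);
}

class BorrowDemo {

    /** Borrows an object, applies the work, and always gives the object back. */
    static <T, R> R withPooled(MiniPool<T> pool, Function<T, R> work) {
        T borrowed = pool.get();
        if (borrowed == null) {
            // A non-blocking pool returns null when nothing is available.
            throw new IllegalStateException("No pooled object available");
        }
        try {
            return work.apply(borrowed);
        } finally {
            pool.release(borrowed);
        }
    }

    /** Toy single-threaded pool of StringBuilders, just to drive the demo. */
    static MiniPool<StringBuilder> singleThreadedPool(int size) {
        Deque<StringBuilder> free = new ArrayDeque<>();
        for (int i = 0; i < size; i++) {
            free.push(new StringBuilder());
        }
        return new MiniPool<StringBuilder>() {
            public StringBuilder get() {
                return free.poll();
            }

            public void release(StringBuilder sb) {
                sb.setLength(0); // reset state before reuse
                free.push(sb);
            }
        };
    }

    public static void main(String[] args) {
        MiniPool<StringBuilder> pool = singleThreadedPool(1);
        String first = withPooled(pool, sb -> sb.append("hello").toString());
        String second = withPooled(pool, sb -> sb.append("again").toString());
        System.out.println(first + " " + second);
    }
}
```

With a pool of size one, the second call can only succeed because the first call released its object, which is the point of the finally block.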
And so ends our long post. This one was long overdue. Feel free to use it, change it, add more implementations.
Published at DZone with permission of Swaranga Sarma, DZone MVB. See the original article here.