
Code sharing for implementing a universal concurrent object pool using Java

黄舟
2017-03-24 10:55:55

In this article we mainly discuss how to implement an object pool in Java. In recent years, the performance of the Java virtual machine has improved greatly in nearly every respect, so for most objects there is no longer any need to use an object pool to improve performance. The fundamental reason is that creating a new object is not as expensive as it used to be.

However, some objects are still very expensive to create, such as threads, database connections, and other heavyweight objects. Most applications use more than one of them. A convenient way to create and manage a pool of such objects, so that they can be reused dynamically and client code does not need to care about their lifecycle, is still very useful.

Before we actually start writing code, let's sort out what the object pool needs to do.

  • The pool should hand an object to the client if one is available.

  • After a client puts an object back into the pool, the object can be reused.

  • The pool can create new objects to meet the growing needs of clients.

  • There needs to be a mechanism to shut the pool down correctly, so that memory leaks do not occur after it is closed.

Needless to say, these points are the basic functions of the object pool interface that we want to expose to the client.

Our declared interface is as follows:

package com.test.pool;

/**
 * Represents a cached pool of objects.
 *
 * @author Swaranga
 *
 * @param <T> the type of object to pool.
 */
public interface Pool<T>
{
 /**
  * Returns an instance from the pool.
  * The call may be a blocking one or a non-blocking one
  * and that is determined by the internal implementation.
  *
  * If the call is a blocking call,
  * the call returns immediately with a valid object
  * if available, else the thread is made to wait
  * until an object becomes available.
  * In case of a blocking call,
  * it is advised that clients react
  * to {@link InterruptedException} which might be thrown
  * when the thread waits for an object to become available.
  *
  * If the call is a non-blocking one,
  * the call returns immediately irrespective of
  * whether an object is available or not.
  * If any object is available the call returns it
  * else the call returns <code>null</code>.
  *
  * The validity of the objects is determined using the
  * {@link Validator} interface, such that
  * an object <code>o</code> is valid if
  * <code>Validator.isValid(o) == true</code>.
  *
  * @return T one of the pooled objects.
  */
 T get();

 /**
  * Releases the object and puts it back to the pool.
  *
  * The mechanism of putting the object back to the pool is
  * generally asynchronous,
  * however future implementations might differ.
  *
  * @param t the object to return to the pool
  */

 void release(T t);

 /**
  * Shuts down the pool. In essence this call will not
  * accept any more requests
  * and will release all resources.
  * Releasing resources is done
  * via the <code>invalidate()</code>
  * method of the {@link Validator} interface.
  */

 void shutdown();
}

In order to support any object, the above interface is deliberately designed to be simple and universal. It provides methods to get/return objects from the pool, and a mechanism to close the pool so that objects can be released.
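To make the get/release contract concrete, here is a minimal sketch of how client code might borrow an object and always hand it back. `SimplePool` and `greet` are hypothetical names introduced only for this illustration; the pool here is single-threaded and is not the implementation developed later in the article.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PoolUsageSketch {

    /** Trimmed-down copy of the Pool contract (shutdown omitted for brevity). */
    interface Pool<T> {
        T get();
        void release(T t);
    }

    /** Hypothetical single-threaded pool, used only for this illustration. */
    static final class SimplePool implements Pool<StringBuilder> {
        private final Deque<StringBuilder> idle = new ArrayDeque<>();

        SimplePool(int size) {
            for (int i = 0; i < size; i++) {
                idle.push(new StringBuilder());
            }
        }

        @Override
        public StringBuilder get() {
            return idle.poll(); // null when the pool is empty (non-blocking)
        }

        @Override
        public void release(StringBuilder sb) {
            sb.setLength(0); // reset state before the object is reused
            idle.push(sb);
        }

        int idleCount() {
            return idle.size();
        }
    }

    /** Borrows a builder, uses it, and hands it back even if an exception occurs. */
    static String greet(SimplePool pool, String name) {
        StringBuilder sb = pool.get();
        if (sb == null) {
            throw new IllegalStateException("no pooled object available");
        }
        try {
            return sb.append("Hello, ").append(name).toString();
        } finally {
            pool.release(sb);
        }
    }

    public static void main(String[] args) {
        SimplePool pool = new SimplePool(2);
        System.out.println(greet(pool, "pool"));
        System.out.println("idle objects: " + pool.idleCount());
    }
}
```

The try/finally block is the important part: a client that forgets to call release shrinks the pool for everyone else.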

Now let's implement this interface. Before we get started, it is worth mentioning that an ideal release method should first check whether the object returned by the client can be reused. If it can, the object goes back into the pool; if not, it is discarded. We want every implementation of the Pool interface to follow this rule, so before writing a concrete class we first create an abstract class that forces subsequent implementations to do so. The abstract class is called AbstractPool, and its definition is as follows:

package com.test.pool;

/**
 * Represents an abstract pool, that defines the procedure
 * of returning an object to the pool.
 *
 * @author Swaranga
 *
 * @param <T> the type of pooled objects.
 */
abstract class AbstractPool <T> implements Pool <T>
{
 /**
  * Returns the object to the pool.
  * The method first checks whether the object is
  * re-usable and then returns it to the pool.
  *
  * If the object validation fails,
  * some implementations
  * will try to create a new one
  * and put it into the pool; however
  * this behaviour is subject to change
  * from implementation to implementation
  *
  */
 @Override
 public final void release(T t)
 {
  if(isValid(t))
  {
   returnToPool(t);
  }
  else
  {
   handleInvalidReturn(t);
  }
 }

 protected abstract void handleInvalidReturn(T t);

 protected abstract void returnToPool(T t);

 protected abstract boolean isValid(T t);
}

The above class forces every object pool to validate an object before putting it back into the pool. Concrete implementations are free to choose how to implement the three abstract methods in order to customize their behavior. They decide, based on their own logic, how to determine whether an object is valid (the isValid method), what to do if it is invalid (the handleInvalidReturn method), and how to put a valid object back into the pool (the returnToPool method).
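The routing performed by release can be seen in isolation with a small sketch. It repeats a condensed copy of the release template and adds `RecordingPool`, a hypothetical subclass invented for this example that simply records where each returned object ends up.

```java
import java.util.ArrayList;
import java.util.List;

public class ReleaseTemplateSketch {

    /** Condensed copy of the release() template from AbstractPool. */
    abstract static class AbstractPool<T> {
        public final void release(T t) {
            if (isValid(t)) {
                returnToPool(t);
            } else {
                handleInvalidReturn(t);
            }
        }

        protected abstract boolean isValid(T t);

        protected abstract void returnToPool(T t);

        protected abstract void handleInvalidReturn(T t);
    }

    /** Hypothetical subclass: non-empty strings count as valid. */
    static final class RecordingPool extends AbstractPool<String> {
        final List<String> pooled = new ArrayList<>();
        final List<String> discarded = new ArrayList<>();

        @Override
        protected boolean isValid(String s) {
            return s != null && !s.isEmpty();
        }

        @Override
        protected void returnToPool(String s) {
            pooled.add(s);
        }

        @Override
        protected void handleInvalidReturn(String s) {
            discarded.add(s);
        }
    }

    public static void main(String[] args) {
        RecordingPool pool = new RecordingPool();
        pool.release("reusable"); // valid, goes back to the pool
        pool.release("");         // invalid, handed to handleInvalidReturn
        System.out.println("pooled=" + pool.pooled + " discarded=" + pool.discarded);
    }
}
```

Because release is final, a subclass can change what "valid" means but cannot skip the check.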

With the above classes in place, we can start on a concrete implementation. However, there is still a problem. Since these classes are designed to support a universal object pool, a concrete implementation does not know how to check the validity of its objects (because the object type is generic). So we need something else to help us do this.

We need a general way to validate objects that does not require the concrete implementation to care about the object's type. Therefore, we introduce a new interface, Validator, which defines methods for validating objects. The definition of this interface is as follows:

package com.test.pool;

 /**
  * Represents the functionality to
  * validate an object of the pool
  * and to subsequently perform cleanup activities.
  *
  * @author Swaranga
  *
  * @param <T> the type of objects to validate and cleanup.
  */
 public interface Validator<T>
 {
  /**
   * Checks whether the object is valid.
   *
   * @param t the object to check.
   *
   * @return <code>true</code>
   * if the object is valid else <code>false</code>.
   */
  public boolean isValid(T t);

  /**
   * Performs any cleanup activities
   * before discarding the object.
   * For example before discarding
   * database connection objects,
   * the pool will want to close the connections.
   * This is done via the
   * <code>invalidate()</code> method.
   *
   * @param t the object to cleanup
   */

  public void invalidate(T t);
 }

The above interface defines one method to check an object and one method to invalidate it. The invalidate method comes in handy when the pool is about to discard an object and release its resources. Note that this interface has little meaning on its own; it only makes sense when used with an object pool, so we nest it inside the Pool interface, in the same way that Map.Entry is nested inside Map in the Java collections library. Our Pool interface then becomes:

package com.test.pool;

/**
 * Represents a cached pool of objects.
 *
 * @author Swaranga
 *
 * @param <T> the type of object to pool.
 */
public interface Pool< T >
{
 /**
  * Returns an instance from the pool.
  * The call may be a blocking one or a non-blocking one
  * and that is determined by the internal implementation.
  *
  * If the call is a blocking call,
  * the call returns immediately with a valid object
  * if available, else the thread is made to wait
  * until an object becomes available.
  * In case of a blocking call,
  * it is advised that clients react
  * to {@link InterruptedException} which might be thrown
  * when the thread waits for an object to become available.
  *
  * If the call is a non-blocking one,
  * the call returns immediately irrespective of
  * whether an object is available or not.
  * If any object is available the call returns it
  * else the call returns <code>null</code>.
  *
  * The validity of the objects is determined using the
  * {@link Validator} interface, such that
  * an object <code>o</code> is valid if
  * <code>Validator.isValid(o) == true</code>.
  *
  * @return T one of the pooled objects.
  */
 T get();

 /**
  * Releases the object and puts it back to the pool.
  *
  * The mechanism of putting the object back to the pool is
  * generally asynchronous,
  * however future implementations might differ.
  *
  * @param t the object to return to the pool
  */

 void release(T t);

 /**
  * Shuts down the pool. In essence this call will not
  * accept any more requests
  * and will release all resources.
  * Releasing resources is done
  * via the <code>invalidate()</code>
  * method of the {@link Validator} interface.
  */

 void shutdown();

 /**
  * Represents the functionality to
  * validate an object of the pool
  * and to subsequently perform cleanup activities.
  *
  * @author Swaranga
  *
  * @param <T> the type of objects to validate and cleanup.
  */
 public static interface Validator < T >
 {
  /**
   * Checks whether the object is valid.
   *
   * @param t the object to check.
   *
   * @return <code>true</code>
   * if the object is valid else <code>false</code>.
   */
  public boolean isValid(T t);

  /**
   * Performs any cleanup activities
   * before discarding the object.
   * For example before discarding
   * database connection objects,
   * the pool will want to close the connections.
   * This is done via the
   * <code>invalidate()</code> method.
   *
   * @param t the object to cleanup
   */

  public void invalidate(T t);
 }
}
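As an illustration of what an implementation of the nested Validator could look like, the sketch below validates a made-up resource. `Channel` and `ChannelValidator` are hypothetical names that do not appear in the article, and the Pool interface is repeated in condensed form only so the example compiles on its own.

```java
public class ValidatorSketch {

    /** Condensed Pool with the nested Validator, mirroring Map and Map.Entry. */
    interface Pool<T> {
        T get();

        void release(T t);

        void shutdown();

        interface Validator<T> {
            boolean isValid(T t);

            void invalidate(T t);
        }
    }

    /** Hypothetical pooled resource with an open/closed state. */
    static final class Channel {
        private boolean open = true;

        boolean isOpen() {
            return open;
        }

        void close() {
            open = false;
        }
    }

    /** Treats open channels as reusable and closes channels that are discarded. */
    static final class ChannelValidator implements Pool.Validator<Channel> {
        @Override
        public boolean isValid(Channel c) {
            return c != null && c.isOpen();
        }

        @Override
        public void invalidate(Channel c) {
            if (c != null) {
                c.close();
            }
        }
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        Pool.Validator<Channel> validator = new ChannelValidator();
        System.out.println("valid before invalidate: " + validator.isValid(channel));
        validator.invalidate(channel);
        System.out.println("valid after invalidate: " + validator.isValid(channel));
    }
}
```

Client code refers to the type as `Pool.Validator`, just as it would write `Map.Entry`.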

The preparations are almost done. Before we start, we still need one last piece, the trump card of this object pool: the ability to create new objects. Because the pool is generic, it needs to be told how to create new objects to fill itself. This functionality cannot live in the pool itself; there must be a common way to create new objects. It can be provided through an ObjectFactory interface, which has only one method that describes how to create a new object. Our ObjectFactory interface is as follows:

package com.test.pool;

/**
 * Represents the mechanism to create
 * new objects to be used in an object pool.
 *
 * @author Swaranga
 *
 * @param <T> the type of object to create.
 */
public interface ObjectFactory<T>
{
 /**
  * Returns a new instance of an object of type T.
  *
  * @return T a new instance of the object of type T
  */
 public abstract T createNew();
}
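Because ObjectFactory declares a single abstract method, on Java 8 or later it can also be supplied as a lambda or a constructor reference. The sketch below assumes such a Java version; `fill` is a hypothetical helper that mimics how a pool might pre-populate itself.

```java
import java.util.ArrayList;
import java.util.List;

public class FactorySketch {

    /** Same shape as the article's ObjectFactory, repeated to keep the sketch standalone. */
    interface ObjectFactory<T> {
        T createNew();
    }

    /** Hypothetical helper: asks the factory for `size` fresh objects. */
    static <T> List<T> fill(int size, ObjectFactory<T> factory) {
        List<T> objects = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            objects.add(factory.createNew());
        }
        return objects;
    }

    public static void main(String[] args) {
        // A constructor reference works because createNew() takes no arguments.
        List<StringBuilder> builders = fill(3, StringBuilder::new);

        // A lambda is handy when construction needs arguments.
        List<int[]> buffers = fill(2, () -> new int[1024]);

        System.out.println(builders.size() + " builders, " + buffers.size() + " buffers");
    }
}
```

Every call to createNew must return a distinct instance; a factory that hands back a shared object would defeat the purpose of pooling.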

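Our utility types are all in place, so we can now really start implementing the Pool interface. Because we want this pool to be usable in concurrent programs, we will create a blocking object pool that makes the client block when no object is available. The blocking mechanism makes the client wait until an object becomes available. That in turn means we need one more method that blocks only for a limited time: if an object becomes available before the timeout expires it is returned, and if the timeout expires the method returns null instead of waiting forever. Such an implementation resembles LinkedBlockingQueue in the Java concurrency library, so before the actual implementation we expose one more interface, BlockingPool, similar to the BlockingQueue interface in the Java concurrency library.

Here is the declaration of BlockingPool: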

package com.test.pool;

import java.util.concurrent.TimeUnit;

/**
 * Represents a pool of objects that makes the
 * requesting threads wait if no object is available.
 *
 * @author Swaranga
 *
 * @param <T> the type of objects to pool.
 */
public interface BlockingPool < T > extends Pool < T >
{
 /**
  * Returns an instance of type T from the pool.
  *
  * The call is a blocking call,
  * and client threads are made to wait
  * indefinitely until an object is available.
  * The call implements a fairness algorithm
  * that ensures that a FCFS service is implemented.
  *
  * Clients are advised to react to InterruptedException.
  * If the thread is interrupted while waiting
  * for an object to become available,
  * the current implementation
  * sets the interrupted state of the thread
  * to <code>true</code> and returns null.
  * However this is subject to change
  * from implementation to implementation.
  *
  * @return T an instance of the Object
  * of type T from the pool.
  */
 T get();

 /**
  * Returns an instance of type T from the pool,
  * waiting up to the
  * specified wait time if necessary
  * for an object to become available.
  *
  * The call is a blocking call,
  * and client threads are made to wait
  * until an object is available
  * or until the timeout occurs.
  * The call implements a fairness algorithm
  * that ensures that a FCFS service is implemented.
  *
  * Clients are advised to react to InterruptedException.
  * If the thread is interrupted while waiting
  * for an object to become available,
  * the current implementation
  * sets the interrupted state of the thread
  * to <code>true</code> and returns null.
  * However this is subject to change
  * from implementation to implementation.
  *
  * @param time amount of time to wait before giving up,
  *   in units of <tt>unit</tt>
  * @param unit a <tt>TimeUnit</tt> determining
  *   how to interpret the
  *        <tt>time</tt> parameter
  *
  * @return T an instance of the Object
  * of type T from the pool.
  *
  * @throws InterruptedException
  * if interrupted while waiting
  */

 T get(long time, TimeUnit unit) throws InterruptedException;
}
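The timed variant maps directly onto `BlockingQueue.poll(long, TimeUnit)` from the standard library. The following sketch shows that behavior on its own; `timedGet` is a hypothetical helper, and it follows the convention described in the Javadoc above of restoring the interrupt flag and returning null when interrupted.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedGetSketch {

    /**
     * Waits up to the given time for an element and returns null on timeout.
     * If interrupted, restores the interrupt flag and also returns null.
     */
    static <T> T timedGet(BlockingQueue<T> queue, long time, TimeUnit unit) {
        try {
            return queue.poll(time, unit);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);

        // Nothing is available, so the call gives up after roughly 50 ms.
        System.out.println(timedGet(queue, 50, TimeUnit.MILLISECONDS));

        // An element is available, so the call returns without waiting.
        queue.add("connection-1");
        System.out.println(timedGet(queue, 50, TimeUnit.MILLISECONDS));
    }
}
```

The untimed get corresponds to `BlockingQueue.take()`, which waits indefinitely instead of returning null.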

The implementation of BoundedBlockingPool is as follows:

package com.test.pool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public final class BoundedBlockingPool<T>
        extends AbstractPool<T>
        implements BlockingPool<T>
{
    private final int size;
    private final BlockingQueue<T> objects;
    private final Validator<T> validator;
    private final ObjectFactory<T> objectFactory;
    // Runs the (possibly blocking) task of putting objects back into the queue.
    private final ExecutorService executor =
            Executors.newCachedThreadPool();
    private volatile boolean shutdownCalled;

    public BoundedBlockingPool(
            int size,
            Validator<T> validator,
            ObjectFactory<T> objectFactory)
    {
        super();
        this.objectFactory = objectFactory;
        this.size = size;
        this.validator = validator;
        objects = new LinkedBlockingQueue<T>(size);
        initializeObjects();
        shutdownCalled = false;
    }

    @Override
    public T get(long timeOut, TimeUnit unit)
    {
        if(!shutdownCalled)
        {
            T t = null;
            try
            {
                // Waits at most timeOut; returns null if nothing became available.
                t = objects.poll(timeOut, unit);
                return t;
            }
            catch(InterruptedException ie)
            {
                // Restore the interrupt flag and fall through to return null.
                Thread.currentThread().interrupt();
            }
            return t;
        }
        throw new IllegalStateException(
                "Object pool is already shutdown");
    }

    @Override
    public T get()
    {
        if(!shutdownCalled)
        {
            T t = null;
            try
            {
                // Blocks until an object becomes available.
                t = objects.take();
            }
            catch(InterruptedException ie)
            {
                Thread.currentThread().interrupt();
            }
            return t;
        }

        throw new IllegalStateException(
                "Object pool is already shutdown");
    }

    @Override
    public void shutdown()
    {
        shutdownCalled = true;
        executor.shutdownNow();
        clearResources();
    }

    private void clearResources()
    {
        // Only idle objects are in the queue; borrowed ones are not touched.
        for(T t : objects)
        {
            validator.invalidate(t);
        }
    }

    @Override
    protected void returnToPool(T t)
    {
        if(validator.isValid(t))
        {
            // Hand the blocking put() to the executor so the caller returns at once.
            executor.submit(new ObjectReturner<T>(objects, t));
        }
    }

    @Override
    protected void handleInvalidReturn(T t)
    {
        // Intentionally empty: invalid objects are simply dropped.
    }

    @Override
    protected boolean isValid(T t)
    {
        return validator.isValid(t);
    }

    private void initializeObjects()
    {
        for(int i = 0; i < size; i++)
        {
            objects.add(objectFactory.createNew());
        }
    }

    private class ObjectReturner<E>
            implements Callable<Void>
    {
        private final BlockingQueue<E> queue;
        private final E e;

        public ObjectReturner(BlockingQueue<E> queue, E e)
        {
            this.queue = queue;
            this.e = e;
        }

        public Void call()
        {
            try
            {
                queue.put(e);
            }
            catch(InterruptedException ie)
            {
                // Interrupted (for example by shutdownNow()): restore the flag
                // and give up. Retrying in a loop would spin forever, because
                // put() keeps throwing while the interrupt flag is set.
                Thread.currentThread().interrupt();
            }
            return null;
        }

    }

}

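The above is a very basic object pool, built internally on a LinkedBlockingQueue. The only really interesting method here is returnToPool. Because the internal storage is a LinkedBlockingQueue, putting the returned object straight into it could block the client if the queue is full. We do not want a client to block just because it calls something as ordinary as putting an object back into the pool. So we submit the task of actually inserting the object into the queue to an Executor as an asynchronous task, so that the client thread can return immediately.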
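The effect of handing the insertion to an executor can be reproduced with a queue of capacity one. In this sketch, `releaseAsync` and `demo` are hypothetical names; the queue starts out full, so a direct `put` would block the caller, while the asynchronous version returns immediately and completes once a slot is free.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncReturnSketch {

    /** Submits the blocking put() to the executor so the caller is never blocked. */
    static <T> Future<?> releaseAsync(ExecutorService executor,
                                      BlockingQueue<T> queue, T item) {
        return executor.submit(() -> {
            try {
                queue.put(item); // may block, but only on the executor's thread
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // give up, e.g. during shutdown
            }
        });
    }

    /** Returns the two items in the order they leave the queue. */
    static String demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.add("a"); // the queue is now full
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> pending = releaseAsync(executor, queue, "b"); // returns at once
            String first = queue.take();      // frees the slot the background put needs
            pending.get(5, TimeUnit.SECONDS); // wait until "b" has been inserted
            return first + queue.take();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        } catch (ExecutionException | TimeoutException ex) {
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

One trade-off of this design is that a released object may not be visible to other clients until the background task has run.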

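Now we will use this object pool in our own code to cache database connections. We need a validator to check whether a database connection is still valid.

Here is the JDBCConnectionValidator: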

package com.test;

import java.sql.Connection;
import java.sql.SQLException;
import com.test.pool.Pool.Validator;
public final class JDBCConnectionValidator implements Validator < Connection >
{
    public boolean isValid(Connection con)
    {
        if(con == null)
        {
            return false;
        }

        try
        {
            return !con.isClosed();
        }
        catch(SQLException se)
        {
            return false;
        }

    }

    public void invalidate(Connection con)
    {
        try
        {
            con.close();
        }
        catch(SQLException se)
        {
        }
    }

}
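Note that `Connection.isClosed()` only reports whether `close()` has been called; it does not detect a connection that the server or network has dropped. If the JDBC driver supports JDBC 4.0, a stricter check is possible with `Connection.isValid(int timeout)`. The class below is a sketch of that alternative, not part of the original code: the class name and the `timeoutSeconds` parameter are assumptions, and in the article's project it would additionally declare `implements Validator<Connection>`.

```java
import java.sql.Connection;
import java.sql.SQLException;

public class StrictConnectionValidatorSketch {

    // Maximum time, in seconds, the driver may spend verifying a connection.
    private final int timeoutSeconds;

    public StrictConnectionValidatorSketch(int timeoutSeconds) {
        this.timeoutSeconds = timeoutSeconds;
    }

    /** Returns true only if the driver confirms the connection is still usable. */
    public boolean isValid(Connection con) {
        if (con == null) {
            return false;
        }
        try {
            // Asks the driver to verify the connection within the timeout.
            return con.isValid(timeoutSeconds);
        } catch (SQLException se) {
            // Thrown when the timeout value is negative.
            return false;
        }
    }

    /** Closes the connection; failures are ignored because it is being discarded. */
    public void invalidate(Connection con) {
        if (con == null) {
            return;
        }
        try {
            con.close();
        } catch (SQLException se) {
            // Nothing useful can be done here.
        }
    }
}
```

The stricter check costs a round trip to the database on every release, so whether it is worth it depends on how often connections go stale.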

There is also a JDBCConnectionFactory, which is used to create new database connection objects:

package com.test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import com.test.pool.ObjectFactory;
public class JDBCConnectionFactory implements ObjectFactory < Connection >
{
   private String connectionURL;
   private String userName;
   private String password;
   public JDBCConnectionFactory(
     String driver,
     String connectionURL,
     String userName,
     String password) {
     super();
     try
     {
        Class.forName(driver);
     }
     catch(ClassNotFoundException ce)
     {
        throw new IllegalArgumentException("Unable to find driver in classpath", ce);
     }
     this.connectionURL = connectionURL;
     this.userName = userName;
     this.password = password;
   }

   public Connection createNew()
   {
      try
      {
         return DriverManager.getConnection(
            connectionURL,
            userName,
            password);
      }
      catch(SQLException se)
      {
         throw new IllegalArgumentException("Unable to create new connection", se);
      }
   }
}

Now we use the Validator and ObjectFactory above to create a JDBC connection pool:

package com.test;

import java.sql.Connection;
import com.test.pool.BoundedBlockingPool;
import com.test.pool.Pool;

public class Main
{
    public static void main(String[] args)
    {
        // Fill in the driver class name, JDBC URL, user name and password.
        Pool < Connection > pool =
            new BoundedBlockingPool < Connection > (
            10,
            new JDBCConnectionValidator(),
            new JDBCConnectionFactory("", "", "", "")
        );
        //do whatever you like
    }
}

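To reward readers who have made it through the whole article, here is another, non-blocking implementation of the object pool. The only difference from the previous one is that it never blocks the client when no object is available; it simply returns null. The implementation is here: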

package com.test.pool;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

public class BoundedPool < T > extends AbstractPool < T >
{
    private int size;
    private Queue < T > objects;
    private Validator < T > validator;
    private ObjectFactory < T > objectFactory;
    private Semaphore permits;
    private volatile boolean shutdownCalled;

    public BoundedPool(
        int size,
        Validator < T > validator,
        ObjectFactory < T > objectFactory)
    {
        super();
        this.objectFactory = objectFactory;
        this.size = size;
        this.validator = validator;
        // A thread-safe queue, since get() and release() may run concurrently.
        objects = new ConcurrentLinkedQueue < T >();
        // One permit per pooled object; get() takes a permit, release() returns it.
        permits = new Semaphore(size);
        initializeObjects();
        shutdownCalled = false;
    }

    @Override
    public T get()
    {
        T t = null;

        if(!shutdownCalled)
        {
            if(permits.tryAcquire())
            {
                t = objects.poll();
            }

         }
         else
         {
             throw new IllegalStateException("Object pool already shutdown");
         }
         return t;
     }

     @Override
     public void shutdown()
     {
         shutdownCalled = true;
         clearResources();
     }

     private void clearResources()
     {
         for(T t : objects)
         {
             validator.invalidate(t);
         }
     }

     @Override
     protected void returnToPool(T t)
     {
         boolean added = objects.add(t);
         if(added)
         {
             permits.release();
         }
     }

     @Override
     protected void handleInvalidReturn(T t)
     {
     }

     @Override
     protected boolean isValid(T t)
     {
         return validator.isValid(t);
     }

     private void initializeObjects()
     {
         for(int i = 0; i < size; i++)
         {
             objects.add(objectFactory.createNew());
         }
     }
}
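The permit-counting idea behind the non-blocking get can be isolated in a few lines. The sketch below is a simplified illustration rather than the class above: it pools plain strings, uses a ConcurrentLinkedQueue, and creates the Semaphore with one permit per pooled object. `Semaphore.tryAcquire()` returns false immediately when no permit is left, which is what makes get non-blocking.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

public class NonBlockingGetSketch {

    private final Queue<String> objects = new ConcurrentLinkedQueue<>();
    private final Semaphore permits;

    /** Pre-fills the pool and creates one permit per pooled object. */
    NonBlockingGetSketch(String... items) {
        for (String item : items) {
            objects.add(item);
        }
        permits = new Semaphore(items.length);
    }

    /** Returns an object if a permit is free, otherwise null without waiting. */
    String get() {
        return permits.tryAcquire() ? objects.poll() : null;
    }

    /** Puts the object back first, then frees its permit. */
    void release(String item) {
        objects.add(item);
        permits.release();
    }

    public static void main(String[] args) {
        NonBlockingGetSketch pool = new NonBlockingGetSketch("only");
        System.out.println(pool.get()); // the single pooled object
        System.out.println(pool.get()); // pool is empty: null, no blocking
        pool.release("only");
        System.out.println(pool.get()); // available again
    }
}
```

Adding the object before releasing the permit matters: a thread that wins a permit is then guaranteed to find an object in the queue.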

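Now that we have two rather impressive implementations, it is time to let users create the different pools through a factory with descriptive method names. Here is the factory: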

package com.test.pool;

import com.test.pool.Pool.Validator;

/**
 * Factory and utility methods for
 * {@link Pool} and {@link BlockingPool} classes
 * defined in this package.
 * This class supports the following kinds of methods:
 *
 * <ul>
 *   <li> Method that creates and returns a default non-blocking
 *        implementation of the {@link Pool} interface.
 *   </li>
 *   <li> Method that creates and returns a
 *        default implementation of
 *        the {@link BlockingPool} interface.
 *   </li>
 * </ul>
 *
 * @author Swaranga
 */
public final class PoolFactory
{
    private PoolFactory()
    {
    }

    /**
     * Creates and returns a new object pool,
     * that is an implementation of the {@link BlockingPool},
     * whose size is limited by
     * the <tt> size </tt> parameter.
     *
     * @param size the number of objects in the pool.
     * @param factory the factory to create new objects.
     * @param validator the validator to
     * validate the re-usability of returned objects.
     *
     * @return a blocking object pool
     * bounded by <tt> size </tt>
     */
    public static < T > Pool < T > newBoundedBlockingPool(
        int size,
        ObjectFactory < T > factory,
        Validator < T > validator)
    {
        return new BoundedBlockingPool < T >(size, validator, factory);
    }

    /**
     * Creates and returns a new object pool,
     * that is an implementation of the {@link Pool}
     * whose size is limited
     * by the <tt> size </tt> parameter.
     *
     * @param size the number of objects in the pool.
     * @param factory the factory to create new objects.
     * @param validator the validator to validate
     * the re-usability of returned objects.
     *
     * @return an object pool bounded by <tt> size </tt>
     */
    public static < T > Pool < T > newBoundedNonBlockingPool(
        int size,
        ObjectFactory < T > factory,
        Validator < T > validator)
    {
        return new BoundedPool < T >(size, validator, factory);
    }
}

Now our client can create object pools in a more readable way:

package com.test;

import java.sql.Connection;
import com.test.pool.Pool;
import com.test.pool.PoolFactory;

public class Main
{
    public static void main(String[] args)
    {
        Pool < Connection > pool =
            PoolFactory.newBoundedBlockingPool(
                10,
                new JDBCConnectionFactory("", "", "", ""),
                new JDBCConnectionValidator());
        //do whatever you like
    }
}
