>  기사  >  Java  >  Java를 사용하여 범용 동시 개체 풀을 구현하기 위한 코드 공유

Java를 사용하여 범용 동시 개체 풀을 구현하기 위한 코드 공유

黄舟
黄舟원래의
2017-03-24 10:55:551219검색

이 기사에서는 주로 Java에서 객체풀을 구현하는 방법에 대해 설명합니다. 최근에는 Java Virtual Machine의 성능이 모든 측면에서 크게 향상되어 대부분의 객체에 대해 더 이상 객체 풀링을 통한 성능 향상이 필요하지 않습니다. 근본적인 이유는 새로운 객체를 만드는 것이 예전만큼 비용이 많이 들지 않기 때문입니다.

그러나 스레드, 데이터베이스 연결 및 기타 경량이 아닌 개체와 같이 생성 오버헤드가 매우 높은 개체가 여전히 있습니다. 모든 응용 프로그램에서 우리는 그러한 개체를 두 개 이상 사용할 것입니다. 이러한 개체의 풀을 생성하고 관리하여 이러한 개체를 동적으로 재사용할 수 있고 클라이언트 코드가 개체의 라이프 사이클을 신경 쓸 필요가 없는 편리한 방법이 있다면 여전히 매우 유용할 것입니다. 강한.

실제로 코드 작성을 시작하기 전에 다음 개체 풀이 완료해야 할 기능이 무엇인지 정리하겠습니다.

  • 객체 풀은 사용 가능한 객체가 있는 경우 클라이언트에 반환할 수 있어야 합니다.

  • 클라이언트가 개체를 다시 풀에 넣은 후 해당 개체를 재사용할 수 있습니다.

  • 객체 풀은 증가하는 클라이언트 요구 사항을 충족하기 위해 새로운 객체를 생성할 수 있습니다.

  • 풀을 닫은 후 메모리 누수가 발생하지 않도록 풀을 적절하게 닫는 메커니즘이 필요합니다.

위 사항은 말할 필요도 없이 클라이언트에게 노출시키고자 하는 Connection Pool의 인터페이스의 기본 기능입니다.

우리가 선언한 인터페이스는 다음과 같습니다:

package com.test.pool;

/**
 * Represents a cached pool of objects.
 *
 * @author Swaranga
 *
 * @param <T> the type of object to pool.
 */
public interface Pool<T>
{
 /**
  * Returns an instance from the pool.
  * The call may be a blocking one or a non-blocking one
  * and that is determined by the internal implementation.
  *
  * If the call is a blocking call,
  * the call returns immediately with a valid object
  * if available, else the thread is made to wait
  * until an object becomes available.
  * In case of a blocking call,
  * it is advised that clients react
  * to {@link InterruptedException} which might be thrown
  * when the thread waits for an object to become available.
  *
  * If the call is a non-blocking one,
  * the call returns immediately irrespective of
  * whether an object is available or not.
  * If any object is available the call returns it
  * else the call returns < code >null< /code >.
  *
  * The validity of the objects are determined using the
  * {@link Validator} interface, such that
  * an object < code >o< /code > is valid if
  * < code > Validator.isValid(o) == true < /code >.
  *
  * @return T one of the pooled objects.
  */
 T get();

 /**
  * Releases the object and puts it back to the pool.
  *
  * The mechanism of putting the object back to the pool is
  * generally asynchronous,
  * however future implementations might differ.
  *
  * @param t the object to return to the pool
  */

 void release(T t);

 /**
  * Shuts down the pool. In essence this call will not
  * accept any more requests
  * and will release all resources.
  * Releasing resources are done
  * via the < code >invalidate()< /code >
  * method of the {@link Validator} interface.
  */

 void shutdown();
}

어떤 객체든 지원하기 위해 위의 인터페이스는 의도적으로 단순하고 보편적이도록 설계되었습니다. 풀에서 개체를 가져오거나 반환하는 메서드와 개체를 해제할 수 있도록 풀을 닫는 메커니즘을 제공합니다.

이제 이 인터페이스를 구현해 보겠습니다. 시작하기 전에 이상적인 릴리스 방법은 먼저 클라이언트가 반환한 개체를 재사용할 수 있는지 확인해야 한다는 점을 언급할 가치가 있습니다. 그렇다면 해당 개체를 수영장에 다시 던지십시오. 우리는 이 Pool 인터페이스의 모든 구현이 이 규칙을 따를 수 있기를 바랍니다. 특정 구현 클래스를 시작하기 전에 먼저 추상 클래스를 생성하여 후속 구현이 이 지점을 따르도록 제한합니다. 우리가 구현한 추상 클래스는 AbstractPool이라고 하며 그 정의는 다음과 같습니다.

package com.test.pool;

/**
 * Represents an abstract pool, that defines the procedure
 * of returning an object to the pool.
 *
 * @author Swaranga
 *
 * @param <T> the type of pooled objects.
 */
abstract class AbstractPool <T> implements Pool <T>
{
 /**
  * Returns the object to the pool.
  * The method first validates the object if it is
  * re-usable and then puts returns it to the pool.
  *
  * If the object validation fails,
  * some implementations
  * will try to create a new one
  * and put it into the pool; however
  * this behaviour is subject to change
  * from implementation to implementation
  *
  */
 @Override
 public final void release(T t)
 {
  if(isValid(t))
  {
   returnToPool(t);
  }
  else
  {
   handleInvalidReturn(t);
  }
 }

 protected abstract void handleInvalidReturn(T t);

 protected abstract void returnToPool(T t);

 protected abstract boolean isValid(T t);
}

위 클래스에서는 객체를 풀에 다시 넣기 전에 객체 풀이 객체를 확인하도록 했습니다. 특정 구현에서는 자신의 동작을 사용자 정의하기 위해 이 세 가지 메서드를 구현하는 방법을 자유롭게 선택할 수 있습니다. 그들은 자신의 논리에 따라 객체가 유효한지 확인하는 방법, 유효하지 않은 경우 수행할 작업(handleInvalidReturn 메서드), 유효한 객체를 풀에 다시 넣는 방법(returnToPool 메서드)을 결정합니다.

위의 클래스를 사용하여 구체적인 구현을 시작할 수 있습니다. 그러나 여전히 문제가 있습니다. 위의 클래스는 범용 개체 풀을 지원하도록 설계되었으므로 특정 구현에서는 개체의 유효성을 확인하는 방법을 모릅니다(객체는 모두 일반이므로). 따라서 이를 수행하는 데 도움이 되는 다른 것이 필요합니다.

객체 검증을 완료하려면 일반적인 방법이 필요하며, 구체적인 구현에서는 객체 유형을 신경 쓸 필요가 없습니다. 따라서 우리는 객체의 유효성을 검사하는 방법을 정의하는 새로운 인터페이스인 Validator를 도입했습니다. 이 인터페이스의 정의는 다음과 같습니다.

package com.test.pool;

 /**
  * Represents the functionality to
  * validate an object of the pool
  * and to subsequently perform cleanup activities.
  *
  * @author Swaranga
  *
  * @param < T > the type of objects to validate and cleanup.
  */
 public static interface Validator < T >
 {
  /**
   * Checks whether the object is valid.
   *
   * @param t the object to check.
   *
   * @return <code>true</code>
   * if the object is valid else <code>false</code>.
   */
  public boolean isValid(T t);

  /**
   * Performs any cleanup activities
   * before discarding the object.
   * For example before discarding
   * database connection objects,
   * the pool will want to close the connections.
   * This is done via the
   * <code>invalidate()</code> method.
   *
   * @param t the object to cleanup
   */

  public void invalidate(T t);
 }

위 인터페이스는 객체를 확인하는 방법과 객체를 무효화하는 방법을 정의합니다. 객체를 삭제하고 메모리를 정리할 준비가 되면 무효화 방법이 유용합니다. 이 인터페이스 자체는 의미가 없다는 점은 주목할 가치가 있습니다. 객체 풀에서 사용될 때만 의미가 있으므로 이 인터페이스를 Pool 인터페이스에 정의합니다. 이는 Java 컬렉션 라이브러리의 Map 및 Map.Entry와 동일합니다. 따라서 풀 인터페이스는 다음과 같습니다.

rree

준비가 거의 완료되었습니다. 시작하기 전에 여전히 궁극적인 무기가 필요합니다. 이것이 이 개체 풀의 핵심 기능입니다. 그것은 "새로운 사물을 창조하는 능력"이다. 우리의 개체 풀은 일반적이므로 풀을 채우기 위해 새 개체를 생성하는 방법을 알아야 합니다. 이 기능은 개체 풀 자체에 의존할 수 없으며 새 개체를 생성하는 공통 방법이 있어야 합니다. 이는 "새 객체를 생성하는 방법" 메소드가 하나만 있는 ObjectFactory 인터페이스를 통해 수행할 수 있습니다. ObjectFactory 인터페이스는 다음과 같습니다.

package com.test.pool;

/**
 * Represents the mechanism to create
 * new objects to be used in an object pool.
 *
 * @author Swaranga
 *
 * @param < T > the type of object to create.
 */
public interface ObjectFactory < T >
{
 /**
  * Returns a new instance of an object of type T.
  *
  * @return T an new instance of the object of type T
  */
 public abstract T createNew();
}

我们的工具类都已经搞定了,现在可以开始真正实现我们的Pool接口了。因为我们希望这个池能在并发程序里面使用,所以我们会创建一个阻塞的对象池,当没有对象可用的时候,让客户端先阻塞住。我们的阻塞机制是让客户端一直阻塞直到有对象可用为止。这样的话导致我们还需要再增加一个只阻塞一定时间的方法,如果在超时时间到来前有对象可用则返回,如果超时了就返回null而不是一直等待下去。这样的实现有点类似Java并发库里的LinkedBlockingQueue,因此真正实现前我们再暴露一个接口,BlockingPool,类似于Java并发库里的BlockingQueue接口。

这里是BlockingQueue的声明:

package com.test.pool;

import java.util.concurrent.TimeUnit;

/**
 * Represents a pool of objects that makes the
 * requesting threads wait if no object is available.
 *
 * @author Swaranga
 *
 * @param < T > the type of objects to pool.
 */
public interface BlockingPool < T > extends Pool < T >
{
 /**
  * Returns an instance of type T from the pool.
  *
  * The call is a blocking call,
  * and client threads are made to wait
  * indefinitely until an object is available.
  * The call implements a fairness algorithm
  * that ensures that a FCFS service is implemented.
  *
  * Clients are advised to react to InterruptedException.
  * If the thread is interrupted while waiting
  * for an object to become available,
  * the current implementations
  * sets the interrupted state of the thread
  * to <code>true</code> and returns null.
  * However this is subject to change
  * from implementation to implementation.
  *
  * @return T an instance of the Object
  * of type T from the pool.
  */
 T get();

 /**
  * Returns an instance of type T from the pool,
  * waiting up to the
  * specified wait time if necessary
  * for an object to become available..
  *
  * The call is a blocking call,
  * and client threads are made to wait
  * for time until an object is available
  * or until the timeout occurs.
  * The call implements a fairness algorithm
  * that ensures that a FCFS service is implemented.
  *
  * Clients are advised to react to InterruptedException.
  * If the thread is interrupted while waiting
  * for an object to become available,
  * the current implementations
  * set the interrupted state of the thread
  * to <code>true</code> and returns null.
  * However this is subject to change
  * from implementation to implementation.
  *
  *
  * @param time amount of time to wait before giving up,
  *   in units of <tt>unit</tt>
  * @param unit a <tt>TimeUnit</tt> determining
  *   how to interpret the
  *        <tt>timeout</tt> parameter
  *
  * @return T an instance of the Object
  * of type T from the pool.
  *
  * @throws InterruptedException
  * if interrupted while waiting
  */

 T get(long time, TimeUnit unit) throws InterruptedException;
}

BoundedBlockingPool的实现如下:

package com.test.pool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public final class BoundedBlockingPool
        extends <AbstractPool>
        implements <BlockingPool>
{
    private int size;
    private BlockingQueue  objects;
    private Validator  validator;
    private ObjectFactory  objectFactory;
    private ExecutorService executor =
            Executors.newCachedThreadPool();
    private volatile boolean shutdownCalled;

    public BoundedBlockingPool(
            int size,
            Validator  validator,
            ObjectFactory  objectFactory)
    {
        super();
        this.objectFactory = objectFactory;
        this.size = size;
        this.validator = validator;
        objects = new LinkedBlockingQueue (size);
        initializeObjects();
        shutdownCalled = false;
    }

    public T get(long timeOut, TimeUnit unit)
    {
        if(!shutdownCalled)
        {
            T t = null;
            try
            {
                t = objects.poll(timeOut, unit);
                return t;
            }
            catch(InterruptedException ie)
            {
                Thread.currentThread().interrupt();
            }
            return t;
        }
        throw new IllegalStateException(
                &#39;Object pool is already shutdown&#39;);
    }

    public T get()
    {
        if(!shutdownCalled)
        {
            T t = null;
            try
            {
                t = objects.take();
            }

            catch(InterruptedException ie)
            {
                Thread.currentThread().interrupt();
            }
            return t;
        }

        throw new IllegalStateException(
                &#39;Object pool is already shutdown&#39;);
    }

    public void shutdown()
    {
        shutdownCalled = true;
        executor.shutdownNow();
        clearResources();
    }

    private void clearResources()
    {
        for(T t : objects)
        {
            validator.invalidate(t);
        }
    }

    @Override
    protected void returnToPool(T t)
    {
        if(validator.isValid(t))
        {
            executor.submit(new ObjectReturner(objects, t));
        }
    }

    @Override
    protected void handleInvalidReturn(T t)
    {
    }

    @Override
    protected boolean isValid(T t)
    {
        return validator.isValid(t);
    }

    private void initializeObjects()
    {
        for(int i = 0; i < size; i++)
        {
            objects.add(objectFactory.createNew());
        }
    }

    private class ObjectReturner
            implements <Callable>
    {
        private BlockingQueue  queue;
        private E e;
        public ObjectReturner(BlockingQueue  queue, E e)
        {
            this.queue = queue;
            this.e = e;
        }

        public Void call()
        {
            while(true)
            {
                try
                {
                    queue.put(e);
                    break;
                }
                catch(InterruptedException ie)
                {
                    Thread.currentThread().interrupt();
                }
            }
            return null;
        }

    }

}

上面是一个非常基本的对象池,它内部是基于一个LinkedBlockingQueue来实现的。这里唯一比较有意思的方法就是returnToPool。因为内部的存储是一个LinkedBlockingQueue实现的,如果我们直接把返回的对象扔进去的话,如果队列已满可能会阻塞住客户端。不过我们不希望客户端因为把对象放回池里这么个普通的方法就阻塞住了。所以我们把最终将对象插入到队列里的任务作为一个异步的的任务提交给一个Executor来执行,以便让客户端线程能立即返回。

现在我们将在自己的代码中使用上面这个对象池,用它来缓存数据库连接。我们需要一个校验器来验证数据库连接是否有效。

下面是这个JDBCConnectionValidator:

package com.test;

import java.sql.Connection;
import java.sql.SQLException;
import com.test.pool.Pool.Validator;
public final class JDBCConnectionValidator implements Validator < Connection >
{
    public boolean isValid(Connection con)
    {
        if(con == null)
        {
            return false;
        }

        try
        {
            return !con.isClosed();
        }
        catch(SQLException se)
        {
            return false;
        }

    }

    public void invalidate(Connection con)
    {
        try
        {
            con.close();
        }
        catch(SQLException se)
        {
        }
    }

}

还有一个JDBCObjectFactory,它将用来生成新的数据库连接对象:

package com.test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import com.test.pool.ObjectFactory;
public class JDBCConnectionFactory implements ObjectFactory < Connection >
{
   private String connectionURL;
   private String userName;
   private String password;
   public JDBCConnectionFactory(
     String driver,
     String connectionURL,
     String userName,
     String password) {
     super();
     try
     {
        Class.forName(driver);
     }
     catch(ClassNotFoundException ce)
     {
        throw new IllegalArgumentException(&#39;Unable to find driver in classpath&#39;, ce);
     }
     this.connectionURL = connectionURL;
     this.userName = userName;
     this.password = password;
   }

   public Connection createNew()
   {
      try
      {
         return DriverManager.getConnection(
            connectionURL,
            userName,
            password);
      }
      catch(SQLException se)
      {
         throw new IllegalArgumentException(&#39;Unable to create new connection&#39;, se);
      }
   }
}

现在我们用上述的Validator和ObjectFactory来创建一个JDBC的连接池:

package com.test;

import java.sql.Connection;
import com.test.pool.Pool;
import com.test.pool.PoolFactory;

public class Main
{
    public static void main(String[] args)
    {
        Pool < Connection > pool =
            new BoundedBlockingPool < Connection > (
            10,
            new JDBCConnectionValidator(),
            new JDBCConnectionFactory(&#39;&#39;, &#39;&#39;, &#39;&#39;, &#39;&#39;)
        );
        //do whatever you like
    }
}

为了犒劳下能读完整篇文章的读者,我这再提供另一个非阻塞的对象池的实现,这个实现和前面的唯一不同就是即使对象不可用,它也不会让客户端阻塞,而是直接返回null。具体的实现在这:

package com.test.pool;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;

public class BoundedPool < T > extends AbstractPool < T >
{
    private int size;
    private Queue < T > objects;
    private Validator < T > validator;
    private ObjectFactory < T > objectFactory;
    private Semaphore permits;
    private volatile boolean shutdownCalled;

    public BoundedPool(
        int size,
        Validator < T > validator,
        ObjectFactory < T > objectFactory)
        {
        super();
        this.objectFactory = objectFactory;
        this.size = size;
        this.validator = validator;
        objects = new LinkedList < T >();
        initializeObjects();
        shutdownCalled = false;
    }

    @Override
    public T get()
    {
        T t = null;

        if(!shutdownCalled)
        {
            if(permits.tryAcquire())
            {
                t = objects.poll();
            }

         }
         else
         {
             throw new IllegalStateException(&#39;Object pool already shutdown&#39;);
         }
         return t;
     }

     @Override
     public void shutdown()
     {
         shutdownCalled = true;
         clearResources();
     }

     private void clearResources()
     {
         for(T t : objects)
         {
             validator.invalidate(t);
         }
     }

     @Override
     protected void returnToPool(T t)
     {
         boolean added = objects.add(t);
         if(added)
         {
             permits.release();
         }
     }

     @Override
     protected void handleInvalidReturn(T t)
     {
     }

     @Override
     protected boolean isValid(T t)
     {
         return validator.isValid(t);
     }

     private void initializeObjects()
     {
         for(int i = 0; i < size; i++)
         {
             objects.add(objectFactory.createNew());
         }
     }
}

考虑到我们现在已经有两种实现,非常威武了,得让用户通过工厂用具体的名称来创建不同的对象池了。工厂来了:

package com.test.pool;

import com.test.pool.Pool.Validator;

/**

* Factory and utility methods for

* {@link Pool} and {@link BlockingPool} classes

* defined in this package.
* This class supports the following kinds of methods:
*
*
<ul>
*
<li> Method that creates and returns a default non-blocking
*        implementation of the {@link Pool} interface.
*   </li>
*
*
<li> Method that creates and returns a
*        default implementation of
*        the {@link BlockingPool} interface.
*   </li>
*
</ul>
*
* @author Swaranga
*/
public final class PoolFactory
{
    private PoolFactory()
    {
    }

/**
* Creates a and returns a new object pool,
* that is an implementation of the {@link BlockingPool},
* whose size is limited by
* the <tt> size </tt> parameter.
*
* @param size the number of objects in the pool.
* @param factory the factory to create new objects.
* @param validator the validator to
* validate the re-usability of returned objects.
*
* @return a blocking object pool
* bounded by <tt> size </tt>
*/
public static < T > Pool < T >
newBoundedBlockingPool(
int size,
ObjectFactory < T > factory,
Validator < T > validator)
{
    return new BoundedBlockingPool < T > (
    size,
    validator,
    factory);
}
/*
* Creates a and returns a new object pool,
* that is an implementation of the {@link Pool}
* whose size is limited
* by the <tt> size </tt> parameter.
*
* @param size the number of objects in the pool.
* @param factory the factory to create new objects.
* @param validator the validator to validate
* the re-usability of returned objects.
*
* @return an object pool bounded by <tt> size </tt>
*/
public static < T > Pool < T > newBoundedNonBlockingPool(
    int size,
    ObjectFactory < T > factory,
    Validator < T > validator)
{
    return new BoundedPool < T >(size, validator, factory);
}
}

现在我们的客户端就能用一种可读性更强的方式来创建对象池了:

package com.test;

import java.sql.Connection;
import com.test.pool.Pool;
import com.test.pool.PoolFactory;

public class Main
{
    public static void main(String[] args)
    {
        Pool < Connection > pool =
        PoolFactory.newBoundedBlockingPool(
        10,
        new JDBCConnectionFactory(&#39;&#39;, &#39;&#39;, &#39;&#39;, &#39;&#39;),
        new JDBCConnectionValidator());
        //do whatever you like
     }
}

위 내용은 Java를 사용하여 범용 동시 개체 풀을 구현하기 위한 코드 공유의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.