
Implementation of blocking queue for actual state dependency management in JAVA development

无忌哥哥 (original), 2018-07-19 11:32:05

The Java class library contains many state-dependent classes, such as FutureTask and BlockingQueue. Some operations in these classes have state-based preconditions: you cannot remove an element from an empty queue, and you cannot obtain the result of a task that has not finished. Before either operation can proceed, the caller must wait until the queue becomes non-empty or the task reaches the completed state. The simplest way to create a state-dependent class is to build it on top of an existing library class. If the library does not provide the functionality you need, you can also use the low-level mechanisms provided by the Java language and class library to build your own synchronizer.
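As a small illustration of the library classes mentioned above, the sketch below waits on a FutureTask and on an ArrayBlockingQueue. The class name LibraryStateDependence and its two methods are made up for this example; only the java.util.concurrent calls come from the standard library.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class, not part of the article's code.
class LibraryStateDependence {

    // FutureTask.get() has the precondition "the task has completed";
    // it blocks the caller until a worker thread has run the task.
    static int waitForResult() {
        FutureTask<Integer> task = new FutureTask<>(() -> 6 * 7);
        new Thread(task).start();
        try {
            return task.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        }
    }

    // BlockingQueue.take() has the precondition "the queue is not empty";
    // it blocks the caller until a producer has put an element.
    static String waitForElement() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        Thread producer = new Thread(() -> {
            try {
                queue.put("hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

In both methods the caller never checks the state itself; the library class does the waiting.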

This article therefore shows how to build your own state-dependent class. It proceeds step by step, from the simplest construction toward a more complete, standard one, so that both the process and the final result are clear.

Management of state dependency

A blocking state-dependent operation has the following shape, shown in pseudocode:

acquire lock on object state                            // acquire the lock first
while (precondition does not hold) {                    // keep retrying until the precondition holds
    release lock                                        // release the lock
    wait until precondition might hold                  // wait until the precondition may hold
    optionally fail if interrupted or timeout expires   // give up on interruption or timeout
    reacquire lock                                      // reacquire the lock
}

perform action                                          // perform the action
release lock                                            // release the lock

In other words: acquire the lock and check the precondition. If it does not hold, release the lock and block until the precondition may hold (or until the wait is interrupted or times out), then reacquire the lock and check again. Once the precondition holds, perform the action and release the lock.

This pseudocode may not be intuitive at first. That is fine: every operation in this article is built on this structure, and it should be clear by the end.

ArrayBlockingQueue is a bounded buffer that provides two operations, put and take. Each has a precondition: an element cannot be put into a full buffer, and an element cannot be taken from an empty one. Our goal is to build a buffer like ArrayBlockingQueue.
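The two preconditions can be observed directly on the library class. The sketch below uses offer and poll, the non-blocking counterparts of put and take, so that a failed precondition shows up as a return value instead of a blocked thread. The class and method names are illustrative only.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class, not part of the article's code.
class BoundedQueuePreconditions {

    // Returns true if a full queue rejects offer() and an empty queue
    // returns null from poll().
    static boolean preconditionsAreEnforced() {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        queue.offer(1);
        queue.offer(2);                             // the queue is now full
        boolean rejectedWhenFull = !queue.offer(3); // offer() reports failure with false

        queue.poll();
        queue.poll();                               // the queue is now empty
        boolean nullWhenEmpty = queue.poll() == null;

        return rejectedWhenFull && nullWhenEmpty;
    }
}
```

With put and take in place of offer and poll, the same two situations would block the calling thread until another thread changed the queue's state.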

Next, we look at two implementations of a bounded buffer, which handle a failed precondition in different ways.

First, consider the base class BaseBoundedBuffer, which the later implementations extend. It is an array-based circular buffer whose state variables buf, head, tail, and count are guarded by the buffer's intrinsic lock. It provides synchronized doPut and doTake methods; subclasses implement put and take through these methods, and the underlying state is hidden from subclasses.

public abstract class BaseBoundedBuffer<V> {

    // All four fields are guarded by the intrinsic lock on "this".
    private final V[] buf;
    private int tail;
    private int head;
    private int count;

    @SuppressWarnings("unchecked") // generic array creation needs an unchecked cast
    protected BaseBoundedBuffer(int capacity) {
        this.buf = (V[]) new Object[capacity];
        count = 0;
    }

    protected synchronized final void doPut(V v) {
        buf[tail] = v;
        if (++tail == buf.length)
            tail = 0; // wrap around to the start of the array
        ++count;
    }

    protected synchronized final V doTake() {
        V v = buf[head];
        buf[head] = null; // drop the reference so the element can be garbage collected
        if (++head == buf.length)
            head = 0; // wrap around to the start of the array
        --count;
        return v;
    }

    public synchronized final boolean isFull() {
        return count == buf.length;
    }

    public synchronized final boolean isEmpty() {
        return count == 0;
    }
}
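To see how head and tail wrap around in doPut and doTake, here is a stripped-down, single-threaded sketch of the same index bookkeeping. It stores ints and omits synchronization and the full/empty checks on purpose; RingIndexDemo is a made-up name, not part of the article's class hierarchy.

```java
// Hypothetical demo class: same head/tail/count arithmetic as the base
// class above, without locking or precondition checks.
class RingIndexDemo {
    private final int[] buf;
    private int head;
    private int tail;
    private int count;

    RingIndexDemo(int capacity) {
        buf = new int[capacity];
    }

    void put(int v) {
        buf[tail] = v;
        if (++tail == buf.length)
            tail = 0; // wrap around
        ++count;
    }

    int take() {
        int v = buf[head];
        if (++head == buf.length)
            head = 0; // wrap around
        --count;
        return v;
    }

    // Fills a 3-slot buffer, frees one slot, and reuses it for a fourth element.
    static String trace() {
        RingIndexDemo ring = new RingIndexDemo(3);
        ring.put(10);
        ring.put(20);
        ring.put(30);              // tail wraps to index 0
        int first = ring.take();   // reads index 0, head moves to 1
        ring.put(40);              // reuses index 0, tail moves to 1
        int second = ring.take();  // reads index 1
        int third = ring.take();   // reads index 2, head wraps to 0
        int fourth = ring.take();  // reads index 0
        return first + "," + second + "," + third + "," + fourth;
    }
}
```

The elements come out in insertion order even though the fourth one is stored in the slot the first one vacated.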

The first bounded buffer implementation synchronizes both put and take. Each method checks the precondition before acting and throws an exception if the check fails.

// BufferFullException and BufferEmptyException are checked exceptions
// assumed to be defined elsewhere.
public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> {

    public GrumpyBoundedBuffer(int capacity) {
        super(capacity);
    }

    public synchronized void put(V v) throws BufferFullException {
        if (isFull())
            throw new BufferFullException();
        doPut(v);
    }

    public synchronized V take() throws BufferEmptyException {
        if (isEmpty())
            throw new BufferEmptyException();
        return doTake();
    }
}

As shown above, if a precondition is not met, an exception is thrown immediately. The exception here only signals that the buffer is full or empty; it does not mean the program is wrong. By analogy, a red light does not mean the traffic signal is broken; it means you wait for the green light before crossing. The consequence is that the caller must catch the exception and retry every buffer operation.

Let’s look directly at the following client calling code:

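// String is an illustrative element type.
private static GrumpyBoundedBuffer<String> gbb = new GrumpyBoundedBuffer<>(5);
...
while (true) {
    try {
        String item = gbb.take();
        // use item
        break;
    } catch (BufferEmptyException e) {
        Thread.sleep(500); // buffer was empty: wait, then retry
    }
}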

In short, when a precondition is not met, the caller retries until it is, which gives the effect of blocking. But the caller has to handle the precondition failure itself, and the retry loop is a compromise: retrying without a pause keeps the CPU busy, while sleeping between attempts (as above) can leave the caller asleep after the buffer state has already changed. Either way, this queue is troublesome for callers to use.
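To make the burden on the caller concrete, the sketch below pulls the retry loop into a helper and counts the failed attempts. Everything here is hypothetical scaffolding for the illustration: TinyGrumpyBuffer is a minimal stand-in that only models the empty case, and BufferEmptyException is declared locally because the article does not show its definition.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical demo class, not part of the article's code.
class CallerSideRetry {

    /** Local stand-in for the article's BufferEmptyException. */
    static class BufferEmptyException extends Exception {}

    /** Minimal fail-fast buffer; only the "empty" precondition is modelled. */
    static class TinyGrumpyBuffer<V> {
        private final Deque<V> items = new ArrayDeque<>();

        synchronized void put(V v) {
            items.addLast(v);
        }

        synchronized V take() throws BufferEmptyException {
            if (items.isEmpty())
                throw new BufferEmptyException();
            return items.removeFirst();
        }
    }

    /** The taken item plus the number of attempts that failed before it. */
    static class Result<V> {
        final V item;
        final int failedAttempts;

        Result(V item, int failedAttempts) {
            this.item = item;
            this.failedAttempts = failedAttempts;
        }
    }

    /** The loop every caller of a fail-fast buffer ends up writing. */
    static <V> Result<V> takeWithRetry(TinyGrumpyBuffer<V> buffer, long sleepMillis)
            throws InterruptedException {
        int failed = 0;
        while (true) {
            try {
                return new Result<>(buffer.take(), failed);
            } catch (BufferEmptyException e) {
                failed++;                  // precondition failed: count it
                Thread.sleep(sleepMillis); // pause, then try again
            }
        }
    }

    /** A producer fills the buffer after a short delay while the caller retries. */
    static Result<String> demo() {
        TinyGrumpyBuffer<String> buffer = new TinyGrumpyBuffer<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50);
                buffer.put("job");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            return takeWithRetry(buffer, 10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while retrying", e);
        }
    }
}
```

The number of failed attempts depends on timing, which is exactly the kind of detail a caller should not have to care about.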

The second approach, SleepyBoundedBuffer, implements a simple blocking retry mechanism through polling and sleeping. This moves the retry logic out of the caller and makes the buffer simpler to use. See the following listing:

public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> {

    public SleepyBoundedBuffer(int capacity) {
        super(capacity);
    }

    public void put(V v) throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isFull()) {
                    doPut(v);
                    return;
                }
            }
            // Sleep outside the lock so other threads can change the state.
            Thread.sleep(200);
        }
    }

    public V take() throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isEmpty())
                    return doTake();
            }
            // Sleep outside the lock so other threads can change the state.
            Thread.sleep(200);
        }
    }
}

From the caller's perspective, this works well: if the precondition holds, the operation proceeds immediately; otherwise it blocks. The caller no longer deals with failures and retries, but it still has to handle InterruptedException. Like most well-behaved blocking library methods, SleepyBoundedBuffer supports cancellation through interruption.

The problem with SleepyBoundedBuffer is choosing the sleep interval: how long is reasonable, and how do we get the best performance? Suppose thread B makes the condition true while thread A is still sleeping: A does not notice until its sleep ends. That extra sleep time is the performance bottleneck.
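The delay can be measured with a rough sketch like the one below. It inlines the same poll-and-sleep loop on a plain ArrayDeque instead of using SleepyBoundedBuffer, and uses a CountDownLatch so that the producer only adds the item after the consumer has found the buffer empty and is about to sleep. OversleepDemo and its members are names invented for this example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of the article's code.
class OversleepDemo {
    static final long SLEEP_MILLIS = 200; // same interval as the listing above

    /** Milliseconds between the item becoming available and the consumer taking it. */
    static long measureWakeupDelayMillis() {
        Deque<String> buffer = new ArrayDeque<>();
        CountDownLatch consumerFoundEmpty = new CountDownLatch(1);
        long[] takenAtNanos = new long[1];

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    synchronized (buffer) {
                        if (!buffer.isEmpty()) {
                            buffer.removeFirst();
                            takenAtNanos[0] = System.nanoTime();
                            return;
                        }
                    }
                    consumerFoundEmpty.countDown(); // signal: about to sleep
                    Thread.sleep(SLEEP_MILLIS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            consumerFoundEmpty.await(); // wait until the consumer has seen an empty buffer
            long putAtNanos;
            synchronized (buffer) {
                buffer.addLast("item"); // the condition is now true
                putAtNanos = System.nanoTime();
            }
            consumer.join();            // join() makes takenAtNanos visible to this thread
            return (takenAtNanos[0] - putAtNanos) / 1_000_000;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while measuring", e);
        }
    }
}
```

On an otherwise idle machine the measured delay is typically close to the full sleep interval, even though the item was available almost immediately.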

So is there a way to make a thread wake up and proceed as soon as the condition becomes true?
That is the subject of the next article.
