Implementation method of handwritten Java LockSupport

WBOY
2023-05-07 08:25:06

    Preface

    Many of the concurrency tools in the JDK, such as ReentrantLock, rely internally on one utility: LockSupport. LockSupport is the most basic primitive for thread blocking. It can block a thread or wake one up, which is why it appears so often in concurrent code.

    LockSupport implementation principle

    Before looking at how LockSupport is implemented, let us first use an example to see what it does.

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.LockSupport;
     
    public class Demo {
     
      public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
          System.out.println("Before park");
          LockSupport.park(); // park suspends the thread that calls it
          System.out.println("After park");
        });
        thread.start();
        TimeUnit.SECONDS.sleep(5);
        System.out.println("The main thread rested for 5s");
        System.out.println("The main thread unpark thread");
        LockSupport.unpark(thread); // the main thread wakes up thread; only then can thread continue
      }
    }

    The output of the above code is as follows:

    Before park
    The main thread rested for 5s
    The main thread unpark thread
    After park

    At first glance, park and unpark of LockSupport seem to do the same thing as await and signal of Condition, but in fact they differ. Consider the following code:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.LockSupport;
     
    public class Demo02 {
      public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
          try {
            TimeUnit.SECONDS.sleep(5);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          System.out.println("Before park");
          LockSupport.park(); // thread calls park afterwards
          System.out.println("After park");
        });
        thread.start();
        System.out.println("Main thread unpark thread");
        LockSupport.unpark(thread); // unpark is called first
     
      }
    }

    The output of the above code is as follows:

    Main thread unpark thread
    Before park
    After park

    In the above code, the main thread performs unpark first, and only later does the thread thread perform park. The program still runs to completion. With await and signal, however, a signal issued before the corresponding await is lost, and the waiting thread never resumes. For example:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
     
    public class Demo03 {
     
      private static final ReentrantLock lock = new ReentrantLock();
      private static final Condition condition = lock.newCondition();
     
      public static void thread() throws InterruptedException {
        // Sleep before taking the lock so that the main thread signals first
        TimeUnit.SECONDS.sleep(5);
        lock.lock();
        try {
          condition.await();
          System.out.println("Waiting for completion");
        } finally {
          lock.unlock();
        }
      }
     
      public static void mainThread() {
        lock.lock();
        try {
          System.out.println("Send signal");
          condition.signal();
        } finally {
          lock.unlock();
          System.out.println("Main thread unlocking completed");
        }
      }
     
      public static void main(String[] args) {
        Thread thread = new Thread(() -> {
          try {
            thread();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        });
        thread.start();
     
        mainThread();
      }
    }

    The output of the above code is as follows:

    Send signal
    Main thread unlocking completed

    In the above code, "Waiting for completion" is never printed. This is because signal is called before await: signal only affects threads that are already waiting in await, and has no effect on an await call made afterwards.

    What is the reason for this effect?

    When the JVM implements LockSupport, it maintains a counter variable _counter for each thread. This variable represents the number of permits (licenses). A thread may proceed past park only when it holds a permit, and it can hold at most 1 permit at a time. A park call that finds a permit consumes it, setting the counter back to 0. Calling unpark sets the counter to 1, and repeated calls do not raise it above 1.

    When a thread calls park, it waits for a permit and continues only after obtaining one. If a permit was already issued before park was called, the thread does not block and continues immediately.
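
    The "at most 1 permit" rule can be observed with the real LockSupport. The following is a minimal sketch, not part of the original article: the class name PermitDemo and its helper methods are made up for illustration, and the timed variant LockSupport.parkNanos is used so that the program cannot hang. The Javadoc also allows park to return for no reason, so the timings are indicative rather than guaranteed.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class PermitDemo {

  // Parks the calling thread for at most timeoutMillis and returns the elapsed time in ms.
  static long timedPark(long timeoutMillis) {
    long start = System.nanoTime();
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(timeoutMillis));
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  }

  // Issues two permits to the current thread, then parks twice with a timeout.
  public static long[] measure(long timeoutMillis) {
    Thread self = Thread.currentThread();
    LockSupport.unpark(self);
    LockSupport.unpark(self); // does not accumulate: the counter stays at 1
    long first = timedPark(timeoutMillis);  // consumes the single permit
    long second = timedPark(timeoutMillis); // no permit left: waits for the timeout
    return new long[] {first, second};
  }

  public static void main(String[] args) {
    long[] elapsed = measure(500);
    System.out.println("first park:  ~" + elapsed[0] + " ms");
    System.out.println("second park: ~" + elapsed[1] + " ms");
  }
}
```

    The first park should return almost at once, because it finds the permit left by unpark. The second should take roughly the full timeout, because the two unpark calls left only one permit behind.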

    Implementing your own LockSupport

    Implementation principle

    In the previous section we introduced the principle of LockSupport. Its internal implementation is built around a permit:

    • Each thread can hold at most 1 permit.

    • When the unpark method is called, the target thread obtains a permit. The upper limit is 1: if the thread already holds a permit, permits do not accumulate.

    • When the park method is called and the calling thread has no permit, the thread is suspended until another thread calls unpark to issue it one. Only then can it continue executing. If the thread already holds a permit, it does not block and continues immediately.

    Protocol rules for our own implementation

    In our own implementation, Parker, we also give each thread a counter that records its number of permits. When the counter is greater than or equal to 0, the thread can run; when it is -1, the thread is blocked. The specific rules of the protocol are as follows:

    • The initial number of permits for a thread is 0.

    • If the counter is 1 when we call park, the counter becomes 0 and the thread continues to execute.

    • If the counter is 0 when we call park, the thread cannot continue. It is suspended and the counter is set to -1.

    • If the counter of the target thread is 0 when we call unpark, the counter is changed to 1.

    • If the counter of the target thread is 1 when we call unpark, the counter is left unchanged, because its maximum value is 1.

    • If the counter of the target thread is -1 when we call unpark, the thread has been suspended. It must be woken up and its counter set to 0.
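
    The rules above amount to a small state machine over the values -1, 0 and 1. The following sketch writes the transitions down as pure functions, with no real blocking involved. The class and method names (PermitProtocol, afterPark, afterUnpark) are invented here for illustration.

```java
public class PermitProtocol {

  // Counter value after the owning thread calls park.
  public static int afterPark(int counter) {
    if (counter == 1) return 0;   // a permit is available: consume it and keep running
    if (counter == 0) return -1;  // no permit: the thread is suspended
    throw new IllegalStateException("a suspended thread cannot call park again");
  }

  // Counter value after another thread calls unpark on the owning thread.
  public static int afterUnpark(int counter) {
    switch (counter) {
      case -1: return 0; // the thread was suspended: wake it up
      case 0:  return 1; // the thread is running: hand it a permit
      case 1:  return 1; // already holds a permit: permits do not accumulate
      default: throw new IllegalArgumentException("invalid counter: " + counter);
    }
  }

  // A thread is blocked exactly when its counter is -1.
  public static boolean isSuspended(int counter) {
    return counter == -1;
  }

  public static void main(String[] args) {
    int c = 0;                                   // initial value
    c = afterUnpark(c); System.out.println(c);   // 1
    c = afterUnpark(c); System.out.println(c);   // 1
    c = afterPark(c);   System.out.println(c);   // 0
    c = afterPark(c);   System.out.println(c);   // -1
    c = afterUnpark(c); System.out.println(c);   // 0
  }
}
```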

    Tools

    Because we need to block and wake up threads, we can use the reentrant lock ReentrantLock and the condition variable Condition, so we need to be familiar with both tools.

    ReentrantLock is used for locking and unlocking, to protect critical sections.

    The Condition.await method is used to block a thread.

    The Condition.signal method is used to wake up a thread.

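    The unpark method takes a specific thread as its argument, issues a permit to that thread, and wakes it up. Because a specific thread must be woken, and a Condition gives no control over which of its waiting threads is woken, we maintain a separate counter and condition variable for each thread. Each condition variable is then associated with exactly one thread, so signalling it is guaranteed to wake that particular thread. We can implement this with HashMap, using the thread as the key and the counter or condition variable as the value.

    Concrete implementation

    Based on the analysis above, the fields of our class are as follows:

    private final ReentrantLock lock; // protects the critical section
    private final HashMap<Thread, Integer> permits; // number of permits per thread
    private final HashMap<Thread, Condition> conditions; // condition variables used to block and wake up threads

    The constructor simply initializes these fields: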

    public Parker() {
      lock = new ReentrantLock();
      permits = new HashMap<>();
      conditions = new HashMap<>();
    }

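    The park method

    public void park() {
      Thread t = Thread.currentThread(); // the thread that is calling park
      lock.lock();
      try {
        // Create the condition for this thread if it does not exist yet.
        // This is done while holding the lock because HashMap is not thread-safe.
        Condition condition = conditions.computeIfAbsent(t, k -> lock.newCondition());
        if (permits.getOrDefault(t, 0) > 0) {
          // The counter is 1: the thread already holds a permit, so it passes straight through and consumes it
          permits.put(t, 0);
          return;
        }
        // No entry yet, or the counter is 0: there is no permit, so the thread must be suspended
        permits.put(t, -1);
        while (permits.get(t) == -1) { // loop to guard against spurious wakeups
          condition.await();
        }
      } catch (InterruptedException e) {
        permits.replace(t, -1, 0); // the thread is no longer suspended
        Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
      } finally {
        lock.unlock();
      }
    }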

    The unpark method

    public void unpark(Thread thread) {
      Thread t = thread; // issue a permit to the thread thread
      lock.lock();
      try {
        if (permits.get(t) == null) // no entry yet, so the counter has its initial value 0; after issuing a permit it becomes 1
          permits.put(t, 1);
        else if (permits.get(t) == -1) { // -1 means thread has called park and is suspended, so set the counter to 0 and wake the thread up
          permits.put(t, 0);
          conditions.get(t).signal();
        }else if (permits.get(t) == 0) { // 0 means the thread is running, so add one permit
          permits.put(t, 1);
        } // the only remaining case is 1, where nothing needs to be done because the maximum is 1
      }finally {
        lock.unlock();
      }
    }

    Complete code

    import java.util.HashMap;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
     
    public class Parker {
     
      private final ReentrantLock lock;
      private final HashMap<Thread, Integer> permits;
      private final HashMap<Thread, Condition> conditions;
     
      public Parker() {
        lock = new ReentrantLock();
        permits = new HashMap<>();
        conditions = new HashMap<>();
      }
     
      public void park() {
        Thread t = Thread.currentThread();
        lock.lock();
        try {
          // Look up or create the condition under the lock: HashMap is not thread-safe
          Condition condition = conditions.computeIfAbsent(t, k -> lock.newCondition());
          if (permits.getOrDefault(t, 0) > 0) {
            permits.put(t, 0);
            return;
          }
          permits.put(t, -1);
          while (permits.get(t) == -1) { // guard against spurious wakeups
            condition.await();
          }
        } catch (InterruptedException e) {
          permits.replace(t, -1, 0);
          Thread.currentThread().interrupt();
        } finally {
          lock.unlock();
        }
      }
     
      public void unpark(Thread thread) {
        Thread t = thread;
        lock.lock();
        try {
          if (permits.get(t) == null)
            permits.put(t, 1);
          else if (permits.get(t) == -1) {
            permits.put(t, 0);
            conditions.get(t).signal();
          }else if (permits.get(t) == 0) {
            permits.put(t, 1);
          }
        }finally {
          lock.unlock();
        }
      }
    }
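
    To try the class out, the two orderings from the LockSupport demos at the start of the article can be replayed against it. The following is a sketch under stated assumptions: ParkerDemo and run are names invented here, the nested Parker is a condensed version of the class above with the wait placed in a loop to guard against spurious wakeups, and the 200 ms sleeps are only a crude way of forcing one call to happen before the other.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ParkerDemo {

  // Condensed version of the Parker class, following the same -1 / 0 / 1 protocol.
  static class Parker {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<Thread, Integer> permits = new HashMap<>();
    private final Map<Thread, Condition> conditions = new HashMap<>();

    void park() {
      Thread t = Thread.currentThread();
      lock.lock();
      try {
        Condition c = conditions.computeIfAbsent(t, k -> lock.newCondition());
        if (permits.getOrDefault(t, 0) > 0) {
          permits.put(t, 0); // a permit is available: consume it and return
          return;
        }
        permits.put(t, -1); // no permit: mark the thread as suspended
        while (permits.get(t) == -1) {
          c.await(); // released once unpark sets the counter back to 0
        }
      } catch (InterruptedException e) {
        permits.replace(t, -1, 0);
        Thread.currentThread().interrupt();
      } finally {
        lock.unlock();
      }
    }

    void unpark(Thread t) {
      lock.lock();
      try {
        if (permits.getOrDefault(t, 0) == -1) {
          permits.put(t, 0); // the thread is suspended: wake it up
          conditions.get(t).signal();
        } else {
          permits.put(t, 1); // 0 becomes 1, and 1 stays 1
        }
      } finally {
        lock.unlock();
      }
    }
  }

  // Returns true if the worker thread got past park() within two seconds.
  public static boolean run(boolean unparkFirst) throws InterruptedException {
    Parker parker = new Parker();
    Thread worker = new Thread(() -> {
      try {
        if (unparkFirst) TimeUnit.MILLISECONDS.sleep(200); // let unpark run first
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      parker.park();
    });
    worker.start();
    if (!unparkFirst) TimeUnit.MILLISECONDS.sleep(200); // let the worker park first
    parker.unpark(worker);
    worker.join(2000);
    return !worker.isAlive();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("park then unpark completed: " + run(false));
    System.out.println("unpark then park completed: " + run(true));
  }
}
```

    Both orderings are expected to complete, which is the behaviour that distinguishes park and unpark from a bare await and signal pair.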

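    A glimpse of the JVM implementation

    In fact, the JVM's low-level implementation of park and unpark is also based on a lock and a condition variable, except that it uses the lower-level APIs provided by the operating system and libc (on Linux). Although the APIs differ, the principle and the idea are similar.

    For example, the following is the JVM's implementation of the unpark method: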

    void Parker::unpark() {
      int s, status;
      // Acquire the lock, equivalent to lock.lock() on the reentrant lock
      status = pthread_mutex_lock(_mutex);
      assert (status == 0, "invariant");
      s = _counter;
      _counter = 1;
      if (s < 1) {
        // The previous number of permits was less than 1, so a thread may be waiting
        if (WorkAroundNPTLTimedWaitHang) {
          // Equivalent to condition.signal(): wake up the thread
          status = pthread_cond_signal (_cond);
          assert (status == 0, "invariant");
          // Release the lock, equivalent to lock.unlock() on the reentrant lock
          status = pthread_mutex_unlock(_mutex);
          assert (status == 0, "invariant");
        } else {
          status = pthread_mutex_unlock(_mutex);
          assert (status == 0, "invariant");
          status = pthread_cond_signal (_cond);
          assert (status == 0, "invariant");
        }
      } else {
        // A permit already existed (s == 1), so no thread needs to be woken up
        // Release the lock, equivalent to lock.unlock() on the reentrant lock
        pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
      }
    }
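
    Note that this unpark needs only the values 0 and 1 for _counter: it always stores 1 and signals when the previous value was below 1. The same idea can be written in Java with a lock and a condition. The sketch below is an illustration under assumptions, not a translation of HotSpot: TwoStateParker and its methods are invented names, one instance plays the role of one thread's _mutex, _cond and _counter, and the park side is simply a guarded wait written for this example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class TwoStateParker {
  private final ReentrantLock mutex = new ReentrantLock();
  private final Condition cond = mutex.newCondition();
  private int counter = 0; // 0 = no permit, 1 = one permit

  // Same shape as the C++ code: remember the old value, store 1, signal if it was below 1.
  public void unpark() {
    mutex.lock();
    try {
      int s = counter;
      counter = 1;
      if (s < 1) {
        cond.signal();
      }
    } finally {
      mutex.unlock();
    }
  }

  // Waits until a permit is present, then consumes it.
  public void park() {
    mutex.lock();
    try {
      while (counter == 0) {
        cond.await();
      }
      counter = 0;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // return early but keep the interrupt status
    } finally {
      mutex.unlock();
    }
  }

  public int permits() {
    mutex.lock();
    try {
      return counter;
    } finally {
      mutex.unlock();
    }
  }

  // Parks a worker on a fresh instance, unparks it, and reports whether it finished.
  public static boolean wakesParkedThread() throws InterruptedException {
    TwoStateParker parker = new TwoStateParker();
    Thread worker = new Thread(parker::park);
    worker.start();
    TimeUnit.MILLISECONDS.sleep(200); // give the worker time to block in park
    parker.unpark();
    worker.join(2000);
    return !worker.isAlive();
  }

  public static void main(String[] args) throws InterruptedException {
    TwoStateParker parker = new TwoStateParker();
    parker.unpark();
    parker.unpark(); // still a single permit
    parker.park();   // returns immediately and consumes it
    System.out.println("permits left: " + parker.permits());
    System.out.println("parked thread woken: " + wakesParkedThread());
  }
}
```

    Compared with the three-value protocol used by our Parker, this variant does not record whether a thread is waiting. It signals whenever the old counter was below 1, and a signal with no waiter is harmless.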

    The park method implemented by the JVM likewise suspends the thread when no permit is available.

    [Image in the original article: the JVM's implementation of the park method]


    Statement:
    This article is reproduced from yisu.com. In case of infringement, please contact admin@php.cn to have it removed.