首頁  >  文章  >  Java  >  手寫Java LockSupport的實作方法

手寫Java LockSupport的實作方法

WBOY
WBOY轉載
2023-05-07 08:25:06566瀏覽

    前言

    在JDK當中給我們提供的各種並發工具當中,例如ReentrantLock等等工具的內部實現,經常會使用到一個工具,這個工具就是LockSupport。 LockSupport為我們提供了一個非常強大的功能,它是線程阻塞最基本的元語,他可以將一個線程阻塞也可以將一個線程喚醒,因此經常在並發的場景下進行使用。

    LockSupport實作原理

    在了解LockSupport實作原理之前我們先用一個案例來了解LockSupport的功能!

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.LockSupport;
     
    public class Demo {
     
      public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
          System.out.println("park 之前");
          LockSupport.park(); // park 函数可以将调用这个方法的线程挂起
          System.out.println("park 之后");
        });
        thread.start();
        TimeUnit.SECONDS.sleep(5);
        System.out.println("主线程休息了 5s");
        System.out.println("主线程 unpark thread");
        LockSupport.unpark(thread); // 主线程将线程 thread 唤醒 唤醒之后线程 thread 才可以继续执行
      }
    }

    上面的程式碼的輸出如下:

    park 之前
    主執行緒休息了5s
    主執行緒unpark thread
    park 之後

    #乍看上面的LockSupport的park和unpark實現的功能和await和signal實現的功能好像是一樣的,但是其實不然,我們來看下面的程式碼:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.LockSupport;
     
    public class Demo02 {
      public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
          try {
            TimeUnit.SECONDS.sleep(5);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          System.out.println("park 之前");
          LockSupport.park(); // 线程 thread 后进行 park 操作 
          System.out.println("park 之后");
        });
        thread.start();
        System.out.println("主线程 unpark thread");
        LockSupport.unpark(thread); // 先进行 unpark 操作
     
      }
    }

    上面程式碼輸出結果如下:

    主執行緒unpark thread
    park 之前
    park 之後

    ##在上方的程式碼當中主執行緒會先進行unpark操作,然後執行緒thread才進行park操作,這種情況下程式也可以正常執行。但如果是signal的呼叫在await呼叫之前的話,程式則不會執行完成,例如下面的程式碼:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
     
    public class Demo03 {
     
      private static final ReentrantLock lock = new ReentrantLock();
      private static final Condition condition = lock.newCondition();
     
      public static void thread() throws InterruptedException {
        lock.lock();
     
        try {
          TimeUnit.SECONDS.sleep(5);
          condition.await();
          System.out.println("等待完成");
        }finally {
          lock.unlock();
        }
      }
     
      public static void mainThread() {
        lock.lock();
        try {
          System.out.println("发送信号");
          condition.signal();
        }finally {
          lock.unlock();
          System.out.println("主线程解锁完成");
        }
      }
     
      public static void main(String[] args) {
        Thread thread = new Thread(() -> {
          try {
            thread();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        });
        thread.start();
     
        mainThread();
      }
    }

    上面的程式碼輸出如下:

    #發送訊號

    主執行緒解鎖完成

    在上面的程式碼當中「等待完成「總是不會被印出來的,這是因為signal函數的呼叫在await之前,signal函數只會在它先前執行的await函數有效果,對在其後面呼叫的await是不會產生影響的。

    那是什麼原因導致的這個效果呢?

    其實JVM在實作LockSupport的時候,內部會給每一個執行緒維護一個計數器變數_counter,這個變數是表示的意思是“許可證的數量”,只有當有許可證的時候執行緒才可以執行,同時許可證最大的數量只能為1。當調用一次park的時候許可證的數量會減一。當呼叫一次unpark的時候計數器就會加一,但是計數器的值不能超過1。

    當一個執行緒呼叫park之後,他需要等待一個許可證,只有拿到許可證之後這個執行緒才能夠繼續執行,或是在park之前已經取得一個了一個許可證,那麼它就不需要阻塞,直接可以執行。

    自己動手實作自己的LockSupport

    實作原則

    在前文當中我們已經介紹了locksupport的原理,它主要的內部實作就是透過許可證實現的:

    • 每一個執行緒能夠取得的許可證的最大數目就是1。

    • 當呼叫unpark方法時,執行緒可以取得一個許可證,許可證數量的上限是1,如果已經有一個許可證了,那麼許可證就不能累加。

    • 當呼叫park方法的時候,如果呼叫park方法的執行緒沒有許可證的話,則需要將這個執行緒掛起,直到有其他執行緒呼叫unpark方法,給這個執行緒發放一個許可證,線程才能夠繼續執行。但是如果線程已經有了一個許可證,那麼線程將不會阻塞可以直接執行。

    自己實作LockSupport協定規定

    在我們自己實作的Parker當中我們也可以給每個執行緒一個計數器,記錄執行緒的許可證的數目,當許可證的數目大於等於0的時候,執行緒可以執行,反之執行緒需要被阻塞,協定具體規則如下:

    • 初始執行緒的許可證的數目為0。

    • 如果我們在呼叫park的時候,計數器的值等於1,計數器的值變成0,則執行緒可以繼續執行。

    • 如果我們在呼叫park的時候,計數器的值等於0,則執行緒不可以繼續執行,需要將執行緒掛起,且將計數器的值設為-1。

    • 如果我們在呼叫unpark的時候,被unpark的執行緒的計數器的值等於0,則需要將計數器的值變成1。

    • 如果我們在呼叫unpark的時候,被unpark的執行緒的計數器的值等於1,則不需要改變計數器的值,因為計數器的最大值就是1。

    • 我們在呼叫unpark的時候,如果計數器的值等於-1,表示執行緒已經被掛起了,則需要將執行緒喚醒,同時需要將計數器的值設為0 。

    工具

    因為涉及執行緒的阻斷和喚醒,我們可以使用可重入鎖定ReentrantLock和條件變數Condition,因此需要熟悉這兩個工具的使用。

    ReentrantLock 主要用於加鎖和開鎖,用於保護臨界區。

    Condition.awat 方法用於阻塞執行緒。

    Condition.signal 方法用於將執行緒喚醒。

    因为我们在unpark方法当中需要传入具体的线程,将这个线程发放许可证,同时唤醒这个线程,因为是需要针对特定的线程进行唤醒,而condition唤醒的线程是不确定的,因此我们需要为每一个线程维护一个计数器和条件变量,这样每个条件变量只与一个线程相关,唤醒的肯定就是一个特定的线程。我们可以使用HashMap进行实现,键为线程,值为计数器或者条件变量。

    具体实现

    因此综合上面的分析我们的类变量如下:

    private final ReentrantLock lock; // 用于保护临界去
    private final HashMap<Thread, Integer> permits; // 许可证的数量
    private final HashMap<Thread, Condition> conditions; // 用于唤醒和阻塞线程的条件变量

    构造函数主要对变量进行赋值:

    public Parker() {
      lock = new ReentrantLock();
      permits = new HashMap<>();
      conditions = new HashMap<>();
    }

    park方法

    public void park() {
      Thread t = Thread.currentThread(); // 首先得到当前正在执行的线程
      if (conditions.get(t) == null) { // 如果还没有线程对应的condition的话就进行创建
        conditions.put(t, lock.newCondition());
      }
      lock.lock();
      try {
        // 如果许可证变量还没有创建 或者许可证等于0 说明没有许可证了 线程需要被挂起
        if (permits.get(t) == null || permits.get(t) == 0) {
          permits.put(t, -1); // 同时许可证的数目应该设置为-1
          conditions.get(t).await();
        }else if (permits.get(t) > 0) {
          permits.put(t, 0); // 如果许可证的数目大于0 也就是为1 说明线程已经有了许可证因此可以直接被放行 但是需要消耗一个许可证
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        lock.unlock();
      }
    }

    unpark方法

    public void unpark(Thread thread) {
      Thread t = thread; // 给线程 thread 发放一个许可证
      lock.lock();
      try {
        if (permits.get(t) == null) // 如果还没有创建许可证变量 说明线程当前的许可证数量等于初始数量也就是0 因此方法许可证之后 许可证的数量为 1
          permits.put(t, 1);
        else if (permits.get(t) == -1) { // 如果许可证数量为-1,则说明肯定线程 thread 调用了park方法,而且线程 thread已经被挂起了 因此在 unpark 函数当中不急需要将许可证数量这是为0 同时还需要将线程唤醒
          permits.put(t, 0);
          conditions.get(t).signal();
        }else if (permits.get(t) == 0) { // 如果许可证数量为0 说明线程正在执行 因此许可证数量加一
          permits.put(t, 1);
        } // 除此之外就是许可证为1的情况了 在这种情况下是不需要进行操作的 因为许可证最大的数量就是1
      }finally {
        lock.unlock();
      }
    }

    完整代码

    import java.util.HashMap;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
     
    public class Parker {
     
      private final ReentrantLock lock;
      private final HashMap<Thread, Integer> permits;
      private final HashMap<Thread, Condition> conditions;
     
      public Parker() {
        lock = new ReentrantLock();
        permits = new HashMap<>();
        conditions = new HashMap<>();
      }
     
      public void park() {
        Thread t = Thread.currentThread();
        if (conditions.get(t) == null) {
          conditions.put(t, lock.newCondition());
        }
        lock.lock();
        try {
          if (permits.get(t) == null || permits.get(t) == 0) {
            permits.put(t, -1);
            conditions.get(t).await();
          }else if (permits.get(t) > 0) {
            permits.put(t, 0);
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          lock.unlock();
        }
      }
     
      public void unpark(Thread thread) {
        Thread t = thread;
        lock.lock();
        try {
          if (permits.get(t) == null)
            permits.put(t, 1);
          else if (permits.get(t) == -1) {
            permits.put(t, 0);
            conditions.get(t).signal();
          }else if (permits.get(t) == 0) {
            permits.put(t, 1);
          }
        }finally {
          lock.unlock();
        }
      }
    }

    JVM实现一瞥

    其实在JVM底层对于park和unpark的实现也是基于锁和条件变量的,只不过是用更加底层的操作系统和libc(linux操作系统)提供的API进行实现的。虽然API不一样,但是原理是相仿的,思想也相似。

    比如下面的就是JVM实现的unpark方法:

    void Parker::unpark() {
      int s, status;
      // 进行加锁操作 相当于 可重入锁的 lock.lock()
      status = pthread_mutex_lock(_mutex);
      assert (status == 0, "invariant");
      s = _counter;
      _counter = 1;
      if (s < 1) {
        // 如果许可证小于 1 进行下面的操作
        if (WorkAroundNPTLTimedWaitHang) {
          // 这行代码相当于 condition.signal() 唤醒线程
          status = pthread_cond_signal (_cond);
          assert (status == 0, "invariant");
          // 解锁操作 相当于可重入锁的 lock.unlock()
          status = pthread_mutex_unlock(_mutex);
          assert (status == 0, "invariant");
        } else {
          status = pthread_mutex_unlock(_mutex);
          assert (status == 0, "invariant");
          status = pthread_cond_signal (_cond);
          assert (status == 0, "invariant");
        }
      } else {
        // 如果有许可证 也就是 s == 1 那么不许要将线程挂起
        // 解锁操作 相当于可重入锁的 lock.unlock()
        pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
      }
    }

    JVM实现的park方法,如果没有许可证也是会将线程挂起的:

    手寫Java LockSupport的實作方法

    以上是手寫Java LockSupport的實作方法的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    陳述:
    本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除