java 并發編程學習筆記(六)之 AQS (AbstractQueuedSynchronizer)

      網友投稿 722 2025-03-31

      AQS


      (1)aqs

      使用node實現fifo隊列,可以用于構建鎖或者其他的同步裝置的基礎框架

      利用了一個int類型表示狀態

      使用方法是繼承

      子類通過繼承并通過實現它的方法管理其狀態{acquire 和 release}

      可以同時實現排他鎖和共享鎖模式(獨占,共享)

      (2)CountDownLatch

      /**

      * 一般用于 當主線程需要等待 子線程執行完成之后 再執行的場景

      * * 線程名稱 子線程結束

      * * thread1 ---------------------- end |

      * * thread2 ---------------------- end | 主線程 主線程結束

      * * thread3 ---------------------- end | --------------------- end

      * * thread4 ---------------------- end |

      * * thread5 ---------------------- end |

      * *

      *

      */

      @Slf4j

      public class CountDownLatchExample1 {

      private static int threadCount = 200;

      public static void main(String[] args) throws InterruptedException {

      ExecutorService service = Executors.newCachedThreadPool();

      final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

      for (int i = 0; i < threadCount; i++) {

      final int threadNum = i;

      service.execute(() -> {

      try {

      test(threadNum);

      } catch (InterruptedException e) {

      log.error("exception",e);

      } finally {

      countDownLatch.countDown();

      }

      });

      }

      // countDownLatch.await();

      countDownLatch.await(10, TimeUnit.MILLISECONDS); //超時等待

      log.info("finish");

      service.shutdown();

      }

      private static void test(int threadNum) throws InterruptedException {

      log.info("{}", threadNum);

      Thread.sleep(100);

      }

      }

      (3)Semaphore

      /**

      * 一般用于控制同一時刻運行的線程數量

      * | |

      * ---------|----- |---------

      * | |

      * ---------|----- |---------

      * | |

      */

      @Slf4j

      public class SemaphoreExample1 {

      private static int threadCount = 20;

      public static void main(String[] args) throws InterruptedException {

      ExecutorService service = Executors.newCachedThreadPool();

      final Semaphore semaphore = new Semaphore(3);

      for (int i = 0; i < threadCount; i++) {

      final int threadNum = i;

      service.execute(() -> {

      try {

      java 并發編程學習筆記(六)之 AQS (AbstractQueuedSynchronizer)

      //semaphore.acquire(); 每次拿一個許可證

      //semaphore.acquire(3); 每次拿三個許可證

      semaphore.tryAcquire(3);

      semaphore.tryAcquire(1,TimeUnit.SECONDS); //嘗試一秒之內獲取許可

      semaphore.tryAcquire(3,1,TimeUnit.SECONDS);

      if(semaphore.tryAcquire()) { //嘗試獲取許可證 ,沒有獲取到直接將當前線程丟棄

      test(threadNum);

      semaphore.release();

      }else {

      log.info(Thread.currentThread().getName()+"我沒有拿到許可證,┭┮﹏┭┮");

      }

      } catch (InterruptedException e) {

      log.error("exception", e);

      }

      });

      }

      service.shutdown();

      }

      private static void test(int threadNum) throws InterruptedException {

      log.info("{}", threadNum);

      Thread.sleep(1000);

      }

      }

      (4)CyclicBarrier

      /**

      * 一般用于多個線程之間相互等待,當全部都到達某個屏障點的時候在,繼續執行每個線程,并且可以重復循環使用

      * 線程名稱 某個屏障點 終點

      * thread1 ------------| ---------- end

      * thread2 ------------| ---------- end

      * thread3 ------------| ---------- end

      * thread4 ------------| ---------- end

      * thread5 ------------| ---------- end

      *

      */

      @Slf4j

      public class CyclicBarrierExample1 {

      // private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

      private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5,() -> {

      log.info("五個線程都已經準備就緒");

      });

      public static void main(String[] args) throws InterruptedException {

      ExecutorService service = Executors.newCachedThreadPool();

      for (int i = 0; i < 10; i++) {

      final int threadNum = i;

      Thread.sleep(1000);

      service.execute(() -> {

      try {

      race(threadNum);

      } catch (InterruptedException | BrokenBarrierException e) {

      e.printStackTrace();

      }

      });

      }

      service.shutdown();

      }

      private static void race(int threadNum) throws InterruptedException, BrokenBarrierException {

      Thread.sleep(1000);

      log.info("{} is ready", threadNum);

      cyclicBarrier.await();

      try {

      // cyclicBarrier.await(1, TimeUnit.SECONDS);

      } catch (Exception e) {

      // e.printStackTrace();

      }

      log.info("{} continue", threadNum);

      }

      }

      (5)鎖

      鎖的簡單使用:

      /**

      * class Point {

      * * private double x, y;

      * * private final StampedLock sl = new StampedLock();

      * *

      * * void move(double deltaX, double deltaY) { // an exclusively locked method

      * * long stamp = sl.writeLock();

      * * try {

      * * x += deltaX;

      * * y += deltaY;

      * * } finally {

      * * sl.unlockWrite(stamp);

      * * }

      * * }

      * *

      * * double distanceFromOrigin() { // A read-only method

      * * long stamp = sl.tryOptimisticRead();

      * * double currentX = x, currentY = y;

      * * if (!sl.validate(stamp)) {

      * * stamp = sl.readLock();

      * * try {

      * * currentX = x;

      * * currentY = y;

      * * } finally {

      * * sl.unlockRead(stamp);

      * * }

      * * }

      * * return Math.sqrt(currentX * currentX + currentY * currentY);

      * * }

      * *

      * * void moveIfAtOrigin(double newX, double newY) { // upgrade

      * * // Could instead start with optimistic, not read mode

      * * long stamp = sl.readLock();

      * * try {

      * * while (x == 0.0 && y == 0.0) {

      * * long ws = sl.tryConvertToWriteLock(stamp);

      * * if (ws != 0L) {

      * * stamp = ws;

      * * x = newX;

      * * y = newY;

      * * break;

      * * }

      * * else {

      * * sl.unlockRead(stamp);

      * * stamp = sl.writeLock();

      * * }

      * * }

      * * } finally {

      * * sl.unlock(stamp);

      * * }

      * * }

      * * }}

      * *

      */

      @Slf4j

      public class LockExample1 {

      private static int clientTotal = 5000;

      private static int threadTotal = 200;

      private static int count = 0;

      //重入鎖

      private final static Lock lock = new ReentrantLock();

      //重入讀寫鎖

      private final static Map map = new TreeMap();

      private final static ReentrantReadWriteLock reenLock = new ReentrantReadWriteLock();

      private final static Lock readLock = reenLock.readLock();

      private final static Lock writeLock = reenLock.writeLock();

      //stamped鎖

      private final static StampedLock stampedLock = new StampedLock();

      //condition

      private final static ReentrantLock REENTRANT_LOCK = new ReentrantLock();

      private final static Condition condition = REENTRANT_LOCK.newCondition();

      //重入讀寫鎖的使用

      public Integer getValue(Integer key) {

      readLock.lock();

      Integer value = null;

      try {

      value = map.get(key);

      } catch (Exception e) {

      e.printStackTrace();

      } finally {

      readLock.unlock();

      }

      return value;

      }

      //重入讀寫鎖的使用

      public Set getSet() {

      Set set = null;

      readLock.lock();

      try {

      set = map.keySet();

      } catch (Exception e) {

      e.printStackTrace();

      } finally {

      readLock.unlock();

      }

      return set;

      }

      /**

      * 重入讀寫鎖的使用

      * @param key

      * @param value

      * @return

      */

      public Integer put(Integer key, Integer value) {

      writeLock.lock();

      try {

      map.put(key, value);

      } catch (Exception e) {

      e.printStackTrace();

      } finally {

      writeLock.unlock();

      }

      return value;

      }

      /**

      * condition的使用

      */

      public static void testCond(){

      new Thread(() -> {

      try {

      REENTRANT_LOCK.lock();

      log.info("運動員獲取鎖");

      condition.await();

      log.info("運動員接收到信號,比賽開始~~~~");

      }catch (Exception e){

      e.printStackTrace();

      }finally {

      REENTRANT_LOCK.unlock();

      }

      }).start();

      new Thread(() -> {

      try {

      REENTRANT_LOCK.lock();

      log.info("裁判獲取鎖");

      Thread.sleep(3000);

      log.info("裁判發送信號");

      condition.signalAll();

      }catch (Exception e){

      e.printStackTrace();

      }finally {

      REENTRANT_LOCK.unlock();

      }

      }).start();

      }

      public static void main(String[] args) throws InterruptedException {

      ExecutorService service = Executors.newCachedThreadPool();

      final Semaphore semaphore = new Semaphore(threadTotal);

      final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

      for (int i = 0; i < clientTotal; i++) {

      service.execute(() -> {

      try {

      semaphore.acquire();

      add();

      semaphore.release();

      } catch (InterruptedException e) {

      e.printStackTrace();

      } finally {

      countDownLatch.countDown();

      }

      });

      }

      countDownLatch.await();

      service.shutdown();

      log.info("count {}", count);

      testCond();

      }

      /**

      * stampedLock的使用方式

      */

      private static void add() {

      long stamp = stampedLock.writeLock();

      try {

      count++;

      } catch (Exception e) {

      } finally {

      stampedLock.unlock(stamp);

      }

      }

      }

      同步鎖synchronized不是JUC中的鎖但也順便提下,它是由synchronized 關鍵字進行同步,實現對競爭資源互斥訪問的鎖。

      同步鎖的原理:對于每一個對象,有且僅有一個同步鎖;不同的線程能共同訪問該同步鎖。在同一個時間點該同步鎖能且只能被一個線程獲取到,其他線程都得等待。

      另外:synchronized是Java中的關鍵字且是內置的語言實現;它是在JVM層面上實現的,不但可以通過一些監控工具監控synchronized的鎖定,而且在代碼執行時出現異常,JVM會自動釋放鎖定;synchronized等待的線程會一直等待下去,不能響應中斷。

      重入鎖ReentrantLock,顧名思義:就是支持重進入的鎖,它表示該鎖能夠支持一個線程對資源的重復加鎖。另外該鎖孩紙獲取鎖時的公平和非公平性選擇,所以它包含公平鎖與非公平鎖(它們兩也可以叫可重入鎖)。首先提出兩個疑問:它怎么實現重進入呢?釋放邏輯還跟AQS中一樣嗎?

      ReentrantLock 獨有的功能

      可指定是公平鎖還是非公共鎖,公平鎖就是 先等待的線程先獲得鎖

      提供了一個condition類,可以分組喚醒需要的線程,

      提供了能夠中斷等待鎖的線程機制,lock.lockInterruptibly()

      非公平鎖

      final boolean nonfairTryAcquire(int acquires) {

      final Thread current = Thread.currentThread();

      int c = getState();

      if (c == 0) {

      if (compareAndSetState(0, acquires)) {

      setExclusiveOwnerThread(current);

      return true;

      }

      }

      // 同步狀態已經被其他線程占用,則判斷當前線程是否與被占用的線程是同一個線程,如果是同一個線程則允許獲取,并state+1

      else if (current == getExclusiveOwnerThread()) {

      int nextc = c + acquires;

      if (nextc < 0) // overflow

      throw new Error("Maximum lock count exceeded");

      setState(nextc);

      return true;

      }

      return false;

      }

      該方法增加了再次獲取同步狀態的處理邏輯:通過判斷當前線程是否為獲取鎖的線程來決定獲取操作是否成功。如果是獲取鎖的線程再次請求,則將同步狀態值進行增加并返回true,表示獲取同步狀態成功。

      protected final boolean tryRelease(int releases) {

      int c = getState() - releases;

      if (Thread.currentThread() != getExclusiveOwnerThread())

      throw new IllegalMonitorStateException();

      boolean free = false;

      if (c == 0) {

      free = true;

      setExclusiveOwnerThread(null);

      }

      setState(c);

      return free;

      }

      上面代碼是釋放鎖的代碼。如果該鎖被獲取了n次,那么前(n-1)次都是返回false,直至state=0,將占有線程設置為null,并返回true,表示釋放成功。

      公平鎖

      公平鎖與非公平鎖有啥區別呢? 還是從源碼中分析吧。

      protected final boolean tryAcquire(int acquires) {

      final Thread current = Thread.currentThread();

      int c = getState();

      if (c == 0) {

      // 區別:增加判斷同步隊列中當前節點是否有前驅節點的判斷

      if (!hasQueuedPredecessors() &&

      compareAndSetState(0, acquires)) {

      setExclusiveOwnerThread(current);

      return true;

      }

      }

      // 一樣支持重入

      else if (current == getExclusiveOwnerThread()) {

      int nextc = c + acquires;

      if (nextc < 0)

      throw new Error("Maximum lock count exceeded");

      setState(nextc);

      return true;

      }

      return false;

      }

      與非公平鎖的唯一不同就是增加了一個判斷條件:判斷同步隊列中當前節點是否有前驅節點的判斷,如果方法返回true,則表示有線程比當前線程更早地請求獲取鎖,因此需要等待前驅線程獲取并釋放鎖之后才能繼續獲取鎖。

      公平鎖與非公平鎖的區別

      從上面源碼中得知,公平性鎖保證了鎖的獲取按照FIFO原則,但是代價就是進行大量的線程切換。而非公平性鎖,可能會造成線程“饑餓”(不會保證先進來的就會先獲取),但是極少線程的切換,保證了更大的吞吐量。下面我們看下案例:

      import org.junit.Test;

      import Java.util.*;

      import java.util.concurrent.*;

      import java.util.concurrent.locks.Lock;

      import java.util.concurrent.locks.ReentrantLock;

      public class FairAndUnfairTest {

      private static Lock fairLock = new ReentrantLock2(true);

      private static Lock unFairLock = new ReentrantLock2(false);

      @Test

      public void fair() throws Exception{

      testLock(fairLock);

      }

      @Test

      public void unFairLock() throws Exception{

      testLock(unFairLock);

      }

      private static void testLock(Lock lock) throws InterruptedException, ExecutionException {

      ExecutorService threadPool = Executors.newFixedThreadPool(5);

      List> list = new ArrayList<>();

      for (int i = 0 ; i < 5; i++) {

      Future future = threadPool.submit(new Job(lock));

      list.add(future);

      }

      long cost = 0;

      for (Future future : list) {

      cost += future.get();

      }

      // 查看五個線程所需耗時的時間

      System.out.println("cost:" + cost + " ms");

      }

      private static class Job implements Callable {

      private Lock lock;

      public Job(Lock lock) {

      this.lock = lock;

      }

      @Override

      public Long call() throws Exception {

      long st = System.currentTimeMillis();

      // 同一線程獲取100鎖

      for (int i =0; i < 100; i ++) {

      lock.lock();

      try {

      System.out.println("Lock by[" + Thread.currentThread().getId() + "]," +

      "Waiting by[" + printThread(((ReentrantLock2)lock).getQueuedThreads()) + "]");

      } catch (Exception e) {

      e.printStackTrace();

      } finally {

      lock.unlock();

      }

      }

      // 返回100次所需的時間

      return System.currentTimeMillis() - st;

      }

      private String printThread(Collection list) {

      StringBuilder ids = new StringBuilder();

      for (Thread t : list) {

      ids.append(t.getId()).append(",");

      }

      return ids.toString();

      }

      }

      private static class ReentrantLock2 extends ReentrantLock {

      public ReentrantLock2(boolean fair) {

      super(fair);

      }

      public Collection getQueuedThreads() {

      List arrayList = new ArrayList<>(super.getQueuedThreads());

      Collections.reverse(arrayList);

      return arrayList;

      }

      }

      }

      非公平性鎖的測試結果,cost:117 ms

      公平性鎖的測試結果,cost:193 ms

      讀寫鎖

      讀寫鎖維護了一對鎖,一個讀鎖和一個寫鎖,通過分離讀鎖和寫鎖,使得并發性相比一般的排他鎖(同一時刻只允許一個線程進行訪問)有了很大的提升。

      下面我們看下它有啥特性:

      特性

      說明

      公平性選擇

      可重入

      該鎖支持可重進入。

      讀線程在獲取了讀鎖之后能夠再次獲取讀鎖。

      寫線程在獲取了寫鎖之后能夠再次獲取寫鎖。

      鎖降級

      遵循獲取寫鎖、獲取讀鎖在釋放寫鎖的次序,寫鎖能夠降級成讀鎖。

      排他性

      當寫線程訪問時,其他讀寫線程均被阻塞

      另外讀寫鎖是采取一個整型變量來維護多種狀態。高16位表示讀,低16位表示寫。

      // 偏移位

      static final int SHARED_SHIFT = 16;

      static final int SHARED_UNIT = (1 << SHARED_SHIFT);

      // 讀寫線程允許占用的最大數

      static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;

      // 獨占標志

      static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

      下面從源碼中找出這些特性,先看下寫鎖的實現:

      1 protected final boolean tryAcquire(int acquires) {

      2

      3 Thread current = Thread.currentThread();

      4 int c = getState();

      5 // 表示獨占個數,也就是與低16位進行與運算。

      6 int w = exclusiveCount(c);

      7 if (c != 0) {

      8 // c!=0 且 w==0表示不存在寫線程,但存在讀線程

      9 if (w == 0 || current != getExclusiveOwnerThread())

      10 return false;

      11 if (w + exclusiveCount(acquires) > MAX_COUNT)

      12 throw new Error("Maximum lock count exceeded");

      13 /**

      14 * 獲取寫鎖的條件:

      15 * 不能存在讀線程且當前線程是當前占用鎖的線程(這里體現可重入性和排他性);

      16 * 當前占用鎖的次數不能超過最大數

      17 */

      18 setState(c + acquires);

      19 return true;

      20 }

      21 if (writerShouldBlock() ||

      22 !compareAndSetState(c, c + acquires))

      23 return false;

      24 setExclusiveOwnerThread(current);

      25 return true;

      26 }

      27 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

      獲取讀鎖源碼如下:

      protected final int tryAcquireShared(int unused) {

      Thread current = Thread.currentThread();

      int c = getState();

      /**

      * exclusiveCount(c) != 0: 表示有寫線程在占用

      * getExclusiveOwnerThread() != current : 當前占用鎖的線程不是當前線程。

      * 如果上面兩個條件同時滿足,則獲取失敗。

      * 上面表明如果當前線程是擁有寫鎖的線程可以獲取讀鎖(體現可重入和鎖降級)。

      */

      if (exclusiveCount(c) != 0 &&

      getExclusiveOwnerThread() != current)

      return -1;

      int r = sharedCount(c);

      if (!readerShouldBlock() &&

      r < MAX_COUNT &&

      compareAndSetState(c, c + SHARED_UNIT)) {

      if (r == 0) {

      firstReader = current;

      firstReaderHoldCount = 1;

      } else if (firstReader == current) {

      firstReaderHoldCount++;

      } else {

      HoldCounter rh = cachedHoldCounter;

      if (rh == null || rh.tid != getThreadId(current))

      cachedHoldCounter = rh = readHolds.get();

      else if (rh.count == 0)

      readHolds.set(rh);

      rh.count++;

      }

      return 1;

      }

      return fullTryAcquireShared(current);

      }

      Java 任務調度

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:哪些因素決定了企業數字化轉型的時間和內容?
      下一篇:JS-面向對象程序設計
      相關文章
      亚洲精品A在线观看| 国产成人久久精品亚洲小说| 久久久久亚洲精品天堂久久久久久| 色综合久久精品亚洲国产| 亚洲日本va在线观看| 91嫩草亚洲精品| 亚洲春色另类小说| 亚洲国产精品久久丫| 亚洲激情校园春色| 亚洲高清资源在线观看| 亚洲精品视频在线观看免费| 亚洲第一香蕉视频| 亚洲成A∨人片在线观看无码| 亚洲福利视频网址| 亚洲国产美女福利直播秀一区二区| 久久精品国产亚洲AV电影| 久久亚洲精品无码VA大香大香| 亚洲黄色网址大全| 亚洲欧洲尹人香蕉综合| 97se亚洲国产综合自在线| 亚洲国产美女精品久久久| 亚洲乱亚洲乱妇无码| 在线观看亚洲视频| 亚洲色偷偷狠狠综合网| 亚洲精品tv久久久久久久久 | 亚洲精品无码不卡在线播放HE | 国产亚洲老熟女视频| 国产亚洲精品福利在线无卡一| 亚洲色偷偷综合亚洲AV伊人| 亚洲真人无码永久在线| 亚洲AV无码专区国产乱码电影| 亚洲午夜精品一区二区| 亚洲图片中文字幕| 亚洲色精品VR一区区三区 | 久久久综合亚洲色一区二区三区| 亚洲色图国产精品| 亚洲Av无码一区二区二三区| 亚洲日本在线电影| 亚洲AV无码不卡在线观看下载| 亚洲熟妇丰满多毛XXXX| 亚洲国产精品人久久|