java 并發編程學習筆記(六)之 AQS (AbstractQueuedSynchronizer)
AQS

(1)aqs
使用node實現fifo隊列,可以用于構建鎖或者其他的同步裝置的基礎框架
利用了一個int類型表示狀態
使用方法是繼承
子類通過繼承并通過實現它的方法管理其狀態{acquire 和 release}
可以同時實現排他鎖和共享鎖模式(獨占,共享)
(2)CountDownLatch
/**
* 一般用于 當主線程需要等待 子線程執行完成之后 再執行的場景
* * 線程名稱 子線程結束
* * thread1 ---------------------- end |
* * thread2 ---------------------- end | 主線程 主線程結束
* * thread3 ---------------------- end | --------------------- end
* * thread4 ---------------------- end |
* * thread5 ---------------------- end |
* *
*
*/
@Slf4j
public class CountDownLatchExample1 {
private static int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
service.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
log.error("exception",e);
} finally {
countDownLatch.countDown();
}
});
}
// countDownLatch.await();
countDownLatch.await(10, TimeUnit.MILLISECONDS); //超時等待
log.info("finish");
service.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
log.info("{}", threadNum);
Thread.sleep(100);
}
}
(3)Semaphore
/**
* 一般用于控制同一時刻運行的線程數量
* | |
* ---------|----- |---------
* | |
* ---------|----- |---------
* | |
*/
@Slf4j
public class SemaphoreExample1 {
private static int threadCount = 20;
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
service.execute(() -> {
try {
//semaphore.acquire(); 每次拿一個許可證
//semaphore.acquire(3); 每次拿三個許可證
semaphore.tryAcquire(3);
semaphore.tryAcquire(1,TimeUnit.SECONDS); //嘗試一秒之內獲取許可
semaphore.tryAcquire(3,1,TimeUnit.SECONDS);
if(semaphore.tryAcquire()) { //嘗試獲取許可證 ,沒有獲取到直接將當前線程丟棄
test(threadNum);
semaphore.release();
}else {
log.info(Thread.currentThread().getName()+"我沒有拿到許可證,┭┮﹏┭┮");
}
} catch (InterruptedException e) {
log.error("exception", e);
}
});
}
service.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
(4)CyclicBarrier
/**
* 一般用于多個線程之間相互等待,當全部都到達某個屏障點的時候在,繼續執行每個線程,并且可以重復循環使用
* 線程名稱 某個屏障點 終點
* thread1 ------------| ---------- end
* thread2 ------------| ---------- end
* thread3 ------------| ---------- end
* thread4 ------------| ---------- end
* thread5 ------------| ---------- end
*
*/
@Slf4j
public class CyclicBarrierExample1 {
// private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5,() -> {
log.info("五個線程都已經準備就緒");
});
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
service.execute(() -> {
try {
race(threadNum);
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
service.shutdown();
}
private static void race(int threadNum) throws InterruptedException, BrokenBarrierException {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
cyclicBarrier.await();
try {
// cyclicBarrier.await(1, TimeUnit.SECONDS);
} catch (Exception e) {
// e.printStackTrace();
}
log.info("{} continue", threadNum);
}
}
(5)鎖
鎖的簡單使用:
/**
* class Point {
* * private double x, y;
* * private final StampedLock sl = new StampedLock();
* *
* * void move(double deltaX, double deltaY) { // an exclusively locked method
* * long stamp = sl.writeLock();
* * try {
* * x += deltaX;
* * y += deltaY;
* * } finally {
* * sl.unlockWrite(stamp);
* * }
* * }
* *
* * double distanceFromOrigin() { // A read-only method
* * long stamp = sl.tryOptimisticRead();
* * double currentX = x, currentY = y;
* * if (!sl.validate(stamp)) {
* * stamp = sl.readLock();
* * try {
* * currentX = x;
* * currentY = y;
* * } finally {
* * sl.unlockRead(stamp);
* * }
* * }
* * return Math.sqrt(currentX * currentX + currentY * currentY);
* * }
* *
* * void moveIfAtOrigin(double newX, double newY) { // upgrade
* * // Could instead start with optimistic, not read mode
* * long stamp = sl.readLock();
* * try {
* * while (x == 0.0 && y == 0.0) {
* * long ws = sl.tryConvertToWriteLock(stamp);
* * if (ws != 0L) {
* * stamp = ws;
* * x = newX;
* * y = newY;
* * break;
* * }
* * else {
* * sl.unlockRead(stamp);
* * stamp = sl.writeLock();
* * }
* * }
* * } finally {
* * sl.unlock(stamp);
* * }
* * }
* * }}
* *
*/
@Slf4j
public class LockExample1 {
private static int clientTotal = 5000;
private static int threadTotal = 200;
private static int count = 0;
//重入鎖
private final static Lock lock = new ReentrantLock();
//重入讀寫鎖
private final static Map
private final static ReentrantReadWriteLock reenLock = new ReentrantReadWriteLock();
private final static Lock readLock = reenLock.readLock();
private final static Lock writeLock = reenLock.writeLock();
//stamped鎖
private final static StampedLock stampedLock = new StampedLock();
//condition
private final static ReentrantLock REENTRANT_LOCK = new ReentrantLock();
private final static Condition condition = REENTRANT_LOCK.newCondition();
//重入讀寫鎖的使用
public Integer getValue(Integer key) {
readLock.lock();
Integer value = null;
try {
value = map.get(key);
} catch (Exception e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
return value;
}
//重入讀寫鎖的使用
public Set
Set
readLock.lock();
try {
set = map.keySet();
} catch (Exception e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
return set;
}
/**
* 重入讀寫鎖的使用
* @param key
* @param value
* @return
*/
public Integer put(Integer key, Integer value) {
writeLock.lock();
try {
map.put(key, value);
} catch (Exception e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
return value;
}
/**
* condition的使用
*/
public static void testCond(){
new Thread(() -> {
try {
REENTRANT_LOCK.lock();
log.info("運動員獲取鎖");
condition.await();
log.info("運動員接收到信號,比賽開始~~~~");
}catch (Exception e){
e.printStackTrace();
}finally {
REENTRANT_LOCK.unlock();
}
}).start();
new Thread(() -> {
try {
REENTRANT_LOCK.lock();
log.info("裁判獲取鎖");
Thread.sleep(3000);
log.info("裁判發送信號");
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
REENTRANT_LOCK.unlock();
}
}).start();
}
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
service.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
service.shutdown();
log.info("count {}", count);
testCond();
}
/**
* stampedLock的使用方式
*/
private static void add() {
long stamp = stampedLock.writeLock();
try {
count++;
} catch (Exception e) {
} finally {
stampedLock.unlock(stamp);
}
}
}
同步鎖synchronized不是JUC中的鎖但也順便提下,它是由synchronized 關鍵字進行同步,實現對競爭資源互斥訪問的鎖。
同步鎖的原理:對于每一個對象,有且僅有一個同步鎖;不同的線程能共同訪問該同步鎖。在同一個時間點該同步鎖能且只能被一個線程獲取到,其他線程都得等待。
另外:synchronized是Java中的關鍵字且是內置的語言實現;它是在JVM層面上實現的,不但可以通過一些監控工具監控synchronized的鎖定,而且在代碼執行時出現異常,JVM會自動釋放鎖定;synchronized等待的線程會一直等待下去,不能響應中斷。
重入鎖ReentrantLock,顧名思義:就是支持重進入的鎖,它表示該鎖能夠支持一個線程對資源的重復加鎖。另外該鎖孩紙獲取鎖時的公平和非公平性選擇,所以它包含公平鎖與非公平鎖(它們兩也可以叫可重入鎖)。首先提出兩個疑問:它怎么實現重進入呢?釋放邏輯還跟AQS中一樣嗎?
ReentrantLock 獨有的功能
可指定是公平鎖還是非公共鎖,公平鎖就是 先等待的線程先獲得鎖
提供了一個condition類,可以分組喚醒需要的線程,
提供了能夠中斷等待鎖的線程機制,lock.lockInterruptibly()
非公平鎖
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 同步狀態已經被其他線程占用,則判斷當前線程是否與被占用的線程是同一個線程,如果是同一個線程則允許獲取,并state+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
該方法增加了再次獲取同步狀態的處理邏輯:通過判斷當前線程是否為獲取鎖的線程來決定獲取操作是否成功。如果是獲取鎖的線程再次請求,則將同步狀態值進行增加并返回true,表示獲取同步狀態成功。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
上面代碼是釋放鎖的代碼。如果該鎖被獲取了n次,那么前(n-1)次都是返回false,直至state=0,將占有線程設置為null,并返回true,表示釋放成功。
公平鎖
公平鎖與非公平鎖有啥區別呢? 還是從源碼中分析吧。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 區別:增加判斷同步隊列中當前節點是否有前驅節點的判斷
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 一樣支持重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
與非公平鎖的唯一不同就是增加了一個判斷條件:判斷同步隊列中當前節點是否有前驅節點的判斷,如果方法返回true,則表示有線程比當前線程更早地請求獲取鎖,因此需要等待前驅線程獲取并釋放鎖之后才能繼續獲取鎖。
公平鎖與非公平鎖的區別
從上面源碼中得知,公平性鎖保證了鎖的獲取按照FIFO原則,但是代價就是進行大量的線程切換。而非公平性鎖,可能會造成線程“饑餓”(不會保證先進來的就會先獲取),但是極少線程的切換,保證了更大的吞吐量。下面我們看下案例:
import org.junit.Test;
import Java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class FairAndUnfairTest {
private static Lock fairLock = new ReentrantLock2(true);
private static Lock unFairLock = new ReentrantLock2(false);
@Test
public void fair() throws Exception{
testLock(fairLock);
}
@Test
public void unFairLock() throws Exception{
testLock(unFairLock);
}
private static void testLock(Lock lock) throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
List
for (int i = 0 ; i < 5; i++) {
Future
list.add(future);
}
long cost = 0;
for (Future
cost += future.get();
}
// 查看五個線程所需耗時的時間
System.out.println("cost:" + cost + " ms");
}
private static class Job implements Callable
private Lock lock;
public Job(Lock lock) {
this.lock = lock;
}
@Override
public Long call() throws Exception {
long st = System.currentTimeMillis();
// 同一線程獲取100鎖
for (int i =0; i < 100; i ++) {
lock.lock();
try {
System.out.println("Lock by[" + Thread.currentThread().getId() + "]," +
"Waiting by[" + printThread(((ReentrantLock2)lock).getQueuedThreads()) + "]");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// 返回100次所需的時間
return System.currentTimeMillis() - st;
}
private String printThread(Collection
StringBuilder ids = new StringBuilder();
for (Thread t : list) {
ids.append(t.getId()).append(",");
}
return ids.toString();
}
}
private static class ReentrantLock2 extends ReentrantLock {
public ReentrantLock2(boolean fair) {
super(fair);
}
public Collection
List
Collections.reverse(arrayList);
return arrayList;
}
}
}
非公平性鎖的測試結果,cost:117 ms
公平性鎖的測試結果,cost:193 ms
讀寫鎖
讀寫鎖維護了一對鎖,一個讀鎖和一個寫鎖,通過分離讀鎖和寫鎖,使得并發性相比一般的排他鎖(同一時刻只允許一個線程進行訪問)有了很大的提升。
下面我們看下它有啥特性:
特性
說明
公平性選擇
可重入
該鎖支持可重進入。
讀線程在獲取了讀鎖之后能夠再次獲取讀鎖。
寫線程在獲取了寫鎖之后能夠再次獲取寫鎖。
鎖降級
遵循獲取寫鎖、獲取讀鎖在釋放寫鎖的次序,寫鎖能夠降級成讀鎖。
排他性
當寫線程訪問時,其他讀寫線程均被阻塞
另外讀寫鎖是采取一個整型變量來維護多種狀態。高16位表示讀,低16位表示寫。
// 偏移位
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 讀寫線程允許占用的最大數
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 獨占標志
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
下面從源碼中找出這些特性,先看下寫鎖的實現:
1 protected final boolean tryAcquire(int acquires) {
2
3 Thread current = Thread.currentThread();
4 int c = getState();
5 // 表示獨占個數,也就是與低16位進行與運算。
6 int w = exclusiveCount(c);
7 if (c != 0) {
8 // c!=0 且 w==0表示不存在寫線程,但存在讀線程
9 if (w == 0 || current != getExclusiveOwnerThread())
10 return false;
11 if (w + exclusiveCount(acquires) > MAX_COUNT)
12 throw new Error("Maximum lock count exceeded");
13 /**
14 * 獲取寫鎖的條件:
15 * 不能存在讀線程且當前線程是當前占用鎖的線程(這里體現可重入性和排他性);
16 * 當前占用鎖的次數不能超過最大數
17 */
18 setState(c + acquires);
19 return true;
20 }
21 if (writerShouldBlock() ||
22 !compareAndSetState(c, c + acquires))
23 return false;
24 setExclusiveOwnerThread(current);
25 return true;
26 }
27 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
獲取讀鎖源碼如下:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
/**
* exclusiveCount(c) != 0: 表示有寫線程在占用
* getExclusiveOwnerThread() != current : 當前占用鎖的線程不是當前線程。
* 如果上面兩個條件同時滿足,則獲取失敗。
* 上面表明如果當前線程是擁有寫鎖的線程可以獲取讀鎖(體現可重入和鎖降級)。
*/
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
Java 任務調度
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。