AQS提供了兩種鎖,獨(dú)占鎖和共享鎖。獨(dú)占鎖只有一把鎖,同一時(shí)間只允許一個(gè)線程獲得鎖;而共享鎖則有多把鎖,同一時(shí)間允許多個(gè)線程獲得鎖。我們本文主要講獨(dú)占鎖。
一. 獨(dú)占鎖的獲取
AQS中對(duì)獨(dú)占鎖的獲取一共有三個(gè)方法:
- acquire:不響應(yīng)中斷獲取獨(dú)占鎖
- acquireInterruptibly:響應(yīng)中斷獲取獨(dú)占鎖
- tryAcquireNanos:響應(yīng)中斷+超時(shí)獲取獨(dú)占鎖
由于篇幅,我們主要著眼于acquire方法,當(dāng)然,只要你理解了acquire,acquireInterruptibly和tryAcquireNanos自然不在話下了,因?yàn)檫@兩個(gè)方法只是在acquire的基礎(chǔ)上增加了一些判斷邏輯來(lái)處理中斷和超時(shí)情況而已。
我們上源碼
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
其acquire方法中一共有四個(gè)方法,其邏輯也分為4步:
- tryAcquire :嘗試獲取鎖,成功即acquire方法結(jié)束,否則調(diào)用addWaiter
- addWaiter :獲取鎖失敗即調(diào)用此方法入隊(duì),即將獲取鎖失敗的線程包裝成Node放入同步隊(duì)列的隊(duì)尾
- acquireQueued :入隊(duì)成功后即調(diào)用此方法,如果Node在隊(duì)首則再次搶鎖,否則掛起等待喚醒(喚醒后再去獲取鎖)
- selfInterrupt :如果是被中斷喚醒,則再次執(zhí)行中斷
粗略介紹完后,我們現(xiàn)在一個(gè)一個(gè)方法看。
1.1 tryAcquire
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryAcquire是鉤子方法,是我們根據(jù)需要重寫的。其功能就是在獨(dú)占模式下去獲取鎖,獲取成功則返回true,acquire方法直接結(jié)束;如果獲取失敗返回false,則后續(xù)會(huì)調(diào)用后面要講的addWaiter方法將線程入隊(duì)。
因?yàn)锳QS是模板類,不同的子類只需要重寫不同的鉤子方法,因此,tryAcquire不能設(shè)置成抽象方法,不然一些不需要此鉤子方法的子類也要實(shí)現(xiàn)這個(gè)方法。所以作者對(duì)tryAcquire的默認(rèn)實(shí)現(xiàn)是拋了一個(gè)異常(當(dāng)然我認(rèn)為直接寫個(gè)return也是ok的)。
1.2 addWaiter
如果tryAcquire獲取鎖失敗后,我們就會(huì)調(diào)用addWaiter將線程包裝成Node入隊(duì)掛起。addWaiter的大致邏輯是:先將線程包裝成Node,然后入隊(duì),如果隊(duì)列未初始化或者入隊(duì)失敗,則會(huì)調(diào)用子方法enq,enq來(lái)進(jìn)行初始化隊(duì)列和自旋入隊(duì),我們看下具體代碼:
private Node addWaiter(Node mode) {
// 將此線程包裝成Node
Node node = new Node(Thread.currentThread(), mode);
// 將pred指向尾結(jié)點(diǎn)
Node pred = tail;
// 如果pred 即尾結(jié)點(diǎn)不為null,說(shuō)明同步隊(duì)列初始化完成了。
if (pred != null) {
// 尾插法
// 步驟一:將node的前驅(qū)指針指向當(dāng)前尾結(jié)點(diǎn)
node.prev = pred;
// 步驟二:通過(guò)CAS將尾結(jié)點(diǎn)指向當(dāng)前節(jié)點(diǎn)
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 走到這一步有兩個(gè)原因
// 1是隊(duì)列未初始化,2是尾結(jié)點(diǎn)插入失敗
enq(node);
return node;
}
下面是enq方法,當(dāng)執(zhí)行到這個(gè)方法時(shí),說(shuō)明線程獲取鎖已經(jīng)失敗了,然后入隊(duì)過(guò)程又失敗了,入隊(duì)過(guò)程失敗有兩個(gè)原因:
- 同步隊(duì)列未初始化
- 入隊(duì)過(guò)程中CAS操作失敗
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 隊(duì)列為空, 初始化隊(duì)列操作,即將head和tail指向一個(gè)空節(jié)點(diǎn)
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else { // 隊(duì)列不為空
// 并發(fā)下,cas操作可能會(huì)失敗,所以通過(guò)for循環(huán)不斷進(jìn)行入隊(duì),直到成功為止
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
CAS節(jié)點(diǎn)入隊(duì)失敗的原因,我們看到enq源碼中執(zhí)行完尾插法的步驟一,即將Node的前驅(qū)指針指向當(dāng)前尾結(jié)點(diǎn),如果是并發(fā)情況下,應(yīng)該是如下圖所示(紫色節(jié)點(diǎn)代表我們關(guān)注的Node):
此時(shí),可能有多個(gè)Node都準(zhǔn)備入隊(duì),所以此時(shí)可能有多個(gè)Node的前驅(qū)節(jié)點(diǎn)都指向尾結(jié)點(diǎn),所以我們?cè)趫?zhí)行步驟二將尾結(jié)點(diǎn)指向Node時(shí),采用的是CAS,即只有一個(gè)Node能成功,假設(shè)我們關(guān)注的Node入隊(duì)成功了,如下圖:
則另外兩個(gè)CAS操作肯定會(huì)失敗,即它們將要進(jìn)入enq方法重新自旋入隊(duì)。
1.3 acquireQueued
執(zhí)行完addWaiter方法后,說(shuō)明我們已經(jīng)入隊(duì)成功了,此時(shí)我們需要將Node中的線程掛起,等待下次被喚醒。
但在掛起之前,我們需要再次檢查下我們此時(shí)的Node是否是在隊(duì)首,如果在隊(duì)首,我們又會(huì)再次去搶鎖。否則我們會(huì)通過(guò)shouldParkAfterFailedAcquire判斷是否要掛起(shouldParkAfterFailedAcquire不僅僅是判斷此線程是否可以被掛起,還會(huì)將同步隊(duì)列中屬性為CANCELLED的Node移除隊(duì)列),如果需要掛起,則調(diào)用parkAndCheckInterrupt將線程掛起。具體源碼如下:
final boolean acquireQueued(final Node node, int arg) {
// 獲取失敗標(biāo)簽,默認(rèn)ture,如果獲取到鎖了后則會(huì)置為false
boolean failed = true;
try {
// 中斷標(biāo)簽,默認(rèn)false
boolean interrupted = false;
for (;;) {
// 獲取此節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
// 如果前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn),則會(huì)再次調(diào)用tryAcquire搶鎖
// 如果搶鎖成功了,則進(jìn)入if語(yǔ)句,然后return
if (p == head && tryAcquire(arg)) {
// 將此節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn)
setHead(node);
p.next = null; // help GC
// 獲取失敗標(biāo)志置為false,因?yàn)槟玫芥i了
failed = false;
// 返回中斷標(biāo)志
return interrupted;
}
// shouldParkAfterFailedAcquire判斷是否要掛起
// 如果要掛起,則調(diào)用parkAndCheckInterrupt將線程掛起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire源碼如下。其主要作用有2:
- 決定獲取鎖失敗后,是否將線程掛起
- 清除同步隊(duì)列中所有狀態(tài)為CANCELLED的節(jié)點(diǎn)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果此節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)為SIGNAL,則說(shuō)明此節(jié)點(diǎn)需要掛起,返回true
if (ws == Node.SIGNAL)
return true;
// 如果此節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)狀態(tài)大于0,即狀態(tài)為CANCELLED則移除前驅(qū)節(jié)點(diǎn),然后再往前遍歷,直到清除完所有CANCELLED的節(jié)點(diǎn)
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 將前驅(qū)節(jié)點(diǎn)置為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
這是acquireQueued中的最后一步,即將線程掛起,然后靜靜的等待被喚醒。除非該線程被其他線程unpark或者被中斷,否則該線程的程序?qū)⒁恢蓖V乖谶@。
private final boolean parkAndCheckInterrupt() {
// 通過(guò)LockSupport掛起線程
LockSupport.park(this);
// 返回線程的標(biāo)志位,true表示此線程被中斷過(guò)
return Thread.interrupted();
}
1.4 selfInterrupt
通過(guò)我們前面的分析可以知道,當(dāng)線程被中斷過(guò),則會(huì)進(jìn)入到此方法。
而interrupte這個(gè)方法也只是將當(dāng)前線程的中斷標(biāo)志置為true,至于會(huì)不會(huì)被中斷,這個(gè)是由系統(tǒng)決定的。
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
二. 獨(dú)占鎖的的釋放
相比獨(dú)占鎖的獲取,獨(dú)占鎖的釋放邏輯就簡(jiǎn)單多了。獨(dú)占鎖釋放只做了兩件事情:
- 釋放鎖
- 喚醒head結(jié)點(diǎn)后最近需要被喚醒的節(jié)點(diǎn)。
其釋放邏輯的實(shí)現(xiàn)是通過(guò)release方法,而做的兩件事分別對(duì)應(yīng)了其子方法tryRelease和unparkSuccessor:
public final boolean release(int arg) {
// 如果釋放鎖成功,則進(jìn)入if去喚醒同步隊(duì)列中的線程
if (tryRelease(arg)) {
Node h = head;
// head節(jié)點(diǎn)不為空(即同步隊(duì)列不為空) 且 狀態(tài)不為0(初始化隊(duì)列時(shí),head結(jié)點(diǎn)waitStatus為0,此時(shí)等待隊(duì)列中是沒(méi)有節(jié)點(diǎn)的)
// 則喚醒head結(jié)點(diǎn)后繼節(jié)點(diǎn)
if (h != null && h.waitStatus != 0)
// 喚醒離head最近需要被喚醒的節(jié)點(diǎn)
unparkSuccessor(h);
return true;
}
return false;
}
2.1 tryRelease
這個(gè)方法和tryAcquire一樣,也是鉤子方法,是留給子類重寫的,作用是用來(lái)釋放鎖,如果釋放成功則返回true,失敗返回false,這個(gè)具體的實(shí)現(xiàn)我們也放在后續(xù)AQS的子類中講解,這里就不過(guò)多闡述了。
2.2 unparkSuccessor
此方法的作用是喚醒后繼Node,我們看代碼:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// waitStatus< 0,說(shuō)明此時(shí)waitStatus為SIGNAL
if (ws < 0)
// 此時(shí)需要將waitStatus置為0,待會(huì)喚醒后繼節(jié)點(diǎn)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 此Node的后繼節(jié)點(diǎn)如果是null或者狀態(tài)為CANCELLED,則此Node已經(jīng)不存在或者取消
// 則我們需要從尾結(jié)點(diǎn)往前遍歷找到離head最近的需要被喚醒的Node
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒Node中的線程
LockSupport.unpark(s.thread);
}
這里需要注意的是,我們?cè)谡倚枰粏拘训墓?jié)點(diǎn)時(shí),為什么是從后往前遍歷呢?
其實(shí)這和獲取鎖時(shí)的尾結(jié)點(diǎn)入隊(duì)有關(guān),我們?cè)倏聪氯腙?duì)方法addWaiter中插入尾結(jié)點(diǎn)的相關(guān)代碼:
node.prev = pred; //step1
if (compareAndSetTail(pred, node)) // step2
pred.next = node; // step3
假設(shè)我們此時(shí)有個(gè)Node正在入隊(duì),執(zhí)行完step2,還未執(zhí)行step3,unparkSuccessor中如果采用從head往后遍歷,是找不到這個(gè)新插入的Node的;但如果是采用從后往前遍歷,則不會(huì)出現(xiàn)這個(gè)問(wèn)題。
三. 總結(jié)
對(duì)于獨(dú)占鎖的獲取與釋放,就分析完了,這里我再總結(jié)一下:
獲取獨(dú)占鎖是通過(guò)acquire來(lái)實(shí)現(xiàn)的,首先通過(guò)tryAcquire獲取鎖,如果獲取成功,則直接返回,如果失敗,則會(huì)調(diào)用addWaiter方法進(jìn)行入隊(duì),如果入隊(duì)過(guò)程中發(fā)現(xiàn)隊(duì)列未初始化,則會(huì)初始化隊(duì)列再進(jìn)行入隊(duì),入隊(duì)不成功則會(huì)一直自旋直到成功;入隊(duì)成功后就會(huì)掛起,直到被其他線程或者中斷喚醒;喚醒后會(huì)檢查線程的中斷標(biāo)志位,如果被中斷過(guò),會(huì)再次調(diào)用中斷方法,告訴系統(tǒng)自己需要被中斷。
釋放獨(dú)占鎖是通過(guò)release方法實(shí)現(xiàn)的,其首先通過(guò)tryRelease釋放鎖,如果失敗則直接返回false,如果成功則會(huì)調(diào)用unparkSuccessor喚醒后繼節(jié)點(diǎn)。
-
代碼
+關(guān)注
關(guān)注
30文章
4891瀏覽量
70371 -
線程
+關(guān)注
關(guān)注
0文章
507瀏覽量
20112
發(fā)布評(píng)論請(qǐng)先 登錄
AQS如何解決線程同步與通信問(wèn)題
AQS是什么

評(píng)論