今天我們來聊一聊以數組為數據結構的阻塞隊列 ArrayBlockingQueue,它實現了 BlockingQueue 接口,繼承了抽象類 AbstractQueue。
BlockingQueue 提供了三個元素入隊的方法。
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
三個元素出隊的方法。
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
boolean remove(Object o);
一起來看看,ArrayBlockingQueue 是如何實現的吧。
初識
首先看一下 ArrayBlockingQueue 的主要屬性和構造函數。
屬性
//存放元素
final Object[] items;
//取元素的索引
int takeIndex;
//存元素的索引
int putIndex;
//元素的數量
int count;
//控制并發的鎖
final ReentrantLock lock;
//非空條件信號量
private final Condition notEmpty;
//非滿條件信號量
private final Condition notFull;
transient Itrs itrs = null;
從以上屬性可以看出:
- 以數組的方式存放元素。
- 用 putIndex 和 takeIndex 控制元素入隊和出隊的索引。
- 用重入鎖控制并發、保證線程的安全。
構造函數
ArrayBlockingQueue 有三個構造函數,其中 public ArrayBlockingQueue(int capacity, boolean fair, Collection c)
構造函數并不常用,暫且不提。看其中兩個構造函數。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//構造數組
this.items = new Object[capacity];
//默認以非公平鎖初始化 ReentrantLock
lock = new ReentrantLock(fair);
//創建兩個條件信號量
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
可以看出 ArrayBlockingQueue 必須再創建時傳入數組的大小。
元素入隊
ArrayBlockingQueue 有 add()、offer()、put()、offer(E e, long timeout, TimeUnit unit) 方法用來元素的入隊。
add
//ArrayBlockingQueue.add()
public boolean add(E e) {
//調用父類的 AbstractQueue.add() 方法
return super.add(e);
}
//AbstractQueue.add()
public boolean add(E e) {
//調用 ArrayBlockingQueue.offer(),成功則返回 true,否則拋出異常
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
//ArrayBlockingQueue.offer()
public boolean offer(E e) {
//非空檢查
checkNotNull(e);
//加鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
//數組滿了,返回 false
if (count == items.length)
return false;
else {
//添加元素
enqueue(e);
return true;
}
} finally {
//解鎖
lock.unlock();
}
}
//ArrayBlockingQueue.enqueue()
private void enqueue(E x) {
final Object[] items = this.items;
//直接放到 putIndex 的位置
items[putIndex] = x;
//如果索引滿了,putIndex 就從 0 開始,為什么呢?
if (++putIndex == items.length)
putIndex = 0;
//數量加一
count++;
//數組里面有數據了,對 notEmpty 條件隊列進行通知
notEmpty.signal();
}
上面留下了一個坑,索引等于數組的長度的時候,索引就從 0 開始了。其實很簡單,這個數組是不是先入先出的,0 索引的數組先入隊,也是先出隊的。這時候 0 索引的位置就空了,所以 putIndex 到達數組長度的時候就可以從 0 開始。這里可以看出,ArrayBlockingQueue 是絕對不可以修改數組長度的,一旦初始化后長度就不能再改變了。
put
//ArrayBlockingQueue.put()
public void put(E e) throws InterruptedException {
//非空檢查
checkNotNull(e);
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
//數組滿了,線程加入 notFull 隊列中等待被喚醒
while (count == items.length)
notFull.await();
//添加元素
enqueue(e);
} finally {
//解鎖
lock.unlock();
}
}
offer
ArrayBlockingQueue 中有兩個 offer() 方法,offer(E e) 和 offer(E e, long timeout, TimeUnit unit),add() 方法調用的就是 offer(E e) 方法。
//ArrayBlockingQueue.offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//非空檢查
checkNotNull(e);
//將時間轉換為納秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
//當數組滿了
while (count == items.length) {
//時間到了,元素還沒有入隊,則返回 false
if (nanos <= 0)
return false;
//線程加入 notFull 隊列中,等待被喚醒,到達 nanos 時間返回剩余的 nanos 時間
nanos = notFull.awaitNanos(nanos);
}
//元素入隊
enqueue(e);
return true;
} finally {
//解鎖
lock.unlock();
}
}
以上就是所有的元素入隊的方法,可以得出一些結論:
- add() 元素滿了,就拋出異常。
- offer() 元素滿了,返回 false。
- put() 元素滿了,線程阻塞等待被入隊。
- offer(E e, long timeout, TimeUnit unit) 加入超時時間,如果時間到了元素還是沒有被入隊,則返回 false
移除元素
ArrayBlockingQueue 提供了 poll()、take()、poll(long timeout, TimeUnit unit)、remove() 方法用于元素的出隊。
poll
ArrayBlockingQueue 中有兩個 poll() 方法,poll() 和 poll(long timeout, TimeUnit unit)。
//ArrayBlockingQueue.poll()
public E poll() {
final ReentrantLock lock = this.lock;
//加鎖
lock.lock();
try {
//沒有元素返回 null,否則元素出隊
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//ArrayBlockingQueue.dequeue()
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//獲取 takeIndex 上的元素
E x = (E) items[takeIndex];
//設置 takeIndex 索引上的元素為 null
items[takeIndex] = null;
//當 takeIndex 長度是數組長度,takeIndex 索引從 0 開始
if (++takeIndex == items.length)
takeIndex = 0;
//元素數量 -1
count--;
if (itrs != null)
//更新迭代器
itrs.elementDequeued();
//喚醒 notFull 的等待隊列,其中等待的第一個線程可以添加元素了
notFull.signal();
return x;
}
//ArrayBlockingQueue.poll(long timeout, TimeUnit unit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
////將時間轉換為納秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
//數組為空,超時還沒有元素出隊,則返回 null
while (count == 0) {
if (nanos <= 0)
return null;
//線程加入 notEmpty 中,等待被喚醒,到達 nanos 時間返回剩余的 nanos 時間
nanos = notEmpty.awaitNanos(nanos);
}
//元素出隊
return dequeue();
} finally {
lock.unlock();
}
}
take
//ArrayBlockingQueue.take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
//無元素
while (count == 0)
//將線程加入 notEmpty 的等待隊列中,等待被入隊的元素喚醒
notEmpty.await();
//元素出隊
return dequeue();
} finally {
//解鎖
lock.unlock();
}
}
remove
//ArrayBlockingQueue.remove()
public boolean remove(Object o) {
//非空檢查
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
//加鎖
lock.lock();
try {
if (count > 0) {
//入隊元素的索引
final int putIndex = this.putIndex;
//出隊元素的索引
int i = takeIndex;
do {
//找到元素
if (o.equals(items[i])) {
removeAt(i);
return true;
}
//i 等于數組長度的時候,從 0 開始
if (++i == items.length)
i = 0;
// i == putIndex 說明已經遍歷了一遍
} while (i != putIndex);
}
return false;
} finally {
//解鎖
lock.unlock();
}
}
//ArrayBlockingQueue.removeAt()
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//需要出隊的 removeIndex 正好是 takeIndex
if (removeIndex == takeIndex) {
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
//更新迭代器
if (itrs != null)
itrs.elementDequeued();
} else {
final int putIndex = this.putIndex;
// 循環移動元素,將 next 元素向前移動 1 個
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
//設置 i 索引的位置為空,putIndex 索引為 i
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
// 喚醒 notFull 隊列中等待的線程,通知可以元素入隊了
notFull.signal();
}
以上就是所有的元素出隊的方法,可以得出一些結論:
- poll() 元素出隊為空,則返回空
- take() 元素出隊為空的時候,會阻塞線程
- remove() 元素出隊的時候可能會移動數組
- poll(long timeout, TimeUnit unit) 加入超時時間,如果時間到了還是沒有元素需要出隊,則返回 null
總結
ArrayBlockingQueue 可以被用在生產者和消費者模型中。
- ArrayBlockingQueue,不能被擴容,初始化被指定容量。
- 利用 putIndex 和 takeIndex 循環利用數組。
- 利用了 ReentrantLock 和 兩個 Condition 保證了線程的安全。
-
接口
+關注
關注
33文章
8961瀏覽量
153264 -
函數
+關注
關注
3文章
4372瀏覽量
64288 -
數據結構
+關注
關注
3文章
573瀏覽量
40623 -
數組
+關注
關注
1文章
419瀏覽量
26393
發布評論請先 登錄
SystemVerilog中的類構造函數new
一個基于多屬性協商的效用函數研究
基于生成函數的格雷對分析與構造
基于plateaued函數的平衡布爾函數構造
2.10 學生類-構造函數 (15分)

評論