1. 加鎖park與解鎖的 unpark
- 以下三種情況會(huì)導(dǎo)致park返回亲雪。
If the permit is available then it is consumed and the call returns
immediately; otherwise
the current thread becomes disabled for thread scheduling
purposes and lies dormant until one of three things happens:
- Some other thread invokes {@link #unpark unpark} with the
current thread as the target; or- Some other thread {@linkplain Thread#interrupt interrupts}
the current thread; or- The call spuriously (that is, for no reason) returns.
park 和 unpark 方法實(shí)際上是調(diào)用了 Unsafe 類(lèi)里的函數(shù).
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, 0L);
setBlocker(t, null);
}
private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
U.putObject(t, PARKBLOCKER, arg);
}
@HotSpotIntrinsicCandidate
public native void putObject(Object o, long offset, Object x);
@HotSpotIntrinsicCandidate
public native void park(boolean isAbsolute, long time);
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}
@HotSpotIntrinsicCandidate
public native void unpark(Object thread);
2.hotspot源碼中的Parker
//在hotspot源碼的Thread.hpp中
// JSR166 per-thread parker
private:
Parker* _parker;
public:
Parker* parker() { return _parker; }
Parker 類(lèi)繼承自 PlatformParker茴迁,PlatformParker 的實(shí)現(xiàn)就是系統(tǒng)相關(guān)了崎脉,linux 的實(shí)現(xiàn)是利用了 POSIX 的 mutex 和 condition噪矛。
// 源碼位置hotspot/src/share/vm/rumtime/Park.hpp
class Parker : public os::PlatformParker {
public:
// For simplicity of interface with Java, all forms of park (indefinite,
// relative, and absolute) are multiplexed into one call.
void park(bool isAbsolute, jlong time);
void unpark();
};
//源碼位置hotspot/src/os/posix/vm/Os_posix.hpp
// JSR166 support
// PlatformParker provides the platform dependent base class for the
// Parker class. It basically provides the internal data structures:
// - mutex and convars
// which are then used directly by the Parker methods defined in the OS
// specific implementation files.
class PlatformParker : public CHeapObj<mtInternal> {
protected:
pthread_mutex_t _mutex[1];
pthread_cond_t _cond[2]; // one for relative times and one for absolute
};
Parker::park 方法中,主要是調(diào)用了 pthread_cond_wait 方法和 pthread_cond_timedwait趴梢。
void Parker::park(bool isAbsolute, jlong time) {
// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
// Optional optimization -- avoid state transitions if there's
// an interrupt pending.
if (Thread::is_interrupted(thread, false)) {
return;
}
// Next, demultiplex/decode time arguments
struct timespec absTime;
if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
return;
}
if (time > 0) {
to_abstime(&absTime, time, isAbsolute);
}
// Enter safepoint region
// Beware of deadlocks such as 6317397.
// The per-thread Parker:: mutex is a classic leaf-lock.
// In particular a thread must never block on the Threads_lock while
// holding the Parker:: mutex. If safepoints are pending both the
// the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
ThreadBlockInVM tbivm(jt);
// Don't wait if cannot get lock since interference arises from
// unparking. Also re-check interrupt before trying wait.
if (Thread::is_interrupted(thread, false) ||
pthread_mutex_trylock(_mutex) != 0) {
return;
}
int status;
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "invariant");
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
return;
}
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
assert(_cur_index == -1, "invariant");
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
status = pthread_cond_wait(&_cond[_cur_index], _mutex);
assert_status(status == 0, status, "cond_timedwait");
}
else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
assert_status(status == 0 || status == ETIMEDOUT,
status, "cond_timedwait");
}
_cur_index = -1;
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "invariant");
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
// If externally suspended while waiting, re-suspend
if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
}
Parker::unpark 設(shè)置 _counter 為1噩峦,再 unlock mutex,如果 _counter 之前小于 1笼痹,則調(diào)用 pthread_cond_signal 喚醒等待的線(xiàn)程配喳。
void Parker::unpark() {
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "invariant");
const int s = _counter;
_counter = 1;
// must capture correct index before unlocking
int index = _cur_index;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "invariant");
// Note that we signal() *after* dropping the lock for "immortal" Events.
// This is safe and avoids a common class of futile wakeups. In rare
// circumstances this can cause a thread to return prematurely from
// cond_{timed}wait() but the spurious wakeup is benign and the victim
// will simply re-test the condition and re-park itself.
// This provides particular benefit if the underlying platform does not
// provide wait morphing.
if (s < 1 && index != -1) {
// thread is definitely parked
status = pthread_cond_signal(&_cond[index]);
assert_status(status == 0, status, "invariant");
}
}
#endif
- _count相關(guān)
HotSpot Parker用condition和mutex維護(hù)了一個(gè)_counter變量,park時(shí)凳干,變量_counter置為0晴裹,unpark時(shí),變量_counter置為1救赐。
在 Parker::park 中有這么一段代碼涧团,如果 _counter 大于 0,則立即返回。在上面 unpark 的代碼中泌绣,我們看到 unpark 將 _counter 設(shè)置為 1喳瓣,也就是說(shuō):兩個(gè)線(xiàn)程之間的 park 和 unpark 不存在時(shí)序關(guān)系,可以先 unpark 再 park赞别,不會(huì)造成死鎖畏陕。這相對(duì)于存在依賴(lài)關(guān)系的 wait/notify 機(jī)制是一個(gè)巨大的優(yōu)點(diǎn)。
void Parker::park(bool isAbsolute, jlong time) {
// 這中間省略了很多代碼
int status ;
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
return;
}
// 這后面省略了更多代碼
3.應(yīng)用
3.1AQS: AbstractQueuedSynchronizer
AbstractQueuedSynchronizer 中獲取鎖的代碼如下仿滔,這里使用 for 循環(huán)惠毁,而不是在 park 返回后就立即返回,也是為了排除中斷崎页、虛假喚醒等并非因資源可用而喚醒的情況鞠绰。
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
3.2FutureTask
在 FutureTask 中,等待操作完成的 awaitDone 大致分為以下步驟:
- 先檢查是否存在中斷飒焦,是則拋異常 InterruptedException蜈膨;
- 是否已經(jīng)完成,是則返回牺荠;
- 進(jìn)入等待隊(duì)列中翁巍;
- 當(dāng)設(shè)置了超時(shí)時(shí)間 nanos 時(shí),調(diào)用 LockSupport.parkNanos 方法等待休雌;
- 沒(méi)有設(shè)置超時(shí)時(shí)間時(shí)灶壶,調(diào)用 LockSupport.park 方法等待。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);
}
}