曹工谈并发:Synchronized升级为重量级锁后,靠什么 API 来阻塞自己
背景
因为想知道java中的关键字,对应的操作系统级别的api是啥,本来打算整理几个我知道的出来,但是,尴尬的是,我发现java里最重要的synchronized关键字,我就不知道它对应的api是什么。
redis中如何获取锁
在redis源码里,线程如果要进入一个同步区(只能单线程进入的代码块),会先获取一个互斥量,如果获取到了,则可以执行;否则,会阻塞在在这个互斥量上。
互斥量类型定义:
// 定义互斥量
static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS];
类型为 pthread_mutex_t。
互斥量初始化:
使用互斥量前,要先初始化后,才能使用:
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
pthread_mutex_init 这个函数是操作系统提供出来的api,不过应该是类unix系统才有这个。
shell中执行man pthread_mutex_init,可以看到:
The pthread_mutex_init() function shall initialize the mutex referenced by mutex with attributes specified by attr. If attr is NULL, the default mutex
attributes are used; the effect shall be the same as passing the address of a default mutex attributes object. Upon successful initialization, the state of
the mutex becomes initialized and unlocked.
pthread_mutex_init 初始化参数mutex指定的互斥量,,使用attr中指定的属性。如果attr为空,使用默认参数。
成功初始化后,互斥量的状态变为已初始化、未锁定。
如何锁定、解锁互斥量
// 1
pthread_mutex_lock(&bio_mutex[type]);
// 2
pthread_mutex_unlock(&bio_mutex[type]);
- 1处,加锁
- 2处,解锁
我们可以看下linux下执行man pthread_mutex_lock后,看到的帮助:
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
The mutex object referenced by mutex shall be locked by calling pthread_mutex_lock(). If the mutex is already locked, the calling thread shall block until the mutex becomes available. This operation shall return with the mutex object referenced by mutex in the locked state with the calling thread as its owner.
可以重点看下上面那句注释:调用pthread_mutex_lock,会导致参数mutext引用的互斥量被锁定;如果该互斥量早已被锁定,则调用线程将被阻塞。
redis中线程使用互斥量的例子
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
// 1 锁定
pthread_mutex_lock(&bio_mutex[type]);
// 将新工作推入队列
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
pthread_cond_signal(&bio_condvar[type]);
// 2 解锁
pthread_mutex_unlock(&bio_mutex[type]);
}
如要了解更多互斥量,可以看看这篇文章,写的不错:
Linux C 编程——多线程和互斥锁mutex
jdk中synchronized,不考虑轻锁、偏向锁,最终有用到前面的互斥量吗
参考文章
现在不用考虑各种优化,只考虑最终synchronized已经升级为重量级锁之后的表现,会使用前面的互斥量吗?
由于作者本身也是半桶水,搞了半天也没把jdk的源码调试环境搞起来,只能看看代码了,顺便结合网络上的一些文章,不过结论应该可靠。
大家先可以参考这两篇文章:
JVM:锁实现(synchronized&JSR166)行为分析和相关源码
简易流程梳理
我这里也简单列举一下整个过程,就从下面这里开始:
// Fast Monitor Enter/Exit
// This the fast monitor enter. The interpreter and compiler use
// some assembly copies of this code. Make sure update those code
// if the following function is changed.
// The implementation is extremely sensitive to race condition. Be careful.
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
if (UseBiasedLocking) {
if (!SafepointSynchronize::is_at_safepoint()) {
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
// 1
slow_enter (obj, lock, THREAD) ;
}
1处,前面都是偏向锁相关的东西,先跳过,进入slow_enter。
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
// 1
if (mark->is_neutral()) {
// Anticipate successful CAS -- the ST of the displaced mark must
// be visible <= the ST performed by the CAS.
// 2
lock->set_displaced_header(mark);
// 3
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
return ;
}
// Fall through to inflate() ...
} else
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
lock->set_displaced_header(NULL);
return;
}
// The object header will never be displaced to this lock,
// so it does not matter what the value is, except that it
// must be non-zero to avoid looking like a re-entrant lock,
// and must not look locked either.
lock->set_displaced_header(markOopDesc::unused_mark());
// 2
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
-
1处,mark->is_neutral(),判断对象头,是否是无锁状态,neutral本来是中立的意思,这里表示无锁。
-
2处,如果无锁,则调用lock的方法,lock本身是当前线程在栈内持有的对象,调用lock的set_displaced_header方法,参数为待加锁对象(堆里)的对象头,意思是,把待加锁对象的对象头,设置到线程的栈内变量里。
lock变量的class 类型如下:
class BasicLock VALUE_OBJ_CLASS_SPEC { friend class VMStructs; private: // 1 volatile markOop _displaced_header; public: markOop displaced_header() const { return _displaced_header; } // 2 void set_displaced_header(markOop header) { _displaced_header = header; } void print_on(outputStream* st) const; // move a basic lock (used during deoptimization void move_to(oop obj, BasicLock* dest); static int displaced_header_offset_in_bytes() { return offset_of(BasicLock, _displaced_header); } };
结合这里的1、2处代码,上面那句,意思就是,把待加锁对象的对象头,存储到lock变量 _displaced_header属性。
-
3处,这里比较复杂。
Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)
这一句里面,cmpxchg_ptr,定义为:
inline static intptr_t cmpxchg_ptr(intptr_t exchange_value, volatile intptr_t* dest, intptr_t compare_value);
这一句就是平时我们说的那种cas操作,表示,如果第二个参数,dest指向的值,和第三个参数,compare_value的值相等,则把第二个参数中的值,设为参数1的值。
重点来看,第二个参数,是什么鬼意思?
Handle obj,说明obj是Handle类型,
class Handle VALUE_OBJ_CLASS_SPEC { private: oop* _handle; protected: // 2 oop obj() const { return *_handle; } public: //1 oop operator () () const { return obj(); }
那么,obj()的意思,应该就是,代码1处,应该是进行了操作符重载,所以会调用obj()方法,obj方法,请看2处,会返回 属性_handle,当然,这里对属性进行了解引用。
所以,基本的意思就是,返回_handle这个属性,执行的oop对象。
然后,再说说参数3,参数3就是mark。
Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)
这个mark,是在代码开头这样被赋值的。
markOop mark = obj->mark();
那,我们看看obj的mark方法就行。(不知道为啥,在Handle类里,没找到这个方法,不知道为啥,难道是有什么特殊语法吗。。。),不过这个mark的意思,肯定就是对象里的对象头无误。
然后,第1个参数呢,就是lock,就是那个,如果上面的第二、三个参数相等,就将本参数,即,本线程,栈内对象lock的地址,设置到对象头中,表示,该对象已经被本线程加锁了。
-
4处,这里表示如果是当前线程重复进入:
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { assert(lock != mark->locker(), "must not re-lock the same lock"); assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock"); lock->set_displaced_header(NULL); return; }
-
5处,开始膨胀为重量级锁,并进入重量级锁的争夺
// --接前面的代码 lock->set_displaced_header(markOopDesc::unused_mark()); ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
这里,会先通过调用ObjectSynchronizer::inflate(THREAD, obj()),来完成轻量级锁到重量级锁的升级。
// Inflate light weight monitor to heavy weight monitor static ObjectMonitor* inflate(Thread * Self, oop obj);
这个注释就很清晰,升级轻锁为重锁,并且,会返回对象的monitor,即对应的重锁对象。
膨胀的过程,太复杂,看不懂(心累。。),有兴趣的可以看看这篇。
//github.com/farmerjohngit/myblog/issues/15
膨胀后,返回了对应的monitor,然后进入其enter方法。
然后enter也是茫茫多的代码,根据网上博客,即:
JVM:锁实现(synchronized&JSR166)行为分析和相关源码
会进入以下方法:
ObjectMonitor::EnterI
这个里面,也是茫茫多的代码,而且更可怕的是,注释也多得很,快比代码多了。。
void ATTR ObjectMonitor::EnterI (TRAPS) { Thread * Self = THREAD ; assert (Self->is_Java_thread(), "invariant") ; assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ; // 1 Try the lock - TATAS if (TryLock (Self) > 0) { assert (_succ != Self , "invariant") ; assert (_owner == Self , "invariant") ; assert (_Responsible != Self , "invariant") ; return ; } DeferredInitialize () ; //2 We try one round of spinning *before* enqueueing Self. if (TrySpin (Self) > 0) { assert (_owner == Self , "invariant") ; assert (_succ != Self , "invariant") ; assert (_Responsible != Self , "invariant") ; return ; } //3 The Spin failed -- Enqueue and park the thread ... //4 Enqueue "Self" on ObjectMonitor's _cxq ObjectWaiter node(Self) ; Self->_ParkEvent->reset() ; node._prev = (ObjectWaiter *) 0xBAD ; node.TState = ObjectWaiter::TS_CXQ ; // 5 Push "Self" onto the front of the _cxq. // Once on cxq/EntryList, Self stays on-queue until it acquires the lock. // Note that spinning tends to reduce the rate at which threads // enqueue and dequeue on EntryList|cxq. ObjectWaiter * nxt ; for (;;) { node._next = nxt = _cxq ; if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ; // Interference - the CAS failed because _cxq changed. Just retry. // As an optional optimization we retry the lock. if (TryLock (Self) > 0) { assert (_succ != Self , "invariant") ; assert (_owner == Self , "invariant") ; assert (_Responsible != Self , "invariant") ; return ; } } TEVENT (Inflated enter - Contention) ; int nWakeups = 0 ; int RecheckInterval = 1 ; for (;;) { if (TryLock (Self) > 0) break ; assert (_owner != Self, "invariant") ; if ((SyncFlags & 2) && _Responsible == NULL) { Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ; } //6 park self if (_Responsible == Self || (SyncFlags & 1)) { TEVENT (Inflated enter - park TIMED) ; Self->_ParkEvent->park ((jlong) RecheckInterval) ; // Increase the RecheckInterval, but clamp the value. RecheckInterval *= 8 ; if (RecheckInterval > 1000) RecheckInterval = 1000 ; } else { TEVENT (Inflated enter - park UNTIMED) ; // 7 Self->_ParkEvent->park() ; } if (TryLock(Self) > 0) break ; ... } ... return ; }
- 上面的方法不是完整的,因为太长,删减了,只留了我们要关注的那部分,1处,尝试获取锁
- 2处,尝试自旋一下
- 3处,自旋也失败了,准备进入队列,并阻塞自己,类似于aqs的实现
- 4、5处都是入队的相关操作
- 6处,阻塞自己,判断是阻塞一阵时间,还是一直阻塞。
- 7处,阻塞自己。
我们这里,重点看6处,阻塞自己,采用的方法为:
// self的定义,类型为线程 Thread * Self = THREAD ; ... Self->_ParkEvent->park() ;
我们看看这个类:
thread.hpp public: volatile intptr_t _Stalled ; volatile int _TypeTag ; // 1 ParkEvent * _ParkEvent ; // for synchronized() // 2 ParkEvent * _SleepEvent ; // for Thread.sleep // 3 ParkEvent * _MutexEvent ; // for native internal Mutex/Monitor // 4 ParkEvent * _MuxEvent ; // for low-level muxAcquire-muxRelease
有点意思,竟然有好几个ParkEvent类型的属性,第一个,看注释,就是用来,synchronized使用的;
第二个是Thread.sleep使用的,第三个是jdk自身的native方法用的
ParkEvent是什么
JVM:锁实现(synchronized&JSR166)行为分析和相关源码
大家可以再看下这篇,因为感觉写得不错。
这个类本身的属性,看得一知半解,但是它的父类,是这个。
class ParkEvent : public os::PlatformEvent
这个PlatformEvent有意思的很,它是平台相关的。
可以看到,它有5个同名的类,分别在5个文件,分别是什么os_windows.hpp、os_linux.hpp、os_solaris.hpp,盲猜也知道,是不同操作系统下的实现。
我们看看linux下,
class PlatformEvent : public CHeapObj<mtInternal> {
private:
double CachePad [4] ; // increase odds that _mutex is sole occupant of cache line
volatile int _Event ;
volatile int _nParked ;
// 1
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [1] ;
double PostPad [2] ;
Thread * _Assoc ;
public: // TODO-FIXME: make dtor private
~PlatformEvent() { guarantee (0, "invariant") ; }
public:
PlatformEvent() {
int status;
status = pthread_cond_init (_cond, os::Linux::condAttr());
assert_status(status == 0, status, "cond_init");
status = pthread_mutex_init (_mutex, NULL);
assert_status(status == 0, status, "mutex_init");
_Event = 0 ;
_nParked = 0 ;
_Assoc = NULL ;
}
// Use caution with reset() and fired() -- they may require MEMBARs
void reset() { _Event = 0 ; }
int fired() { return _Event; }
void park () ;
void unpark () ;
int TryPark () ;
int park (jlong millis) ; // relative timed-wait only
void SetAssociation (Thread * a) { _Assoc = a ; }
} ;
看到1处了吗,原来,阻塞自己还是用了pthread_mutex_t啊。
看看park怎么实现的:
void os::PlatformEvent::park() { // AKA "down()"
// Invariant: Only the thread associated with the Event/PlatformEvent
// may call park().
// TODO: assert that _Assoc != NULL or _Assoc == Self
int v ;
for (;;) {
v = _Event ;
if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
}
guarantee (v >= 0, "invariant") ;
if (v == 0) {
//1 Do this the hard way by blocking ...
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "mutex_lock");
guarantee (_nParked == 0, "invariant") ;
++ _nParked ;
while (_Event < 0) {
status = pthread_cond_wait(_cond, _mutex);
// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
// Treat this the same as if the wait was interrupted
if (status == ETIME) { status = EINTR; }
assert_status(status == 0 || status == EINTR, status, "cond_wait");
}
-- _nParked ;
_Event = 0 ;
// 2
status = pthread_mutex_unlock(_mutex);
...
}
guarantee (_Event >= 0, "invariant") ;
}
1处,加锁;
2处,解锁。
所以,我们本文的答案找到了。
看看其他平台下呢?
其他平台就不一一截图了,除了windows,都是用的pthread_mutex_lock。
总结
为了这个答案,花了一天时间,值得吗,有点不值得,时间花太长了,不过也值得,至少问题解决了。
不过,没把调试环境搭起来太惨了,各种头文件找不到,跳转都点不动,基本上都是全文搜索。。。
谢谢大家。