Java ReentrantLock 源码学习
(1) 什么是ReentrantLock
ReentrantLock是Java里的一种可重入锁。 也可以用来达到互斥的效果。
根据创建时的传参,可以分为公平锁和非公平锁。
(2) 为什么要用ReentrantLock
ReentrantLock和synchroized异同点
(3) ReentrantLock怎么用
public class ReentrantLockDemo {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
// 业务逻辑处理
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
Exception in thread "main" java.lang.IllegalMonitorStateException
at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
at cn.wkq.java.juc.reentrantlock.ReentrantLockDemo.main(ReentrantLockDemo.java:29)
(4) ReentrantLock源码解读
(4.1) ReentrantLock结构
ReentrantLock包含抽象内部类Sync、内部类 NonfairSync、FairSync
Sync 继承 AbstractQueuedSynchronizer
AbstractQueuedSynchronizer内部有同步状态state、等待队列头结点head、等待队列尾结点tail
(4.1) 加锁流程
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
ReentrantLock $ NonfairSync
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 使用CAS尝试将state状态变量设置为1
if (compareAndSetState(0, 1))
// 设置当前拥有独占访问权限的线程。
setExclusiveOwnerThread(Thread.currentThread());
else
// 以独占模式获取,忽略中断。
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
AbstractQueuedSynchronizer::acquire
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 中断当前线程
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
ReentrantLock $ Sync::nonfairTryAcquire
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
* 执行不公平的 tryLock。 tryAcquire 在子类中实现,但两者都需要对 trylock 方法进行非公平尝试。
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 通过CAS尝试将state同步状态变量从0设置为1 获取同步锁
if (compareAndSetState(0, acquires)) {
// 将独占锁的拥有者设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果拥有独占锁的的线程是当前线程的话,表示当前线程需要重复获取锁(重入锁)
else if (current == getExclusiveOwnerThread()) {
// 当前同步状态state变量值加1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 写入state同步状态变量值,由于使用volatile修饰,单独的读写操作具有原子性
setState(nextc);
return true;
}
return false;
}
/**
* Creates and enqueues node for current thread and given mode.
* 为当前线程和给定模式创建和排队节点。
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
//cas尝试设置尾结点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 把当前节点放到尾结点
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 初始化头结点 并通过cas设置头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 把当前节点设置成尾结点 如果失败一直循环
if (compareAndSetTail(t, node)) {
t.next = node;
// 设置尾结点成功,返回
return t;
}
}
}
}
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
* 以独占不间断模式获取已在队列中的线程。 由条件等待方法以及获取使用。
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
// 标志cancelAcquire()方法是否执行
boolean failed = true;
try {
// 标记等待过程中是否中断过
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
/**
* 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
* 了解等待队列的数据结构和机制更容易看懂
*
* 如果当前节点的前驱结点已经是同步队列的头结点了,说明了两点内容:
* 1、其前驱结点已经获取到了同步锁了,并且锁还没释放
* 2、其前驱结点已经获取到了同步锁了,但是锁已经释放了
*
* 然后使用tryAcquire()方法去尝试获取同步锁,如果前驱结点已经释放了锁,那么就会获取成功,
* 否则同步锁获取失败,继续循环
*/
if (p == head && tryAcquire(arg)) {
// 获取锁成功,头指针移动到当前node
setHead(node);
// 然后将当前节点的前驱结点的后继结点置为null,帮助进行垃圾回收 为什么要设置成null?
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。
*
* shouldParkAfterFailedAcquire()是对当前节点的前驱结点的状态进行判断,
* 以及去针对各种状态做出相应处理;
* 如果前驱结点p的状态为SIGNAL的话,就返回true。
*
* parkAndCheckInterrupt()主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。
*
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
/**
* 如果for(;;)循环中出现异常,并且failed=false没有执行的话,cancelAcquire方法
* 就会将当前线程的状态置为 node.CANCELLED 已取消状态,并且将当前节点node移出
* 同步队列。
*/
if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to interrupt current thread.
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
(4.2) 释放锁流程
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then {@link IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() {
sync.release(1);
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
// 上边自定义的tryRelease如果返回true,说明该锁没有被任何线程持有
if (tryRelease(arg)) {
// 获取头结点
Node h = head;
// 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
这里的判断条件为什么是h != null && h.waitStatus != 0?
h == null Head还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。
h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。
h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。
// 方法返回当前锁是不是没有被线程持有
protected final boolean tryRelease(int releases) {
// 减少可重入次数
int c = getState() - releases;
// 当前线程不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 获取头结点waitStatus
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获取当前节点的下一个节点
Node s = node.next;
// 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
if (s == null || s.waitStatus > 0) {
s = null;
// 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点unpark
if (s != null)
LockSupport.unpark(s.thread);
}
参考资料
[1] AQS之ReentrantLock源码解析
[2] 从ReentrantLock的实现看AQS的原理及应用
[3] 不可不说的Java“锁”事