这里先不对大名鼎鼎的AQS进行介绍。在ReentrantLock内,AQS中的state代表锁被持有(重入)的次数,初始值为0;有线程获得锁后变为1。由于ReentrantLock是可重入锁,已经获得锁的线程还可以继续获得锁,相应的state会在1的基础上继续累加。从0到非0的抢锁操作必须是原子的,因此通过compareAndSetState(int expect, int update)实现(CAS);而重入和释放只会由持有锁的线程执行,直接调用setState即可。
/** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstractvoidlock();
/** * Performs non-fair tryLock. tryAcquire is * implemented in subclasses, but both need nonfair * try for trylock method. */ finalbooleannonfairTryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); returntrue; } } elseif (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow thrownew Error("Maximum lock count exceeded"); setState(nextc); returntrue; } returnfalse; }
protectedfinalbooleantryRelease(int releases){ int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) thrownew IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
//...省略 }
其中nonfairTryAcquire实现的是非公平方式的加锁尝试。
这里一开始不明白为什么Doug Lea大神不把实现放到NonfairSync中去~ 其实上面的注释已经给出了原因:tryAcquire由各个子类分别实现,但公平锁和非公平锁的tryLock方法都需要这个非公平的尝试,所以它被放在了公共父类Sync中。
接下来从代码层面来走一遍lock以及unlock的流程
1 2 3 4 5 6 7 8 9 10 11 12
/**
 * Minimal usage example: acquire the lock, then release it in a
 * {@code finally} block so it is released even if the body throws.
 */
class X {
    private final ReentrantLock lock = new ReentrantLock();
    // ...

    public void m() {
        lock.lock(); // block until condition holds
        try {
            // ... method body
        } finally {
            lock.unlock();
        }
    }
}
1 2 3 4 5 6 7
/**
 * Creates a lock backed by a {@code NonfairSync}, i.e. the no-argument
 * constructor selects the non-fair policy.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}
/**
 * Creates a lock with the requested fairness policy.
 *
 * @param fair {@code true} to back the lock with a {@code FairSync},
 *             {@code false} to back it with a {@code NonfairSync}
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
protectedfinalbooleantryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); //与NonfairSync不同在于,它会判断队列中是否还有排队线程, //如果没有才会设置自己为线程拥有者 if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); returntrue; } } //重入 elseif (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) thrownew Error("Maximum lock count exceeded"); setState(nextc); returntrue; } returnfalse; }
1 2 3 4 5 6 7 8 9 10 11
//h != t queue非空 publicfinalbooleanhasQueuedPredecessors(){ // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
如果tryAcquire成功,代表加锁成功,流程到此结束。
addWaiter
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
//放入队列尾部,模式是独占锁模式 private Node addWaiter(Node mode){ Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
private Node enq(final Node node){ for (;;) { Node t = tail; if (t == null) { // 初始化节点,初始化完成后再设置head节点的next节点为等待节点 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node){ int ws = pred.waitStatus; //pre节点告诉后节点,你需要等待一个Signal,所以你先Park吧~ if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; //pre节点已经Cancelled,那么当前节点要对前边为CANCELLED的进行剔除, //通过do while 循环找到一个waitStatus>0的节点并设置为自己的pre节点 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果pre节点状态为0或者PROPAGATE,将pre节点设置为SIGNAL方便下次进来时执行Park /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } returnfalse; }
//这个方法目的在于如果自己是被interrupted, //那么将自己标记为CANCELLED,并判断是否需要唤醒后继线程 privatevoidcancelAcquire(Node node){ // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
privatevoidunparkSuccessor(Node node){ /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ //将waitStatus设置为0 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //筛选后继结点waitStatus<=0的节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //唤醒后继节点 if (s != null) LockSupport.unpark(s.thread); }