本文共 10635 字,大约阅读时间需要 35 分钟。
锁是用来控制多个线程访问共享资源的方式。在Lock接口出现之前,Java程序是靠synchronized关键字实现锁功能,虽然Lock在使用时多了显式地获取和释放,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字不具备的同步特性。
//Lock是一个接口,接口,接口!重要的事情说三遍。public interface Lock { //获取锁 void lock(); //可中断地获取锁 void lockInterruptibly() throws InterruptedException; //尝试非阻塞获取锁,立刻返回 boolean tryLock(); //超时获取锁 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; //释放锁 void unlock(); //获取等待通知组件 Condition newCondition();}
AQS(AbstractQueuedSynchronizer)队列同步器,是用来构建锁或者其他同步组件的基础框架。一般通过继承同步器并实现它的抽象方法的方式来管理同步状态。同步器提供了3个方法(getState(),setState(int newState)和compareAndSetState(int expect, int update))来更改同步状态。
同步器是实现锁的关键,锁在实现过程中聚合同步器,利用同步器实现锁的语义。同步器提供了如下可重写的方法:
protected boolean tryAcquire(int var1) { throw new UnsupportedOperationException(); } protected boolean tryRelease(int var1) { throw new UnsupportedOperationException(); } protected int tryAcquireShared(int var1) { throw new UnsupportedOperationException(); } protected boolean tryReleaseShared(int var1) { throw new UnsupportedOperationException(); } protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
同步器依赖内部的同步队列来完成同步状态的管理,队列节点数据结构如下:
static final class Node { static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node(); static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile AbstractQueuedSynchronizer.Node prev; volatile AbstractQueuedSynchronizer.Node next; volatile Thread thread; AbstractQueuedSynchronizer.Node nextWaiter; final boolean isShared() { return this.nextWaiter == SHARED; } final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException { AbstractQueuedSynchronizer.Node var1 = this.prev; if (var1 == null) { throw new NullPointerException(); } else { return var1; } } Node() { } Node(Thread var1, AbstractQueuedSynchronizer.Node var2) { this.nextWaiter = var2; this.thread = var1; } Node(Thread var1, int var2) { this.waitStatus = var2; this.thread = var1; } }
从上述代码可知,同步队列是一个FIFO双向队列。
public final void acquire(int var1) { if (!this.tryAcquire(var1) && this.acquireQueued(this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), var1)) { selfInterrupt(); } }
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
当线程调用acquire(int)方法时,具体流程如下:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
具体流程如下:
ReentrantLock,重入锁,能够支持一个线程对资源的重复加锁。该锁支持获取锁时的公平性和非公平性选择。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
公平锁和非公平锁的唯一区别是hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断。
公平锁保证了锁的获取按照FIFO原则,而代价是进行了大量的线程切换;非公平锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了更大的吞吐量。
ReentrantReadWriteLock,可重入读写锁,实现了ReadWriteLock接口,通过readLock()方法和writeLock()方法获取读锁和写锁。
读写锁需要在同步状态上维护多个度线程和一个写线程的状态。读写锁将变量切分成了两个部分,高16位表示读,低16位表示写。假设当前读写状态为S,写状态等于S&0x0000FFFFF,读状态等于S>>>16。写状态增加1时,等于S+1,读状态增加1时,等于S+(1<<16)。
写锁是一个支持重入的排它锁。如果当前线程已经获取了写锁,则增加写状态;如果读锁已经被获取或者获取写锁的线程不是当前线程,则当前线程进入等待状态。
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
读锁是一个支持重入的共享锁,在没有其他写线程访问时,读锁总会被成功获取,如果当前线程已经获取读锁,则增加读状态。而当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
锁降级是指把持住当前拥有的写锁,再获取到读锁,随后释放写锁的过程。重点在于先获取读锁,在获取读锁的过程中保持本身的写锁,保证了数据的可见性。
Condition接口提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式。
public interface Condition { //相当于Object的wait()方法 void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; //相当于Object的wait(long timeout)方法 boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; //相当于Object的notify()方法 void signal(); //相当于Object的notifyAll()方法 void signalAll();}
一般而言,Lock对象能将synchronized关键字替换掉,是synchronized关键字的进阶。掌握Lock有助于学习并发包中源代码的实现原理,才能在实际开发中运用自如。