博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
重学多线程(三)—— 锁
阅读量:4223 次
发布时间:2019-05-26

本文共 10635 字,大约阅读时间需要 35 分钟。

前言

锁是用来控制多个线程访问共享资源的方式。在Lock接口出现之前,Java程序是靠synchronized关键字实现锁功能,虽然Lock在使用时多了显式地获取和释放,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字不具备的同步特性。

Lock的API接口

//Lock是一个接口,接口,接口!重要的事情说三遍。public interface Lock {
//获取锁 void lock(); //可中断地获取锁 void lockInterruptibly() throws InterruptedException; //尝试非阻塞获取锁,立刻返回 boolean tryLock(); //超时获取锁 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; //释放锁 void unlock(); //获取等待通知组件 Condition newCondition();}

Lock的内部实现

AQS队列同步器介绍

AQS(AbstractQueuedSynchronizer)队列同步器,是用来构建锁或者其他同步组件的基础框架。一般通过继承同步器并实现它的抽象方法的方式来管理同步状态。同步器提供了3个方法(getState(),setState(int newState)和compareAndSetState(int expect, int update))来更改同步状态。

同步器是实现锁的关键,锁在实现过程中聚合同步器,利用同步器实现锁的语义。

同步器提供了如下可重写的方法:

protected boolean tryAcquire(int var1) {        throw new UnsupportedOperationException();    }    protected boolean tryRelease(int var1) {        throw new UnsupportedOperationException();    }    protected int tryAcquireShared(int var1) {        throw new UnsupportedOperationException();    }    protected boolean tryReleaseShared(int var1) {        throw new UnsupportedOperationException();    }    protected boolean isHeldExclusively() {        throw new UnsupportedOperationException();    }

AQS队列同步器实现

同步队列

同步器依赖内部的同步队列来完成同步状态的管理,队列节点数据结构如下:

static final class Node {        static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();        static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;        static final int CANCELLED = 1;        static final int SIGNAL = -1;        static final int CONDITION = -2;        static final int PROPAGATE = -3;        volatile int waitStatus;        volatile AbstractQueuedSynchronizer.Node prev;        volatile AbstractQueuedSynchronizer.Node next;        volatile Thread thread;        AbstractQueuedSynchronizer.Node nextWaiter;        final boolean isShared() {            return this.nextWaiter == SHARED;        }        final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException {            AbstractQueuedSynchronizer.Node var1 = this.prev;            if (var1 == null) {                throw new NullPointerException();            } else {                return var1;            }        }        Node() {        }        Node(Thread var1, AbstractQueuedSynchronizer.Node var2) {            this.nextWaiter = var2;            this.thread = var1;        }        Node(Thread var1, int var2) {            this.waitStatus = var2;            this.thread = var1;        }    }

从上述代码可知,同步队列是一个FIFO双向队列。

独占式同步状态获取

public final void acquire(int var1) {        if (!this.tryAcquire(var1) && this.acquireQueued(this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), var1)) {            selfInterrupt();        }    }
private Node addWaiter(Node mode) {        Node node = new Node(Thread.currentThread(), mode);        Node pred = tail;        if (pred != null) {            node.prev = pred;            if (compareAndSetTail(pred, node)) {                pred.next = node;                return node;            }        }        enq(node);        return node;    }
private Node enq(final Node node) {        for (;;) {            Node t = tail;            if (t == null) { // Must initialize                if (compareAndSetHead(new Node()))                    tail = head;            } else {                node.prev = t;                if (compareAndSetTail(t, node)) {                    t.next = node;                    return t;                }            }        }    }

当线程调用acquire(int)方法时,具体流程如下:

这里写图片描述

共享式同步状态获取

public final void acquireShared(int arg) {        if (tryAcquireShared(arg) < 0)            doAcquireShared(arg);    }
private void doAcquireShared(int arg) {        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            boolean interrupted = false;            for (;;) {                final Node p = node.predecessor();                if (p == head) {                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        if (interrupted)                            selfInterrupt();                        failed = false;                        return;                    }                }                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            if (failed)                cancelAcquire(node);        }    }

独占式超时获取同步状态

private boolean doAcquireNanos(int arg, long nanosTimeout)            throws InterruptedException {        if (nanosTimeout <= 0L)            return false;        final long deadline = System.nanoTime() + nanosTimeout;        final Node node = addWaiter(Node.EXCLUSIVE);        boolean failed = true;        try {            for (;;) {                final Node p = node.predecessor();                if (p == head && tryAcquire(arg)) {                    setHead(node);                    p.next = null; // help GC                    failed = false;                    return true;                }                nanosTimeout = deadline - System.nanoTime();                if (nanosTimeout <= 0L)                    return false;                if (shouldParkAfterFailedAcquire(p, node) &&                    nanosTimeout > spinForTimeoutThreshold)                    LockSupport.parkNanos(this, nanosTimeout);                if (Thread.interrupted())                    throw new InterruptedException();            }        } finally {            if (failed)                cancelAcquire(node);        }    }

具体流程如下:

这里写图片描述

常用锁

ReentrantLock

ReentrantLock,重入锁,能够支持一个线程对资源的重复加锁。该锁支持获取锁时的公平性和非公平性选择。

final boolean nonfairTryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }
protected final boolean tryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (!hasQueuedPredecessors() &&                    compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0)                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }
protected final boolean tryRelease(int releases) {            int c = getState() - releases;            if (Thread.currentThread() != getExclusiveOwnerThread())                throw new IllegalMonitorStateException();            boolean free = false;            if (c == 0) {                free = true;                setExclusiveOwnerThread(null);            }            setState(c);            return free;        }

公平锁和非公平锁的唯一区别是hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断。

公平锁保证了锁的获取按照FIFO原则,而代价是进行了大量的线程切换;非公平锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了更大的吞吐量。

ReentrantReadWriteLock

ReentrantReadWriteLock,可重入读写锁,实现了ReadWriteLock接口,通过readLock()方法和writeLock()方法获取读锁和写锁。

读写状态设计

读写锁需要在同步状态上维护多个度线程和一个写线程的状态。读写锁将变量切分成了两个部分,高16位表示读,低16位表示写。假设当前读写状态为S,写状态等于S&0x0000FFFFF,读状态等于S>>>16。写状态增加1时,等于S+1,读状态增加1时,等于S+(1<<16)。

写锁获取

写锁是一个支持重入的排它锁。如果当前线程已经获取了写锁,则增加写状态;如果读锁已经被获取或者获取写锁的线程不是当前线程,则当前线程进入等待状态。

protected final boolean tryAcquire(int acquires) {            Thread current = Thread.currentThread();            int c = getState();            int w = exclusiveCount(c);            if (c != 0) {                // (Note: if c != 0 and w == 0 then shared count != 0)                if (w == 0 || current != getExclusiveOwnerThread())                    return false;                if (w + exclusiveCount(acquires) > MAX_COUNT)                    throw new Error("Maximum lock count exceeded");                // Reentrant acquire                setState(c + acquires);                return true;            }            if (writerShouldBlock() ||                !compareAndSetState(c, c + acquires))                return false;            setExclusiveOwnerThread(current);            return true;        }

读锁获取

读锁是一个支持重入的共享锁,在没有其他写线程访问时,读锁总会被成功获取,如果当前线程已经获取读锁,则增加读状态。而当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。

protected final boolean tryReleaseShared(int unused) {            Thread current = Thread.currentThread();            if (firstReader == current) {                // assert firstReaderHoldCount > 0;                if (firstReaderHoldCount == 1)                    firstReader = null;                else                    firstReaderHoldCount--;            } else {                HoldCounter rh = cachedHoldCounter;                if (rh == null || rh.tid != getThreadId(current))                    rh = readHolds.get();                int count = rh.count;                if (count <= 1) {                    readHolds.remove();                    if (count <= 0)                        throw unmatchedUnlockException();                }                --rh.count;            }            for (;;) {                int c = getState();                int nextc = c - SHARED_UNIT;                if (compareAndSetState(c, nextc))                    return nextc == 0;            }        }

锁降级

锁降级是指把持住当前拥有的写锁,再获取到读锁,随后释放写锁的过程。重点在于先获取读锁,在获取读锁的过程中保持本身的写锁,保证了数据的可见性。

Condition接口

Condition接口提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式。

public interface Condition {
//相当于Object的wait()方法 void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; //相当于Object的wait(long timeout)方法 boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; //相当于Object的notify()方法 void signal(); //相当于Object的notifyAll()方法 void signalAll();}

总结

一般而言,Lock对象能将synchronized关键字替换掉,是synchronized关键字的进阶。掌握Lock有助于学习并发包中源代码的实现原理,才能在实际开发中运用自如。

你可能感兴趣的文章
C++ 继承&多态
查看>>
C++多继承的观察和7点体会(都是实用派的观点) good
查看>>
python socket编程详细介绍
查看>>
高人对libsvm的经典总结(全面至极)
查看>>
Linux下c语言多线程编程
查看>>
火狐下easyui1.3.*弹出window框定位不到中间解决把办法
查看>>
Docker拉取镜像失败报错Error response from daemon: Get https://registry-1.docker.io解决办法
查看>>
IO操作的工具类总结
查看>>
对指定文件或目录进行压缩和解压缩的工具类总结
查看>>
Java中如何遍历Map对象的4种方法
查看>>
图片延时加载例子详解
查看>>
js获取url参数值的两种方式详解
查看>>
java中System.getProperty()方法详解
查看>>
MyEclipse设置默认注释的格式
查看>>
同一服务器部署多个tomcat时的端口号修改详情
查看>>
常用正则表达式集锦
查看>>
Spring定时器的时间表达式
查看>>
fastdfs简介
查看>>
主键和唯一索引的区别
查看>>
linux下使用yum安装gcc详解
查看>>