java多线程高并发入门第二讲 weir 2018-03-07 12:57:53.0 java,多线程,并发 1161 Java的多线程并发出现大概是从jdk1.5开始的,其实一直都有只不过之前的性能不好而已,1.5之后在Doug Lea老爷子手里让java一下子大放光彩。同时也使得多线程并发和NIO变得复杂起来,如果不是持续关注这块还有不断在项目中实践想吃透这块内容还很是天方夜谭,我现在也是在学习从来没用这块学过底层的架构代码。看好多文章说想吃透这块要从AbstractQueuedSynchronizer(AQS)入手,对于入门的人来说基本不可能,我们还是从内存说起,我们都知道内存里面存放很多进程,进程里面又有很多线程,他们是怎么跟CPU交互的恐怕知道人就少了我也不知道,查资料说是有个多道程序处理体系,那就有问题了多个进程都在内存中CPU到底先执行谁有什么规则对不对这是要思考的问题?还有多线程又是怎么让CPU来执行的,我想很多人都知道,当有多个线程同时执行时CPU执行的是没有什么固定的顺序的,你写几个Thread试试就知道。如果我们想实现达到执行顺序时就需要对线程进行控制这里也是需要思考的问题点?我们现在不关心进程的话就把重点放在线程上面,怎么控制多个线程的执行顺序,大家都知道线程在执行的时候也会出现等待资源的情况,比如IO非常慢就会IO等待,CPU这时就空闲了,那就要想办法立刻去执行别的线程不要让CPU空闲下来造成资源浪费,所谓的高性能就是时时刻刻都让计算机每个硬件资源都在工作而不是一个忙得要死一个闲的要死。 我们看AbstractQueuedSynchronizer的源码发现包括我们看jdk文档说明都说到先进先出FIFO队列,也就是说把多个线程放在队列中从而来控制队列中线程的执行与否, static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; } Doug Lea大神对并发性能的提升都在这个内部静态类里面了,看着像不像链表,你能理解队列和链表那就完全可以理解AQS这个东西了,基本AQS没有什么神秘了,大家大致浏览一下AQS的源码就会发现,还多代码都是多链表的操作,剩下好多同步的是使用Unsafe这个我们没法左右了,大家还会发现有ConditionObject这么一个内部类,看看你发现他好像也维护了一个链表,这里还会让人不明白的是既然是实现线程等待、唤醒用的怎么没有Thread里面的方法,那这里是实现线程的等待和唤醒的?这里就是跟Thread线程最大的区别了,这里判断线程是否换型是否可运行都是通过状态标记来控制的。 protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } 你会发现,state状态的同步也交给了C代码,就是专业上说的CAS,这里还有别的话题可以说这里先不说了。 AQS锁只有两种独占和共享,独占好理解,我一旦获取到锁那别人就都要等待,同一时刻只有一个获取同步锁;共享如果区别独占来看就是同一时刻可以有多个线程获取到同步锁,但是抢到同步锁之后CPU怎么执行这些线程那就不好说了,这个跟多个Thread执行时抢占CPU是一样的,没有什么规律性。 我们现在分析一下AQS的数据结构:队列加链表,链表有前驱后驱,还有等待链表就是ConditionObject里面的waiter,也是个链表,线程在同步队列和等待队列中切换,这中间的代码就是怎么维护队列和链表了,大家看源码就会发现这点,说实话没有一定功底的java基础不一定能看懂,我反正就很吃力,还要有结构空间想象力,想象有一个管道,外面东西一旦进去就只能排队了,他前面的不出去你永远出不去,链表就比较好理解了,你的前面和后面都做了标记这样就知道你前面是有对象后面是否有对象。这样你再去看代码可能会好理解和分析一点其实也不是很难,要比B-tree好理解多了,这块应该是AQS甚至是java1.5之后新的并发包整个最关键的代码了,搞清楚这块并发就基本通了。看这块代码还需要注意涉及到同步的基本都是Unsafe完成的所以大家不要疑惑,这也是为了封装更底层的东西不想让代码看起来复杂同时也是为了性能考虑,也有一大部分原因是为了java的平台无关这一特性。unsafe.compareAndSwapInt(this, stateOffset, expect, update);看到这种代码知道干什么就行了不必再去纠结他又是怎么实现的。 AQS这个类看上起两千多行代码但是暴露出来的可以用的并不多,现在告诉你他就像是个模块一样,通过让别的类继承来用的你也不是很明白,等下我们来用他写一个同步锁的例子你对它的认知就会好很多 package weir.lock.aqs; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class MyLock implements Lock{ private Sync sync = new Sync(); private class Sync extends AbstractQueuedSynchronizer{ private static final long serialVersionUID = -1342993319166571156L; //独占模式 @Override protected boolean tryAcquire(int arg) { //第一个线程进来可以拿到锁,其它线程再进来拿不到 int state2 = getState(); if (state2 == 0) { if (compareAndSetState(0, arg)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } } return false; } @Override protected boolean tryRelease(int arg) { //判断当前线程即可 if (Thread.currentThread() != getExclusiveOwnerThread()) { throw new RuntimeException(); } int state2 = getState() - arg; boolean falg = false; if (state2 == 0) { setExclusiveOwnerThread(null); falg = true; } setState(state2); return falg; } Condition newCondition(){ return new ConditionObject(); } } @Override public void lock() { //以独占方式获取锁 sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } } private class Sync extends AbstractQueuedSynchronizer 看这个AQS基本的固定用法,里面是用独占模式还是共享模式这个在去调用: protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } AQS默认是抛出异常,这里就是需要我们去自己实现的了,要做什么样的锁逻辑判断,我上面写的逻辑大家应该可以看懂吧。然后就是public class MyLock implements Lock 重写Lock几个方法,里面的实现就可以用sync也就是AQS里面提供的方法了。大家多看看这段代码,是不是很固定的写法,其他复杂的写法这个基本是个骨架,从简单入手慢慢深入。 package weir.lock.aqs; public class TestMyLock { private int v; public int next() { try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } return v++; } public static void main(String[] args) { TestMyLock myLock = new TestMyLock(); new Thread(new Runnable() { @Override public void run() { while (true) { System.out.println(Thread.currentThread()+" "+myLock.next()); } } }).start(); new Thread(new Runnable() { @Override public void run() { while (true) { System.out.println(Thread.currentThread()+" "+myLock.next()); } } }).start(); new Thread(new Runnable() { @Override public void run() { while (true) { System.out.println(Thread.currentThread()+" "+myLock.next()); } } }).start(); } } 在执行这段代码你会发现v会有重复数值出现。我们改造成加锁的, private MyLock lock = new MyLock(); public int next() { lock.lock(); try { Thread.sleep(200); return v++; } catch (InterruptedException e) { throw new RuntimeException(); }finally { lock.unlock(); } } 就改动这块,在运行试试。到现在你对AQS是不是有些认知了,我还是想说如果要我现在出入的总结一下整个新的并发底层是怎么实现的就是不管你有多少个线程要运行,我都会把每个线程放进队列里面,然后通过独占和共享规则,还有用户指定的线程是否有顺序在不断地等待唤醒线程来让CPU获取到执行。知道队列和链表的运行逻辑你会发现其实也没那么难理解了,大家先消化消化。 接下来说说Condition接口,实现是ConditionObject正好也在AQS中,前面也提到了 简单一看就会发现,其实就是让线程等待或者唤醒一个线程,跟Thread里面的方法功能一样,Object里面的wait和notify,这里的ConditionObject功能比较强大的是可以指定某个线程等待或唤醒,而Object里面的是不行的,这个地方源码层面是比较难理解的,有人用等待队列来描述他,还有从等待队列到同步队列的交互,一个线程要么在同步队列中要么在等待队列中,这点不难理解吧,还是先看两个例子吧: package weir.lock.aqs; /** * a ——》b——》c 顺序执行 * @author weir * 2018年3月7日 下午7:44:30 * */ public class ConditionNoTest { private int signal; public synchronized void a() { while (signal !=0) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("aa"); signal++; notifyAll(); } public synchronized void b() { while (signal!=1) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("bb"); signal++; notifyAll(); } public synchronized void c() { while (signal!=2) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("cc"); signal = 0; notifyAll(); } public static void main(String[] args) { ConditionNoTest demo = new ConditionNoTest(); A a = new A(demo); B b = new B(demo); C c = new C(demo); new Thread(a).start(); new Thread(b).start(); new Thread(c).start(); } } class A implements Runnable{ private ConditionNoTest demo; public A(ConditionNoTest demo) { this.demo=demo; } @Override public void run() { while (true) { demo.a(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } class B implements Runnable{ private ConditionNoTest demo; public B(ConditionNoTest demo) { this.demo=demo; } @Override public void run() { while (true) { demo.b(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } class C implements Runnable{ private ConditionNoTest demo; public C(ConditionNoTest demo) { this.demo=demo; } @Override public void run() { while (true) { demo.c(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } 再看用Condition来实现: package weir.lock.aqs2; /** * a ——》b——》c 顺序执行 * @author weir * 2018年3月7日 下午7:44:30 * */ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConditionTest { private int signal; Lock lock = new ReentrantLock(); Condition a = lock.newCondition(); Condition b = lock.newCondition(); Condition c = lock.newCondition(); public void a() { lock.lock(); while (signal !=0) { try { a.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("aa"); signal++; b.signal(); lock.unlock(); } public void b() { lock.lock(); while (signal!=1) { try { b.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("bb"); signal++; c.signal(); lock.unlock(); } public void c() { lock.lock(); while (signal!=2) { try { c.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("cc"); signal = 0; a.signal(); lock.unlock(); } public static void main(String[] args) { ConditionTest demo = new ConditionTest(); A a = new A(demo); B b = new B(demo); C c = new C(demo); new Thread(a).start(); new Thread(b).start(); new Thread(c).start(); } } class A implements Runnable{ private ConditionTest demo; public A(ConditionTest demo) { this.demo=demo; } @Override public void run() { while (true) { demo.a(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } class B implements Runnable{ private ConditionTest demo; public B(ConditionTest demo) { this.demo=demo; } @Override public void run() { while (true) { demo.b(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } class C implements Runnable{ private ConditionTest demo; public C(ConditionTest demo) { this.demo=demo; } @Override public void run() { while (true) { demo.c(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } 看懂这些代码,你再去看源码,我这里先不进行源码层面的分析,大家要循序渐进的体会我之前提到的队列和链表还有state这个状态的重要作用。