06月13, 2020

Java并发编程不完全指北_[01_JMM]

Java并发编程系统性梳理第一篇,主要从执行效率、内存可见性、顺序性、JMM内存模型等方面梳理并发编程里的基础概念。

从一个例子开始

让我们先假设一个业务场景来体会并发带来的性能优势。

从一堆4月~6月跨度的日志文件里统计出所有ERROR级别的日志条数

alt

串行实现

  • 文件扫描

    // 扫描路境内所有log文件
      public  File[] scanFiles(String path){
          final File files = new File(path);
          final File[] logFiles = files.listFiles((dir, name) -> name.endsWith(".log"));
    
          assert logFiles != null;
          System.out.println("FILES: [" + logFiles.length + "]");
          return logFiles;
      }
    
  • 串行化分析

    public Integer analysis(File[] files) throws IOException {
    
          // 计数器
          int counter = 0;
    
          for (File file: files
               ) {
              final BufferedReader reader = new BufferedReader(new FileReader(file));
              String currentLine;
              // 按行读取,包涵'[ERROR]'即错误日志
              while ((currentLine = reader.readLine()) != null){
                  if(currentLine.contains("[ERROR]")){
                      counter += 1;
                  }
              }
              reader.close();
          }
    
          return counter;
      }
    

并行实现

 public Integer analysisInParallel(File[] files) throws ExecutionException, InterruptedException {

    // 原子整型;操作保证线程互斥
    final AtomicInteger counter = new AtomicInteger(0);

    // F/J线程池,提供于文件集合的流式操作
    final ForkJoinPool pool = new ForkJoinPool(8); // 8线程并行
    pool.submit(() -> { // 提交任务
        Arrays.stream(files)
                .parallel() // 开启并行处理,使用pool所提供的线程资源
                .forEach(f -> { // 每个日志文件
                try {
                // 流式并行读取每行内容
                int sum =  (int) Files.lines(Paths.get(f.getAbsolutePath()), Charset.defaultCharset())
                        .filter(c -> c.contains("[ERROR]")) // 选中[ERROR]级别日志
                        .count();
                counter.set(counter.get() + sum); // 原子计数

            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }).get();

    return counter.get();
}

测试结果

public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {

    CharactorCounter counter = new CharactorCounter();
    File[] files = counter.scanFiles("/Users/fyang/system_log/galaxy-open-api-zuul/error/");

    // 串行操作
    long start = System.currentTimeMillis();
    Integer errorCount = counter.analysis(files);
    long end = System.currentTimeMillis();
    System.out.println("TOTAL ERROR: " + errorCount + " SPEND IN: [" + (end - start) + "]ms");

    // 并行操作
    long pStart = System.currentTimeMillis();
    Integer errorCount2 = counter.analysisInParallel(files);
    long pEnd = System.currentTimeMillis();
    System.out.println("TOTAL ERROR: " + errorCount2 + " PARALLEL SPEND IN: [" + (pEnd - pStart) + "]ms");
}
  • 执行结果

FILES: [22] TOTAL ERROR: 1394748 SPEND IN: [3314]ms TOTAL ERROR: 1394748 PARALLEL SPEND IN: [963]ms

性能提升

我运行测试的机器搭载一颗 Quad-Core Intel Core i7 4核处理器4核8线程处理器(HTT),在时间跨度22天容量超过1GB超过百万行的日志文件测试条件下,8线程并行的函数比串行函数效率提高了3倍有余,可以想象在测试样本继续放大的情况下,性能的提升会更为明显。性能提升也是并发程序带给我们最直观有效的体验。

所谓x核y线程处理器指:Intel处理器上的超线程技术(Hyper-Thraading Techlolegy| HTT)。x为物理处理器核心数,y为通过HTT技术共享寄存器组成的“逻辑处理核心”,可实现在单物理核心下并行操作(无线程上下文切换操作,操作系统会将逻辑核心与物理核心相等看待)

感谢Luca同学对CPU超线程技术对测试结果影响的指出

一些问题

线程数量与执行效率(CPU时间片与线程上下文切换)

你一定会好奇线程数量和运行效率的提升是呈正比的吗?是否只要我创建的线程数量越大我的执行效率就越高?

还是一样的测试样本,当我调整运行线程的数量会观察到一个反直觉的测试结果。

FILES: [22] TOTAL ERROR: 1394748 SPEND IN: [4329]ms THREAD SIZE: 2 TOTAL ERROR: 1394748 PARALLEL SPEND IN: [1955]ms

FILES: [22] TOTAL ERROR: 1394748 SPEND IN: [3492]ms THREAD SIZE: 3 TOTAL ERROR: 1394748 PARALLEL SPEND IN: [1457]ms

FILES: [22] TOTAL ERROR: 1394748 SPEND IN: [3456]ms THREAD SIZE: 4 TOTAL ERROR: 1394748 PARALLEL SPEND IN: [1017]ms

FILES: [22] TOTAL ERROR: 1394748 SPEND IN: [3626]ms THREAD SIZE: 5 TOTAL ERROR: 1394748 PARALLEL SPEND IN: [1145]ms

FILES: [22] TOTAL ERROR: 1394748 SPEND IN: [3681]ms THREAD SIZE: 6 TOTAL ERROR: 1394748 PARALLEL SPEND IN: [1322]ms

FILES: [22] TOTAL ERROR: 1394748 SPEND IN: [3662]ms THREAD SIZE: 7 TOTAL ERROR: 1394748 PARALLEL SPEND IN: [1208]ms

FILES: [22] TOTAL ERROR: 1394748 SPEND IN: [3597]ms THREAD SIZE: 8 TOTAL ERROR: 1394748 PARALLEL SPEND IN: [989]ms

我不断的调整并行流式计算的线程数量从2线程到4线程执行效率明显提升,4线程的执行效率约为2线程的2倍([1017]ms | [1955]ms)。可是当继续增加工作线程的数量时(5~7)执行效率反而会退化([1145]ms |[1322]ms |[1208]ms),继续增加到8线程时该程序执行效率达到最优值。

由此引出并发编程的第一个基础知识:并发任务下CPU的执行机制

alt

如上图所示,当只有一个运行任务时,CPU会把全部资源用于该任务处理。当多任务并行执行时,CPU需要将资源分配给不同的任务。CPU最小的处理单元是核心,当单核心面对多任务时分配时,CPU会在单位时间内在不同任务间切换,这个处理时间称之为“时间片”(Time slice),只要时间片时间足够短(10+ms)就会造成任务在并行的“错觉”(类似游戏显示技术中的fps,将120帧动画放在一秒内输出就会让肉眼与大脑得到极为流畅的视觉体验)。从另一方面来看,CPU切换涉及到对当前任务信息的保存/读取(线程上下文操作)所以其本身会影响到并发程序的执行效率。

回过头来看ForkJoinPool中的线程数量变化与执行效率的关系——并行流式计算(Stream)本身基于分治算法Fork/Join模型(参考归并排序/MapReduce等,后文会提及)将任务拆分给工作线程去执行,各线程执行完毕后再汇总执行结果。在测试案例中4核CPU恰好能在4线程下将每个核心任务跑满,当5-7线程时,4核开始存在CPU时间片切换与提前完成任务的线程需等待处理中的任务线程所以出现了效率退化。当8线程时,每个核心分配2线程1个独立线程对任务拆分并进行各自的较为均衡的任务切换无线程上下文切换所以达到了最优效率。

按照思路继续提高线程数至16,即每个核心维护4线程任务:

FILES: [22] THREAD SIZE: 16 TOTAL ERROR: 1394748 PARALLEL SPEND IN: [1214]ms

执行效率依然出现退化,可以推断的是在当前测试条件下(22个文件,4核CPU)4线程的上下文切换耗时已大于任务处理本身的执行耗时。

因此,在编写并发程序时,要考虑到切换上下文切换所带来的性能损耗,根据处理数据量的大小与执行机器的CPU核心数量设置合适的并发线程数是提高执行效率的关键之一。

合适的并发容器(内存可见性问题)

在测试案例中,你一定注意到了对于“串行化“程序和“并行化”程序我选择了不同的计数容器——intAtomicInteger。这是因为并行任务下多线程切换带来的另一个副作用——内存可见性问题,而AtomicInteger利用volatile关键字保证了计数变量的数据同步(后文详解)。

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;
    ...
}

在Java并发的世界里还有各种各样的并发容器支持着不同的并发场景,他们往往基于着不同的底层技术(CAS/Copy-On-Write/Semphore/Lock...), 所以根据具体的业务场景去选择不同的并发容器是保证数据正确的关键

合适的同步机制(操作的原子性)

再模拟一个简单的电商售货的场景:

public class UnsafeDemo01 {

    // 库存
    private int stock = 100;

    // 库存-1
    public void decreaseStock() {

        // 不能超卖
        while (stock >= 1) {
            System.out.println(Thread.currentThread());
            stock -= 1;
            try {
                // 等待一会让其它线程介入
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    public static void main(String[] args) throws InterruptedException {

        final UnsafeDemo01 merchant = new UnsafeDemo01();

        // 顾客
        final Thread customer1 = new Thread(merchant::decreaseStock);
        final Thread customer2 = new Thread(merchant::decreaseStock);
        final Thread customer3 = new Thread(merchant::decreaseStock);

        customer1.start();
        customer2.start();
        customer3.start();

        // 等待全部线程执行完毕
        customer1.join();
        customer2.join();
        customer3.join();

        System.out.println("STOCK: [" + merchant.stock + "]");
    }
}

执行结果:

... Thread[Thread-1,5,main] Thread[Thread-2,5,main] Thread[Thread-0,5,main] Thread[Thread-2,5,main] Thread[Thread-1,5,main] Thread[Thread-1,5,main] Thread[Thread-0,5,main] Thread[Thread-2,5,main] Thread[Thread-0,5,main] Thread[Thread-1,5,main] Thread[Thread-2,5,main] STOCK: [-2]

三名顾客线程交替购买,在并发的过程里由于线程切换暴露了stock -= 1;发生了操作原子性问题,最终导致了库存超售。 在某些业务场景,如商品秒杀,其多线程并发对共享数据进行操作时,如何正确进行原子性操作是了保证数据正确的另一个关键点

程序执行的优化机制(操作的有序性)

双重检查锁的“单例”

在之前微服务闲谈03_网关服务中我们一起探究了双重检查锁下单例有概率失效的问题。

贴出原文相关部分先回顾一下:

DoublecheckDummySingleton [线程安全?JVM指令重排序所造带来的风险]
  • 带条件的同步检查可缩小同步粒度
  • 双重检查在编程语言语义上保证线程安全
public class DoublecheckDummySingleton {

    private DoublecheckDummySingleton() {
    }

    private static DoublecheckDummySingleton instance;

    public static DoublecheckDummySingleton getInstance() {

        if (instance == null) {
            synchronized (DoublecheckDummySingleton.class) {
                if (instance == null) {
                    instance = new DoublecheckDummySingleton();
                }
            }
        }

        return instance;
    }
}
  • 执行优化

除了增加高速缓存之外,为了使得处理器内部的运算单元能尽量被充分利用,处理器可能会对输入代码进行乱序执行(Out-Of-OrderExecution)优化,处理器会在计算之后将乱序执行的结果重组,保证该结果与顺序执行的结果是一致的,但并不保证程序中各个语句计算的先后顺序与输入代码中的顺序一致,因此,如果存在一个计算任务依赖另外一个计算任务的中间结果,那么其顺序性并不能靠代码的先后顺序来保证。与处理器的乱序执行优化类似,Java虚拟机的即时编译器中也有类似的指令重排序(InstructionReorder)优化。

周志明. 深入理解Java虚拟机:JVM高级特性与最佳实践(第2版) (原创精品系列) (Kindle 位置 6776-6780). 机械工业出版社. Kindle 版本.

  • as-if-serial语义

    不管怎么重排序( 编译器 和 处理器 为了 提高 并行 度),( 单线程) 程序的执行结果不能被改变。 编译器、 runtime 和 处理器都必须遵守 as- if- serial 语义。

方腾飞; 魏鹏; 程晓明. Java并发编程的艺术 (Java核心技术系列) (Kindle 位置 789-790). 机械工业出版社. Kindle 版本.

double pi = 3. 14; // A 
double r = 1. 0; // B 
double area = pi * r * r; // C

alt

……

问题根源: Foo foo = new Foo();

memory = allocate();    // 1: 分配对象的内存空间 
ctorInstance(memory);   // 2: 初始化对象 
instance = memory;     // 3: 设置instance 指向刚分配的内存地址

alt

memory = allocate();    // 1: 分配对象的内存空间 
instance = memory;     // 2: 设置instance 指向刚分配的内存地址
ctorInstance(memory);   // 3: 初始化对象 

重排序优化

从上文可看出双重检查锁的失效源自于指令重排序带来的负面影响,从CPU到操作系统到语言编译器各方的优化在非同步的内存模型下皆有可能出现“反直觉”的结果。

JMM

以上,我们可以将遇到的问题拆解为线程并发的执行效率,内存可见,执行顺序三个方面,那么理解上述问题有一些需明确的前提:

###Java 线程通信方式

线程间通信方式有两种,利用总线间的消息收发传递和对共享内存(即堆内存Shared variables/Heap memory)的读写,Java线程的通信属于后者。如A线程对内存共享变量X的写操作需要B线程通过对X变量的读来得知。

Java内存模型(重要)

JMM(Java Memery Model )即Java内存模型是理解内存可见性的关键。 当今的JMM构建与JSR133标准(http://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html)之上,其主要描述了:

  1. 内存模型对于CPU多级缓存技术下的限制
  2. Read/Write的前后关系所导致的可见性问题(Happens-Before原则)
  3. CPU/操作系统/语言层面的优化导致的顺序一致性问题 (As-if-serial 语义)
  4. 互斥锁机制(Monitor/屏障指令)
  5. 线程通信模型 (等待通知机制 Wait-Notify)

可以说Java并发编程都是由JMM为基础发散而来,它解释了多线程并发所导致的线程不安全问题以及为安全与效率平衡的并发技术提供理论基础。

JMM概念模型

alt

JMM内存模型可抽象上图——内存被分为线程私有内存(Stack)与线程共享内存(Heap)。共享内存是用于上面提到的线程间通信,它对所有线程可见;而私有内存是为了执行效率考虑(虽然内存读写效率相对硬盘优势明显但在CPU寄存器与多级缓冲区面前仍然相形见绌)在每个线程内部持有共享内存中变量的拷贝且只对线程内部可见。

对于线程A的操作X += 1 :

  1. 从共享内存中读取 X的值 0 并缓存至私有内存中。
  2. 将X的值+1运算赋值并回写至共享内存。

所以在当前情况下ThreaA ThreaB ThreadC并行执行时,线程各自获取到CPU时间片的单位时间内所见的变量X是不可控的,即在那个时刻共享内存中的X。这就是线程间通信的可见性问题,也正是这种不确定性问题带来了并发编程中各种反直觉体验(如上文的电商库存计算bug)。

volatile 与 happens-before

  • volatile

为了解决以上的可见性问题,JSR133提出了volatile关键字。volatile本意为“不稳定的,易变的”,这里恰如其分的用来解决有内存可见性问题的变量,被volatile所修饰的变量需要遵循happens-before原则。

  • happens-before

happens-before字面意思为“先前发生”,用于描述操作间内存可见性的关系。 在JMM模型中从两个角度描述了happens-before的关系,例如 A -hb-> B (A操作 happens-before B操作)

  1. JMM向Java语义层面:A -hb-> B A的执行顺序在B之前且A的执行结果一定对B可见。
  2. JMM向操作系统层面: A -hb-> B 如果A的执行结果一定对B可见,那么A与B的执行顺序可没有约束,即在保证A操作的写结果对B操作的读是可见的,那么执行编译器或CPU可乱序执行AB操作。

以上,我们可以总结出:

A -hb-> B, A的操作结果对B操作可见。

在JSR133的FAQ对同步(What does synchronization do?)小节里明确了happens-before的具体规则:

http://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html#synchronization

  1. 程序顺序,自然顺序规则: 线程内的*遵循一个全序顺序即我们代码编写的先后顺序,即每一个操作顺序happens-before于后续操作。
  2. Monitor规则: 对于同一个Monitor的加锁happens-before于解锁操作。Monitor的典型实现为synchronized同步代码块,可以想象同步代码块内部的操作结果对后续同步块外部操作是可见的。(这也是为什么当我们在上面的电商库存例子中使用synchronized同步后即可正常计算并发后的库存数量)
  3. volatile规则:对于同一个volatile变量的写操作happens-before于所有后续的读操作
  4. 线程启动规则:对一个线程的启动.start(),当前操作happens-before于这个.start()操作。
  5. 线程等待规则:对一个线程内部的所有操作happens-before于外部等待.join()操作之后的操作。

例如:

    public static void main(String[] args) throws InterruptedException {

    final UnsafeDemo01 merchant = new UnsafeDemo01();

    // 顾客
    final Thread customer1 = new Thread(merchant::decreaseStock);
    final Thread customer2 = new Thread(merchant::decreaseStock);
    final Thread customer3 = new Thread(merchant::decreaseStock);

    int x = 0;

    customer1.start();
    customer2.start();
    customer3.start();

    // 等待全部线程执行完毕
    customer1.join();
    customer2.join();
    customer3.join();

    System.out.println("STOCK: [" + merchant.stock + "]");
}

对于当前线程main-thread中的变量x = 0 , 对线程 customer1 customer2 customer3 内部可见。

线程 customer1 customer2 customer3 内部对变量stock操作对于当前线程main-thread System.out.println("STOCK: [" + merchant.stock + "]"); 可见。

源自于平衡性的追求

我们再拔高一点看并发模型的问题其实就是对于内存模型取舍。 再细化一下上文中的JMM概念模型图会发现操作系统为了减少线程写操作对用消息总线的占用而加入了写缓冲区的概念,对于一个常规的写操作会从线程私有内存写入缓冲区在批量一次性刷新至共享内存中,而这个缓冲区内存对同个处理器下的线程来说是相互可见的。

alt

例如:

上图中 TA 与 TB 分别更改 x 与 y 的值并且读取对方更改的值赋于私有变量a/b ,最终我们预想对于TA a = y = 2 ,对于TB b = x = 1,但是很有可能的情况是 a = b = x = y = 0。

这是源自于当今主流x86架构下的CPU为了效率考虑都引入了如上的自带写缓冲区的内存模型,在该模型下允许内存级别的写-读乱序(Store-Load),因为写操作(x = 1 / y = 2)先将值写入了写缓冲区等待缓冲区的统一刷新操作,此时后续的读操作从共享内存内读取(a = y / b = x)出未经修改的值 0 。

在单线程内因为 x / a / y 变量不存在依赖关系,所以先执行 x = 1 还是 先执行 a = y 并未违反 as-if-serial 语义 (不会对最终a结果产生影响) ,所以该乱序重排在编译器层面也是可以被接受的。

而另一种内存模型顺序一致性模型

alt

该模型下多线程对共享变量的读写操作全部串行化执行,这种模型下以性能为代价换取了数据操作的一致性。

所以,JMM选择了顺序性一致性模型为指导加入了happens-before为前提的内存概念模型,在保证最终结果一致的前提下适当打破顺序性一致性的执行顺序约束以提高指令的并行程度,在数据安全与执行效率间找到了合适的平衡点。

无处不在的平衡性

  • 在数据库的事务概念里也有相似的理念——ACID四原则中,为了性能考虑常常会用I(Isolation 隔离性)适当破坏C(Consistency 一致性)去提高性能,其实现就是常常提到的“隔离级别”。 例如:MySQL(InnoDB)的默认隔离级别为 RR (可重复读 Repeatable-Read)与Oracle的默认隔离级别RC(读已提交 Read-Commited)间的区别。

RR: Write --> | Read --> Read |

RC: | Write --> Read --> Read |

上面的顺序图示意解释了两种隔离级别的差异:

  • RR 在[读读]操作加上了读锁,故在一个[读读]事务单元里写操作无法执行以实现RR级别的特性——一个事务单元的多次读操作是一致的。
  • RC 在[读读]操作的基础上添加了读锁可升级为写锁,即两次读操作间可以穿插写操作,故在事务单元里多次读操作可能会读到不同的值。

从数据一致性角度来看RR比RC有着更高的隔离级别,另一方面从执行效率来说RC比RR因为在读写的并行度更高所以有更好的执行效率。(但这单单只是从读写锁的角度分析事务隔离级别的实现对执行效率的影响,不代表Oracle就一定在默认隔离级别下有比MySQL有着更好的事务处理性能,因为事实上MySQL利用MVCC技术实现事务处理本质上是CopyOnWrite而非简单的读写锁。)

这就是数据库事务层面利用隔离性对一致性的适当破坏来实现数据一致与性能间的平衡。

更多数据库事务原理分析可参考阿里分布式事务专家沈询相关文章:

http://jm.taobao.org/2017/02/09/20170209/

  • 分布式系统概念里有一个CAP原则,也被称为不可能三角,即在分布式系统中C (Consistency 一致性) A (Ability 可用性)P (Partition Tolerence 分区容忍性)最多只能达到其中两点而放弃另外一点。在一般的分布式架构下为了保证系统的高可用性会选择A与P而放弃一段时间的C,只要求达到最终一致性。

故在多种条件下做出取舍而求平衡是计算机设计中的艺术,这也是JMM设计的微妙之处,希望你可以细细体会。

总结

  • 从一个统计任务开始我们体会到了多线程并发对执行效率的明显提升,要做好线程数量的预估因为线程的上下文切换仍然需要消耗性能。

  • 因为内存可见性的问题在并发编程中出现了反直觉的数据错误。而比可见性问题更隐蔽的是从硬件到编程语言编译器的层层优化会让指令乱序执行。

  • 可见性和顺序性问题源自于Java的内存模型以及为了解决此问题而提出的JSR133即JMM。

  • JMM是Java并发编程的理论基础,理解JMM的设计思想的关键在于追求“平衡性”。

参考:

1. JSR-133: JavaTM Memory Model and Thread Specification

2. JSR 133 (Java Memory Model) FAQ

3.《Java并发编程的艺术》

4. 阿里沈询:分布式事务原理与实践

本文链接:https://check321.net/post/concurrency_in_java_01.html

-- EOF --

Comments

请在后台配置评论类型和相关的值。