August 31, 2020

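An Incomplete Guide to Java Concurrent Programming [02: Threads]

The second article in a systematic walk through Java concurrent programming. It starts from thread states and follows them through to the ways threads communicate and a typical application of those mechanisms.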

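Threads

When we talk about concurrency in a program, the things that actually run concurrently are its threads. A thread is the smallest unit the operating system can schedule, and rapid switching between threads on the CPU is what raises processing efficiency. Understanding concurrent programming in depth therefore means understanding how threads work.

Lifecycle

Threading exists at the operating-system level, and a programming language's thread support is really a wrapper around those OS concepts. Java, for example, abstracts threads as java.lang.Thread and java.lang.Runnable, and its definition of the thread lifecycle differs somewhat from the operating system's.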

OS

At the operating-system level a thread has four basic states: https://zh.wikipedia.org/wiki/%E7%BA%BF%E7%A8%8B#%E7%8B%80%E6%85%8B

Spawn

Block

Unblock

Finish

Java

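Java refines the OS-level description of thread states so that the language can support them better:

  • NEW: after a thread object is created with new Thread() (typically wrapping a Runnable), the thread is in the NEW state. At this point no thread has actually been created at the OS level; this step only defines, at the language level, the logic the thread will execute by overriding run().
  • RUNNABLE: this state covers two situations:
      • after .start() is called, the thread is created at the OS level, becomes ready, and waits for a CPU time slice
      • the thread is running
  • WAITING: entered after calling an untimed waiting API such as Object.wait(), Thread.join(), or LockSupport.park(). The thread is suspended until another thread notifies or wakes it.
  • TIMED_WAITING: the waiting APIs above also accept a long timeout (wait(long), join(long), parkNanos(long)), and Thread.sleep(long) puts a thread into this state as well. The thread becomes active again when the time elapses. Timed waits are commonly used under contention so that a thread does not wait forever, which helps avoid deadlock.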

Signature of the timed .join() API:

   /**
     * Waits at most {@code millis} milliseconds for this thread to
     * die. A timeout of {@code 0} means to wait forever.
     *
     * <p> This implementation uses a loop of {@code this.wait} calls
     * conditioned on {@code this.isAlive}. As a thread terminates the
     * {@code this.notifyAll} method is invoked. It is recommended that
     * applications not use {@code wait}, {@code notify}, or
     * {@code notifyAll} on {@code Thread} instances.
     *
     * @param  millis
     *         the time to wait in milliseconds
     */
    public final synchronized void join(long millis)
    throws InterruptedException {
       ...

    }
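  • BLOCKED: when a thread tries to enter a synchronized block or method and loses the race for the monitor, it enters this state and is suspended until the monitor is released and it can compete again. (A thread that waits for a java.util.concurrent Lock in .lock() is parked instead, so it shows up as WAITING rather than BLOCKED.)
  • TERMINATED: the thread's lifecycle is over, because run() has returned or ended with an uncaught exception. Calling .interrupt() does not terminate a thread by itself; it only sets the thread's interrupt flag (.stop() is deprecated). .isInterrupted() returns the current value of the flag. Note that APIs that throw java.lang.InterruptedException clear the interrupt flag before throwing the exception.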

.join()

 * @throws  InterruptedException
     *          if any thread has interrupted the current thread. The
     *          <i>interrupted status</i> of the current thread is
     *          cleared when this exception is thrown.
     */
    public final void join() throws InterruptedException {
        join(0);
    }
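
The flag-clearing behavior described in this Javadoc can be checked with a small experiment. The following is a minimal sketch and not code from the original demo project; the class name InterruptFlagDemo and its run() method are made up for illustration. A thread blocked in Thread.sleep() is interrupted, and the sketch records the interrupt flag inside the catch block and again after restoring it.

```java
class InterruptFlagDemo {

    /** Returns {flag seen inside the catch block, flag seen after restoring it}. */
    static boolean[] run() {
        final boolean[] seen = new boolean[2];
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // sleep() cleared the flag before throwing
                seen[0] = Thread.currentThread().isInterrupted();
                // restore the flag so later code can still see the interrupt
                Thread.currentThread().interrupt();
                seen[1] = Thread.currentThread().isInterrupted();
            }
        }, "sleeper");

        sleeper.start();
        sleeper.interrupt();
        try {
            sleeper.join(); // join() makes the writes to seen[] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        boolean[] flags = run();
        System.out.println(flags[0] + ", " + flags[1]); // prints "false, true"
    }
}
```

Restoring the flag in the catch block is a common convention when the exception cannot be propagated, because otherwise callers have no way to learn that an interrupt was requested.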

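Thread dumps

The following example exercises the thread lifecycle transitions and inspects the state changes in a thread dump:

  • print() is guarded by the synchronized lock; it prints the current thread's name and then sleeps for 60 s: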

    public static synchronized void print(){
      System.out.println("current-thread is: " + Thread.currentThread().getName());
      try {
          Thread.sleep(60000);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
    }
    
  • The Hunger thread class calls print():

static class Hunger implements Runnable{
        @Override
        public void run() {
            print();
        }
    }
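  • main creates and starts two threads, hunger-01 and hunger-02, which compete for the synchronized method print(); main then calls .join() to block until both threads have finished.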
public static void main(String[] args) throws InterruptedException {

    Thread t1 = new Thread(new Hunger(), "hunger-01");
    Thread t2 = new Thread(new Hunger(), "hunger-02");

    t1.start();
    t2.start();

    t1.join();
    t2.join();

}
  • After starting main, use the jps command to find the process ID:

    ➜  concurrency-demo jps
    61472 Launcher
    61473 ThreadStatDemo
    61219 Launcher
    61476 Jps
    61222 RemoteMavenServer36
    
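  • jstack 61473 prints the thread dump of the demo process

  • Console output of main:

    current-thread is: hunger-01 (this shows that hunger-01 won the mutex and started its 60 s sleep)

  • hunger-02 is waiting for hunger-01 to release the lock, so it is in the BLOCKED state. (Also note the object named in "waiting to lock": it is the ThreadStatDemo class, because the mutex is declared on the static method print(), so the lock object is the Class instance.)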

    "hunger-02" #12 prio=5 os_prio=31 tid=0x00007f7f2d81f800 nid=0xa703 waiting for monitor entry [0x000070001158b000] java.lang.Thread.State: BLOCKED (on object monitor)

     at net.check321.concurrencydemo.example.ThreadStatDemo.print(ThreadStatDemo.java:7)
     - waiting to lock <0x000000076adaf3d8> (a java.lang.Class for net.check321.concurrencydemo.example.ThreadStatDemo)
     at net.check321.concurrencydemo.example.ThreadStatDemo$Hunger.run(ThreadStatDemo.java:31)
     at java.lang.Thread.run(Thread.java:748)
    
  • hunger-01 then calls .sleep() and enters the TIMED_WAITING state:

    "hunger-01" #11 prio=5 os_prio=31 tid=0x00007f7f2e09c800 nid=0xa803 waiting on condition [0x0000700011488000] java.lang.Thread.State: TIMED_WAITING (sleeping)

     at java.lang.Thread.sleep(Native Method)
     at net.check321.concurrencydemo.example.ThreadStatDemo.print(ThreadStatDemo.java:9)
     - locked <0x000000076adaf3d8> (a java.lang.Class for net.check321.concurrencydemo.example.ThreadStatDemo)
     at net.check321.concurrencydemo.example.ThreadStatDemo$Hunger.run(ThreadStatDemo.java:31)
     at java.lang.Thread.run(Thread.java:748)
    
  • The main thread blocks in .join() until both threads have finished, so main enters the WAITING state:

    "main" #1 prio=5 os_prio=31 tid=0x00007faa6c001800 nid=0x1003 in Object.wait() [0x000070000b8e2000] java.lang.Thread.State: WAITING (on object monitor)

     at java.lang.Object.wait(Native Method)
     - waiting on <0x000000076adb1d60> (a java.lang.Thread)
     at java.lang.Thread.join(Thread.java:1252)
     - locked <0x000000076adb1d60> (a java.lang.Thread)
     at java.lang.Thread.join(Thread.java:1326)
     at net.check321.concurrencydemo.example.ThreadStatDemo.main(ThreadStatDemo.java:23)
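
The same states can also be read from inside the program with Thread.getState(), without taking a dump. The sketch below is an illustration only and not part of the original demo; ThreadStateProbe, awaitState() and observe() are hypothetical names. It walks one worker thread through NEW, BLOCKED, TIMED_WAITING and TERMINATED, polling until each state is reached.

```java
class ThreadStateProbe {

    /** Polls until t reaches the wanted state or about 5 s pass; returns the last state seen. */
    static Thread.State awaitState(Thread t, Thread.State wanted) {
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (t.getState() != wanted && System.nanoTime() < deadline) {
            Thread.yield();
        }
        return t.getState();
    }

    /** Records the worker's state at four points of its lifecycle. */
    static String observe() {
        final Object monitor = new Object();
        Thread worker = new Thread(() -> {
            synchronized (monitor) {          // BLOCKED here while the caller holds the monitor
                try {
                    Thread.sleep(60_000);     // TIMED_WAITING here until interrupted
                } catch (InterruptedException e) {
                    // interrupted by the caller: fall through and let run() return
                }
            }
        }, "probe-worker");

        StringBuilder seen = new StringBuilder();
        seen.append(worker.getState());                       // not started yet
        synchronized (monitor) {
            worker.start();                                   // worker now competes for the monitor
            seen.append(' ').append(awaitState(worker, Thread.State.BLOCKED));
        }
        // the monitor is free now, so the worker enters it and goes to sleep
        seen.append(' ').append(awaitState(worker, Thread.State.TIMED_WAITING));
        worker.interrupt();
        seen.append(' ').append(awaitState(worker, Thread.State.TERMINATED));
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(observe()); // prints "NEW BLOCKED TIMED_WAITING TERMINATED"
    }
}
```

Polling is used only because a state change is not instantaneous after start() or interrupt(); the 5 s deadline is an arbitrary safety limit.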
    

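Communication based on thread state

Passive polling

Concurrent programming has to deal with three problems:

  • dividing work among threads
  • synchronizing threads
  • mutual exclusion between threads

Thread synchronization means that threads communicate to coordinate when each of them runs, so that they cooperate. One obvious approach is to coordinate through a shared variable.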

public class DummyWaitNotifyDemo {

    private static volatile Boolean flag = false;

    public static void main(String[] args) throws InterruptedException {

        Thread first = new Thread(() -> {
            try {
                System.out.println("the first thread begin.");
                Thread.sleep(3000);
                flag = true;
                System.out.println("the first thread done.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });


        Thread second = new Thread(() -> {
            try {
              while (!flag){
                  Thread.sleep(1000);
                  System.out.println("the second thread wait for a sec.");
              }
                System.out.println("the second thread done.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        first.start();
        second.start();

        first.join();
        second.join();

        System.out.println("the main thread done.");
    }

}

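In the demo above, the second thread must wait for the first thread to finish its work before doing its own task. The second thread therefore checks the shared flag in a loop, and the first thread changes the flag when it is done, which notifies the second thread.

Output: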

the first thread begin.

the second thread wait for a sec.

the second thread wait for a sec.

the first thread done.

the second thread wait for a sec.

the second thread done.

the main thread done.

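This achieves the goal, but coordinating this way has several problems:

  • The flag is polled passively, and the notified thread has no good way to choose the polling frequency. Where timeliness matters the frequency must go up, which wastes CPU; lowering it sacrifices timeliness. This is another instance of the trade-off mentioned in the previous article.
  • At the polling interval, the Thread.sleep(1000) call inside the while loop (line 24 of the demo), the IDE already shows a warning. .sleep() puts the thread into TIMED_WAITING; it gives up its CPU time slice but does not release any lock it holds, so under mutual exclusion it delays the release of the lock.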
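
The claim that .sleep() keeps a held lock, while .wait() gives it up, can be tested directly. The following is a rough sketch under stated assumptions, not code from the original project; SleepVsWait and monitorFreeWhileParked() are hypothetical names, and the 500 ms limit is an arbitrary choice. A holder thread takes a monitor and then either sleeps or waits, and a second thread tries to enter the same monitor.

```java
class SleepVsWait {

    /** Returns true if a second thread could enter the monitor while the holder was parked. */
    static boolean monitorFreeWhileParked(boolean holderUsesWait) {
        final Object monitor = new Object();
        Thread holder = new Thread(() -> {
            synchronized (monitor) {
                try {
                    if (holderUsesWait) {
                        // No condition loop: this sketch only cares about monitor ownership.
                        monitor.wait();          // releases the monitor while waiting
                    } else {
                        Thread.sleep(60_000);    // keeps the monitor while sleeping
                    }
                } catch (InterruptedException e) {
                    // interrupted by the caller during cleanup
                }
            }
        }, "holder");
        Thread contender = new Thread(() -> {
            synchronized (monitor) {
                // nothing to do: entering the monitor is the whole test
            }
        }, "contender");

        try {
            holder.start();
            Thread.State parked = holderUsesWait ? Thread.State.WAITING : Thread.State.TIMED_WAITING;
            while (holder.getState() != parked) {
                Thread.yield();                  // wait until the holder is really parked
            }
            contender.start();
            contender.join(500);                 // give the contender half a second to get in
            boolean entered = !contender.isAlive();

            holder.interrupt();                  // cleanup: let both threads finish
            holder.join();
            contender.join();
            return entered;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("wait():  " + monitorFreeWhileParked(true));   // prints "wait():  true"
        System.out.println("sleep(): " + monitorFreeWhileParked(false));  // prints "sleep(): false"
    }
}
```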


Wait/notify mechanism

Compared with passively polling a flag, the wait/notify mechanism uses thread state changes to achieve active notification.

@Slf4j
public class WaitNotifyDemo {

    private static final Object lock = new Object();

    private static boolean flag = false;

    public static void main(String[] args) throws InterruptedException {

        Thread first = new Thread(() -> {
            synchronized (lock) {
                try {
                    log.info("the first thread begin.");
                    // do something.
                    Thread.sleep(3000);
                    flag = true;
                    lock.notifyAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("the first thread done.");
            }
        },"first");


        Thread second = new Thread(() -> {
            synchronized (lock) {
                // do something.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                while (!flag) {
                    try {
                        log.info("the second thread wait for a while.");
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
                log.info("the second thread done.");
            }
        },"second");

        second.start();
        first.start();

        first.join();
        second.join();

        log.info("the main thread done.");
    }

}

Result:

16:15:03.750 [second] INFO net.check321.concurrencydemo.example.WaitNotifyDemo - the second thread wait for a while.

16:15:03.756 [first] INFO net.check321.concurrencydemo.example.WaitNotifyDemo - the first thread begin.

16:15:06.757 [first] INFO net.check321.concurrencydemo.example.WaitNotifyDemo - the first thread done.

16:15:06.757 [second] INFO net.check321.concurrencydemo.example.WaitNotifyDemo - the second thread done.

16:15:06.758 [main] INFO net.check321.concurrencydemo.example.WaitNotifyDemo - the main thread done.

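Rewriting the previous example with wait/notify shows that two elements are needed:

  1. A shared lock object, the lock object in the demo, on which each thread synchronizes with synchronized (a java.util.concurrent.locks.Lock implementation works here as well; the differences are compared later).

  2. A condition for proceeding, the flag in the demo (in the java.util.concurrent.locks.Lock family this role is played by java.util.concurrent.locks.Condition). When the condition does not hold, the current thread must enter the WAITING state (.wait() or .await()), which suspends it and releases the lock. Another thread (first) then gets the chance to acquire the lock, run its logic, and change the condition (flag). Finally it notifies (.notify() or .signal()) the waiting thread to reactivate it.

A few points are worth noting:

  1. The flag no longer carries the volatile modifier, because every block that reads or writes it is guarded by the same synchronized lock; the happens-before rule mentioned earlier guarantees the variable's visibility between threads.
  2. The condition is needed because a woken thread does not run immediately. It must first compete for the monitor again, and .wait() returns only after the lock has been reacquired. By then the condition may have changed (and spurious wakeups are possible), so it is rechecked in a while loop rather than assumed to hold.
  3. The one-second sleep in the second thread's run() logic is there to make the lock contention visible. The second thread has no real work of its own, so in tests on a local JDK 8 the contention was optimized away every time by the spinning done for synchronized (at the OS level, changing a thread's state is a user-mode to kernel-mode transition that consumes system resources, so the JVM tries hard to avoid it), and no thread could be seen entering BLOCKED.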
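
The Lock and Condition variant mentioned in the two elements above could look roughly like this. It is a minimal sketch, not the author's code; ConditionHandoff, awaitFlag(), setFlag() and the event strings are hypothetical names chosen for illustration. ReentrantLock takes the place of synchronized, and Condition.await() and signalAll() take the place of wait() and notifyAll().

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionHandoff {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition flagSet = lock.newCondition();
    private boolean flag = false;                              // guarded by lock
    private final StringBuilder events = new StringBuilder();  // guarded by lock

    /** Waits until the flag is set, rechecking the condition after every wakeup. */
    void awaitFlag() throws InterruptedException {
        lock.lock();
        try {
            while (!flag) {
                flagSet.await();   // releases the lock while waiting, reacquires before returning
            }
            events.append("second-proceeds;");
        } finally {
            lock.unlock();
        }
    }

    /** Sets the flag and wakes every thread waiting on the condition. */
    void setFlag() {
        lock.lock();
        try {
            flag = true;
            events.append("first-sets-flag;");
            flagSet.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Runs one waiter and one setter and returns the recorded order of events. */
    static String run() {
        ConditionHandoff handoff = new ConditionHandoff();
        Thread second = new Thread(() -> {
            try {
                handoff.awaitFlag();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "second");
        Thread first = new Thread(handoff::setFlag, "first");

        second.start();
        first.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        handoff.lock.lock();
        try {
            return handoff.events.toString();
        } finally {
            handoff.lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "first-sets-flag;second-proceeds;"
    }
}
```

The order of the two events is the same whichever thread gets the lock first, because the waiter only proceeds once it sees the flag set while holding the lock.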

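JMC - Flight Recorder

Besides jstack there are more mature tools for monitoring thread dumps. JMC, short for Java Mission Control, can monitor the JVM comprehensively. Its Flight Recorder feature, as the name suggests, records and analyzes execution like an aircraft's black box. Below, a Flight Recorder file is used in JMC to analyze the thread changes in the demo above.

Using IDEA as an example, add these parameters to the demo's launch command: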

-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=duration=1s,filename=WaitNotifyDemo_2020_08_27_142344.jfr

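  • UnlockCommercialFeatures: unlocks the commercial features
  • FlightRecorder: enables Flight Recorder
  • StartFlightRecording: starts a recording and writes a flight recording file
  • duration: length of the recording
  • filename: name of the generated recording file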

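After the program starts, the recording can be found at the path given by filename. JMC is event-based, so information about the running program is available under Profiler-Events-Java Application.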


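The Monitor Blocked tab lists every thread that was in the BLOCKED state. As expected, first lost the lock race and stayed BLOCKED for about 1 s. The Previous Monitor Owner value, second, shows that the lock was held by the second thread for that 1 s. Monitor Address is the memory address of the lock object, 0x7FC208804900.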


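After second starts waiting, first acquires the lock and sleeps for 3 s, so second's wait lasts about 3 s until first wakes it with notifyAll().

main called .join() on both threads, so it waits until both have finished: 1 + 3, about 4 s in total.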

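Producer-consumer model

A common design model in concurrent scenarios is the producer-consumer model: a shared data container decouples the producers of data from its consumers.

The MQ middleware widely used in the Java EE world relies on message queues for asynchronous processing, flow control, and system decoupling. Today's large open-source MQ projects such as Kafka, RocketMQ, and RabbitMQ are all built on this model, with added support for distributed, high-traffic scenarios, while a single-machine queue is essentially the blocking queue data structure.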

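Blocking queues

java.util.concurrent.BlockingQueue

The JDK's blocking-queue support is rooted in this interface. Its implementations fall mainly into XXXBlockingQueue (blocking queues), XXXBlockingDeque (blocking double-ended queues), and DelayQueue (a delay queue).

The XXX prefix names the underlying data structure, Array or Linked (linked list). A standard queue follows FIFO, first in, first out, whereas PriorityBlockingQueue is a priority queue whose dequeue order is determined by priority. (A heap is a typical way to implement a priority queue; its heapify step is what arranges elements by priority.)

All of them, however, are essentially built on the thread wait/notify mechanism.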
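
The difference between FIFO order and priority order described above is easy to see in a few lines. This is a small sketch for illustration; the class QueueOrdering and its methods are hypothetical names, while ArrayBlockingQueue, PriorityBlockingQueue, add() and poll() are the standard JDK APIs. The same three numbers go into both queue types and are then drained.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

class QueueOrdering {

    /** Adds 3, 1, 2 to the queue and returns the order in which poll() hands them back. */
    static String drain(BlockingQueue<Integer> queue) {
        queue.add(3);
        queue.add(1);
        queue.add(2);
        StringBuilder order = new StringBuilder();
        Integer next;
        while ((next = queue.poll()) != null) {   // poll() returns null once the queue is empty
            order.append(next);
        }
        return order.toString();
    }

    /** FIFO queue: elements come out in insertion order. */
    static String fifoOrder() {
        return drain(new ArrayBlockingQueue<Integer>(3));
    }

    /** Priority queue: elements come out in natural (ascending) order. */
    static String priorityOrder() {
        return drain(new PriorityBlockingQueue<Integer>());
    }

    public static void main(String[] args) {
        System.out.println(fifoOrder());     // prints "312"
        System.out.println(priorityOrder()); // prints "123"
    }
}
```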

ArrayBlockingQueue

The commonly used ArrayBlockingQueue serves as the example here:

@Slf4j
public class BlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {

        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    queue.put(i);
                    log.info("offer [{}] to queue.", i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        },"producer");

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i< 10; i++){
                    Integer ele = queue.take();
                    log.info("take [{}] from queue.", ele);
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "consumer");


        producer.start();
        Thread.sleep(3000);
        consumer.start();

        producer.join();
        consumer.join();

    }

}

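The demo above uses two threads, producer and consumer, to play the producer and the consumer, with an ArrayBlockingQueue of capacity 5 as the decoupling container.

producer starts first and tries to add 10 elements in a loop, but the queue holds only 5. A blocking queue blocks a write while the queue is full, and every successful write wakes a consumer thread that was blocked because the queue was empty.

consumer starts later and reads elements from the queue. A read blocks while the queue is empty, and every successful read wakes a producer thread that was blocked because the queue was full.

Put simply, in the blocking-queue pattern the producer and the consumer are mirror images of each other: each blocks itself when its own condition is not met and wakes the other side once it has made progress.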
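
To make the mirror relationship concrete, here is a deliberately simplified bounded buffer built only on synchronized, wait() and notifyAll(). It is an illustrative sketch and not how the JDK implements its queues; BoundedBuffer and demo() are hypothetical names. put() and take() each wait on their own condition and then wake the other side.

```java
class BoundedBuffer<E> {

    private final Object[] items;
    private int head;    // index of the next element to take
    private int tail;    // index of the next free slot
    private int count;   // number of stored elements; all fields are guarded by "this"

    BoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    /** Blocks while the buffer is full, then stores the element and wakes waiting threads. */
    synchronized void put(E element) throws InterruptedException {
        while (count == items.length) {
            wait();
        }
        items[tail] = element;
        tail = (tail + 1) % items.length;
        count++;
        notifyAll();
    }

    /** Blocks while the buffer is empty, then removes the oldest element and wakes waiting threads. */
    @SuppressWarnings("unchecked")
    synchronized E take() throws InterruptedException {
        while (count == 0) {
            wait();
        }
        E element = (E) items[head];
        items[head] = null;
        head = (head + 1) % items.length;
        count--;
        notifyAll();
        return element;
    }

    /** One producer puts 0..9 into a buffer of capacity 5; one consumer takes them all. */
    static String demo() {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(5);
        StringBuilder taken = new StringBuilder();   // written only by the consumer thread
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) taken.append(buffer.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return taken.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "0123456789"
    }
}
```

Because a single monitor serves both conditions, this sketch uses notifyAll() and wakes producers and consumers alike; separating the two wait sets is exactly what two Condition objects on one Lock make possible.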


The ArrayBlockingQueue source shows how the wait/notify mechanism from the example is applied:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {


    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

        ... 
}
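Internal fields
  • final Object[] items: the array-backed element container
  • takeIndex: index for the next dequeue operation
  • putIndex: index for the next enqueue operation
  • count: number of elements
  • final ReentrantLock lock: the Lock version of the lock, corresponding to the synchronized keyword in the example
  • private final Condition notEmpty; private final Condition notFull: the Lock version of the blocking conditions, corresponding to waiting and notifying on the lock object in the example
Enqueue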
   /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}
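  • Both enqueue and dequeue run under the mutual exclusion of the ReentrantLock
  • By the definition of a blocking queue, the current thread must block while the queue is full: notFull.await() (the await() method of Condition notFull is used because this follows the monitor model, covered in detail later)
  • Conversely, when the queue is not full, the element is written and notEmpty.signal() wakes a consumer thread to consume it (using the signal() of Condition notEmpty to wake the consumer is also part of the monitor model)
Dequeue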
   public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}
  • Dequeue mirrors enqueue: notEmpty.await() and notFull.signal() control the blocking flow.
Thread snapshot of the demo

Looking back at the demo's output:

15:00:50.691 [producer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - offer [0] to queue.
15:00:50.695 [producer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - offer [1] to queue.
15:00:50.695 [producer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - offer [2] to queue.
15:00:50.695 [producer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - offer [3] to queue.
15:00:50.695 [producer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - offer [4] to queue.
15:00:53.693 [producer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - offer [5] to queue.
15:00:53.693 [consumer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - take [0] from queue.
15:00:53.693 [consumer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - take [1] from queue.
15:00:53.693 [producer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - offer [6] to queue.
15:00:53.693 [consumer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - take [2] from queue.
15:00:53.693 [producer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - offer [7] to queue.
15:00:53.693 [producer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - offer [8] to queue.
15:00:53.693 [consumer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - take [3] from queue.
15:00:53.693 [consumer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - take [4] from queue.
15:00:53.694 [consumer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - take [5] from queue.
15:00:53.694 [consumer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - take [6] from queue.
15:00:53.694 [producer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - offer [9] to queue.
15:00:53.694 [consumer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - take [7] from queue.
15:00:53.694 [consumer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - take [8] from queue.
15:00:53.694 [consumer] INFO net.check321.concurrencydemo.example.BlockingQueueDemo - take [9] from queue.

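With a blocking queue of capacity 5, the queue is full once producer has written element 4, so producer blocks on the next put. When consumer starts 3 s later and takes element 0, it signals producer, which resumes producing. This repeats until the work is done.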
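
The moment at which the producer stalls can also be observed without log timestamps. The sketch below is an illustration under stated assumptions, not part of the original demo; FullQueueProbe and sizeWhileProducerBlocked() are hypothetical names. It starts only the producer, waits until that thread is parked, and then reads the queue size.

```java
import java.util.concurrent.ArrayBlockingQueue;

class FullQueueProbe {

    /** Returns the queue size observed while the producer is parked inside put(). */
    static int sizeWhileProducerBlocked() {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.put(i);   // parks in notFull.await() once 5 elements are queued
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");
        producer.start();

        // A thread parked in Condition.await() reports the WAITING state.
        while (producer.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        int observed = queue.size();

        // Drain all 10 elements so the producer can finish.
        try {
            for (int i = 0; i < 10; i++) {
                queue.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(sizeWhileProducerBlocked()); // prints "5"
    }
}
```

Note that the blocked producer shows up as WAITING rather than BLOCKED, because ArrayBlockingQueue parks threads through a Condition instead of a synchronized monitor.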

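Summary

  • Introduced the thread lifecycle and the methods that move a thread between states, and verified the theory with thread dumps
  • Used thread states to implement communication between threads, covering the classic wait/notify mechanism
  • Used JFR to help debug concurrent programs
  • Studied a real application of wait/notify: Java's blocking-queue containers

References:

  1. Wikipedia: Thread
  2. Life Cycle of a Thread in Java
  3. 《Java并发编程的艺术》 (The Art of Java Concurrency Programming), Chapter 4: Fundamentals of Java Concurrent Programming
  4. 《深入理解Java虚拟机》 (Understanding the Java Virtual Machine in Depth), 3rd edition, Part 5: Efficient Concurrency
  5. Java Platform, Standard Edition Java Flight Recorder Runtime Guide

Permalink: https://check321.net/post/concurrency_in_java_02.html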

-- EOF --
