03月27, 2017

多线程同步容器

本文总结了并发编程下常见的同步容器与带来的典型问题。

J.U.Cjava.util.concurrent包下对多线程并发场景支持的实现类。

ArrayList [Unsafe]

日常常用的java.util.ArrayList是一个线程不安全的容器,测试:

// unsafe container
      public static List<Integer> container = new ArrayList<>();

10000次执行100并发(每次执行容器元素+1):

   public static void main(String[] args) throws InterruptedException {
        // thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(threadCount);
        CountDownLatch countDownLatch = new CountDownLatch(exeCount);

        for (int i = 0; i < exeCount; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    counting();
                    semaphore.release();
                } catch (InterruptedException e) {
                    log.error("semaphore error: ", e);
                }
                countDownLatch.countDown();
            });

        }

        countDownLatch.await();
        executorService.shutdown();
        log.info("{} container size: {}", container.getClass().getName(), container.size());
    }

结果与预期不符:

10:40:28.642 [main] INFO net.check321.concurrency.clazz.container.concurrency.ArrayListTest - java.util.ArrayList container size: 9923

这是因为ArrayList为了性能优势内部没有任何同步或并发处理,所以出现其同步版本容器java.util.Vector

Vector [Safe]

从其构造看来与ArrayList一致:

public class Vector<E> extends AbstractList<E> implements List<E>, RandomAccess, Cloneable, Serializable {
   ……
}

只是同步了内部方法:

  • 新增元素
    public synchronized boolean add(E var1) {
      ++this.modCount;
      this.ensureCapacityHelper(this.elementCount + 1);
      this.elementData[this.elementCount++] = var1;
      return true;
    }
    
  • 删除元素
   public synchronized E remove(int var1) {
        ++this.modCount;
        if (var1 >= this.elementCount) {
            throw new ArrayIndexOutOfBoundsException(var1);
        } else {
            Object var2 = this.elementData(var1);
            int var3 = this.elementCount - var1 - 1;
            if (var3 > 0) {
                System.arraycopy(this.elementData, var1 + 1, this.elementData, var1, var3);
            }

            this.elementData[--this.elementCount] = null;
            return var2;
        }
    }
  • 元素个数
 public synchronized int size() {
        return this.elementCount;
    }

ConcurrentModificationException

在多线程同时操作容器时,会抛出异常: java.util.ConcurrentModificationException

public static void main(String[] args) {
        init();

        // 遍历元素
        new Thread(() -> {
            for (Integer item : container) {
                log.info("item: {}",item);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log.error("InterruptedException :", e);
                }
            }
        }).start();

        // 删除一个元素
        new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.error("InterruptedException :", e);
            }
            container.remove(2);
        }).start();
    }

在List容器父类java.util.AbstractList中有一个公共变量protected transient int modCount = 0;用来记录容器被操作的次数。在新增或删除元素等方法实现中会+1操作次数:

  • 这里以java.util.Vectoradd()方法为例
    public synchronized boolean add(E var1) {
     ++this.modCount;
     this.ensureCapacityHelper(this.elementCount + 1);
     this.elementData[this.elementCount++] = var1;
     return true;
    }
    
    而在容器迭代器java.util.ListIterator实现方法里会调用this.checkForComodification();检查该变量是否等于迭代器初始化时的数值。
  • 初始化expectedModCount:
  private Itr() {
            this.lastRet = -1;
            this.expectedModCount = Vector.this.modCount;
        }
  • checkForComodification()检查变量值是否与逾期一致,否则就抛出java.util.ConcurrentModificationException异常:
   final void checkForComodification() {
        if (Vector.this.modCount != this.expectedModCount) {
            throw new ConcurrentModificationException();
        }
    }

fast-fail机制

上述情况中,在遍历容器元素时因发现容器结构被其他线程修改而出现的快速反应机制。可以大限度的保证容器元素的线程安全。

CopyOnWriteArrayList

从容器名可看出该ArrayList容器在写操作时会生成容器Copy再进行处理,这种读写分离的设计避免了线程不安全带来的元素处理错误。

  • 构造方法

    /**
    * Creates an empty list.
    */
    public CopyOnWriteArrayList() {
      setArray(new Object[0]);
    }
    
  • 添加元素

  /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

CopyOnWriteArrayList[Safe]存在于J.U.C包下,支持线程并发的安全操作。在写操作时(如add):

  1. 使用重入锁ReentrantLock加锁。ReentrantLock lock = this.lock;
  2. 新建数组Object[] newElements = Arrays.copyOf(elements, len + 1);
  3. Copy数组元素至新数组Object[] newElements = Arrays.copyOf(elements, len + 1);
  4. 将新元素加入数组尾部newElements[len] = e;
  5. 将容器指针指向新数组对象setArray(newElements);
  6. 解锁lock.unlock();

因为该容器线程安全所以在多线程并发修改下不会抛出java.util.ConcurrentModificationException异常。

缺点:虽然线程安全但是因为每次写操作时需要复制数组元素,所以在写多读少的场景下该容器执行效率不高。

本文链接:https://check321.net/post/concurrency-containers.html

-- EOF --