Java并发编程学习五:并发容器

一、HashMap为什么线程不安全?

HashMap是Map的主要实现类之一,但是它并不具备线程安全的特点。

以HashMap的put方法来看

public V put(K key, V value) {

    if (key == null)
        return putForNullKey(value);
    int hash = hash(key.hashCode());
    int i = indexFor(hash, table.length);
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {

        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {

            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    } 

    //modCount++ 是一个复合操作
    modCount++;

    addEntry(hash, key, value, i);
    return null;
}

且不去具体分析put方法的内部逻辑,单看代码中的modCount++操作,前面在对线程安全的学习部分,就提到过这个操作并不是原子操作,它的实际执行步骤有三步,每一步操作都有可能被打断。

除了非原子操作,HashMap还有什么原因导致它是线程不安全的呢?

1. 扩容时获取值可能为null

HashMap 本身默认的容量不是很大,如果不停地往 map 中添加新的数据,它便会在合适的时机进行扩容。而在扩容期间,它会新建一个新的空数组,并且用旧的项填充到这个新的数组中去。那么,在这个填充的过程中,如果有线程获取值,很可能会取到 null 值,而不是原来添加的值

public class HashMapNotSafe {

    public static void main(String[] args) {

        final Map<Integer, String> map = new HashMap<>();

        final Integer targetKey = 0b1111_1111_1111_1111; // 65 535
        final String targetValue = "v";
        map.put(targetKey, targetValue);

        new Thread(() -> {

            IntStream.range(0, targetKey).forEach(key -> map.put(key, "someValue"));
        }).start();

        while (true) {

            if (null == map.get(targetKey)) {

                throw new RuntimeException("HashMap is not thread safe.");
            }
        }
    }
}

新建一个HashMap,同时新建一个线程不断向HashMap添加元素,其中key从 0 到 65535 (左闭右开)逐渐递增,value一直是"someValue"。将线程启动,并进入while循环。循环判断的条件是,map中取出的targetKey 对应的value不为null。

按理来说,放入HashMap的key对应的value应该都是"someValue"。但实际的运行结果如下:

Exception in thread "main" java.lang.RuntimeException: HashMap is not thread safe.
at lesson29.HashMapNotSafe.main(HashMapNotSafe.java:25)

说明在HashMap扩容期间,取出的key对应的value为null。

2. 同时put时碰撞导致数据丢失

当有多个线程同时使用 put 来添加元素,而且恰好两个 put 的 key 是一样的,它们发生了碰撞,也就是根据 hash 值计算出来的 bucket 位置一样,并且两个线程又同时判断该位置是空的,可以写入,所以这两个线程的两个不同的 value 便会添加到数组的同一个位置,这样最终就只会保留一个数据,丢失一个数据。

3. 无法保证可见性

如果线程 1 给某个 key 放入了一个新值,那么线程 2 在获取对应的 key 的值的时候,它的可见性是无法保证的,也就是说线程 2 可能可以看到这一次的更改,但也有可能看不到。

4. 死循环导致CPU 100%

这种情况发生的场景是在扩容的时候。内部新建新的 HashMap 的时候,扩容的逻辑会反转散列桶中的节点顺序,当有多个线程同时进行扩容的时候,由于 HashMap 并非线程安全的,所以如果两个线程同时反转的话,便可能形成一个循环,并且这种循环是链表的循环,相当于 A 节点指向 B 节点,B 节点又指回到 A 节点,这样一来,在下一次想要获取该 key 所对应的 value 的时候,便会在遍历链表的时候发生永远无法遍历结束的情况,也就发生 CPU 100% 的情况。

二、ConcurrentHashMap

由于HashMap是线程不安全的,后续又有了ConcurrentHashMap,可以保证线程安全。

ConcurrentHashMap在Java 7和Java 8中的设计思路有很大的区别。

1. Java 7中的ConcurrentHashMap

在ConcurrentHashMap 内部进行了 Segment 分段,Segment 继承了 ReentrantLock,可以理解为一把锁,各个 Segment 之间都是相互独立上锁的,互不影响。相比于 Hashtable (后面介绍)每次操作都需要把整个对象锁住而言,大大提高了并发效率。因为它的锁与锁之间是独立的,而不是整个对象只有一把锁。

每个Segment 的底层数据结构与 HashMap 类似,仍然是数组和链表组成的拉链法结构。默认有 0~15 共 16 个 Segment,所以最多可以同时支持 16 个线程并发操作(操作分别分布在不同的 Segment 上)。16 这个默认值可以在初始化的时候设置为其他值,但是一旦确认初始化以后,是不可以扩容的。

file

2. Java 8中的ConcurrentHashMap

Java 8在Java 7的基础上,引入了红黑树。当链表中的长度大于某个阈值,同时满足一定的容量要求的时候,会将链表转化为红黑树。

file

那么为什么要引入红黑树呢?

红黑树是每个节点都带有颜色属性的二叉查找树,颜色为红色或黑色,红黑树的本质是对二叉查找树 BST 的一种平衡策略,可以理解为是一种平衡二叉查找树,查找效率高,会自动平衡,防止极端不平衡从而影响查找效率的情况发生。

由于自平衡的特性,红黑树的查找性能近似于二分查找,时间复杂度为O(log(n))。而对于链表,在最坏的情况下,时间复杂度为 O(n)。在节点越来越多的情况下,红黑树的优势更加明显。

红黑树的一些其它特性包括:

  • 每个节点要么是红色,要么是黑色,但根节点永远是黑色的。
  • 红色节点不能连续,也就是说,红色节点的子和父都不能是红色的。
  • 从任一节点到其每个叶子节点的路径都包含相同数量的黑色节点。
3. Java 8中的源码

首先是基本的内部存储结构Node

static class Node<K,V> implements Map.Entry<K,V> {

    final int hash;
    final K key;
    volatile V val;
    // 保证可见性,内部还有一个指向下一个节点的next指针
    volatile Node<K,V> next;
    // ...
}

put()方法用于存储数据

final V putVal(K key, V value, boolean onlyIfAbsent) {

    if (key == null || value == null) {

        throw new NullPointerException();
    }
    //计算 hash 值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K, V>[] tab = table; ; ) {

        Node<K, V> f;
        int n, i, fh;
        //如果数组是空的,就进行初始化
        if (tab == null || (n = tab.length) == 0) {

            tab = initTable();
        }
        // 找该 hash 值对应的数组下标
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

            //如果该位置是空的,就用 CAS 的方式放入新值
            if (casTabAt(tab, i, null,
                    new Node<K, V>(hash, key, value, null))) {

                break;
            }
        }
        //hash值等于 MOVED 代表在扩容
        else if ((fh = f.hash) == MOVED) {

            tab = helpTransfer(tab, f);
        }
        //槽点上是有值的情况
        else {

            V oldVal = null;
            //用 synchronized 锁住当前槽点,保证并发安全
            synchronized (f) {

                if (tabAt(tab, i) == f) {

                    //如果是链表的形式
                    if (fh >= 0) {

                        binCount = 1;
                        //遍历链表
                        for (Node<K, V> e = f; ; ++binCount) {

                            K ek;
                            //如果发现该 key 已存在,就判断是否需要进行覆盖,然后返回
                            if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                            (ek != null && key.equals(ek)))) {

                                oldVal = e.val;
                                if (!onlyIfAbsent) {

                                    e.val = value;
                                }
                                break;
                            }
                            Node<K, V> pred = e;
                            //到了链表的尾部也没有发现该 key,说明之前不存在,就把新值添加到链表的最后
                            if ((e = e.next) == null) {

                                pred.next = new Node<K, V>(hash, key,
                                        value, null);
                                break;
                            }
                        }
                    }
                    //如果是红黑树的形式
                    else if (f instanceof TreeBin) {

                        Node<K, V> p;
                        binCount = 2;
                        //调用 putTreeVal 方法往红黑树里增加数据
                        if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,
                                value)) != null) {

                            oldVal = p.val;
                            if (!onlyIfAbsent) {

                                p.val = value;
                            }
                        }
                    }
                }
            }
            if (binCount != 0) {

                //检查是否满足条件并把链表转换为红黑树的形式,默认的 TREEIFY_THRESHOLD 阈值是 8
                if (binCount >= TREEIFY_THRESHOLD) {

                    treeifyBin(tab, i);
                }
                //putVal 的返回是添加前的旧值,所以返回 oldVal
                if (oldVal != null) {

                    return oldVal;
                }
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

get()方法用于获取数据

public V get(Object key) {

    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //计算 hash 值
    int h = spread(key.hashCode());
    //如果整个数组是空的,或者当前槽点的数据是空的,说明 key 对应的 value 不存在,直接返回 null
    if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {

        //判断头结点是否就是我们需要的节点,如果是则直接返回
        if ((eh = e.hash) == h) {

            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        //如果头结点 hash 值小于 0,说明是红黑树或者正在扩容,就用对应的 find 方法来查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //遍历链表来查找
        while ((e = e.next) != null) {

            if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

3. Java 7和Java 8的对比
  • 并发度

  • Java 7 中,每个 Segment 独立加锁,最大并发个数就是 Segment 的个数,默认是 16

  • Java 8 中,锁粒度更细,理想情况下 table 数组元素的个数(也就是数组长度)就是其支持并发的最大个数

  • 并发安全的原理

  • Java 7 采用 Segment 分段锁来保证安全,而 Segment 是继承自 ReentrantLock

  • Java 8 中放弃了 Segment 的设计,采用 Node + CAS + synchronized 保证线程安全

  • Hash 碰撞解决方案

  • Java 7 在 Hash 冲突时,会使用拉链法,也就是链表的形式

  • Java 8 先使用拉链法,在链表长度超过一定阈值时,将链表转换为红黑树,来提高查找效率

  • 查询时间复杂度

  • Java 7 遍历链表的时间复杂度是 O(n),n 为链表长度。

  • Java 8 如果变成遍历红黑树,那么时间复杂度降低为 O(log(n)),n 为树的节点个数

4. 为什么是8个才转为红黑树?

ConcurrentHashMap中,当链表长度大于或等于阈值(默认为 8)的时候,如果同时还满足容量大于或等于 MIN_TREEIFY_CAPACITY(默认为 64)的要求,就会把链表转换为红黑树。同样,后续如果由于删除或者其他原因调整了大小,当红黑树的节点小于或等于 6 个以后,又会恢复为链表形态。

我们知道,引入红黑树是为了提高查询速度,那么为什么不一开始就用红黑树?又为什么只有在链表长度大于8时才转为红黑树?

首先,单个 TreeNode 需要占用的空间大约是普通 Node 的两倍,所以只有当包含足够多的 Nodes 时才会转成 TreeNodes,而是否足够多就是由 TREEIFY_THRESHOLD 的值决定的。而当桶中节点数由于移除或者 resize 变少后,又会变回普通的链表的形式,以便节省空间。

其次,理想情况下,如果 hashCode 分布良好,链表长度符合泊松分布,各个长度的命中概率依次递减,当长度为 8 的时候,概率仅为 0.00000006。这是一个小于千万分之一的概率,通常Map 里面是不会存储这么多的数据的,所以通常情况下,并不会发生从链表向红黑树的转换。

综上所述,红黑树虽然查询速度快,但是由于单个节点的空间占用太大,因此不能一开始就用红黑树。而且理想情况下,链表长度超过8的很少,这时用链表的速度不会太慢,用红黑树没有必要。

但是,理想归理想,JDK是不能阻止用户自定义hash算法,将hash算法变得不均匀

@Override
public int hashCode() {

    return 1;
}

此时使用HashMap就会出现链表长度很长

public class HashMapDemo {

    public static void main(String[] args) {

        HashMap map = new HashMap<HashMapDemo,Integer>(1);
        for (int i = 0; i < 1000; i++) {

            HashMapDemo hashMapDemo1 = new HashMapDemo();
            map.put(hashMapDemo1, null);
        }
        System.out.println("运行结束");
    }

    @Override
    public int hashCode() {

        return 1;
    }
}

所以,链表长度超过 8 就转为红黑树的设计,更多的是为了防止用户自己实现了不好的哈希算法时导致链表过长,从而导致查询效率低,而此时转为红黑树更多的是一种保底策略,用来保证极端情况下查询的效率。

5. ConcurrentHashMap vs Hashtable

Hashtable 在 JDK1.0 的时候就存在了,并在 JDK1.2 版本中实现了 Map 接口,成为了集合框架的一员,它也是线程安全的类。

Hashtable 和ConcurrentHashMap 的区别如下:

a. 实现线程安全的方式不同
Hashtable 之所以是线程安全的,是因为几乎每个方法都被 synchronized 关键字所修饰了,这也就保证了线程安全。同样原理的还有Collections.SynchronizedMap(new HashMap())。

ConcurrentHashMap 在Java 8中,实现线程安全的原理是利用了 CAS + synchronized + Node 节点的方式。

b. 迭代时修改的不同

Hashtable(包括 HashMap)不允许在迭代期间修改内容,否则会抛ConcurrentModificationException 异常。源码如下:

public T next() {

    if (modCount != expectedModCount)
        throw new ConcurrentModificationException();
    return nextElement();
}

next() 方法中,会首先判断 modCount 是否等于 expectedModCount。其中 expectedModCount 是在迭代器生成的时候随之生成的,并且不会改变。它所代表的含义是当前 Hashtable 被修改的次数,而每一次去调用 Hashtable 的包括 addEntry()、remove()、rehash() 等方法中,都会修改 modCount 的值。迭代的过程中,去对整个 Hashtable 的内容做了修改的话,也就同样会反映到 modCount 中。迭代器在进行 next 的时候,也可以感知到,于是就会发现 modCount 不等于 expectedModCount,就会抛出 ConcurrentModificationException 异常。

对于ConcurrentHashMap,即便在迭代期间修改内容,也不会抛出ConcurrentModificationException。

三、CopyOnWriteArrayList

CopyOnWriteArrayList是一个线程安全的类,在它之间已经有了 ArrayList 和 LinkedList 作为 List 的数组和链表的实现,而且也有了线程安全的 Vector 和 Collections.synchronizedList() 可以使用。

对于Vector 和 Collections.synchronizedList() ,它们实现线程安全的原理都是使用 synchronized,锁的粒度比较大,都是方法级别的锁,在并发量高的时候,很容易发生竞争,并发效率相对比较低

从JDK1.5 开始,Java 并发包里提供了使用 CopyOnWrite 机制实现的并发容器 CopyOnWriteArrayList 作为主要的并发 List,CopyOnWrite 的并发集合还包括 CopyOnWriteArraySet,其底层也是利用 CopyOnWriteArrayList 实现的

1. CopyOnWriteArrayList的读写规则

我们知道读写锁的思想是:读读共享、其他都互斥(写写互斥、读写互斥、写读互斥)。而CopyOnWriteArrayList的思想更激进,CopyOnWriteArrayList 读取是完全不用加锁的,更厉害的是,写入也不会阻塞读取操作,也就是说你可以在写入的同时进行读取,只有写入和写入之间需要进行同步,也就是不允许多个写入同时发生,但是在写入发生时允许读取同时发生。

2. CopyOnWriteArrayList的线程安全原理

对于CopyOnWriteArrayList,当容器需要被修改的时候,不直接修改当前容器,而是先将当前容器进行 Copy,复制出一个新的容器,然后修改新的容器,完成修改之后,再将原容器的引用指向新的容器。

基于这样的修改过程,读和写使用不同的容器。每次写是都是创建新的副本,而每次读的时候都是读的旧容器,而旧容器是不可变的,也是线程安全的,无需进一步的同步操作。

正因为如此,所有 CopyOnWriteArrayList才能做到在写入发生时允许读取同时发生。ArrayList 是无法做到的,其原理也是和Hashtable一样,会抛出ConcurrentModificationException 异常。

* 描述: 演示CopyOnWriteArrayList迭代期间可以修改集合的内容
*/
public class CopyOnWriteArrayListDemo {

    public static void main(String[] args) {

        CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>(new Integer[]{

     1, 2, 3});

        System.out.println(list); //[1, 2, 3]

        //Get iterator 1
        Iterator<Integer> itr1 = list.iterator();

        //Add one element and verify list is updated
        list.add(4);

        System.out.println(list); //[1, 2, 3, 4]

        //Get iterator 2
        Iterator<Integer> itr2 = list.iterator();

        System.out.println("====Verify Iterator 1 content====");

        itr1.forEachRemaining(System.out::println); //1,2,3

        System.out.println("====Verify Iterator 2 content====");

        itr2.forEachRemaining(System.out::println); //1,2,3,4

    }

}

这两个迭代器打印出来的内容是不一样的。第一个迭代器打印出来的是 [1, 2, 3],而第二个打印出来的是 [1, 2, 3, 4]。虽然它们的打印时机都发生在第四个元素被添加之后,但它们的创建时机是不同的。由于迭代器 1 被创建时的 List 里面只有三个元素,后续无论 List 有什么修改,对它来说都是无感知的。

3. CopyOnWriteArrayList的缺点
  • 内存占用:因为 CopyOnWrite 的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,这一点会占用额外的内存空间。
  • 复制开销大:复制过程不仅会占用双倍内存,还需要消耗 CPU 等资源,会降低整体性能。
  • 数据一致性问题:由于 CopyOnWrite 容器的修改是先修改副本,所以这次修改对于其他线程来说,并不是实时能看到的,只有在修改完之后才能体现出来。
4. 适用场景
  • 读操作可以尽可能的快,而写即使慢一些也没关系
  • 读多写少
5. 源码分析

a. 内部数据结构

/** 可重入锁对象 */
final transient ReentrantLock lock = new ReentrantLock();

/** CopyOnWriteArrayList底层由数组实现,volatile修饰,保证数组的可见性 */
private transient volatile Object[] array;

/**
* 得到数组
*/
final Object[] getArray() {

    return array;
}

/**
* 设置数组
*/
final void setArray(Object[] a) {

    array = a;
}

/**
* 初始化CopyOnWriteArrayList相当于初始化数组
*/
public CopyOnWriteArrayList() {

    setArray(new Object[0]);
}

ReentrantLock 锁用来保证修改操作的线程安全。Object[] 数组是被 volatile 修饰的,可以保证数组的可见性,这正是存储元素的数组。CopyOnWriteArrayList 的底层是利用数组实现的。

b. add方法

public boolean add(E e) {

    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {

        // 得到原数组的长度和元素
        Object[] elements = getArray();
        int len = elements.length;

        // 复制出一个新数组
        Object[] newElements = Arrays.copyOf(elements, len + 1);

        // 添加时,将新元素添加到新数组中
        newElements[len] = e;

        // 将volatile Object[] array 的指向替换成新数组
        setArray(newElements);
        return true;
    } finally {

        lock.unlock();
    }
}

首先需要利用 ReentrantLock 的 lock 方法进行加锁,获取锁之后,得到原数组的长度和元素,也就是利用 getArray 方法得到 elements 并且保存 length。之后利用 Arrays.copyOf 方法复制出一个新的数组,得到一个和原数组内容相同的新数组,并且把新元素添加到新数组中。完成添加动作后,需要转换引用所指向的对象,利用 setArray(newElements) 操作就可以把 volatile Object[] array 的指向替换成新数组,最后在 finally 中把锁解除。

写操作是在原来容器的拷贝上进行的,并且在读取数据的时候不会锁住 list。

c. 迭代器 COWIterator 类
迭代器有两个重要的属性,分别是 Object[] snapshot 和 int cursor。其中 snapshot 代表数组的快照,也就是创建迭代器那个时刻的数组情况,而 cursor 则是迭代器的游标。迭代器的构造方法如下:

private COWIterator(Object[] elements, int initialCursor) {

    cursor = initialCursor;
    snapshot = elements;
}

迭代器在被构建的时候,会把当时的 elements 赋值给 snapshot,而之后的迭代器所有的操作都基于 snapshot 数组进行。

public E next() {

    if (! hasNext())
        throw new NoSuchElementException();
    return (E) snapshot[cursor++];
}

在next 方法中可以看到,返回的内容是 snapshot 对象,所以,后续就算原数组被修改,这个 snapshot 既不会感知到,也不会受影响,执行迭代操作不需要加锁,也不会因此抛出异常。迭代器返回的结果,和创建迭代器的时候的内容一致。