java并发编程——并发容器和并发工具介绍

java.util.concurrent包下面为我们提供了丰富的类和接口供我们开发出支持高并发、线程安全的程序。下面将从三个方面对这些基础构建类做以介绍和总结。

同步容器类,介绍Vector,HashTable和Collections.SynchronizedXXX();

并发容器类,介绍ConcurrentHashMap,CopyOnWrite容器以及阻塞队列。

并发工具类,介绍CountLatch,FututeTask,Semaphore和CyclicBarrier。

第一部分:同步容器类

同步容器类包括:Vector和HashTable,这两个类是早期JDK的一部分。此外还包括了JDK1.2之后提供的同步封装器方法Collections.synchronizedXxx()。

#同步容器类的问题

我们都知道Vector是线程安全的容器,但当我们对Vector经行复合操作时往往会得到意想之外的结果。比如迭代操作:

for(int i=0;i<vector.size();i++){
    doSomething(vector.get(i));
}

例如两个线程A和B,A线程对容器进行迭代操作的时候B线程可能对容器进行了删除操作,这样就会导致A线程的迭代操作抛出IndexOutOfBoundsException。而这显然不是我们想得到的结果,所以,为了解决不可靠迭代的问题,我们只能牺牲伸缩性为容器加锁,代码如下:

​synchronized(vector){
    for(int i=0;i<vector.size();i++){
    doSomething(vector.get(i));
    }
}


加锁防止了迭代期间对容器的修改,但这同时也降低了容器的并发性。当容器很大时,其他线程要一直等待迭代结束,因此性能不高。

#迭代器与CoucurrentModificationException

上面的例子我们举了Vector这种“古老”的容器。但是实际上很多“现代”的容器类也并没有避免上面所提到的问题。比如:for-each循环和Iterator。当我们调用Iterator类的hasNext和next方法对容器进行迭代时,若其他线程并发修改该容器,那么此时容器表现出的行为是“及时失败”的,即抛出CoucurrentModificationException。因此,为了避免以上问题,我们又不得不为容器加锁,或者对容器进行“克隆”,在副本上进行操作,但这样做的性能显然是不高的。

#影藏的迭代器

这里要特别提到的是有些类的方法会隐藏的调用容器的迭代器,这往往是很容易被我们忽视的。看下面列子:

private final Set<Integer> set = new HashSet<Integer>();

public synchronized void add(Integer i){set.add(i);}

public void addMore(){
    for(int i=0;i<10;i++)
    add(i);
    System.out.println("this is set:"+set);
}

以上代码能看出对容器的迭代操作吗?乍看没有。但实际上最后一段“this is set:”+set的代码影藏的调用了set的toString方法,而toString方法又会影藏的调用容器的迭代方法,这样该类的addMore方法同样可能会抛出CoucurrentModificationException异常。更严重的问题是该类并不是线程安全的,我们可以使用SynchronizedSet来包装set类,对同步代码进行封装,这样就避免了问题。此外我们还需要特别注意:容器的equals、hashCode方法都会隐式的进行迭代操作,当一个容器作为另一个容器的健值或者元素时就会出现这种情况。同样,containsAll,removeAll等方法以及把容器当做参数的构造函数,都会进行迭代操作。所有这些间接迭代的操作都可能导致程序抛出CoucurrentModificationException。

第二部分:并发容器类

java.util.concurrent包下面为我们提供了丰富的高并发容器类。通过并发容器来替换同步容器,可以极大地提高伸缩性和降低风险。这些容器按照不同的用途分为以下几类:

#ConcurrentHashMap

Java 5.0增加的ConcurrentHashMap几乎是最常用的并发容器了。与·HashTable相比,ConcurrentHashMap不仅线程安全,而且还支持高并发、高吞吐。ConcurrentHashMap的底层实现使用了分段锁技术,而不是独占锁,它允许多个线程可以同时访问和修改容器,而不会抛出CoucurrentModificationException异常,极大地提高了效率。在这里要说明的是ConcurrentHashMap返回的迭代器是弱一致性的,而不是及时失败的。另外size、isEmpty等需要在整个容器上执行的方法其返回值也是弱一致性的,是近似值而非准确值。所以,在实际使用中要对此做权衡。与同步容器和加锁机制相比,ConcurrentHashMap优势明显,是我们优先考虑的容器。

#CopyOnWriteArrayList

CopyOnWrite容器用于替代同步的List,它提供了更好的并发性,并且在使用时不需要加锁或者拷贝容器。CopyOnWriteArrayList的主要应用就是迭代容器操作多而修改少的场景,迭代时也不会抛出CoucurrentModificationException异常。CopyOnWrite容器的底层实现是在迭代操作前都会复制一个底层数组,这保证了多线程下的并发性和一致性,但是当底层数组的数据量比较大的时候,就需要效率问题了。

#阻塞队列和生产者—消费者模型

Java 5.0之后新增加了Queue(队列)和BlockingQueue(阻塞队列)。Queue的底层实现其实就是一个LinkedList。队列是典型的FIFO先进先出的实现。阻塞队列提供了很多现成的方法可以满足我们实现生产者—消费者模型。

生产者—消费者模型简单理解就是一个缓冲容器,协调生产者和消费者之间的关系。生产者生产数据扔到容器里,消费者直接从容器里消费数据,大家不需要关心彼此,只需要和容器打交道,这样就实现了生产者和消费者的解耦。

队列分为有界队列和无界队列,无界队列会因为数据的累计造成内存溢出,使用时要小心。阻塞队列有很多种实现,最常用的有ArrayBlockingQueue和LinkedBlockingQueue。阻塞队列提供了阻塞的take和put方法,如果队列已满,那么put方法将等待队列有空间时在执行插入操作;如果队列为空,那么take方法将一直阻塞直到有元素可取。有界队列是一个强大的资源管理器,它能抑制产生过多的工作项,使程序更加健壮。

#其他并发容器类

除了以上较常用的并发容器外,Java还为我们提供了一些个性化的容器类以满足我们的需求。BlockingDeque,PriorityBlockingQueue,DelayQueue和SynchronousQueue。

Deque是一种双端队列,它支持在两端进行插入和删除元素,Deque继承自Queue。BlockingDeque就是支持阻塞的双端队列,常用的实现有LinkedBlockingDeque。双端队列最典型的应用是工作密取,每个消费者都有各自的双端队列,它适用于既是生产者又是消费者的场景。

PriorityBlockingQueue是一个可以按照优先级排序的阻塞队列,当你希望按照某种顺序来排序时非常有用。

DelayQueue是一个无界阻塞队列,只有在延迟期满时才能从中提取元素。

SynchronousQueue实际上不能算作一个队列,他不提供元素的存储功能,它只是维护一组线程,这些线程只是 等待将元素加入或者移除队列。SynchronousQueue相当于扮演一个直接交付的角色,因此它的put和get方法是一直阻塞的,有更少的延迟。

第三部分:并发工具类

如果遇到以上并发容器类无法解决的问题,大多数情况下我们可以使用并发工具类来解决问题。所有的同步工具类都包含共同的属性:“他们封装了一些状态,这些状态将决定执行并发工具类的线程是继续执行还是等待,此外还提供了一些方法对状态进行操作,以及另外一些用于高效的等待并发工具类进入到预期状态。”并发工具类主要包括以下几类:

#CountDownLatch

中文翻译为“闭锁”,Java API上是这样描述的:一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。我们可以这样理解:CountDownLatch就相当于一扇门,当N个人都到齐之后才可以打开这扇门。门上面有一个计数器,用于记录打开门还需要的人数,比如需要五人,初始化计数器就显示五人,来了两人以后,计数器就变成了三,当五个人都到齐之后计数器变为零,此时门就打开了,所有人都可以进入了。看以下示例,创建一定数量的线程,用它们并发的执行某个指定任务。

public class CountDownLatchTask {
    public long timeTask(int nThreads,final Runnable task) 
            throws InterruptedException{
        final CountDownLatch startTask = new CountDownLatch(1);
        final CountDownLatch endTask = new CountDownLatch(nThreads);

        for(int i=0;i<nThreads;i++){
            Thread t = new Thread(){
                public void run(){
                    try{
                        startTask.await();
                        try{
                            task.run();
                        }finally{
                            endTask.countDown();
                        }
                    }catch(InterruptedException e){

                    }
                }
            };
            t.start();
        }

        long start = System.currentTimeMillis();
        startTask.countDown();
        endTask.await();
        long end = System.currentTimeMillis();
        return end-start;
    }
}

#FutureTask

FutureTask表示一个可以取消的异步计算。FutureTask表示的计算是通过Callable实现的,Callable与Runnable的区别就是Callable可以返回值或者抛出异常。所以我们可以用它来表示一些时间较长的计算,这些计算可以在使用计算结果之前启动,从而减少等待的时间。

#Semaphore

信号量用来控制同时访问某个资源的数量,或者同时执行某个操作的数量。所以信号量的典型应用就是用来实现某种资源池,或者对容器施加边界。看下面代码实例,为一个Set设置边界:

public class SemaphoreSet<T> {
    private final Set<T> set;
    private final Semaphore sem;

    public SemaphoreSet(int bounds){
        set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bounds);
    }

    public boolean add(T o) throws InterruptedException{
        sem.acquire();
        boolean isSuccess = false;
        try{
            isSuccess = set.add(o);
            return isSuccess;
        }finally{
            if(!isSuccess)
                sem.release();
        }
    }

    public boolean remove(T o) throws InterruptedException{
        sem.acquire();
        boolean isSuccess = false;
        try{
            isSuccess = set.remove(o);
            return isSuccess;
        }finally{
            if(!isSuccess)
                sem.release();
        }
    }
}

#CyclicBarrier

栅栏类似于闭锁。它们的区别是:闭锁是一次性对象,一旦达到终止状态就不能被重置,而栅栏类似于水坝,当达到某个水位线以后开闸放水,放水完毕后又可以被重置。闭锁与栅栏的另一个区别体现在:所有线程必须都达到栅栏位置,才能继续执行。闭锁等待事件,而栅栏等待线程。

综上所述,通过对同步容器、并发容器和并发工具的介绍,大家大致了解了每种容器的使用场景以及各自的局限。并发容器和并发工具的出现极大地提高了程序的吞吐量和并发性,是大型网站开发过程中的必不可少的工具。