基础构建模块 一、同步容器
同步容器类都是线程安全的,比如 Vector 和 HashTable
实现线程安全的方式是:将它们的状态都封装起来,所有对外接口都是同步方法
对容器状态的访问都串行化,保证每次只有一个线程访问容器
能保证单个操作是线程安全的,但是复合操作是不能保证线程安全的
常见的不安全的复合操作包括:迭代、跳转、条件运算
比如,条件运算的例子:
1 2 3 4 5 6 public class SyncContainer { public <T> T deleteLast (Vector<T> list) { int lastIndex = list.size() - 1 ; return list.remove(lastIndex); } }
这种复合操作,实际上是线程不安全的。
一般来说,同步容器的复合操作都需要加锁处理:
1 2 3 4 5 6 7 8 public class SyncContainer { public <T> T deleteLast (Vector<T> list) { synchronized (list) { int lastIndex = list.size() - 1 ; return list.remove(lastIndex); } } }
同步容器类的锁就是对象本身,所以是对同步容器对象加锁。
不过,对于迭代器的处理,加锁并不都是一种好方法:
容器元素很多时,对迭代操作加锁,会导致容器被长时间阻塞,影响程序的性能
比如:
1 2 3 4 5 6 7 8 9 public class SyncContainerIterator { public void visit (Vector<T> list) { synchronized (list) { for (T t : list) { } } } }
迭代时加锁,可能会使得容器锁无法释放,导致容器不可用。
所以,对于迭代器的处理,还可以用另一种方式处理:克隆容器。
1 2 3 4 5 6 7 8 public class SyncCloneContainerIterator { public void visit (Vector<T> list) { Vector<T> clone = (Vector<T>) list.clone(); for (T t : clone) { } } }
不过,克隆容器时,也存在明显的性能开销,但不会对原容器造成影响。
容器的迭代器使用,除了比较明显的调用以外,一些隐藏方法也可能会用到:
hashCode、equals、toString
containsAll、removeAll、retainAll
这些方法里面,实际上也隐含了对迭代器的使用,所以,这些方法也是不安全的。
二、并发容器
并发容器类都是线程安全的
实现线程安全的方式是:将它们的状态都封装起来,内部对状态的访问是加锁同步的
并发容器是针对多个线程并发访问设计的,允许多个线程同时访问并发容器
主要用于替代同步的容器,提高并发访问的性能
并发容器的锁不是并发容器对象本身,对其加锁毫无意义
比如说:
ConcurrentHashMap 用于替代同步的 Map
CopyOnWriteArrayList 用于在遍历操作为主要操作的情况下替代同步的 List
…
使用并发容器,可以获得比同步容器更高的并发性能。
由于并发容器的锁是对象本身,所以对并发容器对象加锁是没有意义的:
1 2 3 4 5 6 7 public class SyncContainer { public void delete (ConcurrentHashMap<K, V> map) { synchronized (map) { map.remove(key); } } }
ConcurrentHashMap 的锁并不是它自己,所以加锁操作毫无意义。
如果需要对整个容器进行独占访问,并发容器并不适合,这时应该用同步容器。
三、同步工具类
同步工具类,根据自身的状态来协调线程的控制流
特点:
封装了一些状态,这些状态决定执行的线程是继续执行还是等待
提供了一些方法来对状态进行访问
提供一些方法来等待同步工具类达到某个状态
同步工具类包括:
闭锁(Latch)
信号量(Semaphore)
栅栏(Barrier)
阻塞队列(BlockingQueue)
3.1 闭锁
延迟线程的进度直到其到达终止状态
用于确保某些活动直到其他活动都完成后才继续执行
比如说,一个线程等待其他线程完成工作后,再继续执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public long timeTask (int nThreads, final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch (1 ); final CountDownLatch endGate = new CountDownLatch (nThreads); for (int i = 0 ; i < nThreads; i++) { Thread t = new Thread () { @Override public void run () { try { startGate.await(); try { task.run(); } finally { endGate.countDown(); } } catch (InterruptedException e) { e.printStackTrace(); } } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); return end - start; }
闭锁的作用,就是等待某些线程异步操作完成,再继续往下执行。
3.2 信号量
控制同时访问某个特定资源的操作数量
控制同时执行某个操作的线程数量
比如说,信号量可以用于控制容器的元素数量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 class BoundHashSet <T> { private final Set<T> set; private final Semaphore sem; public BoundHashSet (int bound) { set = Collections.synchronizedSet(new HashSet <>()); sem = new Semaphore (bound); } public boolean add (T o) throws InterruptedException { sem.acquire(); boolean wasAdded = false ; try { wasAdded = set.add(o); return wasAdded; } finally { if (!wasAdded) { sem.release(); } } } public boolean remove (T o) { boolean wasRemoved = set.remove(o); if (wasRemoved) { sem.release(); } return wasRemoved; } }
通过信号量的数量限制,可以控制同时访问某个资源的线程数量。
3.3 栅栏
阻塞一组线程直到某个事件发生
所有线程必须同时到达栅栏位置,才能继续执行
闭锁用于等待事件,栅栏则是等待线程
栅栏是可以重复使用的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public long timeTask (int nThreads, final Runnable task) throws Exception { class Time { public long value = 0 ; } final Time time = new Time (); final CyclicBarrier gate = new CyclicBarrier (nThreads + 1 , () -> time.value = System.nanoTime() - time.value); for (int i = 0 ; i < nThreads; i++) { Thread t = new Thread () { @Override public void run () { try { gate.await(); task.run(); gate.await(); } catch (Exception e) { e.printStackTrace(); } } }; t.start(); } gate.await(); gate.await(); return time.value; }
与闭锁不同,闭锁需要等到某个事件通知,如 countDown(),线程才会继续往下执行。
而栅栏等待的就是线程,只要所有线程都到达栅栏位置,就可以继续执行下去。
四、缓存案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class Memoizer <K, V> { private final ConcurrentHashMap<K, Future<V>> cache = new ConcurrentHashMap <>(); private final Computable<K, V> computer; public Memoizer (Computable<K, V> c) { computer = c; } public V get (K key) { while (true ) { Future<V> result = cache.get(key); if (result == null ) { FutureTask<V> task = new FutureTask <>(() -> computer.compute(key)); result = cache.putIfAbsent(key, task); if (result == null ) { result = task; new Thread (task).start(); } } try { return result.get(); } catch (Exception e) { e.printStackTrace(); } } } }