05_基础构建模块

基础构建模块

一、同步容器

  • 同步容器类都是线程安全的,比如 Vector 和 HashTable
  • 实现线程安全的方式是:将它们的状态都封装起来,所有对外接口都是同步方法
  • 对容器状态的访问都串行化,保证每次只有一个线程访问容器
  • 能保证单个操作是线程安全的,但是复合操作是不能保证线程安全的
  • 常见的不安全的复合操作包括:迭代、跳转、条件运算

比如,条件运算的例子:

1
2
3
4
5
6
public class SyncContainer {
public <T> T deleteLast(Vector<T> list) {
int lastIndex = list.size() - 1;
return list.remove(lastIndex);
}
}

这种复合操作,实际上是线程不安全的。

一般来说,同步容器的复合操作都需要加锁处理:

1
2
3
4
5
6
7
8
public class SyncContainer {
public <T> T deleteLast(Vector<T> list) {
synchronized (list) {
int lastIndex = list.size() - 1;
return list.remove(lastIndex);
}
}
}

同步容器类的锁就是对象本身,所以是对同步容器对象加锁。

不过,对于迭代器的处理,加锁并不都是一种好方法:

  • 容器元素很多时,对迭代操作加锁,会导致容器被长时间阻塞,影响程序的性能

比如:

1
2
3
4
5
6
7
8
9
public class SyncContainerIterator {
public void visit(Vector<T> list) {
synchronized (list) {
for (T t : list) {
// do something
}
}
}
}

迭代时加锁,可能会使得容器锁无法释放,导致容器不可用。

所以,对于迭代器的处理,还可以用另一种方式处理:克隆容器。

1
2
3
4
5
6
7
8
public class SyncCloneContainerIterator {
public void visit(Vector<T> list) {
Vector<T> clone = (Vector<T>) list.clone();
for (T t : clone) {
// do something
}
}
}

不过,克隆容器时,也存在明显的性能开销,但不会对原容器造成影响。

容器的迭代器使用,除了比较明显的调用以外,一些隐藏方法也可能会用到:

  • hashCode、equals、toString
  • containsAll、removeAll、retainAll

这些方法里面,实际上也隐含了对迭代器的使用,所以,这些方法也是不安全的。

二、并发容器

  • 并发容器类都是线程安全的
  • 实现线程安全的方式是:将它们的状态都封装起来,内部对状态的访问是加锁同步的
  • 并发容器是针对多个线程并发访问设计的,允许多个线程同时访问并发容器
  • 主要用于替代同步的容器,提高并发访问的性能
  • 并发容器的锁不是并发容器对象本身,对其加锁毫无意义

比如说:

  • ConcurrentHashMap 用于替代同步的 Map
  • CopyOnWriteArrayList 用于在遍历操作为主要操作的情况下替代同步的 List

使用并发容器,可以获得比同步容器更高的并发性能。

由于并发容器的锁是对象本身,所以对并发容器对象加锁是没有意义的:

1
2
3
4
5
6
7
public class SyncContainer {
public void delete(ConcurrentHashMap<K, V> map) {
synchronized (map) {
map.remove(key);
}
}
}

ConcurrentHashMap 的锁并不是它自己,所以加锁操作毫无意义。

如果需要对整个容器进行独占访问,并发容器并不适合,这时应该用同步容器。

三、同步工具类

  • 同步工具类,根据自身的状态来协调线程的控制流
  • 特点:
    • 封装了一些状态,这些状态决定执行的线程是继续执行还是等待
    • 提供了一些方法来对状态进行访问
    • 提供一些方法来等待同步工具类达到某个状态
  • 同步工具类包括:
    • 闭锁(Latch)
    • 信号量(Semaphore)
    • 栅栏(Barrier)
    • 阻塞队列(BlockingQueue)

3.1 闭锁

  • 延迟线程的进度直到其到达终止状态
  • 用于确保某些活动直到其他活动都完成后才继续执行

比如说,一个线程等待其他线程完成工作后,再继续执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public long timeTask(int nThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);

for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
@Override
public void run() {
try {
// 所有线程等待开始信号
startGate.await();
try {
task.run();
} finally {
// 线程结束发起信号
endGate.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t.start();
}

long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}

闭锁的作用,就是等待某些线程异步操作完成,再继续往下执行。

3.2 信号量

  • 控制同时访问某个特定资源的操作数量
  • 控制同时执行某个操作的线程数量

比如说,信号量可以用于控制容器的元素数量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class BoundHashSet<T> {
private final Set<T> set;
private final Semaphore sem;

public BoundHashSet(int bound) {
set = Collections.synchronizedSet(new HashSet<>());
sem = new Semaphore(bound);
}

public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded) {
sem.release();
}
}
}

public boolean remove(T o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved) {
sem.release();
}
return wasRemoved;
}
}

通过信号量的数量限制,可以控制同时访问某个资源的线程数量。

3.3 栅栏

  • 阻塞一组线程直到某个事件发生
  • 所有线程必须同时到达栅栏位置,才能继续执行
  • 闭锁用于等待事件,栅栏则是等待线程
  • 栅栏是可以重复使用的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public long timeTask(int nThreads, final Runnable task) throws Exception {
class Time {
public long value = 0;
}
final Time time = new Time();

// nThreads + 1 是加上了主线程
final CyclicBarrier gate = new CyclicBarrier(nThreads + 1,
() -> time.value = System.nanoTime() - time.value);

for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
@Override
public void run() {
try {
// 等待所有线程开始
gate.await();

task.run();

// 等待所有线程结束
gate.await();
} catch (Exception e) {
e.printStackTrace();
}
}
};
t.start();
}

// 等待所有线程开始
gate.await();
// 等待所有线程结束
gate.await();

return time.value;
}

与闭锁不同,闭锁需要等到某个事件通知,如 countDown(),线程才会继续往下执行。

而栅栏等待的就是线程,只要所有线程都到达栅栏位置,就可以继续执行下去。

四、缓存案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Memoizer<K, V> {

private final ConcurrentHashMap<K, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<K, V> computer;

public Memoizer(Computable<K, V> c) {
computer = c;
}

public V get(K key) {
while (true) {
Future<V> result = cache.get(key);
if (result == null) {
FutureTask<V> task = new FutureTask<>(() -> computer.compute(key));
result = cache.putIfAbsent(key, task);

// 处理并发访问的情况
if (result == null) {
result = task;
new Thread(task).start();
}
}

try {
// 等待计算完成
return result.get();
} catch (Exception e) {
e.printStackTrace();
}
}
}

}
作者

jiaduo

发布于

2022-05-15

更新于

2023-04-03

许可协议