08_02_串行算法的并行化

串行算法的并行化

一、并行化要求

  • 递归的每次迭代都是独立的
  • 将每个迭代拆分成任务并发执行

二、多个结果的获取

  • 使用 ExecutorService.invokeAll() 方法等待所有并发线程执行完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ParallelInvokeAllResults {

public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException {
// 统计所有迭代节点,节点之间相互独立
List<Node<T>> allNodes = new ArrayList<>();
for (Node<T> node : nodes) {
collectAllNodes(node, allNodes);
}

// 所有节点的计算任务
List<Callable<T>> tasks = new ArrayList<>(allNodes.size());
for (Node<T> node : nodes) {
tasks.add(node::compute);
}

// 并行方式计算
final ExecutorService exec = Executors.newCachedThreadPool();
List<Future<T>> futures = exec.invokeAll(tasks);

// 收集并行计算结果
List<T> results = new ArrayList<>(futures.size());
for (Future<T> f : futures) {
try {
results.add(f.get());
} catch (Exception e) {
results.add(null);
}
}
return results;
}

/**
* 收集所有节点
*/
private <T> void collectAllNodes(Node<T> node, List<Node<T>> allNodes) {
allNodes.add(node);
if (node.getChildren() != null) {
for (Node<T> c : node.getChildren()) {
collectAllNodes(c, allNodes);
}
}
}

}
  • 使用共享变量 BlockingQueue 实例,用于存放并发线程的返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ParallelParamQueueResults {

public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException {
// 并行方式计算
final ExecutorService exec = Executors.newCachedThreadPool();
final Queue<T> resultQueue = new ConcurrentLinkedQueue<>();

// 递归并行计算结果
parallelRecursive(exec, nodes, resultQueue);

// 等待计算结果
exec.shutdown();
exec.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);

return resultQueue;
}

/**
* 并行递归计算
*/
public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
for (Node<T> node : nodes) {
exec.execute(() -> {
results.add(node.compute());
});
parallelRecursive(exec, node.getChildren(), results);
}
}

}

三、单个结果的获取

  • 使用 ExecutorService.invokeAny() 方法等待任意一个并发线程执行完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class ParallelInvokeAnyResult {

public <T> T getParallelResult(List<Node<T>> nodes) throws Exception {
// 统计所有迭代节点,节点之间相互独立
List<Node<T>> allNodes = new ArrayList<>();
for (Node<T> node : nodes) {
collectAllNodes(node, allNodes);
}

// 所有节点的计算任务
List<Callable<T>> tasks = new ArrayList<>(allNodes.size());
for (Node<T> node : nodes) {
tasks.add(node::compute);
}

// 并行方式计算
final ExecutorService exec = Executors.newCachedThreadPool();
T result = exec.invokeAny(tasks);

return result;
}

/**
* 收集所有节点
*/
private <T> void collectAllNodes(Node<T> node, List<Node<T>> allNodes) {
allNodes.add(node);
if (node.getChildren() != null) {
for (Node<T> c : node.getChildren()) {
collectAllNodes(c, allNodes);
}
}
}

}

  • 使用共享变量闭锁 CountDownLatch(1) 实例,保证只设置一次返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ParallelLatchResult {

public <T> T getParallelResult(List<Node<T>> nodes) throws Exception {
final ExecutorService exec = Executors.newCachedThreadPool();

// 递归并行计算结果
ValueLatch<T> result = new ValueLatch<>();
parallelRecursive(exec, nodes, result);

// 等待计算结果
exec.shutdown();
exec.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);

return result.getValue();
}

/**
* 并行递归计算
*/
public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final ValueLatch<T> valueLatch) {
for (Node<T> node : nodes) {
exec.execute(() -> {
T result = node.compute();
if (result != null) {
valueLatch.setValue(result);
}
});
parallelRecursive(exec, node.getChildren(), valueLatch);
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ValueLatch<T> {
private T value = null;
private final CountDownLatch latch = new CountDownLatch(1);

public boolean isSet() {
return latch.getCount() == 0;
}

public synchronized void setValue(T value) {
if (!isSet()) {
this.value = value;
latch.countDown();
}
}

public T getValue() throws InterruptedException {
latch.await();
synchronized (this) {
return value;
}
}
}

四、无结果的处理

  • 统计任务的数量,当最后一个任务执行完成后,如果还是没有结果,返回 null
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ParallelNoResult {

public <T> T getParallelResult(List<Node<T>> nodes) throws Exception {
final ExecutorService exec = Executors.newCachedThreadPool();
final AtomicInteger countTask = new AtomicInteger(0);

// 递归并行计算结果
ValueLatch<T> result = new ValueLatch<>();
parallelRecursive(exec, nodes, result, countTask);

// 等待计算结果
exec.shutdown();
exec.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);

return result.getValue();
}

/**
* 并行递归计算
*/
public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final ValueLatch<T> valueLatch,
final AtomicInteger countTask) {
for (Node<T> node : nodes) {
exec.execute(() -> {
try {
// 统计任务的数量
countTask.incrementAndGet();
T result = node.compute();
if (result != null) {
// 有可能导致没有返回结果
valueLatch.setValue(result);
}
} finally {
if (countTask.decrementAndGet() == 0) {
// 最后一个任务结束时,返回 null
valueLatch.setValue(null);
}
}
});
parallelRecursive(exec, node.getChildren(), valueLatch, countTask);
}
}

}
作者

jiaduo

发布于

2022-05-15

更新于

2023-04-03

许可协议