并行归并排序

归并排序是一种应用分治思想的稳定排序算法,算法分为两段,第一段先分数据,一直分到每个单元不能再二分为止。然后进入第二段归并,就是不断将有序的两段合并成一段。因为每次的分治阶段,也就是第一阶段中的每段之间在归并之前相互之间没有数据依赖,所以可以尝试并行实现。

直接多线程实现分析

Thread直接实现,首先我们要实现串行的算法,然后并行化。并行时每次遇到划分就生成新的线程,当线程资源耗尽时,就调用串行算法。每次划分都要join()归并结果,分治处理后的结果可以用merge方法合并。所以其实我们也仅仅只能做到一部分时间的加速(想想为什么),同时也受到系统本身硬件线程数量的制约。根据 Amdahl定律 面对复杂问题时,我们很难实现理想的线性加速比。

完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import java.util.Random;

public class MergeSort {

private int[] nums;
private int[] mergeArray;

public MergeSort(int[] nums){
this.nums = nums;
mergeArray = new int[nums.length];
}

public Thread parallelMergeSort(final int low, final int high, final int numOfThread){

return new Thread(() -> {
if (numOfThread <= 1) {
mergeSort(low, high);
return;
}

int mid = (low + high) / 2;

Thread leftSorter = parallelMergeSort(low, mid, numOfThread / 2);
Thread rightSorter = parallelMergeSort(mid + 1, high, numOfThread / 2);

leftSorter.start();
rightSorter.start();

try {
leftSorter.join();
rightSorter.join();
} catch (Exception e) {
e.printStackTrace();
}

merge(low, mid, high);
});

}

public void mergeSort(int low, int high){
if(low >= high){
return;
}

int mid = (low + high) / 2;

mergeSort(low, mid);
mergeSort(mid + 1, high);
merge(low, mid, high);
}

private void merge(int low, int mid, int high){

for(int i = low; i <= high; i++)
mergeArray[i] = nums[i];

int i = low;
int j = mid + 1;
int k = low;

while (i <= mid && j <= high){
if (mergeArray[i] <= mergeArray[j]){
nums[k++] = mergeArray[i++];
} else {
nums[k++] = mergeArray[j++];
}
}

while (i <= mid){
nums[k++] = mergeArray[i++];
}

while (j <= high){
nums[k++] = mergeArray[j++];
}

}

public static void main (String[] args) throws java.lang.Exception{

//test here
Random random = new Random();

int[] nums = new int[2000000];
for (int i=0; i < nums.length; i++) {
nums[i] = random.nextInt(10000);
}

MergeSort mergeSort = new MergeSort(nums);

long currentTime = System.currentTimeMillis();
Thread thread = mergeSort.parallelMergeSort(0, nums.length - 1, 1);
thread.start();
thread.join();
long endTime = System.currentTimeMillis();

System.out.println(endTime - currentTime);

for (int i=0; i < nums.length; i++){
nums[i] = random.nextInt(10000);
}

MergeSort mergeSort2 = new MergeSort(nums);

currentTime = System.currentTimeMillis();
Thread thread2 = mergeSort2.parallelMergeSort(0, nums.length - 1, Runtime.getRuntime().availableProcessors());
thread2.start();
thread2.join();
endTime = System.currentTimeMillis();

System.out.println(endTime - currentTime);
}

}

Fork/Join框架分析

直接多线程的弊端:

  1. 线程本身的上下文切换占用的资源较多
  2. 实际的控制是我们自己完成的,其实上述实现并没有充分发挥全部线程,譬如说我们想要4个线程的并行能力,如果用上述实现应指定7条。想想为什么

这里用Java 7提供的更好的Fork/Join来实现,如果说这种实现有什么好处,第一点就是更加高级,相较于我们确实能用synchronize/wait/notify这种同步原语去实现而言要简单多了。第二点就是拥有诸如负载均衡、工作窃取等很多优秀的特性。这些在这里不详细阐述,感兴趣可以查阅相关资料。

具体的实现很类似上述代码,只要稍作修改即可。只是我们不主动调用fork(),改用invokeAll()方法,这个方法的好处是在分派任务会给当前线程留一个,因此可以用满所有线程。

完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class MergeSort extends RecursiveAction {

private int[] nums;

public MergeSort(int[] nums){
this.nums = nums;
}

@Override
public void compute(){
if(nums.length <= 10){
this.mergeSort(nums);
return;
}

int mid = nums.length / 2;

int[] left = Arrays.copyOfRange(nums, 0, mid);
int[] right = Arrays.copyOfRange(nums, mid + 1, nums.length);

MergeSort leftSorter = new MergeSort(left);
MergeSort rightSorter = new MergeSort(right);

invokeAll(leftSorter, rightSorter);

merge(left, right, nums);
}

public void mergeSort(int[] nums){
if(nums.length <= 1){
return;
}

int mid = nums.length / 2;

int[] left = Arrays.copyOfRange(nums, 0, mid);
int[] right = Arrays.copyOfRange(nums, mid + 1, nums.length);

mergeSort(left);
mergeSort(right);

merge(left, right, nums);
}

private void merge(int[] low, int[] high, int[] original){

int i = 0;
int j = 0;
int k = 0;

while (i < low.length && j < high.length){
if (low[i] <= high[j]){
original[k++] = low[i++];
} else {
original[k++] = high[j++];
}
}

while (i < low.length){
original[k++] = low[i++];
}

while (j < high.length){
original[k++] = high[j++];
}

}

public static void main (String[] args) throws java.lang.Exception{
//test here
Random random = new Random();

int[] nums = new int[10000000];
int[] nums2 = new int[10000000];
for (int i=0; i < nums.length; i++) {
nums[i] = random.nextInt(10000);
}

ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
MergeSort mergeSort = new MergeSort(nums);

long currentTime = System.currentTimeMillis();
mergeSort.mergeSort(nums);
long endTime = System.currentTimeMillis();

System.out.println(endTime - currentTime);

for (int i=0; i < nums.length; i++) {
nums2[i] = random.nextInt(10000);
}
MergeSort mergeSort2 = new MergeSort(nums2);

currentTime = System.currentTimeMillis();
forkJoinPool.invoke(mergeSort2);
endTime = System.currentTimeMillis();

System.out.println(endTime - currentTime);
}

}

总结

实际测试的时候要注意缓存Intel睿频技术可能造成的影响,每次测试最好生成不同的测试数据。很多分治思想的算法均可以并行实现,比如找限定范围内所有的素数(有负载均衡问题),找最大值,求数列的和等。与归并排序一样,快排也是分治思想,所以也可以并行实现。

Java 8 之后加入了Stream的支持,进一步扩充了并行的特性。有关此概念可以参考函数式编程的内容。

参考

可以参考《计算机体系结构》、《多处理编程的艺术》与《算法导论》的相关章节进行进一步的理论分析。