归并排序是一种应用分治思想的稳定排序算法,算法分为两段,第一段先分数据,一直分到每个单元不能再二分为止。然后进入第二段归并,就是不断将有序的两段合并成一段。因为每次的分治阶段,也就是第一阶段中的每段之间在归并之前相互之间没有数据依赖,所以可以尝试并行实现。
直接多线程实现分析
用Thread
直接实现,首先我们要实现串行的算法,然后并行化。并行时每次遇到划分就生成新的线程,当线程资源耗尽时,就调用串行算法。每次划分都要join()
归并结果,分治处理后的结果可以用merge
方法合并。所以其实我们也仅仅只能做到一部分时间的加速(想想为什么),同时也受到系统本身硬件线程数量的制约。根据 Amdahl定律 面对复杂问题时,我们很难实现理想的线性加速比。
完整实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
| import java.util.Random;
public class MergeSort {
private int[] nums; private int[] mergeArray;
public MergeSort(int[] nums){ this.nums = nums; mergeArray = new int[nums.length]; }
public Thread parallelMergeSort(final int low, final int high, final int numOfThread){
return new Thread(() -> { if (numOfThread <= 1) { mergeSort(low, high); return; }
int mid = (low + high) / 2;
Thread leftSorter = parallelMergeSort(low, mid, numOfThread / 2); Thread rightSorter = parallelMergeSort(mid + 1, high, numOfThread / 2);
leftSorter.start(); rightSorter.start();
try { leftSorter.join(); rightSorter.join(); } catch (Exception e) { e.printStackTrace(); }
merge(low, mid, high); });
}
public void mergeSort(int low, int high){ if(low >= high){ return; }
int mid = (low + high) / 2;
mergeSort(low, mid); mergeSort(mid + 1, high); merge(low, mid, high); }
private void merge(int low, int mid, int high){
for(int i = low; i <= high; i++) mergeArray[i] = nums[i];
int i = low; int j = mid + 1; int k = low;
while (i <= mid && j <= high){ if (mergeArray[i] <= mergeArray[j]){ nums[k++] = mergeArray[i++]; } else { nums[k++] = mergeArray[j++]; } }
while (i <= mid){ nums[k++] = mergeArray[i++]; }
while (j <= high){ nums[k++] = mergeArray[j++]; }
}
public static void main (String[] args) throws java.lang.Exception{
Random random = new Random();
int[] nums = new int[2000000]; for (int i=0; i < nums.length; i++) { nums[i] = random.nextInt(10000); }
MergeSort mergeSort = new MergeSort(nums);
long currentTime = System.currentTimeMillis(); Thread thread = mergeSort.parallelMergeSort(0, nums.length - 1, 1); thread.start(); thread.join(); long endTime = System.currentTimeMillis();
System.out.println(endTime - currentTime);
for (int i=0; i < nums.length; i++){ nums[i] = random.nextInt(10000); }
MergeSort mergeSort2 = new MergeSort(nums);
currentTime = System.currentTimeMillis(); Thread thread2 = mergeSort2.parallelMergeSort(0, nums.length - 1, Runtime.getRuntime().availableProcessors()); thread2.start(); thread2.join(); endTime = System.currentTimeMillis();
System.out.println(endTime - currentTime); }
}
|
Fork/Join框架分析
直接多线程的弊端:
- 线程本身的上下文切换占用的资源较多
- 实际的控制是我们自己完成的,其实上述实现并没有充分发挥全部线程,譬如说我们想要4个线程的并行能力,如果用上述实现应指定7条。想想为什么
这里用Java 7
提供的更好的Fork/Join
来实现,如果说这种实现有什么好处,第一点就是更加高级,相较于我们确实能用synchronize/wait/notify
这种同步原语去实现而言要简单多了。第二点就是拥有诸如负载均衡、工作窃取等很多优秀的特性。这些在这里不详细阐述,感兴趣可以查阅相关资料。
具体的实现很类似上述代码,只要稍作修改即可。只是我们不主动调用fork()
,改用invokeAll()
方法,这个方法的好处是在分派任务会给当前线程留一个,因此可以用满所有线程。
完整实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
| import java.util.Arrays; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction;
public class MergeSort extends RecursiveAction {
private int[] nums;
public MergeSort(int[] nums){ this.nums = nums; }
@Override public void compute(){ if(nums.length <= 10){ this.mergeSort(nums); return; }
int mid = nums.length / 2;
int[] left = Arrays.copyOfRange(nums, 0, mid); int[] right = Arrays.copyOfRange(nums, mid + 1, nums.length);
MergeSort leftSorter = new MergeSort(left); MergeSort rightSorter = new MergeSort(right);
invokeAll(leftSorter, rightSorter);
merge(left, right, nums); }
public void mergeSort(int[] nums){ if(nums.length <= 1){ return; }
int mid = nums.length / 2;
int[] left = Arrays.copyOfRange(nums, 0, mid); int[] right = Arrays.copyOfRange(nums, mid + 1, nums.length);
mergeSort(left); mergeSort(right);
merge(left, right, nums); }
private void merge(int[] low, int[] high, int[] original){
int i = 0; int j = 0; int k = 0;
while (i < low.length && j < high.length){ if (low[i] <= high[j]){ original[k++] = low[i++]; } else { original[k++] = high[j++]; } }
while (i < low.length){ original[k++] = low[i++]; }
while (j < high.length){ original[k++] = high[j++]; }
}
public static void main (String[] args) throws java.lang.Exception{ Random random = new Random();
int[] nums = new int[10000000]; int[] nums2 = new int[10000000]; for (int i=0; i < nums.length; i++) { nums[i] = random.nextInt(10000); }
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); MergeSort mergeSort = new MergeSort(nums);
long currentTime = System.currentTimeMillis(); mergeSort.mergeSort(nums); long endTime = System.currentTimeMillis();
System.out.println(endTime - currentTime);
for (int i=0; i < nums.length; i++) { nums2[i] = random.nextInt(10000); } MergeSort mergeSort2 = new MergeSort(nums2);
currentTime = System.currentTimeMillis(); forkJoinPool.invoke(mergeSort2); endTime = System.currentTimeMillis();
System.out.println(endTime - currentTime); }
}
|
总结
实际测试的时候要注意缓存
和Intel睿频
技术可能造成的影响,每次测试最好生成不同的测试数据。很多分治思想的算法均可以并行实现,比如找限定范围内所有的素数(有负载均衡问题),找最大值,求数列的和等。与归并排序一样,快排也是分治思想,所以也可以并行实现。
Java 8 之后加入了Stream
的支持,进一步扩充了并行的特性。有关此概念可以参考函数式编程的内容。
参考
可以参考《计算机体系结构》、《多处理编程的艺术》与《算法导论》的相关章节进行进一步的理论分析。