这篇文章将解释如何使用ExecutorService在Java中实现排序。排序的一般本质如下:- 数组分为多个部分
- 数组的每个部分均已排序。
- 我们遍历有序数组,将它们合并为一个
这里应用了合并排序的思想,但是数组仅分为两部分(不使用递归)。您可以使用以下功能进行合并:public static String[] merge( String[] leftPart, String[] rightPart ) {
int cursorLeft = 0, cursorRight = 0, counter = 0;
String[] merged = new String[leftPart.length + rightPart.length];
while ( cursorLeft < leftPart.length && cursorRight < rightPart.length ) {
if (leftPart[cursorLeft].compareTo(rightPart[cursorRight] ) < 0 ) {
merged[counter] = leftPart[cursorLeft];
cursorLeft+=1;
} else {
merged[counter] = rightPart[cursorRight];
cursorRight+=1;
}
counter++;
}
if ( cursorLeft < leftPart.length ) {
System.arraycopy( leftPart, cursorLeft, merged, counter, merged.length - counter );
}
if ( cursorRight < rightPart.length ) {
System.arraycopy( rightPart, cursorRight, merged, counter, merged.length - counter );
}
return merged;
}
合并从此处获取的功能代码。合并的本质是这样的:在开始时,两个数组的第一个元素都位于指针上。接下来,比较与指针位置相对应的元素的值,并将较小元素的指针移至下一个元素,然后将元素本身添加到结果数组中。循环一直持续到我们到达数组之一的末尾,然后将第二个数组的其余部分复制到结果数组的末尾。因此,输出是一个排序的数组。还创建了一个用于多线程排序的类;在其中创建了一个run方法,当将start()方法应用于Thread类型的对象时将执行该方法。在我们的情况下,executorService将对此负责。这是合并类的代码,将创建其对象以实现多线程排序:
public class Merger implements Runnable{
private String[] unsorted, sorted;
public Merger(String[] unsorted) {
this.unsorted = unsorted;
}
public void run() {
int middle;
String[] left, right;
if ( unsorted.length <= 1 ) {
sorted = unsorted;
} else {
middle = unsorted.length / 2;
left = new String[middle];
right = new String[unsorted.length - middle];
System.arraycopy(unsorted, 0, left, 0, middle);
System.arraycopy(unsorted, middle, right, 0, unsorted.length - middle);
SimpleMerger leftSort = new SimpleMerger(left);
SimpleMerger rightSort = new SimpleMerger(right);
leftSort.sort();
rightSort.sort();
sorted = SimpleMerger.merge(leftSort.getSorted(), rightSort.getSorted());
}
}
public String[] getSorted() {
return sorted;
}
}
为了对数组的各个部分进行排序,使用了内置的Java排序。以下是使用线程池进行排序的代码。时间测量是针对多线程和常规版本进行的(扰流器:多线程仅对大量数据提供加速):
public static void main(String[] args) throws Exception {
int arrSize = 1_000_000_0;
String[] unsorted = new String[arrSize];
Random randomizer = new Random();
for ( int i = 0; i < arrSize; i++ ) {
unsorted[i] = Integer.toString(randomizer.nextInt( 100_000_0 ));
}
List<Future> futures = new ArrayList<>();
int processorCount = Runtime.getRuntime().availableProcessors();
int batchSize = arrSize/processorCount;
long startTime = System.currentTimeMillis();
final ExecutorService executorService = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
ArrayList<Merger> mergers = new ArrayList<>();
for (int i = 0; i < processorCount; i++) {
String[] part = new String[batchSize];
System.arraycopy( unsorted, i*batchSize, part, 0, batchSize );
Merger merger = new Merger(part);
futures.add(executorService.submit(merger));
mergers.add(merger);
}
for (Future<Double> future : futures) {
future.get();
}
executorService.shutdown();
int j = 0;
String[] mergered = new String[arrSize];
for (Merger merger:mergers){
if (j == 0) {
mergered = merger.getSorted();
j+=1;
}
else{
String[] part = merger.getSorted();
mergered = SimpleMerger.merge( mergered, part);
}
}
long timeSpent = System.currentTimeMillis() - startTime;
System.out.println("Program execution time is " + timeSpent + " milliseconds");
if (arrSize < 100) {System.out.print(Arrays.toString(mergered));}
startTime = System.currentTimeMillis();
Arrays.sort(unsorted);
timeSpent = System.currentTimeMillis() - startTime;
System.out.println("\n Program (non parallel )execution time is " + timeSpent + " milliseconds");
}
在main函数的开始,数组被任意行填充,其中包含从0到10,000,000的数字,设备上的线程数被视为线程数。batchSize变量负责并行排序的数组的维数。然后,使用固定数量的线程创建executorService。对于每个线程,将创建其自己的类合并对象,然后该对象将排序任务放入队列以供执行。在未来的帮助下,我们将等到计算完所有内容后,收集数组中所有排序的部分,然后将它们依次合并为结果数组。我们停止executorService,然后可以查看串行和并行实现的时间成本。代码在这里