使用Java中的线程池进行多线程排序

这篇文章将解释如何使用ExecutorService在Java中实现排序。排序的一般本质如下:

  1. 数组分为多个部分
  2. 数组的每个部分均已排序。
  3. 我们遍历有序数组,将它们合并为一个

这里应用了合并排序的思想,但是数组仅分为两部分(不使用递归)。

您可以使用以下功能进行合并:

public static String[] merge( String[] leftPart, String[] rightPart ) {
        int cursorLeft = 0, cursorRight = 0, counter = 0;
        String[] merged = new String[leftPart.length + rightPart.length];
        while ( cursorLeft < leftPart.length && cursorRight < rightPart.length ) {
            if (leftPart[cursorLeft].compareTo(rightPart[cursorRight] ) < 0 ) {
                merged[counter] = leftPart[cursorLeft];
                cursorLeft+=1;
            } else {
                merged[counter] = rightPart[cursorRight];
                cursorRight+=1;
            }
            counter++;
        }
        if ( cursorLeft < leftPart.length ) {
            System.arraycopy( leftPart, cursorLeft, merged, counter, merged.length - counter );
        }
        if ( cursorRight < rightPart.length ) {
            System.arraycopy( rightPart, cursorRight, merged, counter, merged.length - counter );
        }
        return merged;
    }

合并从此处获取的功能代码

合并的本质是这样的:在开始时,两个数组的第一个元素都位于指针上。接下来,比较与指针位置相对应的元素的值,并将较小元素的指针移至下一个元素,然后将元素本身添加到结果数组中。循环一直持续到我们到达数组之一的末尾,然后将第二个数组的其余部分复制到结果数组的末尾。因此,输出是一个排序的数组。

还创建了一个用于多线程排序的类;在其中创建了一个run方法,当将start()方法应用于Thread类型的对象时将执行该方法。在我们的情况下,executorService将对此负责。这是合并类的代码,将创建其对象以实现多线程排序:


public class Merger implements Runnable{
    private String[] unsorted, sorted;
    public Merger(String[] unsorted) {
        this.unsorted = unsorted;
    }

    public void run() {
        int middle;
        String[] left, right;
        // array is sorted
        if ( unsorted.length <= 1 ) {
            sorted = unsorted;
        } else {
            //
            middle = unsorted.length / 2;
            left = new String[middle];
            right = new String[unsorted.length - middle];
            //split array on two
            System.arraycopy(unsorted, 0, left, 0, middle);
            System.arraycopy(unsorted, middle, right, 0, unsorted.length - middle);
            SimpleMerger leftSort = new SimpleMerger(left);
            SimpleMerger rightSort = new SimpleMerger(right);
            leftSort.sort();
            rightSort.sort();
            //sort and merge
            sorted = SimpleMerger.merge(leftSort.getSorted(), rightSort.getSorted());
         }
        }
    public String[] getSorted() {
        return sorted;
    }
}

为了对数组的各个部分进行排序,使用了内置的Java排序。以下是使用线程池进行排序的代码。时间测量是针对多线程和常规版本进行的(扰流器:多线程仅对大量数据提供加速):


public static void main(String[] args) throws Exception {
        int arrSize = 1_000_000_0;
        String[] unsorted = new String[arrSize];
        Random randomizer = new Random();

        for ( int i = 0; i < arrSize; i++ ) {
            unsorted[i] = Integer.toString(randomizer.nextInt( 100_000_0 ));
        }

        List<Future> futures = new ArrayList<>();
        int processorCount = Runtime.getRuntime().availableProcessors();
        int batchSize = arrSize/processorCount;
        long startTime = System.currentTimeMillis();
        // create ExecutorService
        final ExecutorService executorService = Executors
                .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        ArrayList<Merger> mergers = new ArrayList<>();
        for (int i = 0; i < processorCount; i++) {
            String[] part = new String[batchSize];
            System.arraycopy( unsorted, i*batchSize, part, 0, batchSize );
            // create merger
            Merger merger = new Merger(part);

            futures.add(executorService.submit(merger));
            //add merger to list to get result in future
            mergers.add(merger);
        }
        for (Future<Double> future : futures) {
            future.get();
        }
        executorService.shutdown();
        int j = 0;
        // array to get result
        String[] mergered = new String[arrSize];
        // sequential merge of all part of array
        for (Merger merger:mergers){
            if (j == 0) {
                mergered = merger.getSorted();
                j+=1;
            }
        else{
                String[] part = merger.getSorted();
                mergered = SimpleMerger.merge( mergered, part);
            }
   }
        long timeSpent = System.currentTimeMillis() - startTime;
        System.out.println("Program execution time is " + timeSpent + " milliseconds");
        if (arrSize < 100) {System.out.print(Arrays.toString(mergered));}
        startTime = System.currentTimeMillis();
        Arrays.sort(unsorted);
        timeSpent = System.currentTimeMillis() - startTime;
        System.out.println("\n Program (non parallel )execution time is " + timeSpent + " milliseconds");
    }

在main函数的开始,数组被任意行填充,其中包含从0到10,000,000的数字,设备上的线程数被视为线程数。batchSize变量负责并行排序的数组的维数。然后,使用固定数量的线程创建executorService。

对于每个线程,将创建其自己的类合并对象,然后该对象将排序任务放入队列以供执行。在未来的帮助下,我们将等到计算完所有内容后,收集数组中所有排序的部分,然后将它们依次合并为结果数组。我们停止executorService,然后可以查看串行和并行实现的时间成本。

代码在这里

All Articles