Multithreaded sorting using a thread pool in Java

This post explains how to implement a multithreaded sort in Java using ExecutorService. The general idea is as follows:

  1. The array is split into parts.
  2. Each part is sorted.
  3. The sorted parts are merged into a single array.

The ideas of merge sort are applied here, but the array is divided into only two parts (recursion is not used).

You can use the following function to merge:

public static String[] merge(String[] leftPart, String[] rightPart) {
    int cursorLeft = 0, cursorRight = 0, counter = 0;
    String[] merged = new String[leftPart.length + rightPart.length];
    // while both arrays have elements left, take the smaller head element
    while (cursorLeft < leftPart.length && cursorRight < rightPart.length) {
        if (leftPart[cursorLeft].compareTo(rightPart[cursorRight]) < 0) {
            merged[counter] = leftPart[cursorLeft];
            cursorLeft++;
        } else {
            merged[counter] = rightPart[cursorRight];
            cursorRight++;
        }
        counter++;
    }
    // at most one of the arrays still has elements; copy its tail into the result
    if (cursorLeft < leftPart.length) {
        System.arraycopy(leftPart, cursorLeft, merged, counter, merged.length - counter);
    }
    if (cursorRight < rightPart.length) {
        System.arraycopy(rightPart, cursorRight, merged, counter, merged.length - counter);
    }
    return merged;
}

The merge function code was taken from here.

The merge works as follows. At the start, each pointer is on the first element of its array. The elements under the two pointers are compared; the smaller one is appended to the resulting array and its pointer moves to the next element. The loop continues until the end of one of the arrays is reached, after which the rest of the other array is copied to the end of the result. The output is therefore a sorted array.
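To make the pointer movement concrete, here is a small self-contained sketch of the same two-pointer idea run on two short sorted arrays. The class name MergeDemo and the sample data are made up for illustration. Unlike the function above, this version uses `<= 0` in the comparison so that equal elements are taken from the left array first; that is a choice of this sketch, not something the original code does.

```java
import java.util.Arrays;

class MergeDemo {
    // Two-pointer merge of two already sorted arrays.
    static String[] merge(String[] left, String[] right) {
        String[] merged = new String[left.length + right.length];
        int l = 0, r = 0, out = 0;
        while (l < left.length && r < right.length) {
            // take the smaller head element and advance that array's pointer
            if (left[l].compareTo(right[r]) <= 0) {
                merged[out++] = left[l++];
            } else {
                merged[out++] = right[r++];
            }
        }
        // one array is exhausted; copy whatever remains of the other
        while (l < left.length) merged[out++] = left[l++];
        while (r < right.length) merged[out++] = right[r++];
        return merged;
    }

    public static void main(String[] args) {
        String[] left = {"apple", "kiwi", "pear"};
        String[] right = {"banana", "mango"};
        // prints [apple, banana, kiwi, mango, pear]
        System.out.println(Arrays.toString(merge(left, right)));
    }
}
```

Note that both inputs must already be sorted; the merge itself never reorders elements within one array.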

A class was also created for multithreaded sorting. It implements Runnable, so it has a run method, the same method that is executed when start() is called on a Thread object. In our case, the ExecutorService is responsible for calling it. Here is the code of the Merger class, whose objects are created to carry out the multithreaded sort:


public class Merger implements Runnable {
    private final String[] unsorted;
    private String[] sorted;

    public Merger(String[] unsorted) {
        this.unsorted = unsorted;
    }

    // Called by the thread that executes this task.
    // SimpleMerger is not shown in this post; it supplies sort(), getSorted() and the static merge().
    public void run() {
        if (unsorted.length <= 1) {
            // an array of zero or one element is already sorted
            sorted = unsorted;
        } else {
            int middle = unsorted.length / 2;
            String[] left = new String[middle];
            String[] right = new String[unsorted.length - middle];
            // split the array in two
            System.arraycopy(unsorted, 0, left, 0, middle);
            System.arraycopy(unsorted, middle, right, 0, unsorted.length - middle);
            // sort each half, then merge the sorted halves
            SimpleMerger leftSort = new SimpleMerger(left);
            SimpleMerger rightSort = new SimpleMerger(right);
            leftSort.sort();
            rightSort.sort();
            sorted = SimpleMerger.merge(leftSort.getSorted(), rightSort.getSorted());
        }
    }

    public String[] getSorted() {
        return sorted;
    }
}
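Because run() returns nothing, a Runnable like Merger has to keep its result in a field, and the caller reads that field only after the task has finished. The following is a minimal sketch of that pattern, not the author's code: the names RunnableResultDemo, SortTask and sortInPool are hypothetical, and the task simply calls Arrays.sort on a copy of its input.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class RunnableResultDemo {
    // Hypothetical task: like Merger, it stores its result in a field.
    static class SortTask implements Runnable {
        private final String[] data;
        private String[] sorted;

        SortTask(String[] data) {
            this.data = data;
        }

        @Override
        public void run() {
            String[] copy = data.clone();
            Arrays.sort(copy);
            sorted = copy;
        }

        String[] getSorted() {
            return sorted;
        }
    }

    static String[] sortInPool(String[] data) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            SortTask task = new SortTask(data);
            // the pool calls run() on one of its own threads
            Future<?> done = pool.submit(task);
            // blocks until run() has finished; for a Runnable the returned value is null
            done.get();
            return task.getSorted();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // prints [a, b, c]
        System.out.println(Arrays.toString(sortInPool(new String[]{"b", "c", "a"})));
    }
}
```

Reading the field after Future.get() returns is safe, since the actions of the task happen-before the return from get().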

To sort the parts of the array, Java's built-in sort was used. The following is the code for sorting with a thread pool. The running time is measured for both the multithreaded and the ordinary version (spoiler: the multithreaded version is faster only on large amounts of data):


// Needs: import java.util.*; import java.util.concurrent.*;
public static void main(String[] args) throws Exception {
    int arrSize = 10_000_000;
    String[] unsorted = new String[arrSize];
    Random randomizer = new Random();

    // fill the array with random numbers in [0, 1_000_000) rendered as strings
    for (int i = 0; i < arrSize; i++) {
        unsorted[i] = Integer.toString(randomizer.nextInt(1_000_000));
    }

    List<Future<?>> futures = new ArrayList<>();
    int processorCount = Runtime.getRuntime().availableProcessors();
    int batchSize = arrSize / processorCount;
    long startTime = System.currentTimeMillis();
    // create the ExecutorService with one thread per available processor
    final ExecutorService executorService = Executors.newFixedThreadPool(processorCount);
    List<Merger> mergers = new ArrayList<>();
    for (int i = 0; i < processorCount; i++) {
        int from = i * batchSize;
        // the last batch also takes the remainder when arrSize is not divisible by processorCount
        int to = (i == processorCount - 1) ? arrSize : from + batchSize;
        String[] part = Arrays.copyOfRange(unsorted, from, to);
        // create a merger for this part and queue it for execution
        Merger merger = new Merger(part);
        futures.add(executorService.submit(merger));
        // keep the merger so its result can be read later
        mergers.add(merger);
    }
    // wait until every part has been sorted
    for (Future<?> future : futures) {
        future.get();
    }
    executorService.shutdown();

    // sequential merge of all sorted parts into the resulting array
    String[] mergered = mergers.get(0).getSorted();
    for (int i = 1; i < mergers.size(); i++) {
        mergered = SimpleMerger.merge(mergered, mergers.get(i).getSorted());
    }
    long timeSpent = System.currentTimeMillis() - startTime;
    System.out.println("Program execution time is " + timeSpent + " milliseconds");
    if (arrSize < 100) {
        System.out.println(Arrays.toString(mergered));
    }

    // the same measurement for the ordinary single-threaded sort
    startTime = System.currentTimeMillis();
    Arrays.sort(unsorted);
    timeSpent = System.currentTimeMillis() - startTime;
    System.out.println("Program (non-parallel) execution time is " + timeSpent + " milliseconds");
}

At the beginning of the main function, the array of 10,000,000 elements is filled with strings that contain random numbers from 0 to 999,999. The number of threads is set to the number of processors available on the machine. The batchSize variable determines the size of the arrays that are sorted in parallel. Then an ExecutorService with a fixed number of threads is created.
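Integer division arrSize / processorCount rounds down, so when the array size is not a multiple of the thread count, some batch has to absorb the leftover elements or they are never sorted. A minimal sketch of one way to compute the batch boundaries, letting the last batch take the remainder; the class BatchBounds and the helper bounds are hypothetical names used only for this illustration.

```java
class BatchBounds {
    // Returns {from, to} for batch i out of `batches` batches over `total` elements.
    // `from` is inclusive, `to` is exclusive.
    static int[] bounds(int total, int batches, int i) {
        int batchSize = total / batches;
        int from = i * batchSize;
        // the last batch extends to the end of the array
        int to = (i == batches - 1) ? total : from + batchSize;
        return new int[]{from, to};
    }

    public static void main(String[] args) {
        // 10 elements over 4 batches: [0, 2) [2, 4) [4, 6) [6, 10)
        for (int i = 0; i < 4; i++) {
            int[] b = bounds(10, 4, i);
            System.out.println("batch " + i + ": [" + b[0] + ", " + b[1] + ")");
        }
    }
}
```

This keeps every batch but the last at exactly batchSize elements; spreading the remainder evenly across batches would balance the load slightly better, at the cost of a little more arithmetic.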

For each thread, its own Merger object is created and submitted to the pool, which puts the sorting task in the queue for execution. Using the returned Future objects, we wait until every part has been sorted, then collect the sorted parts and merge them one by one into the resulting array. Finally, we shut down the ExecutorService and compare the running times of the sequential and parallel implementations.
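The same flow can also be written with Callable instead of Runnable, so that each Future carries its sorted part directly and no separate list of task objects is needed. This is an alternative sketch under stated assumptions, not the code from this post: PoolSortSketch and parallelSort are hypothetical names, each part is sorted with Arrays.sort, and the merge is a plain two-pointer merge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PoolSortSketch {
    // Two-pointer merge of two sorted arrays.
    static String[] merge(String[] a, String[] b) {
        String[] out = new String[a.length + b.length];
        int i = 0, j = 0, k = 0;
        while (i < a.length && j < b.length) {
            out[k++] = (a[i].compareTo(b[j]) <= 0) ? a[i++] : b[j++];
        }
        while (i < a.length) out[k++] = a[i++];
        while (j < b.length) out[k++] = b[j++];
        return out;
    }

    static String[] parallelSort(String[] input, int threads) throws Exception {
        if (input.length == 0) {
            return new String[0];
        }
        // never use more threads than there are elements
        threads = Math.max(1, Math.min(threads, input.length));
        int batchSize = input.length / threads;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<String[]>> tasks = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                int from = i * batchSize;
                // the last batch takes the remainder
                int to = (i == threads - 1) ? input.length : from + batchSize;
                String[] part = Arrays.copyOfRange(input, from, to);
                // each task sorts its own copy and returns it
                tasks.add(() -> {
                    Arrays.sort(part);
                    return part;
                });
            }
            // invokeAll blocks until all tasks are done
            String[] result = new String[0];
            for (Future<String[]> future : pool.invokeAll(tasks)) {
                // merge the sorted parts one by one, as in the text above
                result = merge(result, future.get());
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] data = {"5", "3", "9", "1", "7", "2", "8"};
        // prints [1, 2, 3, 5, 7, 8, 9]
        System.out.println(Arrays.toString(parallelSort(data, 3)));
    }
}
```

The final merge is still sequential, so it remains a serial bottleneck; only the sorting of the parts runs in parallel.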

The code is here
