Multithread-Sortierung mithilfe eines Thread-Pools in Java

In diesem Beitrag wird erläutert, wie Sie die Sortierung in Java mithilfe von ExecutorService implementieren. Das allgemeine Wesen der Sortierung ist wie folgt:

  1. Das Array ist in Teile zerlegt
  2. Jeder Teil des Arrays ist sortiert.
  3. Wir gehen die geordneten Arrays durch und führen sie zu einem zusammen

Hier werden die Ideen der Zusammenführungssortierung angewendet, aber das Array ist nur in zwei Teile unterteilt (Rekursion wird nicht verwendet).

Sie können die folgende Funktion zum Zusammenführen verwenden:

public static String[] merge( String[] leftPart, String[] rightPart ) {
        int cursorLeft = 0, cursorRight = 0, counter = 0;
        String[] merged = new String[leftPart.length + rightPart.length];
        while ( cursorLeft < leftPart.length && cursorRight < rightPart.length ) {
            if (leftPart[cursorLeft].compareTo(rightPart[cursorRight] ) < 0 ) {
                merged[counter] = leftPart[cursorLeft];
                cursorLeft+=1;
            } else {
                merged[counter] = rightPart[cursorRight];
                cursorRight+=1;
            }
            counter++;
        }
        if ( cursorLeft < leftPart.length ) {
            System.arraycopy( leftPart, cursorLeft, merged, counter, merged.length - counter );
        }
        if ( cursorRight < rightPart.length ) {
            System.arraycopy( rightPart, cursorRight, merged, counter, merged.length - counter );
        }
        return merged;
    }

Funktionscode von hier zusammenführen .

Die Essenz der Zusammenführung ist folgende: Zu Beginn befinden sich die Zeiger auf dem ersten Element für beide Arrays. Als nächstes werden die Werte der Elemente, die den Positionen des Zeigers entsprechen, verglichen und der Zeiger für das kleinere Element wird zum nächsten Element verschoben, wobei das Element selbst zu dem resultierenden Array hinzugefügt wird. Der Zyklus wird fortgesetzt, bis das Ende eines der Arrays erreicht ist. Anschließend wird der Rest des zweiten Arrays an das Ende des resultierenden Arrays kopiert. Somit ist die Ausgabe ein sortiertes Array.

Es wurde auch eine Klasse für die Multithread-Sortierung erstellt, in der eine Ausführungsmethode erstellt wurde, die ausgeführt wird, wenn die start () -Methode auf ein Objekt vom Typ Thread angewendet wird. In unserem Fall ist executorService dafür verantwortlich. Hier ist der Code der Zusammenführungsklasse, deren Objekte erstellt werden, um die Multithread-Sortierung zu implementieren:


public class Merger implements Runnable{
    private String[] unsorted, sorted;
    public Merger(String[] unsorted) {
        this.unsorted = unsorted;
    }

    public void run() {
        int middle;
        String[] left, right;
        // array is sorted
        if ( unsorted.length <= 1 ) {
            sorted = unsorted;
        } else {
            //
            middle = unsorted.length / 2;
            left = new String[middle];
            right = new String[unsorted.length - middle];
            //split array on two
            System.arraycopy(unsorted, 0, left, 0, middle);
            System.arraycopy(unsorted, middle, right, 0, unsorted.length - middle);
            SimpleMerger leftSort = new SimpleMerger(left);
            SimpleMerger rightSort = new SimpleMerger(right);
            leftSort.sort();
            rightSort.sort();
            //sort and merge
            sorted = SimpleMerger.merge(leftSort.getSorted(), rightSort.getSorted());
         }
        }
    public String[] getSorted() {
        return sorted;
    }
}

Um die Teile des Arrays zu sortieren, wurde die integrierte Java-Sortierung verwendet. Das Folgende ist der Code zum Sortieren unter Verwendung eines Thread-Pools. Zeitmessungen werden für Multithread- und konventionelle Versionen durchgeführt (Spoiler: Multithreading beschleunigt nur bei einer großen Datenmenge):


public static void main(String[] args) throws Exception {
        int arrSize = 1_000_000_0;
        String[] unsorted = new String[arrSize];
        Random randomizer = new Random();

        for ( int i = 0; i < arrSize; i++ ) {
            unsorted[i] = Integer.toString(randomizer.nextInt( 100_000_0 ));
        }

        List<Future> futures = new ArrayList<>();
        int processorCount = Runtime.getRuntime().availableProcessors();
        int batchSize = arrSize/processorCount;
        long startTime = System.currentTimeMillis();
        // create ExecutorService
        final ExecutorService executorService = Executors
                .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        ArrayList<Merger> mergers = new ArrayList<>();
        for (int i = 0; i < processorCount; i++) {
            String[] part = new String[batchSize];
            System.arraycopy( unsorted, i*batchSize, part, 0, batchSize );
            // create merger
            Merger merger = new Merger(part);

            futures.add(executorService.submit(merger));
            //add merger to list to get result in future
            mergers.add(merger);
        }
        for (Future<Double> future : futures) {
            future.get();
        }
        executorService.shutdown();
        int j = 0;
        // array to get result
        String[] mergered = new String[arrSize];
        // sequential merge of all part of array
        for (Merger merger:mergers){
            if (j == 0) {
                mergered = merger.getSorted();
                j+=1;
            }
        else{
                String[] part = merger.getSorted();
                mergered = SimpleMerger.merge( mergered, part);
            }
   }
        long timeSpent = System.currentTimeMillis() - startTime;
        System.out.println("Program execution time is " + timeSpent + " milliseconds");
        if (arrSize < 100) {System.out.print(Arrays.toString(mergered));}
        startTime = System.currentTimeMillis();
        Arrays.sort(unsorted);
        timeSpent = System.currentTimeMillis() - startTime;
        System.out.println("\n Program (non parallel )execution time is " + timeSpent + " milliseconds");
    }

Zu Beginn der Hauptfunktion wird das Array mit beliebigen Zeilen gefüllt, die Zahlen von 0 bis 10.000.000 enthalten. Die Anzahl der Threads auf dem Gerät wird als Anzahl der Threads verwendet. Die Variable batchSize ist für die Dimension der Arrays zum parallelen Sortieren verantwortlich. Anschließend wird ein executorService mit einer festen Anzahl von Threads erstellt.

Für jeden Thread wird ein eigenes Objekt der Klassenzusammenführung erstellt, das die Sortieraufgabe zur Ausführung in die Warteschlange stellt. Mit Hilfe der Zukunft warten wir, bis alles berechnet ist, sammeln alle sortierten Teile des Arrays und führen sie nacheinander zum resultierenden Array zusammen. Wir stoppen den executorService und können die Zeitkosten einer seriellen und parallelen Implementierung untersuchen.

Der Code ist hier

All Articles