Tri multithread à l'aide d'un pool de threads en Java

Ce billet expliquera comment implémenter le tri en Java à l'aide d'ExecutorService. L'essence générale du tri est la suivante:

  1. Le tableau est divisé en plusieurs parties
  2. Chaque partie du tableau est triée.
  3. Nous parcourons les tableaux ordonnés, les fusionnons en un

Ici, les idées de tri par fusion sont appliquées, mais le tableau n'est divisé qu'en deux parties (la récursivité n'est pas utilisée).

Vous pouvez utiliser la fonction suivante pour fusionner:

public static String[] merge( String[] leftPart, String[] rightPart ) {
        int cursorLeft = 0, cursorRight = 0, counter = 0;
        String[] merged = new String[leftPart.length + rightPart.length];
        while ( cursorLeft < leftPart.length && cursorRight < rightPart.length ) {
            if (leftPart[cursorLeft].compareTo(rightPart[cursorRight] ) < 0 ) {
                merged[counter] = leftPart[cursorLeft];
                cursorLeft+=1;
            } else {
                merged[counter] = rightPart[cursorRight];
                cursorRight+=1;
            }
            counter++;
        }
        if ( cursorLeft < leftPart.length ) {
            System.arraycopy( leftPart, cursorLeft, merged, counter, merged.length - counter );
        }
        if ( cursorRight < rightPart.length ) {
            System.arraycopy( rightPart, cursorRight, merged, counter, merged.length - counter );
        }
        return merged;
    }

Code de fonction de fusion extrait d'ici .

L'essence de la fusion est la suivante: au début, les pointeurs se trouvent sur le premier élément des deux tableaux. Ensuite, les valeurs des éléments correspondant aux positions du pointeur sont comparées et le pointeur pour le plus petit élément est déplacé vers l'élément suivant, l'élément lui-même est ajouté au tableau résultant. Le cycle continue jusqu'à ce que nous atteignions la fin de l'un des tableaux, puis le reste du deuxième tableau sera copié à la fin du tableau résultant. Ainsi, la sortie est un tableau trié.

Une classe a également été créée pour le tri multithread; une méthode run y a été créée, qui est exécutée lorsque la méthode start () est appliquée à un objet de type Thread. Dans notre cas, executorService en sera responsable. Voici le code de la classe de fusion, dont les objets seront créés pour implémenter le tri multithread:


public class Merger implements Runnable{
    private String[] unsorted, sorted;
    public Merger(String[] unsorted) {
        this.unsorted = unsorted;
    }

    public void run() {
        int middle;
        String[] left, right;
        // array is sorted
        if ( unsorted.length <= 1 ) {
            sorted = unsorted;
        } else {
            //
            middle = unsorted.length / 2;
            left = new String[middle];
            right = new String[unsorted.length - middle];
            //split array on two
            System.arraycopy(unsorted, 0, left, 0, middle);
            System.arraycopy(unsorted, middle, right, 0, unsorted.length - middle);
            SimpleMerger leftSort = new SimpleMerger(left);
            SimpleMerger rightSort = new SimpleMerger(right);
            leftSort.sort();
            rightSort.sort();
            //sort and merge
            sorted = SimpleMerger.merge(leftSort.getSorted(), rightSort.getSorted());
         }
        }
    public String[] getSorted() {
        return sorted;
    }
}

Pour trier les parties du tableau, le tri Java intégré a été utilisé. Voici le code de tri à l'aide d'un pool de threads. Les mesures de temps sont effectuées pour les versions multi-threads et conventionnelles (spoiler: multi-threaded donne l'accélération uniquement sur une grande quantité de données):


public static void main(String[] args) throws Exception {
        int arrSize = 1_000_000_0;
        String[] unsorted = new String[arrSize];
        Random randomizer = new Random();

        for ( int i = 0; i < arrSize; i++ ) {
            unsorted[i] = Integer.toString(randomizer.nextInt( 100_000_0 ));
        }

        List<Future> futures = new ArrayList<>();
        int processorCount = Runtime.getRuntime().availableProcessors();
        int batchSize = arrSize/processorCount;
        long startTime = System.currentTimeMillis();
        // create ExecutorService
        final ExecutorService executorService = Executors
                .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        ArrayList<Merger> mergers = new ArrayList<>();
        for (int i = 0; i < processorCount; i++) {
            String[] part = new String[batchSize];
            System.arraycopy( unsorted, i*batchSize, part, 0, batchSize );
            // create merger
            Merger merger = new Merger(part);

            futures.add(executorService.submit(merger));
            //add merger to list to get result in future
            mergers.add(merger);
        }
        for (Future<Double> future : futures) {
            future.get();
        }
        executorService.shutdown();
        int j = 0;
        // array to get result
        String[] mergered = new String[arrSize];
        // sequential merge of all part of array
        for (Merger merger:mergers){
            if (j == 0) {
                mergered = merger.getSorted();
                j+=1;
            }
        else{
                String[] part = merger.getSorted();
                mergered = SimpleMerger.merge( mergered, part);
            }
   }
        long timeSpent = System.currentTimeMillis() - startTime;
        System.out.println("Program execution time is " + timeSpent + " milliseconds");
        if (arrSize < 100) {System.out.print(Arrays.toString(mergered));}
        startTime = System.currentTimeMillis();
        Arrays.sort(unsorted);
        timeSpent = System.currentTimeMillis() - startTime;
        System.out.println("\n Program (non parallel )execution time is " + timeSpent + " milliseconds");
    }

Au début de la fonction principale, le tableau est rempli de lignes arbitraires qui contiennent des nombres de 0 à 10 000 000. Le nombre de threads sur le périphérique est considéré comme le nombre de threads. La variable batchSize est responsable de la dimension des tableaux pour le tri en parallèle. Un executorService est ensuite créé avec un nombre fixe de threads.

Pour chaque thread, son propre objet de fusion de classe est créé, puis celui-ci place la tâche de tri dans la file d'attente pour exécution. Avec l'aide du futur, nous attendons que tout soit calculé, collectons toutes les parties triées du tableau et les fusionnons à leur tour dans le tableau résultant. Nous arrêtons le executorService et pouvons regarder les coûts de temps d'une implémentation série et parallèle.

Le code est ici

All Articles