Classificação multithread usando um pool de encadeamentos em Java

Esta postagem explicará como implementar a classificação em Java usando o ExecutorService. A essência geral da classificação é a seguinte:

  1. A matriz é dividida em partes
  2. Cada parte da matriz é classificada.
  3. Analisamos as matrizes ordenadas e as fundimos em uma

Aqui, as idéias de classificação de mesclagem são aplicadas, mas a matriz é dividida apenas em duas partes (a recursão não é usada).

Você pode usar a seguinte função para mesclar:

public static String[] merge( String[] leftPart, String[] rightPart ) {
        int cursorLeft = 0, cursorRight = 0, counter = 0;
        String[] merged = new String[leftPart.length + rightPart.length];
        while ( cursorLeft < leftPart.length && cursorRight < rightPart.length ) {
            if (leftPart[cursorLeft].compareTo(rightPart[cursorRight] ) < 0 ) {
                merged[counter] = leftPart[cursorLeft];
                cursorLeft+=1;
            } else {
                merged[counter] = rightPart[cursorRight];
                cursorRight+=1;
            }
            counter++;
        }
        if ( cursorLeft < leftPart.length ) {
            System.arraycopy( leftPart, cursorLeft, merged, counter, merged.length - counter );
        }
        if ( cursorRight < rightPart.length ) {
            System.arraycopy( rightPart, cursorRight, merged, counter, merged.length - counter );
        }
        return merged;
    }

Mesclar o código de função retirado daqui .

A essência da mesclagem é a seguinte: no início, os ponteiros estão no primeiro elemento para ambas as matrizes. Em seguida, os valores dos elementos correspondentes às posições do ponteiro são comparados e o ponteiro do elemento menor é deslocado para o próximo elemento, o próprio elemento é adicionado à matriz resultante. O ciclo continua até chegarmos ao final de uma das matrizes; o restante da segunda matriz será copiado para o final da matriz resultante. Assim, a saída é uma matriz classificada.

Também foi criada uma classe para classificação multithread; um método run foi criado nela, que é executado quando o método start () é aplicado a um objeto do tipo Thread. No nosso caso, executorService será responsável por isso. Aqui está o código da classe de mesclagem, cujos objetos serão criados para implementar a classificação multithread:


public class Merger implements Runnable{
    private String[] unsorted, sorted;
    public Merger(String[] unsorted) {
        this.unsorted = unsorted;
    }

    public void run() {
        int middle;
        String[] left, right;
        // array is sorted
        if ( unsorted.length <= 1 ) {
            sorted = unsorted;
        } else {
            //
            middle = unsorted.length / 2;
            left = new String[middle];
            right = new String[unsorted.length - middle];
            //split array on two
            System.arraycopy(unsorted, 0, left, 0, middle);
            System.arraycopy(unsorted, middle, right, 0, unsorted.length - middle);
            SimpleMerger leftSort = new SimpleMerger(left);
            SimpleMerger rightSort = new SimpleMerger(right);
            leftSort.sort();
            rightSort.sort();
            //sort and merge
            sorted = SimpleMerger.merge(leftSort.getSorted(), rightSort.getSorted());
         }
        }
    public String[] getSorted() {
        return sorted;
    }
}

Para classificar as partes da matriz, foi utilizada a classificação interna do java. A seguir está o código para classificação usando um pool de threads. As medições de tempo são realizadas nas versões multithread e convencional (spoiler: multithread dá aceleração apenas em uma grande quantidade de dados):


public static void main(String[] args) throws Exception {
        int arrSize = 1_000_000_0;
        String[] unsorted = new String[arrSize];
        Random randomizer = new Random();

        for ( int i = 0; i < arrSize; i++ ) {
            unsorted[i] = Integer.toString(randomizer.nextInt( 100_000_0 ));
        }

        List<Future> futures = new ArrayList<>();
        int processorCount = Runtime.getRuntime().availableProcessors();
        int batchSize = arrSize/processorCount;
        long startTime = System.currentTimeMillis();
        // create ExecutorService
        final ExecutorService executorService = Executors
                .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        ArrayList<Merger> mergers = new ArrayList<>();
        for (int i = 0; i < processorCount; i++) {
            String[] part = new String[batchSize];
            System.arraycopy( unsorted, i*batchSize, part, 0, batchSize );
            // create merger
            Merger merger = new Merger(part);

            futures.add(executorService.submit(merger));
            //add merger to list to get result in future
            mergers.add(merger);
        }
        for (Future<Double> future : futures) {
            future.get();
        }
        executorService.shutdown();
        int j = 0;
        // array to get result
        String[] mergered = new String[arrSize];
        // sequential merge of all part of array
        for (Merger merger:mergers){
            if (j == 0) {
                mergered = merger.getSorted();
                j+=1;
            }
        else{
                String[] part = merger.getSorted();
                mergered = SimpleMerger.merge( mergered, part);
            }
   }
        long timeSpent = System.currentTimeMillis() - startTime;
        System.out.println("Program execution time is " + timeSpent + " milliseconds");
        if (arrSize < 100) {System.out.print(Arrays.toString(mergered));}
        startTime = System.currentTimeMillis();
        Arrays.sort(unsorted);
        timeSpent = System.currentTimeMillis() - startTime;
        System.out.println("\n Program (non parallel )execution time is " + timeSpent + " milliseconds");
    }

No início da função principal, a matriz é preenchida com linhas arbitrárias que contêm números de 0 a 10.000.000.O número de threads no dispositivo é considerado como o número de threads. A variável batchSize é responsável pela dimensão das matrizes para classificação em paralelo. Em seguida, um executorService é criado com um número fixo de threads.

Para cada encadeamento, seu próprio objeto de mesclagem de classe é criado, e este coloca a tarefa de classificação na fila para execução. Com a ajuda do futuro, esperamos até que tudo seja calculado, coletamos todas as partes classificadas da matriz e as fundimos por sua vez na matriz resultante. Paramos o executorService e podemos analisar os custos de tempo de uma implementação serial e paralela.

O código está aqui

All Articles