Clasificaci贸n multiproceso utilizando un grupo de subprocesos en Java

Esta publicaci贸n explicar谩 c贸mo implementar la ordenaci贸n en Java usando ExecutorService. La esencia general de la clasificaci贸n es la siguiente:

  1. La matriz se divide en partes.
  2. Cada parte de la matriz est谩 ordenada.
  3. Pasamos por las matrices ordenadas, las fusionamos en una

Aqu铆 se aplican las ideas de clasificaci贸n de fusi贸n, pero la matriz se divide solo en dos partes (no se utiliza la recursi贸n).

Puede usar la siguiente funci贸n para fusionar:

public static String[] merge( String[] leftPart, String[] rightPart ) {
        int cursorLeft = 0, cursorRight = 0, counter = 0;
        String[] merged = new String[leftPart.length + rightPart.length];
        while ( cursorLeft < leftPart.length && cursorRight < rightPart.length ) {
            if (leftPart[cursorLeft].compareTo(rightPart[cursorRight] ) < 0 ) {
                merged[counter] = leftPart[cursorLeft];
                cursorLeft+=1;
            } else {
                merged[counter] = rightPart[cursorRight];
                cursorRight+=1;
            }
            counter++;
        }
        if ( cursorLeft < leftPart.length ) {
            System.arraycopy( leftPart, cursorLeft, merged, counter, merged.length - counter );
        }
        if ( cursorRight < rightPart.length ) {
            System.arraycopy( rightPart, cursorRight, merged, counter, merged.length - counter );
        }
        return merged;
    }

Fusionar c贸digo de funci贸n tomado de aqu铆 .

La esencia de la fusi贸n es esta: al principio, los punteros est谩n en el primer elemento para ambas matrices. A continuaci贸n, se comparan los valores de los elementos correspondientes a las posiciones del puntero y el puntero para el elemento m谩s peque帽o se desplaza al siguiente elemento, el elemento mismo se agrega a la matriz resultante. El ciclo contin煤a hasta llegar al final de una de las matrices, luego el resto de la segunda matriz se copiar谩 al final de la matriz resultante. Por lo tanto, la salida es una matriz ordenada.

Tambi茅n se cre贸 una clase para la ordenaci贸n multiproceso; se cre贸 un m茅todo de ejecuci贸n, que se ejecuta cuando el m茅todo start () se aplica a un objeto de tipo Thread. En nuestro caso, ejecutorService ser谩 responsable de esto. Aqu铆 est谩 el c贸digo de la clase de fusi贸n, cuyos objetos se crear谩n para implementar la ordenaci贸n multiproceso:


public class Merger implements Runnable{
    private String[] unsorted, sorted;
    public Merger(String[] unsorted) {
        this.unsorted = unsorted;
    }

    public void run() {
        int middle;
        String[] left, right;
        // array is sorted
        if ( unsorted.length <= 1 ) {
            sorted = unsorted;
        } else {
            //
            middle = unsorted.length / 2;
            left = new String[middle];
            right = new String[unsorted.length - middle];
            //split array on two
            System.arraycopy(unsorted, 0, left, 0, middle);
            System.arraycopy(unsorted, middle, right, 0, unsorted.length - middle);
            SimpleMerger leftSort = new SimpleMerger(left);
            SimpleMerger rightSort = new SimpleMerger(right);
            leftSort.sort();
            rightSort.sort();
            //sort and merge
            sorted = SimpleMerger.merge(leftSort.getSorted(), rightSort.getSorted());
         }
        }
    public String[] getSorted() {
        return sorted;
    }
}

Para ordenar las partes de la matriz, se utiliz贸 la ordenaci贸n Java incorporada. El siguiente es el c贸digo para ordenar usando un grupo de subprocesos. Las mediciones de tiempo se llevan a cabo para versiones multihilo y convencionales (spoiler: multihilo da aceleraci贸n solo en una gran cantidad de datos):


public static void main(String[] args) throws Exception {
        int arrSize = 1_000_000_0;
        String[] unsorted = new String[arrSize];
        Random randomizer = new Random();

        for ( int i = 0; i < arrSize; i++ ) {
            unsorted[i] = Integer.toString(randomizer.nextInt( 100_000_0 ));
        }

        List<Future> futures = new ArrayList<>();
        int processorCount = Runtime.getRuntime().availableProcessors();
        int batchSize = arrSize/processorCount;
        long startTime = System.currentTimeMillis();
        // create ExecutorService
        final ExecutorService executorService = Executors
                .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        ArrayList<Merger> mergers = new ArrayList<>();
        for (int i = 0; i < processorCount; i++) {
            String[] part = new String[batchSize];
            System.arraycopy( unsorted, i*batchSize, part, 0, batchSize );
            // create merger
            Merger merger = new Merger(part);

            futures.add(executorService.submit(merger));
            //add merger to list to get result in future
            mergers.add(merger);
        }
        for (Future<Double> future : futures) {
            future.get();
        }
        executorService.shutdown();
        int j = 0;
        // array to get result
        String[] mergered = new String[arrSize];
        // sequential merge of all part of array
        for (Merger merger:mergers){
            if (j == 0) {
                mergered = merger.getSorted();
                j+=1;
            }
        else{
                String[] part = merger.getSorted();
                mergered = SimpleMerger.merge( mergered, part);
            }
   }
        long timeSpent = System.currentTimeMillis() - startTime;
        System.out.println("Program execution time is " + timeSpent + " milliseconds");
        if (arrSize < 100) {System.out.print(Arrays.toString(mergered));}
        startTime = System.currentTimeMillis();
        Arrays.sort(unsorted);
        timeSpent = System.currentTimeMillis() - startTime;
        System.out.println("\n Program (non parallel )execution time is " + timeSpent + " milliseconds");
    }

Al comienzo de la funci贸n principal, la matriz se llena con l铆neas arbitrarias que contienen n煤meros del 0 al 10,000,000. El n煤mero de hilos en el dispositivo se toma como el n煤mero de hilos. La variable batchSize es responsable de la dimensi贸n de las matrices para ordenar en paralelo. Luego se crea un ejecutorService con un n煤mero fijo de subprocesos.

Para cada subproceso, se crea su propio objeto de fusi贸n de clase, luego este coloca la tarea de clasificaci贸n en la cola para su ejecuci贸n. Con la ayuda del futuro, esperamos hasta que todo se calcule, recopilamos todas las partes ordenadas de la matriz y las fusionamos en la matriz resultante. Paramos el servicio ejecutor y podemos ver los costos de tiempo de una implementaci贸n en serie y paralela.

El codigo esta aqui

All Articles