Penyortiran multithreaded menggunakan kumpulan thread di Jawa

Posting ini akan menjelaskan cara menerapkan penyortiran di Jawa menggunakan ExecutorService. Inti umum dari penyortiran adalah sebagai berikut:

  1. Array dipecah menjadi beberapa bagian
  2. Setiap bagian dari array diurutkan.
  3. Kami pergi melalui array yang dipesan, menggabungkan mereka menjadi satu

Di sini ide-ide penyortiran gabungan diterapkan, tetapi array dibagi hanya menjadi dua bagian (rekursi tidak digunakan).

Anda dapat menggunakan fungsi berikut untuk menggabungkan:

public static String[] merge( String[] leftPart, String[] rightPart ) {
        int cursorLeft = 0, cursorRight = 0, counter = 0;
        String[] merged = new String[leftPart.length + rightPart.length];
        while ( cursorLeft < leftPart.length && cursorRight < rightPart.length ) {
            if (leftPart[cursorLeft].compareTo(rightPart[cursorRight] ) < 0 ) {
                merged[counter] = leftPart[cursorLeft];
                cursorLeft+=1;
            } else {
                merged[counter] = rightPart[cursorRight];
                cursorRight+=1;
            }
            counter++;
        }
        if ( cursorLeft < leftPart.length ) {
            System.arraycopy( leftPart, cursorLeft, merged, counter, merged.length - counter );
        }
        if ( cursorRight < rightPart.length ) {
            System.arraycopy( rightPart, cursorRight, merged, counter, merged.length - counter );
        }
        return merged;
    }

Gabungkan kode fungsi yang diambil dari sini .

Inti dari penggabungan adalah ini: pada awalnya, pointer berada pada elemen pertama untuk kedua array. Selanjutnya, nilai-nilai elemen yang sesuai dengan posisi pointer dibandingkan dan pointer untuk elemen yang lebih kecil digeser ke elemen berikutnya, elemen itu sendiri ditambahkan ke array yang dihasilkan. Siklus berlanjut sampai kita mencapai akhir dari salah satu array, kemudian sisa array kedua akan disalin ke akhir array yang dihasilkan. Jadi, outputnya adalah array yang diurutkan.

Kelas juga dibuat untuk pengurutan multithreaded; metode run dibuat di dalamnya, yang dieksekusi ketika metode start () diterapkan pada objek tipe Thread. Dalam kasus kami, executorService akan bertanggung jawab untuk ini. Berikut adalah kode dari kelas gabungan, objek yang akan dibuat untuk menerapkan pengurutan multithreaded:


public class Merger implements Runnable{
    private String[] unsorted, sorted;
    public Merger(String[] unsorted) {
        this.unsorted = unsorted;
    }

    public void run() {
        int middle;
        String[] left, right;
        // array is sorted
        if ( unsorted.length <= 1 ) {
            sorted = unsorted;
        } else {
            //
            middle = unsorted.length / 2;
            left = new String[middle];
            right = new String[unsorted.length - middle];
            //split array on two
            System.arraycopy(unsorted, 0, left, 0, middle);
            System.arraycopy(unsorted, middle, right, 0, unsorted.length - middle);
            SimpleMerger leftSort = new SimpleMerger(left);
            SimpleMerger rightSort = new SimpleMerger(right);
            leftSort.sort();
            rightSort.sort();
            //sort and merge
            sorted = SimpleMerger.merge(leftSort.getSorted(), rightSort.getSorted());
         }
        }
    public String[] getSorted() {
        return sorted;
    }
}

Untuk mengurutkan bagian-bagian array, penyortiran java bawaan digunakan. Berikut ini adalah kode untuk menyortir menggunakan kumpulan utas. Pengukuran waktu dilakukan untuk versi multi-threaded dan konvensional (spoiler: multi-threaded hanya memberikan akselerasi pada sejumlah besar data):


public static void main(String[] args) throws Exception {
        int arrSize = 1_000_000_0;
        String[] unsorted = new String[arrSize];
        Random randomizer = new Random();

        for ( int i = 0; i < arrSize; i++ ) {
            unsorted[i] = Integer.toString(randomizer.nextInt( 100_000_0 ));
        }

        List<Future> futures = new ArrayList<>();
        int processorCount = Runtime.getRuntime().availableProcessors();
        int batchSize = arrSize/processorCount;
        long startTime = System.currentTimeMillis();
        // create ExecutorService
        final ExecutorService executorService = Executors
                .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        ArrayList<Merger> mergers = new ArrayList<>();
        for (int i = 0; i < processorCount; i++) {
            String[] part = new String[batchSize];
            System.arraycopy( unsorted, i*batchSize, part, 0, batchSize );
            // create merger
            Merger merger = new Merger(part);

            futures.add(executorService.submit(merger));
            //add merger to list to get result in future
            mergers.add(merger);
        }
        for (Future<Double> future : futures) {
            future.get();
        }
        executorService.shutdown();
        int j = 0;
        // array to get result
        String[] mergered = new String[arrSize];
        // sequential merge of all part of array
        for (Merger merger:mergers){
            if (j == 0) {
                mergered = merger.getSorted();
                j+=1;
            }
        else{
                String[] part = merger.getSorted();
                mergered = SimpleMerger.merge( mergered, part);
            }
   }
        long timeSpent = System.currentTimeMillis() - startTime;
        System.out.println("Program execution time is " + timeSpent + " milliseconds");
        if (arrSize < 100) {System.out.print(Arrays.toString(mergered));}
        startTime = System.currentTimeMillis();
        Arrays.sort(unsorted);
        timeSpent = System.currentTimeMillis() - startTime;
        System.out.println("\n Program (non parallel )execution time is " + timeSpent + " milliseconds");
    }

Di awal fungsi utama, array diisi dengan garis arbitrer yang berisi angka dari 0 hingga 10.000.000. Jumlah utas pada perangkat diambil sebagai jumlah utas. Variabel batchSize bertanggung jawab atas dimensi array untuk menyortir secara paralel. Kemudian, executorService dibuat dengan jumlah utas yang tetap.

Untuk setiap utas, objek penggabungan kelasnya dibuat, lalu yang ini menempatkan tugas penyortiran dalam antrian untuk dieksekusi. Dengan bantuan masa depan, kami menunggu sampai semuanya dihitung, mengumpulkan semua bagian array yang diurutkan dan menggabungkannya menjadi array yang dihasilkan. Kami menghentikan executorService dan dapat melihat biaya waktu dari implementasi serial dan paralel.

Kodenya ada di sini

All Articles