Menggunakan paralelisasi saat memproses data dalam C #



Hari baik untuk semua! Saya seorang spesialis teknis yang bekerja dalam sistem audit internal, tanggung jawab saya termasuk membuat alat ETL dalam bahasa pemrograman C #.

Secara berkala, sumber data adalah file yang terstruktur dengan ketat dalam format xml, csv, json, atau lainnya. Terkadang jumlah mereka menjadi cukup besar dan terus meningkat. Misalnya, dalam salah satu tugas saya, jumlah file meningkat dengan rata-rata refresh rate sekitar 150.000 file per hari. Jika pada saat yang sama memproses satu file (membaca array byte dari hard disk menjadi memori, mengubah data yang diunduh dan menulisnya ke database) membutuhkan waktu satu detik, maka menjadi jelas bahwa pemrosesan semua file akan memakan waktu lebih dari 40 jam. Dalam hal ini, kami tidak akan dapat memproses file-file ini sampai akhir, karena kecepatan meningkatkan jumlah file akan jelas lebih tinggi daripada kecepatan pemrosesan mereka.

Salah satu solusi untuk masalah ini adalah mengembangkan aplikasi untuk membuat kumpulan benang yang terpisah satu sama lain. Utas akan memproses file yang dipilih dari antrian umum. Namun, dalam kasus ini, kesulitan muncul dengan sinkronisasi arus kerja dan pembagian sumber daya, karena saling mengunci sangat mungkin terjadi.

Untuk menghindari kesulitan ini, Microsoft menambahkan perpustakaan TPL ke framework .Net (dimulai dengan versi 4.0). Saya akan memberi tahu Anda cara menggunakan perpustakaan ini untuk menyelesaikan masalah ini.

Jadi, awalnya algoritma operasi terlihat sebagai berikut:

Direktori penyimpanan file dipindai dan daftar (misalnya, Daftar) yang berisi data tentang semua file dikembalikan;
Siklus dimulai (untuk atau foreach) di mana data dari file berikutnya dibaca ke dalam memori, jika perlu, diubah dan ditulis ke database.

Jelas, operasi yang paling memakan waktu adalah membaca data dari hard disk ke dalam memori dan menulis data dari memori ke database.

Mari kita coba optimalkan algoritma kami menggunakan pustaka TPL:

Langkah 1.

Ubah daftar yang dikembalikan dengan memindai direktori penyimpanan file dari Daftar ke ConcurrentQueue.
Kenapa kita melakukan ini? Faktanya adalah bahwa kelas ConcurrentQueue adalah thread aman, yaitu, jika pada saat yang sama dua utas mencoba untuk mengekstrak data dari daftar ini atau menulis data ke dalamnya, maka kami tidak akan membuang pengecualian (Pengecualian).
Poin 1 dari algoritma kami akan terlihat seperti ini: direktori penyimpanan file dipindai dan daftar ConcurrentQueue dikembalikan berisi data tentang semua file.

Butir 2:
Mari kita ubah desain membentuk siklus pemrosesan data dari file. Ganti dengan Parallel.For atau Parallel.ForEach.

Apa perbedaan antara konstruksi baru dan untuk? Semuanya sederhana dan pada dasarnya jelas dari nama konstruk bahasa. Semua iterasi loop dilakukan dalam utas paralel. Sebagai contoh, saya akan menunjukkan organisasi loop dengan konstruk Parallel.ForEach:

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	var dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		WriteToDB(dataFile);
               });

di mana:

listFiles adalah kumpulan tipe ConcurrentQueue yang berisi daftar file dalam direktori;
currentFile - elemen koleksi listFiles, yang dikembalikan oleh konstruk ForEach;
dataFile - suatu kondisional beberapa struktur data dalam memori, diperoleh dengan membaca isi file ke dalam memori;
getDataFile - fungsi bersyarat yang mengembalikan konten file dalam bentuk beberapa struktur data;
TransformData - prosedur bersyarat untuk mengubah data yang diterima;
WriteToDB adalah prosedur bersyarat untuk menulis data ke database.

Dalam contoh ini, menggunakan konstruk Parallel.ForEach, kami akan mengatur loop. Dalam siklus ini, dalam aliran paralel, data dibaca dari hard disk, transformasi dan penulisan ke database. Pada saat yang sama, tidak ada masalah dengan organisasi thread paralel. Jumlah utas paralel tergantung pada jumlah inti prosesor dan beban kerjanya.

Dengan menggunakan algoritma yang diusulkan, kami akan mempercepat pemrosesan file setidaknya 2 kali. Meskipun, tentu saja, angka ini akan bervariasi tergantung pada jumlah inti dan memori mesin tempat program akan berjalan.

Selain itu, untuk mempercepat program, Anda harus meletakkan catatan di basis data dalam aliran terpisah yang berfungsi terlepas dari yang utama. Ini dapat dilakukan dengan menggunakan koleksi ConcurrentQueue untuk menghindari konflik saat menambahkan data ke antrian.

Kami menulis ulang contoh di atas, dengan mempertimbangkan pengoptimalan penulisan ke basis data.
Misalkan pembaca file mengembalikan data kepada kami di DataTable):

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	DataTable dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		threadWriteToDB.ListData.Enqueue(dataFile);
               });

Seperti yang Anda lihat, alih-alih sebuah baris dengan panggilan ke prosedur tulis dalam database, kami cukup menambah koleksi ConcurrentQueue ListData yang diuraikan dan diinisialisasi dalam utas terpisah, contoh yang threadWriteToDB digunakan dalam loop kami.

Menulis ke basis data sudah dalam aliran terpisah. Menulis ke database dapat diatur seperti halnya bekerja dengan file menggunakan konstruksi Parallel.For dan / atau Paral-lel.Foreach.

Dalam tugas saya, di mana diperlukan untuk memproses jumlah file yang sebanding, sekarang dapat memproses rata-rata dari 200.000 hingga 400.000 file per hari, dan kecepatan dibatasi dengan memuat basis data dan lebar saluran data.

All Articles