استخدام التوازي عند معالجة البيانات في C #



يوم جيد للجميع! أنا أخصائي تقني أعمل في نظام تدقيق داخلي ، وتشمل مسؤولياتي إنشاء أدوات ETL في لغة البرمجة C #.

تصبح الملفات المنظمة بشكل صارم من xml أو csv أو json أو أي تنسيق آخر مصادر للبيانات من وقت لآخر. في بعض الأحيان يصبح عددهم كبيرًا جدًا ويتزايد باستمرار. على سبيل المثال ، في إحدى المهام الخاصة بي ، زاد عدد الملفات بمتوسط ​​معدل تحديث يبلغ حوالي 150.000 ملف يوميًا. إذا كانت معالجة ملف واحد (في نفس الوقت (قراءة مجموعة من وحدات البايت من القرص الثابت في الذاكرة ، وتحويل البيانات التي تم تنزيلها وكتابتها إلى قاعدة البيانات) تستغرق ثانية ، يصبح من الواضح أن معالجة جميع الملفات ستستغرق أكثر من 40 ساعة. في هذه الحالة ، لن نتمكن من معالجة هذه الملفات حتى النهاية ، لأن سرعة زيادة عدد الملفات ستكون أعلى بوضوح من سرعة معالجتها.

أحد حلول هذه المشكلة هو تطوير تطبيق لإنشاء مجموعة من مؤشرات الترابط المستقلة عن بعضها البعض. سوف معالجة مؤشرات الترابط الملفات المحددة من قائمة الانتظار العامة. ومع ذلك ، في هذه الحالة ، تنشأ صعوبات مع تزامن تدفقات العمل وتقاسم الموارد ، لأن الأقفال المتبادلة محتملة للغاية.

لتجنب هذه الصعوبات ، أضافت Microsoft مكتبة TPL إلى إطار عمل .NET (بدءًا من الإصدار 4.0). سأخبرك بكيفية استخدام هذه المكتبة لحل هذه المشكلة.

لذلك ، تبدو خوارزمية التشغيل في البداية كما يلي:

يتم فحص دليل تخزين الملفات ويتم إرجاع قائمة (على سبيل المثال ، قائمة) تحتوي على بيانات حول جميع الملفات ؛
تبدأ دورة (بالنسبة أو foreach) يتم فيها قراءة البيانات من الملف التالي إلى الذاكرة ، إذا لزم الأمر ، وتحويلها وكتابتها إلى قاعدة البيانات.

من الواضح أن معظم العمليات التي تستغرق وقتًا طويلاً هي قراءة البيانات من القرص الصلب إلى الذاكرة وكتابة البيانات من الذاكرة إلى قاعدة البيانات.

دعونا نحاول تحسين الخوارزمية الخاصة بنا باستخدام مكتبة TPL:

الخطوة 1. قم

بتغيير القائمة التي تم إرجاعها عن طريق مسح دليل تخزين الملفات من List إلى ConcurrentQueue.
لماذا نفعل ذلك؟ والحقيقة هي أن فئة ConcurrentQueue آمنة لمؤشر الترابط ، أي إذا حاول موضوعان في نفس الوقت استخراج البيانات من هذه القائمة أو كتابة البيانات إليها ، فلن نرمي الاستثناءات (استثناء).
ستبدو النقطة 1 من الخوارزمية على النحو التالي: يتم فحص دليل تخزين الملفات ويتم إرجاع قائمة ConcurrentQueue التي تحتوي على بيانات حول جميع الملفات.

النقطة 2:
دعنا نغير التصميم بتشكيل دورة معالجة البيانات من ملف. استبدل ب Parallel.For أو Parallel.ForEach.

ما الفرق بين البناء الجديد ولأجل؟ كل شيء بسيط وواضح في الأساس من اسم اللغة. يتم تنفيذ جميع عمليات تكرار الحلقة في خيوط متوازية. كمثال ، سأعرض تنظيم الحلقة مع بناء Parallel.ForEach:

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	var dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		WriteToDB(dataFile);
               });

حيث:

listFiles عبارة عن مجموعة من نوع ConcurrentQueue تحتوي على قائمة بالملفات في الدليل ؛
CurrentFile - عنصر من عناصر listFiles ، والذي يتم إرجاعه بواسطة بنية ForEach ؛
dataFile - وهو شرطي لبعض هيكل البيانات في الذاكرة ، يتم الحصول عليه من خلال قراءة محتويات الملف في الذاكرة ؛
getDataFile - دالة شرطية تقوم بإرجاع محتويات ملف في شكل بعض بنية البيانات ؛
TransformData - الإجراء المشروط لتحويل البيانات المستلمة ؛
WriteToDB هو إجراء شرطي لكتابة البيانات إلى قاعدة البيانات.

في هذا المثال ، باستخدام بنية Parallel.ForEach ، سننظم الحلقة. في هذه الدورة ، في التدفقات المتوازية ، تتم قراءة البيانات من القرص الثابت ، وتحويلها وكتابتها إلى قاعدة البيانات. في الوقت نفسه ، لا توجد مشاكل في تنظيم عمل التدفقات الموازية. يعتمد عدد الخيوط المتوازية على عدد نوى المعالج وعبء العمل.

باستخدام الخوارزمية المقترحة ، سوف نقوم بتسريع معالجة الملفات مرتين على الأقل. على الرغم من أن هذا الرقم سيختلف بالطبع طبقًا لعدد النوى وذاكرة الجهاز الذي سيتم تشغيل البرنامج عليه.

أيضًا ، لتسريع البرنامج ، تحتاج إلى وضع السجل في قاعدة البيانات في دفق منفصل يعمل بغض النظر عن البرنامج الرئيسي. يمكن القيام بذلك باستخدام مجموعة ConcurrentQueue لتجنب التعارضات عند إضافة البيانات إلى قائمة الانتظار.

نعيد كتابة المثال أعلاه ، مع مراعاة تحسين الكتابة إلى قاعدة البيانات.
افترض أن قارئ الملفات يعيد البيانات إلينا في DataTable):

Parallel.ForEach(listFiles, (currentFile) =>
       	  {
              	DataTable dataFile = getDataFile(currentFile.FullName);
		TransformData(dataFile);
		threadWriteToDB.ListData.Enqueue(dataFile);
               });

كما ترى ، بدلاً من سطر يحتوي على استدعاء لإجراء الكتابة في قاعدة البيانات ، فإننا ببساطة نضيف إلى مجموعة ConcurrentQueue ListData الموصوفة والمهيأة في سلسلة رسائل منفصلة ، وهو مثيل يستخدم threadWriteToDB في حلقتنا.

الكتابة إلى قاعدة البيانات موجودة بالفعل في دفق منفصل. يمكن تنظيم الكتابة إلى قاعدة البيانات بطريقة مماثلة للعمل مع الملفات باستخدام إنشاءات Parallel.For و / أو Paral-lel.Foreach.

في مهمتي ، حيث كانت هناك حاجة إلى معالجة عدد قابل للمقارنة من الملفات ، الآن ، في المتوسط ​​، يمكن معالجة 200.000 إلى 400000 ملف يوميًا ، والسرعة محدودة عن طريق تحميل قاعدة البيانات وعرض قناة البيانات.

All Articles