عمل ذكي مع RabbitMQ في NestJS

عند تطوير الأنظمة المالية ، لا تكون الآليات القياسية لمعالجة المهام المكتملة في معظم عمليات تنفيذ بروتوكول AMQP مناسبة دائمًا. في مرحلة ما ، واجهنا مثل هذه المشكلة ، ولكن أولاً وقبل كل شيء.

يعمل التطبيق القياسي لـ RabbitMQ في NestJS على تسهيل استقبال الرسائل في الوظائف المزخرفة:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage());
}

مزيد من التفاصيل حول كيفية عمل هذا موصوفة هنا .
كما تم تغطية العمل مع قوائم الانتظار في Nest جيدًا في هذه المقالة عن حبري .

يبدو أن هناك حاجة إلى شيء آخر. ومع ذلك ، هناك عدد من العيوب في التنفيذ الحالي.

المشكلة 1


لإرسال نتيجة ack إلى قناة ، تحتاج إلى سحب القناة يدويًا خارج سياق RmqContext وإرسال إشارة إليها.

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  channel.ack(originalMsg);
}

القرار


استخدم نفس النمط الذي تم استخدامه عند العمل مع معالجات http عن طريق إرجاع نتيجة التنفيذ مباشرة من الوظيفة ككائن عادي أو وعد أو ملاحظة.

المشكلة 2


إذا كنت بحاجة إلى إرسال النتيجة في قائمة انتظار منفصلة ، فأنت بحاجة لمعرفة اسمها في كل وحدة تحكم تستخدم Rabbit. بمعنى آخر ، لا يمكن تكوين هذا في مكان واحد أثناء تهيئة الوحدة النمطية واستخدامه بشكل ضمني.

القرار


في مرحلة تكوين الوحدة النمطية ، حدد قوائم الانتظار للنتائج الناجحة للعمليات وللأخطاء. قررنا إرسال نتائج العمليات الناجحة في طابور واحد ، ونتائج العمليات الخاطئة في أخرى. باستخدام حل المشكلة 1 ، إذا نجحت قيمة الإرجاع أو الوعد أو الملاحظة ، يتم إرسال النتيجة إلى قائمة انتظار التشغيل الناجحة ، إذا لم يكن الأمر كذلك ، يتم رفض الرسالة وتقع في قائمة انتظار الخطأ ، يسهل RabbitMQ القيام بذلك باستخدام خيارات x-dead - تبادل الرسائل ومفتاح توجيه الحروف الميتة عند إنشاء قائمة الانتظار.

المشكلة 3


بصفتك مستخدم مكتبة ، يحتاج المطور إلى معرفة تفاصيل بروتوكول AMQP للحصول على معرف الرسالة التالية ، وفهم ما هو ack ومتى يتم الاتصال به ، إلخ.

القرار


قم بإضافة مصمم للحصول على معرف الرسالة. بدلاً من ack ، قم بإرجاع نتيجة التنفيذ من دالة المعالج.

المشكلة 4


ولعل المشكلة الأكثر أهمية: توصيل رسالة إلى المعالج أكثر من مرة. عندما يتعلق الأمر بالمعاملات المالية ، فهذه نقطة مهمة جدًا ، لأن الموقف قد ينشأ عندما تم إرسال الأموال بالفعل ، وسقطت العملية في الخطوة الأخيرة - عند الكتابة إلى قاعدة البيانات أو إرسال رسالة إقرار إلى وسيط الرسائل. أحد الحلول الواضحة هو كتابة معرف الرسالة الذي تم إنشاؤه بواسطة المنتج في قاعدة البيانات عندما يتلقى المستهلك الرسالة ، قبل معالجة الرسالة ، إذا لم تكن موجودة بالفعل ، إذا كان هناك واحد ، ثم رفض الرسالة. لكن بروتوكول AMQP يوفر إشارة معاد تسليمها تحدد ما إذا كانت هذه الرسالة تم تسليمها إلى عملاء آخرين ، والتي يمكننا استخدامها لاكتشاف الرسائل المعاد تسليمها وإرسالها إلى قائمة الانتظار مع وجود أخطاء.في التنفيذ الحالي في Nest ، لا توجد طريقة لعدم تسليم مثل هذه الرسائل.

القرار


لا تسلم هذه الرسالة إلى المعالج ، ولكن سجل الخطأ في مرحلة استلام الرسالة من برنامج التشغيل. بالطبع ، يمكن تكوين هذا السلوك في مرحلة تزيين الطريقة للإشارة صراحة إلى أننا ما زلنا نريد تلقي رسائل لهذا النوع من الإجراءات.

لحل جميع المشاكل المذكورة أعلاه ، تمت كتابة تنفيذه الخاص للبروتوكول. كيف تبدو التهيئة:

const amqp = await NestFactory.create(
 RabbitModule.forRoot({
   host: process.env.AMQP_QUEUE_HOST,
   port: parseInt(process.env.AMQP_QUEUE_PORT, 10),
   login: process.env.AMQP_QUEUE_LOGIN,
   password: process.env.AMQP_QUEUE_PASSWORD,
   tasksQueueNormal: process.env.AMQP_QUEUE_COMMAND_REQUEST,
   tasksQueueRedelivery: process.env.AMQP_QUEUE_REQUEST_ONCE_DELIVERY,
   deadLetterRoutingKey: process.env.AMQP_QUEUE_COMMAND_REQUEST_DEAD_LETTER,
   deadLetterRoutingKeyRedelivery: process.env.AMQP_QUEUE_COMMAND_REQUEST_ONCE_DELEVERY_DEAD_LETTER,
   exchange: process.env.AMQP_EXCHANGE_COMMAND,
   prefetch: parseInt(process.env.AMQP_QUEUE_PREFETCH, 10),
 }),
);
const transport = amqp.get<RabbitTransport>(RabbitTransport);
app.connectMicroservice({
 strategy: transport,
 options: {},
});

app.startAllMicroservices();

هنا نشير إلى أسماء قوائم الانتظار لتسليم الرسائل والنتائج والأخطاء ، وكذلك قوائم الانتظار الفردية غير الحساسة لإعادة التسليم.

على مستوى وحدة التحكم ، يكون العمل مشابهًا للعمل مع http

@AMQP(‘say_hey’)
sayHay(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return this.heyService.greet(requestId, q);
}

ستقع نتيجة المهمة في قائمة الانتظار الناتجة بمجرد تنفيذ هذا الملاحظة. تتوافق المعلمة المزينة بـ @ AMQPRequest مع حقل معرف الارتباط للبروتوكول. هذا معرف فريد للرسالة التي تم تسليمها.

تطابق المعلمةAMQPParam نص الرسالة نفسها. إذا كانت JSON ، فستصل الرسالة في الوظيفة التي تم تحويلها بالفعل إلى الكائن. إذا كان نوعًا بسيطًا ، فسيتم إرسال الرسالة كما هي.

الرسالة التالية ستكون في الرسالة الميتة:

@AMQP(‘say_hey’)
sayHayErr(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return throwError(‘Buy’);
}

ماذا تفعل


إضافة انعكاس نوع إلى AMQPParam بحيث يتم تحويل نص الرسالة إلى الفئة التي يتم تمريرها. الآن إنها مجرد طبقة الطباع للكتابة.

تتوفر جميع التعليمات البرمجية والتثبيت على GitHub .

نرحب بأي تعديلات وتعليقات.

All Articles