Bekerja pintar dengan RabbitMQ di NestJS

Ketika mengembangkan sistem keuangan, mekanisme standar untuk memproses tugas-tugas yang telah diselesaikan di sebagian besar implementasi protokol AMQP tidak selalu cocok. Pada titik tertentu, kami mengalami masalah seperti itu, tetapi hal pertama yang pertama.

Implementasi standar RabbitMQ di NestJS membuatnya mudah untuk menerima pesan dalam fungsi-fungsi yang didekorasi:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage());
}

Rincian lebih lanjut tentang cara kerjanya dijelaskan di sini .
Juga bekerja dengan antrian di Nest juga tercakup dalam artikel ini di Habrรฉ .

Tampaknya sesuatu yang lain mungkin diperlukan. Namun, ada sejumlah kelemahan dalam implementasi saat ini.

Masalah 1


Untuk mengirim hasil ack ke saluran, Anda harus menarik saluran secara manual dari konteks RmqContext dan mengirim sinyal ke sana.

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  channel.ack(originalMsg);
}

Keputusan


Gunakan pola yang sama yang digunakan ketika bekerja dengan http handler dengan mengembalikan hasil eksekusi langsung dari fungsi sebagai objek biasa, Janji atau Dapat Diamati.

Masalah 2


Jika Anda perlu mengirim hasilnya dalam antrian terpisah, maka di setiap pengontrol yang menggunakan Kelinci Anda perlu tahu namanya. Dengan kata lain, tidak ada cara untuk mengkonfigurasi ini di 1 tempat selama inisialisasi modul dan menggunakannya secara implisit.

Keputusan


Pada tahap konfigurasi modul, tentukan antrian untuk hasil operasi yang sukses dan untuk kesalahan. Kami memutuskan untuk mengirim hasil operasi yang sukses dalam satu antrian, dan hasil yang salah di yang lain. Menggunakan solusi untuk masalah 1, jika nilai kembali, Janji atau Dapat Diamati berhasil, maka hasilnya dikirim ke antrian operasi yang sukses, jika tidak, pesan ditolak dan jatuh ke dalam antrian kesalahan, RabbitMQ membuatnya mudah untuk melakukan ini menggunakan opsi x-dead. -pertukaran newsletter dan x-dead-letter-routing-key saat membuat antrian.

Masalah 3


Sebagai pengguna perpustakaan, pengembang perlu mengetahui rincian protokol AMQP untuk mendapatkan id dari pesan berikutnya, memahami apa ack itu dan kapan harus memanggilnya, dll.

Keputusan


Tambahkan dekorator untuk mendapatkan id pesan. Alih-alih ack, kembalikan hasil eksekusi dari fungsi handler.

Masalah 4


Mungkin masalah yang paling penting: menyampaikan pesan kepada pawang lebih dari sekali. Ketika datang ke transaksi keuangan, ini adalah poin yang sangat penting, karena situasi mungkin muncul ketika uang sudah dikirim, dan operasi jatuh pada langkah terakhir - ketika menulis ke database atau mengirim pesan pengakuan ke broker. Salah satu solusi yang jelas adalah dengan menulis ID pesan yang dihasilkan oleh produsen dalam database ketika pesan diterima oleh konsumen, sebelum memproses pesan, jika belum ada, jika ada, maka tolak pesan tersebut. Tetapi protokol AMQP memberikan bendera yang dikirim kembali yang mengidentifikasi apakah pesan ini pernah dikirimkan ke klien lain, yang dapat kita gunakan untuk mendeteksi pesan yang dikirim ulang dan mengirimkannya ke antrian dengan kesalahan.Dalam implementasi saat ini di Nest, tidak ada cara untuk tidak mengirimkan pesan tersebut.

Keputusan


Jangan mengirimkan pesan ini ke pawang, tetapi catat kesalahan pada tahap menerima pesan dari pengemudi. Tentu saja, perilaku ini dapat dikonfigurasi pada tahap dekorasi metode untuk secara eksplisit menunjukkan bahwa kami masih ingin menerima pesan untuk jenis tindakan ini.

Untuk menyelesaikan semua masalah di atas, implementasi protokolnya ditulis. Seperti apa inisialisasi:

const amqp = await NestFactory.create(
 RabbitModule.forRoot({
   host: process.env.AMQP_QUEUE_HOST,
   port: parseInt(process.env.AMQP_QUEUE_PORT, 10),
   login: process.env.AMQP_QUEUE_LOGIN,
   password: process.env.AMQP_QUEUE_PASSWORD,
   tasksQueueNormal: process.env.AMQP_QUEUE_COMMAND_REQUEST,
   tasksQueueRedelivery: process.env.AMQP_QUEUE_REQUEST_ONCE_DELIVERY,
   deadLetterRoutingKey: process.env.AMQP_QUEUE_COMMAND_REQUEST_DEAD_LETTER,
   deadLetterRoutingKeyRedelivery: process.env.AMQP_QUEUE_COMMAND_REQUEST_ONCE_DELEVERY_DEAD_LETTER,
   exchange: process.env.AMQP_EXCHANGE_COMMAND,
   prefetch: parseInt(process.env.AMQP_QUEUE_PREFETCH, 10),
 }),
);
const transport = amqp.get<RabbitTransport>(RabbitTransport);
app.connectMicroservice({
 strategy: transport,
 options: {},
});

app.startAllMicroservices();

Di sini kami menunjukkan nama-nama antrian untuk mengirimkan pesan, hasil, dan kesalahan, serta antrian individual yang tidak sensitif terhadap pengiriman ulang.

Pada level controller, pekerjaannya sama seperti bekerja dengan http

@AMQP(โ€˜say_heyโ€™)
sayHay(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return this.heyService.greet(requestId, q);
}

Hasil tugas akan jatuh ke dalam antrian hasil segera setelah Observable ini dijalankan. Parameter yang dihiasi dengan @AMQPRequest sesuai dengan bidang korelasiId dari protokol. Ini adalah pengidentifikasi unik untuk pesan yang dikirim.

Parameter @AMQPParam cocok dengan isi pesan itu sendiri. Jika JSON, maka pesan akan tiba di fungsi yang sudah dikonversi ke objek. Jika jenisnya sederhana, maka pesannya dikirim apa adanya.

Pesan berikut akan ada dalam surat mati:

@AMQP(โ€˜say_heyโ€™)
sayHayErr(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return throwError(โ€˜Buyโ€™);
}

Ada apa


Tambahkan Jenis Refleksi ke AMQPParam sehingga badan pesan dikonversi ke kelas yang diteruskan. Sekarang hanya kasta untuk diketik.

Semua kode dan instruksi pemasangan tersedia di GitHub .

Setiap pengeditan dan komentar dipersilakan.

All Articles