Menggunakan RabbitMQ dengan MonsterMQ Bagian 2

gambar
(Gambar diambil dari situs web resmi RabbitMQ )

Pada artikel pertama, kami menulis dua program PHP yang menggunakan RabbitMQ: satu mengirim pesan, yang lain diterima. Dalam artikel ini, kita akan membahas cara membuat antrian yang akan mendistribusikan tugas yang menghabiskan banyak waktu di antara banyak pekerja (penangan pesan).

Gagasan utamanya bukanlah menyelesaikan tugas yang menghabiskan banyak waktu secara langsung, tetapi menambahkannya ke antrian dalam bentuk pesan. Kemudian, pekerja yang bekerja menerima pesan dari antrian, yang ia konsumsi dan melakukan tugas seolah-olah di latar belakang.
Konsep ini sangat relevan untuk aplikasi web, di mana, misalnya, Anda mungkin perlu melakukan beberapa tugas yang memakan waktu, yang hasilnya tidak segera diperlukan, misalnya, mengirim email atau membuat permintaan HTTP ke aplikasi pihak ketiga (misalnya, melalui libcurl untuk PHP).

Memasak


Di bagian ini, kami akan mengirim pesan yang akan mewakili tugas untuk pekerja. Karena kita tidak memiliki tugas nyata, seperti memproses gambar atau membuat file pdf, kita akan berpura-pura sibuk menggunakan fungsi sleep () . Kami akan menganggap jumlah poin dalam pesan sebagai kompleksitas tugas. Setiap titik berarti satu detik kerja, misalnya, pesan Halo ... akan membutuhkan waktu tiga detik kerja.

Kami akan sedikit mengubah skrip send.php kami dari pelajaran terakhir sehingga dapat mengirim pesan yang sewenang-wenang.

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->queue('test-queue')->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Skrip accept.php kami juga perlu diubah. Ini harus mensimulasikan operasi selama satu detik untuk setiap titik dalam pesan yang diterima. Mari kita rename file tersebut ke worker.php dan tulis kode berikut di dalamnya:

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('test-queue')->declare();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Selanjutnya, jalankan kedua skrip di terminal yang berbeda:

# Shell 1
php worker.php 

# Shell 2
php send.php "A very hard task which takes two seconds.."

Salah satu keuntungan menggunakan antrian tugas adalah kemampuan untuk mendistribusikan pekerjaan di antara banyak pekerja. Mari kita coba menjalankan dua skrip pekerja.php sekaligus di dua terminal yang berbeda. Dan yang ketiga kami akan mengirim pesan dengan skrip send.php

# Shell 1
php worker.php 

# Shell 2
php worker.php 

# Shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

Selanjutnya, mari kita lihat apa yang dibawa oleh pekerja kita:
# shell 1
php worker.php
# Received: First message.
# Done
# Received: Third message...
# Done
# Received: Fifth message.....
# Done

# shell 2
php worker.php
# Received: Second message..
# Done
# Received: Fourth message....
# Done

Secara default, RabbitMQ akan mengirim setiap pesan berikutnya ke penerima berikutnya yang mengkonsumsi. Rata-rata, setiap penerima akan menerima jumlah pesan yang sama, metode penyebaran pesan ini disebut round-robin (dalam lingkaran). Coba ini dengan tiga pekerja atau lebih.

Konfirmasi pesan


Tugas dapat memakan waktu tertentu. Mungkin Anda tertarik pada apa yang akan terjadi pada pesan jika Anda menyelesaikan pekerjaan penerima yang tidak berhasil memproses pesan ini sampai akhir. Dengan kode kami saat ini, pesan akan dikembalikan ke antrian, karena pengakuan pesan diaktifkan secara default di MonsterMQ.

Ketika kami menggunakan konfirmasi pesan, kami memberi tahu RabbitMQ bahwa pesan tersebut telah diproses dan ia memiliki hak untuk menghapusnya dari antrian. Jika penerima menyelesaikan pekerjaannya tanpa mengirim konfirmasi (misalnya, sebagai akibat dari penghentian koneksi TCP yang tidak terduga), RabbitMQ akan memahami bahwa pesan itu tidak diproses dan akan mengembalikannya ke antrian, mencoba mengirimkannya ke pekerja lain yang tersedia. Dengan demikian, Anda dapat yakin bahwa pesan tidak akan hilang, bahkan jika terjadi penghentian tak terduga dari salah satu pekerja.

Untuk menonaktifkan pengakuan pesan di MonsterMQ, Anda bisa meneruskan true sebagai argumen kedua untuk mengkonsumsi ()

$consumer->consume('test-queue', true);

Lupa konfirmasi pesan yang diterima adalah kesalahan yang cukup umum, mudah ditoleransi, yang dapat menyebabkan konsekuensi serius. Jika ini terjadi, pesan akan dikirim berulang-ulang dan juga akan terakumulasi dalam antrian yang mengambil lebih banyak memori. Untuk men-debug kesalahan ini, gunakan rabbitmqctl untuk menampilkan bidang messages_unacknowledged

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Di windows, letakkan sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Keamanan Antrian


Terlepas dari kenyataan bahwa pesan kami tidak akan hilang jika ada pekerja yang tiba-tiba menghentikan pekerjaan mereka, namun, kami masih dapat kehilangan antrian yang dibuat jika RabbitMQ ditutup.

Untuk melindungi diri Anda dari kehilangan antrian, Anda perlu mendeklarasikan antrian sebagai awet (persisten, awet). Karena antrian idempoten, yaitu, kita tidak dapat mengubah atau membuatnya kembali dengan memanggil metode deklarasi dengan nama yang sama, kita harus mendeklarasikan antrian baru. Mari kita lakukan sebagai berikut

$consumer->queue('new-queue')->setDurable()->declare();

Ingatlah untuk mengubah kode pengumuman antrian di kode pengirim juga.

Distribusi pesan yang adil


Anda mungkin telah memperhatikan bahwa pesan di antara dua pekerja kami masih belum didistribusikan secara adil. Misalnya, jika setiap pesan genap memakan waktu, dan setiap pesan ganjil diproses dengan cepat, maka satu pekerja yang menerima pesan yang memakan waktu akan selalu sibuk, sedangkan yang kedua akan menganggur. Untuk menghindari ini, Anda dapat menggunakan kualitas layanan . Mari tambahkan baris berikut ke kode kita

$consumer->qos()->prefetchCount(1)->perConsumer()->apply();

Baris ini memberi tahu RabbitMQ untuk tidak mengirim pesan ke penerima sampai ia memproses dan mengakui pesan yang sekarang. perConsumer () menerapkan kualitas layanan ke semua saluran penerima, gunakan metode perChannel () jika Anda ingin menerapkan kualitas layanan hanya untuk saluran saat ini.

gambar
(Gambar diambil dari situs resmi RabbitMQ )

Menyatukan seluruh kode


Ini akan terlihat seperti send.php (pengirim) kami

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Dan pekerja penerima .php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $consumer->qos()->prefetchCount(1)->perConsumer()->apply();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Itu saja, dalam pelajaran selanjutnya kita akan belajar cara mengirim pesan ke banyak penerima dari antrian yang sama.

All Articles