在上一部分中,我们改进了日志记录系统。我们使用直接类型交换器,而不是使用扇出类型交换器,这使我们能够选择性地接收消息。尽管有所改进,但是我们的系统仍然存在局限性,例如,我们无法基于多个条件接收消息。例如,在我们的系统中,我们可能不仅要根据消息的严重性级别,还要根据消息源来重定向消息。例如,如syslog unix工具中所示,它不仅根据严重性级别(info / warn / crit ..),还根据消息源(auth / cron / kern ...)重定向消息。这可以给我们带来更大的灵活性,例如,我们可以只接收来自“ cron”的关键消息,也可以接收来自“ kern”的所有消息。要实现这样的系统,我们必须熟悉更复杂类型的交换器主题。主题类型交换器
发送到主题类型交换器的消息应具有路由密钥,该路由密钥是一组由点分隔的单词。单词可以是任意的,但通常与消息的某些功能相关。以下是一些有效路由键的示例:“ stock.usd.nyse”,“ nyse.vmw”,“ quick.orange.rabbit”。密钥限制为255个字节。绑定密钥应以类似的方式指定。主题类型交换器中消息的路由类似于直接类型交换器的路由-将消息与绑定键匹配的路由发送到队列。但是,有两个区别:- 绑定键中的*(星号)只能替换为一个单词
- 绑定键中的#(磅)可以用零个或多个单词代替
这可以通过以下图像进行说明:(
图像取自RabbitMQ官方网站)在此示例中,我们将发送代表动物的消息。邮件路由键由三个词(和两个点)组成。第一个单词代表速度,第二个单词代表颜色,第三个单词代表物种。在我们的示例中,两个队列连接到交换器:带有绑定键“ * .orange。*”的 Q1 和带有两个绑定键“ *。*。Rabbit”和“ lazy。#”的 Q2 。这些链接将意味着以下路由:- Q1对所有橙色动物都感兴趣。
- Q2 (rabbit) (lazy)
带有路由密钥“ quick.orange.rabbit”的消息将在两个队列中传递。带有密钥“ lazy.orange.elephant”的消息也将同时传递给这两个消息。但是,带有键“ quick.orange.fox”的消息只会放在第一位,而带有键“ lazy.brown.fox”的消息只会放在第二位。尽管路由密钥与两个绑定密钥都匹配,但“ Lazy.pink.rabbit”将再次发送一次。“ Quick.brown.fox”将不会进入任何队列,因为路由密钥与任何绑定密钥都不匹配。如果您尝试发送的消息中路由键中的字数较小或较大(例如,“橙色”或“ quick.orange.male.rabbit”),则该消息将被丢弃。如果将#指定为绑定键,则主题类型交换器的行为类似于扇出类型交换器。或者,如果您未在绑定键中指定* *或#,而直接指定一个单词,则为直接显示。将代码放在一起
我们将在我们的日志记录系统中使用主题类型交换器。让我们从一个假设开始,我们的消息路由键看起来像“ source.strict”。该代码与上一部分几乎相同,这里是send.php:try {
$producer = \MonsterMQ\Client\Producer();
$producer->connect('127.0.0.1', 5672);
$producer->logIn('guest', 'guest');
$producer->newTopicExchange('topic-logs');
$routingKey = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$message = implode(' ', array_slice($argv, 2));
$message = empty($message) ? "Hello World!" : $message;
$producer->publish($message, $routingKey, 'topic-logs');
echo "\n Sent {$message} \n";
} catch(\Exception $e) {
var_dump($e);
}
代码worker-1.phptry {
$consumer = \MonsterMQ\Client\Consumer();
$consumer->connect('127.0.0.1', 5672);
$consumer->logIn('guest', 'guest');
$producer->queue('queue-1')->setExclusive()->declare();
$consumer->newTopicExchange('topic-logs');
$bindingKeys = array_slice($argv, 1);
if (empty($bindingKeys)) {
file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
exit(1);
}
foreach ($bindingKeys as $key) {
$producer->queue('queue-1')->bind('topic-logs', $key);
}
$consumer->consume('queue-1');
echo " \n Waiting for logs. To exit press CTRL+C\n";
$consumer->wait(function ($message, $channelNumber) use ($consumer){
echo "\n $message \n";
});
} catch(\Exception $e) {
var_dump($e);
}
代码worker-2.phptry {
$consumer = \MonsterMQ\Client\Consumer();
$consumer->connect('127.0.0.1', 5672);
$consumer->logIn('guest', 'guest');
$producer->queue('queue-2')->setExclusive()->declare();
$consumer->newTopicExchange('topic-logs');
$bindingKeys = array_slice($argv, 1);
if (empty($bindingKeys)) {
file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
exit(1);
}
foreach ($bindingKeys as $key) {
$producer->queue('queue-2')->bind('topic-logs', $key);
}
$consumer->consume('queue-2');
echo " \n Waiting for logs. To exit press CTRL+C\n";
$consumer->wait(function ($message, $channelNumber) use ($consumer){
echo "\n $message \n";
});
} catch(\Exception $e) {
var_dump($e);
}
要仅接收第一个工作人员的关键消息,请致电php worker-1.php "*.critical"
要绑定第二个工作线程使用的队列,请使用两个绑定键调用交换器:php worker-2.php "kern.*" "*.critical"
要发送消息,请执行以下操作:php send.php "kern.critical" "A critical kernel error"
试用这些程序。