每天使用级联队列处理数百万个事件

成千上万的队列,在某些服务中,成千上万的队列正在传递,而大量的数据通过这些队列在我们产品的内部旋转。所有这些必须以某种神奇的方式进行处理,而不是被枪杀。在这篇文章中,我将告诉您我们在家中使用什么架构方法,拥有相当适中的技术堆栈,并且在我们的“厨房”中没有小型数据中心。



我们有什么?


因此,一方面,我们拥有一个众所周知的技术堆栈:Nginx,PHP,PostgreSQL,Redis。另一方面,每分钟在我们的系统中发生数以万计的事件,在高峰时它可以达到数十万个事件。为了弄清楚这些事件是什么以及我们应如何应对,我将做一个小小的题外话,然后告诉您我们如何开发基于事件的自动化系统。

ManyChat是用于营销自动化的平台。 Facebook页面的所有者可以将其连接到我们的平台,并配置与订户的交互自动化(换句话说,创建聊天机器人)。自动化通常由许多可能不互连的交互链组成。在这些自动化链中,订户可能会发生某些动作,例如,在系统中分配特定标签,或分配/更改订户卡中字段的值。此数据还使您可以细分受众并与页面的订户建立更相关的互动。

我们的客户确实希望基于事件的自动化-在订户内触发特定事件(例如,标记)时能够自定义动作执行的能力。

由于触发事件可以在不同的自动化链中运行,因此,对于来自客户端的所有基于事件的操作,必须有一个配置点,并且从我们的处理端,应该有一条总线来处理来自不同自动化点的订户上下文的更改,这一点很重要。

在我们的系统中,有一条公用总线,订阅者发生的所有事件都通过该总线。每天有超过5亿个事件。它们的处理非常精细-这是数据仓库中的记录,因此页面所有者有机会历史查看其订户发生的一切。

为了实现基于事件的系统,我们似乎已经拥有了一切,并且足以将业务逻辑集成到公共事件总线的处理中。但是我们对新系统有一些要求:

  • 我们不想在处理主事件总线时性能下降
  • 对于我们来说,保持新系统中的消息处理顺序很重要,因为这可能与设置自动化功能的客户的业务逻辑有关。
  • 当具有大量订户的活动页面阻塞队列并阻止“小”页面事件的处理时,避免嘈杂的邻居的影响

如果将逻辑处理集成到公共事件总线的处理中,则性能将严重下降,因为我们必须检查每个事件是否符合已配置的自动化。作为自动化设置的一部分,可以应用某些过滤器(例如,仅针对30岁以上的女性客户触发事件时启动自动化)。也就是说,当处理主总线中的事件时,将处理对数据库的大量额外请求,并且相当重的逻辑也将开始比较当前的订户上下文和自动化设置。此选项不适合我们,因此我们进一步考虑。



级联队列的组织


由于与基于事件的系统相关的业务逻辑与用于处理来自主总线的事件的逻辑非常可分离,因此我们决定将共享总线所需的事件类型放在单独的队列中,以在单独的数据流中进行进一步处理。因此,我们消除了与处理主事件总线中的性能下降相关的问题。

在同一阶段,我们决定将事件转移到下一个级联队列以将这些事件置于每个bot的单独队列中会很酷。因此,将每个漫游器的活动与其转向框架隔离开来,这使我们能够解决与嘈杂邻居的影响有关的问题。

现在,我们的数据流程图如下所示:



但是,为了使该方案起作用,我们需要解决处理新队列的问题。

我们平台上有超过一百万个连接的页面(机器人),这意味着仅在基于事件层的级别上,我们就有可能在我们的方案中获得约100万个队列。从技术角度来看,这对我们来说并不可怕。作为队列服务器,我们使用Redis及其标准数据类型,例如LIST,SORTED SET等。这意味着每个队列是谁是RAM中Redis的标准数据结构,可以动态创建或删除它,这使我们能够轻松灵活地操作系统中的大量队列。我将在单独的文章中更深入地讨论将Redis用作队列服务器的技术细节,但现在让我们回到我们的体系结构。

显然,每个机器人都有不同的活动,并且在“现在需要处理”状态下获得100万个队列的可能性非常小。但是在某个时间点,很有可能我们会有成千上万个需要处理的活动队列。这些队列的数量不断变化。这些队列本身也发生了变化,其中一些被完全减去并删除,其中一些是动态创建的,并填充了事件以进行处理。因此,我们需要想出一种有效的方法来处理它们。

处理庞大的队列


因此,我们有一堆队列。在每个时间点,可能会有一个随机量。处理每个队列的一个重要条件(在他的帖子开头提到)是,每个页面内的事件应严格顺序地进行处理。这意味着在某个时间点,每个队列不能由一个以上的工人处理,以避免竞争问题。

但是要使队列与处理程序的比率为1:1是一项可疑的任务。队列的数量在不断变化,无论是向上还是向下。运行的处理程序的数量也不是无限的,至少我们对操作系统和硬件有一定的限制,并且我们不希望工作人员闲置在空队列上。为了解决处理程序和队列之间的交互问题,我们实现了一个循环系统来处理我们的队列池。

在这里,控制线为我们提供了帮助。



当事件从共享总线转发到特定机器人的基于事件的队列时,我们还将此机器人队列的标识符放入控制队列中。控制队列仅存储池中需要处理的队列的标识符。仅唯一值存储在控制队列中,也就是说,相同的机器人队列标识符将仅在控制队列中存储一次,而不管它在其中写入了多少次。在Redis上,这是使用SORTED SET数据结构实现的。

此外,我们可以区分一定数量的工人,每个工人将从控制队列中接收机器人队列的标识符以进行处理。因此,每个工作人员将从分配给他的队列中独立地处理该块,在处理了该块之后,将已处理队列的标识符返回给控件,从而将其返回给我们的轮询。最主要的是不要忘记为整个事物提供锁,这样两个工作人员就无法并行处理同一机器人队列。如果在工作人员已经对其进行处理时,bot标识符进入控制队列,则可能出现这种情况。对于锁,我们还使用Redis作为密钥:带有TTL的值存储。

当我们从控制队列中获取带有漫游器队列标识符的任务时,我们将TTL锁定放在获取的队列上并开始处理它。如果其他使用者使用已从控制队列中处理的队列来处理任务,则他将无法锁定,无法将任务返回到控制队列并接收下一个任务。使用者处理了机器人队列后,他删除了锁并转到控制队列以进行下一个任务。

最终方案如下:



结果,使用当前方案,我们解决了主要已发现的问题:

  • 主事件总线中的性能下降
  • 事件处理违规
  • 吵闹的邻居的影响

如何处理动态负载?


该方案正在运行,但是在其中,我们有固定数量的使用者用于动态数量的队列。显然,使用这种方法,每当队列数量急剧增加时,我们就会下沉队列。对于我们的员工来说,在需要时动态启动或熄灭似乎很好。如果这不会使推出新代码的过程大大复杂化,那也很好。在这样的时刻,动手去写程序管理器会很痒。将来,我们只是这样做了,但是这个故事是不同的。

我们决定思考,为什么不再次使用所有熟悉的工具。因此,我们获得了内部API,该API可用于NGINX + PHP-FPM的标准捆绑包。因此,我们可以用API替换固定的工作程序池,然后让NGINX + PHP-FPM自己解决和管理工作程序,这足以让我们在控制队列和内部API之间只有一个控制使用者,这将队列标识符发送到我们的API,处理,队列本身将在PHP-FPM提出的工作程序中进行处理。

新方案如下:



它看起来很漂亮,但我们的控件使用者在一个线程中工作,而我们的API则同步工作。这意味着在PHP-FPM磨碎队列时,使用者每次都会挂起。这不适合我们。

使我们的API异步


但是,如果我们可以将一个任务发送到我们的API,并使其在那里处理业务逻辑,并且我们的控制使用者将遵循控制队列中的下一个任务,然后将其拉回到API中,依此类推。说到做到。

该实现需要几行代码,概念证明如下所示:

class Api {
    	public function actionDoSomething()
    	{
    		$data = $_POST;
    		$this->dropFPMSession();
    		// ,        ,     
    		//     
    	}
    
    
    	protected function dropFPMSession()
    	{
    		ignore_user_abort(true); 
    		//          
    		ob_end_flush(); //  
    		flush(); //  
    		@session_write_close(); // 
    	
    		fastcgi_finish_request(); 
    		//          
    	}
    }

在dropFPMSession()方法中,我们断开与客户端的连接,并给它一个200的响应,之后我们可以在后处理中执行任何繁琐的逻辑。在我们的案例中,客户是控制消费者。对于他来说,重要的是将任务从控制队列中快速分散到API上进行处理,并知道任务已到达API。

使用这种方法,我们摆脱了与消费者的动态控制及其自动缩放相关的许多难题。

可进一步扩展


结果,我们子系统的体系结构开始由三层组成:数据层,流程和内部API。同时,信息经过所有数据流,有关处理后的事件/任务所属的机器人。显然,我们可以使用密钥/机器人标识符进行分片,同时继续水平扩展我们的系统。

如果我们将体系结构想象成一个坚固的单元,它将看起来像这样:



增加了此类单元的数量之后,我们可以在它们前面放置一个瘦平衡器,这将把我们的事件/任务投入到必要的单元中,具体取决于分片键。



因此,我们为系统的水平缩放获得了很大的余量。

在实现业务逻辑时,您不应忘记线程安全性概念,否则您会得到意想不到的结果。

这种具有级联队列并将繁重的业务逻辑删除到异步处理中的方案已经在系统的多个部分中使用了两年以上。每个子系统在这段时间内的负载增长了数十倍,并且所提出的实现方式使我们能够轻松快速地进行扩展。同时,我们继续在我们的主堆栈上工作,而不用新的工具/语言对其进行扩展并且不增加它,从而增加了新工具的引入和支持的开销。

All Articles