异步PHP

十年前,我们有一个经典的LAMP堆栈:Linux,Apache,MySQL和PHP,它们以mod_php的慢速模式工作。世界已经改变,速度也变得重要。 PHP-FPM出现了,它可以显着提高PHP解决方案的性能,而不必紧急地重写某些内容。

同时,ReactPHP库是使用事件循环概念开发的,用于处理来自操作系统的信号并提供异步操作的结果。 ReactPHP-AMPHP想法的发展。与ReactPHP不同,该库使用相同的事件循环,但支持协程。它们使您可以编写看起来像同步的异步代码。也许这是用PHP开发异步应用程序的最新框架。



但是越来越需要速度,工具还不够,因此PHP异步编程的思想是加快查询处理和更好地利用资源的方法之一。

这就是Anton Shabovta谈论的zloyusr)是Onliner的开发人员。超过10年的经验:我从C / C ++的桌面应用程序开始,然后转向PHP的Web开发。他用C#和Python 3编写“家庭”项目,并在PHP中尝试DDD,CQRS,事件源,异步多任务。

本文基于Anton关于PHP Russia 2019的报告的笔录在其中,我们将了解PHP中的阻塞和非阻塞操作,我们将从内部研究事件循环和异步基元(例如Promise和协程)的结构。最后,我们将在ext-async,AMPHP 3和PHP 8中找到等待我们的内容。


我们介绍了两个定义。很长一段时间,我试图找到异步和异步操作的确切定义,但是我没有找到并编写我的。
异步是软件系统不阻塞执行主线程的能力。
异步操作是在完成之前不会阻塞程序执行流的操作。

这似乎很简单,但是首先您需要了解哪些操作会阻止执行流程。

封锁作业


PHP是一种解释器语言。他逐行读取代码,将其翻译成指令并执行。下例中的哪一行代码将被阻止?

public function update(User $user)
{
    try {
        $sql = 'UPDATE users SET ...';
        return $this->connection->execute($sql, $user->data());
    } catch (\PDOException $error) {
        log($error->getMessage());
    }

    return 0;
}

如果我们通过PDO连接到数据库,则执行线程将在SQL-server:的查询字符串上被阻止return $this->connection->execute($sql, $user->data());

这是因为PHP不知道SQL Server将处理该查询多长时间以及它是否将完全执行。它等待服务器的响应,并且程序一直没有运行。

PHP还阻止所有I / O操作的执行流程。

  • 文件系统fwritefile_get_contents
  • 数据库PDOConnectionRedisClient默认情况下,几乎所有用于连接数据库的扩展都以阻止模式工作。
  • 流程execsystemproc_open这些都是阻塞操作,因为所有处理流程都是通过系统调用构建的。
  • 与标准输入/输出工作readlineechoprint

此外,以下计时器会阻止执行sleepusleep这些操作中,我们明确地告诉线程入睡一段时间。PHP将一直闲置。

异步SQL客户端


但是现代PHP是一种通用语言,不仅限于1997年的PHP / FI之类的网络。因此,我们可以从头开始编写异步SQL客户端。任务不是最琐碎的,而是可以解决的。

public function execAsync(string $query, array $params = [])
{
    $socket = stream_socket_client('127.0.0.1:3306', ...);

    stream_set_blocking($socket, false);

    $data = $this->packBinarySQL($query, $params);
    
    socket_write($socket, $data, strlen($data));
}

这样的客户做什么?它连接到我们的SQL Server,将套接字置于非阻塞模式,以SQL Server可以理解的二进制格式打包请求,然后将数据写入套接字。

由于套接字处于非阻塞模式,因此来自PHP的写操作速度很快。

但是,这样的操作会返回什么?我们不知道SQL Server将响应什么。可能需要很长时间才能完成请求,或者根本不完成。但是需要退货吗?如果使用PDO并update在SQL Server上调用查询,则返回affected rows-此查询更改的行数。我们还不能退货,因此我们只承诺退货。

诺言


这是异步编程领域的一个概念。
Promise是异步操作结果的包装对象。而且,该操作的结果仍然是我们未知的。
不幸的是,没有单一的Promise标准,并且不可能直接将标准从JavaScript领域转移到PHP。

承诺如何运作


由于还没有结果,我们只能建立一些结果callbacks



当数据可用时,有必要执行callback onResolve



如果发生错误,将执行回调onReject以处理错误。



Promise界面看起来像这样。

interface Promise
{
    const
        STATUS_PENDING = 0,
        STATUS_RESOLVED = 1,
        STATUS_REJECTED = 2
    ;

    public function onResolve(callable $callback);
    public function onReject(callable $callback);
    public function resolve($data);
    public function reject(\Throwable $error);
}

Promise具有设置回调和填充(resolve)的状态和方法,并带有数据或错误(reject)。但是存在差异和变化。方法可以被不同地调用,或者代替用于建立回调的单独方法,resolve并且reject可能有一些方法,例如在AMPHP中。

填充Promise resolvereject在单独的对象中提取Deferred-存储状态异步函数的常用技术它可以被视为Promise的一种工厂。这是一次性的:一拖一便许下诺言。



如果我们决定自己编写,如何在SQL客户端中应用呢?

异步SQL客户端


首先,我们创建了Deferred,使用套接字完成了所有工作,记录了数据并返回Promise-一切都很简单。

public function execAsync(string $query, array $params = [])
{
    $deferred = new Deferred;

    $socket = stream_socket_client('127.0.0.1:3306', ...);
    stream_set_blocking($socket, false);

    $data = $this->packBinarySQL($query, $params);
    socket_write($socket, $data, strlen($data));

    return $deferred->promise();
}

当我们有了Promise时,我们可以例如:

  • 设置回调并获得affected rows返回给我们的回调PDOConnection;
  • 处理错误,添加到日志中;
  • 如果SQL Server响应错误,请重试查询。

$promise = $this->execAsync($sql, $user->data());

$promise->onResolve(function (int $rows) {
    echo "Affected rows: {$rows}";
});

$promise->onReject(function (\Throwable $error) {
    log($error->getMessage());
});

问题仍然存在:我们设置了回调,谁将呼叫resolveand reject

事件循环


一个事件循环的概念-事件循环他能够在异步环境中处理消息。对于异步I / O,这些将是来自操作系统的消息,表明套接字已准备好读取或写入。

怎么运行的。

  • 客户端告诉事件循环,它对某种套接字感兴趣。
  • 事件循环通过系统调用轮询OS stream_select:是否已准备好套接字,是否已写入所有数据,是否是来自另一端的数据。
  • 如果操作系统报告套接字未准备好,已阻塞,则事件循环将重复循环。
  • 当操作系统通知套接字已准备就绪时,事件循环将控制权返回给客户端并启用(resolvereject)承诺。



我们在代码中表达了这一概念:采用最简单的情况,消除错误处理和其他细微差别,从而保留一个无限循环。在每次迭代中,它将轮询操作系统有关准备读取或写入的套接字的信息,并为特定套接字调用回调。

public static function run()
{
    while (true) {
        stream_select($readSockets, $writeSockets, null, 0);
        
        foreach ($readSockets as $i => $socket) {
            call_user_func(self::readCallbacks[$i], $socket);
        }

        // Do same for write sockets
    }
}

我们补充了我们的SQL客户端。我们通知事件循环,一旦来自SQL Server的数据到达我们正在使用的套接字,就需要将Deferred置于“完成”状态,并将数据从套接字传输到Promise。

public function execAsync(string $query, array $params = [])
{
    $deferred = new Deferred;
    ...
    Loop::onReadable($socket, function ($socket) use ($deferred) {
        $deferred->resolve(socket_read($socket));
    });

    return $deferred->promise();
}

事件循环可以处理我们的I / O与套接字一起使用他还能做什么?

  • JavaScript setTimeout setInterval — . N . Event Loop .
  • Event Loop . process control, .

Event Loop


编写事件循环不仅是可能的,而且是必要的。如果要使用异步PHP,重要的是编写自己的简单实现以了解其工作原理。但是,在生产中,我们当然不会使用它,但是我们将采用现成的实现方式:稳定,无错误且在工作中得到证明。

主要有三种实现。

ReactPHP。最古老的项目始于PHP 5.3。现在最低要求的PHP版本是5.3.8。该项目实现了JavaScript世界中Promises / A标准

AMPHP。我更喜欢使用这种实现。最低要求是PHP 7.0,因为下一版本已经是7.3。它在Promise的顶部使用协程。

旋风这是一个有趣的中文框架,开发人员在其中尝试将一些概念从Go世界移植到PHP。英文文档不完整,大部分文档都以中文发布在GitHub上。如果您会说这种语言,请继续,但是到目前为止,我不敢工作。



ReactPHP


让我们看看使用ReactPHP for MySQL的客户端的外观。

$connection = (new ConnectionFactory)->createLazyConnection();

$promise = $connection->query('UPDATE users SET ...');
$promise->then(
    function (QueryResult $command) {
        echo count($command->resultRows) . ' row(s) in set.';
    },
    function (Exception $error) {
        echo 'Error: ' . $error->getMessage();
    });

一切几乎都与我们写的相同:我们创建onnection并执行请求。我们可以设置回调以处理结果(return affected rows):

    function (QueryResult $command) {
        echo count($command->resultRows) . ' row(s) in set.';
    },

和回调以进行错误处理:

    function (Exception $error) {
        echo 'Error: ' . $error->getMessage();
    });

从这些回调中,您可以构建长链,因为thenReactPHP中的每个结果还返回Promise。

$promise
    ->then(function ($data) {
        return new Promise(...);
    })
    ->then(function ($data) {
        ...
    }, function ($error) {
        log($error);
    })
    ...

这是一个称为回调地狱的问题的解决方案。不幸的是,在ReactPHP实现中,当需要10-11个回调来正确连接RabbitMQ时,这会导致“ Promise hell”问题使用此类代码并进行修复很困难。我很快意识到这不是我的,而是切换到AMPHP。

Amphp


这个项目比ReactPHP还年轻,它提倡了不同的概念- 协程如果您考虑在AMPHP中使用MySQL,那么您会发现这与PDOConnection在PHP中使用几乎相同

$pool = Mysql\pool("host=127.0.0.1 port=3306 db=test");

try {
    $result = yield $pool->query("UPDATE users SET ...");

    echo $result->affectedRows . ' row(s) in set.';
} catch (\Throwable $error) {
    echo 'Error: ' . $error->getMessage();
}

在这里,我们创建一个池,连接并执行请求。我们可以通过通常的错误来处理错误try...catch,我们不需要回调。

但是在异步调用之前,关键字-出现在此处yield

发电机


关键字yield将我们的函数变成生成器。

function generator($counter = 1)
{
    yield $counter++;

    echo "A";

    yield $counter;

    echo "B";

    yield ++$counter;
}

一旦PHP解释器yield在体内遇到函数,它就会意识到这是一个生成器函数。在调用时会创建一个类对象,而不是执行Generator

生成器继承迭代器接口。

$generator = generator(1);

foreach ($generator as $value) {
    echo $value;
}

while ($generator->valid()) {
    echo $generator->current();

    $generator->next();
}

因此,它可以运行周期foreach,并while和其他人。但是,更有趣的是,迭代器具有方法currentnext让我们一步一步地解决它们。

运行我们的功能generator($counter = 1)我们称生成器方法current()变量的值将被返回$counter++

一旦执行生成器next(),代码将转到生成器内部的下一个调用yield两者之间的整个代码段都yield将执行,这很酷。继续旋转生成器,我们得到结果。

协程


但是生成器具有更有趣的功能-我们可以从外部将数据发送到生成器。在这种情况下,它不是生成器,而是协程或协程。

function printer() {  
    while (true) {     
        echo yield;       
    }                             
}                                

$print = printer();
$print->send('Hello');
$print->send(' PHPRussia');
$print->send(' 2019');
$print->send('!');

在代码的这一部分中,有趣的是它while (true)不会阻塞执行流程,而是会执行一次。我们将数据发送到Corutin并接收'Hello'发送多收'PHPRussia'原理很明确。

除了将数据发送到生成器之外,您还可以从内部发送错误并进行处理,这很方便。

function printer() {
    try {
        echo yield;
    } catch (\Throwable $e) {
        echo $e->getMessage();
    }
}

printer()->throw(new \Exception('Ooops...'));

总结一下。Corutin是程序的组件,该程序支持在维持当前状态的同时停止和继续执行Corutin会记住他的调用堆栈和内部数据,并可以在将来使用它们。

发电机与承诺


让我们看一下生成器和Promise接口。

class Generator
{
    public function send($data);
    public function throw(\Throwable $error);
}

class Promise
{
    public function resolve($data);
    public function reject(\Throwable $error);
}

它们的外观相同,但方法名称不同。我们可以发送数据并向生成器和Promise抛出错误。

如何使用?让我们编写一个函数。

function recoil(\Generator $generator)
{
    $promise = $generator->current();

    $promise->onResolve(function($data) use ($generator) {
        $generator->send($data);
        recoil($generator);
    };

    $promise->onReject(function ($error) use ($generator) {
        $generator->throw($error);
        recoil($generator);
    });
}

该函数获取生成器的当前值: $promise = $generator->current();

我有点夸张了。是的,我们必须检查返回给我们的当前值是否是某种instanceofPromise。如果是这样,那么我们可以要求他回电。当Promise成功并以递归方式启动该函数时,它将在内部将数据发送回生成器recoil

    $promise->onResolve(function($data) use ($generator) {
        $generator->send($data);
        recoil($generator);
    };

可以通过错误完成相同的操作。例如,如果Promise失败,则SQL服务器说:“连接太多”,那么我们可以将错误抛出到生成器中,然后继续下一步。

所有这些将我们带到了协作多任务的重要概念。

合作多任务


这是一种多任务处理,其中仅在当前任务显式声明自己准备好为处理器分配时间以执行其他任务之后,才执行下一个任务。

我很少遇到简单的事情,例如仅使用一个数据库。通常,在更新用户的过程中,您需要更新数据库中搜索索引中的数据,然后清理或更新缓存,然后再向RabbitMQ发送15条消息。在PHP中,一切看起来都像这样。



我们一个接一个地执行操作:我们更新了数据库,索引,然后更新了缓存。但是默认情况下,PHP会阻止此类操作(I / O),因此,如果仔细观察,实际上一切都是如此。



在黑暗的地方,我们封锁了。他们花费最多的时间。

如果我们以异步模式工作,那么这些部分不存在,执行时间线是间歇性的。



您可以将它们全部粘合在一起,然后一件一件地制作。



这都是为了什么?如果您看一下时间轴的大小,一开始会花费很多时间,但是一旦我们将其粘合在一起,应用程序就会加速。

事件循环和协作式多任务处理的概念很早就已在各种应用程序中使用:Nginx,Node.js,Memcached和Redis。它们都在Event Loop内部使用,并基于相同的原理构建。

自从我们开始讨论Nginx和Node.js Web服务器以来,让我们回想一下如何在PHP中处理请求。

用PHP处理请求


浏览器发送一个请求,该请求到达HTTP服务器,该服务器后面有一个FPM流池。线程之一将这个请求变为现实,连接我们的代码并开始执行它。



当下一个请求到达时,另一个FPM线程将接起它,连接代码并执行它。

这种工作方案有很多优点

  • 简单的错误处理如果出现问题,并且其中一个请求失败了,我们不需要做任何事情-下一个请求将会到来,这不会影响其工作。
  • 我们不考虑记忆我们不需要清理或监视内存。在下一个请求时,将清除所有内存。

这是一个很酷的方案,从一开始就在PHP中起作用,但仍然可以成功工作。但是也有弊端

  • 限制进程数如果服务器上有50个FPM线程,则第51个请求到达时,它将等待直到其中一个线程变为空闲。
  • 上下文切换的成本操作系统在FPM流之间切换请求。此处理器级别的操作称为上下文切换。它很昂贵,并采取了大量措施。有必要保存所有寄存器,调用堆栈,处理器中的所有内容,然后切换到另一个进程,加载其寄存器和调用堆栈,在那里再次执行某些操作,再次切换,再次保存...很长时间。

让我们以不同的方式处理这个问题-我们将用PHP本身编写一个HTTP服务器。

异步HTTP服务器




可以办到。我们已经学习了如何在非阻塞模式下使用套接字,并且HTTP连接是相同的套接字。它的外观和工作方式如何?

这是在AMPHP框架中启动HTTP服务器的示例。

Loop::run(function () {
    $app = new Application();
    $app->bootstrap();

    $sockets = [Socket\listen('0.0.0.0:80')];

    $server = new Server($sockets, new CallableRequestHandler(
        function (Request $request) use ($app) {
            $response = yield $app->dispatch($request);

            return new Response(Status::OK, [], $response);
        })
    );

    yield $server->start();
});

一切都非常简单:加载Application并创建一个套接字池(一个或多个)。

接下来,我们启动服务器,将其设置为Handler,该服务器将在每个请求上执行,并将请求发送给我们Application的服务器以获取响应。

最后要做的是启动服务器yield $server->start();

在ReactPHP中,它看起来大致相同,但是只有150个用于不同选项的回调,这不是很方便。

问题


PHP中存在一些异步问题。

缺乏标准。每个框架:Swoole,ReactPHP或AMPHP都实现了自己的Promise接口,并且它们不兼容。

从理论上讲,AMPHP可以与ReactPHP中的Promise进行交互,但是有一个警告。如果用于ReactPHP的代码编写得不太好,并且在某个地方隐式调用或创建了事件循环,那么事实证明两个事件循环将在内部旋转。

JavaScript具有实现Guzzle的相对较好的Promises / A +标准。如果框架遵循它,那就太好了。但是到目前为止,还不是。

内存泄漏。当我们以常规FPM模式在PHP中工作时,我们可能不会考虑内存。即使某个扩展程序的开发人员忘记编写出色的代码,忘记运行Valgrind并在内存流中的某个位置运行,也没关系-下一个请求将被清除并重新开始。但是在异步模式下,您负担不起,因为迟早我们会掉下去OutOfMemoryException

可以修理,但困难又痛苦。在某些情况下,Xdebug可以帮助strace解析引起的错误OutOfMemoryException

阻塞操作。在编写异步代码时,不要阻塞事件循环至关重要。一旦阻塞执行流程,应用程序就会变慢,我们的每个协程都会开始运行得更慢。

kelunik / loop-block 软件包将帮助找到AMPHP的此类操作。他将计时器设置为非常小的间隔。如果计时器不起作用,那么我们就被堵在某个地方。该程序包有助于查找阻塞位置,但并非总是如此:在某些扩展程序中可能不会注意到阻塞。

库支持:Cassandra,Influx,ClickHouse。所有异步PHP的主要问题是对库的支持。我们不能用通常的PDOConnectionRedisClient其他司机为大家-我们需要无阻塞的实现。它们还必须以非阻塞模式用PHP编写,因为C驱动程序很少提供可以集成到异步代码中的接口。

我使用Cassandra数据库驱动程序得到的最奇怪的经历。他们提供操作ExecuteAsyncGetAsync等人,但在同一时间,他们返回一个对象Future与单个方法get阻止。有机会异步获取某些东西,但是要等待结果,我们仍然会阻塞整个循环。以某种方式(例如,通过回调)执行此操作无效。我什至为Cassandra写了我的客户,因为我们在工作中使用它。

类型指示这是AMPHP和corutin的问题。

class UserRepository
{
    public function find(int $id): \Generator
    {
        $data = yield $this->db->query('SELECT ...', $id);

        return User::fill($data);
    }
}

如果它出现在function中yield,那么它将成为生成器。在这一点上,我们不能再指定正确的返回数据类型。

PHP 8


PHP 8等待着我们什么?我将告诉您有关我的假设或更确切地说是我的愿望(编者注:Dmitry Stogov 知道 PHP 8中将实际出现的内容)。

事件循环 它有可能出现,因为正在进行将某种形式的事件循环引入内核的工作。如果发生这种情况,我们将有一个函数await,如JavaScript或C#,它将使我们能够在某个位置等待异步操作的结果。在这种情况下,我们不需要任何扩展,一切都将在内核级别异步进行。


class UserRepository
{
    public function find(int $id): Promise<User>
    {
        $data = await $this->db->query('SELECT ...', $id);

        return User::fill($data);
    }
}


泛型 Go正在等待泛型,我们正在等待泛型,每个人都在等待泛型。

class UserRepository
{
    public function find(int $id): Promise<User>
    {
        $data = yield $this->db->query('SELECT ...', $id);

        return User::fill($data);
    }
}

但是我们不是在等待泛型的集合,而是要表明Promise的结果将恰好是User对象。

为什么要这样?

为了速度和性能。
PHP是一种大多数操作都受I / O约束的语言。我们很少编写与处理器中的计算紧密相关的代码。最可能的是,我们使用套接字:我们需要向数据库发出请求,读取内容,返回响应,发送文件。异步允许您加快此类代码的速度。如果我们查看1000个请求的平均响应时间,则可以将速度提高8倍,将10,000个请求的速度提高近6倍!

2020年5月13日,我们将第二次聚集在PHP俄罗斯,讨论该语言,库和框架,提高生产率的方式以及炒作解决方案的陷阱。我们已经接受了前4 份报告,但是征文通知仍在进行中。应用,如果你想与社区分享您的体验。

Source: https://habr.com/ru/post/undefined/


All Articles