除了使用协程创建生成器之外,您还可以尝试使用它们来线性化现有的异步代码。让我们尝试通过一个小例子来做到这一点。采取在actor框架上编写的代码,并在协程上重写此代码的一个功能。为了构建项目,我们将使用来自coroutines分支的gcc 。我们的目标是从面条中获取回调: abActor.getA(ABActor::GetACallback([this](int a) {
abActor.getB(ABActor::GetBCallback([a, this](int b) {
abActor.saveAB(a - b, a + b, ABActor::SaveABCallback([this](){
abActor.getA(ABActor::GetACallback([this](int a) {
abActor.getB(ABActor::GetBCallback([a, this](int b) {
std::cout << "Result " << a << " " << b << std::endl;
}));
}));
}));
}));
}));
有点:const int a = co_await actor.abActor.getAAsync();
const int b = co_await actor.abActor.getBAsync();
co_await actor.abActor.saveABAsync(a - b, a + b);
const int newA = co_await actor.abActor.getAAsync();
const int newB = co_await actor.abActor.getBAsync();
std::cout << "Result " << newA << " " << newB << std::endl;
因此,让我们开始吧。演员们
首先,我们需要创建一个简单的参与者框架。创建一个成熟的actor框架是一项艰巨而艰巨的任务,因此我们只实现其中的一种。首先,创建一个基类:class Actor {
public:
using Task = std::function<void()>;
public:
virtual ~Actor();
public:
void addTask(const Task &task);
void tryRunTask();
private:
std::queue<Task> queue;
mutable std::mutex mutex;
};
这个想法基本上很简单:将作为功能对象的任务放在队列中,当我们尝试运行任务时,我们尝试完成此任务。该类的实现确认了我们的意图:Actor::~Actor() = default;
void Actor::addTask(const Task &task) {
std::lock_guard lock(mutex);
queue.push(task);
}
void Actor::tryRunTask() {
std::unique_lock lock(mutex);
if (queue.empty()) {
return;
}
const Task task = queue.front();
queue.pop();
lock.unlock();
std::invoke(task);
}
下一个类是我们的参与者将所属的“线程”:class Actor;
class ActorThread {
public:
~ActorThread();
public:
void addActor(Actor &actor);
void run();
private:
std::vector<std::reference_wrapper<Actor>> actors;
};
这里的一切也很简单:在程序的最开始,我们使用addActor方法将actor“绑定”到线程,然后使用run方法启动线程。ActorThread::~ActorThread() = default;
void ActorThread::addActor(Actor &actor) {
actors.emplace_back(actor);
}
void ActorThread::run() {
while (true) {
for (Actor &actor: actors) {
actor.tryRunTask();
}
}
}
启动线程时,我们进入无限循环,并尝试从每个参与者执行一项任务。这不是最好的解决方案,但可以用于演示。现在让我们看一下演员类的代表:class ABActor: public Actor {
public:
using GetACallback = Callback<void(int result)>;
using GetBCallback = Callback<void(int result)>;
using SaveABCallback = Callback<void()>;
public:
void getA(const GetACallback &callback);
void getB(const GetBCallback &callback);
void saveAB(int a, int b, const SaveABCallback &callback);
private:
void getAProcess(const GetACallback &callback);
void getBProcess(const GetBCallback &callback);
void saveABProcess(int a, int b, const SaveABCallback &callback);
private:
int a = 10;
int b = 20;
};
此类本身存储2个数字-a和b,并根据要求返回其值或覆盖它们。作为回调,它接受带有必需参数的功能对象。但是,请注意一个事实,即不同的参与者可以在不同的线程中启动。因此,如果在工作结束时我们仅调用传递给该方法的回调,则将在当前的可执行线程中调用此回调,而不是在调用我们的方法并创建了此回调的线程中调用此回调。因此,我们需要在回调上创建一个包装器,以解决这种情况:template<typename C>
class Callback {
public:
template<typename Functor>
Callback(Actor &sender, const Functor &callback)
: sender(sender)
, callback(callback)
{}
public:
template<typename ...Args>
void operator() (Args&& ...args) const {
sender.addTask(std::bind(callback, std::forward<Args>(args)...));
}
private:
Actor &sender;
std::function<C> callback;
};
该包装器会记住原始的actor,并且当您尝试执行自己时,它只会向原始actor的任务队列中添加真正的回调。结果,ABActor类的实现如下所示:void ABActor::getA(const GetACallback &callback) {
addTask(std::bind(&ABActor::getAProcess, this, callback));
}
void ABActor::getAProcess(const ABActor::GetACallback &callback) {
std::invoke(callback, a);
}
void ABActor::getB(const GetBCallback &callback) {
addTask(std::bind(&ABActor::getBProcess, this, callback));
}
void ABActor::getBProcess(const ABActor::GetBCallback &callback) {
std::invoke(callback, b);
}
void ABActor::saveAB(int a, int b, const SaveABCallback &callback) {
addTask(std::bind(&ABActor::saveABProcess, this, a, b, callback));
}
void ABActor::saveABProcess(int a, int b, const ABActor::SaveABCallback &callback) {
this->a = a;
this->b = b;
std::invoke(callback);
}
在类的接口方法中,我们仅将传递的参数绑定到类的相应“插槽”,从而创建一个任务,并将此任务放入此类的任务队列中。当任务线程开始执行任务时,它将因此调用正确的“插槽”,该插槽将执行其所需的所有操作并调用回调,后者又会将真正的回调发送到导致它的任务队列中。让我们编写一个将使用ABActor类的actor:class ABActor;
class WokrerActor: public Actor {
public:
WokrerActor(ABActor &actor)
: abActor(actor)
{}
public:
void work();
private:
void workProcess();
private:
ABActor &abActor;
};
void WokrerActor::work() {
addTask(std::bind(&WokrerActor::workProcess, this));
}
void WokrerActor::workProcess() {
abActor.getA(ABActor::GetACallback(*this, [this](int a) {
std::cout << "Result " << a << std::endl;
}));
}
放在一起:int main() {
ABActor abActor;
WokrerActor workerActor(abActor);
ActorThread thread;
thread.addActor(abActor);
thread.addActor(workerActor);
workerActor.work();
thread.run();
}
让我们关注整个代码链。首先,我们创建必要的对象并在它们之间建立连接。然后,将workProcess任务添加到actor Worker任务队列。当线程启动时,它将在队列中找到我们的任务并开始执行它。在执行过程中,我们调用ABActor类的getA方法,从而将相应的任务放入ABActor类的队列中,并完成执行。接下来,线程将从ABActor类中获取新创建的任务并执行该任务,这将导致执行getAProcess代码。此代码将调用回调,并将必要的参数传递给它-变量a。但是,由于他拥有的回调是包装器,因此,实际上,带有填充参数的真实回调将放入Worker类的队列中。并且,在该循环的下一个迭代中,当线程退出并从Worker类执行回调时,我们将看到“结果10”行的输出。Actor框架是将分散在不同物理流中的类相互交互的一种非常方便的方法。正如您应该确信的那样,类设计的特殊性在于,在每个单独的参与者中,所有动作都是完全在一个线程中执行的。流的唯一同步点是在actor框架的实现细节中完成的,并且对于程序员而言是不可见的。因此,程序员可以编写单线程代码,而不必担心包装互斥体并跟踪竞争情况,死锁和其他麻烦。不幸的是,这种解决方案是有代价的。由于只能从回调中访问执行另一个actor的结果,因此actor代码迟早会变成如下所示: abActor.getA(ABActor::GetACallback(*this, [this](int a) {
abActor.getB(ABActor::GetBCallback(*this, [a, this](int b) {
abActor.saveAB(a - b, a + b, ABActor::SaveABCallback(*this, [this](){
abActor.getA(ABActor::GetACallback(*this, [this](int a) {
abActor.getB(ABActor::GetBCallback(*this, [a, this](int b) {
std::cout << "Result " << a << " " << b << std::endl;
}));
}));
}));
}));
}));
让我们看看是否可以使用C ++ 20的创新-协程避免这种情况。但是首先,我们将指定限制。自然,我们绝不能更改参与者框架的代码。同样,我们不能更改Actor类实例的公共和私有方法的签名-ABActor和WorkerActor。让我们看看是否可以摆脱这种情况。协程。第1部分。服务员
corutin的主要思想是在创建协程时,会在堆上为其创建一个单独的堆栈框架,我们可以随时从中退出,同时保持当前的执行位置,处理器寄存器和其他必要信息。然后,我们还可以随时返回暂停的协程的执行并完成它,直到结束或下一次暂停为止。std :: coroutine_handle <>对象负责管理此数据,该数据本质上表示一个指向堆栈框架的指针(和其他必要的数据),并具有一个resume方法(或其类似物,运算符()),可将我们返回到协程的执行。基于此数据,我们首先编写getAAsync函数,然后尝试进行概括。因此,假设我们已经有std :: coroutine_handle <> coro类的实例,我们需要做什么?您必须调用已经存在的方法ABActor :: getA,它将根据需要解决此情况,但是首先您需要为getA方法创建一个回调。让我们回想一下,回调已返回到getA方法回调-getA方法的结果。此外,此回调在线程的工作线程中调用。因此,从此回调中,我们可以安全地继续执行协程,协程仅由Worker线程创建,并将继续执行其动作序列。而且,我们必须将返回的结果保存在某个地方,这当然对我们进一步有用。auto callback = GetACallback(returnCallbackActor, [&value, coro](int result) {
value = result;
std::invoke(coro);
});
getA(callback);
因此,现在您需要从某个地方获取一个coroutine_handle对象的实例以及一个可以保存我们的结果的链接。将来,我们将看到调用该函数将coroutine_handle传递给我们。因此,我们所能做的就是将其传递给其他函数。让我们将此函数准备为lambda。(我们会将链接传递给变量,在该变量中回调的结果将存储到公司)。auto storeCoroToQueue = [&returnCallbackActor, this](auto &value, std::coroutine_handle<> coro) {
auto callback=GetACallback(returnCallbackActor, [&value, coro](int result){
value = result;
std::invoke(coro);
});
getA(callback);
};
我们将在下一个类中保存此函数。struct ActorAwaiterSimple {
int value;
std::function<void(int &value,std::coroutine_handle<>)> forwardCoroToCallback;
ActorAwaiterSimple(
const std::function<void(int &value, std::coroutine_handle<>)> &forwardCoroToCallback
)
: forwardCoroToCallback(forwardCoroToCallback)
{}
ActorAwaiterSimple(const ActorAwaiterSimple &) = delete;
ActorAwaiterSimple& operator=(const ActorAwaiterSimple &) = delete;
ActorAwaiterSimple(ActorAwaiterSimple &&) = delete;
ActorAwaiterSimple& operator=(ActorAwaiterSimple &&) = delete;
除了功能对象外,我们还将在这里保存回调中等待我们使用的值的内存(以变量值的形式)。由于我们将内存保留在此处的值下,因此我们几乎不希望将该类的实例复制或移动到某个位置。想象一下,例如,有人复制了该类,将值保存在该类的旧实例中的value变量下,然后尝试从新实例中读取它。它自然不存在,因为复制是在保存之前进行的。不愉快。因此,我们通过禁止构造函数以及复制和移动运算符来保护自己免受此麻烦。让我们继续写这个课。我们需要的下一个方法是: bool await_ready() const noexcept {
return false;
}
他回答了我们的意思是否准备发布的问题。自然,在第一次调用时,我们的价值尚未准备好,将来没有人会问我们这个问题,因此只需返回false。coroutine_handle实例将通过void await_suspend方法(std :: coroutine_handle <> coro)传递给我们,因此让我们在其中调用我们准备好的函子,并在值下面传递一个内存链接: void await_suspend(std::coroutine_handle<> coro) noexcept {
std::invoke(forwardCoroToCallback, std::ref(value), coro);
}
通过调用await_resume方法,将在正确的时间询问函数执行的结果。我们不会拒绝请求者: int await_resume() noexcept {
return value;
}
现在,可以使用co_await关键字调用我们的方法:const int a = co_await actor.abActor.getAAsync(actor);
这里会发生什么,我们已经大致代表了。首先,将创建一个类型为ActorAwaiterSimple的对象,该对象将被传输到co_await的“输入”中。他将首先询问(通过调用await_ready)我们是否意外地获得了一个结果(我们没有),然后调用await_suspend,传递了一个上下文(实际上是指向当前协程堆栈框架的指针)并中断了执行。将来,当ABActor actor完成工作并调用结果回调时,此结果(已在Worker线程线程中)将保存在ActorAwaiterSimple的唯一实例(保留在协程堆栈中),并且协程的继续将开始。Corutin将继续执行,通过调用await_resume方法获取保存的结果,并将该结果传递给变量a目前,当前Awaiter的局限性在于它只能与带有一个int类型参数的回调一起使用。让我们尝试扩展Awaiter的应用程序:template<typename... T>
struct ActorAwaiter {
std::tuple<T...> values;
std::function<void(std::tuple<T...> &values, std::coroutine_handle<>)> storeHandler;
ActorAwaiter(const std::function<void(std::tuple<T...> &values, std::coroutine_handle<>)> &storeHandler)
: storeHandler(storeHandler)
{}
ActorAwaiter(const ActorAwaiter &) = delete;
ActorAwaiter& operator=(const ActorAwaiter &) = delete;
ActorAwaiter(ActorAwaiter &&) = delete;
ActorAwaiter& operator=(ActorAwaiter &&) = delete;
bool await_ready() const noexcept {
return false;
}
void await_suspend(std::coroutine_handle<> coro) noexcept {
std::invoke(storeHandler, std::ref(values), coro);
}
template<
bool B=true,size_t len=sizeof...(T),std::enable_if_t<len==0 && B, int>=0
>
void await_resume() noexcept {
}
template<
bool B=true,size_t len=sizeof...(T),std::enable_if_t<len==1 && B, int>=0
>
auto await_resume() noexcept {
return std::get<0>(values);
}
template<
bool B=true,size_t len=sizeof...(T),std::enable_if_t<len!=1 && len!=0 && B, int>=0
>
std::tuple<T...> await_resume() noexcept {
return values;
}
};
在这里,我们使用std :: tuple以便能够一次保存多个变量。Sfinae被强加于await_resume方法上,以便可能并非在所有情况下都返回一个元组,而是取决于该元组中所包含的值的数目,返回void,正好为1个参数或整个元组。现在,用于创建Awaiter本身的包装程序如下所示:template<typename MakeCallback, typename... ReturnArgs, typename Func>
static auto makeCoroCallback(const Func &func, Actor &returnCallback) {
return [&returnCallback, func](auto &values, std::coroutine_handle<> coro) {
auto callback = MakeCallback(returnCallback, [&values, coro](ReturnArgs&& ...result) {
values = std::make_tuple(std::forward<ReturnArgs>(result)...);
std::invoke(coro);
});
func(callback);
};
}
template<typename MakeCallback, typename... ReturnArgs, typename Func>
static ActorAwaiter<ReturnArgs...> makeActorAwaiter(const Func &func, Actor &returnCallback) {
const auto storeCoroToQueue = makeCoroCallback<MakeCallback, ReturnArgs...>(func, returnCallback);
return ActorAwaiter<ReturnArgs...>(storeCoroToQueue);
}
ActorAwaiter<int> ABActor::getAAsync(Actor &returnCallback) {
return makeActorAwaiter<GetACallback, int>(std::bind(&ABActor::getA, this, _1), returnCallback);
}
ActorAwaiter<int> ABActor::getBAsync(Actor &returnCallback) {
return makeActorAwaiter<GetBCallback, int>(std::bind(&ABActor::getB, this, _1), returnCallback);
}
ActorAwaiter<> ABActor::saveABAsync(Actor &returnCallback, int a, int b) {
return makeActorAwaiter<SaveABCallback>(std::bind(&ABActor::saveAB, this, a, b, _1), returnCallback);
}
现在让我们弄清楚如何在协程中直接使用创建的类型。协程。第2部分。可恢复
从C ++的角度来看,包含单词co_await,co_yield或co_return的函数被视为协程。但是这样的函数也应该返回某种类型。我们同意不会更改函数的签名(这里我的意思是返回类型也指代签名),因此我们将不得不以某种方式摆脱它。让我们创建一个lambda协程并从我们的函数中调用它:void WokrerActor::workProcess() {
const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
const int a = co_await actor.abActor.getAAsync(actor);
const int b = co_await actor.abActor.getBAsync(actor);
co_await actor.abActor.saveABAsync(actor, a - b, a + b);
const int newA = co_await actor.abActor.getAAsync(actor);
const int newB = co_await actor.abActor.getBAsync(actor);
std::cout << "Result " << newA << " " << newB << std::endl;
};
coroutine(*this);
}
(为什么不在lambdas的捕获列表中捕获它?然后里面的所有代码会变得容易一些。但是碰巧,显然编译器中的lambda-协程还没有得到完全支持,因此该代码将无法工作。)您可以看到,我们的代码可怕的回调代码现在已经变成了非常不错的线性代码。我们剩下的就是发明ActorResumable类,让我们来看一下。struct ActorResumable {
struct promise_type {
using coro_handle = std::coroutine_handle<promise_type>;
auto get_return_object() {
return coro_handle::from_promise(*this);
}
auto initial_suspend() {
return std::suspend_never();
}
auto final_suspend() {
return std::suspend_never();
}
void unhandled_exception() {
std::terminate();
}
};
ActorResumable(std::coroutine_handle<promise_type>) {}
};
从我们的lambda生成的corutin的伪代码如下所示:ActorResumable coro() {
promise_type promise;
ActorResumable retobj = promise.get_return_object();
auto intial_suspend = promise.initial_suspend();
if (initial_suspend == std::suspend_always) {
}
try {
const int a = co_await actor.abActor.getAAsync(actor);
std::cout << "Result " << a << std::endl;
} catch(...) {
promise.unhandled_exception();
}
final_suspend:
auto final_suspend = promise.final_suspend();
if (final_suspend == std::suspend_always) {
} else {
cleanup();
}
这只是伪代码,有些事情是有意简化的。尽管如此,让我们看看会发生什么。首先,我们创建一个promise和ActorResumable。在initial_suspend()之后,我们不会暂停,而是继续前进。我们开始执行该程序的主要部分。当我们到达co_await时,我们知道我们需要暂停。我们已经在上一节中研究了这种情况,您可以返回并进行审查。在我们继续执行并将结果显示在屏幕上之后,协程执行结束。我们检查final_suspend,并清除协程的整个上下文。协程。第3部分。任务
让我们记住我们现在处于什么阶段。void WokrerActor::workProcess() {
const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
const int a = co_await actor.abActor.getAAsync(actor);
const int b = co_await actor.abActor.getBAsync(actor);
co_await actor.abActor.saveABAsync(actor, a - b, a + b);
const int newA = co_await actor.abActor.getAAsync(actor);
const int newB = co_await actor.abActor.getBAsync(actor);
std::cout << "Result " << newA << " " << newB << std::endl;
};
coroutine(*this);
}
看起来不错,但是很容易看到代码: const int a = co_await actor.abActor.getAAsync(actor);
const int b = co_await actor.abActor.getBAsync(actor);
重复2次。是否可以重构这一刻并将其置于单独的函数中?让我们勾勒出它的外观:CoroTask<std::pair<int, int>> WokrerActor::readAB() {
const int a = co_await abActor.getAAsync2(*this);
const int b = co_await abActor.getBAsync2(*this);
co_return std::make_pair(a, b);
}
void WokrerActor::workCoroProcess() {
const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
const auto [a, b] = co_await actor.readAB();
co_await actor.abActor.saveABAsync2(actor, a - b, a + b);
const auto [newA, newB] = co_await actor.readAB();
std::cout << "Result " << newA << " " << newB << " " << a << " " << b << std::endl;
};
coroutine(*this);
}
我们剩下的就是发明CoroTask类型。让我们考虑一下。首先,在readAB函数内部使用了co_return,这意味着CoroTask必须满足Resumable接口。而且,此类的对象用于输入另一个协程的co_await。这意味着CoroTask类还必须满足Awaitable接口。让我们在CoroTask类中实现这两个接口:template <typename T = void>
struct CoroTask {
struct promise_type {
T result;
std::coroutine_handle<> waiter;
auto get_return_object() {
return CoroTask{*this};
}
void return_value(T value) {
result = value;
}
void unhandled_exception() {
std::terminate();
}
std::suspend_always initial_suspend() {
return {};
}
auto final_suspend() {
struct final_awaiter {
bool await_ready() {
return false;
}
void await_resume() {}
auto await_suspend(std::coroutine_handle<promise_type> me) {
return me.promise().waiter;
}
};
return final_awaiter{};
}
};
CoroTask(CoroTask &&) = delete;
CoroTask& operator=(CoroTask&&) = delete;
CoroTask(const CoroTask&) = delete;
CoroTask& operator=(const CoroTask&) = delete;
~CoroTask() {
if (h) {
h.destroy();
}
}
explicit CoroTask(promise_type & p)
: h(std::coroutine_handle<promise_type>::from_promise(p))
{}
bool await_ready() {
return false;
}
T await_resume() {
auto &result = h.promise().result;
return result;
}
void await_suspend(std::coroutine_handle<> waiter) {
h.promise().waiter = waiter;
h.resume();
}
private:
std::coroutine_handle<promise_type> h;
};
(我强烈建议打开此文章的背景图片。将来,这将对您有很大帮助。)因此,让我们看看这里发生了什么。1.转到lambda协程,然后立即创建WokrerActor :: readAB协程。但是,在创建此协程之后,我们就不会开始执行它(initial_suspend == suspend_always),这迫使我们中断并返回协程lambda。2. co_await lambda检查readAB是否准备就绪。结果尚未准备好(await_ready == false),这迫使其将上下文传递给CoroTask :: await_suspend方法。该上下文保存在CoroTask中,并启动了readAB3 协程的恢复。readAB协程完成所有必要的操作后,到达以下行:co_return std::make_pair(a, b);
结果,调用了CoroTask :: promise_type :: return_value方法,并且将创建的数字对保存在CoroTask:promise_type4中。由于调用了co_return方法,协程的执行结束了,这意味着该调用CoroTask :: promise_type :: final_suspend方法了。此方法返回一个自写的结构(不要忘记看图片),该结构迫使您调用final_awaiter :: await_suspend方法,该方法返回存储在步骤2中的协程lambda上下文。为什么我们不能只在这里返回suspend_always?毕竟,在此类的initial_suspend情况下,我们成功了吗?事实是,在initial_suspend中我们成功了,因为此协程被我们的lambda协程调用,然后我们返回了它。但是,当我们到达final_suspend调用的那一刻,我们的协程很可能从另一个堆栈继续进行(特别是从makeCoroCallback函数准备的lambda继续),并且如果我们在此处返回suspend_always,我们将返回到它,而不是workCoroProcess方法。5.由于final_awaiter :: await_suspend方法将上下文返回给我们,因此这迫使程序继续执行返回的上下文,即协程lambda。由于执行返回到该点:const auto [a, b] = co_await actor.readAB();
那么我们需要通过调用CoroTask :: await_resume方法来隔离保存的结果。结果被接收,传递给变量a和b,现在CoroTask实例被销毁了。6. CoroTask实例被破坏了,但是WokrerActor :: readAB上下文发生了什么?如果我们从CoroTask :: promise_type :: final_suspend返回returnsuspended_never(更确切地说,将其返回到await_ready问题将返回true),那么此时协程上下文将被清除。但是由于我们没有这样做,因此清除上下文的义务已转移给我们。我们将在CoroTask析构函数中清除此上下文,这时它已经很安全了。7.执行协程readAB,从中获取结果,清除上下文,lambda协程继续运行...ew,有点整理了。您还记得从ABActor :: getAAsync()之类的方法中,我们返回了一个自写的结构吗?实际上,通过组合从CoroTask和ActorAwaiter类的实现中获得的知识并获得类似以下内容的方法,也可以将getAAsync方法转换为协程。CoroTaskActor<int> ABActor::getAAsync(Actor &returnCallback) {
co_return makeCoroCallback<GetACallback, int>(std::bind(&ABActor::getA, this, _1), returnCallback);
}
但这我将进行自我分析。发现
如您所见,借助协程,您可以很好地线性化异步回调代码。的确,编写辅助类型和函数的过程似乎还不太直观。所有代码都可以在资源库中找到,我也建议您查看这些讲座,以更全面地了解该主题。来自同一作者的有关协程的大量示例在此处。您也可以观看此讲座。