Linearizing asynchronous code with corutin

image

In addition to using coroutine to create generators, you can try to use them to linearize existing asynchronous code. Let's try to do this with a small example. Take the code written on the actor framework and rewrite one function of this code on coroutines. To build the project, we will use gcc from the coroutines branch .

Our goal is to get callbacks from noodles:

    abActor.getA(ABActor::GetACallback([this](int a) {
        abActor.getB(ABActor::GetBCallback([a, this](int b) {
            abActor.saveAB(a - b, a + b, ABActor::SaveABCallback([this](){
                abActor.getA(ABActor::GetACallback([this](int a) {
                    abActor.getB(ABActor::GetBCallback([a, this](int b) {
                        std::cout << "Result " << a << " " << b << std::endl;
                    }));
                }));
            }));
        }));
    }));

Sort of:

const int a = co_await actor.abActor.getAAsync();
const int b = co_await actor.abActor.getBAsync();
co_await actor.abActor.saveABAsync(a - b, a + b);
const int newA = co_await actor.abActor.getAAsync();
const int newB = co_await actor.abActor.getBAsync();
std::cout << "Result " << newA << " " << newB << std::endl;

So let's get started.

Actors


To begin with, we need to create a simple actor's framework. Creating a full-fledged actor framework is a difficult and big task, so we only implement some kind of it.
First, create a base class:

class Actor {
public:
    using Task = std::function<void()>;
public:
    virtual ~Actor();
public:
    void addTask(const Task &task);
    void tryRunTask();
private:
    std::queue<Task> queue;
    mutable std::mutex mutex;
};

The idea is basically simple: we put tasks that are functional objects in a queue, and when we tryRunTask, we try to complete this task. The implementation of the class confirms our intentions:

Actor::~Actor() = default;

void Actor::addTask(const Task &task) {
    std::lock_guard lock(mutex);
    queue.push(task);
}

void Actor::tryRunTask() {
    std::unique_lock lock(mutex);
    if (queue.empty()) {
        return;
    }

    const Task task = queue.front();
    queue.pop();
    lock.unlock();

    std::invoke(task);
}

The next class is the "thread" to which our actors will belong:

class Actor;

class ActorThread {
public:
    ~ActorThread();
public:
    void addActor(Actor &actor);
    void run();
private:
    std::vector<std::reference_wrapper<Actor>> actors;
};

Everything is simple here too: at the very beginning of the program, we “bind” our actors to the thread using the addActor method, and then we start the thread using the run method.

ActorThread::~ActorThread() = default;

void ActorThread::addActor(Actor &actor) {
    actors.emplace_back(actor);
}

void ActorThread::run() {
    while (true) {
        for (Actor &actor: actors) {
            actor.tryRunTask();
        }
    }
}

When starting the thread, we enter an infinite loop and try to perform one task from each actor. Not the best solution, but it will do for a demonstration.

Now let's look at the representative of the actor class:

class ABActor: public Actor {
public:
    using GetACallback = Callback<void(int result)>;
    using GetBCallback = Callback<void(int result)>;
    using SaveABCallback = Callback<void()>;
public:
    void getA(const GetACallback &callback);
    void getB(const GetBCallback &callback);
    void saveAB(int a, int b, const SaveABCallback &callback);
private:
    void getAProcess(const GetACallback &callback);
    void getBProcess(const GetBCallback &callback);
    void saveABProcess(int a, int b, const SaveABCallback &callback);
private:
    int a = 10;
    int b = 20;
};

This class stores 2 numbers in itself - a and b, and, upon request, returns their values ​​or overwrites them.

As a callback, it accepts a functional object with the necessary parameters. But let's pay attention to the fact that different actors can be launched in different threads. And therefore, if at the end of the work we just call the callback passed to the method, this callback will be called in the current executable thread, and not in the thread that called our method and created this callback. Therefore, we need to create a wrapper over the callback that will resolve this situation:

template<typename C>
class Callback {
public:
    template<typename Functor>
    Callback(Actor &sender, const Functor &callback)
        : sender(sender)
        , callback(callback)
    {}
public:
    template<typename ...Args>
    void operator() (Args&& ...args) const {
        sender.addTask(std::bind(callback, std::forward<Args>(args)...));
    }
private:
    Actor &sender;
    std::function<C> callback;
};

This wrapper remembers the original actor, and when you try to execute yourself, it simply adds a real callback to the task queue of the original actor.
As a result, the implementation of the ABActor class looks like this:

void ABActor::getA(const GetACallback &callback) {
    addTask(std::bind(&ABActor::getAProcess, this, callback));
}

void ABActor::getAProcess(const ABActor::GetACallback &callback) {
    std::invoke(callback, a);
}

void ABActor::getB(const GetBCallback &callback) {
    addTask(std::bind(&ABActor::getBProcess, this, callback));
}

void ABActor::getBProcess(const ABActor::GetBCallback &callback) {
    std::invoke(callback, b);
}

void ABActor::saveAB(int a, int b, const SaveABCallback &callback) {
    addTask(std::bind(&ABActor::saveABProcess, this, a, b, callback));
}

void ABActor::saveABProcess(int a, int b, const ABActor::SaveABCallback &callback) {
    this->a = a;
    this->b = b;
    std::invoke(callback);
}

In the interface method of the class, we simply bind the passed arguments to the corresponding “slot” of the class, thus creating a task, and put this task in the task queue of this class. When the task thread starts to perform the task, it will thus call the correct “slot”, which will perform all the actions it needs and call the callback, which in turn will send the real callback to the queue of the task that caused it.

Let's write an actor that will use the ABActor class:

class ABActor;

class WokrerActor: public Actor {
public:
    WokrerActor(ABActor &actor)
        : abActor(actor)
    {}
public:
    void work();
private:
    void workProcess();
private:
    ABActor &abActor;
};

void WokrerActor::work() {
    addTask(std::bind(&WokrerActor::workProcess, this));
}

void WokrerActor::workProcess() {
    abActor.getA(ABActor::GetACallback(*this, [this](int a) {
        std::cout << "Result " << a << std::endl;
    }));
}

And put it all together:

int main() {
    ABActor abActor;
    WokrerActor workerActor(abActor);

    ActorThread thread;
    thread.addActor(abActor);
    thread.addActor(workerActor);

    workerActor.work();

    thread.run();
}

Let's follow the whole chain of code.

In the beginning, we create the necessary objects and establish connections between them.
Then we add the workProcess task to the actor Worker task queue.
When the thread starts, it will find our task in the queue and begin to execute it.
In the process of execution, we call the getA method of the ABActor class, thereby putting the corresponding task in the queue of the ABActor class, and complete the execution.
Next, the thread will take the newly created task from the ABActor class and execute it, which will lead to the execution of the getAProcess code.
This code will call a callback, passing the necessary argument into it - the variable a. But since the callback that he owns is a wrapper, in fact, a real callback with filled parameters will be put in the queue of the Worker class.
And when, at the next iteration of the cycle, the thread pulls out and executes our callback from the Worker class, we will see the output of the line “Result 10”

Actor framework is a fairly convenient way of interacting classes scattered across different physical streams with each other. The peculiarity of class design, as you should have been convinced of this, is that within each individual actor all actions are performed entirely and in a single thread. The only point of synchronization of streams is made in the implementation details of the actor framework and is not visible to the programmer. Thus, a programmer can write single-threaded code without worrying about wrapping up mutexes and tracking race situations, deadlocks and other headaches.

Unfortunately, this solution has a price. Since the result of executing another actor is accessible only from the callback, sooner or later the actor code turns into something like this:

    abActor.getA(ABActor::GetACallback(*this, [this](int a) {
        abActor.getB(ABActor::GetBCallback(*this, [a, this](int b) {
            abActor.saveAB(a - b, a + b, ABActor::SaveABCallback(*this, [this](){
                abActor.getA(ABActor::GetACallback(*this, [this](int a) {
                    abActor.getB(ABActor::GetBCallback(*this, [a, this](int b) {
                        std::cout << "Result " << a << " " << b << std::endl;
                    }));
                }));
            }));
        }));
    }));

Let's see if we can avoid this using the innovation of C ++ 20 - coroutines.

But first, we will specify the limitations.

Naturally, we can in no way change the code of the actor framework. Also, we cannot change the signatures of public and private methods of instances of the Actor class - ABActor and WorkerActor. Let's see if we can get out of this situation.

Coroutines. Part 1. Awaiter


The main idea of ​​corutin is that when creating coroutine, a separate stack frame is created for it on the heap, from which we can "exit" at any time, while maintaining the current execution position, processor registers and other necessary information. Then we can also at any time return to the execution of the suspended coroutine and complete it until the end or until the next suspension.

The std :: coroutine_handle <> object is responsible for managing this data, which essentially represents a pointer to the stack frame (and other necessary data), and which has a resume method (or its analogue, the operator ()), which returns us to the execution of the coroutine .

Based on this data, let's first write the getAAsync function, and then try to generalize.

So, suppose we already have an instance of the std :: coroutine_handle <> coro class, what do we need to do?

You must call the already existing method ABActor :: getA, which will resolve the situation as needed, but first you need to create a callback for the getA method.

Let's recall that a callback is returned to the getA method callback - the result of the getA method. Moreover, this callback is called in the Worker thread of the thread. Thus, from this callback, we can safely continue to execute coroutine, which was created just from the Worker thread and which will continue to carry out its sequence of actions. But also, we must somewhere save the result returned in the callback, it, of course, will be useful to us further.

auto callback = GetACallback(returnCallbackActor, [&value, coro](int result) {
        value = result;
        std::invoke(coro);
 });
getA(callback);

So, now you need to take an instance of the coroutine_handle object from somewhere and a link where you can save our result.

In the future, we will see that coroutine_handle is passed to us as a result of calling the function. Accordingly, all we can do with it is to pass it to some other function. Let's prepare this function as a lambda. (We will pass the link to the variable where the result of the callback will be stored to the company).

auto storeCoroToQueue = [&returnCallbackActor, this](auto &value, std::coroutine_handle<> coro) {
    auto callback=GetACallback(returnCallbackActor, [&value, coro](int result){
        value = result;
        std::invoke(coro);
    });
    getA(callback);
};

We will save this function in the next class.

struct ActorAwaiterSimple {
    int value;

    std::function<void(int &value,std::coroutine_handle<>)> forwardCoroToCallback;

    ActorAwaiterSimple(
        const std::function<void(int &value, std::coroutine_handle<>)> &forwardCoroToCallback
    )
        : forwardCoroToCallback(forwardCoroToCallback)
    {}

    ActorAwaiterSimple(const ActorAwaiterSimple &) = delete;
    ActorAwaiterSimple& operator=(const ActorAwaiterSimple &) = delete;
    ActorAwaiterSimple(ActorAwaiterSimple &&) = delete;
    ActorAwaiterSimple& operator=(ActorAwaiterSimple &&) = delete;

// ...

In addition to the functional object, we will also hold here the memory (in the form of the variable value) for the value that awaits us in the callback.

Since we hold the memory under the value here, we hardly want the instance of this class to be copied or moved somewhere. Imagine, for example, someone copied this class, saved the value under the value variable in the old instance of the class, and then tried to read it from the new instance. And it is naturally not there, since copying occurred before saving. Unpleasant. Therefore, we protect ourselves from this trouble by prohibiting constructors and copy and move operators.

Let's continue writing this class. The next method we need is:

    bool await_ready() const noexcept {
        return false;
    }

He answers the question of whether our meaning is ready to be issued. Naturally, at the first call, our value is not yet ready, and in the future no one will ask us about this, so just return false.

The coroutine_handle instance will be passed to us in the void await_suspend method (std :: coroutine_handle <> coro), so let's call our prepared functor in it, passing there also a memory link under value:

    void await_suspend(std::coroutine_handle<> coro) noexcept {
        std::invoke(forwardCoroToCallback, std::ref(value), coro);
    }

The result of the function execution will be asked at the right time by calling the await_resume method. We will not refuse to the requestor:

    int await_resume() noexcept {
        return value;
    }

Now our method can be called using the co_await keyword:

const int a = co_await actor.abActor.getAAsync(actor);

What will happen here, we are already roughly representing.

First, an object of type ActorAwaiterSimple will be created, which will be transferred to the "input" of co_await. He will first ask (by calling await_ready) whether we accidentally have a finished result (we don’t have), then call await_suspend, passing in a context (in fact, a pointer to the current coroutine stack frame) and interrupt execution.

In the future, when the ABActor actor completes its work and calls the result callback, this result (already in the Worker thread thread) will be saved in the only (remaining on the coroutine stack) instance of ActorAwaiterSimple and the continuation of the coroutine will start.

Corutin will continue execution, take the saved result by calling the await_resume method, and pass this result to the variable a

At the moment, the limitation of the current Awaiter is that it can only work with callbacks with one parameter of type int. Let's try to expand the application of Awaiter:

template<typename... T>
struct ActorAwaiter {

    std::tuple<T...> values;

    std::function<void(std::tuple<T...> &values, std::coroutine_handle<>)> storeHandler;

    ActorAwaiter(const std::function<void(std::tuple<T...> &values, std::coroutine_handle<>)> &storeHandler)
        : storeHandler(storeHandler)
    {}

    ActorAwaiter(const ActorAwaiter &) = delete;
    ActorAwaiter& operator=(const ActorAwaiter &) = delete;
    ActorAwaiter(ActorAwaiter &&) = delete;
    ActorAwaiter& operator=(ActorAwaiter &&) = delete;

    bool await_ready() const noexcept {
        return false;
    }

    void await_suspend(std::coroutine_handle<> coro) noexcept {
        std::invoke(storeHandler, std::ref(values), coro);
    }

    //   bool B  ,
    //   sfinae      
    template<
        bool B=true,size_t len=sizeof...(T),std::enable_if_t<len==0 && B, int>=0
    >
    void await_resume() noexcept {

    }

    //   bool B  ,
    //   sfinae      
    template<
        bool B=true,size_t len=sizeof...(T),std::enable_if_t<len==1 && B, int>=0
    >
    auto await_resume() noexcept {
        return std::get<0>(values);
    }

    //   bool B  ,
    //   sfinae      
    template<
        bool B=true,size_t len=sizeof...(T),std::enable_if_t<len!=1 && len!=0 && B, int>=0
    >
    std::tuple<T...> await_resume() noexcept {
        return values;
    }
};

Here we use std :: tuple in order to be able to save several variables at once.

Sfinae is imposed on the await_resume method so that it is possible not to return a tuple in all cases, but depending on the number of values ​​lying in the tuple, return void, exactly 1 argument or the whole tuple.

The wrappers for creating Awaiter itself now look like this:

template<typename MakeCallback, typename... ReturnArgs, typename Func>
static auto makeCoroCallback(const Func &func, Actor &returnCallback) {
    return [&returnCallback, func](auto &values, std::coroutine_handle<> coro) {
        auto callback = MakeCallback(returnCallback, [&values, coro](ReturnArgs&& ...result) {
            values = std::make_tuple(std::forward<ReturnArgs>(result)...);
            std::invoke(coro);
        });
        func(callback);
    };
}

template<typename MakeCallback, typename... ReturnArgs, typename Func>
static ActorAwaiter<ReturnArgs...> makeActorAwaiter(const Func &func, Actor &returnCallback) {
    const auto storeCoroToQueue = makeCoroCallback<MakeCallback, ReturnArgs...>(func, returnCallback);
    return ActorAwaiter<ReturnArgs...>(storeCoroToQueue);
}

ActorAwaiter<int> ABActor::getAAsync(Actor &returnCallback) {
    return makeActorAwaiter<GetACallback, int>(std::bind(&ABActor::getA, this, _1), returnCallback);
}

ActorAwaiter<int> ABActor::getBAsync(Actor &returnCallback) {
    return makeActorAwaiter<GetBCallback, int>(std::bind(&ABActor::getB, this, _1), returnCallback);
}

ActorAwaiter<> ABActor::saveABAsync(Actor &returnCallback, int a, int b) {
    return makeActorAwaiter<SaveABCallback>(std::bind(&ABActor::saveAB, this, a, b, _1), returnCallback);
}

Now let's figure out how to use the created type directly in coroutine.

Coroutines. Part 2. Resumable


From the point of view of C ++, a function that contains the words co_await, co_yield or co_return is considered to be coroutine. But also such a function should return a certain type. We agreed that we will not change the signature of the functions (here I mean that the return type also refers to the signature), so we will have to get out of it somehow.

Let's create a lambda coroutine and call it from our function:

void WokrerActor::workProcess() {
    const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
        const int a = co_await actor.abActor.getAAsync(actor);
        const int b = co_await actor.abActor.getBAsync(actor);
        co_await actor.abActor.saveABAsync(actor, a - b, a + b);
        const int newA = co_await actor.abActor.getAAsync(actor);
        const int newB = co_await actor.abActor.getBAsync(actor);
        std::cout << "Result " << newA << " " << newB << std::endl;
    };

    coroutine(*this);
}

(Why not capture this in the capture-list of lambdas? Then all the code inside would come out a little easier. But it so happened that, apparently, the lambda-coroutines in the compiler are not yet fully supported, so this code will not work.)

As you can see, our the scary callback code has now turned into a pretty nice linear code. All that remains for us is to invent the ActorResumable class.

Let's look at it.

struct ActorResumable {
    struct promise_type {
        using coro_handle = std::coroutine_handle<promise_type>;

        auto get_return_object() { 
            //  ,    ActorResumable   promise_type
            return coro_handle::from_promise(*this);
        }

        auto initial_suspend() {
            //      
            return std::suspend_never();
        }

        auto final_suspend() {
            //      . 
            // ,     
            return std::suspend_never();
        }

        void unhandled_exception() {
            //   ,       
            std::terminate();
        }
    };

    ActorResumable(std::coroutine_handle<promise_type>) {}
};

The pseudocode of the generated corutin from our lambda looks something like this:

ActorResumable coro() {
    promise_type promise;
    ActorResumable retobj = promise.get_return_object();
    auto intial_suspend = promise.initial_suspend();
    if (initial_suspend == std::suspend_always)  {
          // yield
    }
    try { 
        //  .
        const int a = co_await actor.abActor.getAAsync(actor);
        std::cout << "Result " << a << std::endl;
    } catch(...) { 
        promise.unhandled_exception();
    }
final_suspend:
    auto final_suspend = promise.final_suspend();
    if (final_suspend == std::suspend_always)  {
         // yield
    } else {
         cleanup();
    }

This is just pseudo code, some things are intentionally simplified. Nonetheless, let's see what happens.

First we create a promise and ActorResumable.

After initial_suspend () we do not pause, but move on. We begin to execute the main part of the program.

When we get to co_await, we understand that we need to pause. We have already examined this situation in the previous section, you can return to it and review it.

After we continued the execution and displayed the result on the screen, the coroutine execution ends. We check final_suspend, and clear the entire context of the coroutine.

Coroutines. Part 3. Task


Let's remember what stage we have now reached.

void WokrerActor::workProcess() {
    const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
        const int a = co_await actor.abActor.getAAsync(actor);
        const int b = co_await actor.abActor.getBAsync(actor);
        co_await actor.abActor.saveABAsync(actor, a - b, a + b);
        const int newA = co_await actor.abActor.getAAsync(actor);
        const int newB = co_await actor.abActor.getBAsync(actor);
        std::cout << "Result " << newA << " " << newB << std::endl;
    };

    coroutine(*this);
}

It looks nice, but it's easy to see that the code:

        const int a = co_await actor.abActor.getAAsync(actor);
        const int b = co_await actor.abActor.getBAsync(actor);

repeated 2 times. Is it possible to refactor this moment and put it into a separate function?

Let's sketch out how this might look:

CoroTask<std::pair<int, int>> WokrerActor::readAB() {
    const int a = co_await abActor.getAAsync2(*this);
    const int b = co_await abActor.getBAsync2(*this);
    co_return std::make_pair(a, b);
}

void WokrerActor::workCoroProcess() {
    const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
        const auto [a, b] = co_await actor.readAB();
        co_await actor.abActor.saveABAsync2(actor, a - b, a + b);
        const auto [newA, newB] = co_await actor.readAB();
        std::cout << "Result " << newA << " " << newB << " " << a << " " << b << std::endl;
    };

    coroutine(*this);
}

All that remains for us is to invent the CoroTask type. Let's think it over. First, co_return is used inside the readAB function, which means that CoroTask must satisfy the Resumable interface. But also, an object of this class is used to input co_await of another coroutine. This means that the CoroTask class must also satisfy the Awaitable interface. Let's implement both of these interfaces in the CoroTask class:

template <typename T = void>
struct CoroTask {
    struct promise_type {
        T result;
        std::coroutine_handle<> waiter;

        auto get_return_object() {
            return CoroTask{*this};
        }

        void return_value(T value) {
            result = value;
        }

        void unhandled_exception() {
            std::terminate();
        }

        std::suspend_always initial_suspend() {
            return {};
        }

        auto final_suspend() {
            struct final_awaiter {
                bool await_ready() {
                    return false;
                }
                void await_resume() {}
                auto await_suspend(std::coroutine_handle<promise_type> me) {
                    return me.promise().waiter;
                }
            };
            return final_awaiter{};
        }
    };

    CoroTask(CoroTask &&) = delete;
    CoroTask& operator=(CoroTask&&) = delete;
    CoroTask(const CoroTask&) = delete;
    CoroTask& operator=(const CoroTask&) = delete;

    ~CoroTask() {
        if (h) {
            h.destroy();
        }
    }

    explicit CoroTask(promise_type & p)
        : h(std::coroutine_handle<promise_type>::from_promise(p))
    {}

    bool await_ready() {
        return false;
    }

    T await_resume() {
        auto &result = h.promise().result;
        return result;
    }

    void await_suspend(std::coroutine_handle<> waiter) {
        h.promise().waiter = waiter;
        h.resume();
    }
private:
    std::coroutine_handle<promise_type> h;
};

(I strongly recommend opening the background image of this post. In the future, this will greatly help you.)

So, let's see what happens here.

1. Go to the lambda coroutine and immediately create the WokrerActor :: readAB coroutine. But after creating this coroutine, we don’t start executing it (initial_suspend == suspend_always), which forces us to interrupt and return to the coroutine lambda.

2. co_await lambda checks to see if readAB is ready. The result is not ready (await_ready == false), which forces it to pass its context to the CoroTask :: await_suspend method. This context is saved in CoroTask, and the resume of readAB

3 coroutines is launched . After readAB coroutine has completed all the necessary actions, it reaches the line:

co_return std::make_pair(a, b);

as a result, the CoroTask :: promise_type :: return_value method is called and inside the CoroTask :: promise_type method the created pair of numbers

4 is saved . Since the co_return method was called, the execution of the coroutine comes to an end, which means it's time to call the CoroTask :: promise_type :: final_suspend method . This method returns a self-written structure (do not forget to look at the picture), which forces you to call the final_awaiter :: await_suspend method, which returns the coroutine lambda context stored in step 2.

Why couldn’t we just return suspend_always here? After all, in the case of initial_suspend of this class, did we succeed? The fact is that in initial_suspend we succeeded because this coroutine was called by our lambda coroutine, and we returned to it. But at the moment when we reached the final_suspend call, our coroutine most likely continued from another stack (specifically, from the lambda that the makeCoroCallback function prepared), and if we returned suspend_always here, we would return to it, and not to the workCoroProcess method.

5. Since the final_awaiter :: await_suspend method returned the context to us, this forces the program to continue executing the returned context, that is, the coroutine lambda. Since the execution returned to the point:

const auto [a, b] = co_await actor.readAB();

then we need to isolate the saved result by calling the CoroTask :: await_resume method. The result is received, passed to the variables a and b, and now the CoroTask instance is destroyed.

6. The CoroTask instance was destroyed, but what happened to the WokrerActor :: readAB context? If we from CoroTask :: promise_type :: final_suspend would return suspend_never (more precisely, would return that to the await_ready question would return true), then at that moment the coroutine context would be cleaned up. But since we did not, the obligation to clear the context is transferred to us. We will clear this context in the CoroTask destructor, at this point it is already safe.

7. The coroutine readAB is executed, the result is obtained from it, the context is cleared, the lambda coroutine continues to run ...

Phew, sort of sorted it out. Do you remember that from the methods ABActor :: getAAsync () and the like, we return a self-written structure? In fact, the getAAsync method can also be turned into a coroutine by combining the knowledge gained from the implementation of the CoroTask and ActorAwaiter classes and getting something like:

CoroTaskActor<int> ABActor::getAAsync(Actor &returnCallback) {
    co_return makeCoroCallback<GetACallback, int>(std::bind(&ABActor::getA, this, _1), returnCallback);
}

but this I will leave for self-analysis.

findings


As you can see, with the help of coroutine, you can pretty well linearize the asynchronous callback code. True, the process of writing auxiliary types and functions does not seem too intuitive yet.

All code is available in the repository. I

also recommend that you look at these lectures for a more complete immersion in the topic
.
A large number of examples on the topic of coroutine from the same author are here .
And you can also watch this lecture.

All Articles