Asynchronen Code mit Corutin linearisieren

Bild

Sie können nicht nur Coroutine zum Erstellen von Generatoren verwenden, sondern auch versuchen, vorhandenen asynchronen Code zu linearisieren. Versuchen wir dies anhand eines kleinen Beispiels. Nehmen Sie den auf dem Actor-Framework geschriebenen Code und schreiben Sie eine Funktion dieses Codes auf Coroutinen neu. Um das Projekt zu erstellen, verwenden wir gcc aus dem Coroutinen- Zweig .

Unser Ziel ist es, Rückrufe von Nudeln zu erhalten:

    abActor.getA(ABActor::GetACallback([this](int a) {
        abActor.getB(ABActor::GetBCallback([a, this](int b) {
            abActor.saveAB(a - b, a + b, ABActor::SaveABCallback([this](){
                abActor.getA(ABActor::GetACallback([this](int a) {
                    abActor.getB(ABActor::GetBCallback([a, this](int b) {
                        std::cout << "Result " << a << " " << b << std::endl;
                    }));
                }));
            }));
        }));
    }));

Art von:

const int a = co_await actor.abActor.getAAsync();
const int b = co_await actor.abActor.getBAsync();
co_await actor.abActor.saveABAsync(a - b, a + b);
const int newA = co_await actor.abActor.getAAsync();
const int newB = co_await actor.abActor.getBAsync();
std::cout << "Result " << newA << " " << newB << std::endl;

Also lasst uns anfangen.

Schauspieler


Zunächst müssen wir ein einfaches Schauspieler-Framework erstellen. Das Erstellen eines vollwertigen Rahmens für Schauspieler ist eine schwierige und große Aufgabe, daher implementieren wir nur eine Art davon.
Erstellen Sie zunächst eine Basisklasse:

class Actor {
public:
    using Task = std::function<void()>;
public:
    virtual ~Actor();
public:
    void addTask(const Task &task);
    void tryRunTask();
private:
    std::queue<Task> queue;
    mutable std::mutex mutex;
};

Die Idee ist im Grunde einfach: Wir stellen Aufgaben, die funktionale Objekte sind, in eine Warteschlange, und wenn wir RunTask versuchen, versuchen wir, diese Aufgabe abzuschließen. Die Implementierung der Klasse bestätigt unsere Absichten:

Actor::~Actor() = default;

void Actor::addTask(const Task &task) {
    std::lock_guard lock(mutex);
    queue.push(task);
}

void Actor::tryRunTask() {
    std::unique_lock lock(mutex);
    if (queue.empty()) {
        return;
    }

    const Task task = queue.front();
    queue.pop();
    lock.unlock();

    std::invoke(task);
}

Die nächste Klasse ist der "Faden", zu dem unsere Schauspieler gehören werden:

class Actor;

class ActorThread {
public:
    ~ActorThread();
public:
    void addActor(Actor &actor);
    void run();
private:
    std::vector<std::reference_wrapper<Actor>> actors;
};

Auch hier ist alles einfach: Ganz am Anfang des Programms „binden“ wir unsere Akteure mit der addActor-Methode an den Thread und starten den Thread dann mit der run-Methode.

ActorThread::~ActorThread() = default;

void ActorThread::addActor(Actor &actor) {
    actors.emplace_back(actor);
}

void ActorThread::run() {
    while (true) {
        for (Actor &actor: actors) {
            actor.tryRunTask();
        }
    }
}

Wenn wir den Thread starten, treten wir in eine Endlosschleife ein und versuchen, von jedem Akteur eine Aufgabe auszuführen. Nicht die beste Lösung, aber für eine Demonstration.

Schauen wir uns nun den Vertreter der Schauspielerklasse an:

class ABActor: public Actor {
public:
    using GetACallback = Callback<void(int result)>;
    using GetBCallback = Callback<void(int result)>;
    using SaveABCallback = Callback<void()>;
public:
    void getA(const GetACallback &callback);
    void getB(const GetBCallback &callback);
    void saveAB(int a, int b, const SaveABCallback &callback);
private:
    void getAProcess(const GetACallback &callback);
    void getBProcess(const GetBCallback &callback);
    void saveABProcess(int a, int b, const SaveABCallback &callback);
private:
    int a = 10;
    int b = 20;
};

Diese Klasse speichert zwei Zahlen in sich selbst - a und b - und gibt auf Anfrage ihre Werte zurück oder überschreibt sie.

Als Rückruf akzeptiert es ein Funktionsobjekt mit den notwendigen Parametern. Aber achten wir darauf, dass verschiedene Akteure in verschiedenen Threads gestartet werden können. Wenn wir am Ende der Arbeit nur den an die Methode übergebenen Rückruf aufrufen, wird dieser Rückruf im aktuellen ausführbaren Thread aufgerufen und nicht in dem Thread, der unsere Methode aufgerufen und diesen Rückruf erstellt hat. Daher müssen wir über den Rückruf einen Wrapper erstellen, der diese Situation löst:

template<typename C>
class Callback {
public:
    template<typename Functor>
    Callback(Actor &sender, const Functor &callback)
        : sender(sender)
        , callback(callback)
    {}
public:
    template<typename ...Args>
    void operator() (Args&& ...args) const {
        sender.addTask(std::bind(callback, std::forward<Args>(args)...));
    }
private:
    Actor &sender;
    std::function<C> callback;
};

Dieser Wrapper merkt sich den ursprünglichen Akteur, und wenn Sie versuchen, sich selbst auszuführen, fügt er der Aufgabenwarteschlange des ursprünglichen Akteurs einfach einen echten Rückruf hinzu.
Infolgedessen sieht die Implementierung der ABActor-Klasse folgendermaßen aus:

void ABActor::getA(const GetACallback &callback) {
    addTask(std::bind(&ABActor::getAProcess, this, callback));
}

void ABActor::getAProcess(const ABActor::GetACallback &callback) {
    std::invoke(callback, a);
}

void ABActor::getB(const GetBCallback &callback) {
    addTask(std::bind(&ABActor::getBProcess, this, callback));
}

void ABActor::getBProcess(const ABActor::GetBCallback &callback) {
    std::invoke(callback, b);
}

void ABActor::saveAB(int a, int b, const SaveABCallback &callback) {
    addTask(std::bind(&ABActor::saveABProcess, this, a, b, callback));
}

void ABActor::saveABProcess(int a, int b, const ABActor::SaveABCallback &callback) {
    this->a = a;
    this->b = b;
    std::invoke(callback);
}

In der Schnittstellenmethode der Klasse binden wir die übergebenen Argumente einfach an den entsprechenden „Slot“ der Klasse, erstellen so eine Aufgabe und stellen diese Aufgabe in die Warteschlange der Aufgaben dieser Klasse. Wenn der Task-Thread beginnt, die Task auszuführen, ruft er den richtigen „Slot“ auf, der alle erforderlichen Aktionen ausführt, und ruft den Rückruf auf, der wiederum den tatsächlichen Rückruf an die Warteschlange der Aufgabe sendet, die ihn verursacht hat.

Schreiben wir einen Schauspieler, der die ABActor-Klasse verwendet:

class ABActor;

class WokrerActor: public Actor {
public:
    WokrerActor(ABActor &actor)
        : abActor(actor)
    {}
public:
    void work();
private:
    void workProcess();
private:
    ABActor &abActor;
};

void WokrerActor::work() {
    addTask(std::bind(&WokrerActor::workProcess, this));
}

void WokrerActor::workProcess() {
    abActor.getA(ABActor::GetACallback(*this, [this](int a) {
        std::cout << "Result " << a << std::endl;
    }));
}

Und alles zusammen:

int main() {
    ABActor abActor;
    WokrerActor workerActor(abActor);

    ActorThread thread;
    thread.addActor(abActor);
    thread.addActor(workerActor);

    workerActor.work();

    thread.run();
}

Folgen wir der gesamten Codekette.

Am Anfang erstellen wir die notwendigen Objekte und stellen Verbindungen zwischen ihnen her.
Anschließend fügen wir die workProcess-Task zur Warteschlange der Actor Worker-Task hinzu.
Wenn der Thread startet, findet er unsere Aufgabe in der Warteschlange und beginnt, sie auszuführen.
Während der Ausführung rufen wir die getA-Methode der ABActor-Klasse auf, stellen die entsprechende Aufgabe in die Warteschlange der ABActor-Klasse und schließen die Ausführung ab.
Als Nächstes nimmt der Thread die neu erstellte Aufgabe aus der ABActor-Klasse und führt sie aus, was zur Ausführung des getAProcess-Codes führt.
Dieser Code ruft einen Rückruf auf und übergibt das erforderliche Argument - die Variable a. Da der Rückruf, den er besitzt, ein Wrapper ist, wird tatsächlich ein echter Rückruf mit gefüllten Parametern in die Warteschlange der Worker-Klasse gestellt.
Und wenn der Thread bei der nächsten Iteration des Zyklus unseren Rückruf von der Worker-Klasse abzieht und ausführt, wird die Ausgabe der Zeile "Ergebnis 10" angezeigt.

Das Actor Framework ist eine recht bequeme Möglichkeit, Klassen, die über verschiedene physische Streams verteilt sind, miteinander zu interagieren. Die Besonderheit des Klassendesigns besteht, wie Sie davon überzeugt sein sollten, darin, dass innerhalb jedes einzelnen Akteurs alle Aktionen vollständig und in einem einzigen Thread ausgeführt werden. Der einzige Punkt für die Synchronisation von Streams liegt in den Implementierungsdetails des Actor Frameworks und ist für den Programmierer nicht sichtbar. Auf diese Weise kann ein Programmierer Single-Threaded-Code schreiben, ohne sich Gedanken über das Einpacken von Mutexen und das Verfolgen von Rennsituationen, Deadlocks und anderen Kopfschmerzen machen zu müssen.

Leider hat diese Lösung einen Preis. Da das Ergebnis der Ausführung eines anderen Akteurs nur über den Rückruf zugänglich ist, wird der Akteurcode früher oder später wie folgt:

    abActor.getA(ABActor::GetACallback(*this, [this](int a) {
        abActor.getB(ABActor::GetBCallback(*this, [a, this](int b) {
            abActor.saveAB(a - b, a + b, ABActor::SaveABCallback(*this, [this](){
                abActor.getA(ABActor::GetACallback(*this, [this](int a) {
                    abActor.getB(ABActor::GetBCallback(*this, [a, this](int b) {
                        std::cout << "Result " << a << " " << b << std::endl;
                    }));
                }));
            }));
        }));
    }));

Mal sehen, ob wir dies mit der Innovation von C ++ 20 - Coroutinen vermeiden können.

Aber zuerst werden wir die Einschränkungen spezifizieren.

Natürlich können wir den Code des Actor Frameworks in keiner Weise ändern. Außerdem können wir die Signaturen öffentlicher und privater Methoden von Instanzen der Actor-Klasse - ABActor und WorkerActor - nicht ändern. Mal sehen, ob wir aus dieser Situation herauskommen können.

Coroutinen. Teil 1. Kellner


Die Hauptidee von corutin ist, dass beim Erstellen von coroutine ein separater Stapelrahmen für diesen auf dem Heap erstellt wird, von dem wir jederzeit "beenden" können, während die aktuelle Ausführungsposition, Prozessorregister und andere notwendige Informationen beibehalten werden. Dann können wir auch jederzeit zur Ausführung der suspendierten Coroutine zurückkehren und diese bis zum Ende oder bis zur nächsten Suspendierung abschließen.

Das Objekt std :: coroutine_handle <> ist für die Verwaltung dieser Daten verantwortlich, die im Wesentlichen einen Zeiger auf den Stapelrahmen (und andere erforderliche Daten) darstellen und über eine Wiederaufnahmemethode (oder deren Analogon, den Operator ()) verfügen, die uns zur Ausführung der Coroutine zurückführt .

Basierend auf diesen Daten schreiben wir zuerst die Funktion getAAsync und versuchen dann zu verallgemeinern.

Angenommen, wir haben bereits eine Instanz der Klasse std :: coroutine_handle <> coro. Was müssen wir tun?

Sie müssen die bereits vorhandene Methode ABActor :: getA aufrufen, um die Situation nach Bedarf zu beheben. Zunächst müssen Sie jedoch einen Rückruf für die Methode getA erstellen.

Erinnern wir uns, dass ein Rückruf an den Rückruf der getA-Methode zurückgegeben wird - das Ergebnis der getA-Methode. Darüber hinaus wird dieser Rückruf im Worker-Thread des Threads aufgerufen. Somit können wir von diesem Rückruf aus sicher die Coroutine ausführen, die nur aus dem Worker-Thread erstellt wurde und die weiterhin ihre Abfolge von Aktionen ausführt. Aber wir müssen auch irgendwo das im Rückruf zurückgegebene Ergebnis speichern, es wird uns natürlich weiter nützlich sein.

auto callback = GetACallback(returnCallbackActor, [&value, coro](int result) {
        value = result;
        std::invoke(coro);
 });
getA(callback);

Jetzt müssen Sie eine Instanz des coroutine_handle-Objekts von irgendwoher nehmen und einen Link, über den Sie unser Ergebnis speichern können.

In Zukunft werden wir sehen, dass coroutine_handle als Ergebnis des Aufrufs der Funktion an uns übergeben wird. Dementsprechend können wir es nur an eine andere Funktion übergeben. Bereiten wir diese Funktion als Lambda vor. (Wir werden den Link an die Variable übergeben, in der das Ergebnis des Rückrufs an das Unternehmen gespeichert wird.)

auto storeCoroToQueue = [&returnCallbackActor, this](auto &value, std::coroutine_handle<> coro) {
    auto callback=GetACallback(returnCallbackActor, [&value, coro](int result){
        value = result;
        std::invoke(coro);
    });
    getA(callback);
};

Wir werden diese Funktion in der nächsten Klasse speichern.

struct ActorAwaiterSimple {
    int value;

    std::function<void(int &value,std::coroutine_handle<>)> forwardCoroToCallback;

    ActorAwaiterSimple(
        const std::function<void(int &value, std::coroutine_handle<>)> &forwardCoroToCallback
    )
        : forwardCoroToCallback(forwardCoroToCallback)
    {}

    ActorAwaiterSimple(const ActorAwaiterSimple &) = delete;
    ActorAwaiterSimple& operator=(const ActorAwaiterSimple &) = delete;
    ActorAwaiterSimple(ActorAwaiterSimple &&) = delete;
    ActorAwaiterSimple& operator=(ActorAwaiterSimple &&) = delete;

// ...

Neben dem Funktionsobjekt wird hier auch der Speicher (in Form des Variablenwerts) für den Wert gespeichert, der uns im Rückruf erwartet.

Da wir den Speicher hier unter dem Wert halten, möchten wir kaum, dass die Instanz dieser Klasse kopiert oder irgendwohin verschoben wird. Stellen Sie sich zum Beispiel vor, jemand hat diese Klasse kopiert, den Wert unter dem Variablenwert in der alten Instanz der Klasse gespeichert und dann versucht, ihn aus der neuen Instanz zu lesen. Und es ist natürlich nicht da, da vor dem Speichern kopiert wurde. Unangenehm. Daher schützen wir uns vor diesen Problemen, indem wir Konstruktoren verbieten und Operatoren kopieren und verschieben.

Lassen Sie uns diese Klasse weiter schreiben. Die nächste Methode, die wir brauchen, ist:

    bool await_ready() const noexcept {
        return false;
    }

Er beantwortet die Frage, ob unsere Bedeutung zur Ausgabe bereit ist. Natürlich ist unser Wert beim ersten Anruf noch nicht fertig, und in Zukunft wird uns niemand mehr danach fragen. Geben Sie also einfach false zurück.

Die coroutine_handle-Instanz wird in der void await_suspend-Methode (std :: coroutine_handle <> coro) an uns übergeben. Rufen wir also unseren vorbereiteten Funktor darin auf und übergeben dort auch eine Speicherverknüpfung unter value:

    void await_suspend(std::coroutine_handle<> coro) noexcept {
        std::invoke(forwardCoroToCallback, std::ref(value), coro);
    }

Das Ergebnis der Funktionsausführung wird zum richtigen Zeitpunkt durch Aufrufen der Methode await_resume abgefragt. Wir werden dem Antragsteller nicht verweigern:

    int await_resume() noexcept {
        return value;
    }

Jetzt kann unsere Methode mit dem Schlüsselwort co_await aufgerufen werden:

const int a = co_await actor.abActor.getAAsync(actor);

Was hier passieren wird, stellen wir bereits grob dar.

Zunächst wird ein Objekt vom Typ ActorAwaiterSimple erstellt, das an die "Eingabe" von co_await übertragen wird. Er fragt zuerst (indem er await_ready aufruft), ob wir versehentlich ein fertiges Ergebnis haben (wir haben es nicht), ruft dann await_suspend auf, übergibt einen Kontext (tatsächlich einen Zeiger auf den aktuellen Coroutine-Stack-Frame) und unterbricht die Ausführung.

Wenn der ABActor-Akteur in Zukunft seine Arbeit beendet und den Ergebnisrückruf aufruft, wird dieses Ergebnis (bereits im Worker-Thread-Thread) in der einzigen (auf dem Coroutine-Stapel verbleibenden) Instanz von ActorAwaiterSimple gespeichert und die Fortsetzung der Coroutine wird gestartet.

Corutin setzt die Ausführung fort, nimmt das gespeicherte Ergebnis durch Aufrufen der Methode await_resume und übergibt dieses Ergebnis an die Variable a

Derzeit besteht die Einschränkung des aktuellen Kellners darin, dass er nur mit Rückrufen mit einem Parameter vom Typ int arbeiten kann. Versuchen wir, die Anwendung von Awaiter zu erweitern:

template<typename... T>
struct ActorAwaiter {

    std::tuple<T...> values;

    std::function<void(std::tuple<T...> &values, std::coroutine_handle<>)> storeHandler;

    ActorAwaiter(const std::function<void(std::tuple<T...> &values, std::coroutine_handle<>)> &storeHandler)
        : storeHandler(storeHandler)
    {}

    ActorAwaiter(const ActorAwaiter &) = delete;
    ActorAwaiter& operator=(const ActorAwaiter &) = delete;
    ActorAwaiter(ActorAwaiter &&) = delete;
    ActorAwaiter& operator=(ActorAwaiter &&) = delete;

    bool await_ready() const noexcept {
        return false;
    }

    void await_suspend(std::coroutine_handle<> coro) noexcept {
        std::invoke(storeHandler, std::ref(values), coro);
    }

    //   bool B  ,
    //   sfinae      
    template<
        bool B=true,size_t len=sizeof...(T),std::enable_if_t<len==0 && B, int>=0
    >
    void await_resume() noexcept {

    }

    //   bool B  ,
    //   sfinae      
    template<
        bool B=true,size_t len=sizeof...(T),std::enable_if_t<len==1 && B, int>=0
    >
    auto await_resume() noexcept {
        return std::get<0>(values);
    }

    //   bool B  ,
    //   sfinae      
    template<
        bool B=true,size_t len=sizeof...(T),std::enable_if_t<len!=1 && len!=0 && B, int>=0
    >
    std::tuple<T...> await_resume() noexcept {
        return values;
    }
};

Hier verwenden wir std :: tuple, um mehrere Variablen gleichzeitig speichern zu können.

Sfinae wird der Methode await_resume auferlegt, damit nicht in allen Fällen ein Tupel zurückgegeben werden kann. Abhängig von der Anzahl der im Tupel liegenden Werte wird void, genau 1 Argument oder das gesamte Tupel zurückgegeben.

Die Wrapper zum Erstellen von Awaiter selbst sehen jetzt folgendermaßen aus:

template<typename MakeCallback, typename... ReturnArgs, typename Func>
static auto makeCoroCallback(const Func &func, Actor &returnCallback) {
    return [&returnCallback, func](auto &values, std::coroutine_handle<> coro) {
        auto callback = MakeCallback(returnCallback, [&values, coro](ReturnArgs&& ...result) {
            values = std::make_tuple(std::forward<ReturnArgs>(result)...);
            std::invoke(coro);
        });
        func(callback);
    };
}

template<typename MakeCallback, typename... ReturnArgs, typename Func>
static ActorAwaiter<ReturnArgs...> makeActorAwaiter(const Func &func, Actor &returnCallback) {
    const auto storeCoroToQueue = makeCoroCallback<MakeCallback, ReturnArgs...>(func, returnCallback);
    return ActorAwaiter<ReturnArgs...>(storeCoroToQueue);
}

ActorAwaiter<int> ABActor::getAAsync(Actor &returnCallback) {
    return makeActorAwaiter<GetACallback, int>(std::bind(&ABActor::getA, this, _1), returnCallback);
}

ActorAwaiter<int> ABActor::getBAsync(Actor &returnCallback) {
    return makeActorAwaiter<GetBCallback, int>(std::bind(&ABActor::getB, this, _1), returnCallback);
}

ActorAwaiter<> ABActor::saveABAsync(Actor &returnCallback, int a, int b) {
    return makeActorAwaiter<SaveABCallback>(std::bind(&ABActor::saveAB, this, a, b, _1), returnCallback);
}

Lassen Sie uns nun herausfinden, wie der erstellte Typ direkt in Coroutine verwendet wird.

Coroutinen. Teil 2. Wiederaufnahmefähig


Aus Sicht von C ++ wird eine Funktion, die die Wörter co_await, co_yield oder co_return enthält, als Coroutine betrachtet. Aber auch eine solche Funktion sollte einen bestimmten Typ zurückgeben. Wir waren uns einig, dass wir die Signatur der Funktionen nicht ändern werden (hier meine ich, dass sich der Rückgabetyp auch auf die Signatur bezieht), also müssen wir irgendwie raus.

Lassen Sie uns eine Lambda-Coroutine erstellen und sie von unserer Funktion aus aufrufen:

void WokrerActor::workProcess() {
    const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
        const int a = co_await actor.abActor.getAAsync(actor);
        const int b = co_await actor.abActor.getBAsync(actor);
        co_await actor.abActor.saveABAsync(actor, a - b, a + b);
        const int newA = co_await actor.abActor.getAAsync(actor);
        const int newB = co_await actor.abActor.getBAsync(actor);
        std::cout << "Result " << newA << " " << newB << std::endl;
    };

    coroutine(*this);
}

(Warum erfassen Sie dies nicht in der Lambda-Erfassungsliste? Dann würde der gesamte darin enthaltene Code etwas einfacher herauskommen. Es kam jedoch vor, dass die Lambda-Coroutinen im Compiler anscheinend noch nicht vollständig unterstützt werden, sodass dieser Code nicht funktioniert.)

Wie Sie sehen können, funktioniert unser Code Der beängstigende Rückrufcode hat sich jetzt in einen hübschen linearen Code verwandelt. Wir müssen nur noch die ActorResumable-Klasse erfinden. Schauen

wir uns das an.

struct ActorResumable {
    struct promise_type {
        using coro_handle = std::coroutine_handle<promise_type>;

        auto get_return_object() { 
            //  ,    ActorResumable   promise_type
            return coro_handle::from_promise(*this);
        }

        auto initial_suspend() {
            //      
            return std::suspend_never();
        }

        auto final_suspend() {
            //      . 
            // ,     
            return std::suspend_never();
        }

        void unhandled_exception() {
            //   ,       
            std::terminate();
        }
    };

    ActorResumable(std::coroutine_handle<promise_type>) {}
};

Der Pseudocode des aus unserem Lambda erzeugten Corutins sieht ungefähr so ​​aus:

ActorResumable coro() {
    promise_type promise;
    ActorResumable retobj = promise.get_return_object();
    auto intial_suspend = promise.initial_suspend();
    if (initial_suspend == std::suspend_always)  {
          // yield
    }
    try { 
        //  .
        const int a = co_await actor.abActor.getAAsync(actor);
        std::cout << "Result " << a << std::endl;
    } catch(...) { 
        promise.unhandled_exception();
    }
final_suspend:
    auto final_suspend = promise.final_suspend();
    if (final_suspend == std::suspend_always)  {
         // yield
    } else {
         cleanup();
    }

Dies ist nur Pseudocode, einige Dinge werden absichtlich vereinfacht. Mal sehen, was passiert.

Zuerst erstellen wir ein Versprechen und ActorResumable.

Nach initial_suspend () machen wir keine Pause, sondern fahren fort. Wir beginnen mit der Durchführung des Hauptteils des Programms.

Wenn wir zu co_await kommen, verstehen wir, dass wir innehalten müssen. Wir haben diese Situation bereits im vorherigen Abschnitt untersucht. Sie können darauf zurückkommen und sie überprüfen.

Nachdem wir die Ausführung fortgesetzt und das Ergebnis auf dem Bildschirm angezeigt haben, endet die Coroutine-Ausführung. Wir überprüfen final_suspend und löschen den gesamten Kontext der Coroutine.

Coroutinen. Teil 3. Aufgabe


Erinnern wir uns, welches Stadium wir jetzt erreicht haben.

void WokrerActor::workProcess() {
    const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
        const int a = co_await actor.abActor.getAAsync(actor);
        const int b = co_await actor.abActor.getBAsync(actor);
        co_await actor.abActor.saveABAsync(actor, a - b, a + b);
        const int newA = co_await actor.abActor.getAAsync(actor);
        const int newB = co_await actor.abActor.getBAsync(actor);
        std::cout << "Result " << newA << " " << newB << std::endl;
    };

    coroutine(*this);
}

Es sieht gut aus, aber es ist leicht zu erkennen, dass der Code:

        const int a = co_await actor.abActor.getAAsync(actor);
        const int b = co_await actor.abActor.getBAsync(actor);

2 mal wiederholt. Ist es möglich, diesen Moment umzugestalten und in eine separate Funktion zu stellen?

Lassen Sie uns skizzieren, wie dies aussehen könnte:

CoroTask<std::pair<int, int>> WokrerActor::readAB() {
    const int a = co_await abActor.getAAsync2(*this);
    const int b = co_await abActor.getBAsync2(*this);
    co_return std::make_pair(a, b);
}

void WokrerActor::workCoroProcess() {
    const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
        const auto [a, b] = co_await actor.readAB();
        co_await actor.abActor.saveABAsync2(actor, a - b, a + b);
        const auto [newA, newB] = co_await actor.readAB();
        std::cout << "Result " << newA << " " << newB << " " << a << " " << b << std::endl;
    };

    coroutine(*this);
}

Wir müssen nur noch den CoroTask-Typ erfinden. Lass es uns überlegen. Zunächst wird co_return in der Funktion readAB verwendet, was bedeutet, dass CoroTask die Schnittstelle Resumable erfüllen muss. Ein Objekt dieser Klasse wird aber auch verwendet, um co_await einer anderen Coroutine einzugeben. Dies bedeutet, dass die CoroTask-Klasse auch die Awaitable-Schnittstelle erfüllen muss. Implementieren wir diese beiden Schnittstellen in der CoroTask-Klasse:

template <typename T = void>
struct CoroTask {
    struct promise_type {
        T result;
        std::coroutine_handle<> waiter;

        auto get_return_object() {
            return CoroTask{*this};
        }

        void return_value(T value) {
            result = value;
        }

        void unhandled_exception() {
            std::terminate();
        }

        std::suspend_always initial_suspend() {
            return {};
        }

        auto final_suspend() {
            struct final_awaiter {
                bool await_ready() {
                    return false;
                }
                void await_resume() {}
                auto await_suspend(std::coroutine_handle<promise_type> me) {
                    return me.promise().waiter;
                }
            };
            return final_awaiter{};
        }
    };

    CoroTask(CoroTask &&) = delete;
    CoroTask& operator=(CoroTask&&) = delete;
    CoroTask(const CoroTask&) = delete;
    CoroTask& operator=(const CoroTask&) = delete;

    ~CoroTask() {
        if (h) {
            h.destroy();
        }
    }

    explicit CoroTask(promise_type & p)
        : h(std::coroutine_handle<promise_type>::from_promise(p))
    {}

    bool await_ready() {
        return false;
    }

    T await_resume() {
        auto &result = h.promise().result;
        return result;
    }

    void await_suspend(std::coroutine_handle<> waiter) {
        h.promise().waiter = waiter;
        h.resume();
    }
private:
    std::coroutine_handle<promise_type> h;
};

(Ich empfehle dringend, das Hintergrundbild dieses Beitrags zu öffnen. In Zukunft wird dies Ihnen sehr helfen.)

Lassen Sie uns also sehen, was hier passiert.

1. Gehen Sie zur Lambda-Coroutine und erstellen Sie sofort die WokrerActor :: readAB-Coroutine. Nachdem wir diese Coroutine erstellt haben, beginnen wir nicht mit der Ausführung (initial_suspend == suspend_always), was uns zwingt, die Coroutine Lambda zu unterbrechen und zurückzukehren.

2. co_await lambda prüft, ob readAB bereit ist. Das Ergebnis ist nicht bereit (await_ready == false), wodurch es gezwungen wird, seinen Kontext an die CoroTask :: await_suspend-Methode zu übergeben. Dieser Kontext wird in CoroTask gespeichert und die Wiederaufnahme der readAB

3- Coroutinen wird gestartet . Nachdem die readAB-Coroutine alle erforderlichen Aktionen ausgeführt hat, erreicht sie die folgende Zeile:

co_return std::make_pair(a, b);

Infolgedessen wird die CoroTask :: Versprechen_Typ :: Rückgabewert-Methode aufgerufen und das erstellte Zahlenpaar in CoroTask :: Versprechen_Typ

4 gespeichert. Da die Co_Return-Methode aufgerufen wurde, wird die Ausführung der Coroutine beendet, was bedeutet, dass es Zeit ist, die CoroTask :: Versprechen_Typ :: Final_Suspend-Methode aufzurufen . Diese Methode gibt eine selbstgeschriebene Struktur zurück (vergessen Sie nicht, das Bild zu betrachten), die Sie zwingt, die Methode final_awaiter :: await_suspend aufzurufen, von der der in Schritt 2 gespeicherte Coroutine-Lambda-Kontext zurückgegeben wird.

Warum können wir suspend_always nicht einfach hierher zurückgeben? Haben wir im Fall von initial_suspend dieser Klasse Erfolg gehabt? Tatsache ist, dass wir in initial_suspend erfolgreich waren, weil diese Coroutine von unserer Lambda-Coroutine aufgerufen wurde und wir dorthin zurückgekehrt sind. Aber in dem Moment, als wir den Aufruf final_suspend erreichten, wurde unsere Coroutine höchstwahrscheinlich von einem anderen Stapel aus fortgesetzt (insbesondere von dem Lambda, das die Funktion makeCoroCallback vorbereitet hat), und wenn wir suspend_always hier zurückgeben würden, würden wir zu diesem und nicht zur workCoroProcess-Methode zurückkehren.

5. Da die Methode final_awaiter :: await_suspend den Kontext an uns zurückgegeben hat, wird das Programm gezwungen, den zurückgegebenen Kontext, dh das Coroutine-Lambda, weiter auszuführen. Da die Ausführung auf den Punkt zurückgekehrt ist:

const auto [a, b] = co_await actor.readAB();

Dann müssen wir das gespeicherte Ergebnis isolieren, indem wir die CoroTask :: await_resume-Methode aufrufen. Das Ergebnis wird empfangen, an die Variablen a und b übergeben, und jetzt wird die CoroTask-Instanz zerstört.

6. Die CoroTask-Instanz wurde zerstört, aber was ist mit dem WokrerActor :: readAB-Kontext passiert? Wenn wir von CoroTask :: versprechen_Typ :: final_suspend suspend_never zurückgeben würden (genauer gesagt, würde dies auf die Frage await_ready zurückgeben würde true zurückgeben), würde in diesem Moment der Coroutine-Kontext bereinigt. Da wir dies jedoch nicht getan haben, wird die Verpflichtung zur Klärung des Kontexts auf uns übertragen. Wir werden diesen Kontext im CoroTask-Destruktor löschen. Zu diesem Zeitpunkt ist er bereits sicher.

7. Die Coroutine readAB wird ausgeführt, das Ergebnis wird daraus erhalten, der Kontext wird gelöscht, die Ausführung der Lambda-Coroutine wird fortgesetzt ...

Puh, habe es irgendwie geklärt. Erinnern Sie sich, dass wir mit den Methoden ABActor :: getAAsync () und dergleichen eine selbstgeschriebene Struktur zurückgeben? Tatsächlich kann die getAAsync-Methode auch in eine Coroutine umgewandelt werden, indem das aus der Implementierung der Klassen CoroTask und ActorAwaiter gewonnene Wissen kombiniert und wie folgt abgerufen wird:

CoroTaskActor<int> ABActor::getAAsync(Actor &returnCallback) {
    co_return makeCoroCallback<GetACallback, int>(std::bind(&ABActor::getA, this, _1), returnCallback);
}

aber das werde ich zur Selbstanalyse lassen.

Ergebnisse


Wie Sie sehen können, können Sie mit Hilfe von Coroutine den asynchronen Rückrufcode ziemlich gut linearisieren. Der Prozess des Schreibens von Hilfstypen und -funktionen scheint zwar noch nicht allzu intuitiv zu sein.

Der gesamte Code ist im Repository verfügbar . Ich

empfehle außerdem, dass Sie sich diese Vorlesungen ansehen, um ein vollständigeres Eintauchen in das Thema zu erhalten
.
Eine Vielzahl von Beispielen zum Thema Coroutine desselben Autors finden Sie hier .
Und Sie können diesen Vortrag auch sehen .

All Articles