Melinierisasi kode asinkron dengan corutin

gambar

Selain menggunakan coroutine untuk membuat generator, Anda dapat mencoba menggunakannya untuk membuat linierisasi kode asinkron yang ada. Mari kita coba lakukan ini dengan contoh kecil. Ambil kode yang ditulis pada kerangka aktor dan tulis ulang satu fungsi kode ini di coroutine. Untuk membangun proyek, kita akan menggunakan gcc dari cabang coroutines .

Tujuan kami adalah mendapatkan panggilan balik dari mie:

    abActor.getA(ABActor::GetACallback([this](int a) {
        abActor.getB(ABActor::GetBCallback([a, this](int b) {
            abActor.saveAB(a - b, a + b, ABActor::SaveABCallback([this](){
                abActor.getA(ABActor::GetACallback([this](int a) {
                    abActor.getB(ABActor::GetBCallback([a, this](int b) {
                        std::cout << "Result " << a << " " << b << std::endl;
                    }));
                }));
            }));
        }));
    }));

Semacam:

const int a = co_await actor.abActor.getAAsync();
const int b = co_await actor.abActor.getBAsync();
co_await actor.abActor.saveABAsync(a - b, a + b);
const int newA = co_await actor.abActor.getAAsync();
const int newB = co_await actor.abActor.getBAsync();
std::cout << "Result " << newA << " " << newB << std::endl;

Jadi mari kita mulai.

Aktor


Untuk memulainya, kita perlu membuat kerangka aktor sederhana. Membuat kerangka aktor penuh adalah tugas yang sulit dan besar, jadi kami hanya mengimplementasikannya.
Pertama, buat kelas dasar:

class Actor {
public:
    using Task = std::function<void()>;
public:
    virtual ~Actor();
public:
    void addTask(const Task &task);
    void tryRunTask();
private:
    std::queue<Task> queue;
    mutable std::mutex mutex;
};

Idenya pada dasarnya sederhana: kami menempatkan tugas yang merupakan objek fungsional dalam antrian, dan ketika kami mencobaRunTask, kami mencoba menyelesaikan tugas ini. Implementasi kelas menegaskan niat kami:

Actor::~Actor() = default;

void Actor::addTask(const Task &task) {
    std::lock_guard lock(mutex);
    queue.push(task);
}

void Actor::tryRunTask() {
    std::unique_lock lock(mutex);
    if (queue.empty()) {
        return;
    }

    const Task task = queue.front();
    queue.pop();
    lock.unlock();

    std::invoke(task);
}

Kelas berikutnya adalah "utas" di mana aktor kita akan berada:

class Actor;

class ActorThread {
public:
    ~ActorThread();
public:
    void addActor(Actor &actor);
    void run();
private:
    std::vector<std::reference_wrapper<Actor>> actors;
};

Semuanya juga sederhana di sini: di awal program, kami “mengikat” aktor kami ke utas menggunakan metode addActor, dan kemudian kami memulai utas menggunakan metode jalankan.

ActorThread::~ActorThread() = default;

void ActorThread::addActor(Actor &actor) {
    actors.emplace_back(actor);
}

void ActorThread::run() {
    while (true) {
        for (Actor &actor: actors) {
            actor.tryRunTask();
        }
    }
}

Saat memulai utas, kami memasuki loop tak terbatas dan mencoba melakukan satu tugas dari masing-masing aktor. Bukan solusi terbaik, tetapi akan dilakukan untuk demonstrasi.

Sekarang mari kita lihat perwakilan dari kelas aktor:

class ABActor: public Actor {
public:
    using GetACallback = Callback<void(int result)>;
    using GetBCallback = Callback<void(int result)>;
    using SaveABCallback = Callback<void()>;
public:
    void getA(const GetACallback &callback);
    void getB(const GetBCallback &callback);
    void saveAB(int a, int b, const SaveABCallback &callback);
private:
    void getAProcess(const GetACallback &callback);
    void getBProcess(const GetBCallback &callback);
    void saveABProcess(int a, int b, const SaveABCallback &callback);
private:
    int a = 10;
    int b = 20;
};

Kelas ini menyimpan 2 angka dalam dirinya sendiri - a dan b, dan, atas permintaan, mengembalikan nilai mereka atau menimpanya.

Sebagai panggilan balik, ia menerima objek fungsional dengan parameter yang diperlukan. Tapi mari kita perhatikan fakta bahwa aktor yang berbeda dapat diluncurkan di utas yang berbeda. Dan oleh karena itu, jika pada akhir pekerjaan kami hanya memanggil callback yang diteruskan ke metode, panggilan balik ini akan dipanggil di utas yang dapat dieksekusi saat ini, dan bukan di utas yang memanggil metode kami dan membuat panggilan balik ini. Karena itu, kita perlu membuat pembungkus atas callback yang akan menyelesaikan situasi ini:

template<typename C>
class Callback {
public:
    template<typename Functor>
    Callback(Actor &sender, const Functor &callback)
        : sender(sender)
        , callback(callback)
    {}
public:
    template<typename ...Args>
    void operator() (Args&& ...args) const {
        sender.addTask(std::bind(callback, std::forward<Args>(args)...));
    }
private:
    Actor &sender;
    std::function<C> callback;
};

Wrapper ini mengingat aktor asli, dan ketika Anda mencoba mengeksekusi diri Anda sendiri, ia hanya menambahkan panggilan balik nyata ke antrian tugas aktor asli.
Akibatnya, implementasi kelas ABActor terlihat seperti ini:

void ABActor::getA(const GetACallback &callback) {
    addTask(std::bind(&ABActor::getAProcess, this, callback));
}

void ABActor::getAProcess(const ABActor::GetACallback &callback) {
    std::invoke(callback, a);
}

void ABActor::getB(const GetBCallback &callback) {
    addTask(std::bind(&ABActor::getBProcess, this, callback));
}

void ABActor::getBProcess(const ABActor::GetBCallback &callback) {
    std::invoke(callback, b);
}

void ABActor::saveAB(int a, int b, const SaveABCallback &callback) {
    addTask(std::bind(&ABActor::saveABProcess, this, a, b, callback));
}

void ABActor::saveABProcess(int a, int b, const ABActor::SaveABCallback &callback) {
    this->a = a;
    this->b = b;
    std::invoke(callback);
}

Dalam metode antarmuka kelas, kami hanya mengikat argumen yang diteruskan ke "slot" yang sesuai kelas, sehingga membuat tugas, dan menempatkan tugas ini dalam antrian tugas kelas ini. Ketika utas tugas mulai melakukan tugas, dengan demikian ia akan memanggil "slot" yang benar, yang akan melakukan semua tindakan yang diperlukan dan memanggil panggilan balik, yang pada gilirannya akan mengirim panggilan balik nyata ke antrian tugas yang menyebabkannya.

Mari kita menulis aktor yang akan menggunakan kelas ABActor:

class ABActor;

class WokrerActor: public Actor {
public:
    WokrerActor(ABActor &actor)
        : abActor(actor)
    {}
public:
    void work();
private:
    void workProcess();
private:
    ABActor &abActor;
};

void WokrerActor::work() {
    addTask(std::bind(&WokrerActor::workProcess, this));
}

void WokrerActor::workProcess() {
    abActor.getA(ABActor::GetACallback(*this, [this](int a) {
        std::cout << "Result " << a << std::endl;
    }));
}

Dan kumpulkan semuanya:

int main() {
    ABActor abActor;
    WokrerActor workerActor(abActor);

    ActorThread thread;
    thread.addActor(abActor);
    thread.addActor(workerActor);

    workerActor.work();

    thread.run();
}

Mari kita ikuti seluruh rantai kode.

Pada awalnya, kami membuat objek yang diperlukan dan membangun koneksi di antara mereka.
Kemudian kita menambahkan tugas workProcess ke antrian tugas Worker aktor.
Ketika utas dimulai, ia akan menemukan tugas kami dalam antrian dan mulai melaksanakannya.
Dalam proses eksekusi, kami memanggil metode getA dari kelas ABActor, dengan demikian menempatkan tugas yang sesuai dalam antrian kelas ABActor, dan menyelesaikan eksekusi.
Selanjutnya, utas akan mengambil tugas yang baru dibuat dari kelas ABActor dan menjalankannya, yang akan mengarah pada pelaksanaan kode getAProcess.
Kode ini akan memanggil panggilan balik, meneruskan argumen yang diperlukan ke dalamnya - variabel a. Tetapi karena panggilan balik yang ia miliki adalah pembungkus, pada kenyataannya, panggilan balik nyata dengan parameter yang diisi akan dimasukkan ke dalam antrian kelas Pekerja.
Dan ketika, pada iterasi siklus berikutnya, utas menarik dan mengeksekusi panggilan balik kita dari kelas Pekerja, kita akan melihat output dari baris “Hasil 10”

Kerangka kerja aktor adalah cara yang cukup mudah untuk berinteraksi kelas yang tersebar di aliran fisik yang berbeda satu sama lain. Keunikan desain kelas, seperti yang seharusnya Anda yakini, adalah bahwa dalam setiap aktor individu semua tindakan dilakukan sepenuhnya dan dalam satu utas. Satu-satunya titik sinkronisasi stream dibuat dalam detail implementasi kerangka aktor dan tidak terlihat oleh programmer. Dengan demikian, seorang programmer dapat menulis kode single-threaded tanpa khawatir tentang membungkus mutex dan melacak situasi balapan, kebuntuan dan sakit kepala lainnya.

Sayangnya, solusi ini ada harganya. Karena hasil mengeksekusi aktor lain hanya dapat diakses dari callback, cepat atau lambat kode aktor berubah menjadi seperti ini:

    abActor.getA(ABActor::GetACallback(*this, [this](int a) {
        abActor.getB(ABActor::GetBCallback(*this, [a, this](int b) {
            abActor.saveAB(a - b, a + b, ABActor::SaveABCallback(*this, [this](){
                abActor.getA(ABActor::GetACallback(*this, [this](int a) {
                    abActor.getB(ABActor::GetBCallback(*this, [a, this](int b) {
                        std::cout << "Result " << a << " " << b << std::endl;
                    }));
                }));
            }));
        }));
    }));

Mari kita lihat apakah kita dapat menghindari ini menggunakan inovasi C ++ 20 - coroutine.

Tapi pertama-tama, kami akan menentukan batasannya.

Secara alami, kita sama sekali tidak dapat mengubah kode kerangka aktor. Selain itu, kami tidak dapat mengubah tanda tangan metode publik dan pribadi untuk instance dari kelas Aktor - ABActor dan WorkerActor. Mari kita lihat apakah kita bisa keluar dari situasi ini.

Coroutine. Bagian 1. Penunggu


Gagasan utama corutin adalah bahwa ketika membuat coroutine, frame stack terpisah dibuat untuk itu di heap, dari mana kita dapat "keluar" kapan saja, sambil mempertahankan posisi eksekusi saat ini, register prosesor dan informasi lain yang diperlukan. Kemudian kita juga dapat kapan saja kembali ke eksekusi coroutine yang ditangguhkan dan menyelesaikannya sampai akhir atau sampai suspensi berikutnya.

Objek std :: coroutine_handle <> bertanggung jawab untuk mengelola data ini, yang pada dasarnya mewakili sebuah pointer ke frame stack (dan data lain yang diperlukan), dan yang memiliki metode resume (atau analognya, operator ()), yang mengembalikan kita ke eksekusi coroutine .

Berdasarkan data ini, pertama mari kita menulis fungsi getAAsync, dan kemudian mencoba untuk menggeneralisasi.

Jadi, misalkan kita sudah memiliki instance dari kelas std :: coroutine_handle <> coro, apa yang perlu kita lakukan?

Anda harus memanggil metode yang sudah ada ABActor :: getA, yang akan menyelesaikan situasi sesuai kebutuhan, tetapi pertama-tama Anda harus membuat panggilan balik untuk metode getA.

Mari kita ingat bahwa panggilan balik dikembalikan ke metode panggilan balik getA - hasil dari metode getA. Selain itu, panggilan balik ini disebut dalam utas Pekerja utas. Dengan demikian, dari panggilan balik ini, kita dapat dengan aman melanjutkan menjalankan coroutine, yang dibuat hanya dari utas Pekerja dan yang akan terus melakukan urutan tindakannya. Tetapi juga, kita harus di suatu tempat menyimpan hasil yang dikembalikan dalam panggilan balik, itu, tentu saja, akan berguna bagi kita lebih lanjut.

auto callback = GetACallback(returnCallbackActor, [&value, coro](int result) {
        value = result;
        std::invoke(coro);
 });
getA(callback);

Jadi, sekarang Anda perlu mengambil contoh objek coroutine_handle dari suatu tempat dan tautan tempat Anda dapat menyimpan hasil kami.

Di masa depan, kita akan melihat bahwa coroutine_handle diberikan kepada kita sebagai akibat dari memanggil fungsi. Dengan demikian, yang bisa kita lakukan dengannya adalah meneruskannya ke beberapa fungsi lain. Mari kita siapkan fungsi ini sebagai lambda. (Kami akan meneruskan tautan ke variabel tempat hasil panggilan balik akan disimpan ke perusahaan).

auto storeCoroToQueue = [&returnCallbackActor, this](auto &value, std::coroutine_handle<> coro) {
    auto callback=GetACallback(returnCallbackActor, [&value, coro](int result){
        value = result;
        std::invoke(coro);
    });
    getA(callback);
};

Kami akan menyimpan fungsi ini di kelas berikutnya.

struct ActorAwaiterSimple {
    int value;

    std::function<void(int &value,std::coroutine_handle<>)> forwardCoroToCallback;

    ActorAwaiterSimple(
        const std::function<void(int &value, std::coroutine_handle<>)> &forwardCoroToCallback
    )
        : forwardCoroToCallback(forwardCoroToCallback)
    {}

    ActorAwaiterSimple(const ActorAwaiterSimple &) = delete;
    ActorAwaiterSimple& operator=(const ActorAwaiterSimple &) = delete;
    ActorAwaiterSimple(ActorAwaiterSimple &&) = delete;
    ActorAwaiterSimple& operator=(ActorAwaiterSimple &&) = delete;

// ...

Selain objek fungsional, kami juga akan menahan di sini memori (dalam bentuk nilai variabel) untuk nilai yang menunggu kami di callback.

Karena kami memegang memori di bawah nilai di sini, kami tidak ingin instance kelas ini disalin atau dipindahkan ke suatu tempat. Bayangkan, misalnya, seseorang menyalin kelas ini, menyimpan nilai di bawah variabel nilai di instance kelas yang lama, dan kemudian mencoba membacanya dari instance baru. Dan tentu saja tidak ada di sana, karena penyalinan terjadi sebelum menyimpan. Tidak menyenangkan. Oleh karena itu, kami melindungi diri dari masalah ini dengan melarang konstruktor dan menyalin dan memindahkan operator.

Mari kita lanjutkan menulis kelas ini. Metode selanjutnya yang kita butuhkan adalah:

    bool await_ready() const noexcept {
        return false;
    }

Dia menjawab pertanyaan apakah makna kita siap untuk dikeluarkan. Secara alami, pada panggilan pertama, nilai kami belum siap, dan di masa depan tidak ada yang akan bertanya kepada kami tentang hal ini, jadi kembalikan saja salah.

Contoh coroutine_handle akan dikirimkan kepada kami dalam metode void await_suspend (std :: coroutine_handle <> coro), jadi mari kita panggil functor kami yang sudah siap di dalamnya, lewat di sana juga ada tautan memori di bawah nilai:

    void await_suspend(std::coroutine_handle<> coro) noexcept {
        std::invoke(forwardCoroToCallback, std::ref(value), coro);
    }

Hasil dari eksekusi fungsi akan ditanyakan pada waktu yang tepat dengan memanggil metode await_resume. Kami tidak akan menolak pemohon:

    int await_resume() noexcept {
        return value;
    }

Sekarang metode kami dapat dipanggil menggunakan kata kunci co_await:

const int a = co_await actor.abActor.getAAsync(actor);

Apa yang akan terjadi di sini, kami sudah mewakili secara kasar.

Pertama, objek bertipe ActorAwaiterSimple akan dibuat, yang akan ditransfer ke "input" dari co_await. Dia pertama-tama akan bertanya (dengan menelepon await_ready) apakah kita secara tidak sengaja memiliki hasil yang selesai (kita tidak punya), kemudian memanggil await_suspend, meneruskan dalam konteks (pada kenyataannya, sebuah penunjuk ke kerangka tumpukan coroutine saat ini) dan menghentikan eksekusi.

Di masa depan, ketika aktor ABActor menyelesaikan pekerjaannya dan memanggil panggilan balik hasil, hasil ini (sudah ada di utas thread Pekerja) akan disimpan dalam satu-satunya (sisa pada tumpukan coroutine) dari ActorAwaiterSimple contoh dan kelanjutan dari coroutine akan dimulai.

Corutin akan melanjutkan eksekusi, mengambil hasil yang disimpan dengan memanggil metode await_resume, dan meneruskan hasil ini ke variabel a

Saat ini, batasan dari Awaiter saat ini adalah bahwa ia hanya dapat bekerja dengan callback dengan satu parameter tipe int. Mari kita coba untuk memperluas aplikasi Awaiter:

template<typename... T>
struct ActorAwaiter {

    std::tuple<T...> values;

    std::function<void(std::tuple<T...> &values, std::coroutine_handle<>)> storeHandler;

    ActorAwaiter(const std::function<void(std::tuple<T...> &values, std::coroutine_handle<>)> &storeHandler)
        : storeHandler(storeHandler)
    {}

    ActorAwaiter(const ActorAwaiter &) = delete;
    ActorAwaiter& operator=(const ActorAwaiter &) = delete;
    ActorAwaiter(ActorAwaiter &&) = delete;
    ActorAwaiter& operator=(ActorAwaiter &&) = delete;

    bool await_ready() const noexcept {
        return false;
    }

    void await_suspend(std::coroutine_handle<> coro) noexcept {
        std::invoke(storeHandler, std::ref(values), coro);
    }

    //   bool B  ,
    //   sfinae      
    template<
        bool B=true,size_t len=sizeof...(T),std::enable_if_t<len==0 && B, int>=0
    >
    void await_resume() noexcept {

    }

    //   bool B  ,
    //   sfinae      
    template<
        bool B=true,size_t len=sizeof...(T),std::enable_if_t<len==1 && B, int>=0
    >
    auto await_resume() noexcept {
        return std::get<0>(values);
    }

    //   bool B  ,
    //   sfinae      
    template<
        bool B=true,size_t len=sizeof...(T),std::enable_if_t<len!=1 && len!=0 && B, int>=0
    >
    std::tuple<T...> await_resume() noexcept {
        return values;
    }
};

Di sini kita menggunakan std :: tuple untuk dapat menyimpan beberapa variabel sekaligus.

Sfinae dikenakan pada metode await_resume sehingga dimungkinkan untuk tidak mengembalikan tuple dalam semua kasus, tetapi bergantung pada jumlah nilai yang terletak di tuple, mengembalikan batal, tepat 1 argumen atau seluruh tupel.

Pembungkus untuk membuat Awaiter sendiri sekarang terlihat seperti ini:

template<typename MakeCallback, typename... ReturnArgs, typename Func>
static auto makeCoroCallback(const Func &func, Actor &returnCallback) {
    return [&returnCallback, func](auto &values, std::coroutine_handle<> coro) {
        auto callback = MakeCallback(returnCallback, [&values, coro](ReturnArgs&& ...result) {
            values = std::make_tuple(std::forward<ReturnArgs>(result)...);
            std::invoke(coro);
        });
        func(callback);
    };
}

template<typename MakeCallback, typename... ReturnArgs, typename Func>
static ActorAwaiter<ReturnArgs...> makeActorAwaiter(const Func &func, Actor &returnCallback) {
    const auto storeCoroToQueue = makeCoroCallback<MakeCallback, ReturnArgs...>(func, returnCallback);
    return ActorAwaiter<ReturnArgs...>(storeCoroToQueue);
}

ActorAwaiter<int> ABActor::getAAsync(Actor &returnCallback) {
    return makeActorAwaiter<GetACallback, int>(std::bind(&ABActor::getA, this, _1), returnCallback);
}

ActorAwaiter<int> ABActor::getBAsync(Actor &returnCallback) {
    return makeActorAwaiter<GetBCallback, int>(std::bind(&ABActor::getB, this, _1), returnCallback);
}

ActorAwaiter<> ABActor::saveABAsync(Actor &returnCallback, int a, int b) {
    return makeActorAwaiter<SaveABCallback>(std::bind(&ABActor::saveAB, this, a, b, _1), returnCallback);
}

Sekarang mari kita cari tahu cara menggunakan tipe yang dibuat secara langsung di coroutine.

Coroutine. Bagian 2. Resumable


Dari sudut pandang C ++, fungsi yang berisi kata-kata co_await, co_yield atau co_return dianggap sebagai coroutine. Tetapi juga fungsi seperti itu harus mengembalikan tipe tertentu. Kami sepakat bahwa kami tidak akan mengubah tanda tangan dari fungsi-fungsi (di sini saya maksudkan bahwa tipe pengembalian juga mengacu pada tanda tangan), jadi kami harus keluar darinya entah bagaimana caranya.

Mari kita membuat coroutine lambda dan menyebutnya dari fungsi kita:

void WokrerActor::workProcess() {
    const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
        const int a = co_await actor.abActor.getAAsync(actor);
        const int b = co_await actor.abActor.getBAsync(actor);
        co_await actor.abActor.saveABAsync(actor, a - b, a + b);
        const int newA = co_await actor.abActor.getAAsync(actor);
        const int newB = co_await actor.abActor.getBAsync(actor);
        std::cout << "Result " << newA << " " << newB << std::endl;
    };

    coroutine(*this);
}

(Mengapa tidak menangkap ini dalam daftar penangkapan lambda? Maka semua kode di dalamnya akan keluar sedikit lebih mudah. ​​Tetapi kebetulan, tampaknya, lambda-coroutine di kompiler belum sepenuhnya didukung, sehingga kode ini tidak akan berfungsi.)

Seperti yang Anda lihat, kami kode panggilan balik menakutkan sekarang telah berubah menjadi kode linier yang cukup bagus. Yang tersisa bagi kita adalah menciptakan kelas ActorResumable.

Mari kita lihat.

struct ActorResumable {
    struct promise_type {
        using coro_handle = std::coroutine_handle<promise_type>;

        auto get_return_object() { 
            //  ,    ActorResumable   promise_type
            return coro_handle::from_promise(*this);
        }

        auto initial_suspend() {
            //      
            return std::suspend_never();
        }

        auto final_suspend() {
            //      . 
            // ,     
            return std::suspend_never();
        }

        void unhandled_exception() {
            //   ,       
            std::terminate();
        }
    };

    ActorResumable(std::coroutine_handle<promise_type>) {}
};

Kode pseudocor corutin yang dihasilkan dari lambda kami terlihat seperti ini:

ActorResumable coro() {
    promise_type promise;
    ActorResumable retobj = promise.get_return_object();
    auto intial_suspend = promise.initial_suspend();
    if (initial_suspend == std::suspend_always)  {
          // yield
    }
    try { 
        //  .
        const int a = co_await actor.abActor.getAAsync(actor);
        std::cout << "Result " << a << std::endl;
    } catch(...) { 
        promise.unhandled_exception();
    }
final_suspend:
    auto final_suspend = promise.final_suspend();
    if (final_suspend == std::suspend_always)  {
         // yield
    } else {
         cleanup();
    }

Ini hanya kode semu, beberapa hal sengaja disederhanakan. Meskipun demikian, mari kita lihat apa yang terjadi.

Pertama kita buat janji dan ActorResumable.

Setelah initial_suspend () kami tidak berhenti, tetapi beralih. Kami mulai melaksanakan bagian utama dari program ini.

Ketika kita sampai pada co_await, kita memahami bahwa kita perlu berhenti sebentar. Kami telah memeriksa situasi ini di bagian sebelumnya, Anda dapat kembali ke situ dan memeriksanya.

Setelah kami melanjutkan eksekusi dan menampilkan hasilnya di layar, eksekusi coroutine berakhir. Kami memeriksa final_suspend, dan menghapus seluruh konteks coroutine.

Coroutine. Bagian 3. Tugas


Mari kita ingat tahap apa yang sekarang kita capai.

void WokrerActor::workProcess() {
    const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
        const int a = co_await actor.abActor.getAAsync(actor);
        const int b = co_await actor.abActor.getBAsync(actor);
        co_await actor.abActor.saveABAsync(actor, a - b, a + b);
        const int newA = co_await actor.abActor.getAAsync(actor);
        const int newB = co_await actor.abActor.getBAsync(actor);
        std::cout << "Result " << newA << " " << newB << std::endl;
    };

    coroutine(*this);
}

Itu terlihat bagus, tetapi mudah untuk melihat bahwa kodenya:

        const int a = co_await actor.abActor.getAAsync(actor);
        const int b = co_await actor.abActor.getBAsync(actor);

diulang 2 kali. Apakah mungkin untuk memperbaiki momen ini dan memasukkannya ke dalam fungsi yang terpisah?

Mari kita gambarkan bagaimana tampilannya:

CoroTask<std::pair<int, int>> WokrerActor::readAB() {
    const int a = co_await abActor.getAAsync2(*this);
    const int b = co_await abActor.getBAsync2(*this);
    co_return std::make_pair(a, b);
}

void WokrerActor::workCoroProcess() {
    const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
        const auto [a, b] = co_await actor.readAB();
        co_await actor.abActor.saveABAsync2(actor, a - b, a + b);
        const auto [newA, newB] = co_await actor.readAB();
        std::cout << "Result " << newA << " " << newB << " " << a << " " << b << std::endl;
    };

    coroutine(*this);
}

Yang tersisa bagi kita adalah menciptakan tipe CoroTask. Mari kita pikirkan. Pertama, co_return digunakan di dalam fungsi readAB, yang berarti CoroTask harus memenuhi antarmuka Resumable. Tetapi juga, objek kelas ini digunakan untuk memasukkan co_await dari coroutine lain. Ini berarti bahwa kelas CoroTask juga harus memenuhi antarmuka yang ditunggu-tunggu. Mari kita terapkan kedua antarmuka ini di kelas CoroTask:

template <typename T = void>
struct CoroTask {
    struct promise_type {
        T result;
        std::coroutine_handle<> waiter;

        auto get_return_object() {
            return CoroTask{*this};
        }

        void return_value(T value) {
            result = value;
        }

        void unhandled_exception() {
            std::terminate();
        }

        std::suspend_always initial_suspend() {
            return {};
        }

        auto final_suspend() {
            struct final_awaiter {
                bool await_ready() {
                    return false;
                }
                void await_resume() {}
                auto await_suspend(std::coroutine_handle<promise_type> me) {
                    return me.promise().waiter;
                }
            };
            return final_awaiter{};
        }
    };

    CoroTask(CoroTask &&) = delete;
    CoroTask& operator=(CoroTask&&) = delete;
    CoroTask(const CoroTask&) = delete;
    CoroTask& operator=(const CoroTask&) = delete;

    ~CoroTask() {
        if (h) {
            h.destroy();
        }
    }

    explicit CoroTask(promise_type & p)
        : h(std::coroutine_handle<promise_type>::from_promise(p))
    {}

    bool await_ready() {
        return false;
    }

    T await_resume() {
        auto &result = h.promise().result;
        return result;
    }

    void await_suspend(std::coroutine_handle<> waiter) {
        h.promise().waiter = waiter;
        h.resume();
    }
private:
    std::coroutine_handle<promise_type> h;
};

(Saya sangat menyarankan membuka gambar latar belakang posting ini. Di masa depan, ini akan sangat membantu Anda.)

Jadi, mari kita lihat apa yang terjadi di sini.

1. Pergi ke coroutine lambda dan segera buat WokrerActor :: readAB coroutine. Tetapi setelah membuat coroutine ini, kami tidak mulai menjalankannya (initial_suspend == suspend_always), yang memaksa kami untuk menyela dan kembali ke coroutine lambda.

2. co_await lambda memeriksa untuk melihat apakah readAB siap. Hasilnya tidak siap (await_ready == false), yang memaksanya meneruskan konteksnya ke metode CoroTask :: await_suspend. Konteks ini disimpan di CoroTask, dan resume dari readAB

3 coroutine diluncurkan . Setelah readAB coroutine telah menyelesaikan semua tindakan yang diperlukan, ia mencapai baris:

co_return std::make_pair(a, b);

sebagai hasilnya, metode CoroTask :: janji_type :: return_value dipanggil dan pasangan angka yang dibuat disimpan di dalam CoroTask :: janji_type

4. Karena metode co_return dipanggil, eksekusi coroutine berakhir, yang berarti saatnya memanggil metode CoroTask :: janji_type :: final_suspend . Metode ini mengembalikan struktur yang ditulis sendiri (jangan lupa untuk melihat gambar), yang memaksa Anda untuk memanggil metode final_awaiter :: await_suspend, yang mengembalikan konteks lambda coroutine yang disimpan dalam langkah 2.

Mengapa kami tidak bisa mengembalikan suspend_always saja di sini? Lagipula, dalam kasus initial_suspend kelas ini, apakah kita berhasil? Faktanya adalah bahwa pada initial_suspend kami berhasil karena coroutine ini dipanggil oleh lambda coroutine kami, dan kami kembali ke sana. Tetapi pada saat kami mencapai panggilan final_suspend, coroutine kami kemungkinan besar berlanjut dari tumpukan lain (khususnya, dari lambda yang disiapkan oleh fungsi makeCoroCallback), dan jika kami mengembalikan suspend_always di sini, kami akan kembali ke sana, dan tidak ke metode workCoroProcess.

5. Karena metode final_awaiter :: await_suspend mengembalikan konteks kepada kami, ini memaksa program untuk terus mengeksekusi konteks yang dikembalikan, yaitu, coroutine lambda. Karena eksekusi kembali ke titik:

const auto [a, b] = co_await actor.readAB();

maka kita perlu mengisolasi hasil yang disimpan dengan memanggil metode CoroTask :: await_resume. Hasilnya diterima, diteruskan ke variabel a dan b, dan sekarang instance CoroTask dihancurkan.

6. Contoh CoroTask dihancurkan, tetapi apa yang terjadi pada konteks WokrerActor :: readAB? Jika kita dari CoroTask :: janji_type :: final_suspend akan mengembalikan suspend_never (lebih tepatnya, akan mengembalikannya ke pertanyaan await_ready akan kembali benar), maka pada saat itu konteks coroutine akan dibersihkan. Tetapi karena kami tidak melakukannya, kewajiban untuk menghapus konteks dialihkan kepada kami. Kami akan menghapus konteks ini di destruktor CoroTask, pada titik ini sudah aman.

7. Bacaan coroutine dijalankan, hasilnya diperoleh dari itu, konteksnya dihapus, lambda coroutine terus berjalan ...

Fiuh, semacam mengatasinya. Apakah Anda ingat bahwa dari metode ABActor :: getAAsync () dan sejenisnya, kami mengembalikan struktur yang ditulis sendiri? Bahkan, metode getAAsync juga dapat diubah menjadi coroutine dengan menggabungkan pengetahuan yang diperoleh dari implementasi kelas CoroTask dan ActorAwaiter dan mendapatkan sesuatu seperti:

CoroTaskActor<int> ABActor::getAAsync(Actor &returnCallback) {
    co_return makeCoroCallback<GetACallback, int>(std::bind(&ABActor::getA, this, _1), returnCallback);
}

tetapi ini saya akan pergi untuk analisis diri.

temuan


Seperti yang Anda lihat, dengan bantuan coroutine, Anda dapat dengan baik mem-linearkan kode panggilan balik yang tidak sinkron. Benar, proses penulisan jenis dan fungsi tambahan sepertinya belum terlalu intuitif.

Semua kode tersedia di repositori. Saya

juga menyarankan Anda melihat kuliah ini untuk perendaman yang lebih lengkap dalam topik ini
.
Sejumlah besar contoh tentang topik coroutine dari penulis yang sama ada di sini .
Dan Anda juga bisa menonton kuliah ini .

All Articles