رمز غير متزامن خطي مع كوروتين

صورة

بالإضافة إلى استخدام coroutine لإنشاء مولدات ، يمكنك محاولة استخدامها لترميز التعليمات البرمجية غير المتزامنة الموجودة. لنحاول القيام بذلك بمثال صغير. خذ الرمز المكتوب على إطار الممثل وأعد كتابة وظيفة واحدة من هذا الرمز على coroutines. لبناء المشروع ، سنستخدم دول مجلس التعاون الخليجي من فرع coroutines .

هدفنا هو الحصول على رد من المعكرونة:

    abActor.getA(ABActor::GetACallback([this](int a) {
        abActor.getB(ABActor::GetBCallback([a, this](int b) {
            abActor.saveAB(a - b, a + b, ABActor::SaveABCallback([this](){
                abActor.getA(ABActor::GetACallback([this](int a) {
                    abActor.getB(ABActor::GetBCallback([a, this](int b) {
                        std::cout << "Result " << a << " " << b << std::endl;
                    }));
                }));
            }));
        }));
    }));

نوعا ما:

const int a = co_await actor.abActor.getAAsync();
const int b = co_await actor.abActor.getBAsync();
co_await actor.abActor.saveABAsync(a - b, a + b);
const int newA = co_await actor.abActor.getAAsync();
const int newB = co_await actor.abActor.getBAsync();
std::cout << "Result " << newA << " " << newB << std::endl;

لذلك دعونا نبدأ.

ممثلين


بادئ ذي بدء ، نحن بحاجة إلى إنشاء إطار ممثل بسيط. إن إنشاء إطار فاعل كامل هو مهمة صعبة وكبيرة ، لذلك نقوم بتنفيذ نوع ما فقط.
أولاً ، قم بإنشاء فئة أساسية:

class Actor {
public:
    using Task = std::function<void()>;
public:
    virtual ~Actor();
public:
    void addTask(const Task &task);
    void tryRunTask();
private:
    std::queue<Task> queue;
    mutable std::mutex mutex;
};

الفكرة بسيطة في الأساس: نضع المهام التي هي كائنات وظيفية في قائمة الانتظار ، وعندما نحاول تشغيل المهمة ، نحاول إكمال هذه المهمة. تنفيذ الصف يؤكد نواياه:

Actor::~Actor() = default;

void Actor::addTask(const Task &task) {
    std::lock_guard lock(mutex);
    queue.push(task);
}

void Actor::tryRunTask() {
    std::unique_lock lock(mutex);
    if (queue.empty()) {
        return;
    }

    const Task task = queue.front();
    queue.pop();
    lock.unlock();

    std::invoke(task);
}

الفئة التالية هي "الخيط" الذي سينتمي إليه ممثلونا:

class Actor;

class ActorThread {
public:
    ~ActorThread();
public:
    void addActor(Actor &actor);
    void run();
private:
    std::vector<std::reference_wrapper<Actor>> actors;
};

كل شيء بسيط هنا أيضًا: في بداية البرنامج ، "نربط" الممثلين لدينا بالخيط باستخدام طريقة addActor ، ثم نبدأ الخيط باستخدام طريقة التشغيل.

ActorThread::~ActorThread() = default;

void ActorThread::addActor(Actor &actor) {
    actors.emplace_back(actor);
}

void ActorThread::run() {
    while (true) {
        for (Actor &actor: actors) {
            actor.tryRunTask();
        }
    }
}

عند بدء الخيط ، ندخل حلقة لا نهائية ونحاول أداء مهمة واحدة من كل ممثل. ليس هذا هو الحل الأفضل ، ولكنه سيفيد في التظاهر.

الآن دعونا نلقي نظرة على ممثل فئة الممثل:

class ABActor: public Actor {
public:
    using GetACallback = Callback<void(int result)>;
    using GetBCallback = Callback<void(int result)>;
    using SaveABCallback = Callback<void()>;
public:
    void getA(const GetACallback &callback);
    void getB(const GetBCallback &callback);
    void saveAB(int a, int b, const SaveABCallback &callback);
private:
    void getAProcess(const GetACallback &callback);
    void getBProcess(const GetBCallback &callback);
    void saveABProcess(int a, int b, const SaveABCallback &callback);
private:
    int a = 10;
    int b = 20;
};

يخزن هذا الفصل رقمين في حد ذاته - أ و ب ، وعند الطلب ، يُرجع قيمهما أو يستبدلها.

كرد اتصال ، يقبل كائنًا وظيفيًا بالمعلمات الضرورية. ولكن دعونا ننتبه إلى حقيقة أنه يمكن إطلاق ممثلين مختلفين في مواضيع مختلفة. وبالتالي ، إذا قمنا في نهاية العمل باستدعاء رد الاتصال الذي تم تمريره إلى الطريقة ، فسيتم استدعاء رد الاتصال هذا في سلسلة العمليات القابلة للتنفيذ الحالية ، وليس في سلسلة الرسائل التي تسمى أسلوبنا وإنشاء هذا الاستدعاء. لذلك ، نحتاج إلى إنشاء غلاف فوق رد الاتصال لحل هذه الحالة:

template<typename C>
class Callback {
public:
    template<typename Functor>
    Callback(Actor &sender, const Functor &callback)
        : sender(sender)
        , callback(callback)
    {}
public:
    template<typename ...Args>
    void operator() (Args&& ...args) const {
        sender.addTask(std::bind(callback, std::forward<Args>(args)...));
    }
private:
    Actor &sender;
    std::function<C> callback;
};

يتذكر هذا المجمع الممثل الأصلي ، وعندما تحاول إعدام نفسك ، فإنه ببساطة يضيف رد فعل حقيقي إلى قائمة انتظار المهام للممثل الأصلي.
ونتيجة لذلك ، يبدو تنفيذ فئة ABActor كما يلي:

void ABActor::getA(const GetACallback &callback) {
    addTask(std::bind(&ABActor::getAProcess, this, callback));
}

void ABActor::getAProcess(const ABActor::GetACallback &callback) {
    std::invoke(callback, a);
}

void ABActor::getB(const GetBCallback &callback) {
    addTask(std::bind(&ABActor::getBProcess, this, callback));
}

void ABActor::getBProcess(const ABActor::GetBCallback &callback) {
    std::invoke(callback, b);
}

void ABActor::saveAB(int a, int b, const SaveABCallback &callback) {
    addTask(std::bind(&ABActor::saveABProcess, this, a, b, callback));
}

void ABActor::saveABProcess(int a, int b, const ABActor::SaveABCallback &callback) {
    this->a = a;
    this->b = b;
    std::invoke(callback);
}

في طريقة الواجهة للفئة ، نقوم ببساطة بربط الوسيطات التي تم تمريرها بـ "الفتحة" المقابلة للفئة ، وبالتالي إنشاء مهمة ، ووضع هذه المهمة في قائمة انتظار المهام لهذه الفئة. عندما يبدأ مؤشر ترابط المهمة في أداء المهمة ، فسوف يستدعي "الفتحة" الصحيحة ، والتي ستقوم بتنفيذ جميع الإجراءات التي تحتاجها وتستدعي رد الاتصال ، والذي بدوره سيرسل رد الاتصال الحقيقي إلى قائمة انتظار المهمة التي تسببت فيه.

لنكتب ممثلاً سيستخدم فئة ABActor:

class ABActor;

class WokrerActor: public Actor {
public:
    WokrerActor(ABActor &actor)
        : abActor(actor)
    {}
public:
    void work();
private:
    void workProcess();
private:
    ABActor &abActor;
};

void WokrerActor::work() {
    addTask(std::bind(&WokrerActor::workProcess, this));
}

void WokrerActor::workProcess() {
    abActor.getA(ABActor::GetACallback(*this, [this](int a) {
        std::cout << "Result " << a << std::endl;
    }));
}

وجمع كل شيء معًا:

int main() {
    ABActor abActor;
    WokrerActor workerActor(abActor);

    ActorThread thread;
    thread.addActor(abActor);
    thread.addActor(workerActor);

    workerActor.work();

    thread.run();
}

دعونا نتبع السلسلة الكاملة من التعليمات البرمجية.

في البداية ، ننشئ الكائنات الضرورية ونقيم روابط بينها.
ثم نضيف مهمة workProcess إلى قائمة انتظار مهمة الممثل.
عندما يبدأ الخيط ، سيجد مهمتنا في قائمة الانتظار ويبدأ في تنفيذه.
في عملية التنفيذ ، نسمي طريقة getA لفئة ABActor ، وبالتالي نضع المهمة المقابلة في قائمة انتظار فئة ABActor ، ونكمل التنفيذ.
بعد ذلك ، سيأخذ مؤشر الترابط المهمة التي تم إنشاؤها حديثًا من فئة ABActor وتنفيذها ، مما سيؤدي إلى تنفيذ رمز getAProcess.
سوف يستدعي هذا الرمز رد الاتصال ، ويمرر الحجة اللازمة إليه - المتغير a. ولكن بما أن رد الاتصال الذي يمتلكه هو غلاف ، في الواقع ، سيتم وضع رد اتصال حقيقي مع معلمات معبأة في قائمة انتظار فئة العامل.
وعندما ، في التكرار التالي للدورة ، يتم سحب الخيط وتنفيذ رد الاتصال من فئة العامل ، سنرى إخراج السطر "النتيجة 10"

إطار الممثل هو طريقة ملائمة إلى حد ما لتفاعل الطبقات المنتشرة عبر تيارات مادية مختلفة مع بعضها البعض. إن خصوصية التصميم الطبقي ، كما كان يجب أن تكون مقتنعا بهذا ، هي أنه في كل ممثل فردي يتم تنفيذ جميع الإجراءات بشكل كامل وفي خيط واحد. تتم النقطة الوحيدة لمزامنة التدفقات في تفاصيل تنفيذ إطار الممثل ولا تكون مرئية للمبرمج. وبالتالي ، يمكن للمبرمج كتابة رمز أحادي الخيوط دون القلق بشأن اختتام ملفات المزامنة وتتبع حالات السباق ، والمآزق والصداع الأخرى.

لسوء الحظ ، هذا الحل له ثمن. نظرًا لأن نتيجة تنفيذ ممثل آخر لا يمكن الوصول إليها إلا من رد الاتصال ، فعاجلاً أو آجلاً يتحول رمز الممثل إلى شيء مثل هذا:

    abActor.getA(ABActor::GetACallback(*this, [this](int a) {
        abActor.getB(ABActor::GetBCallback(*this, [a, this](int b) {
            abActor.saveAB(a - b, a + b, ABActor::SaveABCallback(*this, [this](){
                abActor.getA(ABActor::GetACallback(*this, [this](int a) {
                    abActor.getB(ABActor::GetBCallback(*this, [a, this](int b) {
                        std::cout << "Result " << a << " " << b << std::endl;
                    }));
                }));
            }));
        }));
    }));

دعونا نرى ما إذا كان بإمكاننا تجنب ذلك باستخدام ابتكار C ++ 20 - coroutines.

لكن أولاً ، سنحدد القيود.

بطبيعة الحال ، لا يمكننا بأي حال من الأحوال تغيير رمز إطار الممثل. أيضًا ، لا يمكننا تغيير توقيعات الطرق العامة والخاصة لمثيلات فئة الفاعل - ABActor و WorkerActor. دعونا نرى ما إذا كان بإمكاننا الخروج من هذا الموقف.

Coroutines. الجزء الأول


تتمثل الفكرة الرئيسية لـ corutin في أنه عند إنشاء coroutine ، يتم إنشاء إطار مكدس منفصل له في الكومة ، حيث يمكننا "الخروج" منه في أي وقت ، مع الحفاظ على وضع التنفيذ الحالي ، وسجلات المعالج والمعلومات الضرورية الأخرى. ثم يمكننا أيضًا في أي وقت العودة إلى تنفيذ الروتين المعلق وإكماله حتى النهاية أو حتى التعليق التالي.

الكائن std :: coroutine_handle <> مسؤول عن إدارة هذه البيانات ، والتي تمثل بشكل أساسي مؤشرًا لإطار المكدس (والبيانات الضرورية الأخرى) ، والذي يحتوي على طريقة استئناف (أو التماثلية ، عامل التشغيل ()) ، والتي تعيدنا إلى تنفيذ coroutine .

استنادًا إلى هذه البيانات ، فلنكتب أولاً دالة getAAsync ، ثم نحاول التعميم.

لذا ، لنفترض أن لدينا بالفعل مثيل من فئة std :: coroutine_handle <> coro ، فماذا علينا أن نفعل؟

يجب استدعاء الطريقة الموجودة بالفعل ABActor :: getA ، والتي ستحل الموقف حسب الحاجة ، ولكن عليك أولاً إنشاء رد اتصال لطريقة getA.

دعنا نتذكر أنه يتم إرجاع رد الاتصال إلى طريقة استدعاء getA - نتيجة طريقة getA. علاوة على ذلك ، يتم استدعاء هذا رد الاتصال في مؤشر ترابط عامل مؤشر الترابط. وهكذا ، من خلال رد الاتصال هذا ، يمكننا الاستمرار بأمان في تنفيذ Coroutine الذي تم إنشاؤه للتو من مؤشر ترابط Worker والذي سيستمر في تنفيذ تسلسل الإجراءات. ولكن أيضًا ، يجب علينا حفظ النتيجة التي تم إرجاعها في رد الاتصال في مكان ما ، بالطبع ، سيكون مفيدًا لنا أكثر.

auto callback = GetACallback(returnCallbackActor, [&value, coro](int result) {
        value = result;
        std::invoke(coro);
 });
getA(callback);

لذا ، تحتاج الآن إلى أخذ نسخة من كائن coroutine_handle من مكان ما ورابط يمكنك حفظ النتيجة فيه.

في المستقبل ، سنرى أن coroutine_handle يتم تمريره إلينا نتيجة استدعاء الوظيفة. وفقًا لذلك ، كل ما يمكننا القيام به هو تمريرها إلى وظيفة أخرى. دعونا نعد هذه الوظيفة على أنها لامدا. (سنقوم بتمرير الرابط إلى المتغير حيث سيتم تخزين نتيجة الاستدعاء للشركة).

auto storeCoroToQueue = [&returnCallbackActor, this](auto &value, std::coroutine_handle<> coro) {
    auto callback=GetACallback(returnCallbackActor, [&value, coro](int result){
        value = result;
        std::invoke(coro);
    });
    getA(callback);
};

سنحفظ هذه الوظيفة في الصف التالي.

struct ActorAwaiterSimple {
    int value;

    std::function<void(int &value,std::coroutine_handle<>)> forwardCoroToCallback;

    ActorAwaiterSimple(
        const std::function<void(int &value, std::coroutine_handle<>)> &forwardCoroToCallback
    )
        : forwardCoroToCallback(forwardCoroToCallback)
    {}

    ActorAwaiterSimple(const ActorAwaiterSimple &) = delete;
    ActorAwaiterSimple& operator=(const ActorAwaiterSimple &) = delete;
    ActorAwaiterSimple(ActorAwaiterSimple &&) = delete;
    ActorAwaiterSimple& operator=(ActorAwaiterSimple &&) = delete;

// ...

بالإضافة إلى الكائن الوظيفي ، سنحتفظ هنا أيضًا بالذاكرة (في شكل قيمة متغيرة) للقيمة التي تنتظرنا في رد الاتصال.

نظرًا لأننا نحتفظ بالذاكرة تحت القيمة هنا ، فإننا بالكاد نريد نسخ مثيل هذه الفئة أو نقله إلى مكان ما. تخيل ، على سبيل المثال ، شخص ما قام بنسخ هذه الفئة ، وحفظ القيمة تحت متغير القيمة في النسخة القديمة من الفصل ، ثم حاول قراءتها من النسخة الجديدة. ومن الطبيعي أنه ليس هناك ، حيث حدث النسخ قبل الحفظ. غير سارة. لذلك ، نحن نحمي أنفسنا من هذه المشكلة من خلال حظر المنشئين ونسخ وتحريك المشغلين.

دعونا نواصل كتابة هذا الفصل. الطريقة التالية التي نحتاجها هي:

    bool await_ready() const noexcept {
        return false;
    }

يجيب على سؤال ما إذا كان معناها جاهزًا للإصدار. بطبيعة الحال ، في المكالمة الأولى ، قيمتنا ليست جاهزة بعد ، وفي المستقبل لن يسألنا أحد عن ذلك ، لذا فقط قم بإرجاع خطأ.

سيتم تمرير مثيل coroutine_handle إلينا في طريقة await_suspend الباطلة (std :: coroutine_handle <> coro) ، لذا دعنا ندعو ممولنا المحضر في ذلك ، ونمر هناك أيضًا رابط ذاكرة تحت القيمة:

    void await_suspend(std::coroutine_handle<> coro) noexcept {
        std::invoke(forwardCoroToCallback, std::ref(value), coro);
    }

سيتم طلب نتيجة تنفيذ الوظيفة في الوقت المناسب عن طريق استدعاء طريقة await_resume. لن نرفض مقدم الطلب:

    int await_resume() noexcept {
        return value;
    }

الآن يمكن استدعاء طريقتنا باستخدام الكلمة الرئيسية co_await:

const int a = co_await actor.abActor.getAAsync(actor);

ماذا سيحدث هنا ، نحن نمثل بالفعل تقريبًا.

أولاً ، سيتم إنشاء كائن من نوع ActorAwaiterSimple ، وسيتم نقله إلى "إدخال" co_await. سوف يسأل أولاً (من خلال الاتصال بـ await_ready) عما إذا كان لدينا نتيجة نهائية عن طريق الخطأ (ليس لدينا) ، ثم يتصل بـ await_suspend ، ويمر في سياق (في الواقع ، مؤشر إلى إطار مكدس Coroutine الحالي) ويقطع التنفيذ.

في المستقبل ، عندما يكمل ممثل ABActor عمله ويستدعي استدعاء النتيجة ، سيتم حفظ هذه النتيجة (موجودة بالفعل في سلسلة ترابط Worker) في مثيل ActorAwaiterSimple الوحيد (المتبقي على رزمة Coroutine) وسيبدأ استمرار الروتين.

سيستمر Corutin في التنفيذ ، ويأخذ النتيجة المحفوظة عن طريق استدعاء طريقة await_resume ، ويمرر هذه النتيجة إلى المتغير a

في الوقت الحالي ، فإن الحد من Awaiter الحالي هو أنه يمكن أن يعمل فقط مع عمليات الاسترجاعات بمعلمة واحدة من النوع int. دعنا نحاول توسيع تطبيق Awaiter:

template<typename... T>
struct ActorAwaiter {

    std::tuple<T...> values;

    std::function<void(std::tuple<T...> &values, std::coroutine_handle<>)> storeHandler;

    ActorAwaiter(const std::function<void(std::tuple<T...> &values, std::coroutine_handle<>)> &storeHandler)
        : storeHandler(storeHandler)
    {}

    ActorAwaiter(const ActorAwaiter &) = delete;
    ActorAwaiter& operator=(const ActorAwaiter &) = delete;
    ActorAwaiter(ActorAwaiter &&) = delete;
    ActorAwaiter& operator=(ActorAwaiter &&) = delete;

    bool await_ready() const noexcept {
        return false;
    }

    void await_suspend(std::coroutine_handle<> coro) noexcept {
        std::invoke(storeHandler, std::ref(values), coro);
    }

    //   bool B  ,
    //   sfinae      
    template<
        bool B=true,size_t len=sizeof...(T),std::enable_if_t<len==0 && B, int>=0
    >
    void await_resume() noexcept {

    }

    //   bool B  ,
    //   sfinae      
    template<
        bool B=true,size_t len=sizeof...(T),std::enable_if_t<len==1 && B, int>=0
    >
    auto await_resume() noexcept {
        return std::get<0>(values);
    }

    //   bool B  ,
    //   sfinae      
    template<
        bool B=true,size_t len=sizeof...(T),std::enable_if_t<len!=1 && len!=0 && B, int>=0
    >
    std::tuple<T...> await_resume() noexcept {
        return values;
    }
};

هنا نستخدم std :: tuple حتى نتمكن من حفظ العديد من المتغيرات في وقت واحد.

يتم فرض Sfinae على طريقة await_resume بحيث لا يمكن إرجاع مجموعة صفية في جميع الحالات ، ولكن اعتمادًا على عدد القيم الكامنة في الصف ، أو فراغ الإرجاع ، أو وسيطة واحدة بالضبط أو المجموعة بأكملها.

تبدو أغلفة إنشاء Awaiter نفسها الآن كما يلي:

template<typename MakeCallback, typename... ReturnArgs, typename Func>
static auto makeCoroCallback(const Func &func, Actor &returnCallback) {
    return [&returnCallback, func](auto &values, std::coroutine_handle<> coro) {
        auto callback = MakeCallback(returnCallback, [&values, coro](ReturnArgs&& ...result) {
            values = std::make_tuple(std::forward<ReturnArgs>(result)...);
            std::invoke(coro);
        });
        func(callback);
    };
}

template<typename MakeCallback, typename... ReturnArgs, typename Func>
static ActorAwaiter<ReturnArgs...> makeActorAwaiter(const Func &func, Actor &returnCallback) {
    const auto storeCoroToQueue = makeCoroCallback<MakeCallback, ReturnArgs...>(func, returnCallback);
    return ActorAwaiter<ReturnArgs...>(storeCoroToQueue);
}

ActorAwaiter<int> ABActor::getAAsync(Actor &returnCallback) {
    return makeActorAwaiter<GetACallback, int>(std::bind(&ABActor::getA, this, _1), returnCallback);
}

ActorAwaiter<int> ABActor::getBAsync(Actor &returnCallback) {
    return makeActorAwaiter<GetBCallback, int>(std::bind(&ABActor::getB, this, _1), returnCallback);
}

ActorAwaiter<> ABActor::saveABAsync(Actor &returnCallback, int a, int b) {
    return makeActorAwaiter<SaveABCallback>(std::bind(&ABActor::saveAB, this, a, b, _1), returnCallback);
}

الآن دعونا نكتشف كيفية استخدام النوع الذي تم إنشاؤه مباشرة في coroutine.

Coroutines. الجزء 2. قابل للاستئناف


من وجهة نظر C ++ ، تعتبر دالة تحتوي على الكلمات co_await أو co_yield أو co_return أنها coroutine. ولكن يجب أيضًا أن ترجع هذه الوظيفة نوعًا معينًا. اتفقنا على أننا لن نغير توقيع الدوال (أعني هنا أن نوع الإرجاع يشير أيضًا إلى التوقيع) ، لذا سيتعين علينا الخروج منه بطريقة أو بأخرى.

دعونا ننشئ كوروتين لامدا ونطلق عليه من وظيفتنا:

void WokrerActor::workProcess() {
    const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
        const int a = co_await actor.abActor.getAAsync(actor);
        const int b = co_await actor.abActor.getBAsync(actor);
        co_await actor.abActor.saveABAsync(actor, a - b, a + b);
        const int newA = co_await actor.abActor.getAAsync(actor);
        const int newB = co_await actor.abActor.getBAsync(actor);
        std::cout << "Result " << newA << " " << newB << std::endl;
    };

    coroutine(*this);
}

(لماذا لا يتم تسجيل هذا في قائمة التقاط lambdas؟ بعد ذلك ، ستصبح جميع التعليمات البرمجية في الداخل أسهل قليلاً. ولكن حدث ذلك ، على ما يبدو ، أن lambda-coroutines في المترجم لم يتم دعمها بالكامل بعد ، لذلك لن يعمل هذا الرمز.)

كما ترون ، لقد تحول رمز رد الاتصال المخيف الآن إلى رمز خطي لطيف جدًا. كل ما تبقى لنا هو اختراع فئة الممثل المستأنف

.

struct ActorResumable {
    struct promise_type {
        using coro_handle = std::coroutine_handle<promise_type>;

        auto get_return_object() { 
            //  ,    ActorResumable   promise_type
            return coro_handle::from_promise(*this);
        }

        auto initial_suspend() {
            //      
            return std::suspend_never();
        }

        auto final_suspend() {
            //      . 
            // ,     
            return std::suspend_never();
        }

        void unhandled_exception() {
            //   ,       
            std::terminate();
        }
    };

    ActorResumable(std::coroutine_handle<promise_type>) {}
};

يبدو الرمز الزائف للكوروتين الناتج من لامدا لدينا شيءًا مثل هذا:

ActorResumable coro() {
    promise_type promise;
    ActorResumable retobj = promise.get_return_object();
    auto intial_suspend = promise.initial_suspend();
    if (initial_suspend == std::suspend_always)  {
          // yield
    }
    try { 
        //  .
        const int a = co_await actor.abActor.getAAsync(actor);
        std::cout << "Result " << a << std::endl;
    } catch(...) { 
        promise.unhandled_exception();
    }
final_suspend:
    auto final_suspend = promise.final_suspend();
    if (final_suspend == std::suspend_always)  {
         // yield
    } else {
         cleanup();
    }

هذا مجرد رمز زائف ، يتم تبسيط بعض الأشياء عمدا. ومع ذلك ، دعنا نرى ما سيحدث.

أولاً ، نخلق وعدًا وممثلًا مستقلاً.

بعد الحرف الأولي () ، لا نتوقف مؤقتًا ، بل ننتقل. نبدأ في تنفيذ الجزء الرئيسي من البرنامج.

عندما نصل إلى co_await ، نتفهم أننا بحاجة للتوقف. لقد قمنا بالفعل بدراسة هذا الوضع في القسم السابق ، ويمكنك الرجوع إليه ومراجعته.

بعد أن تابعنا التنفيذ وعرضنا النتيجة على الشاشة ، ينتهي تنفيذ coroutine. نتحقق من final_suspend وننظف السياق الكامل للكوروتين.

Coroutines. الجزء 3. المهمة


دعونا نتذكر المرحلة التي وصلنا إليها الآن.

void WokrerActor::workProcess() {
    const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
        const int a = co_await actor.abActor.getAAsync(actor);
        const int b = co_await actor.abActor.getBAsync(actor);
        co_await actor.abActor.saveABAsync(actor, a - b, a + b);
        const int newA = co_await actor.abActor.getAAsync(actor);
        const int newB = co_await actor.abActor.getBAsync(actor);
        std::cout << "Result " << newA << " " << newB << std::endl;
    };

    coroutine(*this);
}

يبدو لطيفًا ، ولكن من السهل أن نرى أن الرمز:

        const int a = co_await actor.abActor.getAAsync(actor);
        const int b = co_await actor.abActor.getBAsync(actor);

كرر مرتين. هل من الممكن إعادة صياغة هذه اللحظة ووضعها في وظيفة منفصلة؟

لنرسم كيف قد يبدو هذا:

CoroTask<std::pair<int, int>> WokrerActor::readAB() {
    const int a = co_await abActor.getAAsync2(*this);
    const int b = co_await abActor.getBAsync2(*this);
    co_return std::make_pair(a, b);
}

void WokrerActor::workCoroProcess() {
    const auto coroutine = [](WokrerActor &actor) -> ActorResumable {
        const auto [a, b] = co_await actor.readAB();
        co_await actor.abActor.saveABAsync2(actor, a - b, a + b);
        const auto [newA, newB] = co_await actor.readAB();
        std::cout << "Result " << newA << " " << newB << " " << a << " " << b << std::endl;
    };

    coroutine(*this);
}

كل ما تبقى لنا هو اختراع نوع CoroTask. دعونا نفكر في الأمر. أولاً ، يتم استخدام co_return داخل وظيفة readAB ، مما يعني أنه يجب على CoroTask تلبية الواجهة القابلة للاستئناف. ولكن أيضا ، يتم استخدام كائن من هذه الفئة لإدخال co_await من coroutine آخر. هذا يعني أن فئة CoroTask يجب أن تفي أيضًا بواجهة Awaitable. لننفذ كلتا الواجهتين في فئة CoroTask:

template <typename T = void>
struct CoroTask {
    struct promise_type {
        T result;
        std::coroutine_handle<> waiter;

        auto get_return_object() {
            return CoroTask{*this};
        }

        void return_value(T value) {
            result = value;
        }

        void unhandled_exception() {
            std::terminate();
        }

        std::suspend_always initial_suspend() {
            return {};
        }

        auto final_suspend() {
            struct final_awaiter {
                bool await_ready() {
                    return false;
                }
                void await_resume() {}
                auto await_suspend(std::coroutine_handle<promise_type> me) {
                    return me.promise().waiter;
                }
            };
            return final_awaiter{};
        }
    };

    CoroTask(CoroTask &&) = delete;
    CoroTask& operator=(CoroTask&&) = delete;
    CoroTask(const CoroTask&) = delete;
    CoroTask& operator=(const CoroTask&) = delete;

    ~CoroTask() {
        if (h) {
            h.destroy();
        }
    }

    explicit CoroTask(promise_type & p)
        : h(std::coroutine_handle<promise_type>::from_promise(p))
    {}

    bool await_ready() {
        return false;
    }

    T await_resume() {
        auto &result = h.promise().result;
        return result;
    }

    void await_suspend(std::coroutine_handle<> waiter) {
        h.promise().waiter = waiter;
        h.resume();
    }
private:
    std::coroutine_handle<promise_type> h;
};

(أوصي بشدة بفتح صورة الخلفية لهذه المشاركة. في المستقبل ، سيساعدك هذا كثيرًا.)

لذا ، دعنا نرى ما يحدث هنا.

1. اذهب إلى corawine لامدا وإنشاء فورا WokrerActor :: readAB coroutine. ولكن بعد إنشاء هذا coroutine ، لا نبدأ في تنفيذه (Init_suspend == suspend_always) ، مما يجبرنا على المقاطعة والعودة إلى loutda.

2. co_await فحص لامدا لمعرفة ما إذا كان readAB جاهزا. النتيجة غير جاهزة (await_ready == false) ، مما يجبرها على تمرير سياقها إلى طريقة CoroTask :: await_suspend. يتم حفظ هذا السياق في CoroTask ، ويتم إطلاق السيرة الذاتية للقراءة corabinesAB

3. بعد الانتهاء من readAB coroutine جميع الإجراءات اللازمة ، فإنه يصل إلى الخط:

co_return std::make_pair(a, b);

ونتيجة لذلك ، يتم استدعاء طريقة CoroTask :: prom_type :: return_value ويتم حفظ الزوج الذي تم إنشاؤه داخل CoroTask :: prom_type

4. نظرًا لأنه تم استدعاء طريقة co_return ، ينتهي تنفيذ coroutine ، مما يعني أن الوقت قد حان لاستدعاء طريقة CoroTask :: prom_type :: final_suspend . تُرجع هذه الطريقة بنية مكتوبة ذاتيًا (لا تنسَ أن تنظر إلى الصورة) ، مما يجبرك على استدعاء طريقة final_awaiter :: await_suspend ، التي تُرجع سياق اللامبدا المخزن في الخطوة 2.

لماذا لا نعود هنا فقط هنا؟ بعد كل شيء ، في حالة التوقف الأولي لهذه الفئة ، هل نجحنا؟ والحقيقة هي أننا نجحنا في تعليق الإيقاف الأولي لأن هذا اللقاح كان يسمى بروتين اللامدا الخاص بنا ، وقد عدنا إليه. ولكن في اللحظة التي وصلنا فيها إلى المكالمة final_suspend ، استمر على الأرجح من مجموعتنا الأخرى (على وجه التحديد ، من lambda التي أعدتها وظيفة makeCoroCallback) ، وإذا عدنا suspend_always إلى هنا ، فسنعود إليها ، وليس إلى طريقة workCoroProcess.

5. منذ أن أعادت طريقة final_awaiter :: await_suspend السياق إلينا ، فإن هذا يجبر البرنامج على الاستمرار في تنفيذ السياق الذي تم إرجاعه ، أي loutda coroutine. منذ أن عاد التنفيذ إلى النقطة:

const auto [a, b] = co_await actor.readAB();

ثم نحتاج إلى عزل النتيجة المحفوظة عن طريق استدعاء طريقة CoroTask :: await_resume. يتم استلام النتيجة ، وتمريرها إلى المتغيرات a و b ، والآن يتم تدمير مثيل CoroTask.

6. تم تدمير مثيل CoroTask ، ولكن ماذا حدث لسياق WokrerActor :: readAB؟ إذا قمنا من CoroTask :: prom_type :: final_suspend بإرجاع suspend_never (بتعبير أدق ، سوف نعيد ذلك إلى السؤال الذي ينتظر انتظارًا صحيحًا) ، فعندئذٍ سيتم تنظيف سياق Coroutine. ولكن بما أننا لم نفعل ذلك ، فقد تم نقل الالتزام بتوضيح السياق إلينا. سنقوم بمسح هذا السياق في مدمر CoroTask ، عند هذه النقطة فهو آمن بالفعل.

7. يتم تنفيذ قراءة coroutine corabine ، ويتم الحصول على النتيجة منه ، ويتم مسح السياق ، ويستمر coroutine lambda في العمل ...

Phew ، نوعًا ما فرزها. هل تتذكر أنه من الأساليب ABActor :: getAAsync () وما شابه ذلك ، نعيد بنية مكتوبة ذاتيا؟ في الواقع ، يمكن أيضًا تحويل طريقة getAAsync إلى كوروتين عن طريق الجمع بين المعرفة المكتسبة من تنفيذ فئتي CoroTask و ActorAwaiter والحصول على شيء مثل:

CoroTaskActor<int> ABActor::getAAsync(Actor &returnCallback) {
    co_return makeCoroCallback<GetACallback, int>(std::bind(&ABActor::getA, this, _1), returnCallback);
}

ولكن هذا سأترك لتحليل الذات.

الموجودات


كما ترون ، بمساعدة coroutine ، يمكنك أن تكتب رمز رد الاتصال غير المتزامن. صحيح أن عملية كتابة الأنواع والوظائف المساعدة لا تبدو بديهية للغاية حتى الآن.

كل الكود متاح في المستودع ،

كما أوصيك بالاطلاع على هذه المحاضرات للحصول على انغماس كامل في الموضوع
.
هناك عدد كبير من الأمثلة حول موضوع coroutine من نفس المؤلف هنا .
ويمكنك أيضًا مشاهدة هذه المحاضرة.

All Articles