.NET中的异步编程:最佳实践

C#中异步/等待的出现导致对如何编写简单且正确的并行代码的重新定义。通常,使用异步编程,程序员不仅无法解决线程所带来的问题,而且还会引入新的问题。死锁和飞行无处不在-它们变得更难诊断。



Dmitry Ivanov-华为软件分析团队的负责人,曾是JetBrains Rider技术的负责人和ReSharper核心的开发人员:数据结构,缓存,多线程以及在DotNext会议上的定期演讲者

在过场动画下-来自DotNext 2019 Piter会议的Dmitry报告的视频记录和文字记录。



代表讲者的进一步叙述。

在多线程或异步代码中,某些东西经常会中断。原因可能是僵局,也可能是种族。通常,一场竞赛每千次崩溃一次,通常不是在本地,而是在构建服务器上崩溃,并且要花几天的时间才能赶上。我相信对于很多人来说这是一个熟悉的情况。

另外,即使是经验丰富的开发人员,也要查看异步代码,我发现自己认为某些东西可以写得短三倍,而且更正确。

这表明问题不在于人,而在于工具。人们只是使用该工具并希望其解决问题。该工具本身具有大量功能(有时甚至是多余的),设置和隐式上下文,这导致以下事实:很容易错误使用。让我们尝试弄清楚如何使用async / await并使用Task.NET中的类

计划


  • 用异步/等待解决的方法存在问题。
  • 有争议的设计示例。
  • 现实生活中的一项任务,我们将异步解决。


异步/等待和有待解决的问题




为什么我们需要异步/等待?假设我们有可用于共享共享内存的代码。

在工作开始时,我们使用Dequeue阻塞请求从阻塞队列(例如,从Internet或磁盘)读取请求,在这种情况下,该文件(阻塞请求将在示例中用红色标记)。

这种方法需要大量线程,并且每个线程都需要资源,在调度程序上造成了负载。但这不是主要问题。假设人们可以重写操作系统,以便这些系统支持十万个线程和一百万个线程。但是主要的问题是某些线程根本无法使用。例如,您有一个用户界面线程。没有正常的适当UI框架,在该框架中,不仅要从一个线程访问数据。 UI线程无法被阻止。为了不阻止它,我们需要异步代码。

现在让我们讨论第二个任务。读取文件后,需要以某种方式对其进行处理。我们将并行进行。

你们中许多人都听说过并行性与异步性不同。在这种情况下,就会出现一个问题:异步是否可以帮助编写更紧凑,美观和更快的并行代码?

最后的任务是使用共享内存。我们是否需要使用锁定来拖动这种机制,将其同步到异步代码,还是可以通过某种方式避免这种情况?可以异步/的await这方面的帮助?

异步/等待路径


让我们看看世界上和.NET中异步编程的发展。

打回来


Void Foo(params, Action callback) {…}
 

Void OurMethod() {//synchronous code
 
    Foo(params,() =>{//asynchronous code;continuation
    });
}

异步编程始于回调。也就是说,首先您需要同步调用代码的某些部分,然后第二部分-异步调用。例如,您从文件中读取数据,数据准备就绪后,它将以某种方式传递给您。此异步部分作为回调传递

更多回调


void Foo(params, Action callback) {...} 
void Bar(Action callback) {...}
void Baz(Action callback) {...}

void OurMethod() {
    ... //synchronous code
    
    Foo(params, () => { 
      ... //continuation 1 
      Bar(() => {
        //continuation 2
        Baz(() => {
          //continuation 3
        }); 
      });
    });
}

因此,从一个回调中,您可以注册另一个回调,从中您可以注册第三个回调,最后,所有这些都将变成一个Callback Hell



回调:例外



void Foo(params, Action onSuccess, Action onFailure) {...}


void OurMethod() {
    ... //synchronous code 
    Foo(params, () => {
      ... //asynchronous code on success 
    },
    () => {
        ... //asynchronous code on failure
    }); 
}

如何处理例外情况?例如,ReSharper在分别响应异常和良好执行时,并未展示出最漂亮的代码片段-针对特殊情况和成功的延续,存在单独的回调。结果就是这样的回调地狱,但不是线性的,而是像树一样的,这可能完全令人困惑。



在.NET中,第一种回调方法称为异步编程模型(APM)。该方法将称为AsyncCallback,与基本上相同Action,但是该方法具有一些功能。首先,方法应以单词“ Begin”开头(从文件中读取的内容为BeginRead),它返回some AsyncResult。自己AsyncResult-这是一个知道该操作已完成并且具有某种机制的处理程序WaitHandle。您WaitHandle可以挂起,等待操作异步完成。另一方面,您可以调用EndOperation,即EndRead同步进行make 和hang(这非常类似于property Task.Result)。

这种方法有许多问题。首先,它不能保护我们免受回调地狱的伤害。第二,仍然不清楚如何处理异常。第三,尚不清楚将在哪个线程上调用此回调-我们无法控制该调用。第四,问题来了,如何将代码段与回调结合起来?



第二个模型称为基于事件的异步模式。这是一种响应式回调方法。方法的思想是我们将OperationNameAsync具有事件完成的对象传递给该方法并订阅该事件。如您所见,BeginOperationName更改为OperationNameAsync。当您进入Socket类时,可能会造成混乱,其中混合了两种模式:ConnectAsyncBeginConnect

请注意,您必须致电取消OperationNameAsyncCancel。由于在.NET中找不到其他位置,因此通常每个人都发送CancellationToken。因此,如果您意外地在库中遇到一个以结尾的方法,则Async需要了解它不一定会返回Task,而是可以返回类似的构造。



考虑在Java中称为以JavaScript(如Promises)和.NET(如Task Asynchronous Patterns)(即“任务”)形式表示的Future。此方法假定您有一些计算对象,并且可以看到该对象的状态(运行或已完成)。在.NET中,有RnToCompletion两种状态的所谓方便分隔:任务开始和任务完成。当一个方法被调用任务时一个常见的错误IsCompleted是没有回报成功延续,但是RnToCompletionCanceledFaulted。因此,在UI应用程序中单击“取消”的结果应与返回异常(执行)不同。在.NET中,有一个区别:如果执行是您要保护的错误,则取消-强制操作。

在.NET中,还引入了一个概念TaskScheduler-它是一种在线程之上的抽象,用于指示在何处运行任务。在这种情况下,取消支持是在设计级别设计的。 .NET库中几乎所有操作CancellationToken都可以传递。这不适用于所有语言:例如,在Kotlin中,您可以撤消任务,但在.NET中,则不能。解决方案可能是取消任务的人员与任务本身之间的责任划分。收到任务后,您只能明确地取消任务-您必须将其继续传递CancellationToken

一个特殊的对象TaskCompletionSoure使您可以轻松地适应与基于事件的异步模式异步编程模型相关联的旧API如果在任务中编程,则必须阅读一个文档。它描述了有关tasa的所有协议。例如,任何返回任务的方法都应在运行状态下返回它,这意味着它不能为Created,而所有此类操作都必须以结尾Async

合并延续


Task ourMethod() {
  return Task.RunSynchronously(() =>{
    ... //synchronous code
  })
  .ContinueWith(_ =>{
    Foo(); //continuation 1
  })
  .ContinueWith(_ =>{
    Bar(); //continuation 2
  })
  .ContinueWith(_ =>{
    Baz(); //continuation 3
  })
}

至于组合,考虑到回调地狱,尽管有重复的代码片段,但改动很小,它可以以更线性的形式出现。似乎代码正在以这种方式进行改进,但是这里也有一些陷阱。

开始和继续任务


Task.Factory.StartNew(Action, 
  TaskCreationOptions, 
  TaskScheduler, 
  CancellationToken
)
Task.ContinueWith(Action<Task>, 
  TaskContinuationOptions, 
  TaskScheduler, 
  CancellationToken
)

让我们在标准任务启动过程中转到三个参数:第一个是启动任务的选项,第二个是启动任务的选项,scheduler第三个是- CancellationToken



TaskScheduler告知任务从何处开始,并且是您可以独立覆盖的对象。例如,您可以覆盖method Queue。如果TaskSchedulerthread pool,则该方法Queue从中获取线程thread pool并将任务发送到那里。

如果您接管scheduler主线程,它将把所有内容放在一个队列中,并且任务将在主线程上顺序执行。但是,问题在于,在.NET中,您无需传递即可执行任务TaskScheduler。出现了一个问题:.NET如何计算传递给它的任务?当任务从StartNew内部开始时ActionThreadStaticCurrent展示在TaskScheduler我们给她的那张。

由于隐式上下文,这种设计似乎颇具争议。在某些情况下,它TaskScheduler包含异步代码,该代码在某处深深地继承TaskScheduler.Current并与另一个调度程序重叠,从而导致死锁。在这种情况下,您可以使用option TaskCreationOption.HideScheduler。这是一个警钟,它表明我们有一些选项可以覆盖ThreadStatic设置。

延续都是一样的。问题出现了:它从何而来TaskScheduler?首先,它是从您开始使用的方法中获取的Continuation。它也TaskScheduler取自ThreadStatic。重要的是,对于异步/等待,延续的工作方式必须完全不同。



我们转到参数TaskCreationOptionsTaskContinuationOptions。他们的主要问题是他们很多。这些参数中的一些互相抵消,有些则互斥。所有这些参数均可在所有可能的组合中使用,因此很难记住渴望会发生的所有事情。其中一些选项完全无法理解。



例如,参数ExecuteSynchronouslyRunContinuationsAsynchronously代表两个可能的应用程序选项,但是继续是以同步方式还是异步方式启动取决于许多您不知道的事情。



另一个例子:我们启动了任务,启动了延续,并同时给出了两个参数TaskContinuations.ExecuteSynchronously,之后他们异步开始继续。它将在上一个任务结束的同一堆栈中执行,还是将其转移到thread pool在这种情况下,将有第三个选择:取决于。



TaskCompletionSource


考虑一下TaskCompletionSource。创建任务时,可以设置其结果SetResult以使以前的异步模式适应任务世界。您TaskCompletionSource可以请求tcs.Task,并且finish在您致电时此任务将进入状态tcs.SetResult。但是,如果在线程池上运行它,则会出现死锁。问题是,为什么我们甚至不同步编写任何内容?



我们创建TaskCompletionSource,启动一个新任务,然后有第二个线程启动该任务中的某些内容。它超过了预期的一百毫秒。然后我们的主线程-绿色- 等待,就是这样。他释放堆栈,堆栈挂起,等待继续调用task.Waittcs暴露。

在蓝色线程中,我们转到tcs,然后是最有趣的。基于.NET的内部考虑,他TaskCompletionSource认为此操作的延续tcs可以同步执行,即直接在同一堆栈task.Wait上执行,然后在同一堆栈上同步执行。尽管我们什至没有写任何东西,这还是很奇怪ExecuteSynchronously这可能是混合同步和异步代码的问题。



与此相关的另一个问题TaskCompletionSource是,当我们SetResult调用时,您不能调用任意代码,因为在锁下您只能执行一些细粒度的活动。在一些动作下运行,不可能来自他们来自哪里。如何解决这个问题呢?

var  tcs  =  new   TaskCompletionSource<int>(
       TaskContinuationsOptions.RunContinuationsAsynchronously  
) ;
lock(mylock)
{  
    tcs.SetResult(O); 
});

这是TaskCompletionSource值得使用只为不适应任务库中的代码。通过等待几乎可以解决所有其他问题。在这种情况下,始终强烈建议指定参数“ TaskCompletionSource.RunContinuationsAsynchronously”您几乎总是需要异步运行延续。在这种情况下,您tcs.SetResult将无法启动任何东西。



为什么要同步执行延续?因为它RunContinuationsAsynchronously指的是以下内容ContinueWith,而不是我们的内容。为了使他与我们的人联系起来,您需要编写以下内容:



此示例说明了参数如何不直观,它们如何相交,如何引入认知复杂性-很难编写。

亲子层次


Task.Factory.StartNew(() => 
{
  //... some parent activity

   Task.Factory.StartNew(() => {
      //... some child activity
   })

})
.ContinueWith(...) // don’t wait for child

还有其他使用参数的选项。例如,当您启动一个任务并在其下运行另一个任务时,就会出现父子层次结构。在这种情况下,如果您编写ContinueWithContinueWith则不会等待内部启动的任务。



如果您编写TaskCreationOptions.AttachedToParent,它将ContinueWith等待。您可以在产品中使用此属性。我认为每个人都可以举一个例子,其中有一个任务层次结构,任务等待子任务,子任务用于子任务。无需在任何地方写WaitForChildren,这种等待是异步发生的。也就是说,父任务的主体结束,并且在此之后父任务不被认为是完整的,直到子任务正常工作才开始其继续。

Task.Factory.StartNew(() => 
{
  //... some parent activity
  Foo(); 

})
.ContinueWith(...) // still wait for child

void Foo() { 
   Task.Factory.StartNew(() => {
      //... parent task to attach is in ThreadStatic
   }, TaskCreationOptions.AttachedToParent); 
}

可能存在将任务转移到中的某处的问题ThreadStatic,然后您开始时所做的所有操作AttachedToParent都会添加到此父任务中,这是一个警钟。

Task.Factory.StartNew(() => 
{
  //... some parent activity

  Foo();
}, TaskCreationOptions.DenyChildAttach)
.ContinueWith(...) // don’t wait for child

void Foo() { 
   Task.Factory.StartNew(() => {
      //... some child activity
   }, TaskCreationOptions.AttachedToParent); 
}

另一方面,有一个选项可以取消上一个选项DenyChildAttach这种应用经常发生。

Task.Run(() => 
{
  //... some parent activity

  Foo(); 

})
.ContinueWith(...) //don’t wait for child

void Foo() { 
   Task.Factory.StartNew(() => {
      //... some child activity
    }, TaskCreationOptions.AttachedToParent); 
}

值得记住的是,Task.Run这是标准的启动方式,默认情况下暗含DenyChildAttach

输入的隐式上下文ThreadStatic会增加复杂性。您不了解任务的工作原理,因为您需要了解上下文。可能出现的另一个问题与异步/等待的空闲状态有关。那是因为在异步/等待中,您没有任务,但是有动作。延续不是诚实的任务,而是行动。编写异步/等待代码时,不需要使用AttachedToParent它,因为您明确地将任务捆绑在一起以等待等待,这是正确的方法。



关于如何开始延续,您有六个选择。您启动了任务,启动了ContinueWith问题:这种延续将具有什么状态?有五个可能的答案:

  • 常规继续将成功完成;将发生RunToCompletion;
  • 任务将是错误的;
  • 取消将发生;
  • 这项任务根本不会完成,而是处于某种困境。
  • 选项-“取决于”。



在这种情况下,任务将处于“已取消”状态,尽管在任何地方都没有“已取消”一词。在这里,我们扔接待,什么也不做。问题是,即使您在十分钟前就知道了这些选项,当您阅读带有很多选项的他人的代码时,仍然会忘记这里发生的事情。所以不要写。

消除



Task.Factory.StartNew(() => 
{
    throw new OperationCanceledException(); 
});

                                                      Failed

任务开始时的第三个参数是kancellation。您编写OperationCanceledException,即,将任务置于“已取消”状态的特殊操作。在这种情况下,任务将处于“失败”状态,因为并非所有任务OperationCanceledException都相等。

Task.Factory.StartNew(() => 
{
    throw new OperationCanceledException(cancellationToken); 
}, cancellationToken);

                                                      Canceled

为了使任务能够执行Canceled,您需要将OperationCanceledException其及其CancellationToken一起抛出实际上,您永远不会显式地执行此操作,而是以这种方式进行操作:

Task.Factory.StartNew(() => 
{
    cancellationToken.ThrowIfCancellationRequested(); 
}, cancellationToken);
                                                       Canceled

是否需要区分cancelToken?在任务中的某处,您检查是否有人删除了您:取消抛出,然后任务进入状态Canceled或者有人在运行时单击“取消”并取消了任务。我们在JetBrains的实践表明,您无需区分这些标记。如果您收到OperationCanceledException-一种在发生某些取消时发生的特殊类型,则可以对其进行区分。在这种情况下,您只需要正常完成任务,不登录即可,并且在收到执行信息时-登录。

深栈


Task.Factory.StartNew(() => 
{
    Foo();
}, cancellationToken);

  void Foo() { 
     Bar() {
       ...
          Baz() {
             //how to get cancellation token?
          } 
    }
}

假设您有很多筹码。这CancellationToken是我们讨论的唯一显式参数。必须通过绝对所有层次结构将其传输到任何地方。如果在存在较深层次的情况下,您需要在最低级别的某个地方取消任务以放弃接收,该怎么办?我们使用了一个特殊的技巧。他被称为AsyncLocal

static AsyncLocal<Cancelation> asyncLocalCancellation;

Task.Factory.StartNew(() => 
{
     asyncLocalCancellation.Set(cancellationToken) 
    Foo();
}, cancellationToken); // use AsyncLocal to put cancellation int

  void Foo() { 
     async Bar() {
      ...
         Baz() {
             asyncLocalCancellation.Value.CheckForInterrupt(); 
         }
   } 
}

这与ThreadStatic仅在ThreadLocal异步/等待代码旅行中幸存的特殊代码相同。由于您的代码是异步的,并且具有此取消功能,因此将其放入中AsyncLocal,在更深的位置可以说“ CheckForInterrupt Throw If Cancellation Requested”。同样,这是唯一CancellationToken需要完全涂抹整个代码的参数,但是,我认为,对于大多数任务,您只需要知道发生了什么OperationCanceledException,并由此得出一个结论:取消或失败。

认知复杂性


Task.Factory.StartNew(Action, 
    TaskCreationOptions, 
    TaskScheduler, 
    CancellationToken
)
                                                   JetBrains.Lifetimes

lifetime.Start(TaskScheduler, Action) //puts lifetime in AsyncLocal

lifetime.StartMainRead(Action) 
lifetime.StartMainWrite(TaskScheduler, Action) 
lifetime.StartBackgroundRead(TaskScheduler, Action)

启动任务时阅读代码越困难,出错的风险就越高。一年后查看代码,您会忘记它的作用,因为其中包含大量参数。但是,我们有JetBrains.Lifetimes,它提供了现代化的寿命,以及优化的CancellationToken,与Start方法被重写,并与重复的代码段得到解决,因为这个问题Task.Factory.StartNewTaskCreationOptions

少数调度程序允许您使用读取锁定在主线程上调度任务。也就是说,读锁定不是您显式选择的,它是一种特殊的调度程序,它使用读锁定在主线程上调度代码,以及具有写锁的主线程,后台线程-现在,启动随机播放的方法变得非常简单。同时,生存期会自动通过取消AsyncLocal,从而大大简化了代码。



让我们看看异步/等待如何解决这些问题,以及它们带来了哪些问题。

在此示例中,部分代码被同步执行,然后等待异步代码。首先,重复的代码(样板要少得多是一件好事。其次,异步代码与同步代码非常相似,这正是async / await的目的。您可以以与同步编写相同的方式异步编写,而不会占用线程。

在这种情况下,编译器将部署什么?同步代码将同步执行,然后任务InnerAsync同步执行,特殊的GetAwaiter对象从何而来。在这种情况下,我们很感兴趣TaskAwaiter。您可以为任何对象编写等待者。结果,我们等待任务完成InnerAsync并同步执行它continuationCode。如果任务未完成,则Context Scheduler调度continuationCode。即使您写了await,也可能绝对会同步调用所有内容。

async Task MyFuncAsync() { 
  synchronousCode();
   await InnerAsync();
   await Task.Yield(); //guaranteed !IsCompleted 
   continuationCode();
}

有一个窍门Task.Yield-这是一项特殊的任务,可确保其侍者不会总是回到您身边IsCompleted。因此,continuation在此位置将不会同步调用它。对于UI线程,这可能很重要,因为您不会花费大量时间。



如何选择继续的线程?异步/等待哲学是这样的:您编写与同步相同的异步代码。如果您有一个线程池那么这对您没有任何影响-continuationCode将在另一个线程上执行。无论InnerAsync您说的是等待状态是否完成,都需要在UI线程上执行所有操作。

等待任务的机制如下:static它被调用,被调用SynchronizationContext从中创建TaskSchedulerSynchronizationContext是Post方法的事情,与方法非常相似Queue实际上TaskScheduler,这是更早的事情,它只需要SynchronizationContextPost并通过Post来执行其任务。

async Task MyFuncAsync() { 
  synchronousCode();

    await InnerAsync().ConfigureAwait(false);
    continuationCode(); 
}

有一种方法可以使用参数更改此行为ContinueOnCapturedContext.NET中最令人讨厌的API称为ConfigureAwait在这种情况下,API将创建一个特殊的等待者,TaskAwaiter等待者不同于改变继续的等待者,它在相同的线程,方法终止InnerAsync 和任务终止的相同上下文中运行

async Task MyFuncAsync() { 
  synchronousCode();

    await InnerAsync().ConfigureAwait(continueOnCapturedContext: false); 
    continuationCode(); //code must be absolutely context-agnostic
}

互联网上有很多疯狂的建议:如果您遇到死锁,请涂抹所有ConfigureAwait代码,一切都会好起来的。这是错误的方式。ConfigureAwait可以在某些库方法中要稍微提高性能的情况下使用,也可以在方法结束时使用。

死锁


async Task MyFuncAsync() { //UI thread 
  synchronousCode();

    await Task.Delay(10).ConfigureAwait(continueOnCapturedContext: false); 
    continuationCode();
}
myFuncAsync().Wait() //on UI thread

这是一个典型的僵局在UI线程上,他们等待了十秒钟,然后完成了Wait由于您所做的事情Wait,它将continuationCode永远不会启动,Wait因此,它将永远不会返回。所有这些都是在一开始就发生的。

async Task OnBluttionClick() { //UI thread 
  int v = Button.Text.ParseInt();

    await Task.Delay(10).ConfigureAwait(continueOnCapturedContext: false); 
  Button.Text.Set((v+1).ToString());
}
myFuncAsync().Wait() //on UI thread

想象这是一些真实的活动。我们单击按钮,将其打开,等待Button.ParseInt,然后写道:我们说:“请不要关闭我们的UI流,继续执行。”问题是我们希望第二部分也要在UI线程上执行,因为这是await的理念。也就是说,您的异步代码看起来与同步代码相同,并且在相同的上下文中运行。在这种情况下,当然会出现错误。此外,可以有许多假设其上下文的方法调用。在这种情况下该怎么办?你可以这样做:ConfigureAwaitConfigureAwaitButton.Text.Set

async Task MyFuncAsync() { //UI thread 
  synchronousCode();

    await Task.Delay(10).ConfigureAwait(continueOnCapturedContext: false); 
    continuationCode(); //The same UI context
}
PumpUntil(() => task.IsCompleted);
//VS synchronization contexts always pump on any Wait

使用UI线程,必须禁止在Wait具有公共消息队列的线程上执行此操作。您可以执行此消息队列,而无需执行操作Wait或编写操作ConfigureAwait,与此同时,连续过程也将被执行。如果不能混合使用同步和异步代码,则不要混合使用。但是有时候这是无法避免的。

例如,您有旧代码,并且必须将它们混合,然后再泵送UI流。 Visual Studio可以按期望的速度注入UI线程,甚至SynchronizationContext有所改变。如果您在任何上进入WaitHandle Wait,则在挂起时,您的UI流将被抽出。因此,他们在僵局和种族之间进行选择,而选择种族s。

抽水机-这是非理想的API,也就是说,当您在任意位置执行随机连续性时,可能会有细微差别。不幸的是,没有其他方法。混合同步和异步代码。如果有的话,整个骑士都安排在旧地方,所以有时也会有些细微差别。

改变背景


async Task MyFuncAsync() { 
  synchronousCode(); // on initial context

    await myTaskScheduler;
    continuationCode(); //on scheduler context 
}

还有另一种使用async / await的有趣方式你可以写Awaiterscheduler和跳线程。我在Visual Studio中阅读过帖子,他们写了很长时间的书,说在方法中间来回跳动并不好,但是现在他们自己做。Visual Studio具有一个通过调度程序跳入线程的API。对于正常使用,这样做是不好的。

结构化并发


async Task MyFuncAsync() { 
  synchronousCode(); // on initial context

    await Task.Factory.StartNew(() => {...}, myTaskScheduler);
    continuationCode(); //on initial context 
}

为了方便地浸入新环境并返回旧环境,应建立一些结构性竞争或结构并行性。例如,在60年代,GoTo运算符被认为是有害的,因为它违反了结构性。就在这里。跳线程违反了结构。令人惊讶的是,使用异步状态机似乎是一个不错的出路。也就是说,在违反常规结构的地方,您跳到GoTo,就可以违反线程结构:do await其与标签混合当您需要这样做时,这是一种极为奇怪和罕见的情况。尽管如此,等待返回相同的上下文还是更好因此,线程池将没有与原来相同的线程,而是具有相同的上下文。

顺序行为


为什么等待与并行执行不同?等待执行是顺序执行。在这种情况下,我们开始第一个任务,等待它,然后开始第二个任务-我们等待。我们没有并行性。对于大多数用途,不需要并行性。并行性本身比顺序更复杂。串行代码比并行代码简单,这是一个公理。但是有时您需要在并行代码中运行某些内容,并且您需要这样做:

async Task MyAsync() {

  var task1 = StartTask1Async();
  await task1;

  var task2 = StartTask2Async();
  await task2; 
}

并发行为


async Task MyAsync() {
  var task1 = StartTask1Async();
  var task2 = StartTask2Async();

  await task1;
  await task2; 
}

在这里,任务并行开始。显然,方法可以在运行状态下立即返回任务,因此不会有并行性。假设两个任务都抛出一个执行。然后您等待第一个任务,然后在第一次等待中起飞。就是说,您一写完await task1,就起飞了,却没有处理exception task2有趣的是,这是绝对有效的代码。正是这种代码使.NET导致这样一个事实,即在4.5版中,执行的行为已发生改变。

异常处理


async Task MyAsync() {
  var task1 = StartTask1Async();
  var task2 = StartTask2Async(); 

  await task1;
  await task2;

  // if task1 throws exception and task2 throws exception we only throw and
  // handle task1’s exception

  //4.0 -> 4.5 framework: unhandled exceptions now don’t crush process
  //still visible in UnobservedExceptionHandler
}

以前,未处理的执行只是简单地抛出了流程,如果您没有抓住其中的执行UnobservedExceptionHandlerstatic您也可以将其附加到调度程序中),那么该流程就不会执行。现在,这是绝对有效的代码。尽管.NET更改了其行为,但它保留了设置以沿相反的方向返回行为。

async  Task  MyAsync(CancellationToken cancellationToken)  {  

  await  SomeTask1  Async(cancellationToken); 
 
  await  Some Task2Async( cancellation  Token); 
  //you should always pass use async API with cancelationToken  if possible 
} 
  
try { 
    await  MyAsync( cancellation  Token); 
} catch (OperationException e) { // do nothing: OCE happened
} catch (Exception e) { 
    log.Error(e);
}

查看执行过程如何进行。必须传输CancellationToken -s,有必要“涂抹” CancellationToken -s所有代码。异步的正常行为是您不检查任何地方Task.Status ancellationToken,使用异步代码的方式与使用同步的方式相同。也就是说,在取消的情况下,您得到执行,而在这种情况下,收到时您什么也不做OperationCanceledException

状态为已取消和故障的区别是您没有收到OperationCanceledException,而是正常执行。在这种情况下,我们可以保证,您只需要执行此操作并据此得出结论即可。如果您通过Task显式启动了任务,那么您将得到飞行AggregateException并且在异步情况下,在这种情况下,他们AggregateException总是抛出其中的第一个执行(在这种情况下- OperationCanceled)。

在实践中


同步方式


DataTable<File, ProcessedFile> sharedMemory;

// in any thread
void SynchronousWorker(...) {
  File f = blockingQueue.Dequeue(); 
  ProcessedFile p = ProcessInParallel(f);

  lock (_lock) { 
    sharedMemory.add(f, p);
  } 
}

例如,一个恶魔在ReSharper中工作-这是为您着色文件的编辑器。如果在编辑器中打开了文件,则有一些活动会将其放入阻塞队列。我们的流程worker从那里读取数据,然后使用此文件执行一系列不同的任务,对其进行着色,解析,构建,然后将这些文件添加到中sharedMemory有了sharedMemory锁,其他机制已经在使用它。

异步方式


在将代码重写为异步代码时,我们首先将其替换voidasync Task确保最后写上“ Async”一词。所有异步方法都必须以Async结尾-这是一个约定。

DataTable<File, ProcessedFile> sharedMemory;
// in any thread
async Task WorkerAsync(...) {

  File f = blockingQueue.Dequeue(); 

  ProcessedFile p = ProcessInParallel(f);

  lock (_lock) { 
    sharedMemory.add(f, p);
  } 
}

在那之后,您需要对我们的做些事情blockingQueue。显然,如果有一些同步原语,那么就必须有一些异步原语。



这个原语称为channel:位于package中的通道System.Threading.Channels。您可以创建有限和无限制的通道和队列,可以异步等待。此外,您可以创建一个值为“零”的通道,也就是说,它根本没有缓冲区。这种渠道称为集合渠道,在Go和Kotlin中得到积极推广。原则上,如果可以在异步代码中使用通道,则这是一个非常好的模式。也就是说,我们将队列更改为存在方法ReadAsync的通道WriteAsync

ProcessInParallel是一堆并行代码,用于处理文件并将其转换为ProcessedFile异步可以帮助我们编写更紧凑的并行代码而不是异步代码吗?

简化并行代码


可以通过以下方式重写代码:

DataTable<File, ProcessedFile> sharedMemory;

// in any thread
async Task WorkerAsync(...) {

  File f = await channel.ReadAsync();

  ProcessedFile p = await ProcessInParallelAsync(f);

  lock (_lock) { 
    sharedMemory.add(f, p);
  } 
}



他们长什么样ProcessInParallel?例如,我们有一个文件。首先,我们将其分解为词素,并且我们可以并行执行两个任务:构建搜索缓存和构建语法树。之后是“搜索语义错误”的任务。在此重要的是,所有这些任务都必须形成有向无环图。也就是说,您可以在并行线程中运行某些部分,而不能在并行线程中运行,并且显然有依赖关系哪个任务应该等待其他任务。您将获得此类任务的图表,并希望以某种方式将其分散在线程中。是否可以将其编写得精美无误?在我们的代码中,多次以不同的方式解决了这个问题。编写此代码没有错误时,这种情况很少发生。



我们按如下方式定义此任务图:假设每个任务还​​有其他任务,它依赖于该任务,然后使用ExecuteBefore字典编写方法的框架。

骨架解决方案


Dictionary<Action<ProcessedFile>, Action<ProcessedFile>[]> ExecuteBefore; async Task<ProcessedFile> ProcessInParallelAsync() {
  var res = new ProcessedFile();


  // lots of work with toposort, locks, etc.

  return res; 
}

如果您直接解决此问题,则需要对该图进行拓扑排序。然后,执行没有相关任务的任务,执行该任务,分析锁下的结构,查看哪些任务没有相关任务。运行,以某种方式分散它们Task Runner我们将其编写得更加紧凑:图形的拓扑排序+在不同线程上执行此类任务。

异步懒


Dictionary<Action<ProcessedFile>, Action<ProcessedFile>[]> ExecuteBefore;
async Task<ProcessedFile> ProcessInParallelAsync() {
  var res = new ProcessedFile();
  var lazy = new Dictionary<Action<ProcessedFile>, Lazy<Task>>(); 
  foreach ((action, beforeList) in ExecuteBefore)
    lazy[action] = new Lazy<Task>(async () => 
    {
      await Task.WhenAll(beforeList.Select(b => lazy[b].Value)) 
      await Task.Yield();
      action(res);
}
  await Task.WhenAll(lazy.Values.Select(l => l.Value)) 
  return res;
}

有一种称为的模式Async Lazy。我们创建我们ProcessedFile应该执行不同动作的动作。让我们创建一个字典:将每个阶段(Action ProcessedFile)的格式设置为某个Task,或者从Task设置为Lazy,然后沿原始图形运行。变量action将具有动作本身,并且在beforeList中-必须在我们之前执行的那些动作。然后Lazy创建action。我们用Task编写await。因此,我们正在等待必须完成的所有任务。在beforeList中,选择Lazy该词典中的一个。

请注意,此处不会同步执行任何内容,因此该代码不会落在上ItemNotFoundException in Dictionary。我们执行了我们之前的所有任务,通过行动进行搜索Lazy Task然后我们执行动作。最后,您只需要让每个任务开始即可,否则您将永远不知道某些事情是否没有开始。在这种情况下,什么也没有开始。这是解决方案。这种方法是在10分钟内编写的,这是绝对显而易见的。

因此,异步代码是我们的决定,起初它占据了几个屏幕,这些屏幕具有复杂的竞争代码。在这里,他是绝对一致的。我什至不使用它ConcurrentDictionary,而是使用通常的它Dictionary,因为我们没有竞争性地给它写任何东西。有一个一致的代码。我们很好地解决了使用async -s编写并行代码的问题,这意味着-没有错误。

摆脱锁


DataTable<File, ProcessedFile> sharedMemory;

// in any thread
async Task WorkerAsync(...) {

  File f = await channel.ReadAsync();

  ProcessedFile p = await ProcessInParallelAsync(f);

    lock (_lock) {
      sharedMemory.add(f, p);
   }
 }

是否值得引入异步和这些锁?现在有各种各样的异步锁,异步信号量,即试图使用同步和异步代码中的原语。这个概念似乎是错误的,因为使用锁可以保护某些东西免于并行执行。我们的任务是将并行执行转换为顺序执行,因为它更容易。而且如果容易的话,错误也就更少了。

Channel<Pair<File, ProcessedFile>> output;
// in any thread
async Task WorkerAsync(...) {

  File f = await channel.ReadAsync();

  ProcessedFile p = await ProcessInParallelAsync(f);
  
  await output.WriteAsync(); 
}

我们可以创建一些通道,并在其中放置几个​​File和ProcessedFile,ReadAsync其他一些过程将处理此通道,并将按顺序进行。锁本身除了保护结构外,还实质上使访问线性化,在此位置,连续线程中的所有线程都变为并行。并且我们正在用渠道显式替换它。



架构如下:工作人员从文件接收文件input并将它们发送到处理器的某个位置,处理器也按顺序处理所有内容,没有并行性。该代码看起来简单得多。我了解并非所有事情都可以通过这种方式完成。当您可以构建数据管道时,这种体系结构并不总是有效。



可能是您有第二个通道进入处理器,而不是从通道中形成非循环有向图,而是有周期的图。这是Roman Elizarov在2018年告诉KotlinConf的一个例子。他使用这些渠道在Kotlin上编写了一个示例,并且那里存在循环,并且该示例已关闭。问题是,如果在图形中有这样的循环,那么异步世界中的一切都会变得更加复杂。异步死锁的坏处在于,当您拥有一堆线程时,它们比同步死锁更难解决,而且很明显,挂死了什么。因此,它是必须正确使用的工具。

摘要


  • 避免异步代码中的同步。
  • 串行代码比并行代码简单。
  • 异步代码可以很简单,并使用最少的参数和更改其行为的隐式上下文。

如果您已经习惯于编写同步代码,并且即使异步代码与同步代码非常相似,也不要在其中拖拽原语,而primitives是您在同步代码中所习惯的async mutex。如果可能,使用提要,以及其他传递消息的原语

串行代码比并行代码简单。如果您可以编写体系结构使其顺序显示,而无需运行并行代码和锁定,则可以按顺序编写体系结构。

这是我们从大量带有任务的示例中看到的最后一件事。在设计系统时,请尽量减少对隐式上下文的依赖。隐式上下文导致对代码中发生的事情的误解,并且您可能会忘记一年中的隐式问题。而且如果另一个人处理此代码并重做其中的某些内容,则可能会导致您曾经知道的困难,而新程序员由于隐式上下文而不知道。结果,不良的设计的特征在于大量的参数,它们的组合和隐式上下文。

读什么



-10 . DotNext .

All Articles