asp.net core microservices 架构之Task 事务一致性 事件源 详解

一 aspnetcore之task的任务状态-CancellationToken                   

     我有一篇文章讲解了asp.net的线程方面的知识。我们知道.net的针对于多线程的一个亮点就是Task,net clr维护了一个线程池,自动的分派给task执行,执行完成,迅速返回线程池,并且维护异常和状态,针对于基础的thread和其他两种异步编程,Task非常的灵巧,但是针对和应用生命周期关联的异步任务,还是使用Workbackgroup比较合适,或者甚至是基础的thread,因为Task比较高级的线程类,操作也比较简化,人为控制比较弱。那这一节为什么要说线程尼?大家有没有遇到过,部署或者人为的去重启,往往会造成一些不必要的业务中断,web api有这样的情况,后台程序也有这样的情况。异常和系统硬件的故障已经让我们防不胜防了,那么就尽量的人为的情况少那么一点点,系统的健壮性也就高那么一点点。

   目前有两个技巧可以处理这一类事情,第一是让主机graceful方式关闭,并且超时时间设置长一点,这样就有足够的时间,让运行的请求执行完毕,看代码:

    

public static async Task Main(string[] args)
{
    var host = new HostBuilder()
        .Build();

    await host.RunAsync();
}

这是官方上的一段话:IHostedService 是执行代码的入口点。 每个 IHostedService 实现都按照 ConfigureServices 中服务注册的顺序执行。 主机启动时,每个 IHostedService 上都会调用 StartAsync,主机正常关闭时,以反向注册顺序调用 StopAsync

//关闭超时值

ShutdownTimeout 设置 StopAsync 的超时值。 默认值为 5 秒。
Program.Main 中的以下选项配置将默认值为 5 秒的关闭超时值增加至 20 秒:
C#

//复制
var host = new HostBuilder()
    .ConfigureServices((hostContext, services) =>
    {
        services.Configure<HostOptions>(option =>
        {
            option.ShutdownTimeout = System.TimeSpan.FromSeconds(20);
        });
    })
    .Build();

而我们看看源码中StopAsync方法:

/// <summary>
        /// Attempts to gracefully stop the host with the given timeout.
        /// </summary>
        /// <param name="host"></param>
        /// <param name="timeout">The timeout for stopping gracefully. Once expired the
        /// server may terminate any remaining active connections.</param>
        /// <returns></returns>
        public static Task StopAsync(this IHost host, TimeSpan timeout)
        {
            return host.StopAsync(new CancellationTokenSource(timeout).Token);
        }

系统接受到Ctrl+c和sign,就会调用这个方法,以比较礼貌的方式关闭。

那么看源码,这两个都是具有阻塞功能的异步方法,对应的非异步方法,都是同步调用的这两个方法:

/// <summary>
        /// Runs an application and returns a Task that only completes when the token is triggered or shutdown is triggered.
        /// </summary>
        /// <param name="host">The <see cref="IHost"/> to run.</param>
        /// <param name="token">The token to trigger shutdown.</param>
        public static async Task RunAsync(this IHost host, CancellationToken token = default)
        {
            using (host)
            {
                await host.StartAsync(token);

                await host.WaitForShutdownAsync(token);
            }
        }

        /// <summary>
        /// Returns a Task that completes when shutdown is triggered via the given token.
        /// </summary>
        /// <param name="host">The running <see cref="IHost"/>.</param>
        /// <param name="token">The token to trigger shutdown.</param>
        public static async Task WaitForShutdownAsync(this IHost host, CancellationToken token = default)
        {
            var applicationLifetime = host.Services.GetService<IApplicationLifetime>();
        //当前token执行取消的时候,激发这个委托。
            token.Register(state =>
            {
                ((IApplicationLifetime)state).StopApplication(); //当进程取消的时候,通知注册IApplicationLifetime的进程也取消。
            },
            applicationLifetime);

            var waitForStop = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
            //应用程序生命周期中的停止应用token激发的时候,执行这个委托,去释放阻塞,执行host的停止方法。
            applicationLifetime.ApplicationStopping.Register(obj =>
            {
                var tcs = (TaskCompletionSource<object>)obj;
                tcs.TrySetResult(null);
            }, waitForStop);

            await waitForStop.Task;//阻塞,直到 tcs.TrySetResult(null);执行完毕。
// Host will use its default ShutdownTimeout if none is specified.

await host.StopAsync(); //调用关闭 }

具体原理就是Host使用这个applicationLifetime,去控制。而applicationLifetime主要的是用到了CancellationTokenSource这个类,使用这个类是可以控制task的取消执行的。

所以,两个解决方案,如果是webapi,就将将超时时间设置大一点。

第二,如果在非webapi中,使用了超长执行的Task,就使用CancellationTokenSource吧,将它的Token传进去,在外边判断是否执行中,如果不在执行中,就执行Cancel方法,当然在task内部,也可以

判断token,是否自己主动取消掉。

这是官方的一个例子,了解CancellationTokenSource这个类,那么就会明白如何去处理Task

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class Example
{
   public static void Main()
   {
      // Define the cancellation token.
      CancellationTokenSource source = new CancellationTokenSource();
      CancellationToken token = source.Token;

      Random rnd = new Random();
      Object lockObj = new Object();
      
      List<Task<int[]>> tasks = new List<Task<int[]>>();
      TaskFactory factory = new TaskFactory(token);
      for (int taskCtr = 0; taskCtr <= 10; taskCtr++) {
         int iteration = taskCtr + 1;
         tasks.Add(factory.StartNew( () => {
                                       int value;
                                       int[] values = new int[10];
                                       for (int ctr = 1; ctr <= 10; ctr++) {
                                          lock (lockObj) {
                                             value = rnd.Next(0,101);
                                          }
                                          if (value == 0) { 
                                             source.Cancel();
                                             Console.WriteLine("Cancelling at task {0}", iteration);
                                             break;
                                          }   
                                          values[ctr-1] = value; 
                                       }
                                       return values;
                                    }, token));   
         
      }
      try {
         Task<double> fTask = factory.ContinueWhenAll(tasks.ToArray(), 
                                                      (results) => {
                                                         Console.WriteLine("Calculating overall mean...");
                                                         long sum = 0;
                                                         int n = 0; 
                                                         foreach (var t in results) {
                                                            foreach (var r in t.Result) {
                                                                  sum += r;
                                                                  n++;
                                                               }
                                                         }
                                                         return sum/(double) n;
                                                      } , token);
         Console.WriteLine("The mean is {0}.", fTask.Result);
      }   
      catch (AggregateException ae) {
         foreach (Exception e in ae.InnerExceptions) {
            if (e is TaskCanceledException)
               Console.WriteLine("Unable to compute mean: {0}", 
                                 ((TaskCanceledException) e).Message);
            else
               Console.WriteLine("Exception: " + e.GetType().Name);
         }
      }
      finally {
         source.Dispose();
      }
   }
}
// Repeated execution of the example produces output like the following:
//       Cancelling at task 5
//       Unable to compute mean: A task was canceled.
//       
//       Cancelling at task 10
//       Unable to compute mean: A task was canceled.
//       
//       Calculating overall mean...
//       The mean is 5.29545454545455.
//       
//       Cancelling at task 4
//       Unable to compute mean: A task was canceled.
//       
//       Cancelling at task 5
//       Unable to compute mean: A task was canceled.
//       
//       Cancelling at task 6
//       Unable to compute mean: A task was canceled.
//       
//       Calculating overall mean...
//       The mean is 4.97363636363636.
//       
//       Cancelling at task 4
//       Unable to compute mean: A task was canceled.
//       
//       Cancelling at task 5
//       Unable to compute mean: A task was canceled.
//       
//       Cancelling at task 4
//       Unable to compute mean: A task was canceled.
//       
//       Calculating overall mean...
//       The mean is 4.86545454545455.

 

二   业务的事务一致性                                                                

       因为微服务的理念中是牺牲了系统业务的一致性,我们知道事务的一致性都是靠的数据库的本地事务,或者分布式事务来实现的,但是微服务是严禁使用分布式事务。那么如何保证整个系统的事务完整性尼?举个例子:比如订单服务中,新接受一个订单,这个订单需要同步到库房的订单子系统,那么在订单服务中的这个订单在最后更新自己订单状态的时候,是需要同时发送异步消息给库房消息服务器的,如果这时候网络断了,本地订单更新成功了,但是异步消息没有发送过去,这样就会引起业务的缺失,目前有两个方法可以实现:

      第一:为本地数据库创建事件源表,记录下消息和本地数据更新的全部状态,比如订单在更新前就可以添加事件,事件状态可以有,准备更新订单,订单已更新,发送消息队列,消息发送成功等。

这样的好处就是最后跟踪这个事务处理的时候,每个步骤都可以找到,而且完全不用事务。最后job去跟踪失败情况,然后根据情况处理。

      第二:只是用本地事务,就是在订单更新的时候,同时给事件源表添加消息内容,然后让后台job去发送消息,这样是最简单和最稳定的方式。

      当然,最合适的还是第一种方法,虽然代码能复杂点,但是最后的效果是一样的,而且效率是比第二种方法更高效,但是考虑打事件源表并不是并发频繁操作的表,所以这个看自己的喜好了。

针对一个系统,业务的一致性,也并不是全部,针对于一些关键业务做好一致性,但是很多其实可以设计成为在用户ui层面去补偿操作,唯一的坏处就是一部分数据需要重新填写。

三     事件源                                                                                                           

     这个事件源并不是为了解决业务的一致性,而是为了应对大数据量的请求,比如,客户管理,一个分类下有上万条记录需要处理,那么往往我们需要对性能和实时反馈上有个折衷。

     系统设计如下:

                                   

  这样看来,会增加1个api服务和一个后台服务,但是对于系统的问题,却得到了一个缓冲,或许这个设计不是最好的,但是却可以做一个抛砖引玉的案例,现实中案例非常多变,所以设计也会有很多方案。

  因为目前我们看到的大部分app,请求的时候,某些功能确实会有少许等待事件,这个都是为了折衷,当然这一篇内容并不是讨论云或者分布式计算,但是在后台这块处理越快,反馈也越快。

  这套方案的设计理念其实就是异步处理,可以有自己的优化空间,而并不会消耗api这个轻量级服务,后台分布式计算越快,app反应也越快,到一定程度,就并不会感觉到有延迟,这就是大师比喻的耳朵与眼睛的关系。

posted @ 2019-01-28 22:14  无为有道  阅读(1294)  评论(0编辑  收藏  举报