.net core 消息流处理流程

前言

2020年即将进入尾声,分享一下在现公司业务处理流程,一起讨论在分布式场景下,如何通过消息流的方式处理各种复杂的业务场景,这里涉及到一些常用组件,后面结合场景与代码来具体说明

场景说明

这里就拿我负责的短信应用来举例,它由3个核心模块组成

  1. 短信网关(接收客户提交短信,接收通道短信回执,转发回执给客户...)
  2. 计费服务(短信计费,购买套餐...)
  3. 短信通道(短信提交到通道...)

痛点

  • 批量提交营销类短信,如客户一次提交10w-20w条短信
  • 短信实时计费,延迟问题
  • 通道限流控制
  • 通知回执延时问题

组件

mysql
redis
阿里云日志

流程如下

blockchain

痛点解决方案

  • 批量提交短信

批量提交这里主要以下几种情况

  1. 短信内容一致,手机号多个
  2. 短信内容不一致(例如内容携带用户信息等... 通常采用excel上传)
  3. 循环调用接口,提交短信

这里仅针对情况3来说明,首先将用户信息存入redis(先读缓存,再读库),减少在验证与鉴权时对数据库的查询压力,校验通过的消息开始写入收单队列,并记录日志(注意日志一定要异步写入),队列使用的redis队列,以目前的业务承载能力,是完全没有问题的,收单接口的qps可以通过分布式来提升,这个得益于k8s容器伸缩,平时我们一般是5个pod在运行(相当于负载5个应用),在节假日高峰期可以起10个pod,这里性能的瓶颈主要集中在redis队列,如果有更高要求可以尝试换成rabbitmq,kafka等

  • mysql批量持久化

这也一直是我比较迷的一个地方,数据库使用的阿里云mysql,单条循环插入速度在200/s左右,我这里采用的dapper,通过拼接values来批量插入,速度大概能达到3000/s,后面看看有没有更好的方案

  • 短信实时计费,延迟问题

在分布式情况下,实时计费又是一个比较大的性能瓶颈,直接读库,并修改用户条数显然是不行的,这样会出现脏读,导致最终数据不准确而出损(程序员就要背锅),而且效率低下,使用redis分布式锁等同于将分布式改成单应用,需要频繁更新缓存里的用户短信余量,速度大概在150/s - 180/s,消费速度过慢会导致队列里数据堆积,短信延迟过高,特别是通知类短信(如获取验证码)对延迟有较高的要求,这里使用redis的计数器来实现,通过计数器递减的方式,如果短信余量<0则用户短信余量不足,再单独起一个任务,每分钟同步一下用户短信余量,用户充值与短信失败回退,也是对计数器的操作

  • 通道限流控制

通道限流主要是供货商对通道进行流量限制,超频的短信会直接失败,一般出现在营销类短信,因为每条通道的价格(与地域,三网,到达率...有关)都不一样,每个用户都会分配通道,为了让短信尽量成功,程序需要进行限流,这里也是使用redis计数器实现,设置一个1分钟失效的缓存,超过频次后,会尝试其它通道,没有可用通道则再次写入队列

  • 通知回执延时问题

系统拿到回执后,需要将回执通知给客户配置的http地址,并得到客户的响应,未响应的回执会再次入列,直到客户响应,或者超过推送策略(推送3次或超过多长时间),有些客户配置的http地址响应非常慢,或者干脆是一些访问超时的地址,这样会导致通知延迟,通知类发卡密业务的短信对回执有较高要求,这里通过多线程来实现,通过创建多个线程从队列里获取回执并转发

  List<Task> tasks = new List<Task>();
  for (int i = 0; i < 10; i++)
  {
      tasks.Add(Task.Run(async () =>{...}));
  }
  await Task.WhenAll(tasks);
  • 注意事项
  1. 因为操作redis的地方非常多,为了便于管理,所有redis的key建议统一写在常量类里,并写上注释,并制定对应的规范,方便维护
/// 项目名_类型_操作_参数    有效期
project_queue_action_params 
  1. 业务日志,业务日志方便排查问题,建议不要偷懒,在业务的入口跟出口都写上对应的日志信息
posted @ 2020-11-18 16:43  提伯斯  阅读(1620)  评论(8编辑  收藏  举报