[聚合文章] .Net 如何模拟会话级别的信号量,对http接口调用频率进行限制(有demo)

.Net 2018-01-09 17 阅读

现在,因为种种因素,你必须对一个请求或者方法进行频率上的访问限制。
比如, 你对外提供了一个API接口,注册用户每秒钟最多可以调用100次,非注册用户每秒钟最多可以调用10次。
比如, 有一个非常吃服务器资源的方法,在同一时刻不能超过10个人调用这个方法,否则服务器满载。
比如, 有一些特殊的页面,访客并不能频繁的访问或发言。
比如, 秒杀活动等进行。
比如 ,防范DDOS,当达到一定频率后调用脚本iis服务器ip黑名单,防火墙黑名单。
如上种种的举例,也就是说,如何从一个切面的角度对调用的方法进行频率上的限制。而对频率限制,服务器层面都有最直接的解决方法,现在我说的则是代码层面上的频率管控。

本文给出两个示例,一个是基于单机环境的实现,第二个则是基于分布式的Redis实现

--------------------

以第一个API接口需求为例,先说下单机环境下的实现。
按照惯性思维,我们自然会想到缓存的过期策略这种方法,但是严格来讲就HttpRuntime.Cache而言,通过缓存的过期策略来对请求进行频率的并发控制是不合适的。
  HttpRuntime.Cache 是应用程序级别的Asp.Net的缓存技术,通过这个技术可以申明多个缓存对象,可以为每个对象设置过期时间,当过期时间到达后该缓存对象就会消失(也就是当你访问该对象的时候为Null)

  为什么这样说呢?比如对某个方法(方法名:GetUserList)我们要进行1秒钟最多10次的限制,现在我们就新建一个int型的Cache对象,然后设置1秒钟后过期消失。那么每当访问GetUserList方法前,我们就先判断这个Cache对象的值是否大于10,如果大于10就不执行GetUserList方法,如果小于10则允许执行。每当访问该对象的时候如果不存在或者过期就新建,这样周而复始,则该对象永远不可能超过10。

1   if ((int)HttpRuntime.Cache["GetUserListNum"] > 10) //大于10请求失败2   {3      Console.WriteLine("禁止请求");4   }5   else6   {7      HttpRuntime.Cache["GetUserListNum"] = (int)HttpRuntime.Cache["GetUserListNum"] + 1; //否则该缓存对象的值+18      Console.WriteLine("允许请求");9   }

这样的思想及实现相对来说非常简单,但是基于这样的一个模型设定,那么就会出现这种情况:

 

 

如上图,每个点代表一次访问请求,我在0秒的时候 新建了一个名字为GetUserListNum的缓存对象。
在0~0.5秒期间 我访问了3次在0.5~1秒期间,我们访问了7次。此时,该对象消失,然后我们接着访问,该对象重置为0.
                在第1~1.5秒期间,还是访问了7次,在第1.5秒~2秒期间访问了3次。

基于这种简单缓存过期策略的模型,在这2秒钟内,我们虽然平均每秒钟都访问了10次,满足这个规定,但是如果我们从中取一个期间段,0.5秒~1.5秒期间,也是1秒钟,但是却实实在在的访问了14次!远远超过了我们设置的 1秒钟最多访问10次的 限制。

 

那么如何科学的来解决上面的问题呢?我们可以通过模拟会话级别的信号量这一手段,这也就是我们今天的主题了。
   什么是信号量?仅就以代码而言,  static SemaphoreSlim semaphoreSlim = new SemaphoreSlim(5);  它的意思就代表在多线程情况下,在任何一时刻,只能同时5个线程去访问。

 

4容器4线程模型

现在,在实现代码的之前我们先设计一个模型。

  假设我们有一个用户A的管道,这个管道里装着用户A的请求,比如用户A在一秒钟发出了10次请求,那么每一个请求过来,管道里的元素都会多一个。但是我们设定这个管道最多只能容纳10个元素,而且每个元素的存活期为1秒,1秒后则该元素消失。那么这样设计的话,无论是速率还是数量的突进,都会有管道长度的限制。这样一来,无论从哪一个时间节点或者时间间隔出发,这个管道都能满足我们的频率限制需求。

而这里的管道,就必须和会话Id来对应了。每当有新会话进来的时候就生成一个新管道。这个会话id根据自己场景所定,可以是sessionId,可以是ip,也可以是token。

那么既然这个管道是会话级别的,我们肯定得需要一个容器,来装这些管道。现在,我们以IP来命名会话管道,并把所有的管道都装载在一个容器中,如图

而基于刚才的设定,我们还需要对容器内的每条管道的元素进行处理,把过期的给剔除掉,为此,还需要单独为该容器开辟出一个线程来为每条管道进行元素的清理。而当管道的元素为0时,我们就清掉该管道,以便节省容器空间。

 

当然,由于用户量多,一个容器内可能存在上万个管道,这个时候仅仅用一个容器来装载来清理,在效率上显然是不够的。这个时候,我们就得对容器进行横向扩展了。

  比如,我们可以根据Cpu核心数自动生成对应的数量的容器,然后根据一个算法,对IP来进行导流。我当前cpu是4个逻辑核心,就生成了4个容器,每当用户访问的时候,都会最先经过一个算法,这个算法会对IP进行处理,如192.168.1.11~192.168.1.13这个Ip段进第一个容器,xxx~xxx进第二个容器,依次类推,相应的,也就有了4个线程去分别处理4个容器中的管道。

 

那么,最终就形成了我们的4容器4线程模型了。

现在,着眼于编码实现:

  首先我们需要一个能承载这些容器的载体,这个载体类似于连接池的概念,可以根据一些需要自动生成适应数量的容器,如果有特殊要求的话,还可以在容器上切出一个容器管理的面,在线程上切出一个线程管理的面以便于实时监控和调度。如果真要做这样一个系统,那么 容器的调度 和 线程的调度功能 是必不可少的,而本Demo则是完成了主要功能,像容器和线程在代码中我也没剥离开来,算法也是直接写死的,实际设计中,对算法的设计还是很重要的,还有多线程模型中,怎样上锁才能让效率最大化也是重中之重的。

而这里为了案例的直观就直接写死成4个容器。

public static List<Container> ContainerList = new List<Container>(); //容器载体static Factory(){     for (int i = 0; i < 4; i++)     {        ContainerList.Add(new Container(i));  //遍历4次  生成4个容器     }     foreach (var item in ContainerList)     {        item.Run();    //开启线程     }}

现在,我们假定 有编号为 0 到 40 这样的 41个用户。那么这个导流算法 我也就直接写死,编号0至9的用户 将他们的请求给抛转到第一个容器,编号10~19的用户 放到第二个容器,编号20~29放到第三个容器,编号30~40的用户放到第四个容器。

那么这个代码就是这样的:

 static Container GetContainer(int userId, out int i) //获取容器的算法 {     if (0 <= userId && userId < 10)    //编号0至9的用户  返回第一个容器  依次类推     {          i = 0;          return ContainerList[0];     }     if (10 <= userId && userId < 20)     {          i = 1;          return ContainerList[1];     }     if (20 <= userId && userId < 30)     {          i = 2;          return ContainerList[2];      }      i = 3;      return ContainerList[3];  }

当我们的会话请求经过算法的导流之后,都必须调用一个方法,用于辨别管道数量。如果管道数量已经大于10,则请求失败,否则成功

  public static void Add(int userId)  {       if (GetContainer(userId, out int i).Add(userId))            Console.WriteLine("容器" + i + " 用户" + userId + "  发起请求");       else            Console.WriteLine("容器" + i + " 用户" + userId + "  被拦截");  }

接下来就是容器Container的代码了。

这里,对容器的选型用线程安全的ConcurrentDictionary类。
  线程安全:当多个线程同时读写同一个共享元素的时候,就会出现数据错乱,迭代报错等安全问提
  ConcurrentDictionary:除了GetOrAdd方法要慎用外,是.Net4.0专为解决Dictionary线程安全而出的新类型
  ReaderWriterLockSlim:较ReaderWriterLock优化的读写锁,多个线程同时访问读锁 或  一个线程访问写锁

private ReaderWriterLockSlim obj = new ReaderWriterLockSlim();  //在每个容器中申明一个读写锁public ConcurrentDictionary<string, ConcurrentList<DateTime>> dic = new ConcurrentDictionary<string, ConcurrentList<DateTime>>(); //创建该容器 dic

然后当你向容器添加一条管道中的数据是通过这个方法:

 public bool Add(int userId) {     obj.EnterReadLock();//挂读锁,允许多个线程同时写入该方法     try     {         ConcurrentList<DateTime> dtList = dic.GetOrAdd(userId.ToString(), new ConcurrentList<DateTime>()); //如果不存在就新建 ConcurrentList         return dtList.CounterAdd(10, DateTime.Now); //管道容量10,当临界管道容量后 返回false     }     finally     {         obj.ExitReadLock();     } }

 这里,为了在后面的线程遍历删除ConcurrentList的管道的时候保证ConcurrentList的安全性,所以此处要加读锁。

 而ConcurrentList,因为.Net没有推出List集合类的线程安全,所以自己新建了一个继承于List<T>的安全类型,在这里 封装了3个需要使用的方法。

public class ConcurrentList<T> : List<T>{    private object obj = new object();        public bool CounterAdd(int num, T value)    {        lock (obj)        {            if (base.Count >= num)                return false;            else                base.Add(value);            return true;        }    }    public new bool Remove(T value)    {        lock (obj)        {            base.Remove(value);            return true;        }    }    public new T[] ToArray()     {        lock (obj)        {            return base.ToArray();        }    }}

最后就是线程的运行方法:

 public void Run() {     ThreadPool.QueueUserWorkItem(c =>     {         while (true)         {             if (dic.Count > 0)             {                 foreach (var item in dic.ToArray())                 {                     ConcurrentList<DateTime> list = item.Value;                     foreach (DateTime dt in list.ToArray())                        {                         if (DateTime.Now.AddSeconds(-3) > dt)                         {                             list.Remove(dt);                             Console.WriteLine("容器" + seat + " 已删除用户" + item.Key + "管道中的一条数据");                         }                     }                     if (list.Count == 0)                     {                         obj.EnterWriteLock();                         try                         {                             if (list.Count == 0)                             {                                 if (dic.TryRemove(item.Key, out ConcurrentList<DateTime> i))                                 { Console.WriteLine("容器" + seat + " 已清除用户" + item.Key + "的List管道"); }                             }                         }                         finally                         {                             obj.ExitWriteLock();                         }
                

注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。