现在,因为种种因素,你必须对一个请求或者方法进行频率上的访问限制。
比如, 你对外提供了一个API接口,注册用户每秒钟最多可以调用100次,非注册用户每秒钟最多可以调用10次。
比如, 有一个非常吃服务器资源的方法,在同一时刻不能超过10个人调用这个方法,否则服务器满载。
比如, 有一些特殊的页面,访客并不能频繁的访问或发言。
比如, 秒杀活动等进行。
比如 ,防范DDOS,当达到一定频率后调用脚本iis服务器ip黑名单,防火墙黑名单。
如上种种的举例,也就是说,如何从一个切面的角度对调用的方法进行频率上的限制。而对频率限制,服务器层面都有最直接的解决方法,现在我说的则是代码层面上的频率管控。
本文给出两个示例,一个是基于单机环境的实现,第二个则是基于分布式的Redis实现。
--------------------
以第一个API接口需求为例,先说下单机环境下的实现。
按照惯性思维,我们自然会想到缓存的过期策略这种方法,但是严格来讲就HttpRuntime.Cache而言,通过缓存的过期策略来对请求进行频率的并发控制是不合适的。
HttpRuntime.Cache 是应用程序级别的Asp.Net的缓存技术,通过这个技术可以申明多个缓存对象,可以为每个对象设置过期时间,当过期时间到达后该缓存对象就会消失(也就是当你访问该对象的时候为Null)
为什么这样说呢?比如对某个方法(方法名:GetUserList)我们要进行1秒钟最多10次的限制,现在我们就新建一个int型的Cache对象,然后设置1秒钟后过期消失。那么每当访问GetUserList方法前,我们就先判断这个Cache对象的值是否大于10,如果大于10就不执行GetUserList方法,如果小于10则允许执行。每当访问该对象的时候如果不存在或者过期就新建,这样周而复始,则该对象永远不可能超过10。
1 if ((int)HttpRuntime.Cache["GetUserListNum"] > 10) //大于10请求失败2 {3 Console.WriteLine("禁止请求");4 }5 else6 {7 HttpRuntime.Cache["GetUserListNum"] = (int)HttpRuntime.Cache["GetUserListNum"] + 1; //否则该缓存对象的值+18 Console.WriteLine("允许请求");9 }
这样的思想及实现相对来说非常简单,但是基于这样的一个模型设定,那么就会出现这种情况:
如上图,每个点代表一次访问请求,我在0秒的时候 新建了一个名字为GetUserListNum的缓存对象。
在0~0.5秒期间 我访问了3次,在0.5~1秒期间,我们访问了7次。此时,该对象消失,然后我们接着访问,该对象重置为0.
在第1~1.5秒期间,还是访问了7次,在第1.5秒~2秒期间访问了3次。
基于这种简单缓存过期策略的模型,在这2秒钟内,我们虽然平均每秒钟都访问了10次,满足这个规定,但是如果我们从中取一个期间段,0.5秒~1.5秒期间,也是1秒钟,但是却实实在在的访问了14次!远远超过了我们设置的 1秒钟最多访问10次的 限制。
那么如何科学的来解决上面的问题呢?我们可以通过模拟会话级别的信号量这一手段,这也就是我们今天的主题了。
什么是信号量?仅就以代码而言, static SemaphoreSlim semaphoreSlim = new SemaphoreSlim(5); 它的意思就代表在多线程情况下,在任何一时刻,只能同时5个线程去访问。
4容器4线程模型
现在,在实现代码的之前我们先设计一个模型。
假设我们有一个用户A的管道,这个管道里装着用户A的请求,比如用户A在一秒钟发出了10次请求,那么每一个请求过来,管道里的元素都会多一个。但是我们设定这个管道最多只能容纳10个元素,而且每个元素的存活期为1秒,1秒后则该元素消失。那么这样设计的话,无论是速率还是数量的突进,都会有管道长度的限制。这样一来,无论从哪一个时间节点或者时间间隔出发,这个管道都能满足我们的频率限制需求。
而这里的管道,就必须和会话Id来对应了。每当有新会话进来的时候就生成一个新管道。这个会话id根据自己场景所定,可以是sessionId,可以是ip,也可以是token。
那么既然这个管道是会话级别的,我们肯定得需要一个容器,来装这些管道。现在,我们以IP来命名会话管道,并把所有的管道都装载在一个容器中,如图
而基于刚才的设定,我们还需要对容器内的每条管道的元素进行处理,把过期的给剔除掉,为此,还需要单独为该容器开辟出一个线程来为每条管道进行元素的清理。而当管道的元素为0时,我们就清掉该管道,以便节省容器空间。
当然,由于用户量多,一个容器内可能存在上万个管道,这个时候仅仅用一个容器来装载来清理,在效率上显然是不够的。这个时候,我们就得对容器进行横向扩展了。
比如,我们可以根据Cpu核心数自动生成对应的数量的容器,然后根据一个算法,对IP来进行导流。我当前cpu是4个逻辑核心,就生成了4个容器,每当用户访问的时候,都会最先经过一个算法,这个算法会对IP进行处理,如192.168.1.11~192.168.1.13这个Ip段进第一个容器,xxx~xxx进第二个容器,依次类推,相应的,也就有了4个线程去分别处理4个容器中的管道。
那么,最终就形成了我们的4容器4线程模型了。
现在,着眼于编码实现:
首先我们需要一个能承载这些容器的载体,这个载体类似于连接池的概念,可以根据一些需要自动生成适应数量的容器,如果有特殊要求的话,还可以在容器上切出一个容器管理的面,在线程上切出一个线程管理的面以便于实时监控和调度。如果真要做这样一个系统,那么 容器的调度 和 线程的调度功能 是必不可少的,而本Demo则是完成了主要功能,像容器和线程在代码中我也没剥离开来,算法也是直接写死的,实际设计中,对算法的设计还是很重要的,还有多线程模型中,怎样上锁才能让效率最大化也是重中之重的。
而这里为了案例的直观就直接写死成4个容器。
public static List<Container> ContainerList = new List<Container>(); //容器载体static Factory(){ for (int i = 0; i < 4; i++) { ContainerList.Add(new Container(i)); //遍历4次 生成4个容器 } foreach (var item in ContainerList) { item.Run(); //开启线程 }}
现在,我们假定 有编号为 0 到 40 这样的 41个用户。那么这个导流算法 我也就直接写死,编号0至9的用户 将他们的请求给抛转到第一个容器,编号10~19的用户 放到第二个容器,编号20~29放到第三个容器,编号30~40的用户放到第四个容器。
那么这个代码就是这样的:
static Container GetContainer(int userId, out int i) //获取容器的算法 { if (0 <= userId && userId < 10) //编号0至9的用户 返回第一个容器 依次类推 { i = 0; return ContainerList[0]; } if (10 <= userId && userId < 20) { i = 1; return ContainerList[1]; } if (20 <= userId && userId < 30) { i = 2; return ContainerList[2]; } i = 3; return ContainerList[3]; }
当我们的会话请求经过算法的导流之后,都必须调用一个方法,用于辨别管道数量。如果管道数量已经大于10,则请求失败,否则成功
public static void Add(int userId) { if (GetContainer(userId, out int i).Add(userId)) Console.WriteLine("容器" + i + " 用户" + userId + " 发起请求"); else Console.WriteLine("容器" + i + " 用户" + userId + " 被拦截"); }
接下来就是容器Container的代码了。
这里,对容器的选型用线程安全的ConcurrentDictionary类。
线程安全:当多个线程同时读写同一个共享元素的时候,就会出现数据错乱,迭代报错等安全问提
ConcurrentDictionary:除了GetOrAdd方法要慎用外,是.Net4.0专为解决Dictionary线程安全而出的新类型
ReaderWriterLockSlim:较ReaderWriterLock优化的读写锁,多个线程同时访问读锁 或 一个线程访问写锁
private ReaderWriterLockSlim obj = new ReaderWriterLockSlim(); //在每个容器中申明一个读写锁public ConcurrentDictionary<string, ConcurrentList<DateTime>> dic = new ConcurrentDictionary<string, ConcurrentList<DateTime>>(); //创建该容器 dic
然后当你向容器添加一条管道中的数据是通过这个方法:
public bool Add(int userId) { obj.EnterReadLock();//挂读锁,允许多个线程同时写入该方法 try { ConcurrentList<DateTime> dtList = dic.GetOrAdd(userId.ToString(), new ConcurrentList<DateTime>()); //如果不存在就新建 ConcurrentList return dtList.CounterAdd(10, DateTime.Now); //管道容量10,当临界管道容量后 返回false } finally { obj.ExitReadLock(); } }
这里,为了在后面的线程遍历删除ConcurrentList的管道的时候保证ConcurrentList的安全性,所以此处要加读锁。
而ConcurrentList,因为.Net没有推出List集合类的线程安全,所以自己新建了一个继承于List<T>的安全类型,在这里 封装了3个需要使用的方法。
public class ConcurrentList<T> : List<T>{ private object obj = new object(); public bool CounterAdd(int num, T value) { lock (obj) { if (base.Count >= num) return false; else base.Add(value); return true; } } public new bool Remove(T value) { lock (obj) { base.Remove(value); return true; } } public new T[] ToArray() { lock (obj) { return base.ToArray(); } }}
最后就是线程的运行方法:
public void Run() { ThreadPool.QueueUserWorkItem(c => { while (true) { if (dic.Count > 0) { foreach (var item in dic.ToArray()) { ConcurrentList<DateTime> list = item.Value; foreach (DateTime dt in list.ToArray()) { if (DateTime.Now.AddSeconds(-3) > dt) { list.Remove(dt); Console.WriteLine("容器" + seat + " 已删除用户" + item.Key + "管道中的一条数据"); } } if (list.Count == 0) { obj.EnterWriteLock(); try { if (list.Count == 0) { if (dic.TryRemove(item.Key, out ConcurrentList<DateTime> i)) { Console.WriteLine("容器" + seat + " 已清除用户" + item.Key + "的List管道"); } } } finally { obj.ExitWriteLock(); }注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。