C#实现请求唯一性校验支持高并发

  • 2019 年 10 月 3 日
  • 筆記

使用场景描述:

  网络请求中经常会遇到发送的请求,服务端响应是成功的,但是返回的时候出现网络故障,导致客户端无法接收到请求结果,那么客户端程序可能判断为网络故障,而重复发送同一个请求。当然如果接口中定义了请求结果查询接口,那么这种重复会相对少一些。特别是交易类的数据,这种操作更是需要避免重复发送请求。另外一种情况是用户过于快速的点击界面按钮,产生连续的相同内容请求,那么后端也需要进行过滤,这种一般出现在系统对接上,无法去控制第三方系统的业务逻辑,需要从自身业务逻辑里面去限定。

其他需求描述:

  这类请求一般存在时间范围和高并发的特点,就是短时间内会出现重复的请求,因此对模块需要支持高并发性。

技术实现:

  对请求的业务内容进行MD5摘要,并且将MD5摘要存储到缓存中,每个请求数据都通过这个一个公共的调用的方法进行判断。

代码实现:

  公共调用代码 UniqueCheck 采用单例模式创建唯一对象,便于在多线程调用的时候,只访问一个统一的缓存库

/*           * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。           * 它是被设计用来修饰被不同线程访问和修改的变量。           * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。           */          private static readonly object lockHelper = new object();            private volatile static UniqueCheck _instance;            /// <summary>          /// 获取单一实例          /// </summary>          /// <returns></returns>          public static UniqueCheck GetInstance()          {              if (_instance == null)              {                  lock (lockHelper)                  {                      if (_instance == null)                          _instance = new UniqueCheck();                  }              }              return _instance;          }  

  这里需要注意volatile的修饰符,在实际测试过程中,如果没有此修饰符,在高并发的情况下会出现报错。

  自定义一个可以进行并发处理队列,代码如下:ConcurrentLinkedQueue

  1 using System;    2 using System.Collections.Generic;    3 using System.Text;    4 using System.Threading;    5    6 namespace PackgeUniqueCheck    7 {    8     /// <summary>    9     /// 非加锁并发队列,处理100个并发数以内   10     /// </summary>   11     /// <typeparam name="T"></typeparam>   12     public class ConcurrentLinkedQueue<T>   13     {   14         private class Node<K>   15         {   16             internal K Item;   17             internal Node<K> Next;   18   19             public Node(K item, Node<K> next)   20             {   21                 this.Item = item;   22                 this.Next = next;   23             }   24         }   25   26         private Node<T> _head;   27         private Node<T> _tail;   28   29         public ConcurrentLinkedQueue()   30         {   31             _head = new Node<T>(default(T), null);   32             _tail = _head;   33         }   34   35         public bool IsEmpty   36         {   37             get { return (_head.Next == null); }   38         }   39         /// <summary>   40         /// 进入队列   41         /// </summary>   42         /// <param name="item"></param>   43         public void Enqueue(T item)   44         {   45             Node<T> newNode = new Node<T>(item, null);   46             while (true)   47             {   48                 Node<T> curTail = _tail;   49                 Node<T> residue = curTail.Next;   50   51                 //判断_tail是否被其他process改变   52                 if (curTail == _tail)   53                 {   54                     //A 有其他process执行C成功,_tail应该指向新的节点   55                     if (residue == null)   56                     {   57                         //C 其他process改变了tail节点,需要重新取tail节点   58                         if (Interlocked.CompareExchange<Node<T>>(   59                           ref curTail.Next, newNode, residue) == residue)   60                         {   61                             //D 尝试修改tail   62                             Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail);   63                             return;   64                         }   65                     }   66                     else   67                     {   68                         //B 帮助其他线程完成D操作   69                         Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail);   70                     }   71                 }   72             }   73         }   74         /// <summary>   75         /// 队列取数据   76         /// </summary>   77         /// <param name="result"></param>   78         /// <returns></returns>   79         public bool TryDequeue(out T result)   80         {   81             Node<T> curHead;   82             Node<T> curTail;   83             Node<T> next;   84             while (true)   85             {   86                 curHead = _head;   87                 curTail = _tail;   88                 next = curHead.Next;   89                 if (curHead == _head)   90                 {   91                     if (next == null) //Queue为空   92                     {   93                         result = default(T);   94                         return false;   95                     }   96                     if (curHead == curTail) //Queue处于Enqueue第一个node的过程中   97                     {   98                         //尝试帮助其他Process完成操作   99                         Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail);  100                     }  101                     else  102                     {  103                         //取next.Item必须放到CAS之前  104                         result = next.Item;  105                         //如果_head没有发生改变,则将_head指向next并退出  106                         if (Interlocked.CompareExchange<Node<T>>(ref _head,  107                           next, curHead) == curHead)  108                             break;  109                     }  110                 }  111             }  112             return true;  113         }  114         /// <summary>  115         /// 尝试获取最后一个对象  116         /// </summary>  117         /// <param name="result"></param>  118         /// <returns></returns>  119         public bool TryGetTail(out T result)  120         {  121             result = default(T);  122             if (_tail == null)  123             {  124                 return false;  125             }  126             result = _tail.Item;  127             return true;  128         }  129     }  130 }

虽然是一个非常简单的唯一性校验逻辑,但是要做到高效率,高并发支持,高可靠性,以及低内存占用,需要实现这样的需求,需要做细致的模拟测试。

  1 using System;    2 using System.Collections.Generic;    3 using System.Text;    4 using System.Threading;    5 using System.Collections;    6    7 namespace PackgeUniqueCheck    8 {    9     public class UniqueCheck   10     {   11         /*   12          * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。   13          * 它是被设计用来修饰被不同线程访问和修改的变量。   14          * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。   15          */   16         private static readonly object lockHelper = new object();   17   18         private volatile static UniqueCheck _instance;   19   20         /// <summary>   21         /// 获取单一实例   22         /// </summary>   23         /// <returns></returns>   24         public static UniqueCheck GetInstance()   25         {   26             if (_instance == null)   27             {   28                 lock (lockHelper)   29                 {   30                     if (_instance == null)   31                         _instance = new UniqueCheck();   32                 }   33             }   34             return _instance;   35         }   36   37         private UniqueCheck()   38         {   39             //创建一个线程安全的哈希表,作为字典缓存   40             _DataKey = Hashtable.Synchronized(new Hashtable());   41             Queue myqueue = new Queue();   42             _DataQueue = Queue.Synchronized(myqueue);   43             _Myqueue = new ConcurrentLinkedQueue<string>();   44             _Timer = new Thread(DoTicket);   45             _Timer.Start();   46         }   47   48         #region 公共属性设置   49         /// <summary>   50         /// 设定定时线程的休眠时间长度:默认为1分钟   51         /// 时间范围:1-7200000,值为1毫秒到2小时   52         /// </summary>   53         /// <param name="value"></param>   54         public void SetTimeSpan(int value)   55         {   56             if (value > 0&& value <=7200000)   57             {   58                 _TimeSpan = value;   59             }   60         }   61         /// <summary>   62         /// 设定缓存Cache中的最大记录条数   63         /// 值范围:1-5000000,1到500万   64         /// </summary>   65         /// <param name="value"></param>   66         public void SetCacheMaxNum(int value)   67         {   68             if (value > 0 && value <= 5000000)   69             {   70                 _CacheMaxNum = value;   71             }   72         }   73         /// <summary>   74         /// 设置是否在控制台中显示日志   75         /// </summary>   76         /// <param name="value"></param>   77         public void SetIsShowMsg(bool value)   78         {   79             Helper.IsShowMsg = value;   80         }   81         /// <summary>   82         /// 线程请求阻塞增量   83         /// 值范围:1-CacheMaxNum,建议设置为缓存最大值的10%-20%   84         /// </summary>   85         /// <param name="value"></param>   86         public void SetBlockNumExt(int value)   87         {   88             if (value > 0 && value <= _CacheMaxNum)   89             {   90                 _BlockNumExt = value;   91             }   92         }   93         /// <summary>   94         /// 请求阻塞时间   95         /// 值范围:1-max,根据阻塞增量设置请求阻塞时间   96         /// 阻塞时间越长,阻塞增量可以设置越大,但是请求实时响应就越差   97         /// </summary>   98         /// <param name="value"></param>   99         public void SetBlockSpanTime(int value)  100         {  101             if (value > 0)  102             {  103                 _BlockSpanTime = value;  104             }  105         }  106         #endregion  107  108         #region 私有变量  109         /// <summary>  110         /// 内部运行线程  111         /// </summary>  112         private Thread _runner = null;  113         /// <summary>  114         /// 可处理高并发的队列  115         /// </summary>  116         private ConcurrentLinkedQueue<string> _Myqueue = null;  117         /// <summary>  118         /// 唯一内容的时间健值对  119         /// </summary>  120         private Hashtable _DataKey = null;  121         /// <summary>  122         /// 内容时间队列  123         /// </summary>  124         private Queue _DataQueue = null;  125         /// <summary>  126         /// 定时线程的休眠时间长度:默认为1分钟  127         /// </summary>  128         private int _TimeSpan = 3000;  129         /// <summary>  130         /// 定时计时器线程  131         /// </summary>  132         private Thread _Timer = null;  133         /// <summary>  134         /// 缓存Cache中的最大记录条数  135         /// </summary>  136         private int _CacheMaxNum = 500000;  137         /// <summary>  138         /// 线程请求阻塞增量  139         /// </summary>  140         private int _BlockNumExt = 10000;  141         /// <summary>  142         /// 请求阻塞时间  143         /// </summary>  144         private int _BlockSpanTime = 100;  145         #endregion  146  147         #region 私有方法  148         private void StartRun()  149         {  150             _runner = new Thread(DoAction);  151             _runner.Start();  152             Helper.ShowMsg("内部线程启动成功!");  153         }  154  155         private string GetItem()  156         {  157             string tp = string.Empty;  158             bool result = _Myqueue.TryDequeue(out tp);  159             return tp;  160         }  161         /// <summary>  162         /// 执行循环操作  163         /// </summary>  164         private void DoAction()  165         {  166             while (true)  167             {  168                 while (!_Myqueue.IsEmpty)  169                 {  170                     string item = GetItem();  171                     _DataQueue.Enqueue(item);  172                     if (!_DataKey.ContainsKey(item))  173                     {  174                         _DataKey.Add(item, DateTime.Now);  175                     }  176                 }  177                 //Helper.ShowMsg("当前数组已经为空,处理线程进入休眠状态...");  178                 Thread.Sleep(2);  179             }  180         }  181         /// <summary>  182         /// 执行定时器的动作  183         /// </summary>  184         private void DoTicket()  185         {  186             while (true)  187             {  188                 Helper.ShowMsg("当前数据队列个数:" + _DataQueue.Count.ToString());  189                 if (_DataQueue.Count > _CacheMaxNum)  190                 {  191                     while (true)  192                     {  193                         Helper.ShowMsg(string.Format("当前队列数:{0},已经超出最大长度:{1},开始进行清理操作...", _DataQueue.Count, _CacheMaxNum.ToString()));  194                         string item = _DataQueue.Dequeue().ToString();  195                         if (!string.IsNullOrEmpty(item))  196                         {  197                             if (_DataKey.ContainsKey(item))  198                             {  199                                 _DataKey.Remove(item);  200                             }  201                             if (_DataQueue.Count <= _CacheMaxNum)  202                             {  203                                 Helper.ShowMsg("清理完成,开始休眠清理线程...");  204                                 break;  205                             }  206                         }  207                     }  208                 }  209                 Thread.Sleep(_TimeSpan);  210             }  211         }  212  213         /// <summary>  214         /// 线程进行睡眠等待  215         /// 如果当前负载压力大大超出了线程的处理能力  216         /// 那么需要进行延时调用  217         /// </summary>  218         private void BlockThread()  219         {  220             if (_DataQueue.Count > _CacheMaxNum + _BlockNumExt)  221             {  222                 Thread.Sleep(_BlockSpanTime);  223             }  224         }  225         #endregion  226  227         #region 公共方法  228         /// <summary>  229         /// 开启服务线程  230         /// </summary>  231         public void Start()  232         {  233             if (_runner == null)  234             {  235                 StartRun();  236             }  237             else  238             {  239                 if (_runner.IsAlive == false)  240                 {  241                     StartRun();  242                 }  243             }  244  245         }  246         /// <summary>  247         /// 关闭服务线程  248         /// </summary>  249         public void Stop()  250         {  251             if (_runner != null)  252             {  253                 _runner.Abort();  254                 _runner = null;  255             }  256         }  257  258         /// <summary>  259         /// 添加内容信息  260         /// </summary>  261         /// <param name="item">内容信息</param>  262         /// <returns>true:缓存中不包含此值,队列添加成功,false:缓存中包含此值,队列添加失败</returns>  263         public bool AddItem(string item)  264         {  265             BlockThread();  266             item = Helper.MakeMd5(item);  267             if (_DataKey.ContainsKey(item))  268             {  269                 return false;  270             }  271             else  272             {  273                 _Myqueue.Enqueue(item);  274                 return true;  275             }  276         }  277         /// <summary>  278         /// 判断内容信息是否已经存在  279         /// </summary>  280         /// <param name="item">内容信息</param>  281         /// <returns>true:信息已经存在于缓存中,false:信息不存在于缓存中</returns>  282         public bool CheckItem(string item)  283         {  284             item = Helper.MakeMd5(item);  285             return _DataKey.ContainsKey(item);  286         }  287         #endregion  288  289     }  290 }

模拟测试代码:

private static string _example = Guid.NewGuid().ToString();            private static UniqueCheck _uck = null;            static void Main(string[] args)          {              _uck = UniqueCheck.GetInstance();              _uck.Start();              _uck.SetIsShowMsg(false);              _uck.SetCacheMaxNum(20000000);              _uck.SetBlockNumExt(1000000);              _uck.SetTimeSpan(6000);                _uck.AddItem(_example);              Thread[] threads = new Thread[20];                for (int i = 0; i < 20; i++)              {                  threads[i] = new Thread(AddInfo);                  threads[i].Start();              }                Thread checkthread = new Thread(CheckInfo);              checkthread.Start();                string value = Console.ReadLine();                checkthread.Abort();              for (int i = 0; i < 50; i++)              {                  threads[i].Abort();              }              _uck.Stop();          }            static void AddInfo()          {              while (true)              {                  _uck.AddItem(Guid.NewGuid().ToString());              }          }            static void CheckInfo()          {              while (true)              {                  Console.WriteLine("开始时间:{0}...", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));                  Console.WriteLine("插入结果:{0}", _uck.AddItem(_example));                  Console.WriteLine("结束时间:{0}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
          //调整进程休眠时间,可以测试高并发的情况 //Thread.Sleep(
1000); } }

测试截图: