Task+ConcurrentQueue多線程編程
隊列(Queue)代表了一個先進先出的對象集合。當您需要對各項進行先進先出的訪問時,則使用隊列。當您在列表中添加一項,稱為入隊,當您從列表中移除一項時,稱為出隊。
ConcurrentQueue<T>隊列是一個高效的線程安全的隊列,是.Net Framework 4.0,System.Collections.Concurrent命名空間下的一個數據結構。
Task是在ThreadPool的基礎上推出的,我們知道了ThreadPool的弊端:我們不能控制線程池中線程的執行順序,也不能獲取線程池內線程取消/異常/完成的通知。net4.0在ThreadPool的基礎上推出了Task,Task擁有線程池的優點,同時也解決了使用線程池不易控制的弊端。
下面我們來看下Task+ConcurrentQueue實現多線程編程
1、定義最大線程數, 一般和本機的cpu 有關
/// <summary> /// 線程總數 /// </summary> private int threadNum = Convert.ToInt32(ConfigurationManager.AppSettings["ThreadNum"]);
ConcurrentQueue隊列
/// <summary> /// 隊列 /// </summary> private ConcurrentQueue<AssetRepayment> queues = new ConcurrentQueue<AssetRepayment>();
2、接下來 我們把業務數據取出來,加到定義的 queues.Enqueue(l);
var dt = DateTime.Now.Date; var list = AssetRepayService.GetRepayments().Where(o => o.AssetRepayStatus == AssetRepayStatus.NoSend && o.PlanRepaymentDate == dt && o.AssetRepayMode == AssetRepayMode.Withholding); int count = list.Count(); if (count == 0) { LogHelper.WriteFatal("代扣充值-沒有可執行的數據"); return; } totalCount = count; int allpage = count / 200 + (count % 200 == 0 ? 0 : 1); int page = 0; LogHelper.WriteFatal("代扣充值-可執行的數據:" + count + "條,頁數:"+ allpage); do { LogHelper.WriteFatal("代扣充值-第:" + page + "頁"); var ll = list.OrderBy(o=>o.Id).Skip(page++ * 200).Take(200).ToList(); foreach (var l in ll) { queues.Enqueue(l); } } while (page < allpage);
3、等數據全部加載到queues,我們接着下一步
List<Task> tasks = new List<Task>(); for (int i = 0; i < threadNum; i++) { var task = Task.Run(() => { Process(); }); tasks.Add(task); } var taskList = Task.Factory.ContinueWhenAll(tasks.ToArray(), (ts) => { }); taskList.Wait();
利用Task 處理數據
這裡需要注意的是
var taskList = Task.Factory.ContinueWhenAll(tasks.ToArray(), (ts) => { }); taskList.Wait();
這個代表開始執行線程並且需要全部完成 才會退出 ContinueWhenAll
4 接下去我們看下處理程序怎麼消費數據
從隊列取數據
var currentIndex = Interlocked.Increment(ref index); AssetRepayment repayId = null; var isExit = queues.TryDequeue(out repayId); if (!isExit) { break; }
有數據的話 往下走 執行我們要的業務邏輯
var service = context.GetService<IDeTransactionService>(); service.SubDeTransaction(repayId); LogHelper.WriteFatal(string.Format("代扣充值 共{0}條 當前第{1}條", totalCount, currentIndex));
完整處理方法如下
private void Process() { using (var context = new MefContext()) { while (true) { var currentIndex = Interlocked.Increment(ref index); AssetRepayment repayId = null; var isExit = queues.TryDequeue(out repayId); if (!isExit) { break; } try { var service = context.GetService<IDeTransactionService>(); service.SubDeTransaction(repayId); LogHelper.WriteFatal(string.Format("代扣充值 共{0}條 當前第{1}條", totalCount, currentIndex)); } catch (Exception ex) { LogHelper.WriteError("代扣充值-", ex); } } } }
到此為止,我們實現了 Task+ConcurrentQueue多線程編程。
完整代碼塊
鏈接://pan.baidu.com/s/1jgpafTFssiVLmZhDe1CgYQ
提取碼:erib