.net core 和 WPF 開發升訊威在線客服與營銷系統:使用執行緒安全的 BlockingCollection 實現高性能的數據處理
- 2021 年 2 月 2 日
本系列文章詳細介紹使用 .net core 和 WPF 開發 升訊威在線客服與營銷系統 的過程。本產品已經成熟穩定並投入商用。
在線演示環境://kf.shengxunwei.com 注意:演示環境僅供演示交流與評估,不保證 7×24 小時可用。
對於在線客服與營銷系統,客服端指的是後台提供服務的客服或營銷人員,他們使用客服程式在後台觀察網站的被訪情況,開展營銷活動或提供客戶服務。在本篇文章中,我將詳細介紹如何在 .net core 環境下使用 TCP 通訊技術實現穩定高效與安全的客服端程式。
- 需要使客服端程式具備 24 小時不間斷運行的能力,在處理網路通訊時,必須100%的穩定。
- 必須具備應對網路波動的能力,不能網路稍有波動就斷線。即使出現了短暫的網路中斷,客服程式也不能做掉線處理,而是要具備保持和自動重連的能力。
- 要考慮安全性問題,服務端的埠監聽,要能識別正常客服端連接,還是來自攻擊者的連接。
System.Collections.Concurrent 命名空間,其中包含多個執行緒安全且可縮放的集合類。 多個執行緒可以安全高效地從這些集合添加或刪除項,而無需在用戶程式碼中進行其他同步。 編寫新程式碼時,只要將多個執行緒同時寫入到集合時,就使用並發集合類。
某些並發集合類型使用輕量同步機制,如 SpinLock、SpinWait、SemaphoreSlim 和 CountdownEvent。 這些同步類型通常在將執行緒真正置於等待狀態之前,會在短時間內使用 忙旋轉。 預計等待時間非常短時,旋轉比等待所消耗的計算資源少得多,因為後者涉及資源消耗量大的內核轉換。 對於使用旋轉的集合類,這種效率意味著多個執行緒能夠以非常快的速率添加和刪除項。
BlockingCollection 是一個執行緒安全集合類,可提供以下內容:
- 生成者/使用者模式的實現; BlockingCollection 是介面的包裝 IProducerConsumerCollection 。
- 利用和方法並發添加和移除多個執行緒中的項 Add Take 。
- Add Take 當集合已滿或為空時阻止和操作的綁定集合。
- Add Take 使用 CancellationToken 或方法中的對象取消或操作 TryAdd TryTake 。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class BlockingCollectionDemo
static async Task Main()
await AddTakeDemo.BC_AddTakeCompleteAdding();
await ConsumingEnumerableDemo.BC_GetConsumingEnumerable();
Console.WriteLine("Press any key to exit.");
class AddTakeDemo
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.Take()
// BlockingCollection<T>.CompleteAdding()
public static async Task BC_AddTakeCompleteAdding()
using (BlockingCollection<int> bc = new BlockingCollection<int>())
// Spin up a Task to populate the BlockingCollection
Task t1 = Task.Run(() =>
// Spin up a Task to consume the BlockingCollection
Task t2 = Task.Run(() =>
// Consume consume the BlockingCollection
while (true) Console.WriteLine(bc.Take());
catch (InvalidOperationException)
// An InvalidOperationException means that Take() was called on a completed collection
Console.WriteLine("That's All!");
await Task.WhenAll(t1, t2);
class TryTakeDemo
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.TryTake()
// BlockingCollection<T>.IsCompleted
public static void BC_TryTake()
// Construct and fill our BlockingCollection
using (BlockingCollection<int> bc = new BlockingCollection<int>())
int NUMITEMS = 10000;
for (int i = 0; i < NUMITEMS; i++) bc.Add(i);
int outerSum = 0;
// Delegate for consuming the BlockingCollection and adding up all items
Action action = () =>
int localItem;
int localSum = 0;
while (bc.TryTake(out localItem)) localSum += localItem;
Interlocked.Add(ref outerSum, localSum);
// Launch three parallel actions to consume the BlockingCollection
Parallel.Invoke(action, action, action);
Console.WriteLine("Sum[0..{0}) = {1}, should be {2}", NUMITEMS, outerSum, ((NUMITEMS * (NUMITEMS - 1)) / 2));
Console.WriteLine("bc.IsCompleted = {0} (should be true)", bc.IsCompleted);
class FromToAnyDemo
// Demonstrates:
// Bounded BlockingCollection<T>
// BlockingCollection<T>.TryAddToAny()
// BlockingCollection<T>.TryTakeFromAny()
public static void BC_FromToAny()
BlockingCollection<int>[] bcs = new BlockingCollection<int>[2];
bcs[0] = new BlockingCollection<int>(5); // collection bounded to 5 items
bcs[1] = new BlockingCollection<int>(5); // collection bounded to 5 items
// Should be able to add 10 items w/o blocking
int numFailures = 0;
for (int i = 0; i < 10; i++)
if (BlockingCollection<int>.TryAddToAny(bcs, i) == -1) numFailures++;
Console.WriteLine("TryAddToAny: {0} failures (should be 0)", numFailures);
// Should be able to retrieve 10 items
int numItems = 0;
int item;
while (BlockingCollection<int>.TryTakeFromAny(bcs, out item) != -1) numItems++;
Console.WriteLine("TryTakeFromAny: retrieved {0} items (should be 10)", numItems);
class ConsumingEnumerableDemo
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.GetConsumingEnumerable()
public static async Task BC_GetConsumingEnumerable()
using (BlockingCollection<int> bc = new BlockingCollection<int>())
// Kick off a producer task
await Task.Run(async () =>
for (int i = 0; i < 10; i++)
await Task.Delay(100); // sleep 100 ms between adds
// Need to do this to keep foreach below from hanging
// Now consume the blocking collection with foreach.
// Use bc.GetConsumingEnumerable() instead of just bc because the
// former will block waiting for completion and the latter will
// simply take a snapshot of the current state of the underlying collection.
foreach (var item in bc.GetConsumingEnumerable())
本文對使用執行緒安全的 BlockingCollection 實現高性能的數據處理進行了簡要的介紹,在接下來的文章中,我將具體解構服務端程式的結構和設計、客服端程式的結構和設計,敬請關注。
