Redis 消息中間件 ServiceStack.Redis 輕量級
問題:
-
公司開了個新項目,算上我一共3個人。車間里機台通過流水線連通聯動的玩意。一個管理控制系統連接各個機台和硬體。專機類型就有5種,個數差不多20個左右。
-
軟體規劃的時候採用總分的結構,管理控制系統和專機子系統之間通過消息中間件通訊。本來也想TCP連接來著,但是開發時間不允許。而且每個系統都得寫一遍這個玩意。
-
消息中間件有很多個,比如 Kafka、RabbitMQ、RocketMQ等中國外的消息中間件。這些中間件無論宣稱的多麼輕量級都要啃一下,更要命的是就他娘三個人。而且後面還要這個雞兒系統可複製。
-
考慮到消息及時性、開發難易程度、維護簡便性等因素後決定用Redis的pub/sub功能來實現.軟體結構大概如類似結構。
可用性:
-
作為消息通知屬於安裝了Redis就有的功能,因為Redis是用在系統中存儲一些熱數據,不用單獨維護,在Windows中屬於服務直接就開了。
-
作為可以分散式集群使用的資料庫,消息傳遞應該比較OK了。雖然使用的client-server,但是server-server已經很好了。料想client-server也不會差
-
試驗消息內容發送訂閱的情況下,速度在30毫秒內,貌似可以。看其他部落客說大於10K入隊比較慢,但是可以不用入消息隊列啊,用發布訂閱。
-
.net 下一般使用ServiceStack.Redis,要命的是4.0以後收費,可以破解的但是不支援List<T>型的數據直接存取,想用只能變成JSON字元串存著。
- 如果只是用訂閱發布功能,不存儲熱數據或者不使用List<T>的數據可以使用4.0以上的版本。文末會貼上兩個類型的下載包。想用其他的包也可以,我這裡只說一種思路。
實現:
模組結構圖展示如下
public static class MSServer { // 定義一個object對象 private static object objinstance = new object(); private static ServerState CurState = ServerState.Free; static PooledRedisClientManager prcm; private static string clientmake = string.Empty; /// <summary> /// 連接的地址 /// </summary> /// <param name="IP">地址127.0.0.1:6379</param> /// <param name="rechannels">接收通道 {"channel:1-13","channel:1-5"}</param> /// <returns></returns> public static int OpenServer(string IP ,string[] rechannels) { try { if (prcm == null) { lock (objinstance) { if (prcm == null) { prcm = CreateManager(IP, IP); CurState = ServerState.Init; return CreateLink(rechannels); } } } } catch { prcm = null; CurState = ServerState.Free; return -1; } return 1; } private static int CreateLink(string[] SourceID) { if (CurState == ServerState.Init && SourceID.Length > 0) { try { using (IRedisClient Redis = prcm.GetReadOnlyClient()) { clientmake = SourceID[0]; var info = Redis.GetClientsInfo().Where(i => i["name"] == clientmake).ToList(); info.ForEach(i => { Redis.KillClient(i["addr"]); }); Redis.SetClient(clientmake); IRedisSubscription sc = Redis.CreateSubscription(); Task.Run(() => { try { sc.SubscribeToChannels(SourceID); } catch { } }); sc.OnMessage += new Action<string, string>(showpub); } CurState = ServerState.Work; } catch { string message = string.Empty; prcm = null; CurState = ServerState.Free; return -1; } return 1; } else { return 0; } } public static Action<string, string> ReceiveMessage; static void showpub(string channel, string message) { if (ReceiveMessage != null) { ReceiveMessage(channel, message); } } private static PooledRedisClientManager CreateManager(string writeHost, string readHost) { var redisClientConfig = new RedisClientManagerConfig { MaxWritePoolSize = 1,//「寫」鏈接池鏈接數 MaxReadPoolSize = 1,//「讀」鏈接池鏈接數 DefaultDb = 0, AutoStart = true, }; //讀的客戶端只能接受特定的命令,不能用於發送資訊 var RedisClientManager = new PooledRedisClientManager( new string[] { writeHost }//用於寫 , new string[] { readHost }//用於讀 , redisClientConfig); CurState = ServerState.Init; return RedisClientManager; } /// <summary> /// 發送資訊 /// </summary> /// <param name="channel">通訊對象 "channel:1-13"</param> /// <param name="meesage">發送資訊 "test send "</param> /// <returns>0 發送失敗 1 發送成功 -1 連接損毀 檢查網路後重建</returns> public static long PubMessage(string channel, string meesage) { if (CurState == ServerState.Work) { if (!string.IsNullOrEmpty(channel) && !string.IsNullOrEmpty(meesage)) { try { using (IRedisClient Redis = prcm.GetClient()) { Redis.SetClient(clientmake); return Redis.PublishMessage(channel, meesage); } } catch { prcm = null; CurState = ServerState.Free; return -1; } } else { return 0; } } else { return -1; } } } public enum ServerState { Free, Init, Work, Del }
有一個問題,就是連接遠程的伺服器時如果網路斷開再重連,會殘留沒用的client ,這樣如果網路斷斷續續的話,會留好多沒有清除的客戶端。
這個在3.0.504版本中Redis 中也有這個問題,不知道是基於什麼考慮的。所以需要建立連接的時候,給個客戶端名稱,再初始化的時候刪掉所有同類型的名稱。
使用的時候大概類似操作 textbox2.text = “channel:1-5″ .為了簡便發布的和監聽的都是本地的一個通道。
private void button1_Click(object sender, EventArgs e) { //11.1.7.152 192.168.12.173 int result = ServerMS.MSServer.OpenServer("127.0.0.1:6379", new string[] { textBox2.Text }); label1.Text = result.ToString(); //1匿名事件 ServerMS.MSServer.ReceiveMessage += new Action<string, string>(fuck); if (result == 0) { //發送失敗重新發送 檢查 通道和字元串後重新發送 } else if (result == 1) { //發送成功 } else if (result == -1) { //連接錯誤 需要 ServerMS.MSServer.OpenServer("192.168.12.173:6379", new string[] { textBox2.Text }); } } void fuck(string channel, string message) { this.BeginInvoke(new Action(() => { textBox4.Text = channel + message; })); } public bool sdfsd = true; private void button3_Click(object sender, EventArgs e) {long result = ServerMS.MSServer.PubMessage(textBox2.Text, DateTime.Now.ToString("yyyyyMMddhhmmssfff")); if (result == 0) { //發送失敗重新發送 } else if (result == 1) { //發送成功 } else if (result == -1) { //連接錯誤 需要 ServerMS.MSServer.OpenServer("192.168.12.173:6379", new string[] { textBox2.Text }); } }
為了簡便channel:是通道的固定命令 ,可以自定義channel:後面的內容,發送就有回饋。確保所有機台都接收到。
如果有斷線的需要程式自己重連,接收通道的客戶端不可以再給其他的使用,Redis上說Redis client 進入訂閱模式時只能接受訂閱發布等命令指令,不接受普通的存取和其他命令
所以如果需要在讀取、寫入、發布、執行其他的指令需要使用其他客戶端,否則就出錯了。跑了幾天了上億次的測試貌似沒有出現什麼問題。
發布訂閱消息不會走AOF RDB只存在於記憶體中,即發即用,用完就沒了。沒在線就沒了。需要考慮使用環境。
還用ping pong來確定連接狀態,也可以自定義數據,使用場景要自己開發,要適合自己的才是好的。
下載:
4.0 dll
鏈接://pan.baidu.com/s/1966t0pduHxQXcxcxV3ZTeQ
提取碼:js8p
5.8 dll不可以使用List<T>類型
鏈接://pan.baidu.com/s/1RFgY4V0ZO78Wvd7LOxr97g
提取碼:bxh2