Redis 消息中間件 ServiceStack.Redis 輕量級

問題:

  • 公司開了個新項目,算上我一共3個人。車間里機台通過流水線連通聯動的玩意。一個管理控制系統連接各個機台和硬體。專機類型就有5種,個數差不多20個左右。

  • 軟體規劃的時候採用總分的結構,管理控制系統和專機子系統之間通過消息中間件通訊。本來也想TCP連接來著,但是開發時間不允許。而且每個系統都得寫一遍這個玩意。

  • 消息中間件有很多個,比如 Kafka、RabbitMQ、RocketMQ等中國外的消息中間件。這些中間件無論宣稱的多麼輕量級都要啃一下,更要命的是就他娘三個人。而且後面還要這個雞兒系統可複製。

  • 考慮到消息及時性、開發難易程度、維護簡便性等因素後決定用Redis的pub/sub功能來實現.軟體結構大概如類似結構。

 

可用性:

  • 作為消息通知屬於安裝了Redis就有的功能,因為Redis是用在系統中存儲一些熱數據,不用單獨維護,在Windows中屬於服務直接就開了。

  • 作為可以分散式集群使用的資料庫,消息傳遞應該比較OK了。雖然使用的client-server,但是server-server已經很好了。料想client-server也不會差

  • 試驗消息內容發送訂閱的情況下,速度在30毫秒內,貌似可以。看其他部落客說大於10K入隊比較慢,但是可以不用入消息隊列啊,用發布訂閱。

  • .net 下一般使用ServiceStack.Redis,要命的是4.0以後收費,可以破解的但是不支援List<T>型的數據直接存取,想用只能變成JSON字元串存著。

  • 如果只是用訂閱發布功能,不存儲熱數據或者不使用List<T>的數據可以使用4.0以上的版本。文末會貼上兩個類型的下載包。想用其他的包也可以,我這裡只說一種思路。

實現:

模組結構圖展示如下

public static class MSServer
    {
        // 定義一個object對象
        private static object objinstance = new object();

        private static ServerState CurState = ServerState.Free;

        static PooledRedisClientManager prcm;

        private static string clientmake = string.Empty;

        /// <summary>
        /// 連接的地址
        /// </summary>
        /// <param name="IP">地址127.0.0.1:6379</param>
        /// <param name="rechannels">接收通道 {"channel:1-13","channel:1-5"}</param>
        /// <returns></returns>
        public static int OpenServer(string IP ,string[] rechannels)
        {
            try
            {
                if (prcm == null)
                {
                    lock (objinstance)
                    {
                        if (prcm == null)
                        {
                            prcm = CreateManager(IP, IP);
                            CurState = ServerState.Init;
                            return CreateLink(rechannels);
                        }
                    }
                }
            }
            catch
            {
                prcm = null;
                CurState = ServerState.Free;
                return -1;
            }
            return 1;
        }

        private static int CreateLink(string[] SourceID)
        {
            if (CurState == ServerState.Init && SourceID.Length > 0)
            {
                try
                {
                    using (IRedisClient Redis = prcm.GetReadOnlyClient())
                    {
                        clientmake = SourceID[0];
                        var info = Redis.GetClientsInfo().Where(i => i["name"] == clientmake).ToList();
                        info.ForEach(i =>
                        {
                            Redis.KillClient(i["addr"]);
                        });
                        Redis.SetClient(clientmake);
                        IRedisSubscription sc = Redis.CreateSubscription();
                        Task.Run(() =>
                        {
                            try
                            {
                                sc.SubscribeToChannels(SourceID);
                            }
                            catch { }
                        });
                        sc.OnMessage += new Action<string, string>(showpub);
                    }
                    CurState = ServerState.Work;
                }
                catch
                {
                    string message = string.Empty;
                    prcm = null;
                    CurState = ServerState.Free;
                    return -1;
                }
                return 1;
            }
            else
            {
                return 0;
            }
        }


        public static Action<string, string> ReceiveMessage;
        static void showpub(string channel, string message)
        {
            if (ReceiveMessage != null)
            {
                ReceiveMessage(channel, message);
            }
        }

        private static PooledRedisClientManager CreateManager(string writeHost, string readHost)
        {
            var redisClientConfig = new RedisClientManagerConfig
            {
                MaxWritePoolSize = 1,//「寫」鏈接池鏈接數
                MaxReadPoolSize = 1,//「讀」鏈接池鏈接數
                DefaultDb = 0,
                AutoStart = true,
            };
            //讀的客戶端只能接受特定的命令,不能用於發送資訊
            var RedisClientManager = new PooledRedisClientManager(
                new string[] { writeHost }//用於寫
                , new string[] { readHost }//用於讀
                , redisClientConfig);
            CurState = ServerState.Init;

            return RedisClientManager;
        }
        /// <summary>
        /// 發送資訊
        /// </summary>
        /// <param name="channel">通訊對象 "channel:1-13"</param>
        /// <param name="meesage">發送資訊 "test send "</param>
        /// <returns>0 發送失敗 1 發送成功 -1 連接損毀 檢查網路後重建</returns>
        public static long PubMessage(string channel, string meesage)
        {
            if (CurState == ServerState.Work)
            {
                if (!string.IsNullOrEmpty(channel) && !string.IsNullOrEmpty(meesage))
                {
                    try
                    {
                        using (IRedisClient Redis = prcm.GetClient())
                        {
                            Redis.SetClient(clientmake);
                            return Redis.PublishMessage(channel, meesage);
                        }
                    }
                    catch
                    {
                        prcm = null;
                        CurState = ServerState.Free;
                        return -1;
                    }
                }
                else
                {
                    return 0;
                }
            }
            else
            {
                return -1;
            }
        }
    }

public enum ServerState
    {
        Free,
        Init,
        Work,
        Del
    }
    

 

 

有一個問題,就是連接遠程的伺服器時如果網路斷開再重連,會殘留沒用的client ,這樣如果網路斷斷續續的話,會留好多沒有清除的客戶端。

這個在3.0.504版本中Redis 中也有這個問題,不知道是基於什麼考慮的。所以需要建立連接的時候,給個客戶端名稱,再初始化的時候刪掉所有同類型的名稱。

 

使用的時候大概類似操作 textbox2.text = “channel:1-5″ .為了簡便發布的和監聽的都是本地的一個通道。

private void button1_Click(object sender, EventArgs e)
        {

            //11.1.7.152   192.168.12.173
            int result = ServerMS.MSServer.OpenServer("127.0.0.1:6379", new string[] { textBox2.Text });
            label1.Text = result.ToString();
            //1匿名事件
            ServerMS.MSServer.ReceiveMessage += new Action<string, string>(fuck);

            if (result == 0)
            {
                //發送失敗重新發送 檢查 通道和字元串後重新發送
            }
            else if (result == 1)
            {
                //發送成功
            }
            else if (result == -1)
            {
                //連接錯誤 需要 ServerMS.MSServer.OpenServer("192.168.12.173:6379", new string[] { textBox2.Text });
            }

        }

        void fuck(string channel, string message)
        {
            this.BeginInvoke(new Action(() =>
            {
                textBox4.Text = channel + message;
            }));
        }
        public bool sdfsd = true;

        private void button3_Click(object sender, EventArgs e)
        {long result = ServerMS.MSServer.PubMessage(textBox2.Text, DateTime.Now.ToString("yyyyyMMddhhmmssfff"));

            if (result == 0)
                {
                    //發送失敗重新發送
                }
            else if (result == 1)
                {
                            //發送成功
                }
            else if (result == -1)
                 {
                  //連接錯誤 需要 ServerMS.MSServer.OpenServer("192.168.12.173:6379", new string[] { textBox2.Text });
                 }
        }

 

為了簡便channel:是通道的固定命令 ,可以自定義channel:後面的內容,發送就有回饋。確保所有機台都接收到。

如果有斷線的需要程式自己重連,接收通道的客戶端不可以再給其他的使用,Redis上說Redis client 進入訂閱模式時只能接受訂閱發布等命令指令,不接受普通的存取和其他命令

所以如果需要在讀取、寫入、發布、執行其他的指令需要使用其他客戶端,否則就出錯了。跑了幾天了上億次的測試貌似沒有出現什麼問題。

 

 

發布訂閱消息不會走AOF RDB只存在於記憶體中,即發即用,用完就沒了。沒在線就沒了。需要考慮使用環境。

還用ping pong來確定連接狀態,也可以自定義數據,使用場景要自己開發,要適合自己的才是好的。

下載:

4.0 dll   

鏈接://pan.baidu.com/s/1966t0pduHxQXcxcxV3ZTeQ
提取碼:js8p

 

5.8 dll不可以使用List<T>類型

鏈接://pan.baidu.com/s/1RFgY4V0ZO78Wvd7LOxr97g
提取碼:bxh2

Tags: