Redis 消息中间件 ServiceStack.Redis 轻量级

问题:

  • 公司开了个新项目,算上我一共3个人。车间里机台通过流水线连通联动的玩意。一个管理控制系统连接各个机台和硬件。专机类型就有5种,个数差不多20个左右。

  • 软件规划的时候采用总分的结构,管理控制系统和专机子系统之间通过消息中间件通讯。本来也想TCP连接来着,但是开发时间不允许。而且每个系统都得写一遍这个玩意。

  • 消息中间件有很多个,比如 Kafka、RabbitMQ、RocketMQ等国内外的消息中间件。这些中间件无论宣称的多么轻量级都要啃一下,更要命的是就他娘三个人。而且后面还要这个鸡儿系统可复制。

  • 考虑到消息及时性、开发难易程度、维护简便性等因素后决定用Redis的pub/sub功能来实现.软件结构大概如类似结构。

 

可用性:

  • 作为消息通知属于安装了Redis就有的功能,因为Redis是用在系统中存储一些热数据,不用单独维护,在Windows中属于服务直接就开了。

  • 作为可以分布式集群使用的数据库,消息传递应该比较OK了。虽然使用的client-server,但是server-server已经很好了。料想client-server也不会差

  • 试验消息内容发送订阅的情况下,速度在30毫秒内,貌似可以。看其他博主说大于10K入队比较慢,但是可以不用入消息队列啊,用发布订阅。

  • .net 下一般使用ServiceStack.Redis,要命的是4.0以后收费,可以破解的但是不支持List<T>型的数据直接存取,想用只能变成JSON字符串存着。

  • 如果只是用订阅发布功能,不存储热数据或者不使用List<T>的数据可以使用4.0以上的版本。文末会贴上两个类型的下载包。想用其他的包也可以,我这里只说一种思路。

实现:

模块结构图展示如下

public static class MSServer
    {
        // 定义一个object对象
        private static object objinstance = new object();

        private static ServerState CurState = ServerState.Free;

        static PooledRedisClientManager prcm;

        private static string clientmake = string.Empty;

        /// <summary>
        /// 连接的地址
        /// </summary>
        /// <param name="IP">地址127.0.0.1:6379</param>
        /// <param name="rechannels">接收通道 {"channel:1-13","channel:1-5"}</param>
        /// <returns></returns>
        public static int OpenServer(string IP ,string[] rechannels)
        {
            try
            {
                if (prcm == null)
                {
                    lock (objinstance)
                    {
                        if (prcm == null)
                        {
                            prcm = CreateManager(IP, IP);
                            CurState = ServerState.Init;
                            return CreateLink(rechannels);
                        }
                    }
                }
            }
            catch
            {
                prcm = null;
                CurState = ServerState.Free;
                return -1;
            }
            return 1;
        }

        private static int CreateLink(string[] SourceID)
        {
            if (CurState == ServerState.Init && SourceID.Length > 0)
            {
                try
                {
                    using (IRedisClient Redis = prcm.GetReadOnlyClient())
                    {
                        clientmake = SourceID[0];
                        var info = Redis.GetClientsInfo().Where(i => i["name"] == clientmake).ToList();
                        info.ForEach(i =>
                        {
                            Redis.KillClient(i["addr"]);
                        });
                        Redis.SetClient(clientmake);
                        IRedisSubscription sc = Redis.CreateSubscription();
                        Task.Run(() =>
                        {
                            try
                            {
                                sc.SubscribeToChannels(SourceID);
                            }
                            catch { }
                        });
                        sc.OnMessage += new Action<string, string>(showpub);
                    }
                    CurState = ServerState.Work;
                }
                catch
                {
                    string message = string.Empty;
                    prcm = null;
                    CurState = ServerState.Free;
                    return -1;
                }
                return 1;
            }
            else
            {
                return 0;
            }
        }


        public static Action<string, string> ReceiveMessage;
        static void showpub(string channel, string message)
        {
            if (ReceiveMessage != null)
            {
                ReceiveMessage(channel, message);
            }
        }

        private static PooledRedisClientManager CreateManager(string writeHost, string readHost)
        {
            var redisClientConfig = new RedisClientManagerConfig
            {
                MaxWritePoolSize = 1,//“写”链接池链接数
                MaxReadPoolSize = 1,//“读”链接池链接数
                DefaultDb = 0,
                AutoStart = true,
            };
            //读的客户端只能接受特定的命令,不能用于发送信息
            var RedisClientManager = new PooledRedisClientManager(
                new string[] { writeHost }//用于写
                , new string[] { readHost }//用于读
                , redisClientConfig);
            CurState = ServerState.Init;

            return RedisClientManager;
        }
        /// <summary>
        /// 发送信息
        /// </summary>
        /// <param name="channel">通讯对象 "channel:1-13"</param>
        /// <param name="meesage">发送信息 "test send "</param>
        /// <returns>0 发送失败 1 发送成功 -1 连接损毁 检查网络后重建</returns>
        public static long PubMessage(string channel, string meesage)
        {
            if (CurState == ServerState.Work)
            {
                if (!string.IsNullOrEmpty(channel) && !string.IsNullOrEmpty(meesage))
                {
                    try
                    {
                        using (IRedisClient Redis = prcm.GetClient())
                        {
                            Redis.SetClient(clientmake);
                            return Redis.PublishMessage(channel, meesage);
                        }
                    }
                    catch
                    {
                        prcm = null;
                        CurState = ServerState.Free;
                        return -1;
                    }
                }
                else
                {
                    return 0;
                }
            }
            else
            {
                return -1;
            }
        }
    }

public enum ServerState
    {
        Free,
        Init,
        Work,
        Del
    }
    

 

 

有一个问题,就是连接远程的服务器时如果网络断开再重连,会残留没用的client ,这样如果网络断断续续的话,会留好多没有清除的客户端。

这个在3.0.504版本中Redis 中也有这个问题,不知道是基于什么考虑的。所以需要建立连接的时候,给个客户端名称,再初始化的时候删掉所有同类型的名称。

 

使用的时候大概类似操作 textbox2.text = “channel:1-5″ .为了简便发布的和监听的都是本地的一个通道。

private void button1_Click(object sender, EventArgs e)
        {

            //11.1.7.152   192.168.12.173
            int result = ServerMS.MSServer.OpenServer("127.0.0.1:6379", new string[] { textBox2.Text });
            label1.Text = result.ToString();
            //1匿名事件
            ServerMS.MSServer.ReceiveMessage += new Action<string, string>(fuck);

            if (result == 0)
            {
                //发送失败重新发送 检查 通道和字符串后重新发送
            }
            else if (result == 1)
            {
                //发送成功
            }
            else if (result == -1)
            {
                //连接错误 需要 ServerMS.MSServer.OpenServer("192.168.12.173:6379", new string[] { textBox2.Text });
            }

        }

        void fuck(string channel, string message)
        {
            this.BeginInvoke(new Action(() =>
            {
                textBox4.Text = channel + message;
            }));
        }
        public bool sdfsd = true;

        private void button3_Click(object sender, EventArgs e)
        {long result = ServerMS.MSServer.PubMessage(textBox2.Text, DateTime.Now.ToString("yyyyyMMddhhmmssfff"));

            if (result == 0)
                {
                    //发送失败重新发送
                }
            else if (result == 1)
                {
                            //发送成功
                }
            else if (result == -1)
                 {
                  //连接错误 需要 ServerMS.MSServer.OpenServer("192.168.12.173:6379", new string[] { textBox2.Text });
                 }
        }

 

为了简便channel:是通道的固定命令 ,可以自定义channel:后面的内容,发送就有反馈。确保所有机台都接收到。

如果有断线的需要程序自己重连,接收通道的客户端不可以再给其他的使用,Redis上说Redis client 进入订阅模式时只能接受订阅发布等命令指令,不接受普通的存取和其他命令

所以如果需要在读取、写入、发布、执行其他的指令需要使用其他客户端,否则就出错了。跑了几天了上亿次的测试貌似没有出现什么问题。

 

 

发布订阅消息不会走AOF RDB只存在于内存中,即发即用,用完就没了。没在线就没了。需要考虑使用环境。

还用ping pong来确定连接状态,也可以自定义数据,使用场景要自己开发,要适合自己的才是好的。

下载:

4.0 dll   

链接://pan.baidu.com/s/1966t0pduHxQXcxcxV3ZTeQ
提取码:js8p

 

5.8 dll不可以使用List<T>类型

链接://pan.baidu.com/s/1RFgY4V0ZO78Wvd7LOxr97g
提取码:bxh2