C# RabbitMQ的使用

本文目的如題。

安裝

先說一下RabbitMQ的安裝,建議使用Docker鏡像安裝,Docker安裝的好處是不管Windows系統還是Linux,安裝步驟少,安裝方法相同,不容易出錯。使用下面的命令就可以:

docker run -d --hostname myRabbit --name rabbitmq3.9.11 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin RABBITMQ_DEFAULT_VHOST=my_vhost -p 15672:15672 -p 5672:5672 rabbitmq3.9.11:management

安裝完成後,可以打開瀏覽器訪問管理網站//127.0.0.1:15672,使用安裝時設置的用戶名和密碼登錄,就可以進行管理了。

不管使用什麼方法安裝,都可以運行本文中的示例。這些示例中使用了用戶admin,密碼是admin,如果沒有,可以在管理網站中創建:

本文的示例中還使用了my_vhost虛擬主機,如果沒有,也需要定義一下:

注意,admin 需要有對my_vhost的操作許可權。

編寫消息接收端

安裝完成後可以進行開發了。我們需要編寫消息的生產者和消費者,如果哪一部分出了問題,或者RabbitMQ伺服器出了問題,都會影響工作的進展。因此我們分步進行,先編寫消息接受部分,也就是所謂的消費者,與RabbitMQ伺服器聯調,成功後再進行下一步。

先創建一個.Net 6的控制台項目,可以使用Visual Studio創建。如果使用命令行,命令如下:

mkdir DirectReceiveDemo
cd DirectReceiveDemo
dotnet new console 

然後安裝rabbitmq.client程式包:

dotnet add package rabbitmq.client

編寫Program.cs程式碼如下:

using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "mymessage",
                                  durable: false,
                                  exclusive: false,
                                  autoDelete: false,
                                  arguments: null);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "mymessage",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回車退出");
    Console.ReadLine();
}

執行dotnet run 運行程式碼,程式會一直等待輸入,這時需要輸入一些消息驗證程式。現在登錄管理網站//127.0.0.1:15672/,使用安裝時設置的用戶名和密碼,在Connections分頁中可以看到多了新的連接:


在Channel分頁中可以看到當前的Chanel:

進入Queues分頁,點擊列表中的mymessage

進入mymessage隊列:

在Publish message中寫一些消息並發送。回到控制台接收程式,消息應該已經被接收了。

到這裡,接收部分完成,退出這個程式,我們開始編寫發送部分。

編寫發送端

創建過程跟接收部分完全一樣,只是項目名稱為DirectSendDemo,Program.cs程式碼如下:

using RabbitMQ.Client;
using System.Text;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "mymessage",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

    Console.WriteLine("輸入需要傳輸的消息,輸入Exit退出");
    var message = Console.ReadLine();
    while (message != "Exit")
    {
        var body = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish(exchange: "",
                             routingKey: "mymessage",
                             basicProperties: null,
                             body: body);
        Console.WriteLine(" 發送消息 {0}", message);
        message = Console.ReadLine();
    }
}

Console.WriteLine("按回車退出");
Console.ReadLine();

運行這個項目,輸入一些消息,

還是回到管理頁面,在mymessage隊列頁面,執行GetMessage,可以獲取發送的消息。

測試發送端和接收端


現在我們可以讓發送和接收一起工作了,在兩個終端分別啟動發送和接收程式,看是否可以一起工作。

發送和接收可以一起工作了。

現在可以用這兩個程式做一些測試,首先看一下一個發送端,兩個接收端是什麼情況:

我們發現,接收端會輪流接收消息。
兩個發送端對一個接收端的情況如下:

跟想像的一樣,接收端會處理所有消息。

Fanout 模式

現在我們需要處理一個消息有多個消費者的情況,這種情況下,消息需要發送給交換機(exchange),然後將交換機與消息隊列綁定,一個交換機可以綁定多個消息隊列,這樣,不同的消息消費者都可以接收到消息。 我們創建一個新的發送方FanoutSender,將消息發送給exchange:

using RabbitMQ.Client;
using System.Text;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("example.exchange", ExchangeType.Fanout, true, false, null);

    Console.WriteLine("輸入需要傳輸的消息,輸入Exit退出");
    var message = Console.ReadLine();
    while (message != "Exit")
    {
        var body = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish(exchange: "example.exchange",
                             routingKey: "",
                             basicProperties: null,
                             body: body);
        Console.WriteLine(" 發送消息 {0}", message);
        message = Console.ReadLine();
    }
}

Console.WriteLine("按回車退出");
Console.ReadLine();

然後創建兩個接收方,FanoutReceiver1和FanoutReceiver2,分別接收que1和que2隊列的消息,這兩個隊列都綁定到相同的交換機,程式碼如下:
FanoutReceiver1:

using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare(exchange: "example.exchange",
type: "fanout", durable: true);

    channel.QueueDeclare(queue: "que1",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueBind(queue: "que1", exchange: "example.exchange",
routingKey: "");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "que1",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回車退出");
    Console.ReadLine();
}

FanoutReceiver2:

using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare(exchange: "example.exchange",
type: "fanout", durable: true);

    channel.QueueDeclare(queue: "que2",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueBind(queue: "que2", exchange: "example.exchange",
routingKey: "");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "que2",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回車退出");
    Console.ReadLine();
}

同時啟動這三個程式,運行結果如下:

發送的消息被同時接收。

使用這種方式,我們可以靈活擴展消息的消費者,比如用戶提醒功能,目前已經有了郵件提醒和簡訊提醒,對應的兩個隊列綁定到相同交換機,如果再增加微信提醒,只要再增加一個綁定隊列和相應的處理程式就可以了。

Direct模式和RouteKey


在Fanout模式下,我們將消息發送到訂閱消息的所有隊列中,如果我們希望選擇性地向隊列發送消息,可以使用Direct模式,根據不同的RouteKey向不同的隊列發送消息。

我們建立三個控制台程式程式模擬一個發送方和兩個接收方,項目的創建方法同上,程式碼如下:
發送:

using RabbitMQ.Client;
using System.Text;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("directdemo.exchange", ExchangeType.Direct, true, false, null);

    Console.WriteLine("輸入需要傳輸的消息,輸入Exit退出");
    var message = Console.ReadLine();
    while (message != "Exit")
    {
        Console.WriteLine("輸入RouteKey");
        var routekey = Console.ReadLine();
        var body = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish(exchange: "directdemo.exchange",
                             routingKey: routekey,
                             basicProperties: null,
                             body: body);
        Console.WriteLine(" 發送消息 {0} Routekey {1}", message,routekey);
        message = Console.ReadLine();
    }
}

Console.WriteLine("按回車退出");
Console.ReadLine();

接收1:

using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{

    channel.ExchangeDeclare(exchange: "directdemo.exchange",
type: ExchangeType.Direct, durable: true);

    channel.QueueDeclare(queue: "log_que",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueBind(queue: "log_que", exchange: "directdemo.exchange",
routingKey: "log");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "log_que",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回車退出");
    Console.ReadLine();
}

接收2:

using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{

    channel.ExchangeDeclare(exchange: "directdemo.exchange",
type: ExchangeType.Direct, durable: true);

    channel.QueueDeclare(queue: "email_que",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueBind(queue: "email_que", exchange: "directdemo.exchange",
routingKey: "email");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "email_que",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回車退出");
    Console.ReadLine();
}

上面的程式碼中,關鍵是隊列綁定:

   channel.QueueBind(queue: "email_que", exchange: "directdemo.exchange",
routingKey: "email");

這句話將queue、exchange和routingKey綁定在一起。運行效果如下:

Topic 模式

前面的Direct模式中,RouteKey是固定的,Topic模式引入了通配符,RouteKey可以是符合表達式的任何字元串。

  • 通配符「*」,代表一個字元
  • 通配符「#」,代表0或多個字元

仔細研究上面的規則,會發現Topic模式可以代替Direct和Fanout,如果RouteKey被設置為「#」,就是隊列可以接收任何消息,這與Fanout模式相同,如果RouteKey中沒有通配符,則和使用Direct模式的效果相同。

現在我們編寫Topic模式的發送和接收,程式碼如下:
Topic模式發送:

using RabbitMQ.Client;
using System.Text;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("topicdemo.exchange", ExchangeType.Topic, true, false, null);

    Console.WriteLine("輸入需要傳輸的消息,輸入Exit退出");
    var message = Console.ReadLine();
    while (message != "Exit")
    {
        Console.WriteLine("輸入RouteKey");
        var routekey = Console.ReadLine();
        var body = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish(exchange: "topicdemo.exchange",
                             routingKey: routekey,
                             basicProperties: null,
                             body: body);
        Console.WriteLine(" 發送消息 {0} Routekey {1}", message, routekey);
        message = Console.ReadLine();
    }
}

Console.WriteLine("按回車退出");
Console.ReadLine();

Topic模式接收:

using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{

    channel.ExchangeDeclare(exchange: "topicdemo.exchange",
type: ExchangeType.Topic, durable: true);

    channel.QueueDeclare(queue: "topic_que",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueBind(queue: "topic_que", exchange: "topicdemo.exchange",
routingKey: "#.log");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "topic_que",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回車退出");
    Console.ReadLine();
}

我們設置的RouteKey是”#.log”,也就是匹配這個表達式的RouteKey的消息會被接收到:

到這裡RabbitMQ常用的幾種模式都介紹了,最後說一點程式碼中的細節,在發送方和接收方程式碼中,有重複的queue或者exchange聲明,比如:

    channel.QueueDeclare(queue: "mymessage",
                                  durable: false,
                                  exclusive: false,
                                  autoDelete: false,
                                  arguments: null);

這些程式碼讓人感到有些困惑,似乎每次都需要聲明,而實際上是只要存在相關的queue或者exchange,這些程式碼就不再起作用。之所以在發送方和接收方都包含這些程式碼,是因為不知道是否存在相關的queue或exchange,也不知道誰先啟動,避免出錯。如果在RabbitMQ的Web管理頁面預先手工創建了相應的queue或者exchange,這些程式碼是可以去掉的。

本文程式碼可以從github下載://github.com/zhenl/ZL.RabbitMQ.Demo