Pulsar部署和實踐(一)

前言

本地Docker部署Pulsar消息代理實現消息發佈和消息訂閱

 

介紹

Apache Pulsar 介紹

 

相關概念,後面有時間再花時間整理下。

 

實踐步驟

1.使用dokcer本地部署pulsar

docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  apachepulsar/pulsar:2.7.1 \
  bin/pulsar standalone

  

2.docker ps -a 查看pulsar運行是否正常,可以看到下圖已經部署成功

pulsar連接地址://localhost:8080

         pulsar://localhost:6650

3.使用C#客戶端Publish Message到pulsar broker中

(1)為了演示,我這裡創建了一個C#控制台項目

 

(2)我們使用官網推薦的C# pulsar客戶端包,添加安裝DotPulsar nuget包

(3)創建client

  //1。創建pulsar客戶端
           var client = PulsarClient.Builder()
                        .ServiceUrl(new Uri("pulsar://localhost:6650"))
                        .RetryInterval(new TimeSpan(3))
                        .Build();

 

(4)創建生產者,發送消息

            //2、創建Pulsar Producer(生產者)
            var producer = client.NewProducer()
                     .Topic("persistent://public/default/mytopic")
                     .Create();
            var data = Encoding.UTF8.GetBytes("Hello Pulsar");
            await producer.Send(data);

上圖可見顯示創建producer成功。

(5)下面再創建一個客戶端來消費發送者發送的消息(「Hello Pulsar」)。

            //2、創建Pulsar Producer(消費者)
            var consumer = client.NewConsumer()
                     .SubscriptionName("MySubscription")
                     .Topic("persistent://public/default/mytopic")
                     .Create();
​
            //3.消費消息
            await foreach (var message in consumer.Messages())
            {
                Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
            }

見上圖,發佈者發送消息成功被訂閱者消費。

4.代碼示例

//PublisherClient
  static async Task Main(string[] args)
        {
            Console.WriteLine("Hello Pulsar");
​
            //1。創建pulsar客戶端
            var client = PulsarClient.Builder()
                        .ServiceUrl(new Uri("pulsar://localhost:6650"))
                        .RetryInterval(new TimeSpan(3))
                        .Build();
​
            //2、創建Pulsar Producer(生產者)
            var producer = client.NewProducer()
                     .Topic("persistent://public/default/mytopic")
                     .Create();
​
            for (int i = 0; i < 5; i++)
            {
                var data = Encoding.UTF8.GetBytes($"Hello Pulsar {i}");
                await producer.Send(data);
                Console.WriteLine($"發送消息成功");
            }
            
            Console.ReadKey();
        }
        
        
  //SubscriberClient
  static async Task Main(string[] args)
        {
            //1。創建pulsar客戶端
            var client = PulsarClient.Builder()
                        .ServiceUrl(new Uri("pulsar://localhost:6650"))
                        .RetryInterval(new TimeSpan(3))
                        .Build();
​
            //2、創建Pulsar Producer(消費者)
            var consumer = client.NewConsumer()
                     .SubscriptionName("MySubscription")
                     .Topic("persistent://public/default/mytopic")
                     .Create();
​
            //3.消費消息
            await foreach (var message in consumer.Messages())
            {
                Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
            }
​
​
            Console.ReadKey();
        }
Tags: