[Apache Pulsar] 企業級分佈式消息系統-Pulsar入門基礎

  • 2019 年 10 月 3 日
  • 筆記

Apache Pulsar

Pulsar是一個支持多租戶的、高性能的服務與服務之間消息通訊的解決方案,最初由雅虎開發,現在由Apache軟件基金會管理。 

Pulsar在Yahoo的生產環境運行了三年多,助力Yahoo的主要應用,如Yahoo Mail、Yahoo Finance、Yahoo Sports、Flickr、Gemini廣告平台和Yahoo分佈式鍵值存儲系統Sherpa。

 Kafka不夠好,智聯招聘基於Pulsar打造企業級事件中心。

Pulsar的主要特性如下:

  • Pulsar實例原生支持多集群,能無縫的基於地理位置進行跨集群備份
  • 非常低的消息發佈和端到端的延遲
  • 無縫擴展到超過百萬個topic
  • 支持Java,Go,Pytho和C++的客戶端
  • Topic支持多種訂閱模式: 獨佔(exclusive), 共享(shared)和災備(failover)
  • 通過Apache BookKeeper提供的持久化消息存儲機制保證消息的送達
  • serverless的輕量級計算框架Pulsar Functions提供了原生的流數據處理
  • serverless的連接器框架Pulsar IO構建於 Pulsar Functions之上,能夠輕鬆的將數據從Pulsar中移入和移出
  • 當數據老化時,分層存儲將數據從熱存儲卸載到冷存儲(如S3和GCS)  

消息隊列的使用場景包括異步處理,應用解耦,流量削鋒和消息通訊四個場景.

1. 消息系統概念

Pulsar採用了發佈訂閱的設計模式,也稱作pub-sub。該設計模式中,producer發佈消息到topic,consumer可以訂閱這些topic,處理發佈過來的消息,在處理完成後發送確認。
一旦訂閱被創建,所有的消息都將被Pulsar保留,即使consumer斷開連接。 只有在consumer確認消息被成功處理後,保留下來的消息才會被丟棄。

1.1 Messages

消息是Pulsar的基礎單元。 消息就是producer發給topic的內容,以及consumer從topic消費的內容(消息處理完成後發送確認)。 消息類似於郵政系統中的信件。
消息包含了多個屬性:Value(數據),Key(打標籤,用來壓縮消息),Properties(可選,用戶自定義key/value),Producer name(生產者名稱,可默認生成,也可指定),Sequence ID(消息的序列id),Publish time(發佈時間,生產者自動加上),Event time(消息的可選時間戳)。

1.2 Producers

生產者是關聯topic的程序,它發佈消息到Pulsar的broker上。
發送模式:Producer可以以同步(sync)或者異步(async)的方式發佈消息到broker。

  • 同步發送:producer發送每條消息後會等待broker的確認,如果沒有收到確認信息,producer會認為發送失敗
  • 異步發送:Producer將會把消息放入blocking隊列,然後馬上返回。 然後客戶端在後台將消息發送給broker。如果隊列已滿( 配置的最大數量),根據傳入producer的參數,producer可能阻塞或者直接返回失敗。

壓縮:消息在發送過程中可以被壓縮來節省帶寬,pulsar支持LZ4,ZLIB,ZSTD,SNAPPY類型。
批處理:如果啟用了批處理,生產者將在單個請求中發送批量消息。批處理大小由最大消息數和最大發佈延遲決定。

1.3 Consumers

消費者是訂閱關聯topic,然後接收消息的程序。
接收模式:消息可以通過同步或者異步的方式從broker接收。

  • 同步接收:同步接收將會阻塞,直到消息可用
  • 異步接收:異步接收立即返回future值,例如java中的CompletableFuture,一旦新消息可用,它立即完成。

監聽:客戶端庫為consumers提供listener的實現,例如Java客戶端,提供MesssageListener接口,實現該接口,一旦接受到新的消息,received方法將被調用。

void received(Consumer<T> consumer,Message<T> msg);

 

確認:當一個consumer 成功消費掉一條消息後,那麼這個consumer會發送一個確認請求到broker,broker會丟棄這條消息,否則保存這條消息。
消息的確認可以一個接一個,也可以累積一起。 累積確認時,消費者只需要確認最後一條他收到的消息。 所有之前(包含此條)的消息,都不會被再次重發給那個消費者。

累積消息確認不能用於共享訂閱模式,因為共享模式中,一個訂閱會涉及到多個消費者。
共享模式中,多條消息可以單獨確認。

否定確認:當consumer 在一定時間內沒有成功消費消息,而想再次消費該條消息,那麼這個consumer可以發送一個否定確認到broker,然後broker重發這條消息。消息可以一條接一條的否定確認,也可以累積否定確認,這取決於消費訂閱模式。在獨佔和災備模式,消費者只能否定確認其接收的最後一條消息。在共享模式,消費者可以獨立否定確認。
確認超時:當一條消息沒有被成功消費,並且您想要觸發broker自動重發消息時,您可以採用未確認消息自動重發機制。客戶端將在整個AckTimeout時間範圍內跟蹤未確認的消息,並在指定確認超時時間自動向broker發送重發未確認的消息請求。

在確認超時之前使用否定確認。否定確認以更精確的方式控制單個消息的重發,並在消息處理時間超過確認超時時間後,避免無效的重發消息。

死信(Dead letter)topic:死信topic使您能夠在消費者無法成功消費某些消息時消費新消息。在這種機制中,無法消費的消息存儲在單獨的topic,稱為死信topic。您可以決定如何處理死信topic中的消息。
在Java客戶端中,可以使用以下例子處理死信topic:

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)            .topic(topic)            .subscriptionName("my-subscription")            .subscriptionType(SubscriptionType.Shared)            .deadLetterPolicy(DeadLetterPolicy.builder()                  .maxRedeliverCount(maxRedeliveryCount)                  .build())            .subscribe();

 

死信topic依賴於消息的重發。您需要確認消息的重發方法:否定確認或確認超時。在確認超時之前使用否定確認。

目前,死信topic僅適用於共享模式。

1.4 Topics

和其他的發佈訂閱系統一樣,Pulsar 中的 topic 是被命名的通道,用做從producer到 consumer傳輸消息。 Topic的名稱是具有明確定義結構的URL:

{persistent|non-persistent}://tenant/namespace/topic

 

persistent/non-persistent:topic的類型,包括持久化和非持久化(默認是持久類型)。topic指定持久化後,所有的消息會持久化到硬盤(這意味着多塊硬盤,除非是單機模式的broker)。反之,非持久topic的數據不會存儲到硬盤上。
tenant:topic在實例中的租戶,租戶對於Pulsar的多租戶來說是必不可少的,可以分佈在多個集群中。
namespace:Topic的管理單元,充當關聯topic組的管理機制。 大多數的topic配置在namespace層面生效。 每個tenant可以有多個namespace。
topic:topic名稱是自由定義的,在pulsar實例中無特殊意義。

1.4.1 namespace

命名空間是租戶內部邏輯上的命名術語。 一個租戶可以通過admin API創建多個命名空間。 例如,一個對接多個應用的租戶,可以為每個應用創建不同的namespace。 Namespace使得程序可以以層級的方式創建和管理topic。 例如:”my-tenant/app1″ ,它的namespace是app1這個應用,對應的租戶是 my-tenant。 你可以在namespace下創建任意數量的topic。

1.4.2 訂閱模型

訂閱是命名好的配置規則,用於確定如何將消息發給消費者。Pulsar有三種訂閱模式:exclusive(獨佔),shared(共享),failover(災備)。 下圖展示了這三種模式:

1.4.2.1 Exclusive

獨佔模式,只能有一個消費者訂閱topic。 如果多於一個消費者嘗試以同樣方式去訂閱topic,消費者將會收到錯誤。
上面的圖中,只有Consumer A可以消費。

Exclusive模式為默認訂閱模式。

1.4.2.2 Failover

災備模式,多個consumer可以綁定到同一個訂閱。Consumer將會按字典順序排序,第一個consumer被初始化為唯一接受消息的消費者,這個consumer被稱為master consumer。
當master consumer斷開時,所有的消息(未被確認和後續進入的)將會被分發給隊列中的下一個consumer。 下圖中,Consumer B-0是master consumer,當Consumer B-0斷開連接時,由於Consumer B-1在隊列中下一個位置,那麼它將會開始接收消息。

1.4.2.3 Shared

共享模式,多個消費者可以綁定到同一個訂閱上。 消息通過round robin輪詢機制分發給不同的消費者,並且每個消息僅會被分發給一個消費者。當消費者斷開連接,所有被發送給他,但沒有被確認的消息將被重新安排,分發給其它存活的消費者。
下圖中,topic下有5條消息,m0~m4,消費者有C1/C2/C3,最終m0和m3分配給C1,m1分給C2,m2和m4分給C3,可以說明每個消息僅發給一個消費者。

Shared模式的限制: 有兩點需注意,1、不保證消息順序; 2、不能使用累計確認

Key_shared:
在Key-shared模式下,多個消費者可以關聯到同一訂閱。消息以分佈式在消費者之間傳遞,具有相同key/orderingKey 的消息僅傳遞給一個消費者。無論消息被重發多少次,它都發給同一個消費者。當消費者連接或斷開連接時,將導致某些消息的key的消費者變更。

該模式限制:消息必須指定key/orderingKey;不能使用累計確認;該模式目前是測試版,可以在broker.config禁用。

1.5 多topic訂閱

當consumer訂閱pulsar的topic時,它默認指定訂閱了一個topic,例如:persistent://public/default/my-topic。 從Pulsar的1.23.0-incubating的版本開始,Pulsar消費者可以同時訂閱多個topic。 你可以用以下兩種方式定義topic的列表:

  • 通過最基礎的正則表達式(regex),例如 persistent://public/default/finance-.*
  • 通過明確指定的topic列表

通過正則訂閱多主題時,所有的主題必須在同一個namespace。

當訂閱多主題時,Pulsar客戶端會自動調用Pulsar的API來發現匹配表達式或者列表的所有topic,然後全部訂閱。 如果此時有暫不存在的topic,那麼一旦這些topic被創建,conusmer會自動訂閱。

不能保證順序性 當消費者訂閱多topic時,Pulsar所提供對單一topic訂閱的順序保證,就hold不住了。 如果你在使用Pulsar的時候,遇到必須保證順序的需求,強烈建議不要使用此特性。

下面是多主題訂閱在java中的例子:

import java.util.regex.Pattern;    import org.apache.pulsar.client.api.Consumer;  import org.apache.pulsar.client.api.PulsarClient;    PulsarClient pulsarClient = // 實例化pulsar客戶端    // 訂閱一個namespace下的所有topic  Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");  Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()                  .topicsPattern(allTopicsInNamespace)                  .subscriptionName("subscription-1")                  .subscribe();    // 根據正則訂閱一個namespace下的多個topic  Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");  Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer()                  .topicsPattern(someTopicsInNamespace)                  .subscriptionName("subscription-1")                  .subscribe();

 

1.6 Partitioned topics(分區topic)

通常一個topic僅被一個broker服務,這限制了topic的最大吞吐量。 分區topic是特殊的topic類型,他可以被多個broker處理,這讓topic有更高的吞吐量。
其實在背後,分區的topic通過N個內部topic實現,N是分區的數量。 當向分區的topic發送消息,每條消息被路由到其中一個broker。 Pulsar自動處理跨broker的分區分佈。
下圖對此做了闡明:
分析上圖可知,Topic1有5個分區(P0到P4),分佈在3個broker上。因為分區數量多於broker數量,其中有兩個broker每個處理兩個分區,第三個broker則只處理一個。(再次強調,分區的分佈是Pulsar自動處理的)。
這個topic的消息被廣播給兩個consumer,路由模式決定哪個broker處理哪個partition,訂閱模式決定哪條消息發送到哪個consumer。
大多數境況下,路由和訂閱模式可以分開制定。通常來講,吞吐能力的要求,決定了分區/路 的方式。訂閱模式則應該由應用來做決定。
分區topic和普通topic,對於訂閱模式如何工作,沒有任何不同。分區只是決定了從生產者生產消息到消費者處理及確認消息過程中發生的事情。
分區topic需要通過admin API顯式創建,創建topic時可以指定分區數。

1.6.1 路由模式

發佈到分區主題時,必須指定路由模式。路由模式決定每個消息應該發佈到哪個分區,即哪個內部主題。三種路由模式如下:

  • RoundRobinPartition:如果沒有key,所有的消息通過輪詢方式被路由到不同的分區,以達到最大吞吐量。請注意round-robin並不是作用於每條單獨的消息,而是作用於延遲處理的批次邊界,以確保批處理有效。 如果為message指定了key,分區的producer會把key做hash,然後分配消息到指定的分區。 這是默認的模式。
  • SinglePartition:如果沒有key被提供,producer將會隨機選擇一個分區,把所有的消息發往該分區。 如果為message指定了key,分區的producer會把key做hash,然後分配消息到指定的分區。
  • CustomPartition:使用客制化消息路由實現,可以決定特定的消息進入指定的分區。 用戶可以創建客制化的路由模式,通過使用 Java client ,實現MessageRouter接口。

1.7 順序保證

消息的順序與消息路由模式和消息的key有關。通常,用戶需要對每個key分區的消息保證順序。
當使用 SinglePartition或者RoundRobinPartition模式時,如果消息有key,消息將會被路由到匹配的分區,這是基於ProducerBuilder 中HashingScheme 指定的散列shema。
順序保證有兩種方式:

  • 按key分區:所有擁有相同key的消息有序, 並且會被發送至相同的partition。使用SinglePartition或RoundRobinPartition模式, 每條消息都需要有key。
  • 按producer:來自於相同producer的消息有序,路由策略為SinglePartition, 且每條消息都沒有key。
1.7.1 HashingScheme

HashingScheme 是代表一組標準散列函數的枚舉,為一個指定消息選擇分區時使用。 有兩種可用的散列函數:JavaStringHash 和Murmur332Hash,producer 的默認hash函數是JavaStringHash。請注意,當producer可能來自於不同語言客戶端時,JavaStringHash是不起作用的。建議使用Murmur332Hash。

1.8 非持久topic

默認情況下,Pulsar保存所有沒有確認的消息到多個BookKeeper的bookies中(存儲節點)。持久topic的消息數據可以在broker重啟或者訂閱者出問題的情況下存活下來。 因此,持久性topic上的消息數據可以在 broker 重啟和訂閱者故障轉移之後繼續存在。
但是,Pulsar還支持非持久性topic,這些topic的消息從不持久化存儲到磁盤,只存在於內存中。 Pulsar也提供了非持久topic。非持久topic的消息不會被保存在硬盤上,只存活於內存中。當使用非持久topic分發時,關掉Pulsar的broker或者關閉訂閱者,此topic( non-persistent))上所有的瞬時消息都會丟失,意味着客戶端可能會遇到消息缺失。
非持久性topic具有這種形式的名稱(注意名稱中的 non-persistent):

non-persistent://tenant/namespace/topic

 

非持久topic中,broker會立即發佈消息給所有連接的訂閱者,而不會在BookKeeper中存儲。 如果有一個訂閱者斷開連接,broker將無法重發這些瞬時消息,訂閱者將永遠也不能收到這些消息了。 去掉持久化存儲的步驟,在某些情況下,使得非持久topic的消息比持久topic稍微變快。但是同時,Pulsar的一些核心優勢也喪失掉了。

非持久topic,消息數據僅存活在內存。 如果broker掛掉或者因其他情況不能從內存取到,你的消息數據就可能丟失。 只有在真的確信你的使用場景符合,並且你可以忍受時,才可去使用非持久topic。

默認非持久topic在broker上是開啟的。 你可以通過broker的配置關閉。 你可以通過使用pulsar-admin-topics接口管理非持久topic。

1.8.1 性能

非持久消息通常比持久消息更快,因為broker無須持久化消息,當消息被分發給所有訂閱者時,會立即發送ack給producer。 非持久topic讓producer有更低的發佈延遲。

1.8.2 客戶端API

Producer和consumer連接持久topic和連接到非持久topic的方式是一樣的。非持久的區別在於,topic的名稱必須以non-persistent開頭。 三種訂閱模式–exclusive,shared,failover對於非持久topic都是支持的。
下面是一個非持久topic的java consumer例子:

PulsarClient client = PulsarClient.builder()      .serviceUrl("pulsar://localhost:6650")      .build();  String npTopic = "non-persistent://public/default/my-topic"; //這裡表明是非持久化  String subscriptionName = "my-subscription-name";    Consumer<byte[]> consumer = client.newConsumer()      .topic(npTopic)      .subscriptionName(subscriptionName)      .subscribe();

 

這裡還有一個非持久topic的java producer例子:

Producer<byte[]> producer = client.newProducer()              .topic(npTopic)              .create();

 

1.9 消息保留和到期(retention and expiry)

Pulsar broker默認如下:

  • 立即刪除所有已經被cunsumer確認過的的消息
  • 以消息backlog的形式,持久保存所有的未被確認消息

Pulsar有兩個特性,讓你可以覆蓋上面的默認行為:

  • 消息存留讓你可以保存consumer確認過的消息
  • 消息過期讓你可以給未被確認的消息設置存活時長(TTL) 所有消息保留和到期都在namespace級別進行管理。有關操作方法,請參閱Message retention and expiry cookbook。
    下圖說明了這兩種概念: 圖中第一個是消息存留,存留規則會被用於某namespace下所有的topic,指明哪些消息會被持久存儲,即使已經被確認過。 沒有被留存規則覆蓋的消息將會被刪除。 沒有留存規則的話,所有被確認的消息都會被刪除。
    圖中第二個是消息過期,有些消息即使還沒有被確認,也被刪除掉了。因為根據設置在namespace上的TTL,他們已經過期了。(例如,TTL為5分鐘,過了十分鐘消息還沒被確認)

1.10 重複數據消除(Message deduplication)

當消息被Pulsar持久化多於一次的時候,消息就會重複。 消息去重是Pulsar可選的特性,阻止不必要的消息重複,每條消息僅處理一次,即使消息被接收多次。
下圖說明了禁用和啟用重複數據消除的情況:
上圖第一個場景中,消息去重被關閉。 Producer發佈消息1到一個topic,消息到達broker後,被持久化到BookKeeper。 然後producer又發送了消息1(可能因為某些重試邏輯),然後消息被接收後又持久化在BookKeeper,這意味着消息重複發生了。
在第二個場景中,producer發送了消息1,消息被broker接收然後持久化,和第一個場景是一樣的。 當producer再次發送消息時,broker知道已經收到個消息1,所以不會再持久化消息1。

消息重複數據消除是在namespace級別處理的。

1.10.1 生產者冪等

消息去重的另外一種方法是確保每條消息僅生產一次。 這種方法通常被叫做生產者冪等。 這種方式的缺點是,把消息去重的工作推給了應用去做。 在Pulsar中,去重被broker處理的,這意味着你不需要修改你的客戶端代碼。 你只需要做一些管理上的變化(參考Managing message deduplication )。

1.10.2 去重和實際一次語義

消息去重,使Pulsar成為與流處理引擎(SPE)或者其他尋求實際一次處理語義的系統連接的完美消息系統。 消息系統若不提供自動消息去重,則需要SPE或者其他系統保證去重。這意味着嚴格的消息順序來自於讓程序承擔額外的去重工作。 使用Pulsar,嚴格的順序保證不會帶來任何應用層面的代價。

結語

由於篇幅有限,本篇文章只講述Pulsar消息系統的基本概念,下篇文章重點講解客戶端庫使用教程。