消息隊列,推拉模式的區別在哪?

大家好,我是【架構擺渡人】,一隻十年的程式猿。這是消息隊列的第五篇文章,這個系列會給大家分享很多在實際工作中有用的經驗,如果有收穫,還請分享給更多的朋友。

在學習消息隊列的時候,大家都有一個共同的問題,那就是消息到底是服務端推送給客戶端還是客戶端主動去服務端拉取然後進行消費。今天這篇文章就來解答大家的這個的疑問。

推模式

首先我們來解決下什麼是推模式,顧名思義,推模式就是我推給你。在MQ中也就是Broker收到消息後主動推送給Consumer的操作,叫做推模式。

推模式的實現是客戶端會與服務端(Broker)建立長連接,當有消息時服務端會通過長連接通道將消息推送給客戶端,這樣客戶端就能實時消費到最新的消息。

優點:

  • 實時性強,有消息立馬推送給客戶端。

  • 客戶端實現簡單,只需要監聽服務端的推送即可。
    缺點:

  • 容易導致客戶端發生消息堆積的情況,因為每個客戶端的消費能力是不同的,如果簡單粗暴的有消息就推送,就會會出現堆積情況。

  • 服務端邏輯複雜,因為簡單的推送會導致客戶端出現堆積問題,所以服務端需要進行優化。記錄給每個客戶端的推送數據,然後根據每個客戶端的消費能力去平衡數據推送的速度。

拉模式

拉模式,顧名思義,就是我主動去拉取消息。在MQ中也就是Consumer主動向Broker詢問:有沒有消息啊,有的話給我一部分唄,我先拉1000條進行處理,處理完成之後再拉1000條。

拉模式肯定不能用傳統的定時拉取,定時長及時性無法保證,定時短,在沒有消息的情況下對服務端會一直請求。所以很多拉模式都是基於長輪詢來實現。

長輪詢就是客戶端向服務端發起請求,如果此時有數據就直接返回,如果沒有數據就保持連接,等到有數據時就直接返回。如果一直沒有數據,超時後客戶端再次發起請求,保持連接,這就是長輪詢的實現原理。很多的開源框架都是用的這種方式,比如配置中心Apollo的推送。

優點:

  • 不會造成客戶端消息積壓,消費完了再去拉取,主動權在自己手中。

  • 長輪詢實現的拉模式實時性也能夠保證。
    缺點:

  • 客戶端的邏輯實現相對複雜點,簡化了服務端的邏輯。
    推和拉都有各自的優勢和劣勢,不過目前主流的消息隊列大部分都用的拉模式,比如RocketMQ,Kafka。

拉模式程式碼實現

Java中可以使用Spring DeferredResult來實現非同步請求,如果有消息就直接返回,沒有消息則將此請求存儲起來,等到有消息是再通知該請求進行返回。如果一直沒有消息那麼就等到超時,客戶端收到超時消息重新進行消息的查詢。

首先我們定義一個消息查詢的介面,定義如下:

@GetMapping("/queryMessage")
public DeferredResult<String> queryMessage(String client) {
    DeferredResult<String> deferredResult = new DeferredResult<>(10000L);
    String msg = messageQueue.poll();
    if (Objects.nonNull(msg)) {
        deferredResult.setResult(msg);
    } else {
        deferredResult.onTimeout(() -> {
            deferredResultMap.remove(client);
        });
        deferredResultMap.put(client, deferredResult);
    }
    return deferredResult;
}

指定DeferredResult的超時時間為10秒,然後從messageQueue中獲取消息,此處的邏輯就是獲取沒有被消息的消息,這裡只是模擬。

如果有消息直接設置DeferredResult的result,立馬返回。如果當前沒有消息則註冊一個超時的回調,進行DeferredResult的移除動作。同時將DeferredResult對象快取起來。

然後我們在寫一個添加消息的介面,定義如下:

@GetMapping("/addMessage")
public String addMessage(String client) {
    messageQueue.add("test");
    DeferredResult deferredResult = deferredResultMap.get(client);
    if (Objects.nonNull(deferredResult)) {
        deferredResult.setResult("test");
    }
    return "success";
}

當有消息添加的時候,根據對應的client獲取快取的DeferredResult,如果有的話就直接設置結果,立馬返回,這樣客戶端就能立馬收到新的消息,實時性也有保證。

接下來模擬一個客戶端去查詢消息,定義如下:

ublic class MqClient {
    public static void main(String[] args) {
        queryMessage();
    }
    private static void queryMessage() {
        String result = request("//localhost:8080/queryMessage?client=xxx");
        if (result != null) {
            // 本地進行消費
            // ......
        }
        // 繼續拉取消息
        queryMessage();
    }
    private static String request(String url) {
        HttpURLConnection connection = null;
        BufferedReader reader = null;
        try {
            URL getUrl = new URL(url);
            connection = (HttpURLConnection) getUrl.openConnection();
            connection.setReadTimeout(20000);
            connection.setConnectTimeout(3000);
            connection.setRequestMethod("GET");
            connection.setRequestProperty("Accept-Charset", "utf-8");
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setRequestProperty("Charset", "UTF-8");
            System.out.println(connection.getResponseCode());
            if (200 == connection.getResponseCode()) {
                reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
                StringBuilder result = new StringBuilder();
                String line = null;
                while ((line = reader.readLine()) != null) {
                    result.append(line);
                }
                System.out.println("結果 " + result);
                return result.toString();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
        return null;
    }
}

這裡需要注意的是,客戶端的請求超時時間要大於服務端定義的超時時間,主流程就是有消息進行本地消費,然後繼續拉取。

如果沒有消息,請求會一直等待,知道服務端超時,此時客戶端這邊會拿到http response code的值為503,然後繼續查詢消息。

所以大家可以看到,拉模式主要是客戶端來主導,至於拉取速度客戶端都可以進行控制,如果消息量夠大的話,每次拉取都能拿到沒有被消費的數據,基本上不會產生等等超時的情況。即使某些時候沒有拉取到新的消息,只要有新消息,服務端也會立馬獲取等待的DeferredResult進行結果的設置,立馬響應結果。

總結

本文給大家介紹了推拉模式的概念以及各自的優劣勢,同時也介紹了拉模式的實現原理,當然本文所示的程式碼並不程式碼開源框架裡面就是用的這種方式,只是告訴大家長輪詢的基本實現方式。

如果大家有這樣推送的場景,如果想用最簡單的方式實現,長輪詢是一個不錯的方式。在很多開源框架中都有類似的應用。

原創:架構擺渡人(公眾號ID:jiagoubaiduren),歡迎分享,轉載請保留出處。

開發自測mock框架://github.com/yinjihuan/fox-mock