最通俗易懂的Redis發佈訂閱及代碼實戰

發佈訂閱簡介

除了使用List實現簡單的消息隊列功能以外,Redis還提供了發佈訂閱的消息機制。在這種機制下,消息發佈者向指定頻道(channel)發佈消息,消息訂閱者可以收到指定頻道的消息,同一個頻道可以有多個消息訂閱者,如下圖:

在這裡插入圖片描述

Redis也提供了一些命令支持這個機制,接下來我們詳細介紹一下這些命令。

歡迎關注微信公眾號:萬貓學社,每周一分享Java技術乾貨。

發佈訂閱相關命令

在Redis中,發佈訂閱相關命令有:

  1. 發佈消息
  2. 訂閱頻道
  3. 取消訂閱
  4. 按照模式訂閱
  5. 按照模式取消訂閱
  6. 查詢訂閱信息

發佈消息

發佈消息的命令是publish,語法是:

publish 頻道名稱 消息

比如,要向channel:one-more-study:demo頻道發佈一條消息「I am One More Study.」,命令如下:

> publish channel:one-more-study:demo "I am One More Study."
(integer) 0

返回的結果是訂閱者的個數,上例中沒有訂閱者,所以返回結果為0。

訂閱消息

訂閱消息的命令是subscribe,訂閱者可以訂閱一個或者多個頻道,語法是:

subscribe 頻道名稱 [頻道名稱 ...]

比如,訂閱一個channel:one-more-study:demo頻道,命令如下:

> subscribe channel:one-more-study:demo
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:one-more-study:demo"
3) (integer) 1

返回結果中有3條,分別表示:返回值的類型(訂閱成功)、訂閱的頻道名稱、目前已訂閱的頻道數量。當訂閱者接受到消息時,就會顯示:

1) "message"
2) "channel:one-more-study:demo"
3) "I am One More Study."

同樣也是3條結果,分別表示:返回值的類型(信息)、消息來源的頻道名稱、消息內容。

新開啟的訂閱者,是無法收到該頻道之前的歷史消息的,因為Redis沒有對發佈的消息做持久化。

取消訂閱

取消訂閱的命令是unsubscribe,可以取消一個或者多個頻道的訂閱,語法是:

unsubscribe [頻道名稱 [頻道名稱 ...]]

比如,取消訂閱channel:one-more-study:demo頻道,命令如下:

> unsubscribe channel:one-more-study:demo
1) "unsubscribe"
2) "channel:one-more-study:demo"
3) (integer) 0

返回結果中有3條,分別表示:返回值的類型(取消訂閱成功)、取消訂閱的頻道名稱、目前已訂閱的頻道數量。

歡迎關注微信公眾號:萬貓學社,每周一分享Java技術乾貨。

按模式訂閱消息

按模式訂閱消息的命令是psubscribe,訂閱一個或多個符合給定模式的頻道,語法是:

psubscribe 模式 [模式 ...]

每個模式以 * 作為匹配符,比如 channel* 匹配所有以 channel 開頭的頻道,命令如下:

> psubscribe channel:*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "channel*"
3) (integer) 1

返回結果中有3條,分別表示:返回值的類型(按模式訂閱成功)、訂閱的模式、目前已訂閱的模式數量。當訂閱者接受到消息時,就會顯示:

1) "pmessage"
2) "channel*"
3) "channel:one-more-study:demo"
4) "I am One More Study."

返回結果中有4條,分別表示:返回值的類型(信息)、消息匹配的模式、消息來源的頻道名稱、消息內容。

按模式取消訂閱

按模式取消訂閱的命令是punsubscribe,可以取消一個或者多個模式的訂閱,語法是:

punsubscribe [模式 [模式 ...]]

每個模式以 * 作為匹配符,比如 channel:* 匹配所有以 channel 開頭的頻道,命令如下:

1> punsubscribe channel:*
1) "punsubscribe"
2) "channel:*"
3) (integer) 0

返回結果中有3條,分別表示:返回值的類型(按模式取消訂閱成功)、取消訂閱的模式、目前已訂閱的模式數量。

歡迎關注微信公眾號:萬貓學社,每周一分享Java技術乾貨。

查詢訂閱信息

查看活躍頻道

活躍頻道指的是至少有一個訂閱者的頻道,語法是:

pubsub channels [模式]

比如:

> pubsub channels
1) "channel:one-more-study:test"
2) "channel:one-more-study:demo"
3) "channel:demo"
> pubsub channels *demo
1) "channel:one-more-study:demo"
2) "channel:demo"
> pubsub channels *one-more-study*
1) "channel:one-more-study:test"
2) "channel:one-more-study:demo"
查看頻道訂閱數
pubsub numsub [頻道名稱 ...]

比如:

> pubsub numsub channel:one-more-study:demo
1) "channel:one-more-study:demo"
2) (integer) 1
查看模式訂閱數
> pubsub numpat
(integer) 1

代碼實戰

光說不練假把式,我們使用Java語言寫一個簡單的發佈訂閱示例。

Jedis集群示例

Jedis是Redis官方推薦的Java連接開發工具,我們使用Jedis寫一個簡單的集群示例。

package onemore.study;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;

import java.util.HashSet;
import java.util.Set;

/**
 * Jedis集群
 *
 * @author 萬貓學社
 */
public enum Cluster {
    INSTANCE;

    //為了簡單,把IP和端口直接寫在這裡,實際開發中寫在配置文件會更好。
    private final String hostAndPorts = "192.168.0.60:6379;192.168.0.61:6379;192.168.0.62:6379";
    private JedisCluster jedisCluster;

    Cluster() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();

        //最大連接數
        poolConfig.setMaxTotal(20);
        //最大空閑數
        poolConfig.setMaxIdle(10);
        //最小空閑數
        poolConfig.setMinIdle(2);

        //從jedis連接池獲取連接時,校驗並返回可用的連接
        poolConfig.setTestOnBorrow(true);
        //把連接放回jedis連接池時,校驗並返回可用的連接
        poolConfig.setTestOnReturn(true);

        Set<HostAndPort> nodes = new HashSet<>();
        String[] hosts = hostAndPorts.split(";");
        for (String hostport : hosts) {
            String[] ipport = hostport.split(":");
            String ip = ipport[0];
            int port = Integer.parseInt(ipport[1]);
            nodes.add(new HostAndPort(ip, port));
        }
        jedisCluster = new JedisCluster(nodes, 1000, poolConfig);
    }

    public JedisCluster getJedisCluster() {
        return jedisCluster;
    }
}

歡迎關注微信公眾號:萬貓學社,每周一分享Java技術乾貨。

發佈者示例

package onemore.study;

import redis.clients.jedis.JedisCluster;

/**
 * 發佈者
 *
 * @author 萬貓學社
 */
public class Publisher implements Runnable {
    private final String CHANNEL_NAME = "channel:one-more-study:demo";
    private final String QUIT_COMMAND = "quit";

    @Override
    public void run() {
        JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
        for (int i = 1; i <= 3; i++) {
            String message = "第" + i + "消息";
            System.out.println(Thread.currentThread().getName() + " 發佈:" + message);
            jedisCluster.publish(CHANNEL_NAME, message);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("------------------");
        }
        jedisCluster.publish(CHANNEL_NAME, QUIT_COMMAND);
    }
}

訂閱者示例

package onemore.study;

import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPubSub;

/**
 * 訂閱者
 *
 * @author 萬貓學社
 */
public class Subscriber implements Runnable {
    private final String CHANNEL_NAME = "channel:one-more-study:demo";
    private final String QUIT_COMMAND = "quit";

    private final JedisPubSub jedisPubSub = new JedisPubSub() {
        @Override
        public void onMessage(String channel, String message) {
            System.out.println(Thread.currentThread().getName() + " 接收:" + message);
            if (QUIT_COMMAND.equals(message)) {
                unsubscribe(CHANNEL_NAME);
            }
        }
    };

    @Override
    public void run() {
        JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
        jedisCluster.subscribe(jedisPubSub, CHANNEL_NAME);
    }
}

歡迎關注微信公眾號:萬貓學社,每周一分享Java技術乾貨。

綜合示例

package onemore.study;

public class App {
    public static void main(String[] args) throws InterruptedException {
        //創建3個訂閱者
        new Thread(new Subscriber()).start();
        new Thread(new Subscriber()).start();
        new Thread(new Subscriber()).start();
        Thread.sleep(1000);

        //創建發佈者
        new Thread(new Publisher()).start();
    }
}

運行結果如下:

Thread-6 發佈:第1消息
Thread-0 接收:第1消息
Thread-1 接收:第1消息
Thread-2 接收:第1消息
------------------
Thread-6 發佈:第2消息
Thread-0 接收:第2消息
Thread-1 接收:第2消息
Thread-2 接收:第2消息
------------------
Thread-6 發佈:第3消息
Thread-0 接收:第3消息
Thread-2 接收:第3消息
Thread-1 接收:第3消息
------------------
Thread-0 接收:quit
Thread-1 接收:quit
Thread-2 接收:quit

微信公眾號:萬貓學社

微信掃描二維碼

獲得更多Java技術乾貨