最通俗易懂的Redis發布訂閱及程式碼實戰
發布訂閱簡介
除了使用List實現簡單的消息隊列功能以外,Redis還提供了發布訂閱的消息機制。在這種機制下,消息發布者向指定頻道(channel)發布消息,消息訂閱者可以收到指定頻道的消息,同一個頻道可以有多個消息訂閱者,如下圖:
Redis也提供了一些命令支援這個機制,接下來我們詳細介紹一下這些命令。
歡迎關注微信公眾號:萬貓學社,每周一分享Java技術乾貨。
發布訂閱相關命令
在Redis中,發布訂閱相關命令有:
- 發布消息
- 訂閱頻道
- 取消訂閱
- 按照模式訂閱
- 按照模式取消訂閱
- 查詢訂閱資訊
發布消息
發布消息的命令是publish
,語法是:
publish 頻道名稱 消息
比如,要向channel:one-more-study:demo頻道發布一條消息「I am One More Study.」,命令如下:
> publish channel:one-more-study:demo "I am One More Study."
(integer) 0
返回的結果是訂閱者的個數,上例中沒有訂閱者,所以返回結果為0。
訂閱消息
訂閱消息的命令是subscribe
,訂閱者可以訂閱一個或者多個頻道,語法是:
subscribe 頻道名稱 [頻道名稱 ...]
比如,訂閱一個channel:one-more-study:demo頻道,命令如下:
> subscribe channel:one-more-study:demo
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:one-more-study:demo"
3) (integer) 1
返回結果中有3條,分別表示:返回值的類型(訂閱成功)、訂閱的頻道名稱、目前已訂閱的頻道數量。當訂閱者接受到消息時,就會顯示:
1) "message"
2) "channel:one-more-study:demo"
3) "I am One More Study."
同樣也是3條結果,分別表示:返回值的類型(資訊)、消息來源的頻道名稱、消息內容。
新開啟的訂閱者,是無法收到該頻道之前的歷史消息的,因為Redis沒有對發布的消息做持久化。
取消訂閱
取消訂閱的命令是unsubscribe
,可以取消一個或者多個頻道的訂閱,語法是:
unsubscribe [頻道名稱 [頻道名稱 ...]]
比如,取消訂閱channel:one-more-study:demo頻道,命令如下:
> unsubscribe channel:one-more-study:demo
1) "unsubscribe"
2) "channel:one-more-study:demo"
3) (integer) 0
返回結果中有3條,分別表示:返回值的類型(取消訂閱成功)、取消訂閱的頻道名稱、目前已訂閱的頻道數量。
歡迎關注微信公眾號:萬貓學社,每周一分享Java技術乾貨。
按模式訂閱消息
按模式訂閱消息的命令是psubscribe
,訂閱一個或多個符合給定模式的頻道,語法是:
psubscribe 模式 [模式 ...]
每個模式以 * 作為匹配符,比如 channel* 匹配所有以 channel 開頭的頻道,命令如下:
> psubscribe channel:*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "channel*"
3) (integer) 1
返回結果中有3條,分別表示:返回值的類型(按模式訂閱成功)、訂閱的模式、目前已訂閱的模式數量。當訂閱者接受到消息時,就會顯示:
1) "pmessage"
2) "channel*"
3) "channel:one-more-study:demo"
4) "I am One More Study."
返回結果中有4條,分別表示:返回值的類型(資訊)、消息匹配的模式、消息來源的頻道名稱、消息內容。
按模式取消訂閱
按模式取消訂閱的命令是punsubscribe
,可以取消一個或者多個模式的訂閱,語法是:
punsubscribe [模式 [模式 ...]]
每個模式以 * 作為匹配符,比如 channel:* 匹配所有以 channel 開頭的頻道,命令如下:
1> punsubscribe channel:*
1) "punsubscribe"
2) "channel:*"
3) (integer) 0
返回結果中有3條,分別表示:返回值的類型(按模式取消訂閱成功)、取消訂閱的模式、目前已訂閱的模式數量。
歡迎關注微信公眾號:萬貓學社,每周一分享Java技術乾貨。
查詢訂閱資訊
查看活躍頻道
活躍頻道指的是至少有一個訂閱者的頻道,語法是:
pubsub channels [模式]
比如:
> pubsub channels
1) "channel:one-more-study:test"
2) "channel:one-more-study:demo"
3) "channel:demo"
> pubsub channels *demo
1) "channel:one-more-study:demo"
2) "channel:demo"
> pubsub channels *one-more-study*
1) "channel:one-more-study:test"
2) "channel:one-more-study:demo"
查看頻道訂閱數
pubsub numsub [頻道名稱 ...]
比如:
> pubsub numsub channel:one-more-study:demo
1) "channel:one-more-study:demo"
2) (integer) 1
查看模式訂閱數
> pubsub numpat
(integer) 1
程式碼實戰
光說不練假把式,我們使用Java語言寫一個簡單的發布訂閱示例。
Jedis集群示例
Jedis是Redis官方推薦的Java連接開發工具,我們使用Jedis寫一個簡單的集群示例。
package onemore.study;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;
import java.util.HashSet;
import java.util.Set;
/**
* Jedis集群
*
* @author 萬貓學社
*/
public enum Cluster {
INSTANCE;
//為了簡單,把IP和埠直接寫在這裡,實際開發中寫在配置文件會更好。
private final String hostAndPorts = "192.168.0.60:6379;192.168.0.61:6379;192.168.0.62:6379";
private JedisCluster jedisCluster;
Cluster() {
JedisPoolConfig poolConfig = new JedisPoolConfig();
//最大連接數
poolConfig.setMaxTotal(20);
//最大空閑數
poolConfig.setMaxIdle(10);
//最小空閑數
poolConfig.setMinIdle(2);
//從jedis連接池獲取連接時,校驗並返回可用的連接
poolConfig.setTestOnBorrow(true);
//把連接放回jedis連接池時,校驗並返回可用的連接
poolConfig.setTestOnReturn(true);
Set<HostAndPort> nodes = new HashSet<>();
String[] hosts = hostAndPorts.split(";");
for (String hostport : hosts) {
String[] ipport = hostport.split(":");
String ip = ipport[0];
int port = Integer.parseInt(ipport[1]);
nodes.add(new HostAndPort(ip, port));
}
jedisCluster = new JedisCluster(nodes, 1000, poolConfig);
}
public JedisCluster getJedisCluster() {
return jedisCluster;
}
}
歡迎關注微信公眾號:萬貓學社,每周一分享Java技術乾貨。
發布者示例
package onemore.study;
import redis.clients.jedis.JedisCluster;
/**
* 發布者
*
* @author 萬貓學社
*/
public class Publisher implements Runnable {
private final String CHANNEL_NAME = "channel:one-more-study:demo";
private final String QUIT_COMMAND = "quit";
@Override
public void run() {
JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
for (int i = 1; i <= 3; i++) {
String message = "第" + i + "消息";
System.out.println(Thread.currentThread().getName() + " 發布:" + message);
jedisCluster.publish(CHANNEL_NAME, message);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------------------");
}
jedisCluster.publish(CHANNEL_NAME, QUIT_COMMAND);
}
}
訂閱者示例
package onemore.study;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPubSub;
/**
* 訂閱者
*
* @author 萬貓學社
*/
public class Subscriber implements Runnable {
private final String CHANNEL_NAME = "channel:one-more-study:demo";
private final String QUIT_COMMAND = "quit";
private final JedisPubSub jedisPubSub = new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
System.out.println(Thread.currentThread().getName() + " 接收:" + message);
if (QUIT_COMMAND.equals(message)) {
unsubscribe(CHANNEL_NAME);
}
}
};
@Override
public void run() {
JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
jedisCluster.subscribe(jedisPubSub, CHANNEL_NAME);
}
}
歡迎關注微信公眾號:萬貓學社,每周一分享Java技術乾貨。
綜合示例
package onemore.study;
public class App {
public static void main(String[] args) throws InterruptedException {
//創建3個訂閱者
new Thread(new Subscriber()).start();
new Thread(new Subscriber()).start();
new Thread(new Subscriber()).start();
Thread.sleep(1000);
//創建發布者
new Thread(new Publisher()).start();
}
}
運行結果如下:
Thread-6 發布:第1消息
Thread-0 接收:第1消息
Thread-1 接收:第1消息
Thread-2 接收:第1消息
------------------
Thread-6 發布:第2消息
Thread-0 接收:第2消息
Thread-1 接收:第2消息
Thread-2 接收:第2消息
------------------
Thread-6 發布:第3消息
Thread-0 接收:第3消息
Thread-2 接收:第3消息
Thread-1 接收:第3消息
------------------
Thread-0 接收:quit
Thread-1 接收:quit
Thread-2 接收:quit
微信公眾號:萬貓學社
微信掃描二維碼
獲得更多Java技術乾貨
