【面試必備】聊聊高性能延時隊列應用

  • 2020 年 2 月 18 日
  • 筆記

延時隊列的應用場景:

下單後,30分鐘內未付款就自動取消訂單等; 支付後,24小時未評論自動好評; 在我們實際開發過程中,應用場景很多…

基於Redis Zset 實現

實現原理

Redis由於其自身的Zset數據結構,也同樣可以實現延時的操作。 Zset本質就是Set結構上加了個排序的功能,除了添加數據value之外,還提供另一屬性score,這一屬性在添加元素時候可以指定,每次指定score後,Zset會自動重新按新的值調整順序

  1. 如果score代表的是想要執行時間的時間戳,在某個時間將它插入Zset集合中,它會按照時間戳大小進行排序,也就是對執行時間前後進行排序。
> ZADD delay_queue 1581309229  taskId_1  (integer) 1  > ZADD delay_queue 1581309129  taskId_2  (integer) 1  > ZADD delay_queue 1581309329  taskId_3  (integer) 1
  1. 不斷地進行取第一個key值,如果當前時間戳大於等於該key值的socre就將它取出來進行消費刪除,就可以達到延時執行的目的。 注意不需要遍歷整個Zset集合,以免造成性能浪費。
> ZRANGE delay_queue 0 -1 withscores  1) "taskId_2"  2) "1581309129"  3) "taskId_1"  4) "1581309229"  5) "taskId_3"  6) "1581309329"

使用注意

  • 遍歷邏輯,刪除邏輯,注意使用 Redis Lua 封裝,確保原子性操作。更要注意 Redis Lua 在 Redis Cluster 的偽集群問題。
  • 若是JAVA 語言可以直接使用 redisson,封裝了 DelayedQueue 的實現。

源碼邏輯 org/redisson/RedissonDelayedQueue.java

Beanstalkd 消息隊列

Beanstalkd,一個高性能、輕量級的分散式記憶體隊列系統。支援過有9.5 million用戶的Facebook Causes應用。後來開源,現在有PostRank大規模部署和使用,每天處理百萬級任務。

部署使用

  • Linux 安裝 || docker 部署
yum install beanstalkd    ||    docker run -d -p 11300:11300 pig4cloud/beanstalkd
  • 客戶端使用,pom 依賴
<!--封裝了 官方的 java sdk,只支援 springboot 2.X-->  <dependency>      <groupid>com.pig4cloud.beanstalk</groupid>      <artifactid>beanstalkd-client-spring-boot-starter</artifactid>      <version>0.0.1</version>  </dependency>
  • 默認配置
  • 程式碼使用
@Autowired  private JobProducer producer;    /**   * @param delay    是一個整形數,表示將job放入ready隊列需要等待的秒數   * @param ttr      time to run—是一個整形數,表示允許一個worker執行該job的秒數。這個時間將從一個worker 獲取一個job開始計算。   *                 如果該worker沒能在<ttr> 秒內刪除、釋放或休眠該job,這個job就會超時,服務端會主動釋放該job。   *                 最小ttr為1。如果客戶端設置了0,服務端會默認將其增加到1。   * @param priority 優先順序 0~2**32的整數,最高優先順序是0   */  @Test  public void testSend() {  	String taskId = "1";// 業務對象資訊      producer.putJob(0, 10, 10, taskId.getBytes());  }
@Component  public class DemoJobConsumer extends AbstractTubeConsumerListener {        @Override      public void work(JobConsumer consumer) {          // 阻塞多少秒獲取一次 Job          Job job = consumer.reserveJob(1000L);            // 消費此Job          consumer.deleteJob(job.getId());            // 執行延時的業務邏輯          String biz = new String(job.getData());      }  }

擴展

  • 資料庫,利用定時任務輪詢實現,業務量大會性能瓶頸。
  • 延時隊列的其他實現,比如 rabbitmq 利用ttl特性可以實現。無法取消已放入隊列裡面的數據,使用時特別注意死信隊列的配置等。
  • 還可以自己根據 時間輪片的演算法 自行實現 。
  • 總之一切,都要有補償的邏輯,無論是業務人員手動觸發還是自動補償。

> 項目推薦: Spring Cloud 、Spring Security OAuth2的RBAC許可權管理系統 歡迎關注 </ttr>