Java簡單實現滑動窗口

  • 2020 年 1 月 13 日
  • 筆記

由於最近有一個統計單位時間內某key的訪問次數的需求,譬如每5秒訪問了redis的某key超過100次,就取出該key單獨處理。

這樣的單位時間統計,很明顯我們都知道有個邊界問題,譬如5秒內100次的限制。剛好前4.99秒訪問都是0,最後0.01秒來了100次,5.01秒又來了100次。也就是訪問有明顯的毛刺情況出現,為了弱化這個毛刺情況,我們可以採用滑動窗口。

滑動窗口

滑動窗口的主要原理比較簡單,就是將這個單位時間進行拆分,譬如5秒的統計範圍,我們將它劃分成5個1秒。

當請求進來時,先判斷當前請求屬於這5個1秒的時間片中的哪個,然後將對應的時間片對應的統計值加1,再判斷當前加上前4個時間片的次數總和是否已經超過了設置的閾值。

當時間已經到達第6個時間片時,就把第一個時間片給幹掉,因為無論第一片是多少個統計值,它都不會再參與後續的計算了。

就這樣,隨著時間的推移,統計值就隨著各個時間片的滾動,不斷地進行統計。

具體要將單位時間拆分為多少片,要根據實際情況來決定。當然,毫無疑問的是切分的越小,毛刺現象也越少。系統統計也越準確,隨之就是記憶體佔用會越大,因為你的這個窗口的數組會更大。

程式碼實現思路就是定義好分片數量,每個分片都有一個獨立的計數器,所有的分片合計為一個數組。當請求來時,按照分片規則,判斷請求應該劃分到哪個分片中去。要判斷是否超過閾值,就將前N個統計值相加,對比定義的閾值即可。

程式碼我直接引用別人寫好的了,源程式碼在https://www.iteye.com/blog/go12345-1744728

import java.util.concurrent.atomic.AtomicInteger;    /**   * 滑動窗口。該窗口同樣的key,都是單執行緒計算。   *   * @author wuweifeng wrote on 2019-12-04.   */  public class SlidingWindow {      /**       * 循環隊列,就是裝多個窗口用,該數量是windowSize的2倍       */      private AtomicInteger[] timeSlices;      /**       * 隊列的總長度       */      private int timeSliceSize;      /**       * 每個時間片的時長,以毫秒為單位       */      private int timeMillisPerSlice;      /**       * 共有多少個時間片(即窗口長度)       */      private int windowSize;      /**       * 在一個完整窗口期內允許通過的最大閾值       */      private int threshold;      /**       * 該滑窗的起始創建時間,也就是第一個數據       */      private long beginTimestamp;      /**       * 最後一個數據的時間戳       */      private long lastAddTimestamp;        public static void main(String[] args) {          //1秒一個時間片,窗口共5個          SlidingWindow window = new SlidingWindow(100, 4, 8);          for (int i = 0; i < 100; i++) {              System.out.println(window.addCount(2));                window.print();              System.out.println("--------------------------");              try {                  Thread.sleep(102);              } catch (InterruptedException e) {                  e.printStackTrace();              }          }      }        public SlidingWindow(int duration, int threshold) {          //超過10分鐘的按10分鐘          if (duration > 600) {              duration = 600;          }          //要求5秒內探測出來的,          if (duration <= 5) {              this.windowSize = 5;              this.timeMillisPerSlice = duration * 200;          } else {              this.windowSize = 10;              this.timeMillisPerSlice = duration * 100;          }          this.threshold = threshold;          // 保證存儲在至少兩個window          this.timeSliceSize = windowSize * 2;            reset();      }        public SlidingWindow(int timeMillisPerSlice, int windowSize, int threshold) {          this.timeMillisPerSlice = timeMillisPerSlice;          this.windowSize = windowSize;          this.threshold = threshold;          // 保證存儲在至少兩個window          this.timeSliceSize = windowSize * 2;            reset();      }        /**       * 初始化       */      private void reset() {          beginTimestamp = SystemClock.now();          //窗口個數          AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];          for (int i = 0; i < timeSliceSize; i++) {              localTimeSlices[i] = new AtomicInteger(0);          }          timeSlices = localTimeSlices;      }        private void print() {          for (AtomicInteger integer : timeSlices) {              System.out.print(integer + "-");          }      }        /**       * 計算當前所在的時間片的位置       */      private int locationIndex() {          long now = SystemClock.now();          //如果當前的key已經超出一整個時間片了,那麼就直接初始化就行了,不用去計算了          if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {              reset();          }            return (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);      }        /**       * 增加count個數量       */      public boolean addCount(int count) {          //當前自己所在的位置,是哪個小時間窗          int index = locationIndex();  //        System.out.println("index:" + index);          //然後清空自己前面windowSize到2*windowSize之間的數據格的數據          //譬如1秒分4個窗口,那麼數組共計8個窗口          //當前index為5時,就清空6、7、8、1。然後把2、3、4、5的加起來就是該窗口內的總和          clearFromIndex(index);            int sum = 0;          // 在當前時間片里繼續+1          sum += timeSlices[index].addAndGet(count);          //加上前面幾個時間片          for (int i = 1; i < windowSize; i++) {              sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();          }          System.out.println(sum + "---" + threshold);            lastAddTimestamp = SystemClock.now();            return sum >= threshold;      }        private void clearFromIndex(int index) {          for (int i = 1; i <= windowSize; i++) {              int j = index + i;              if (j >= windowSize * 2) {                  j -= windowSize * 2;              }              timeSlices[j].set(0);          }      }    }
import java.util.concurrent.Executors;  import java.util.concurrent.ScheduledExecutorService;  import java.util.concurrent.ThreadFactory;  import java.util.concurrent.TimeUnit;  import java.util.concurrent.atomic.AtomicLong;    /**   * 用於解決高並發下System.currentTimeMillis卡頓   * @author lry   */  public class SystemClock {        private final int period;        private final AtomicLong now;        private static class InstanceHolder {          private static final SystemClock INSTANCE = new SystemClock(1);      }        private SystemClock(int period) {          this.period = period;          this.now = new AtomicLong(System.currentTimeMillis());          scheduleClockUpdating();      }        private static SystemClock instance() {          return InstanceHolder.INSTANCE;      }        private void scheduleClockUpdating() {          ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {              @Override              public Thread newThread(Runnable runnable) {                  Thread thread = new Thread(runnable, "System Clock");                  thread.setDaemon(true);                  return thread;              }          });          scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS);      }        private long currentTimeMillis() {          return now.get();      }        /**       * 用來替換原來的System.currentTimeMillis()       */      public static long now() {          return instance().currentTimeMillis();      }  }

參照程式碼main方法,通過修改每個時間片的時間,窗口數量,閾值,來進行測試。

這就是簡單實現了。