Java簡單實現滑動窗口
- 2020 年 1 月 13 日
- 筆記
由於最近有一個統計單位時間內某key的訪問次數的需求,譬如每5秒訪問了redis的某key超過100次,就取出該key單獨處理。
這樣的單位時間統計,很明顯我們都知道有個邊界問題,譬如5秒內100次的限制。剛好前4.99秒訪問都是0,最後0.01秒來了100次,5.01秒又來了100次。也就是訪問有明顯的毛刺情況出現,為了弱化這個毛刺情況,我們可以採用滑動窗口。
滑動窗口
滑動窗口的主要原理比較簡單,就是將這個單位時間進行拆分,譬如5秒的統計範圍,我們將它劃分成5個1秒。
當請求進來時,先判斷當前請求屬於這5個1秒的時間片中的哪個,然後將對應的時間片對應的統計值加1,再判斷當前加上前4個時間片的次數總和是否已經超過了設置的閾值。
當時間已經到達第6個時間片時,就把第一個時間片給幹掉,因為無論第一片是多少個統計值,它都不會再參與後續的計算了。
就這樣,隨著時間的推移,統計值就隨著各個時間片的滾動,不斷地進行統計。

具體要將單位時間拆分為多少片,要根據實際情況來決定。當然,毫無疑問的是切分的越小,毛刺現象也越少。系統統計也越準確,隨之就是記憶體佔用會越大,因為你的這個窗口的數組會更大。
程式碼實現思路就是定義好分片數量,每個分片都有一個獨立的計數器,所有的分片合計為一個數組。當請求來時,按照分片規則,判斷請求應該劃分到哪個分片中去。要判斷是否超過閾值,就將前N個統計值相加,對比定義的閾值即可。
程式碼我直接引用別人寫好的了,源程式碼在https://www.iteye.com/blog/go12345-1744728
import java.util.concurrent.atomic.AtomicInteger; /** * 滑動窗口。該窗口同樣的key,都是單執行緒計算。 * * @author wuweifeng wrote on 2019-12-04. */ public class SlidingWindow { /** * 循環隊列,就是裝多個窗口用,該數量是windowSize的2倍 */ private AtomicInteger[] timeSlices; /** * 隊列的總長度 */ private int timeSliceSize; /** * 每個時間片的時長,以毫秒為單位 */ private int timeMillisPerSlice; /** * 共有多少個時間片(即窗口長度) */ private int windowSize; /** * 在一個完整窗口期內允許通過的最大閾值 */ private int threshold; /** * 該滑窗的起始創建時間,也就是第一個數據 */ private long beginTimestamp; /** * 最後一個數據的時間戳 */ private long lastAddTimestamp; public static void main(String[] args) { //1秒一個時間片,窗口共5個 SlidingWindow window = new SlidingWindow(100, 4, 8); for (int i = 0; i < 100; i++) { System.out.println(window.addCount(2)); window.print(); System.out.println("--------------------------"); try { Thread.sleep(102); } catch (InterruptedException e) { e.printStackTrace(); } } } public SlidingWindow(int duration, int threshold) { //超過10分鐘的按10分鐘 if (duration > 600) { duration = 600; } //要求5秒內探測出來的, if (duration <= 5) { this.windowSize = 5; this.timeMillisPerSlice = duration * 200; } else { this.windowSize = 10; this.timeMillisPerSlice = duration * 100; } this.threshold = threshold; // 保證存儲在至少兩個window this.timeSliceSize = windowSize * 2; reset(); } public SlidingWindow(int timeMillisPerSlice, int windowSize, int threshold) { this.timeMillisPerSlice = timeMillisPerSlice; this.windowSize = windowSize; this.threshold = threshold; // 保證存儲在至少兩個window this.timeSliceSize = windowSize * 2; reset(); } /** * 初始化 */ private void reset() { beginTimestamp = SystemClock.now(); //窗口個數 AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize]; for (int i = 0; i < timeSliceSize; i++) { localTimeSlices[i] = new AtomicInteger(0); } timeSlices = localTimeSlices; } private void print() { for (AtomicInteger integer : timeSlices) { System.out.print(integer + "-"); } } /** * 計算當前所在的時間片的位置 */ private int locationIndex() { long now = SystemClock.now(); //如果當前的key已經超出一整個時間片了,那麼就直接初始化就行了,不用去計算了 if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) { reset(); } return (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize); } /** * 增加count個數量 */ public boolean addCount(int count) { //當前自己所在的位置,是哪個小時間窗 int index = locationIndex(); // System.out.println("index:" + index); //然後清空自己前面windowSize到2*windowSize之間的數據格的數據 //譬如1秒分4個窗口,那麼數組共計8個窗口 //當前index為5時,就清空6、7、8、1。然後把2、3、4、5的加起來就是該窗口內的總和 clearFromIndex(index); int sum = 0; // 在當前時間片里繼續+1 sum += timeSlices[index].addAndGet(count); //加上前面幾個時間片 for (int i = 1; i < windowSize; i++) { sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get(); } System.out.println(sum + "---" + threshold); lastAddTimestamp = SystemClock.now(); return sum >= threshold; } private void clearFromIndex(int index) { for (int i = 1; i <= windowSize; i++) { int j = index + i; if (j >= windowSize * 2) { j -= windowSize * 2; } timeSlices[j].set(0); } } }
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * 用於解決高並發下System.currentTimeMillis卡頓 * @author lry */ public class SystemClock { private final int period; private final AtomicLong now; private static class InstanceHolder { private static final SystemClock INSTANCE = new SystemClock(1); } private SystemClock(int period) { this.period = period; this.now = new AtomicLong(System.currentTimeMillis()); scheduleClockUpdating(); } private static SystemClock instance() { return InstanceHolder.INSTANCE; } private void scheduleClockUpdating() { ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, "System Clock"); thread.setDaemon(true); return thread; } }); scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS); } private long currentTimeMillis() { return now.get(); } /** * 用來替換原來的System.currentTimeMillis() */ public static long now() { return instance().currentTimeMillis(); } }
參照程式碼main方法,通過修改每個時間片的時間,窗口數量,閾值,來進行測試。
這就是簡單實現了。