7.Sentinel源碼分析—Sentinel是怎麼和控制台通訊的?
- 2019 年 10 月 3 日
- 筆記
這裡會介紹:
- Sentinel會使用多執行緒的方式實現一個類Reactor的IO模型
- Sentinel會使用心跳檢測來觀察控制台是否正常
Sentinel源碼解析系列:
1.Sentinel源碼分析—FlowRuleManager載入規則做了什麼?
2. Sentinel源碼分析—Sentinel是如何進行流量統計的?
3. Sentinel源碼分析— QPS流量控制是如何實現的?
4.Sentinel源碼分析— Sentinel是如何做到降級的?
5.Sentinel源碼分析—Sentinel如何實現自適應限流?
6.Sentinel源碼分析—Sentinel是如何動態載入配置限流的?
在看我的這篇文章之前大家可以先看一下官方的這篇文章:https://github.com/alibaba/Sentinel/wiki/%E6%8E%A7%E5%88%B6%E5%8F%B0。介紹了控制台怎麼使用,以及客戶端要怎麼設置才能被收集數據。
客戶端會在InitExecutor調用doInit方法中與控制台建立通訊,所以我們直接看doInit方法:
InitExecutor#doInit
public static void doInit() { //InitExecutor只會初始化一次,並且初始化失敗會退出 if (!initialized.compareAndSet(false, true)) { return; } try { //通過spi載入InitFunc子類 ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class); List<OrderWrapper> initList = new ArrayList<OrderWrapper>(); for (InitFunc initFunc : loader) { RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName()); //給所有的initFunc排序,按@InitOrder從小到大進行排序 //然後封裝成OrderWrapper對象 insertSorted(initList, initFunc); } for (OrderWrapper w : initList) { w.func.init(); RecordLog.info(String.format("[InitExecutor] Executing %s with order %d", w.func.getClass().getCanonicalName(), w.order)); } } catch (Exception ex) { RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex); ex.printStackTrace(); } catch (Error error) { RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error); error.printStackTrace(); } }
因為這裡我們引入了sentinel-transport-simple-http
模組,所以使用spi載入InitFunc的子類的時候會載入三個子類實例,分別是:CommandCenterInitFunc、HeartbeatSenderInitFunc、MetricCallbackInit。
然後會遍歷loader,根據@InitOrder的大小進行排序,並封裝成OrderWrapper放入到initList中。
所以initList裡面的對象順序是:
- CommandCenterInitFunc
- HeartbeatSenderInitFunc
- MetricCallbackInit
然後遍歷initList依次調用init方法。
所以下面我們來看一下這三個實現類的init方法做了什麼:
CommandCenterInitFunc
CommandCenterInitFunc#init
public void init() throws Exception { //獲取commandCenter對象 CommandCenter commandCenter = CommandCenterProvider.getCommandCenter(); if (commandCenter == null) { RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter"); return; } //調用SimpleHttpCommandCenter的beforeStart方法 //用來設置CommandHandler的實現類 commandCenter.beforeStart(); commandCenter.start(); RecordLog.info("[CommandCenterInit] Starting command center: " + commandCenter.getClass().getCanonicalName()); }
這個方法裡面的所有操作都是針對CommandCenter來進行的,所以我們先來看看CommandCenterProvider這個類。
CommandCenterProvider
static { //初始化commandCenter對象 resolveInstance(); } private static void resolveInstance() { //獲取SpiOrder更大的子類實現類 CommandCenter resolveCommandCenter = SpiLoader.loadHighestPriorityInstance(CommandCenter.class); if (resolveCommandCenter == null) { RecordLog.warn("[CommandCenterProvider] WARN: No existing CommandCenter found"); } else { commandCenter = resolveCommandCenter; RecordLog.info("[CommandCenterProvider] CommandCenter resolved: " + resolveCommandCenter.getClass() .getCanonicalName()); } }
CommandCenterProvider會在首次初始化的時候調用resolveInstance方法。在resolveInstance方法裡面會調用SpiLoader.loadHighestPriorityInstance
來獲取CommandCenter,這裡獲取的是SimpleHttpCommandCenter這個實例,loadHighestPriorityInstance方法具體的實現非常簡單,我就不去分析了。
然後將commandCenter賦值SimpleHttpCommandCenter實例。
所以CommandCenterProvider.getCommandCenter()
方法返回的是SimpleHttpCommandCenter實例。
然後調用SimpleHttpCommandCenter的beforeStart方法。
SimpleHttpCommandCenter#beforeStart
public void beforeStart() throws Exception { // Register handlers //調用CommandHandlerProvider的namedHandlers方法 //獲取CommandHandler的spi中設置的實現類 Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers(); //將handlers中的數據設置到handlerMap中 registerCommands(handlers); }
這個方法首先會調用CommandHandlerProvider的namedHandlers中獲取所有的CommandHandler實現類。
CommandHandlerProvider#namedHandlers
private final ServiceLoader<CommandHandler> serviceLoader = ServiceLoader.load(CommandHandler.class); public Map<String, CommandHandler> namedHandlers() { Map<String, CommandHandler> map = new HashMap<String, CommandHandler>(); for (CommandHandler handler : serviceLoader) { //獲取實現類CommandMapping註解的name屬性 String name = parseCommandName(handler); if (!StringUtil.isEmpty(name)) { map.put(name, handler); } } return map; }
這個類會通過spi先載入CommandHandler的實現類,然後將實現類按註解上面的name屬性放入到map裡面去。
CommandHandler的實現類是用來和控制台進行交互的處理類,負責處理。
這也是策略模式的一種應用,根據map裡面的不同策略來做不同的處理,例如SendMetricCommandHandler是用來統計調用資訊然後發送給控制台用的,ModifyRulesCommandHandler是用來做實時修改限流策略的處理的等等。
然後我們再回到CommandCenterInitFunc中,繼續往下走,調用commandCenter.start()
方法。
SimpleHttpCommandCenter#start
public void start() throws Exception { //獲取當前機器的cpu執行緒數 int nThreads = Runtime.getRuntime().availableProcessors(); //創建一個cpu執行緒數大小的固定執行緒池,用來做業務執行緒池用 this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), new NamedThreadFactory("sentinel-command-center-service-executor"), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { CommandCenterLog.info("EventTask rejected"); throw new RejectedExecutionException(); } }); Runnable serverInitTask = new Runnable() { int port; { try { //獲取port port = Integer.parseInt(TransportConfig.getPort()); } catch (Exception e) { port = DEFAULT_PORT; } } @Override public void run() { boolean success = false; //創建一個ServerSocket ServerSocket serverSocket = getServerSocketFromBasePort(port); if (serverSocket != null) { CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort()); socketReference = serverSocket; executor.submit(new ServerThread(serverSocket)); success = true; port = serverSocket.getLocalPort(); } else { CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work"); } if (!success) { port = PORT_UNINITIALIZED; } TransportConfig.setRuntimePort(port); //關閉執行緒池 executor.shutdown(); } }; new Thread(serverInitTask).start(); }
- 這個方法會創建一個固定大小的業務執行緒池
- 創建一個serverInitTask,裡面負責建立serverSocket然後用executor去創建一個ServerThread非同步執行serverSocket
- executor用完之後會在serverInitTask裡面調用executor的shutdown方法去關閉執行緒池
其中executor是一個單執行緒的執行緒池:
private ExecutorService executor = Executors.newSingleThreadExecutor( new NamedThreadFactory("sentinel-command-center-executor"));
ServerThread是SimpleHttpCommandCenter的內部類:
public void run() { while (true) { Socket socket = null; try { //建立連接 socket = this.serverSocket.accept(); //默認的超時時間是3s setSocketSoTimeout(socket); HttpEventTask eventTask = new HttpEventTask(socket); //使用業務執行緒非同步處理 bizExecutor.submit(eventTask); } catch (Exception e) { CommandCenterLog.info("Server error", e); if (socket != null) { try { socket.close(); } catch (Exception e1) { CommandCenterLog.info("Error when closing an opened socket", e1); } } try { // In case of infinite log. Thread.sleep(10); } catch (InterruptedException e1) { // Indicates the task should stop. break; } } } }
run方法會使用構造器傳入的serverSocket建立連接後設置超時時間,封裝成HttpEventTask類,然後使用上面創建的bizExecutor非同步執行任務。
HttpEventTask是Runnable的實現類,所以調用bizExecutor的submit的時候會調用其中的run方法使用socket與控制台進行交互。
HttpEventTask#run
public void run() { .... // Validate the target command. //獲取commandName String commandName = HttpCommandUtils.getTarget(request); if (StringUtil.isBlank(commandName)) { badRequest(printWriter, "Invalid command"); return; } // Find the matching command handler. //根據commandName獲取處理器名字 CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName); if (commandHandler != null) { //調用處理器結果,然後返回給控制台 CommandResponse<?> response = commandHandler.handle(request); handleResponse(response, printWriter, outputStream); } .... } catch (Throwable e) { .... } finally { .... } }
HttpEventTask的run方法很長,但是很多都是有關輸入輸出流的,我們不關心,所以省略。只需要知道會把request請求最後轉換成一個控制台發過來的指令,然後通過SimpleHttpCommandCenter調用getHandler得到處理器,然後處理數據就行了。
所以這個整個的處理流程就是:
通過這樣的一個處理流程,然後實現了類似reactor的一個處理流程。
SimpleHttpCommandCenter#getHandler
public static CommandHandler getHandler(String commandName) { return handlerMap.get(commandName); }
handlerMap裡面的數據是通過前面我們分析的調用beforeStart方法設置進來的。
然後通過commandName獲取對應的控制台,例如:控制台發送過來metric指令,那麼就會對應的調用SendMetricCommandHandler的handle方法來處理控制台的指令。
我們來看看SendMetricCommandHandler是怎麼處理返回統計數據的:
SendMetricCommandHandler#handle
public CommandResponse<String> handle(CommandRequest request) { // Note: not thread-safe. if (searcher == null) { synchronized (lock) { //獲取應用名 String appName = SentinelConfig.getAppName(); if (appName == null) { appName = ""; } if (searcher == null) { //用來找metric文件, searcher = new MetricSearcher(MetricWriter.METRIC_BASE_DIR, MetricWriter.formMetricFileName(appName, PidUtil.getPid())); } } } //獲取請求的開始結束時間和最大的行數 String startTimeStr = request.getParam("startTime"); String endTimeStr = request.getParam("endTime"); String maxLinesStr = request.getParam("maxLines"); //用來確定資源 String identity = request.getParam("identity"); long startTime = -1; int maxLines = 6000; if (StringUtil.isNotBlank(startTimeStr)) { startTime = Long.parseLong(startTimeStr); } else { return CommandResponse.ofSuccess(""); } List<MetricNode> list; try { // Find by end time if set. if (StringUtil.isNotBlank(endTimeStr)) { long endTime = Long.parseLong(endTimeStr); //根據開始結束時間找到統計數據 list = searcher.findByTimeAndResource(startTime, endTime, identity); } else { if (StringUtil.isNotBlank(maxLinesStr)) { maxLines = Integer.parseInt(maxLinesStr); } maxLines = Math.min(maxLines, 12000); list = searcher.find(startTime, maxLines); } } catch (Exception ex) { return CommandResponse.ofFailure(new RuntimeException("Error when retrieving metrics", ex)); } if (list == null) { list = new ArrayList<>(); } //如果identity為空就加入CPU負載和系統負載 if (StringUtil.isBlank(identity)) { addCpuUsageAndLoad(list); } StringBuilder sb = new StringBuilder(); for (MetricNode node : list) { sb.append(node.toThinString()).append("n"); } return CommandResponse.ofSuccess(sb.toString()); }
我們在1.Sentinel源碼分析—FlowRuleManager載入規則做了什麼?里介紹了Metric統計資訊會在MetricTimerListener的run方法中定時寫入文件中去。
所以handle方法裡面主要是如何根據請求的開始結束時間,資源名來獲取磁碟的文件,然後返回磁碟的統計資訊,並記錄一下當前的統計資訊,防止重複發送統計數據到控制台。
HeartbeatSenderInitFunc
HeartbeatSenderInitFunc主要是用來做心跳執行緒使用的,定期的和控制台進行心跳連接。
HeartbeatSenderInitFunc#init
public void init() { //獲取HeartbeatSender的實現類 HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender(); if (sender == null) { RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded"); return; } //創建一個corepoolsize為2,maximumPoolSize為最大的執行緒池 initSchedulerIfNeeded(); //獲取心跳間隔時間,默認10s long interval = retrieveInterval(sender); //設置間隔心跳時間 setIntervalIfNotExists(interval); //開啟一個定時任務,每隔interval時間發送一個心跳 scheduleHeartbeatTask(sender, interval); }
- 首先會調用HeartbeatSenderProvider.getHeartbeatSender方法,裡面會根據spi創建實例,返回一個SimpleHttpHeartbeatSender實例。
- 調用initSchedulerIfNeeded方法創建一個corepoolsize為2的執行緒池
- 獲取心跳間隔時間,如果沒有設置,那麼是10s
- 調用scheduleHeartbeatTask方法開啟一個定時執行緒調用。
我們來看看scheduleHeartbeatTask方法:
HeartbeatSenderInitFunc#scheduleHeartbeatTask
private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) { pool.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { sender.sendHeartbeat(); } catch (Throwable e) { RecordLog.warn("[HeartbeatSender] Send heartbeat error", e); } } }, 5000, interval, TimeUnit.MILLISECONDS); RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: " + sender.getClass().getCanonicalName()); }
默認的情況,創建的這個定時任務會每隔10s調用一次SimpleHttpHeartbeatSender的sendHeartbeat方法。
SimpleHttpHeartbeatSender#sendHeartbeat
public boolean sendHeartbeat() throws Exception { if (TransportConfig.getRuntimePort() <= 0) { RecordLog.info("[SimpleHttpHeartbeatSender] Runtime port not initialized, won't send heartbeat"); return false; } //獲取控制台的ip和埠等資訊 InetSocketAddress addr = getAvailableAddress(); if (addr == null) { return false; } //設置http調用的ip和埠,還有訪問的url SimpleHttpRequest request = new SimpleHttpRequest(addr, HEARTBEAT_PATH); //獲取版本號,埠等資訊 request.setParams(heartBeat.generateCurrentMessage()); try { //發送post請求 SimpleHttpResponse response = httpClient.post(request); if (response.getStatusCode() == OK_STATUS) { return true; } } catch (Exception e) { RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addr + " : ", e); } return false; }
這個心跳檢測的方法就寫的很簡單了,通過Dcsp.sentinel.dashboard.server預先設置好的ip和埠號發送post請求到控制台,然後檢測是否返回200,如果是則說明控制台正常,否則進行異常處理。