Sentinel服务治理工作原理【源码笔记】
- 2019 年 10 月 4 日
- 筆記
目录
一、服务治理流程 1.服务治理流程图 2.重要概念 3.示例代码 二、定义流控规则 1.定义规则示例 2.将规则更新到缓存 三、定义受保护的资源 1.示例代码 2.资源上下文 3.构造资源插槽链 四、链条执行与规则判断
一、服务治理流程
通过定义规则、受保护的资源,统计调用链及运行时指标;通过比较运行指标与定义的规则,符合规则放行,不符合则阻塞。
1.服务治理流程图

2.重要概念
Resource
资源受保护的一段代码,ResourceWrapper实现类StringResourceWrapper和MethodResourceWrapper。
Context
Context存储当前调用链的元数据
String name: 上下文名称 DefaultNode entranceNode: 当前调用链的入口节点(根节点) private Entry curEntry:当前调用Entry private String origin:调用源可以是appId private final boolean async:是否异步
Slot
每个插槽负责不同的职责,统计流量信息、流控规则校验等。 不同规则的Slot形成插槽链表,逐级向下执行。
Entry
Entry通行证token,允许通过的请求返回Entry对象,反之返回BlockException。
private long createTime: 创建时间用于统计RT private Node curNode: 记录当前上下文中资源的统计信息 private Node originNode:调用源的统计信息,调用源可以是appId protected ResourceWrapper resourceWrapper:资源封装类
Node
Node保存资源的实时统计信息
//每分钟的请求数(通过的+阻塞的) long totalRequest() //每分钟通过的请求数 long totalPass() //每分钟请求成功的数量 long totalSuccess(); //每分钟请求阻塞的数量 long blockRequest() //每分钟业务异常数量 long totalException() //通过的QPS double passQps() //阻塞的QPS double blockQps() //总的QPS double totalQps() //成功的QPS double successQps() //成功QPS的最大值 到当前时间 double maxSuccessQps() //异常的QPS double exceptionQps() //平均RT double avgRt() //最小的RT double minRt() //返回当前活动的线程数 int curThreadNum() //前一秒的block QPS double previousBlockQps() //前一秒通过的 QPS double previousPassQps() //获取资源的有效统计信息 Map<Long, MetricNode> metrics() //增加通过请求的数量 void addPassRequest(int count) //增加rt时间和成功的请求数量 void addRtAndSuccess(long rt, int success) //增加阻塞的QPS void increaseBlockQps(int count) //增加异常QPS void increaseExceptionQps(int count) //增加当前线程的数量 void increaseThreadNum() //减少当前线程的数量 void decreaseThreadNum() //重置计数器 void reset()
3.示例代码
FlowQpsDemo.java
public static void main(String[] args) throws Exception { //定义流控规则 initFlowQpsRule(); //打印流控信息 tick(); //模拟多线程触发流控 simulateTraffic(); System.out.println("===== begin to do flow control"); System.out.println("only 20 requests per second can pass"); }
小结:通过流控示例代码分析流Sentinel服务治理的工作流程。
二、定义流控规则
1.定义规则示例
private static void initFlowQpsRule() { List<FlowRule> rules = new ArrayList<FlowRule>(); FlowRule rule1 = new FlowRule(); // 设置资源名称 rule1.setResource(KEY); // 阀值 rule1.setCount(20); // 基于运行指标QPS阀值类型 rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); // 对调用来源不做限制 rule1.setLimitApp("default"); rules.add(rule1); // 限流规则加入到缓存 FlowRuleManager.loadRules(rules); }
2.将规则更新到缓存
FlowRuleManager#成员变量
// 缓存定义的规则 private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>(); // 回调的Listener实现 private static final FlowPropertyListener LISTENER = new FlowPropertyListener(); // 用于注册Listener private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>(); static { //将FlowPropertyListener注册到currentProperty中 currentProperty.addListener(LISTENER); SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS); }
小结:FlowRuleManager通过静态方法将FlowPropertyListener注册到DynamicSentinelProperty中。
FlowRuleManager#loadRules
public static void loadRules(List<FlowRule> rules) { currentProperty.updateValue(rules); }
DynamicSentinelProperty#updateValue
@Override public boolean updateValue(T newValue) { //... value = newValue; for (PropertyListener<T> listener : listeners) { //回调listener listener.configUpdate(newValue); } return true; }
FlowRuleManager#FlowPropertyListener#configUpdate
private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> { //回调执行方法,更新缓存规则 @Override public void configUpdate(List<FlowRule> value) { Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value); if (rules != null) { flowRules.clear(); flowRules.putAll(rules); } RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules); } }
小结:FlowRuleManager的内部静态类FlowPropertyListener#configUpdate方法将规则更新到缓存,缓存在FlowRuleManager的成员变量flowRules中。
三、定义受保护的资源
1.示例代码
while (!stop) { Entry entry = null; try { // 定义受保护的资源 entry = SphU.entry(KEY); // 获得通行证 允许通过 pass.addAndGet(1); } catch (BlockException e1) { // 未获得通行证 被阻塞 block.incrementAndGet(); } catch (Exception e2) { // biz exception } finally { total.incrementAndGet(); if (entry != null) { entry.exit(); } } }
总结:通过SphU.entry作为入口,如果符合规则放行返回Entry实例;否则抛出BlockException被阻塞。
2.资源上下文
CtSph#entryWithPriority
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { //从线程变量获取资源上下文 Context context = ContextUtil.getContext(); if (context instanceof NullContext) { return new CtEntry(resourceWrapper, null, context); } //创建默认资源上下文 名称为:sentinel_default_context if (context == null) { context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); }
小结:先从当前线程上下文获取资源上下文Context;如果null则使用默认上下文名称sentinel_default_context通过MyContextUtil.myEnter创建。
创建上下文Context
CtSph#MyContextUtil#myEnter->ContextUtil#trueEnter
//从当前线程变量中 获取上下文 Context context = contextHolder.get(); if (context == null) { ]//创建用于统计进来流量的Node node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null); //Add entrance node. Constants.ROOT.addChild(node); //将该资源的Node加入缓存 Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1); newMap.putAll(contextNameNodeMap); //name为上下文名称 newMap.put(name, node); contextNameNodeMap = newMap; } //构建资源上下文 包含:统计信息的Node context = new Context(node, name); context.setOrigin(origin); //设置到当前线程变量 contextHolder.set(context);
小结:创建负责入口统计信息的EntranceNode;构建资源上下文并设置到线程变量ThreadLocal中。
3.构造资源插槽链
CtSph#entryWithPriority
private Entry entryWithPriority() throws BlockException { ... //构造资源的插槽链 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); ... }
CtSph#lookProcessChain
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { //获取该资源关联的插槽 ProcessorSlotChain chain = chainMap.get(resourceWrapper); if (chain == null) { //加锁 synchronized (LOCK) { //再次获取该资源关联的插槽 chain = chainMap.get(resourceWrapper); //插槽链为null创建新链条 if (chain == null) { // Entry size limit. //插槽链条的数量最多为6000个 if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } //构造插槽链条 chain = SlotChainProvider.newSlotChain(); Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }
SlotChainProvider#newSlotChain
public static ProcessorSlotChain newSlotChain() { // 构造插槽链 if (builder != null) { return builder.build(); } // 加载自定义SlotChainBuilder // 默认选择DefaultProcessorSlotChain resolveSlotChainBuilder(); // 再次判断为null使用默认的DefaultProcessorSlotChain if (builder == null) { ... builder = new DefaultSlotChainBuilder(); } return builder.build(); }
DefaultSlotChainBuilder#build
ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot());
小结:从创建插槽链流程可以看出,使用默认插槽构造器DefaultProcessorSlotChain创建8个插槽形成链表结构;分别为:NodeSelectorSlot、ClusterBuilderSlot、LogSlot、StatisticSlot、SystemSlot、AuthoritySlot、FlowSlot、DegradeSlot。插槽构造器也可以自定义。
四、链条执行与规则判断
CtSph#entryWithPriority
private Entry entryWithPriority( Entry e = new CtEntry(resourceWrapper, chain, context); try { //触发插槽链执行及规则校验 chain.entry(); } catch (BlockException e1) { //触发流控向上抛出BlockException e.exit(count, args); throw e1; } catch (Throwable e1) { } //返回通行证Entry return e; }
小结:通过chain.entry()触发插槽链条执行,默认会经过上面8个插槽。每个插槽履行自己职责,判断是否符合流控规则,符合规则放行返回通行证Entry;不符合触发流控向上抛出BlockException。具体各个插槽的职责后续文章再做分析。