­

Sentinel服务治理工作原理【源码笔记】

  • 2019 年 10 月 4 日
  • 筆記
目录
一、服务治理流程  1.服务治理流程图  2.重要概念  3.示例代码  二、定义流控规则  1.定义规则示例  2.将规则更新到缓存  三、定义受保护的资源  1.示例代码  2.资源上下文  3.构造资源插槽链  四、链条执行与规则判断
一、服务治理流程

通过定义规则、受保护的资源,统计调用链及运行时指标;通过比较运行指标与定义的规则,符合规则放行,不符合则阻塞。

1.服务治理流程图
2.重要概念
Resource

资源受保护的一段代码,ResourceWrapper实现类StringResourceWrapper和MethodResourceWrapper。

Context

Context存储当前调用链的元数据

String name: 上下文名称  DefaultNode entranceNode: 当前调用链的入口节点(根节点)  private Entry curEntry:当前调用Entry  private String origin:调用源可以是appId  private final boolean async:是否异步
Slot

每个插槽负责不同的职责,统计流量信息、流控规则校验等。 不同规则的Slot形成插槽链表,逐级向下执行。

Entry

Entry通行证token,允许通过的请求返回Entry对象,反之返回BlockException。

private long createTime: 创建时间用于统计RT  private Node curNode: 记录当前上下文中资源的统计信息  private Node originNode:调用源的统计信息,调用源可以是appId  protected ResourceWrapper resourceWrapper:资源封装类
Node

Node保存资源的实时统计信息

//每分钟的请求数(通过的+阻塞的)  long totalRequest()  //每分钟通过的请求数  long totalPass()  //每分钟请求成功的数量  long totalSuccess();  //每分钟请求阻塞的数量  long blockRequest()  //每分钟业务异常数量  long totalException()  //通过的QPS  double passQps()  //阻塞的QPS  double blockQps()  //总的QPS  double totalQps()  //成功的QPS  double successQps()  //成功QPS的最大值 到当前时间  double maxSuccessQps()  //异常的QPS  double exceptionQps()  //平均RT  double avgRt()  //最小的RT  double minRt()  //返回当前活动的线程数  int curThreadNum()  //前一秒的block QPS  double previousBlockQps()  //前一秒通过的 QPS  double previousPassQps()  //获取资源的有效统计信息  Map<Long, MetricNode> metrics()  //增加通过请求的数量  void addPassRequest(int count)  //增加rt时间和成功的请求数量  void addRtAndSuccess(long rt, int success)  //增加阻塞的QPS  void increaseBlockQps(int count)  //增加异常QPS  void increaseExceptionQps(int count)  //增加当前线程的数量  void increaseThreadNum()  //减少当前线程的数量  void decreaseThreadNum()  //重置计数器  void reset()
3.示例代码

FlowQpsDemo.java

public static void main(String[] args) throws Exception {      //定义流控规则      initFlowQpsRule();      //打印流控信息      tick();      //模拟多线程触发流控      simulateTraffic();        System.out.println("===== begin to do flow control");      System.out.println("only 20 requests per second can pass");  }

小结:通过流控示例代码分析流Sentinel服务治理的工作流程。

二、定义流控规则
1.定义规则示例
private static void initFlowQpsRule() {      List<FlowRule> rules = new ArrayList<FlowRule>();      FlowRule rule1 = new FlowRule();      // 设置资源名称      rule1.setResource(KEY);      // 阀值      rule1.setCount(20);      // 基于运行指标QPS阀值类型      rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);      // 对调用来源不做限制      rule1.setLimitApp("default");      rules.add(rule1);      // 限流规则加入到缓存      FlowRuleManager.loadRules(rules);  }
2.将规则更新到缓存

FlowRuleManager#成员变量

// 缓存定义的规则  private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();  // 回调的Listener实现  private static final FlowPropertyListener LISTENER = new FlowPropertyListener();  // 用于注册Listener  private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();    static {      //将FlowPropertyListener注册到currentProperty中      currentProperty.addListener(LISTENER);      SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS);  }

小结:FlowRuleManager通过静态方法将FlowPropertyListener注册到DynamicSentinelProperty中。

FlowRuleManager#loadRules

public static void loadRules(List<FlowRule> rules) {    currentProperty.updateValue(rules);  }

DynamicSentinelProperty#updateValue

@Override  public boolean updateValue(T newValue) {      //...      value = newValue;      for (PropertyListener<T> listener : listeners) {          //回调listener          listener.configUpdate(newValue);      }      return true;  }

FlowRuleManager#FlowPropertyListener#configUpdate

private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {  //回调执行方法,更新缓存规则  @Override  public void configUpdate(List<FlowRule> value) {      Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);      if (rules != null) {          flowRules.clear();          flowRules.putAll(rules);      }      RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);  }  }

小结:FlowRuleManager的内部静态类FlowPropertyListener#configUpdate方法将规则更新到缓存,缓存在FlowRuleManager的成员变量flowRules中。

三、定义受保护的资源
1.示例代码
while (!stop) {      Entry entry = null;      try {          // 定义受保护的资源          entry = SphU.entry(KEY);          // 获得通行证 允许通过          pass.addAndGet(1);      } catch (BlockException e1) {          // 未获得通行证 被阻塞          block.incrementAndGet();      } catch (Exception e2) {          // biz exception      } finally {          total.incrementAndGet();          if (entry != null) {              entry.exit();          }      }  }

总结:通过SphU.entry作为入口,如果符合规则放行返回Entry实例;否则抛出BlockException被阻塞。

2.资源上下文

CtSph#entryWithPriority

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)  throws BlockException {  //从线程变量获取资源上下文  Context context = ContextUtil.getContext();  if (context instanceof NullContext) {      return new CtEntry(resourceWrapper, null, context);  }  //创建默认资源上下文 名称为:sentinel_default_context  if (context == null) {      context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());  }

小结:先从当前线程上下文获取资源上下文Context;如果null则使用默认上下文名称sentinel_default_context通过MyContextUtil.myEnter创建。

创建上下文Context

CtSph#MyContextUtil#myEnter->ContextUtil#trueEnter

//从当前线程变量中 获取上下文  Context context = contextHolder.get();      if (context == null) {      ]//创建用于统计进来流量的Node      node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);      //Add entrance node.      Constants.ROOT.addChild(node);      //将该资源的Node加入缓存      Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);      newMap.putAll(contextNameNodeMap);      //name为上下文名称      newMap.put(name, node);      contextNameNodeMap = newMap;  }  //构建资源上下文 包含:统计信息的Node  context = new Context(node, name);  context.setOrigin(origin);  //设置到当前线程变量  contextHolder.set(context);

小结:创建负责入口统计信息的EntranceNode;构建资源上下文并设置到线程变量ThreadLocal中。

3.构造资源插槽链

CtSph#entryWithPriority

private Entry entryWithPriority()          throws BlockException {  ...  //构造资源的插槽链  ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);  ...  }

CtSph#lookProcessChain

 ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {      //获取该资源关联的插槽  ProcessorSlotChain chain = chainMap.get(resourceWrapper);  if (chain == null) {      //加锁      synchronized (LOCK) {          //再次获取该资源关联的插槽          chain = chainMap.get(resourceWrapper);          //插槽链为null创建新链条          if (chain == null) {              // Entry size limit.              //插槽链条的数量最多为6000个              if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {                  return null;              }              //构造插槽链条              chain = SlotChainProvider.newSlotChain();              Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(                  chainMap.size() + 1);              newMap.putAll(chainMap);              newMap.put(resourceWrapper, chain);              chainMap = newMap;          }      }  }  return chain;  }

SlotChainProvider#newSlotChain

public static ProcessorSlotChain newSlotChain() {      // 构造插槽链      if (builder != null) {          return builder.build();      }      // 加载自定义SlotChainBuilder      // 默认选择DefaultProcessorSlotChain      resolveSlotChainBuilder();      // 再次判断为null使用默认的DefaultProcessorSlotChain      if (builder == null) {          ...          builder = new DefaultSlotChainBuilder();      }      return builder.build();  }

DefaultSlotChainBuilder#build

ProcessorSlotChain chain = new DefaultProcessorSlotChain();  chain.addLast(new NodeSelectorSlot());  chain.addLast(new ClusterBuilderSlot());  chain.addLast(new LogSlot());  chain.addLast(new StatisticSlot());  chain.addLast(new SystemSlot());  chain.addLast(new AuthoritySlot());  chain.addLast(new FlowSlot());  chain.addLast(new DegradeSlot());

小结:从创建插槽链流程可以看出,使用默认插槽构造器DefaultProcessorSlotChain创建8个插槽形成链表结构;分别为:NodeSelectorSlot、ClusterBuilderSlot、LogSlot、StatisticSlot、SystemSlot、AuthoritySlot、FlowSlot、DegradeSlot。插槽构造器也可以自定义。

四、链条执行与规则判断

CtSph#entryWithPriority

private Entry entryWithPriority(     Entry e = new CtEntry(resourceWrapper, chain, context);      try {          //触发插槽链执行及规则校验          chain.entry();      } catch (BlockException e1) {          //触发流控向上抛出BlockException          e.exit(count, args);          throw e1;      } catch (Throwable e1) {      }      //返回通行证Entry      return e;  }

小结:通过chain.entry()触发插槽链条执行,默认会经过上面8个插槽。每个插槽履行自己职责,判断是否符合流控规则,符合规则放行返回通行证Entry;不符合触发流控向上抛出BlockException。具体各个插槽的职责后续文章再做分析。