2. Sentinel源碼分析—Sentinel是如何進行流量統計的?
- 2019 年 10 月 3 日
- 筆記
這一篇我還是繼續上一篇沒有講完的內容,先上一個例子:
private static final int threadCount = 100; public static void main(String[] args) { initFlowRule(); for (int i = 0; i < threadCount; i++) { Thread entryThread = new Thread(new Runnable() { @Override public void run() { while (true) { Entry methodA = null; try { TimeUnit.MILLISECONDS.sleep(5); methodA = SphU.entry("methodA"); } catch (BlockException e1) { // Block exception } catch (Exception e2) { // biz exception } finally { if (methodA != null) { methodA.exit(); } } } } }); entryThread.setName("working thread"); entryThread.start(); } } private static void initFlowRule() { List<FlowRule> rules = new ArrayList<FlowRule>(); FlowRule rule1 = new FlowRule(); rule1.setResource("methodA"); // set limit concurrent thread for 'methodA' to 20 rule1.setCount(20); rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD); rule1.setLimitApp("default"); rules.add(rule1); FlowRuleManager.loadRules(rules); }
SphU#entry
我先把例子放上來
Entry methodA = null; try { methodA = SphU.entry("methodA"); // dosomething } catch (BlockException e1) { block.incrementAndGet(); } catch (Exception e2) { // biz exception } finally { total.incrementAndGet(); if (methodA != null) { methodA.exit(); } }
我們先進入到entry方法裡面:
SphU#entry
public static Entry entry(String name) throws BlockException { return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0); }
這個方法裡面會調用Env的sph靜態方法,我們進入到Env裡面看看
public class Env { public static final Sph sph = new CtSph(); static { // If init fails, the process will exit. InitExecutor.doInit(); } }
這個方法初始化的時候會調用InitExecutor.doInit()
InitExecutor#doInit
public static void doInit() { //InitExecutor只會初始化一次,並且初始化失敗會退出 if (!initialized.compareAndSet(false, true)) { return; } try { //通過spi載入InitFunc子類,默認是MetricCallbackInit ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class); List<OrderWrapper> initList = new ArrayList<OrderWrapper>(); for (InitFunc initFunc : loader) { RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName()); //由於這裡只有一個loader裡面只有一個子類,那麼直接就返回initList裡面包含一個元素的集合 insertSorted(initList, initFunc); } for (OrderWrapper w : initList) { //這裡調用MetricCallbackInit的init方法 w.func.init(); RecordLog.info(String.format("[InitExecutor] Executing %s with order %d", w.func.getClass().getCanonicalName(), w.order)); } } catch (Exception ex) { RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex); ex.printStackTrace(); } catch (Error error) { RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error); error.printStackTrace(); } }
這個方法主要是通過spi載入InitFunc 的子類,默認是MetricCallbackInit。
然後會將MetricCallbackInit封裝成OrderWrapper實例,然後遍歷,調用
MetricCallbackInit的init方法:
MetricCallbackInit#init
public void init() throws Exception { //添加回調函數 //key是com.alibaba.csp.sentinel.metric.extension.callback.MetricEntryCallback StatisticSlotCallbackRegistry.addEntryCallback(MetricEntryCallback.class.getCanonicalName(), new MetricEntryCallback()); //key是com.alibaba.csp.sentinel.metric.extension.callback.MetricExitCallback StatisticSlotCallbackRegistry.addExitCallback(MetricExitCallback.class.getCanonicalName(), new MetricExitCallback()); }
這個init方法就是註冊了兩個回調實例MetricEntryCallback和MetricExitCallback。
然後會通過調用Env.sph.entry
會最後調用到CtSph的entry方法:
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException { //這裡name是Resource,type是out StringResourceWrapper resource = new StringResourceWrapper(name, type); //count是1 ,args是一個空數組 return entry(resource, count, args); }
這個方法會將resource和type封裝成StringResourceWrapper實例,然後調用entry重載方法追蹤到CtSph的entryWithPriority方法。
//這裡傳入得參數count是1,prioritized=false,args是容量為1的空數組 private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { //獲取當前執行緒的上下文 Context context = ContextUtil.getContext(); if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done. return new CtEntry(resourceWrapper, null, context); } //為空的話,創建一個默認的context if (context == null) { //1 // Using default context. context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); } // Global switch is close, no rule checking will do. if (!Constants.ON) {//這裡會返回false return new CtEntry(resourceWrapper, null, context); } //2 //創建一系列功能插槽 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); /* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */ //如果超過了插槽的最大數量,那麼會返回null if (chain == null) { return new CtEntry(resourceWrapper, null, context); } Entry e = new CtEntry(resourceWrapper, chain, context); try { //3 //調用責任鏈 chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e; }
這個方法是最核心的方法,主要做了三件事:
- 如果context為null則創建一個新的
- 通過責任鏈方式創建功能插槽
- 調用責任鏈插槽
在講創建context之前我們先看一下ContextUtil這個類初始化的時候會做什麼
ContextUtil
/** * Holds all {@link EntranceNode}. Each {@link EntranceNode} is associated with a distinct context name. */ private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>(); static { // Cache the entrance node for default context. initDefaultContext(); } private static void initDefaultContext() { String defaultContextName = Constants.CONTEXT_DEFAULT_NAME; //初始化一個sentinel_default_context,type為in的隊形 EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null); //Constants.ROOT會初始化一個name是machine-root,type=IN的對象 Constants.ROOT.addChild(node); //所以現在map裡面有一個key=CONTEXT_DEFAULT_NAME的對象 contextNameNodeMap.put(defaultContextName, node); }
ContextUtil在初始化的時候會先調用initDefaultContext方法。通過Constants.ROOT
創建一個root節點,然後將創建的node作為root的子節點入隊,然後將node節點put到contextNameNodeMap中
結構如下:
Constants.ROOT: machine-root(EntryType#IN) / / sentinel_default_context(EntryType#IN)
現在我們再回到entryWithPriority方法中:
if (context == null) {//1 // Using default context. context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); }
如果context為空,那麼會調用MyContextUtil.myEnter
創建一個新的context,這個方法最後會調用到ContextUtil.trueEnter
方法中進行創建。
protected static Context trueEnter(String name, String origin) { Context context = contextHolder.get(); if (context == null) { Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap; DefaultNode node = localCacheNameMap.get(name); if (node == null) { //如果為null的話,檢查contextNameNodeMap的size是不是超過2000 if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { // 重複initDefaultContext方法的內容 try { LOCK.lock(); node = contextNameNodeMap.get(name); if (node == null) { if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null); // Add entrance node. Constants.ROOT.addChild(node); Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1); newMap.putAll(contextNameNodeMap); newMap.put(name, node); contextNameNodeMap = newMap; } } } finally { LOCK.unlock(); } } } context = new Context(node, name); context.setOrigin(origin); contextHolder.set(context); } return context; }
在trueEnter方法中會做一個校驗,如果contextNameNodeMap中的數量已經超過了2000,那麼會返回一個NULL_CONTEXT。由於我們在initDefaultContext中已經初始化過了node節點,所以這個時候直接根據name獲取node節點放入到contextHolder中。
創建完了context之後我們再回到entryWithPriority方法中繼續往下走:
//創建一系列功能插槽 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
通過調用lookProcessChain方法會創建功能插槽
CtSph#lookProcessChain
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { //根據resourceWrapper初始化插槽 ProcessorSlotChain chain = chainMap.get(resourceWrapper); if (chain == null) { synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) { // Entry size limit.最大插槽數量為6000 if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } //初始化新的插槽 chain = SlotChainProvider.newSlotChain(); Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }
這裡會調用SlotChainProvider.newSlotChain
進行插槽的初始化。
SlotChainProvider#newSlotChain
public static ProcessorSlotChain newSlotChain() { if (slotChainBuilder != null) { return slotChainBuilder.build(); } //根據spi初始化slotChainBuilder,默認是DefaultSlotChainBuilder resolveSlotChainBuilder(); if (slotChainBuilder == null) { RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default"); slotChainBuilder = new DefaultSlotChainBuilder(); } return slotChainBuilder.build(); }
默認調用DefaultSlotChainBuilder的build方法進行初始化
DefaultSlotChainBuilder#build
public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); //創建Node節點 chain.addLast(new NodeSelectorSlot()); //用於構建資源的 ClusterNode chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); //用於統計實時的調用數據 chain.addLast(new StatisticSlot()); //用於對入口的資源進行調配 chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); //用於限流 chain.addLast(new FlowSlot()); //用於降級 chain.addLast(new DegradeSlot()); return chain; }
DefaultProcessorSlotChain裡面會創建一個頭節點,然後把其他節點通過addLast串成一個鏈表:
最後我們再回到CtSph的entryWithPriority方法中,往下走調用chain.entry
方法觸發調用鏈。
Context
在往下看Slot插槽之前,我們先總結一下Context是怎樣的一個結構:
在Sentinel中,所有的統計操作都是基於context來進行的。context會通過ContextUtil的trueEnter方法進行創建,會根據context的不同的name來組裝不同的Node來實現數據的統計。
在經過NodeSelectorSlot的時候會根據傳入的不同的context的name欄位來獲取不同的DefaultNode對象,然後設置到context的curEntry實例的curNode屬性中。
NodeSelectorSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { DefaultNode node = map.get(context.getName()); if (node == null) { synchronized (this) { node = map.get(context.getName()); if (node == null) { node = new DefaultNode(resourceWrapper, null); HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size()); cacheMap.putAll(map); cacheMap.put(context.getName(), node); map = cacheMap; // Build invocation tree ((DefaultNode) context.getLastNode()).addChild(node); } } } //設置到context的curEntry實例的curNode屬性中 context.setCurNode(node); fireEntry(context, resourceWrapper, node, count, prioritized, args); }
然後再經過ClusterBuilderSlot槽位在初始化的時候會初始化一個靜態的全局clusterNodeMap用來記錄所有的ClusterNode,維度是ResourceWrapper。每次調用entry方法的時候會先去全局的clusterNodeMap,找不到就會創建一個新的clusterNode,放入到node的ClusterNode屬性中,用來統計ResourceWrapper維度下面的所有數據。
//此變數是靜態的,所以只會初始化一次,存有所有的ResourceWrapper維度下的數據 private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>(); public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { if (clusterNode == null) { synchronized (lock) { if (clusterNode == null) { // Create the cluster node. clusterNode = new ClusterNode(); HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16)); newMap.putAll(clusterNodeMap); newMap.put(node.getId(), clusterNode); clusterNodeMap = newMap; } } } node.setClusterNode(clusterNode); if (!"".equals(context.getOrigin())) { Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin()); context.getCurEntry().setOriginNode(originNode); } fireEntry(context, resourceWrapper, node, count, prioritized, args); }
StatisticSlot
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { //先直接往下調用,如果沒有報錯則進行統計 // Do some checking. fireEntry(context, resourceWrapper, node, count, prioritized, args); //當前執行緒數加1 // Request passed, add thread count and pass count. node.increaseThreadNum(); //通過的請求加上count node.addPassRequest(count); ... } catch (PriorityWaitException ex) { node.increaseThreadNum(); ... } catch (BlockException e) { //設置錯誤資訊 // Blocked, set block exception to current entry. context.getCurEntry().setError(e); ... //設置被阻塞的次數 // Add block count. node.increaseBlockQps(count); ... throw e; } catch (Throwable e) { // Unexpected error, set error to current entry. context.getCurEntry().setError(e); //設置異常的次數 // This should not happen. node.increaseExceptionQps(count); ... throw e; } }
這段程式碼中,我把不相關的程式碼都省略了,不影響我們的主流程。
在entry方法裡面,首先是往下繼續調用,根據其他的節點的情況來進行統計,比如拋出異常,那麼就統計ExceptionQps,被阻塞那麼就統計BlockQps,直接通過,那麼就統計PassRequest。
我們先看一下執行緒數是如何統計的:node.increaseThreadNum()
DefaultNode#increaseThreadNum
我們先看一下DefaultNode的繼承關係:
public void increaseThreadNum() { super.increaseThreadNum(); this.clusterNode.increaseThreadNum(); }
所以super.increaseThreadNum
是調用到了父類的increaseThreadNum方法。
this.clusterNode.increaseThreadNum()
這句程式碼和super.increaseThreadNum
是一樣的使用方式,所以看看StatisticNode的increaseThreadNum方法就好了
StatisticNode#increaseThreadNum
private LongAdder curThreadNum = new LongAdder(); public void decreaseThreadNum() { curThreadNum.increment(); }
這個方法很簡單,每次都直接使用LongAdder的api加1就好了,最後會在退出的時候減1,使用LongAdder也保證了原子性。
如果請求通過的時候會繼續往下調用node.addPassRequest
:
DefaultNode#addPassRequest
public void addPassRequest(int count) { super.addPassRequest(count); this.clusterNode.addPassRequest(count); }
這句程式碼也是調用了StatisticNode的addPassRequest方法進行統計的。
StatisticNode#addPassRequest
public void addPassRequest(int count) { rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count); }
這段程式碼裡面有兩個調用,一個是按分鐘統計的,一個是按秒統計的。因為我們這裡是使用的FlowRuleManager所以是會記錄按分鐘統計的。具體是怎麼初始化,以及怎麼列印統計日誌的可以看看我上一篇分析:1.Sentinel源碼分析—FlowRuleManager載入規則做了什麼?,我這裡不再贅述。
所以我們直接看看rollingCounterInMinute.addPass(count)
這句程式碼就好了,這句程式碼會直接調用ArrayMetric的addPass方法。
ArrayMetric#addPass
public void addPass(int count) { //獲取當前的時間窗口 WindowWrap<MetricBucket> wrap = data.currentWindow(); //窗口內的pass加1 wrap.value().addPass(count); }
這裡會首先調用currentWindow獲取當前的時間窗口WindowWrap,然後調用調用窗口內的MetricBucket的addPass方法加1,我繼續拿我上一篇文章的圖過來說明:
我面來到MetricBucket的addPass方法:
MetricBucket#addPass
public void addPass(int n) { add(MetricEvent.PASS, n); } public MetricBucket add(MetricEvent event, long n) { counters[event.ordinal()].add(n); return this; }
addPass方法會使用枚舉類然後將counters數組內的pass槽位的值加n;counters數組是LongAdder數組,所以也不會有執行緒安全問題。
node.increaseBlockQps和node.increaseExceptionQps程式碼也是一樣的,大家可以自行去看看。
FlowSlot
FlowSlot可以根據預先設置的規則來判斷一個請求是否應該被通過。
FlowSlot
private final FlowRuleChecker checker; public FlowSlot() { this(new FlowRuleChecker()); } public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { checkFlow(resourceWrapper, context, node, count, prioritized); fireEntry(context, resourceWrapper, node, count, prioritized, args); } void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { checker.checkFlow(ruleProvider, resource, context, node, count, prioritized); }
FlowSlot在實例化的時候會設置一個規則檢查器,然後在調用entry方法的時候會調用規則檢查器的checkFlow方法
我們進入到FlowRuleChecker的checkFlow 方法中:
FlowRuleChecker#checkFlow
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null) { return; } //返回FlowRuleManager裡面註冊的所有規則 Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null) { for (FlowRule rule : rules) { //如果當前的請求不能通過,那麼就拋出FlowException異常 if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } } private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() { @Override public Collection<FlowRule> apply(String resource) { // Flow rule map should not be null. Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap(); return flowRules.get(resource); } };
checkFlow這個方法就是過去所有的規則然後根據規則進行過濾。主要的過濾操作是在canPassCheck中進行的。
FlowRuleChecker#canPassCheck
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { //如果沒有設置limitapp,那麼不進行校驗,默認會給個defualt String limitApp = rule.getLimitApp(); if (limitApp == null) { return true; } //集群模式 if (rule.isClusterMode()) { return passClusterCheck(rule, context, node, acquireCount, prioritized); } //本地模式 return passLocalCheck(rule, context, node, acquireCount, prioritized); }
這個方法首先會校驗limitApp,然後判斷是集群模式還是本地模式,我們這裡暫時分析本地模式。
FlowRuleChecker#passLocalCheck
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { //節點選擇 Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); if (selectedNode == null) { return true; } //根據設置的規則來攔截 return rule.getRater().canPass(selectedNode, acquireCount, prioritized); }
本地模式中,首先會調用selectNodeByRequesterAndStrategy進行節點選擇,根據不同的模式選擇不同的節點,然後調用規則控制器的canPass方法進行攔截。
FlowRuleChecker#selectNodeByRequesterAndStrategy
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) { // The limit app should not be empty. String limitApp = rule.getLimitApp(); //關係限流策略 int strategy = rule.getStrategy(); String origin = context.getOrigin(); //origin不為`default` or `other`,並且limitApp和origin相等 if (limitApp.equals(origin) && filterOrigin(origin)) {//1 if (strategy == RuleConstant.STRATEGY_DIRECT) { // Matches limit origin, return origin statistic node. return context.getOriginNode(); } //關係限流策略為關聯或者鏈路的處理 return selectReferenceNode(rule, context, node); } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {//2 if (strategy == RuleConstant.STRATEGY_DIRECT) { //這裡返回ClusterNode,表示所有應用對該資源的所有請求情況 // Return the cluster node. return node.getClusterNode(); } //關係限流策略為關聯或者鏈路的處理 return selectReferenceNode(rule, context, node); } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {//3 if (strategy == RuleConstant.STRATEGY_DIRECT) { return context.getOriginNode(); } //關係限流策略為關聯或者鏈路的處理 return selectReferenceNode(rule, context, node); } return null; }
這個方法主要是用來根據控制根據不同的規則,獲取不同的node進行數據的統計。
- 在標記1中表示,如果流控規則配置了來源應用且不是"default"或者"other"這種特殊值,那麼這種時候該規則就只對配置的來源應用生效。
- 在標記2中表示,limitApp是"default",代表針對所有應用進行統計。
- 標記7中,這個是"other"值的處理,假設當前請求來源不在當前規則的limitApp中,則進行下面的處理。
我這裡引用官方文檔的一段話進行解釋:
default:表示不區分調用者,來自任何調用者的請求都將進行限流統計。如果這個資源名的調用總和超過了這條規則定義的閾值,則觸發限流。 {some_origin_name}:表示針對特定的調用者,只有來自這個調用者的請求才會進行流量控制。例如 NodeA 配置了一條針對調用者caller1的規則,那麼當且僅當來自 caller1 對 NodeA 的請求才會觸發流量控制。 other:表示針對除 {some_origin_name} 以外的其餘調用方的流量進行流量控制。例如,資源NodeA配置了一條針對調用者 caller1 的限流規則,同時又配置了一條調用者為 other 的規則,那麼任意來自非 caller1 對 NodeA 的調用,都不能超過 other 這條規則定義的閾值 同一個資源名可以配置多條規則,規則的生效順序為:{some_origin_name} > other > default
然後返回到passLocalCheck方法中,繼續往下走,調用rule.getRater()
,我們這裡沒有指定特殊的rater,所以返回的是DefaultController。
DefaultController#canPass
public boolean canPass(Node node, int acquireCount, boolean prioritized) { //判斷是限流還是限制並發數量,然後獲取流量或並發數量 int curCount = avgUsedTokens(node); //如果兩者相加大於限定的並發數 if (curCount + acquireCount > count) { ... return false; } return true; }
這裡首先調用avgUsedTokens,根據grade判斷當前的規則是QPS限流還是執行緒數限流,如果兩者之和大於count,那麼返回false。
返回false之後會回到FlowRuleChecker的checkFlow方法,拋出FlowException異常。
到這裡Sentinel的主流程就分析完畢了。