【Soul网关探秘】http数据同步-Web端处理变更通知

个人知识库

引言

上一篇,梳理http 数据同步策略的变更通知机制,本篇开始探究配置变更通知到达后, soul-web 端的处理响应。

不同数据变更的通知机制应当是一致的,故本篇以 selector 配置变更通知为切入点进行深入。

通知处理入口

上回我们说到 HttpSyncDataService 的 doLongPolling,在其内部发起通知订阅并接收响应通知:

private void doLongPolling(final String server) {
    ...
    String listenerUrl = server + "/configs/listener";
    ...
    try {
      	// 发起监听请求
        String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
        log.debug("listener result: [{}]", json);
        groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
    } catch (RestClientException e) {
        ...
    }
  	// 处理变更通知
    if (groupJson != null) {
        // fetch group configuration async.
        ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
        if (ArrayUtils.isNotEmpty(changedGroups)) {
            log.info("Group config changed: {}", Arrays.toString(changedGroups));
            // 获取组配置
          	this.doFetchGroupConfig(server, changedGroups);
        }
    }
}

在收到变更通知时,若存在配置组变更,则按变更组获取相应配置。

获取配置

获取组配置处理如下:

private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
    ...
    String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
    ...
    try {
        json = this.httpClient.getForObject(url, String.class);
    } catch (RestClientException e) {
        ...
    }
    // update local cache
    boolean updated = this.updateCacheWithJson(json);
    ...
}

内部发起配置获取请求并更新本地缓存。

更新配置组缓存

由 HttpSyncDataService 实现本地缓存更新:

  private boolean updateCacheWithJson(final String json) {
    JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
    JsonObject data = jsonObject.getAsJsonObject("data");
    // if the config cache will be updated?
    return factory.executor(data);
}

转成 Json 对象后交由 DataRefreshFactory 进行处理。

DataRefreshFactory 处理如下:

public boolean executor(final JsonObject data) {
    final boolean[] success = {false};
    ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));
    return success[0];
}

调用相应数据刷新类刷新数据。

统一由 AbstractDataRefresh 的 refresh 进行处理:

public Boolean refresh(final JsonObject data) {
    boolean updated = false;
    JsonObject jsonObject = convert(data);
    if (null != jsonObject) {
        ConfigData<T> result = fromJson(jsonObject);
        if (this.updateCacheIfNeed(result)) {
            updated = true;
            refresh(result.getData());
        }
    }
    return updated;
}

先更新本地缓存,再调用子类实现的 refresh。

此处的更新本地缓存处理,由子类 SelectorDataRefresh 的 updateCacheIfNeed 实现:

protected boolean updateCacheIfNeed(final ConfigData<SelectorData> result) {
    return updateCacheIfNeed(result, ConfigGroupEnum.SELECTOR);
}

向父类 AbstractDataRefresh 的 updateCacheIfNeed 指定更新 selector 配置组。

父类 AbstractDataRefresh 的 updateCacheIfNeed 处理:

protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
    // 首次初始化缓存
    if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {
        return true;
    }
    ResultHolder holder = new ResultHolder(false);
    GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {
        // 必须比较最后更新时间
        if (!StringUtils.equals(oldVal.getMd5(), newVal.getMd5()) && oldVal.getLastModifyTime() < newVal.getLastModifyTime()) {
            ...
            holder.result = true;
            return newVal;
        }
        ...
        return oldVal;
    });
    return holder.result;
}

通过比较新老缓存的 MD5 值来判定是否发生变更,存在变更则更新本地缓存(注意还有最后更新时间判定)。

处理刷新事件

SelectorDataRefresh 的 refresh 实现:

protected void refresh(final List<SelectorData> data) {
    if (CollectionUtils.isEmpty(data)) {
        log.info("clear all selector cache, old cache");
        data.forEach(pluginDataSubscriber::unSelectorSubscribe);
        pluginDataSubscriber.refreshSelectorDataAll();
    } else {
        // update cache for UpstreamCacheManager
        pluginDataSubscriber.refreshSelectorDataAll();
        data.forEach(pluginDataSubscriber::onSelectorSubscribe);
    }
}
  • 若最新数据为空,则循环取消订阅并刷新所有选择器数据,实际是清空选择器缓存。
  • 若最新数据不为空,则刷新所有选择器数据并循环响应选择器订阅事件处理,实际是更新上游服务缓存。

取消订阅

CommonPluginDataSubscriber 实现订阅取消:

public void unSelectorSubscribe(final SelectorData selectorData) {
    subscribeDataHandler(selectorData, DataEventTypeEnum.DELETE);
}

subscribeDataHandler 对 selectorData 的 delete 处理:

private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
    Optional.ofNullable(classData).ifPresent(data -> {
        if (data instanceof PluginData) {
            ...
        } else if (data instanceof SelectorData) {
            SelectorData selectorData = (SelectorData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                ...
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeSelectData(selectorData);
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
            }
        } else if (data instanceof RuleData) {
            ...
        }
    });
}

从 BaseDataCache 删除目标选择器数据,并移除选择器。

此处由 DividePluginDataHandler 提供 removeSelector 实现:

public void removeSelector(final SelectorData selectorData) {
    UpstreamCacheManager.getInstance().removeByKey(selectorData.getId());
}

根据 selector id 移除缓存的上游服务,注意只是从 UPSTREAM_MAP_TEMP 移除

public void removeByKey(final String key) {
    UPSTREAM_MAP_TEMP.remove(key);
}

刷新数据

CommonPluginDataSubscriber 实现数据刷新:

public void refreshSelectorDataAll() {
    BaseDataCache.getInstance().cleanSelectorData();
}

注意这里的 refresh all 实际是做的 clean 操作。

BaseDataCache 的 cleanSelectorData 处理:

public void cleanSelectorData() {
    SELECTOR_MAP.clear();
}

直接清除 SELECTOR_MAP 所有数据。

响应订阅

CommonPluginDataSubscriber 实现订阅响应:

public void onSelectorSubscribe(final SelectorData selectorData) {
    subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
}

subscribeDataHandler 对 selectorData 的 update 处理:

private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
    Optional.ofNullable(classData).ifPresent(data -> {
        if (data instanceof PluginData) {
            ...
        } else if (data instanceof SelectorData) {
            SelectorData selectorData = (SelectorData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cacheSelectData(selectorData);
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                ...
            }
        } else if (data instanceof RuleData) {
            ...
        }
    });
}

缓存选择器数据到 BaseDataCache,并处理选择器。

此处由 DividePluginDataHandler 提供 handlerSelector 实现:

public void handlerSelector(final SelectorData selectorData) {
    UpstreamCacheManager.getInstance().submit(selectorData);
}

提交选择器数据到 UpstreamCacheManager。

UpstreamCacheManager 的 submit 处理:

public void submit(final SelectorData selectorData) {
    final List<DivideUpstream> upstreamList = GsonUtils.getInstance().fromList(selectorData.getHandle(), DivideUpstream.class);
    if (null != upstreamList && upstreamList.size() > 0) {
        UPSTREAM_MAP.put(selectorData.getId(), upstreamList);
        UPSTREAM_MAP_TEMP.put(selectorData.getId(), upstreamList);
    } else {
        UPSTREAM_MAP.remove(selectorData.getId());
        UPSTREAM_MAP_TEMP.remove(selectorData.getId());
    }
}

根据 selector id 更新 UPSTREAM_MAP 和 UPSTREAM_MAP_TEMP。

总结

本篇梳理和分析了配置变更通知到达后 soul-web 端的处理流程,最终处理主要是更新本地配置缓存以及维护上游服务散列表。

soul-web收到变更通知后处理流程如下:

soul-web 端收到响应

  • 若配置组数据存在变更,则发起获取配置请求获取最新配置信息
    • 更新配置组缓存
    • 循环处理配置数据刷新事件
      • 若最新配置数据为空,则删除本地配置数据并移除上游服务
      • 若最新配置数据不为空,则缓存配置组数据并更新上游服务
  • 若配置组数据无变更,不作处理