你還在擔心rpc介面超時嗎
- 2020 年 7 月 4 日
- 筆記
在使用dubbo時,通常會遇到timeout這個屬性,timeout屬性的作用是:給某個服務調用設置超時時間,如果服務在設置的時間內未返回結果,則會拋出調用超時異常:TimeoutException,在使用的過程中,我們有時會對provider和consumer兩個配置都會設置timeout值,那麼服務調用過程中會以哪個為準?橘子同學今天主要針對這個問題進行分析和擴展。
三種設置方式
以provider配置為例:
#### 方法級別
<dubbo:service interface="orangecsong.test.service.TestService" ref="testServiceImpl">
<dubbo:method name="test" timeout="10000"/>
</dubbo:service>
#### 介面級別
<dubbo:service interface="orangecsong.test.service.TestService" ref="testServiceImpl" timeout="10000"/>
#### 全局級別
<dubbo:service ="10000"/>
優先順序選擇
在dubbo中如果provider和consumer都配置了相同的一個屬性,比如本文分析的timeout,其實它們是有優先順序的,consumer方法配置 > provider方法配置 > consumer介面配置 > provider介面配置 > consumer全局配置 > provider全局配置。所以對於小橘開始的提出的問題就有了結果,會以消費者配置的為準,接下結合源碼來進行解析,其實源碼很簡單,在RegistryDirectory類中將服務列錶轉換為DubboInvlker方法中進行了處理:
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
" in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
// 重點就是下面這個方法
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
重點就是上面mergeUrl方法,將provider和comsumer的url參數進行了整合,在mergeUrl方法有會調用ClusterUtils.mergeUrl方法進行整合,因為這個方法比較簡單,就是對一些參數進行了整合了,會用consumer參數進行覆蓋,這裡就不分析了,如果感興趣的同學可以去研究一下。
超時處理
在配置設置了超時timeout,那麼程式碼中是如何處理的,這裡咱們在進行一下擴展,分析一下dubbo中是如何處理超時的,在調用服務方法,最後都會調用DubboInvoker.doInvoke方法,咱們就從這個方法開始分析:
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
// For compatibility
FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);
Result result;
// 非同步處理
if (isAsyncFuture) {
// register resultCallback, sometimes we need the async result being processed by the filter chain.
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {
result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return result;
} else {
// 同步處理
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
在這個方法中,就以同步模式進行分析,看request方法,request()方法會返回一個DefaultFuture類,在去調用DefaultFuture.get()方法,這裡其實涉及到一個在非同步中實現同步的技巧,咱們這裡不做分析,所以重點就在get()方法里:
@Override
public Object get() throws RemotingException {
return get(timeout);
}
@Override
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
在調用get()方法時,會去調用get(timeout)這個方法,在這個方法中會傳一個timeout欄位,在和timeout就是咱們配置的那個參數,在這個方法中咱們要關注下面一個程式碼塊:
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
// 執行緒阻塞
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
// 在超時時間裡,還沒有結果,則拋出超時異常
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
重點看await()方法,會進行阻塞timeout時間,如果阻塞時間到了,則會喚醒往下執行,超時跳出while循環中,判斷是否有結果返回,如果沒有(這個地方要注意:只有有結果返回,或超時才跳出循環中),則拋出超時異常。講到這裡,超時原理基本上其實差不多了,DefaultFuture這個類還有個地方需要注意,在初始化DefaultFuture對象時,會去創建一個超時的延遲任務,延遲時間就是timeout值,在這個延遲任務中也會調用signal()方法喚醒阻塞。
分批調用
不過在調用rpc遠程介面,如果對方的介面不能一次承載返回請求結果能力,我們一般做法是分批調用,將調用一次分成調用多次,然後對每次結果進行匯聚,當然也可以做用利用多執行緒的能力去執行。後面文章小橘將會介紹這種模式,敬請關注哦!
/**
* Description:通用 分批調用工具類
* 場景:
* <pre>
* 比如List參數的size可能為 幾十個甚至上百個
* 如果invoke介面比較慢,傳入50個以上會超時,那麼可以每次傳入20個,分批執行。
* </pre>
* Author: OrangeCsong
*/
public class ParallelInvokeUtil {
private ParallelInvokeUtil() {}
/**
* @param sourceList 源數據
* @param size 分批大小
* @param buildParam 構建函數
* @param processFunction 處理函數
* @param <R> 返回值
* @param <T> 入參\
* @param <P> 構建參數
* @return
*/
public static <R, T, P> List<R> partitionInvokeWithRes(List<T> sourceList, Integer size,
Function<List<T>, P> buildParam,
Function<P, List<R>> processFunction) {
if (CollectionUtils.isEmpty(sourceList)) {
return new ArrayList<>(0);
}
Preconditions.checkArgument(size > 0, "size大小必須大於0");
return Lists.partition(sourceList, size).stream()
.map(buildParam)
.map(processFunction)
.filter(Objects::nonNull)
.reduce(new ArrayList<>(),
(resultList1, resultList2) -> {
resultList1.addAll(resultList2);
return resultList1;
});
}
}
本文由部落格群發一文多發等運營工具平台 OpenWrite 發布