聊聊rocketmq的RemotingSendRequestException

  • 2019 年 12 月 17 日
  • 笔记

本文主要研究一下rocketmq的RemotingSendRequestException

RemotingSendRequestException

rocketmq-remoting-4.6.0-sources.jar!/org/apache/rocketmq/remoting/exception/RemotingSendRequestException.java

public class RemotingSendRequestException extends RemotingException {      private static final long serialVersionUID = 5391285827332471674L;  ​      public RemotingSendRequestException(String addr) {          this(addr, null);      }  ​      public RemotingSendRequestException(String addr, Throwable cause) {          super("send request to <" + addr + "> failed", cause);      }  }
  • RemotingSendRequestException继承了RemotingException,它的构造器要求addr参数

invokeSyncImpl

rocketmq-remoting-4.6.0-sources.jar!/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java

public abstract class NettyRemotingAbstract {      //......  ​      public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,          final long timeoutMillis)          throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {          final int opaque = request.getOpaque();  ​          try {              final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);              this.responseTable.put(opaque, responseFuture);              final SocketAddress addr = channel.remoteAddress();              channel.writeAndFlush(request).addListener(new ChannelFutureListener() {                  @Override                  public void operationComplete(ChannelFuture f) throws Exception {                      if (f.isSuccess()) {                          responseFuture.setSendRequestOK(true);                          return;                      } else {                          responseFuture.setSendRequestOK(false);                      }  ​                      responseTable.remove(opaque);                      responseFuture.setCause(f.cause());                      responseFuture.putResponse(null);                      log.warn("send a request command to channel <" + addr + "> failed.");                  }              });  ​              RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);              if (null == responseCommand) {                  if (responseFuture.isSendRequestOK()) {                      throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,                          responseFuture.getCause());                  } else {                      throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());                  }              }  ​              return responseCommand;          } finally {              this.responseTable.remove(opaque);          }      }  ​      //......  }
  • invokeSyncImpl方法在responseCommand为null且responseFuture.isSendRequestOK()为false的时候抛出RemotingSendRequestException

invokeAsyncImpl

rocketmq-remoting-4.6.0-sources.jar!/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java

public abstract class NettyRemotingAbstract {      //......  ​      public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,          final InvokeCallback invokeCallback)          throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {          long beginStartTime = System.currentTimeMillis();          final int opaque = request.getOpaque();          boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);          if (acquired) {              final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);              long costTime = System.currentTimeMillis() - beginStartTime;              if (timeoutMillis < costTime) {                  once.release();                  throw new RemotingTimeoutException("invokeAsyncImpl call timeout");              }  ​              final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);              this.responseTable.put(opaque, responseFuture);              try {                  channel.writeAndFlush(request).addListener(new ChannelFutureListener() {                      @Override                      public void operationComplete(ChannelFuture f) throws Exception {                          if (f.isSuccess()) {                              responseFuture.setSendRequestOK(true);                              return;                          }                          requestFail(opaque);                          log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));                      }                  });              } catch (Exception e) {                  responseFuture.release();                  log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);                  throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);              }          } else {              if (timeoutMillis <= 0) {                  throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");              } else {                  String info =                      String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",                          timeoutMillis,                          this.semaphoreAsync.getQueueLength(),                          this.semaphoreAsync.availablePermits()                      );                  log.warn(info);                  throw new RemotingTimeoutException(info);              }          }      }  ​      //......  }
  • invokeAsyncImpl在acquired为true,channel.writeAndFlush(request)抛出异常时,会将异常转为RemotingSendRequestException

invokeOnewayImpl

rocketmq-remoting-4.6.0-sources.jar!/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java

public abstract class NettyRemotingAbstract {      //......  ​      public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)          throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {          request.markOnewayRPC();          boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);          if (acquired) {              final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);              try {                  channel.writeAndFlush(request).addListener(new ChannelFutureListener() {                      @Override                      public void operationComplete(ChannelFuture f) throws Exception {                          once.release();                          if (!f.isSuccess()) {                              log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");                          }                      }                  });              } catch (Exception e) {                  once.release();                  log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");                  throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);              }          } else {              if (timeoutMillis <= 0) {                  throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");              } else {                  String info = String.format(                      "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",                      timeoutMillis,                      this.semaphoreOneway.getQueueLength(),                      this.semaphoreOneway.availablePermits()                  );                  log.warn(info);                  throw new RemotingTimeoutException(info);              }          }      }  ​      //......  }
  • invokeOnewayImpl在acquired为true,channel.writeAndFlush(request)抛出异常时,会将异常转为RemotingSendRequestException

小结

RemotingSendRequestException继承了RemotingException,它的构造器要求addr参数;invokeSyncImpl方法在responseCommand为null且responseFuture.isSendRequestOK()为false的时候抛出RemotingSendRequestException;invokeAsyncImpl在acquired为true,channel.writeAndFlush(request)抛出异常时,会将异常转为RemotingSendRequestException;invokeOnewayImpl在acquired为true,channel.writeAndFlush(request)抛出异常时,会将异常转为RemotingSendRequestException

doc