JDK HttpClient 單次請求的生命周期
HttpClient 單次請求的生命周期
1. 簡述
上篇我們通過流程圖和時序圖,簡要了解了HttpClient的請求處理流程,並重點認識了其對用戶請求的修飾和對一次用戶請求可能引發的多重請求——響應交換的處理。本篇,我們以最基礎的Http1.1為例,深入單次請求的處理過程,見證其完整的生命歷程。
本篇是HttpClient源碼分析的核心。我們將看到連接的管理和復用、channel的讀寫、響應式流的運用。
本文所述的HttpClient都指代JDK11開始內置的HttpClient及相關類,源碼分析基於JAVA 17。閱讀本文需要理解Reactive Streams規範及對應的JAVA Flow api的原理和使用。
2. uml圖
為了方便,我們再次回顧HttpClient發送請求的流程圖和時序圖:
以下是本篇分析的重點類:Http1Exchange的uml類圖:
3. Http連接的建立、復用和降級
在單次請求的過程中,首先進行的操作就是Http連接的建立。我們主要關注Http1連接。連接的過程可以簡要概括如下:
- 根據請求類型實例化不同的ExchangeImpl,負責具體的請求——響應過程
- 根據交換類型決定要實例化的HTTP連接的版本;根據請求類型從連接池中嘗試獲取對應路由的已有連接
- 連接池中獲取不到連接,實例化對應的Http連接(在最基本的Http1.1連接中,會開啟NIOSocket通道並包裹到管道中)
- 如果初始化的連接實例是Http2,而協商發現不支援,則降級為建立Http1連接
我們將看到,HttpClient在Http1、Http2兩個不同版本協議間切換自如。根據是否進行SSL加密,HttpClient會實例化HttpConnection的不同子類,而如果是Http2連接,那麼一個組合了該子類實例的Http2Connection實例將會負責對Http2連接的管理。否則,則是HttpConnection的子類自身管理連接。
3.1 調用流程及連接的建立和復用
接下來是具體的分析。我們回顧上篇分析的MultiExchange的responseAsyncImpl方法,該方法負責把用戶請求過濾後,委託給Exchange類處理,自己接受響應並處理多重請求。
//處理一次用戶請求帶來的一個或多個請求的過程,返回一個最終響應
private CompletableFuture<Response> responseAsyncImpl() {
CompletableFuture<Response> cf;
//省略………………
Exchange<T> exch = getExchange();
// 2. get response
// 由單個交換對象(Exhange)負責處理當前的單個請求,非同步返迴響應
//這是我們即將分析的方法
cf = exch.responseAsync()
.thenCompose((Response response) -> {
//省略……
}
return cf;
}
現在,我們將關注點轉向Exchange類對一次請求——響應的處理。我們關注Exchange::responseAsync方法:
/**
* One request/response exchange (handles 100/101 intermediate response also).
* depth field used to track number of times a new request is being sent
* for a given API request. If limit exceeded exception is thrown.
*
* Security check is performed here:
* - uses AccessControlContext captured at API level
* - checks for appropriate URLPermission for request
* - if permission allowed, grants equivalent SocketPermission to call
* - in case of direct HTTP proxy, checks additionally for access to proxy
* (CONNECT proxying uses its own Exchange, so check done there)
*
*/
final class Exchange<T> {
//此處是Exchange類的成員變數的展示
final HttpRequestImpl request;
final HttpClientImpl client;
//ExchangeImpl抽象成員,具體類型根據連接類型確定
volatile ExchangeImpl<T> exchImpl;
volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF;
volatile CompletableFuture<Void> bodyIgnored;
// used to record possible cancellation raised before the exchImpl
// has been established.
private volatile IOException failed;
@SuppressWarnings("removal")
final AccessControlContext acc;
final MultiExchange<T> multi;
final Executor parentExecutor;
volatile boolean upgrading; // to HTTP/2
volatile boolean upgraded; // to HTTP/2
final PushGroup<T> pushGroup;
final String dbgTag;
//…………省略大量程式碼
//上文中,MultiExchange調用的方法
// Completed HttpResponse will be null if response succeeded
// will be a non null responseAsync if expect continue returns an error
public CompletableFuture<Response> responseAsync() {
return responseAsyncImpl(null);
}
//上面方法調用的重載方法
CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {
SecurityException e = checkPermissions();
if (e != null) {
return MinimalFuture.failedFuture(e);
} else {
return responseAsyncImpl0(connection);
}
}
//實際處理的方法,我們需要重點關注。
CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
//此處聲明一個通過407錯誤校驗(代理伺服器認證失敗)後要執行的操作
Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check;
bodyIgnored = null;
if (request.expectContinue()) {
request.addSystemHeader("Expect", "100-Continue");
Log.logTrace("Sending Expect: 100-Continue");
// wait for 100-Continue before sending body
// 若我們構建請求設置了expectContinue(),那麼通過407校驗後,就會先發送一個等待100響應狀態碼的確認請求
after407Check = this::expectContinue;
} else {
// send request body and proceed. 絕大多數情況下,通過407校驗後,直接發送請求體
after407Check = this::sendRequestBody;
}
// The ProxyAuthorizationRequired can be triggered either by
// establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1 proxy
// or by sendHeaderAsync (case of HTTP/1.1 SSL tunneling through HTTP/1.1 proxy
// Therefore we handle it with a call to this checkFor407(...) after these
// two places.
Function<ExchangeImpl<T>, CompletableFuture<Response>> afterExch407Check =
(ex) -> ex.sendHeadersAsync()
.handle((r,t) -> this.checkFor407(r, t, after407Check))
.thenCompose(Function.identity());
return establishExchange(connection) //首先建立連接
//校驗是否發生407錯誤,否則執行上面的afterExch407Check,即發送請求頭,然後再次校驗407錯誤,之後執行after407check操作
.handle((r,t) -> this.checkFor407(r,t, afterExch407Check))
.thenCompose(Function.identity());
}
}
可以看到,實際處理請求的是Exchange::responseAsyncImpl0方法。此處發生的流程正如流程圖裡看到的那樣:
- 嘗試建立連接
- 校驗是否發生407錯誤
- 發送請求頭
- 再次校驗是否發生了407錯誤
- 發送100確認請求/發送請求體
我們首先關注連接的建立過程:Exchange.estableExchange(connection)
// get/set the exchange impl, solving race condition issues with
// potential concurrent calls to cancel() or cancel(IOException)
private CompletableFuture<? extends ExchangeImpl<T>>
establishExchange(HttpConnection connection) {
if (debug.on()) {
debug.log("establishing exchange for %s,%n\t proxy=%s",
request, request.proxy());
}
//檢查請求是否已取消
// check if we have been cancelled first.
Throwable t = getCancelCause();
checkCancelled();
if (t != null) {
if (debug.on()) {
debug.log("exchange was cancelled: returned failed cf (%s)", String.valueOf(t));
}
return exchangeCF = MinimalFuture.failedFuture(t);
}
CompletableFuture<? extends ExchangeImpl<T>> cf, res;
//注意,此處是關鍵,非同步返回了exhangeImpl抽象類,它有三個子類,根據請求類型來判斷
//我們將分析此方法,其中實現類連接的創建和復用
cf = ExchangeImpl.get(this, connection);
// We should probably use a VarHandle to get/set exchangeCF
// instead - as we need CAS semantics.
synchronized (this) { exchangeCF = cf; };
res = cf.whenComplete((r,x) -> {
synchronized(Exchange.this) {
if (exchangeCF == cf) exchangeCF = null;
}
});
checkCancelled();
return res.thenCompose((eimpl) -> {
// recheck for cancelled, in case of race conditions
exchImpl = eimpl;
IOException tt = getCancelCause();
checkCancelled();
if (tt != null) {
return MinimalFuture.failedFuture(tt);
} else {
// Now we're good to go. Because exchImpl is no longer
// null cancel() will be able to propagate directly to
// the impl after this point ( if needed ).
return MinimalFuture.completedFuture(eimpl);
} });
}
我們看到,ExchangeImpl的靜態方法get(Exchange, Connection)方法非同步返回了它的具體實現類(對象)。
我們跟隨進入get靜態方法,可以看到根據當前交換版本(HTTP版本)的不同,實例化不同的Http子類。如果我們在調用時,指定了Http客戶端或請求的版本號:
HttpClient client = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1) //指定客戶端為Http1.1版本
.build();
HttpRequest request = HttpRequest.newBuilder(URI.create(url))
.version(HttpClient.Version.HTTP_1_1) //或者指定請求的版本號為Http1.1
.GET().build();
那麼,下面的get方法中,將會實例化Http1交換:Http1Exchange,否則,默認嘗試建立的是Http2的交換:Stream。
/**
* Initiates a new exchange and assigns it to a connection if one exists
* already. connection usually null.
*/
static <U> CompletableFuture<? extends ExchangeImpl<U>>
get(Exchange<U> exchange, HttpConnection connection)
{
if (exchange.version() == HTTP_1_1) {
if (debug.on())
debug.log("get: HTTP/1.1: new Http1Exchange");
//創建Http1交換
return createHttp1Exchange(exchange, connection);
} else {
Http2ClientImpl c2 = exchange.client().client2(); // #### improve
HttpRequestImpl request = exchange.request();
//獲取Http2連接
CompletableFuture<Http2Connection> c2f = c2.getConnectionFor(request, exchange);
if (debug.on())
debug.log("get: Trying to get HTTP/2 connection");
// local variable required here; see JDK-8223553
//創建Http2交換
CompletableFuture<CompletableFuture<? extends ExchangeImpl<U>>> fxi =
c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection));
return fxi.thenCompose(x->x);
}
}
我們假定調用時指定了Http1.1版本號,繼續關注Exchange的創建和連接建立過程。createHttp1Exchange方法調用了Http1Exchange的構造函數,我們跟隨進入:
Http1Exchange(Exchange<T> exchange, HttpConnection connection)
throws IOException
{
super(exchange);
this.request = exchange.request();
this.client = exchange.client();
this.executor = exchange.executor();
this.operations = new LinkedList<>();
operations.add(headersSentCF);
operations.add(bodySentCF);
if (connection != null) {
this.connection = connection;
} else {
InetSocketAddress addr = request.getAddress();
//獲取連接
this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1);
}
this.requestAction = new Http1Request(request, this);
this.asyncReceiver = new Http1AsyncReceiver(executor, this);
}
我們看到,Http1Exchange中維持了抽象連接(connection)的引用,並在構造方法中獲取了具體的連接。根據連接類型的不同,HttpConnection總共有6個實現類,它們的區別是否使用了SSL或代理。值得注意的是,Http2Connection並不在此體系內,它內部組合了一個HttpConnection的抽象成員。這說明了,Http2Connection實際上修飾了HttpConnection。
我們回到Http1。關注在Http1Exchange構造方法中出現的獲取連接的方法HttpConnection::getConnection。
/**
* Factory for retrieving HttpConnections. A connection can be retrieved
* from the connection pool, or a new one created if none available.
*
* The given {@code addr} is the ultimate destination. Any proxies,
* etc, are determined from the request. Returns a concrete instance which
* is one of the following:
* {@link PlainHttpConnection}
* {@link PlainTunnelingConnection}
*
* The returned connection, if not from the connection pool, must have its,
* connect() or connectAsync() method invoked, which ( when it completes
* successfully ) renders the connection usable for requests.
*/
public static HttpConnection getConnection(InetSocketAddress addr,
HttpClientImpl client,
HttpRequestImpl request,
Version version) {
// The default proxy selector may select a proxy whose address is
// unresolved. We must resolve the address before connecting to it.
InetSocketAddress proxy = Utils.resolveAddress(request.proxy());
HttpConnection c = null;
//根據請求是否加密來決定連接類型
boolean secure = request.secure();
ConnectionPool pool = client.connectionPool();
if (!secure) {
//非加密連接
//嘗試從連接池中獲取
c = pool.getConnection(false, addr, proxy);
if (c != null && c.checkOpen() /* may have been eof/closed when in the pool */) {
final HttpConnection conn = c;
if (DEBUG_LOGGER.on())
DEBUG_LOGGER.log(conn.getConnectionFlow()
+ ": plain connection retrieved from HTTP/1.1 pool");
return c;
} else {
//連接池中取不到連接,創建新連接
return getPlainConnection(addr, proxy, request, client);
}
} else { // secure
if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
//有代理的Http1.1鏈接
c = pool.getConnection(true, addr, proxy);
}
if (c != null && c.isOpen()) {
final HttpConnection conn = c;
if (DEBUG_LOGGER.on())
DEBUG_LOGGER.log(conn.getConnectionFlow()
+ ": SSL connection retrieved from HTTP/1.1 pool");
return c;
} else {
String[] alpn = null;
if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
alpn = new String[] { "h2", "http/1.1" };
}
//創建SSL連接
return getSSLConnection(addr, proxy, alpn, request, client);
}
}
}
可以看到,連接到獲取過程運用了池化技術,首先嘗試從連接池中獲取連接,獲取不到再新建連接。使用連接池的好處,不在於減少對象創建的時間,而在於大大減少TCP連接「三次握手」的時間開銷。
那麼,HTTP1.1連接是怎樣快取和復用的呢?我們可以關注連接池類(ConnectionPool)。連接池在客戶端初始化時被初始化,它內部使用了散列表來維護路由和之前建立的HTTP連接列表的關係。其中,加密連接存在名為sslPool的HashMap中,而普通連接存在plainPool中。取連接時,將請求地址和代理地址資訊組合成快取鍵,根據鍵去散列表中取出對應的第一個連接,返回給調用者。
/**
* Http 1.1 connection pool.
*/
final class ConnectionPool {
//20分鐘的默認keepalive時間
static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
"jdk.httpclient.keepalive.timeout", 1200); // seconds
//連接池大小不做限制
static final long MAX_POOL_SIZE = Utils.getIntegerNetProperty(
"jdk.httpclient.connectionPoolSize", 0); // unbounded
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
// Pools of idle connections
//用散列表來維護路由和Http連接的映射關係
private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
private final ExpiryList expiryList;
private final String dbgTag; // used for debug
boolean stopped;
/**
連接池中路由——連接映射表的快取鍵。使用了目的地址和代理地址組合作為快取鍵。
* Entries in connection pool are keyed by destination address and/or
* proxy address:
* case 1: plain TCP not via proxy (destination only)
* case 2: plain TCP via proxy (proxy only)
* case 3: SSL not via proxy (destination only)
* case 4: SSL over tunnel (destination and proxy)
*/
static class CacheKey {
final InetSocketAddress proxy;
final InetSocketAddress destination;
CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {
this.proxy = proxy;
this.destination = destination;
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final CacheKey other = (CacheKey) obj;
if (!Objects.equals(this.proxy, other.proxy)) {
return false;
}
if (!Objects.equals(this.destination, other.destination)) {
return false;
}
return true;
}
@Override
public int hashCode() {
return Objects.hash(proxy, destination);
}
}
ConnectionPool(long clientId) {
this("ConnectionPool("+clientId+")");
}
/**
* There should be one of these per HttpClient.
*/
private ConnectionPool(String tag) {
dbgTag = tag;
plainPool = new HashMap<>();
sslPool = new HashMap<>();
expiryList = new ExpiryList();
}
//省略部分程式碼
//從連接池獲取連接的方法
synchronized HttpConnection getConnection(boolean secure,
InetSocketAddress addr,
InetSocketAddress proxy) {
if (stopped) return null;
// for plain (unsecure) proxy connection the destination address is irrelevant.
addr = secure || proxy == null ? addr : null;
CacheKey key = new CacheKey(addr, proxy);
HttpConnection c = secure ? findConnection(key, sslPool)
: findConnection(key, plainPool);
//System.out.println ("getConnection returning: " + c);
assert c == null || c.isSecure() == secure;
return c;
}
private HttpConnection findConnection(CacheKey key,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
//從連接池中取出對應的連接列表
LinkedList<HttpConnection> l = pool.get(key);
if (l == null || l.isEmpty()) {
return null;
} else {
//對應請求地址的第一個連接,即是最老的一個連接
HttpConnection c = l.removeFirst();
//從過期時間列表中移除這個連接
expiryList.remove(c);
return c;
}
}
//暫時省略
}
上面就是Http1.1的池化連接獲取過程。而Http2連接的獲取有所不同,它是將scheme::host::port組成一個字元串,從自身維護的池裡取的。這裡就不展開了。
在這之後,我們的分析都以Http1.1,PlainHttpConnection為基準。
我們回到HttpConnection的getPlainConnection方法,此方法在當從連接池取不到連接,或取出的連接已關閉時被調用。該方法的目的是獲取新的連接。可以看到,這裡還是會根據請求類型和是否有代理來實例化不同的連接:
private static HttpConnection getPlainConnection(InetSocketAddress addr,
InetSocketAddress proxy,
HttpRequestImpl request,
HttpClientImpl client) {
if (request.isWebSocket() && proxy != null)
return new PlainTunnelingConnection(addr, proxy, client,
proxyTunnelHeaders(request));
if (proxy == null)
//創建最基本的Http連接
return new PlainHttpConnection(addr, client);
else
return new PlainProxyConnection(proxy, client);
}
我們進入PlainHttpConnection的構造函數:
/**
* Plain raw TCP connection direct to destination.
* The connection operates in asynchronous non-blocking mode.
* All reads and writes are done non-blocking.
*/
class PlainHttpConnection extends HttpConnection {
//部分成員變數,可見這裡維護了NIO的Socket通道
private final Object reading = new Object();
protected final SocketChannel chan;
//雙向socket管道
private final SocketTube tube; // need SocketTube to call signalClosed().
private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
private volatile boolean connected;
private boolean closed;
private volatile ConnectTimerEvent connectTimerEvent; // may be null
private volatile int unsuccessfulAttempts;
// Indicates whether a connection attempt has succeeded or should be retried.
// If the attempt failed, and shouldn't be retried, there will be an exception
// instead.
private enum ConnectState { SUCCESS, RETRY }
//構造函數
PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
super(addr, client);
try {
//打開一個socket通道,實例化chan屬性,並設置為非阻塞模式
this.chan = SocketChannel.open();
chan.configureBlocking(false);
//設置緩衝區的大小
if (debug.on()) {
int bufsize = getSoReceiveBufferSize();
debug.log("Initial receive buffer size is: %d", bufsize);
bufsize = getSoSendBufferSize();
debug.log("Initial send buffer size is: %d", bufsize);
}
if (trySetReceiveBufferSize(client.getReceiveBufferSize())) {
if (debug.on()) {
int bufsize = getSoReceiveBufferSize();
debug.log("Receive buffer size configured: %d", bufsize);
}
}
if (trySetSendBufferSize(client.getSendBufferSize())) {
if (debug.on()) {
int bufsize = getSoSendBufferSize();
debug.log("Send buffer size configured: %d", bufsize);
}
}
//設置禁用TCP粘包演算法
chan.setOption(StandardSocketOptions.TCP_NODELAY, true);
// wrap the channel in a Tube for async reading and writing
//將nio socket通道包裹在實例化的socket管道成員變數中
//稍後將分析其內部結構和功能
tube = new SocketTube(client(), chan, Utils::getBuffer);
} catch (IOException e) {
throw new InternalError(e);
}
}
}
可見,對PlainHttpConnection的實例化過程中,開啟了一個非阻塞模式的socket通道,並將其包裹在一個實例化的socketTube管道中,而socketTube管道,就是我們下一節要分析的重點。在此之前,我們先分析連接的降級過程。
3.2 連接的降級和升級
在上一小節中,我們提到,ExchangeImpl的靜態get方法,通過判斷版本號來決定實例化自身的那個子類。如果我們在調用時沒有指定Http1.1版本,那麼get方法將嘗試實例化Stream(Http2的流)。可是,我們調用的是Http連接,為什麼會實例化Http2呢?不是註定失敗嗎?
其實,Http2規範並未規定一定要建立在SSL(TLS)上。在Http2已經普及的今天,HttpClient自然首選嘗試Http2。在連接建立時,客戶端和伺服器會通過alpn(Application Layer Protocol Negotiation, 應用層協議協商)進行溝通,確定要建立的連接類型。伺服器告知只支援Http1.1連接時,HttpClient也必須進行連接的降級。
我們跟隨程式碼,進行分析:
static <U> CompletableFuture<? extends ExchangeImpl<U>>
get(Exchange<U> exchange, HttpConnection connection)
{
if (exchange.version() == HTTP_1_1) {
if (debug.on())
debug.log("get: HTTP/1.1: new Http1Exchange");
return createHttp1Exchange(exchange, connection);
} else {
//獲取HttpclientImpl的成員變數Httpclient2Impl
Http2ClientImpl c2 = exchange.client().client2(); // #### improve
HttpRequestImpl request = exchange.request();
//嘗試非同步獲取Http2連接,如果失敗,那麼c2f中的結果將為空
CompletableFuture<Http2Connection> c2f = c2.getConnectionFor(request, exchange);
if (debug.on())
debug.log("get: Trying to get HTTP/2 connection");
// local variable required here; see JDK-8223553
//對可能獲取到,也可能獲取不到的Http2連接的處理,決定實例化Stream還是Http1Exchange
//我們稍後進入
CompletableFuture<CompletableFuture<? extends ExchangeImpl<U>>> fxi =
c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection));
return fxi.thenCompose(x->x);
}
}
我們進入Http2ClientImpl.getConnectionFor方法。在我們要訪問的url不支援http2時,有兩種情況:http開頭的地址,直接獲取Http2連接失敗;https開頭的地址,會嘗試建立Http2連接,但協商失敗後,以異常告終。
CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req,
Exchange<?> exchange) {
URI uri = req.uri();
InetSocketAddress proxy = req.proxy();
String key = Http2Connection.keyFor(uri, proxy);
synchronized (this) {
//嘗試從Http2連接池中獲取連接,當然是獲取不到的
Http2Connection connection = connections.get(key);
if (connection != null) {
try {
if (connection.closed || !connection.reserveStream(true)) {
if (debug.on())
debug.log("removing found closed or closing connection: %s", connection);
deleteConnection(connection);
} else {
// fast path if connection already exists
if (debug.on())
debug.log("found connection in the pool: %s", connection);
return MinimalFuture.completedFuture(connection);
}
} catch (IOException e) {
// thrown by connection.reserveStream()
return MinimalFuture.failedFuture(e);
}
}
//情況1:訪問的是http連接。因為ALPN是對SSL/TLS協議的拓展,
//那麼這裡就不用考慮了,直接返回null,獲取http2連接失敗
if (!req.secure() || failures.contains(key)) {
// secure: negotiate failed before. Use http/1.1
// !secure: no connection available in cache. Attempt upgrade
if (debug.on()) debug.log("not found in connection pool");
return MinimalFuture.completedFuture(null);
}
}
return Http2Connection
//情況2:嘗試繼續獲取Http2連接,後續將看到,這裡也會以失敗告終
.createAsync(req, this, exchange)
.whenComplete((conn, t) -> {
synchronized (Http2ClientImpl.this) {
if (conn != null) {
try {
conn.reserveStream(true);
} catch (IOException e) {
throw new UncheckedIOException(e); // shouldn't happen
}
offerConnection(conn);
} else {
Throwable cause = Utils.getCompletionCause(t);
if (cause instanceof Http2Connection.ALPNException)
failures.add(key);
}
}
});
}
我們跟蹤Http2Connection.createAsync方法,會跟蹤到Http2Connection::checkSSLConfig方法。下方可以看到,當嘗試使用alpn協商用Http2連接無果時,會以失敗終結建立Http2Connection對象的completableFuture。
//檢查ssl握手情況,在https連接時會被調用
private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) {
assert aconn.isSecure();
Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> {
CompletableFuture<Void> cf = new MinimalFuture<>();
SSLEngine engine = aconn.getEngine();
String engineAlpn = engine.getApplicationProtocol();
assert Objects.equals(alpn, engineAlpn)
: "alpn: %s, engine: %s".formatted(alpn, engineAlpn);
DEBUG_LOGGER.log("checkSSLConfig: alpn: %s", alpn );
//嘗試alpn協商,結果不是"h2",說明伺服器不支援http2,只有嘗試降級
if (alpn == null || !alpn.equals("h2")) {
String msg;
if (alpn == null) {
Log.logSSL("ALPN not supported");
msg = "ALPN not supported";
} else {
switch (alpn) {
case "":
Log.logSSL(msg = "No ALPN negotiated");
break;
case "http/1.1":
Log.logSSL( msg = "HTTP/1.1 ALPN returned");
break;
default:
Log.logSSL(msg = "Unexpected ALPN: " + alpn);
cf.completeExceptionally(new IOException(msg));
}
}
//以異常終結Http2連接的嘗試
cf.completeExceptionally(new ALPNException(msg, aconn));
return cf;
}
cf.complete(null);
return cf;
};
return aconn.getALPN()
.whenComplete((r,t) -> {
if (t != null && t instanceof SSLException) {
// something went wrong during the initial handshake
// close the connection
aconn.close();
}
})
.thenCompose(checkAlpnCF);
}
在Http2協商連接失敗的情況下,非同步返回給ExchangeImpl的get方法的c2f,不會有結果。可以預想的是,之後便是Http1.1交換的建立過程。除此之外,還會發生什麼呢?
我們將看到,HttpClient可謂是「鍥而不捨」,對無法Alpn協商的http請求,也會對請求頭進行修飾,嘗試進行協議的升級。
由於ExchangeImpl::get方法調用了createExchangeImpl方法,我們跟隨進入:
private static <U> CompletableFuture<? extends ExchangeImpl<U>>
createExchangeImpl(Http2Connection c,
Throwable t,
Exchange<U> exchange,
HttpConnection connection)
{
if (debug.on())
debug.log("handling HTTP/2 connection creation result");
boolean secure = exchange.request().secure();
if (t != null) {
if (debug.on())
debug.log("handling HTTP/2 connection creation failed: %s",
(Object)t);
t = Utils.getCompletionCause(t);
if (t instanceof Http2Connection.ALPNException) {
//如果我們訪問的是Http1.1的https開頭的連接,那麼會進入該分支
Http2Connection.ALPNException ee = (Http2Connection.ALPNException)t;
AbstractAsyncSSLConnection as = ee.getConnection();
if (debug.on())
debug.log("downgrading to HTTP/1.1 with: %s", as);
//建立Http1Exchange,會復用原來的AsyncSSLConnection
CompletableFuture<? extends ExchangeImpl<U>> ex =
createHttp1Exchange(exchange, as);
return ex;
} else {
if (debug.on())
debug.log("HTTP/2 connection creation failed "
+ "with unexpected exception: %s", (Object)t);
return MinimalFuture.failedFuture(t);
}
}
if (secure && c== null) {
if (debug.on())
debug.log("downgrading to HTTP/1.1 ");
CompletableFuture<? extends ExchangeImpl<U>> ex =
createHttp1Exchange(exchange, null);
return ex;
}
if (c == null) {
//在我們要訪問的地址是http開頭時,會進入該分支,此時建立Http1.1連接,並嘗試連接升級
// no existing connection. Send request with HTTP 1 and then
// upgrade if successful
if (debug.on())
debug.log("new Http1Exchange, try to upgrade");
return createHttp1Exchange(exchange, connection)
.thenApply((e) -> {
//嘗試連接升級,其實就是在請求頭加上Connection、Upgrade和Http2-Settings欄位
exchange.h2Upgrade();
return e;
});
} else {
if (debug.on()) debug.log("creating HTTP/2 streams");
Stream<U> s = c.createStream(exchange);
CompletableFuture<? extends ExchangeImpl<U>> ex = MinimalFuture.completedFuture(s);
return ex;
}
}
我們看到,對Http開頭的地址的訪問會嘗試進行Http2連接的升級,即先用Http1請求的方式向伺服器請求升級成Http2,若伺服器響應,則會進行升級。升級相關步驟在一次請求——響應過程之後。為了和本節主題貼切,我們也過一眼:
private CompletableFuture<Response>
checkForUpgradeAsync(Response resp,
ExchangeImpl<T> ex) {
int rcode = resp.statusCode();
//響應狀態碼是101時,代表伺服器接收協議升級到Http2
if (upgrading && (rcode == 101)) {
Http1Exchange<T> e = (Http1Exchange<T>)ex;
// check for 101 switching protocols
// 101 responses are not supposed to contain a body.
// => should we fail if there is one?
if (debug.on()) debug.log("Upgrading async %s", e.connection());
return e.readBodyAsync(this::ignoreBody, false, parentExecutor)
.thenCompose((T v) -> {// v is null
debug.log("Ignored body");
// we pass e::getBuffer to allow the ByteBuffers to accumulate
// while we build the Http2Connection
ex.upgraded();
upgraded = true;
//建立Http2連接
return Http2Connection.createAsync(e.connection(),
client.client2(),
this, e::drainLeftOverBytes)
.thenCompose((Http2Connection c) -> {
boolean cached = c.offerConnection();
if (cached) connectionAborter.disable();
Stream<T> s = c.getStream(1);
//省略………………
);
}
return MinimalFuture.completedFuture(resp);
}
在此,Http連接降級和升級的過程就介紹完畢。我們將進入激動人心的環節:數據是怎樣被發送的。
4. 響應式讀寫流的連接
看到上面Http連接的建立,我們似乎沒有看到對應的TCP連接到建立?沒錯,是的。在初次請求建立連接時,JDK HttpClient把socket連接的建立推遲到了發送請求頭的相關方法中。
我們承接上面對建立PlainHttpConnection連接的分析,看看最後實例化的SocketTube是什麼。從下方的UML圖中可以看到,socketTube是FlowTube介面的實現,它的另一個實現類是SSLTube。
4.1 socket管道的結構和功能
那麼,FlowTube是什麼呢?從FlowTube的結構和注釋上看,其同時扮演了JAVA Flow Api(Reactive Streams)中的發布者和訂閱者。作為一個」連接者「,它一端連接了Socket通道的讀寫,另一端連接了Http報文的讀寫。
/**
Google翻譯原注釋:
FlowTube 是一種 I/O 抽象,允許非同步讀取和寫入目標。 這不是 Flow.Processor<List<ByteBuffer>, List<ByteBuffer>>,而是在雙向流中對發布者源和訂閱者接收器進行建模。
應該調用 connectFlows 方法來連接雙向流。 FlowTube 支援隨著時間的推移將相同的讀取訂閱移交給不同的順序讀取訂閱者。 當 connectFlows(writePublisher, readSubscriber 被調用時,FlowTube 將在其以前的 readSubscriber 上調用 dropSubscription,在其新的 readSubscriber 上調用 onSubscribe。
*/
public interface FlowTube extends
Flow.Publisher<List<ByteBuffer>>,
Flow.Subscriber<List<ByteBuffer>> {
/**
* 用於從雙向流中讀取的訂閱者。 TubeSubscriber 是可以通過調用 dropSubscription() 取消的 Flow.Subscriber。 一旦調用 dropSubscription(),TubeSubscriber 就應該停止調用其訂閱的任何方法。
*/
static interface TubeSubscriber extends Flow.Subscriber<List<ByteBuffer>> {
default void dropSubscription() { }
default boolean supportsRecycling() { return false; }
}
/**
一個向雙向流寫入的發布者
* A publisher for writing to the bidirectional flow.
*/
static interface TubePublisher extends Flow.Publisher<List<ByteBuffer>> {
}
/**
* 將雙向流連接到寫入發布者和讀取訂閱者。 可以多次順序調用此方法以將現有發布者和訂閱者切換為新的寫入訂閱者和讀取發布者對。
* @param writePublisher A new publisher for writing to the bidirectional flow.
* @param readSubscriber A new subscriber for reading from the bidirectional
* flow.
*/
default void connectFlows(TubePublisher writePublisher,
TubeSubscriber readSubscriber) {
this.subscribe(readSubscriber);
writePublisher.subscribe(this);
}
/**
* Returns true if this flow was completed, either exceptionally
* or normally (EOF reached).
* @return true if the flow is finished
*/
boolean isFinished();
}
這裡再稍微提一下Reactive Streams反應式流的交互方式:
- 發布者(Publisher) 接受訂閱者(Subscriber)的訂閱:publisher.subscribe(Subscriber)
- 發布者將一個訂閱關係(Subscription)交給訂閱者:subscriber.onSubscribe(Subscription)
- 訂閱者請求n個訂閱:subscription.request(n)
- 訂閱者接受至多n個訂閱品:subscriber.onNext(T item)
- 訂閱者可取消訂閱:subscription.cancel()
- 訂閱者接收 接收完成 和 發生錯誤 的通知:subscriber.onError(Throwable); subscriber.onComplete()
我們看下SocketTube的構造函數:
/**
* A SocketTube is a terminal tube plugged directly into the socket.
* The read subscriber should call {@code subscribe} on the SocketTube before
* the SocketTube is subscribed to the write publisher.
*/
final class SocketTube implements FlowTube {
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
static final AtomicLong IDS = new AtomicLong();
private final HttpClientImpl client;
//nio 的 socket 通道
private final SocketChannel channel;
private final SliceBufferSource sliceBuffersSource;
private final Object lock = new Object();
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
private final InternalReadPublisher readPublisher;
private final InternalWriteSubscriber writeSubscriber;
private final long id = IDS.incrementAndGet();
public SocketTube(HttpClientImpl client, SocketChannel channel,
Supplier<ByteBuffer> buffersFactory) {
this.client = client;
this.channel = channel;
this.sliceBuffersSource = new SliceBufferSource(buffersFactory);
//這裡實例化了兩個對象作為屬性:內部讀發布者和內部寫接受者
this.readPublisher = new InternalReadPublisher();
this.writeSubscriber = new InternalWriteSubscriber();
}
}
在構造方法中,SocketTube實例化了readPublisher和writeSubscriber。它們的類型分別是SocketTube的內部類InternalReadPublisher和InternalWriteSubscriber,從名稱就可以看出它們的位置和作用:
- ReadPublisher從socket通道讀取內容,並」發布「到管道中,等待消費者接收並將內容解析成Http請求頭和請求體
- WriteSubscriber」訂閱「Http報文,它等待Http內容的發布者將報文寫入到SocketTube後,取出報文並寫入socket通道
這些我們將在稍後到分析中繼續深入。
4.2 socket 連接的建立
鋪墊了這麼多,socket連接究竟是如何建立的呢?答案就蘊含在FlowTube的默認方法connectFlows中(SocketTube重寫了這一方法,但只是加了一行日誌列印)。該方法要求調用方傳入一個來源於一個」源「的發布者和一個訂閱者,這樣,調用方和SocketTube之間就建立了雙向訂閱關係。
@Override
public void connectFlows(TubePublisher writePublisher,
TubeSubscriber readSubscriber) {
//socketTube類的connectFlow方法
if (debug.on()) debug.log("connecting flows");
this.subscribe(readSubscriber);
writePublisher.subscribe(this);
}
為了見證這一歷程,我們必須回過頭來,回到Exchange的responseAsyncImpl0方法中。
CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check;
bodyIgnored = null;
if (request.expectContinue()) {
request.addSystemHeader("Expect", "100-Continue");
Log.logTrace("Sending Expect: 100-Continue");
// wait for 100-Continue before sending body
after407Check = this::expectContinue;
} else {
after407Check = this::sendRequestBody;
}
// The ProxyAuthorizationRequired can be triggered either by
// establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1 proxy
// or by sendHeaderAsync (case of HTTP/1.1 SSL tunneling through HTTP/1.1 proxy
// Therefore we handle it with a call to this checkFor407(...) after these
// two places.
Function<ExchangeImpl<T>, CompletableFuture<Response>> afterExch407Check =
//現在,讓我們關注這個名為非同步發送請求頭的方法,連接建立的過程就蘊含其中
(ex) -> ex.sendHeadersAsync()
.handle((r,t) -> this.checkFor407(r, t, after407Check))
.thenCompose(Function.identity());
return establishExchange(connection) //首先建立連接
.handle((r,t) -> this.checkFor407(r,t, afterExch407Check))
.thenCompose(Function.identity());
}
我們進入ExchangeImpl::sendHeadersAsync方法。這裡展示的是Http1Exchange的重寫方法:
@Override
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
// create the response before sending the request headers, so that
// the response can set the appropriate receivers.
if (debug.on()) debug.log("Sending headers only");
// If the first attempt to read something triggers EOF, or
// IOException("channel reset by peer"), we're going to retry.
// Instruct the asyncReceiver to throw ConnectionExpiredException
// to force a retry.
asyncReceiver.setRetryOnError(true);
if (response == null) {
//這裡生成了響應對象。內部,asyncReceiver完成了對請求頭的「訂閱」
response = new Http1Response<>(connection, this, asyncReceiver);
}
if (debug.on()) debug.log("response created in advance");
CompletableFuture<Void> connectCF;
if (!connection.connected()) {
//注意,首次建立連接時,socket連接時沒有建立的,會在這裡建立連接
if (debug.on()) debug.log("initiating connect async");
//非同步建立並完成socket連接,我們即將進入分析
connectCF = connection.connectAsync(exchange)
//非同步將連接標記為已連接
.thenCompose(unused -> connection.finishConnect());
Throwable cancelled;
synchronized (lock) {
if ((cancelled = failed) == null) {
operations.add(connectCF);
}
}
if (cancelled != null) {
if (client.isSelectorThread()) {
executor.execute(() ->
connectCF.completeExceptionally(cancelled));
} else {
connectCF.completeExceptionally(cancelled);
}
}
} else {
connectCF = new MinimalFuture<>();
connectCF.complete(null);
}
return connectCF
.thenCompose(unused -> {
CompletableFuture<Void> cf = new MinimalFuture<>();
try {
asyncReceiver.whenFinished.whenComplete((r,t) -> {
if (t != null) {
if (debug.on())
debug.log("asyncReceiver finished (failed=%s)", (Object)t);
if (!headersSentCF.isDone())
headersSentCF.completeAsync(() -> this, executor);
}
});
//這裡最終調用了FlowTube::connectFlows方法,建立了雙向的連接
//我們即將分析
connectFlows(connection);
if (debug.on()) debug.log("requestAction.headers");
//從請求中取出請求頭數據
List<ByteBuffer> data = requestAction.headers();
synchronized (lock) {
state = State.HEADERS;
}
if (debug.on()) debug.log("setting outgoing with headers");
assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;
//放到輸出的隊列裡面,我們下一節將分析
appendToOutgoing(data);
cf.complete(null);
return cf;
} catch (Throwable t) {
if (debug.on()) debug.log("Failed to send headers: %s", t);
headersSentCF.completeExceptionally(t);
bodySentCF.completeExceptionally(t);
connection.close();
cf.completeExceptionally(t);
return cf;
} })
.thenCompose(unused -> headersSentCF);
}
該方法名為」發送請求頭「,實際上做了幾件事:
- 非同步建立socket連接
- 與管道(不是socket通道)建立雙向訂閱關係
- 取出請求頭,放入到隊列,並通知管道端的訂閱者消費
我們將在本節分析前兩個步驟。首先看下非同步socket連接的建立:PlainHttpConnection::connectAsync方法
//PlainHttpConnection類實現的HttpConnection抽象類的connectAsync方法
@Override
public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
CompletableFuture<ConnectState> cf = new MinimalFuture<>();
try {
assert !connected : "Already connected";
assert !chan.isBlocking() : "Unexpected blocking channel";
boolean finished;
if (connectTimerEvent == null) {
//連接超時計時器的註冊,這一步會喚醒阻塞的selector執行緒
connectTimerEvent = newConnectTimer(exchange, cf);
if (connectTimerEvent != null) {
if (debug.on())
debug.log("registering connect timer: " + connectTimerEvent);
client().registerTimer(connectTimerEvent);
}
}
//解析DNS地址,然後將該通道的套接字與對應的地址連接
//由於設置了非阻塞模式,這裡會立即返回,
//返回時,可能已經連接成功(finished = true),或者之後還需要繼續連接(finished = false)
PrivilegedExceptionAction<Boolean> pa =
() -> chan.connect(Utils.resolveAddress(address));
try {
finished = AccessController.doPrivileged(pa);
} catch (PrivilegedActionException e) {
throw e.getCause();
}
if (finished) {
//如果直接就已經連接成功,那麼這個非同步操作相當於同步了
if (debug.on()) debug.log("connect finished without blocking");
cf.complete(ConnectState.SUCCESS);
} else {
//否則的話,這裡需要註冊一個連接事件(稍後分析),等待事件就緒後,選擇器管理執行緒分發該事件,
//並調用該事件的handle方法完成連接的建立。
if (debug.on()) debug.log("registering connect event");
client().registerEvent(new ConnectEvent(cf, exchange));
}
cf = exchange.checkCancelled(cf, this);
} catch (Throwable throwable) {
cf.completeExceptionally(Utils.toConnectException(throwable));
try {
close();
} catch (Exception x) {
if (debug.on())
debug.log("Failed to close channel after unsuccessful connect");
}
}
return cf.handle((r,t) -> checkRetryConnect(r, t,exchange))
.thenCompose(Function.identity());
}
閱讀上面的方法,我們可以看到,socket連接的建立有兩種可能:直接成功;或需等待相應通道就緒後(可連接事件)才成功。這時,便要註冊一個連接事件,稍後由選擇器執行緒來調用該事件的handle方法完成連接。流程圖如下:
關於選擇器管理執行緒(SelectorManager)的工作過程,在《HttpClient客戶端的構建和啟動》一篇中有詳細介紹。
我們看下ConnectEvent的實現:它是位於PlainHttpConnection的一個內部類。
final class ConnectEvent extends AsyncEvent {
private final CompletableFuture<ConnectState> cf;
private final Exchange<?> exchange;
ConnectEvent(CompletableFuture<ConnectState> cf, Exchange<?> exchange) {
this.cf = cf;
this.exchange = exchange;
}
@Override
public SelectableChannel channel() {
return chan;
}
@Override
public int interestOps() {
//該事件感興趣的操作是連接事件。
return SelectionKey.OP_CONNECT;
}
//事件處理方法,在連接事件就緒時,選擇器管理執行緒(SelectorManager)會調用
@Override
public void handle() {
try {
assert !connected : "Already connected";
assert !chan.isBlocking() : "Unexpected blocking channel";
if (debug.on())
debug.log("ConnectEvent: finishing connect");
//調用java nio channel 通道的finishConnect方法,在連接就緒(現在)時完成連接
boolean finished = chan.finishConnect();
if (debug.on())
debug.log("ConnectEvent: connect finished: %s, cancelled: %s, Local addr: %s",
finished, exchange.multi.requestCancelled(), chan.getLocalAddress());
assert finished || exchange.multi.requestCancelled() : "Expected channel to be connected";
// complete async since the event runs on the SelectorManager thread
cf.completeAsync(() -> ConnectState.SUCCESS, client().theExecutor());
} catch (Throwable e) {
if (canRetryConnect(e)) {
unsuccessfulAttempts++;
cf.completeAsync(() -> ConnectState.RETRY, client().theExecutor());
return;
}
Throwable t = Utils.toConnectException(e);
client().theExecutor().execute( () -> cf.completeExceptionally(t));
close();
}
}
@Override
public void abort(IOException ioe) {
client().theExecutor().execute( () -> cf.completeExceptionally(ioe));
close();
}
}
這裡的操作相對簡單,就是調用了Channel::finishConnect方法完成連接。至此,非同步socket連接的過程已分析完畢。
4.3 雙向讀寫關係的建立
接著,我們再看下雙向連接的建立過程:
//Http1Exchange的私有connectFlows方法
private void connectFlows(HttpConnection connection) {
FlowTube tube = connection.getConnectionFlow();
if (debug.on()) debug.log("%s connecting flows", tube);
// Connect the flow to our Http1TubeSubscriber:
// asyncReceiver.subscriber().
tube.connectFlows(writePublisher,
asyncReceiver.subscriber());
}
理解了4.1節,該方法的目的就顯而易見:
-
SocketTube的InternalReadPublisher和Http1Exchange中的asyncReceiver的訂閱器(Http1TubeSubscriber)連接
-
Http1Exchange中的writePublisher(Http1Publisher)和SocketTube的InternalWriteSubScriber連接
注意,這兩步是存在先後順序的。否則,就可能出現往socket通道寫入了數據,而取出的響應數據沒有訂閱者的問題。
我們看下對應類的部分源碼來看看訂閱時做了什麼:
Http1Publisher
final class Http1Publisher implements FlowTube.TubePublisher {
final Logger debug = Utils.getDebugLogger(this::dbgString);
volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
volatile boolean cancelled;
//Http1內容的發布者持有Http1寫入這一「訂閱」
final Http1WriteSubscription subscription = new Http1WriteSubscription();
final Demand demand = new Demand();
//這裡用了一個自定義的調度器來保證讀寫順序,我們下一節將關注writeTask
final SequentialScheduler writeScheduler =
SequentialScheduler.lockingScheduler(new WriteTask());
@Override
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
assert state == State.INITIAL;
Objects.requireNonNull(s);
assert subscriber == null;
subscriber = s;
if (debug.on()) debug.log("got subscriber: %s", s);
//(在PlainHttpConnection里)使socketTube裡面的internalWriteSubscriber接收訂閱
s.onSubscribe(subscription);
}
//………………
}
SocketTube的InternalWriteSubscriber(內部類):
private final class InternalWriteSubscriber
implements Flow.Subscriber<List<ByteBuffer>> {
volatile WriteSubscription subscription;
volatile List<ByteBuffer> current;
volatile boolean completed;
//這裡維持了觸發對初片Http內容的請求所需要的事件
final AsyncTriggerEvent startSubscription =
new AsyncTriggerEvent(this::signalError, this::startSubscription);
final WriteEvent writeEvent = new WriteEvent(channel, this);
final Demand writeDemand = new Demand();
@Override
public void onSubscribe(Flow.Subscription subscription) {
WriteSubscription previous = this.subscription;
if (debug.on()) debug.log("subscribed for writing");
try {
//若是新訂閱,則需要註冊訂閱事件,等待選擇器執行緒分發處理,以觸發寫入操作
boolean needEvent = current == null;
if (needEvent) {
//若之前已訂閱了別的發布者,則丟棄之前的訂閱
if (previous != null && previous.upstreamSubscription != subscription) {
previous.dropSubscription();
}
}
//接收新的訂閱。這裡並沒有直接把它作為成員變數,而是薄封裝了一個新的訂閱作為代理
this.subscription = new WriteSubscription(subscription);
if (needEvent) {
if (debug.on())
debug.log("write: registering startSubscription event");
client.registerEvent(startSubscription);
}
} catch (Throwable t) {
signalError(t);
}
}
}
SocketTube的InternalReadPublisher(內部類):
private final class InternalReadPublisher
implements Flow.Publisher<List<ByteBuffer>> {
//socket內容的發布者持有內部的讀取socket資訊的訂閱
private final InternalReadSubscription subscriptionImpl
= new InternalReadSubscription();
AtomicReference<ReadSubscription> pendingSubscription = new AtomicReference<>();
private volatile ReadSubscription subscription;
@Override
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
Objects.requireNonNull(s);
TubeSubscriber sub = FlowTube.asTubeSubscriber(s);
ReadSubscription target = new ReadSubscription(subscriptionImpl, sub);
ReadSubscription previous = pendingSubscription.getAndSet(target);
//這裡還是判斷之前若已經被訂閱了,就丟棄之前的訂閱者
if (previous != null && previous != target) {
if (debug.on())
debug.log("read publisher: dropping pending subscriber: "
+ previous.subscriber);
previous.errorRef.compareAndSet(null, errorRef.get());
previous.signalOnSubscribe();
if (subscriptionImpl.completed) {
previous.signalCompletion();
} else {
previous.subscriber.dropSubscription();
}
}
if (debug.on()) debug.log("read publisher got subscriber");
//這一步內部是看sequentialScheduler的情況,選擇註冊一個訂閱事件實現非同步訂閱,
//最終調用訂閱者的onSubscribe方法;或者直接調用subscriber.onSubscribe
subscriptionImpl.signalSubscribe();
debugState("leaving read.subscribe: ");
}
}
Http1AsyncReceiver的Http1TubeSubscriber(內部成員):
final class Http1TubeSubscriber implements TubeSubscriber {
volatile Flow.Subscription subscription;
volatile boolean completed;
volatile boolean dropped;
public void onSubscribe(Flow.Subscription subscription) {
// supports being called multiple time.
// doesn't cancel the previous subscription, since that is
// most probably the same as the new subscription.
if (debug.on()) debug.log("Received onSubscribed from upstream");
if (Log.channel()) {
Log.logChannel("HTTP/1 read subscriber got subscription from {0}", describe());
}
assert this.subscription == null || dropped == false;
//接受訂閱時,這裡只是簡單維持了對訂閱的引用
this.subscription = subscription;
dropped = false;
canRequestMore.set(true);
if (delegate != null) {
scheduler.runOrSchedule(executor);
} else {
if (debug.on()) debug.log("onSubscribe: read delegate not present yet");
}
}
}
可以看到,調用connectFlows方法後,發布者和訂閱者已經基本處於就緒狀態了,但也可能還需要SelectorManager執行緒的協助才能真正完成訂閱。
在上面的程式碼中,出現了一個scheduler成員,它事實上控制了整個讀寫的流程。我們必須清楚其作用,否則便會迷失在多變的「調度」過程中。
4.4 順序調度器簡析
上面程式碼里出現的secheduler變數,屬於SequentialScheduler類型。SequentialScheduler類位於jdk.internal.net.http.common包下,是JDK為了簡化同一個可重複執行的任務的順序執行而設計的。它保證了相同的任務互斥執行。
下面是類上的注釋說明和翻譯。
A scheduler of ( repeatable ) tasks that MUST be run sequentially.
This class can be used as a synchronization aid that assists a number of parties in running a task in a mutually exclusive fashion.
To run the task, a party invokes runOrSchedule. To permanently prevent the task from subsequent runs, the party invokes stop.
The parties can, but do not have to, operate in different threads.
The task can be either synchronous ( completes when its run method returns ), or asynchronous ( completed when its DeferredCompleter is explicitly completed ).
The next run of the task will not begin until the previous run has finished.
The task may invoke runOrSchedule itself, which may be a normal situation.必須按順序運行的(可重複的)任務的調度程式。 此類可用作同步輔助工具,幫助多方以互斥方式運行任務。
為了運行任務,一方調用 runOrSchedule。 為了永久阻止該任務在後續運行,該方調用 stop。
各方可以但不必在不同的執行緒中運作。 任務可以是同步的(在其 run 方法返回時完成),也可以是非同步的(在其 DeferredCompleter 顯式完成時完成)。
在上一次運行完成之前,不會開始下一次任務運行。
任務可能會調用runOrSchedule本身,這可能是正常情況。
我們看下SequentialScheduler的uml類圖:
簡而言之,該類通過state這個原子整形變數,控制新調用的runOrSchedule方法是否執行。值得注意的是,在多執行緒調用runOrSchedule方法時,只能有一個任務處於等待狀態,其它任務則不會執行。
5. 請求頭和請求體的發送
見識了精巧的連接建立過程,我們將要見證請求頭和請求體的發送流程。 簡而言之,對請求頭的發送就是遵循了Reactive Streams規範的發布——訂閱過程,其使用了一個執行緒安全的雙向隊列維護要輸出的數據(請求頭+體)。
在請求頭的發送過程中,請求頭數據的流轉可用以下數據流圖來描述:
由於本節涉及到經典的響應式流交互,相對複雜,我們先對涉及的相關成員做簡要介紹。
5.1 發布和訂閱者介紹
在Plain Http1.1 請求——響應過程中,Http1Exchange扮演了請求的發布者和響應的訂閱者的角色;而SocketTube則作為了請求的訂閱者和響應的發布者。實際上,這些功能是由它們的內部類成員完成的。我們先放出Http1Exchange和SocketTube的UML類圖,然後對涉及到的成員做簡要介紹。
Http1Exchange的uml類圖:
SocketTube的uml類圖:
類名 | 父類(介面) | 外部類 | 角色 | 功能 |
---|---|---|---|---|
Http1Exchange | ExchangeImpl | – | 一次Http1.1請求——響應交換的管理者 | 實現父類方法,管理一次Http1.1請求——響應交換,將具體職責交由內部成員實現 |
Http1Publisher | FlowTube.TubePublisher、Flow.Publisher | Http1Exchange | Http請求內容的發布者 | 接受管道訂閱者的註冊;從請求數據緩衝隊列取出內容交給訂閱者 |
Http1WriteSubscription | Flow.Subscription | Http1Publisher | Http請求內容的訂閱(關係) | 作為媒介,連接發布者和訂閱者,接受管道訂閱者對Http請求內容的需求並傳遞給發布者 |
WriteTask | Runnable | Http1Publisher | 任務 | 作為Http1請求數據發布者(Http1Publisher)要執行的發布任務 |
DataPair | – | Http1Exchange | 數據組合 | 組合要傳輸的Http請求分片數據和該分片上的錯誤,被存放在緩衝隊列中等待發布者取出,實現錯誤通知 |
SocketTube | Flow.Publisher,Flow.Subscriber | – | socket管道 | 管理和維護socket通道端的發布者和訂閱者 |
InternalWriteSubscriber | Flow.Subscriber | SocketTube | Http請求內容的訂閱者 | 接收Http請求數據,將其寫入socket通道 |
WriteSubscription | Flow.Subscription | InternalWriteSubscriber | Http請求內容訂閱的包裝 | 代理並修飾Http1WriteSubscription的行為,參與開始要求Http請求數據的行為 |
SocketFlowEvent | AsyncEvent | SocketTube | 通道流事件 | 通道讀/寫事件的父類,通過改變感興趣的操作可暫停或啟動對通道數據的讀寫 |
WriteEvent | SocketFlowEvent | InternalWriteSubscriber | 通道寫事件 | 對OP_Write操作感興趣,用於註冊到socket通道上啟動寫入操作 |
5.2 請求頭髮送的啟動過程
我們回到Http1Exchange的sendHeadersAsync方法:
@Override
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
//留意這裡的注釋,在發送請求頭的時候創建響應,這樣響應對象就能設置正確的接收者
// create the response before sending the request headers, so that
// the response can set the appropriate receivers.
if (debug.on()) debug.log("Sending headers only");
// If the first attempt to read something triggers EOF, or
// IOException("channel reset by peer"), we're going to retry.
// Instruct the asyncReceiver to throw ConnectionExpiredException
// to force a retry.
asyncReceiver.setRetryOnError(true);
if (response == null) {
//創建響應
response = new Http1Response<>(connection, this, asyncReceiver);
}
if (debug.on()) debug.log("response created in advance");
CompletableFuture<Void> connectCF;
//這裡是非同步的建立socket連接過程,前面分析過了,就省略了
//…………
return connectCF
.thenCompose(unused -> {
CompletableFuture<Void> cf = new MinimalFuture<>();
try {
asyncReceiver.whenFinished.whenComplete((r,t) -> {
if (t != null) {
if (debug.on())
debug.log("asyncReceiver finished (failed=%s)", (Object)t);
if (!headersSentCF.isDone())
headersSentCF.completeAsync(() -> this, executor);
}
});
connectFlows(connection);
if (debug.on()) debug.log("requestAction.headers");
//從請求中構建並取出請求頭,裡面有headers字元串和ByteBuffer的構建過程,略過
List<ByteBuffer> data = requestAction.headers();
//設置這個交換(Exchange)的狀態是在發送請求頭
synchronized (lock) {
state = State.HEADERS;
}
if (debug.on()) debug.log("setting outgoing with headers");
assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;
//這一步把請求頭包裝成一份dataPair,放入到一個緩衝隊列中,並通知訂閱者接受
//我們將進入分析
appendToOutgoing(data);
cf.complete(null);
return cf;
} catch (Throwable t) {
if (debug.on()) debug.log("Failed to send headers: %s", t);
headersSentCF.completeExceptionally(t);
bodySentCF.completeExceptionally(t);
connection.close();
cf.completeExceptionally(t);
return cf;
} })
.thenCompose(unused -> headersSentCF);
}
我們看到,請求頭資訊被取出後,調用了appendToOutgoing(List
class Http1Exchange<T> extends ExchangeImpl<T> {
//…………省略
/** Holds the outgoing data, either the headers or a request body part. Or
* an error from the request body publisher. At most there can be ~2 pieces
* of outgoing data ( onComplete|onError can be invoked without demand ).*/
//承載輸出數據的隊列,充當了響應式流中的「緩衝區」作用
final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>();
/** A carrier for either data or an error. Used to carry data, and communicate
* errors from the request ( both headers and body ) to the exchange. */
//Http1Exchange內的靜態嵌套類,組合了一個「分片」的數據和錯誤
static class DataPair {
Throwable throwable;
List<ByteBuffer> data;
DataPair(List<ByteBuffer> data, Throwable throwable){
this.data = data;
this.throwable = throwable;
}
@Override
public String toString() {
return "DataPair [data=" + data + ", throwable=" + throwable + "]";
}
}
//這些方法都位於Http1Exchange內
void appendToOutgoing(List<ByteBuffer> item) {
//將輸出的byteBuffer包裝成一個DataPair數據對
appendToOutgoing(new DataPair(item, null));
}
private void appendToOutgoing(DataPair dp) {
if (debug.on()) debug.log("appending to outgoing " + dp);
//往名為outgoing的成員變數隊列裡面添加數據對
outgoing.add(dp);
//通知進行「發布」操作,我們繼續跟蹤,它位於下面的發布者內部類里
writePublisher.writeScheduler.runOrSchedule();
}
final class Http1Publisher implements FlowTube.TubePublisher {
final Logger debug = Utils.getDebugLogger(this::dbgString);
volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
volatile boolean cancelled;
//請求的發布者持有的訂閱(關係)資訊。響應式流中,訂閱(關係)都由發布者生成,並交給訂閱者
//改類就位於該Http1Publisher內,為了不至於影響閱讀,我們稍後分析
final Http1WriteSubscription subscription = new Http1WriteSubscription();
final Demand demand = new Demand();
//這裡使用了上節簡析過的順序調度器包裝了寫任務。我們中的關注寫任務
final SequentialScheduler writeScheduler =
SequentialScheduler.lockingScheduler(new WriteTask());
//我們要關注的重點寫任務,寫請求頭和請求體都會調用它
final class WriteTask implements Runnable {
@Override
public void run() {
assert state != State.COMPLETED : "Unexpected state:" + state;
if (debug.on()) debug.log("WriteTask");
if (cancelled) {
if (debug.on()) debug.log("handling cancellation");
//如果因錯誤等原因取消了,那麼調用調度器的停止方法,之後該任務便永遠不會執行
//同時非同步地將請求頭髮送和請求體發送過程標誌為完成
writeScheduler.stop();
getOutgoing();
return;
}
if (checkRequestCancelled()) return;
if (subscriber == null) {
//如果還沒有訂閱者,那就先不理會
if (debug.on()) debug.log("no subscriber yet");
return;
}
if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing());
//這是從緩衝隊列讀取請求數據的條件,即隊列不為空,而且接到了訂閱者的請求
while (hasOutgoing() && demand.tryDecrement()) {
//獲取下一個需要輸出的數據對。注意:請求頭只是一份數據。
//getOutgoing()中會對當前交換狀態進行切換,在讀取完僅佔有一份DataPair數據的請求頭後,
//便會將當前交換對象的狀態設置成"BODY",即發送請求體;還會設置「發送請求頭」的佔位符
//headersCentCF 為完成狀態
DataPair dp = getOutgoing();
if (dp == null)
break;
if (dp.throwable != null) {
//出錯了,便永遠停止該寫任務
if (debug.on()) debug.log("onError");
// Do not call the subscriber's onError, it is not required.
writeScheduler.stop();
} else {
List<ByteBuffer> data = dp.data;
if (data == Http1BodySubscriber.COMPLETED) {
//如果取出的要寫入的數據是標誌請求體結束的空byteBuffer,那麼就標記已完成寫請求任務
synchronized (lock) {
assert state == State.COMPLETING : "Unexpected state:" + state;
state = State.COMPLETED;
}
if (debug.on())
debug.log("completed, stopping %s", writeScheduler);
writeScheduler.stop();
// Do nothing more. Just do not publish anything further.
// The next Subscriber will eventually take over.
} else {
if (checkRequestCancelled()) return;
if (debug.on())
debug.log("onNext with " + Utils.remaining(data) + " bytes");
//在寫入請求頭、請求體過程中,不斷調用訂閱者(Plain Http1 中是 SocketTube)的onNext方法,
//使其向socket通道寫入
subscriber.onNext(data);
}
}
}
}
}
}
}
我們看到,請求(頭)發布者的操作還是相對明了的:
- 從請求中構建並取出請求頭,包裝成一份數據對,放入Http1Exchange中維持的一個雙向緩衝隊列
- 進行請求寫入的「通知」,在符合條件時,從隊列中取出數據,並通知訂閱者消費
那麼,「符合條件時」是怎樣的條件呢?
while (hasOutgoing() && demand.tryDecrement())
我們看到,這一「條件」一方面是緩衝隊列不為空,另一方面是「需求」(demand)不為空。那麼,這個需求是什麼鬼,又是怎樣、何時被初始化和操作的呢?其實,這個需求只是封裝了一個AtomicLong類型的變數,它充當了響應式流(反應式流)中訂閱者向發布者的請求的(但還未被送達的)物品數量。我們看下Demand的結構,然後重點看下在請求頭(體)發送的過程中,SocketTube里的InternalWriteSubscriber的行為。
/**
* Encapsulates operations with demand (Reactive Streams).
*
* <p> Demand is the aggregated number of elements requested by a Subscriber
* which is yet to be delivered (fulfilled) by the Publisher.
*/
public final class Demand {
private final AtomicLong val = new AtomicLong();
public boolean increase(long n) {
//增加n個需求
}
/**
* Increases this demand by 1 but only if it is fulfilled.
* @return true if the demand was increased, false otherwise.
*/
public boolean increaseIfFulfilled() {
//當前需求為空時,增加1個需求,用於啟動
return val.compareAndSet(0, 1);
}
public long decreaseAndGet(long n) {
//盡量將需求減少n個,如果不足,則減為零
}
//將需求數量減少1個
public boolean tryDecrement() {
return decreaseAndGet(1) == 1;
}
public boolean isFulfilled() {
return val.get() == 0;
}
public void reset() {
val.set(0);
}
public long get() {
return val.get();
}
}
下面是SocketTube的InternalWriteScheduler,我們首先分析下onSubScribe方法。該方法訂閱Http1Publisher後,接收一個訂閱(關係),薄包裝該訂閱後,開始向訂閱關係請求接受List
//翻譯下:此類假設發布者將按順序調用 onNext,並且如果 request(1) 未增加需求,則不會調用 onNext。
//它有一個「長度為1的隊列」,意味著它會在 onSubscribe 中調用 request(1),
//並且只有在它的 'current' 緩衝區列表被完全寫入後, current 才會被設置為 null ;
// This class makes the assumption that the publisher will call onNext
// sequentially, and that onNext won't be called if the demand has not been
// incremented by request(1).
// It has a 'queue of 1' meaning that it will call request(1) in
// onSubscribe, and then only after its 'current' buffer list has been
// fully written and current set to null;
private final class InternalWriteSubscriber
implements Flow.Subscriber<List<ByteBuffer>> {
//對接受到的訂閱的薄包裝
volatile WriteSubscription subscription;
volatile List<ByteBuffer> current;
volatile boolean completed;
//初次發起消費數據請求的事件,由選擇器管理執行緒分發處理。我們要關注的是this::startSubscription
final AsyncTriggerEvent startSubscription =
new AsyncTriggerEvent(this::signalError, this::startSubscription);
final WriteEvent writeEvent = new WriteEvent(channel, this);
final Demand writeDemand = new Demand();
@Override
public void onSubscribe(Flow.Subscription subscription) {
//我們再次關注接受訂閱的方法,若之前有訂閱,則拋棄它
WriteSubscription previous = this.subscription;
if (debug.on()) debug.log("subscribed for writing");
try {
boolean needEvent = current == null;
if (needEvent) {
if (previous != null && previous.upstreamSubscription != subscription) {
previous.dropSubscription();
}
}
//薄包裝了接受到的訂閱。這一步主要是為了控制
this.subscription = new WriteSubscription(subscription);
if (needEvent) {
if (debug.on())
debug.log("write: registering startSubscription event");
//向SelectorManager的待註冊隊列放置一個開始消費請求頭(體)的事件,等待被
//選擇器管理執行緒分發執行
client.registerEvent(startSubscription);
}
} catch (Throwable t) {
signalError(t);
}
}
@Override
public void onNext(List<ByteBuffer> bufs) {
//我們稍後再分析onNext方法,該方法負責寫數據到socket通道
}
//……
//startSubscription成員變數構造函數第二個參數對應的方法句柄
void startSubscription() {
try {
if (debug.on()) debug.log("write: starting subscription");
if (Log.channel()) {
Log.logChannel("Start requesting bytes for writing to channel: {0}",
channelDescr());
}
assert client.isSelectorThread();
//確保之前讀取socket響應功能的發布者已經註冊(以便發送後能確保接收響應)
// make sure read registrations are handled before;
readPublisher.subscriptionImpl.handlePending();
if (debug.on()) debug.log("write: offloading requestMore");
// start writing; 請求發布者給它消息內容(訂閱品)
client.theExecutor().execute(this::requestMore);
} catch(Throwable t) {
signalError(t);
}
}
void requestMore() {
WriteSubscription subscription = this.subscription;
//最後還是調用了訂閱的請求更多方法
subscription.requestMore();
}
//薄包裝接收到的Http1Publisher的訂閱(關係)
final class WriteSubscription implements Flow.Subscription {
//上游訂閱,即Http1Publisher的subscribe方法中調用s.onSubscribe(subscription)時的訂閱
final Flow.Subscription upstreamSubscription;
volatile boolean cancelled;
WriteSubscription(Flow.Subscription subscription) {
this.upstreamSubscription = subscription;
}
@Override
public void request(long n) {
if (cancelled) return;
upstreamSubscription.request(n);
}
@Override
public void cancel() {
if (cancelled) return;
if (debug.on()) debug.log("write: cancel");
if (Log.channel()) {
Log.logChannel("Cancelling write subscription");
}
dropSubscription();
upstreamSubscription.cancel();
}
void dropSubscription() {
synchronized (InternalWriteSubscriber.this) {
cancelled = true;
if (debug.on()) debug.log("write: resetting demand to 0");
writeDemand.reset();
}
}
//終於到重點了
void requestMore() {
try {
if (completed || cancelled) return;
boolean requestMore;
long d;
// don't fiddle with demand after cancel.
// see dropSubscription.
synchronized (InternalWriteSubscriber.this) {
if (cancelled) return;
//下面這一步就是初始化請求,即請求初始為0,便請求更多
d = writeDemand.get();
requestMore = writeDemand.increaseIfFulfilled();
}
if (requestMore) {
//向上游訂閱請求要獲取一個訂閱品
if (debug.on()) debug.log("write: requesting more...");
//這個方法向上游要求多一個物品(請求數據分片),對應上游的訂閱要增加一個「需求」Demand
upstreamSubscription.request(1);
} else {
if (debug.on())
debug.log("write: no need to request more: %d", d);
}
} catch (Throwable t) {
if (debug.on())
debug.log("write: error while requesting more: " + t);
signalError(t);
} finally {
debugState("leaving requestMore: ");
}
}
}
}
上面的程式碼封裝稍深,但在理解了響應式流的工作過程的情況下,仔細閱讀還是不難理解其邏輯:就是收到訂閱後做一層包裝,然後註冊一個「要求接收訂閱品」的事件,等待選擇器管理執行緒分發執行該事件,向上游訂閱增加一個「需求」;上游的Http1Publisher便會開始將緩衝隊列中的請求分片數據通過subscriber.onNext方法交給訂閱者(當前這個InternalWriteSubscriber)。我們回看Http1Publisher的內部類里的內部類Http1WriteSubscription,即前面提到的「上游訂閱” 在接到request(1)時會如何操作:
final class Http1WriteSubscription implements Flow.Subscription {
@Override
public void request(long n) {
if (cancelled)
return; //no-op
//這裡對Http1Publisher裡面的成員變數demand增加1,即增加了需求
demand.increase(n);
if (debug.on())
debug.log("subscription request(%d), demand=%s", n, demand);
//啟動上面提過的」從緩衝隊列取出數據交給訂閱者「的寫任務
writeScheduler.runOrSchedule(client.theExecutor());
}
@Override
public void cancel() {
if (debug.on()) debug.log("subscription cancelled");
if (cancelled)
return; //no-op
cancelled = true;
writeScheduler.runOrSchedule(client.theExecutor());
}
}
至此,Http1請求頭(也包括請求體)的發送啟動流程已經被介紹完畢了。接下來,我們還是要關注訂閱者(SocketTube內的InternalWriteSubscriber)的onNext方法,看看它是如何寫入數據到socket通道的。
5.3 寫入數據到NIO-Socket通道
我們接著關注InternalWriteSubscriber的onNext方法,裡面涉及向發布者請求數據並向socket通道寫入的操作:
final class SocketTube implements FlowTube {
//省略………………
//請求頭和請求體的訂閱者
private final class InternalWriteSubscriber
implements Flow.Subscriber<List<ByteBuffer>> {
volatile WriteSubscription subscription;
//當前持有的請求數據(請求頭/請求體)
volatile List<ByteBuffer> current;
volatile boolean completed;
final AsyncTriggerEvent startSubscription =
new AsyncTriggerEvent(this::signalError, this::startSubscription);
final WriteEvent writeEvent = new WriteEvent(channel, this);
final Demand writeDemand = new Demand();
@Override
public void onSubscribe(Flow.Subscription subscription) {
//該方法上節分析過了
}
//這是我們要分析的方法
@Override
public void onNext(List<ByteBuffer> bufs) {
assert current == null : dbgString() // this is a queue of 1.
+ "w.onNext current: " + current;
assert subscription != null : dbgString()
+ "w.onNext: subscription is null";
current = bufs;
//將當前的請求數據沖刷到NIO socket通道
tryFlushCurrent(client.isSelectorThread()); // may be in selector thread
// For instance in HTTP/2, a received SETTINGS frame might trigger
// the sending of a SETTINGS frame in turn which might cause
// onNext to be called from within the same selector thread that the
// original SETTINGS frames arrived on. If rs is the read-subscriber
// and ws is the write-subscriber then the following can occur:
// ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write
// client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent
debugState("leaving w.onNext");
}
//沖刷數據到socket通道的方法
// If this method is invoked in the selector manager thread (because of
// a writeEvent), then the executor will be used to invoke request(1),
// ensuring that onNext() won't be invoked from within the selector
// thread. If not in the selector manager thread, then request(1) is
// invoked directly.
void tryFlushCurrent(boolean inSelectorThread) {
List<ByteBuffer> bufs = current;
if (bufs == null) return;
try {
assert inSelectorThread == client.isSelectorThread() :
"should " + (inSelectorThread ? "" : "not ")
+ " be in the selector thread";
//獲取待寫入的數據長度
long remaining = Utils.remaining(bufs);
if (debug.on()) debug.log("trying to write: %d", remaining);
//儘可能一次往socket通道寫入所有待寫入數據,然而由於nio通道可能已慢,
//在非阻塞模式下會「盡最大努力」寫入,之後直接返回已寫的位元組數
long written = writeAvailable(bufs);
if (debug.on()) debug.log("wrote: %d", written);
assert written >= 0 : "negative number of bytes written:" + written;
assert written <= remaining;
if (remaining - written == 0) {
//在本批請求數據已全部寫入的情況下,向發布者請求更多數據
current = null;
if (writeDemand.tryDecrement()) {
Runnable requestMore = this::requestMore;
if (inSelectorThread) {
assert client.isSelectorThread();
client.theExecutor().execute(requestMore);
} else {
assert !client.isSelectorThread();
requestMore.run();
}
}
} else {
//由於沒有全部寫完,說明nio通道已滿,那麼這時要註冊一個寫事件,
//等待寫就緒後由SelectorManager執行緒分發並處理事件,繼續寫入
resumeWriteEvent(inSelectorThread);
}
} catch (Throwable t) {
signalError(t);
}
}
//"最大努力"寫入方法
private long writeAvailable(List<ByteBuffer> bytes) throws IOException {
ByteBuffer[] srcs = bytes.toArray(Utils.EMPTY_BB_ARRAY);
final long remaining = Utils.remaining(srcs);
long written = 0;
while (remaining > written) {
try {
//最終調用的是java.nio.channels.SocketChannel的write(ByteBuffer[]) 方法,
//寫入請求數據到socket通道
long w = channel.write(srcs);
assert w >= 0 : "negative number of bytes written:" + w;
if (w == 0) {
break;
}
written += w;
} catch (IOException x) {
if (written == 0) {
// no bytes were written just throw
throw x;
} else {
// return how many bytes were written, will fail next time
break;
}
}
}
return written;
}
//調用外部類(SocketChannel)的「繼續寫事件」方法,註冊事件
void resumeWriteEvent(boolean inSelectorThread) {
if (debug.on()) debug.log("scheduling write event");
resumeEvent(writeEvent, this::signalError);
}
}
//方法所做的是視情況註冊或更新事件
private void resumeEvent(SocketFlowEvent event,
Consumer<Throwable> errorSignaler) {
boolean registrationRequired;
synchronized(lock) {
registrationRequired = !event.registered();
event.resume();
}
try {
if (registrationRequired) {
client.registerEvent(event);
} else {
client.eventUpdated(event);
}
} catch(Throwable t) {
errorSignaler.accept(t);
}
}
}
至此,我們已經看到了整個請求頭資訊寫入的閉環。
5.4 請求體的發送
請求體的發送和請求頭類似,寫入socket通道的主要步驟都在上面呈現了。不過,請求體的從調用者發送到Http1Publisher,又是另一個「發布——訂閱」的過程。我們將追蹤整個請求體寫入的過程。
我們先回顧下攜帶請求體時發送請求時的調用程式碼:
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("//openjdk.java.net/")) //設置目的url
.timeout(Duration.ofMinutes(1)) //超時時間
.header("Content-Type", "application/json") //設置請求體為json格式
//.POST(BodyPublishers.ofFile(Paths.get("file.json"))) //設置從json文件裡面讀取內容
.POST(HttpRequest.BodyPublishers.ofString( //發送json字元串
JSONUtil.toJsonStr(Map.of("country", "China"))))
.build()
可以看到,我們可以指定POST方法時的請求體,指定方式是傳入一個HttpRequest::BodyPublisher對象,具體操作是用字元串或者文件(還可以是輸入流或者另外的發布者)通過輔助工具類BodyPublishers去實例化它的不同實現類。HttpRequest中的BodyPublisher介面定義如下。
public interface BodyPublisher extends Flow.Publisher<ByteBuffer> {
/**
* Returns the content length for this request body. May be zero
* if no request body being sent, greater than zero for a fixed
* length content, or less than zero for an unknown content length.
*
* <p> This method may be invoked before the publisher is subscribed to.
* This method may be invoked more than once by the HTTP client
* implementation, and MUST return the same constant value each time.
*
* @return the content length for this request body, if known
*/
long contentLength();
}
可以看到,BodyPublisher介面繼承了Flow.Publisher介面,充當響應式流模型中的發布者的角色。它的實現類分布在RequestPublisher類中,如下圖所示。後續將看到,該介面通過區分contentLength返回值是正數、負數還是0,來決定後續它的訂閱者是固定長度的還是流式訂閱者。
我們略微分析一下ByteArrayPublisher,它是作為調用者的我們最常用的BodyPublishers.ofString()返回的StringPublisher的父類。可以看到,位元組發布者ByteArrayPublisher裡面使用了byte數組存儲了調用者的數據,在接受訂閱時,生成一個真正的發布者PullPublisher,接受訂閱者的發布。
//該類也是BodyPublishers.ofString()返回的StringPublisher的父類
public static class ByteArrayPublisher implements BodyPublisher {
private final int length;
//從調用者接受到的位元組數組
private final byte[] content;
private final int offset;
private final int bufSize;
public ByteArrayPublisher(byte[] content) {
this(content, 0, content.length);
}
public ByteArrayPublisher(byte[] content, int offset, int length) {
this(content, offset, length, Utils.BUFSIZE);
}
/* bufSize exposed for testing purposes */
ByteArrayPublisher(byte[] content, int offset, int length, int bufSize) {
this.content = content;
this.offset = offset;
this.length = length;
this.bufSize = bufSize;
}
//被訂閱時的操作,將位元組數組轉化為ByteBuffer
List<ByteBuffer> copy(byte[] content, int offset, int length) {
List<ByteBuffer> bufs = new ArrayList<>();
while (length > 0) {
ByteBuffer b = ByteBuffer.allocate(Math.min(bufSize, length));
int max = b.capacity();
int tocopy = Math.min(max, length);
b.put(content, offset, tocopy);
offset += tocopy;
length -= tocopy;
b.flip();
bufs.add(b);
}
return bufs;
}
//接受訂閱方法,該方法會在請求體發布後被調用,完成訂閱
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
List<ByteBuffer> copy = copy(content, offset, length);
//對於位元組型的Http請求體來說,這個才是真正的發布者
var delegate = new PullPublisher<>(copy);
delegate.subscribe(subscriber);
}
@Override
//這裡返回定長的請求體長度,意味著之後會創建固定長度的訂閱者
public long contentLength() {
return length;
}
}
在PullPublisher的subscribe方法中,會調用訂閱者的onSubscribe方法。
我們將目光轉向訂閱者。在請求頭髮送完成,並校驗407錯誤通過後,Http1Exchange會立刻進行請求體的發送,而訂閱的過程就發生在Exchange和ExchangeImpl的發送請求體相關方法中。調用的鏈路流程是:
Exchange::responseAsyncImpl0 -> Exchange::sendRequestBody -> Http1Exchange::sendBodyAsync
我們查看Http1Exchange和Http1Request中的相關方法:
/**
* Encapsulates one HTTP/1.1 request/response exchange.
*/
class Http1Exchange<T> extends ExchangeImpl<T> {
//省略…………
@Override
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
assert headersSentCF.isDone();
if (debug.on()) debug.log("sendBodyAsync");
try {
//如果有請求體,則裡面實例化了Http請求體的訂閱者,完成了訂閱操作
bodySubscriber = requestAction.continueRequest();
if (debug.on()) debug.log("bodySubscriber is %s",
bodySubscriber == null ? null : bodySubscriber.getClass());
if (bodySubscriber == null) {
bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
//沒有請求體發送,直接發送「完成」的標誌位到前面提到過的緩衝隊列中,
//即是把「完成」這個標誌給了Http1Publisher這個前面提到過的
//連接socketTube的下一個發布者
appendToOutgoing(Http1BodySubscriber.COMPLETED);
} else {
// start
bodySubscriber.whenSubscribed
.thenAccept((s) -> cancelIfFailed(s))
//非同步訂閱成功後,向發布者請求Http請求體
.thenAccept((s) -> requestMoreBody());
}
} catch (Throwable t) {
cancelImpl(t);
bodySentCF.completeExceptionally(t);
}
return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF);
}
//省略…………
}
//表示Http1具體請求動作的外部類,和Http1Exchange相互引用
class Http1Request {
//省略…………
Http1BodySubscriber continueRequest() {
Http1BodySubscriber subscriber;
if (streaming) {
//如果是流式的請求體,就使用這個流式訂閱者,否則採用定長的訂閱者
subscriber = new StreamSubscriber();
requestPublisher.subscribe(subscriber);
} else {
if (contentLength == 0)
return null;
subscriber = new FixedContentSubscriber();
//完成上面對BodyPublisher的訂閱
requestPublisher.subscribe(subscriber);
}
return subscriber;
}
//省略…………
}
再看下作為訂閱者之一的FixedContentSubscriber,其位於Http1Request中,我們重點關注下其onNext(ByteBuffer item)方法:
//該方法訂閱成功後就會立即被調用
@Override
public void onNext(ByteBuffer item) {
if (debug.on()) debug.log("onNext");
Objects.requireNonNull(item);
if (complete) {
Throwable t = new IllegalStateException("subscription already completed");
http1Exchange.appendToOutgoing(t);
} else {
long writing = item.remaining();
long written = (contentWritten += writing);
if (written > contentLength) {
cancelSubscription();
String msg = connection.getConnectionFlow()
+ " [" + Thread.currentThread().getName() +"] "
+ "Too many bytes in request body. Expected: "
+ contentLength + ", got: " + written;
http1Exchange.appendToOutgoing(new IOException(msg));
} else {
//將請求體放到上文提到過的緩衝隊列中,交給了下一個響應式流,
//等待被下一個流沖刷到socket通道中
http1Exchange.appendToOutgoing(List.of(item));
}
}
}
到這裡,發送請求體的響應式過程便也一目了然了。
5.5 發送小結
請求頭和請求體的發送過程類似,但也略有差別。以無加密的Http1.1連接為例,它們的前置條件都是請求內容的發布者和SocketTube間建立了雙向訂閱關係。
請求頭的發送過程如下:
- 從用戶請求中過濾並構建請求頭
- 將請求頭放入緩衝隊列
- 建立雙向訂閱關係後,SocketTube中寫訂閱者向Http1Publisher發布者請求請求頭數據
- Http1Publisher發布者將請求頭從緩衝隊列中取出,交給SocketTube中的寫訂閱者
- 寫訂閱者向socket通道寫入請求頭數據
請求體的發送過程如下:
- 根據調用方傳入的請求體實例化不同的請求體發布者
- 在請求頭髮布完成後,根據請求體情況實例化定長或流式請求體訂閱者
- 請求頭髮布者接受請求體訂閱者的訂閱,請求體訂閱者請求請求頭資訊
- 請求體訂閱者將請求體數據放入緩衝隊列,通知Http1Publisher啟動發布——訂閱任務
- Http1Publisher發布者將請求體從緩衝隊列中取出,交給SocketTube中的寫訂閱者
- 寫訂閱者向socket通道寫入請求體數據
兩者的不同,主要還是在於請求體的寫入過程涉及了兩個「發布——訂閱」過程,而請求頭只涉及一個。
6. 響應的創建和響應頭的解析
由於一個用戶請求由於重定向等原因可能生成多個請求——響應交換,HttpClient總是在每個響應收到時,只解析請求頭,只有確定這個請求是最終請求時,才會解析響應體。
毫無疑問的是,響應頭(和響應體)的解析過程又是一輪響應式流的「發布——訂閱」過程。發布者是SocketTube中的InternalReadPublisher,訂閱者則是Http1AsyncReceiver中的Http1TubeSubscriber。不過,接收到響應內容的Http1TubeScriber,會將響應內容放入一個隊列,等待後續的消費者去解析、處理,這時候便是Reader、Parser等一系列組件的舞台了。
相對請求來說,響應的接收和解析更加複雜凌亂。我們首先介紹下涉及響應解析的幾個類的功能,再追蹤響應頭解析的過程。
6.1 發布和訂閱者介紹
下圖是Http1Response和Http1AsyncReceiver的類圖,以及涉及響應頭和響應體解析的幾個組件的功能介紹:
類名 | 父類(介面) | 外部類 | 角色 | 功能 |
---|---|---|---|---|
Http1TubeSunscriber | TubeSubscriber(Flow.Sunscriber) | Http1AsyncReceiver | 響應數據的直接接收者 | 接收SocketTube收到並發布的響應數據 |
Http1AsyncDelegate | – | Http1AsyncReceiver | 抽象接收介面 | 接收或處理上游Http響應數據 |
HttpAsyncDelegateSubscription | Abstraction(Flow.Subscription) | Http1AsyncReceiver | 代理訂閱關係 | – |
HttpAsyncReceiver | – | – | 響應數據接收輔助類 | 輔助接收響應數據,同時 |
Http1Response | – | – | Http1響應對象 | 處理一個Http1.1響應對象 |
HeadersReader | Http1AsyncDelegate | Http1Response | 響應頭讀取者 | 參與ByteBuffer格式的響應頭的讀取 |
BodyReader | Http1AsyncDelegate | Http1Response | 響應體讀取者 | 參與ByteBuffer格式的響應體的讀取 |
Http1HeaderParser | – | – | Http1響應頭解析器 | 解析Http1響應頭文本資訊 |
BodyParser | Consumer介面 | ResponseContent | 響應體解析介面 | 解析Http1響應體 |
BodyHandler | – | HttpResponse | 響應體結果組裝者 | 組裝解析後的Http1響應體 |
6.2 響應頭解析
我們首先關注響應頭的解析過程。簡而言之:響應頭的解析鏈路如下:
-
在請求數據發送前,HeadersReader訂閱HttpAsyncReceiver,做好接收數據的準備,並告知需求量
-
SocketTube中的InternalReadPublisher從socket通道中讀取響應數據
-
SocketTube中的InternalReadPublisher將響應數據以發布——訂閱(響應式流)的方式交給Http1TubeSubscriber
-
Http1TubeSubscriber接到數據後,將響應頭數據交給HeadersReader處理
-
HeadersReader調用Http1HeaderParser完成響應頭的解析
-
切換為讀取響應體的狀態,並組裝響應對象Response返回
我們將逐一跟進。響應頭的解析可以追溯到Http1Exchange中,Http1Response的實例化過程。
在這裡先提一下,在HttpClient中,Response類和Http1Response類並不是繼承的關係:
- Response類表示響應頭和狀態碼
- Http1Response類表示一次Http1.1響應,包含響應頭和響應體,其中組合了一個Response的成員變數
我們首先看下Http1Response響應的實例化過程,前面已經簡要提到過,該過程位於Http1Exchange的sendHeadersAsync發送請求頭方法中。
@Override
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
// create the response before sending the request headers, so that
// the response can set the appropriate receivers.
if (debug.on()) debug.log("Sending headers only");
// If the first attempt to read something triggers EOF, or
// IOException("channel reset by peer"), we're going to retry.
// Instruct the asyncReceiver to throw ConnectionExpiredException
// to force a retry.
asyncReceiver.setRetryOnError(true);
//這裡提前創建了一個Http1Response
if (response == null) {
response = new Http1Response<>(connection, this, asyncReceiver);
}
if (debug.on()) debug.log("response created in advance");
//省略…………
}
我們於是分析Http1Response的構造過程。該過程至關重要:響應頭讀取者對Http1響應體非同步接收者的訂閱、對響應頭的讀取和解析過程的準備,就潛藏在Http1Response的構造函數中。
class Http1Response<T> {
private volatile ResponseContent content;
private final HttpRequestImpl request;
private Response response;
private final HttpConnection connection;
private HttpHeaders headers;
private int responseCode;
//這裡維持了一個對Http1Exchange的引用,在HttpClient中,相互關聯關係比較常見
private final Http1Exchange<T> exchange;
private boolean return2Cache; // return connection to cache when finished
//響應頭讀取者
private final HeadersReader headersReader; // used to read the headers
//響應體讀取者
private final BodyReader bodyReader; // used to read the body
//Http響應內容的直接接收者(從socket管道)
private final Http1AsyncReceiver asyncReceiver;
private volatile EOFException eof;
//響應體解析者
private volatile BodyParser bodyParser;
// max number of bytes of (fixed length) body to ignore on redirect
private final static int MAX_IGNORE = 1024;
// Revisit: can we get rid of this?
static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
private volatile State readProgress = State.INITIAL;
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
final static AtomicLong responseCount = new AtomicLong();
final long id = responseCount.incrementAndGet();
//響應頭解析者
private Http1HeaderParser hd;
Http1Response(HttpConnection conn,
Http1Exchange<T> exchange,
Http1AsyncReceiver asyncReceiver) {
this.readProgress = State.INITIAL;
this.request = exchange.request();
this.exchange = exchange;
this.connection = conn;
this.asyncReceiver = asyncReceiver;
//實例化了響應頭和響應體的讀取者
//advance回調方法用於讀取完響應頭或響應體時升級讀取狀態
headersReader = new HeadersReader(this::advance);
bodyReader = new BodyReader(this::advance);
hd = new Http1HeaderParser();
readProgress = State.READING_HEADERS;
//讓響應頭讀取者維持解析者的引用
headersReader.start(hd);
//讓響應頭讀取者訂閱Http1響應內容非同步接收者
asyncReceiver.subscribe(headersReader);
}
}
由於訂閱的過程和之前的類似,此處便不再跟隨進入。我們只需要知道headersReader此時已經準備好讀取響應頭數據了。
接下來便是SocketTube中的InternalReadPublisher從socket通道中讀取響應數據的過程。注意,這裡是一個編程上非同步,I/O上同步非阻塞的過程:在發送請求體後,當socket通道對應的選擇鍵可讀時,選擇器管理執行緒便會分發並執行讀事件,最終調用了SocketTube中的InternalReadPublisher中的InternalReadSubscription中的read()方法。
那麼,讀事件是在什麼時候註冊的呢?其實,讀時間的註冊就發生在前文提到的connectFlows()方法中。當雙向讀寫關係建立,Http1AsyncReceiver中的Http1TubeSubscriber訂閱SocketTube中的InternalReadPublisher時,Http1TubeSubscriber的onSubscribe方法最終經過多重曲折, 調用了subscription.request(1)方法。對應的訂閱InternalReadSubscription中,就註冊了一個讀事件到SelectorManager的待辦事件列表,即註冊到socket通道上。
我們簡要關注下SocketTube的內部類InternalReadPublisher的內部類InternalReadSubscription的request方法及read方法:
final class InternalReadSubscription implements Flow.Subscription {
private final Demand demand = new Demand();
final SequentialScheduler readScheduler;
private volatile boolean completed;
private final ReadEvent readEvent;
private final AsyncEvent subscribeEvent;
@Override
public final void request(long n) {
if (n > 0L) {
boolean wasFulfilled = demand.increase(n);
if (wasFulfilled) {
if (debug.on()) debug.log("got some demand for reading");
//該方法註冊了一個讀事件到通道上
resumeReadEvent();
// if demand has been changed from fulfilled
// to unfulfilled register read event;
}
} else {
signalError(new IllegalArgumentException("non-positive request"));
}
debugState("leaving request("+n+"): ");
}
/** The body of the task that runs in SequentialScheduler. */
final void read() {
// It is important to only call pauseReadEvent() when stopping
// the scheduler. The event is automatically paused before
// firing, and trying to pause it again could cause a race
// condition between this loop, which calls tryDecrementDemand(),
// and the thread that calls request(n), which will try to resume
// reading.
try {
while(!readScheduler.isStopped()) {
if (completed) return;
// make sure we have a subscriber
if (handlePending()) {
if (debug.on())
debug.log("pending subscriber subscribed");
return;
}
// If an error was signaled, we might not be in the
// the selector thread, and that is OK, because we
// will just call onError and return.
ReadSubscription current = subscription;
Throwable error = errorRef.get();
if (current == null) {
assert error != null;
if (debug.on())
debug.log("error raised before subscriber subscribed: %s",
(Object)error);
return;
}
TubeSubscriber subscriber = current.subscriber;
if (error != null) {
completed = true;
// safe to pause here because we're finished anyway.
pauseReadEvent();
if (debug.on())
debug.log("Sending error " + error
+ " to subscriber " + subscriber);
if (Log.channel()) {
Log.logChannel("Raising error with subscriber for {0}: {1}",
channelDescr(), error);
}
current.errorRef.compareAndSet(null, error);
current.signalCompletion();
if (debug.on()) debug.log("Stopping read scheduler");
readScheduler.stop();
debugState("leaving read() loop with error: ");
return;
}
// If we reach here then we must be in the selector thread.
assert client.isSelectorThread();
if (demand.tryDecrement()) {
// we have demand.
try {
//這一步裡面是從socket通道中讀取可讀的響應數據
List<ByteBuffer> bytes = readAvailable(current.bufferSource);
if (bytes == EOF) {
//收到eof,說明該通道讀完成,在readAvailable方法中對應read()的結果為-1
if (!completed) {
if (debug.on()) debug.log("got read EOF");
if (Log.channel()) {
Log.logChannel("EOF read from channel: {0}",
channelDescr());
}
completed = true;
// safe to pause here because we're finished
// anyway.
//停止讀事件,並標記讀完成
pauseReadEvent();
current.signalCompletion();
if (debug.on()) debug.log("Stopping read scheduler");
readScheduler.stop();
}
debugState("leaving read() loop after EOF: ");
return;
} else if (Utils.remaining(bytes) > 0) {
// the subscriber is responsible for offloading
// to another thread if needed.
if (debug.on())
debug.log("read bytes: " + Utils.remaining(bytes));
assert !current.completed;
//將接收到的數據交給Http1TubeSubscriber
subscriber.onNext(bytes);
// we could continue looping until the demand
// reaches 0. However, that would risk starving
// other connections (bound to other socket
// channels) - as other selected keys activated
// by the selector manager thread might be
// waiting for this event to terminate.
// So resume the read event and return now...
//這裡按照注釋,並沒有直接讓循環讀完,而是註冊一個讀事件等待再次分發,
//為的是其它通道的「公平」起見
resumeReadEvent();
if (errorRef.get() != null) continue;
debugState("leaving read() loop after onNext: ");
return;
} else {
// nothing available!
if (debug.on()) debug.log("no more bytes available");
// re-increment the demand and resume the read
// event. This ensures that this loop is
// executed again when the socket becomes
// readable again.
//沒有讀到怎麼辦?說明該通道不可讀,註冊新的讀事件
demand.increase(1);
resumeReadEvent();
if (errorRef.get() != null) continue;
debugState("leaving read() loop with no bytes");
return;
}
} catch (Throwable x) {
signalError(x);
continue;
}
} else {
if (debug.on()) debug.log("no more demand for reading");
// the event is paused just after firing, so it should
// still be paused here, unless the demand was just
// incremented from 0 to n, in which case, the
// event will be resumed, causing this loop to be
// invoked again when the socket becomes readable:
// This is what we want.
// Trying to pause the event here would actually
// introduce a race condition between this loop and
// request(n).
if (errorRef.get() != null) continue;
debugState("leaving read() loop with no demand");
break;
}
}
} catch (Throwable t) {
if (debug.on()) debug.log("Unexpected exception in read loop", t);
signalError(t);
} finally {
if (readScheduler.isStopped()) {
if (debug.on()) debug.log("Read scheduler stopped");
if (Log.channel()) {
Log.logChannel("Stopped reading from channel {0}", channelDescr());
}
}
handlePending();
}
}
}
我們看到,在read()方法中,調用了subscriber.onNext(bytes)方法。我們關注作為訂閱者的Http1TubeSubscriber是如何處理接到的響應數據的。
我們一路追蹤,將會發現管道訂閱者Http1TubeSubscriber在收到響應數據後,將其放到了外部對象維護的一個隊列queue中,然後通知外部類Http1AsyncReceiver進行讀操作,並將數據交付給headersReader:
//Http1AsyncReceiver的內部類
final class Http1TubeSubscriber implements TubeSubscriber {
volatile Flow.Subscription subscription;
volatile boolean completed;
volatile boolean dropped;
@Override
public void onNext(List<ByteBuffer> item) {
canRequestMore.set(item.isEmpty());
for (ByteBuffer buffer : item) {
//調用了外部類Http1AsyncReceiver的asyncReceive方法
//我們將分析
asyncReceive(buffer);
}
}
}
/**
* A helper class that will queue up incoming data until the receiving
* side is ready to handle it.
*/
class Http1AsyncReceiver {
//暫存收到的響應數據的隊列
private final ConcurrentLinkedDeque<ByteBuffer> queue
= new ConcurrentLinkedDeque<>();
//一個運行取響應數據的調度器,我們重點關注flush方法
private final SequentialScheduler scheduler =
SequentialScheduler.lockingScheduler(this::flush);
final MinimalFuture<Void> whenFinished;
private final Executor executor;
//維持的對內部的管道訂閱者的實例的引用
private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();
//……省略
//該方法即為內部成員subscriber調用的外部方法,我們進行分析
// Callback: Consumer of ByteBuffer
private void asyncReceive(ByteBuffer buf) {
if (debug.on())
debug.log("Putting %s bytes into the queue", buf.remaining());
received.addAndGet(buf.remaining());
//將響應數據放入到緩衝隊列中
queue.offer(buf);
//調度進行flush方法,從隊列中取數據並進行後續的消費
//我們將深入分析
//注釋說這裡由於是SelectorManager執行緒直接分發的讀事件,
//為了防止阻塞選擇器管理執行緒,將該任務交給其它執行緒
// This callback is called from within the selector thread.
// Use an executor here to avoid doing the heavy lifting in the
// selector.
scheduler.runOrSchedule(executor);
}
//重點要分析的方法,進行數據從隊列的讀取和消費
private void flush() {
ByteBuffer buf;
try {
// we should not be running in the selector here,
// except if the custom Executor supplied to the client is
// something like (r) -> r.run();
assert !client.isSelectorThread()
|| !(client.theExecutor().delegate() instanceof ExecutorService) :
"Http1AsyncReceiver::flush should not run in the selector: "
+ Thread.currentThread().getName();
//該handle pending方法調用了headersReader的onsubscribe方法
// First check whether we have a pending delegate that has
// just subscribed, and if so, create a Subscription for it
// and call onSubscribe.
handlePendingDelegate();
//從隊列中取響應數據
// Then start emptying the queue, if possible.
while ((buf = queue.peek()) != null && !stopRequested) {
Http1AsyncDelegate delegate = this.delegate;
if (debug.on())
debug.log("Got %s bytes for delegate %s",
buf.remaining(), delegate);
if (!hasDemand(delegate)) {
// The scheduler will be invoked again later when the demand
// becomes positive.
return;
}
assert delegate != null;
if (debug.on())
debug.log("Forwarding %s bytes to delegate %s",
buf.remaining(), delegate);
// The delegate has demand: feed it the next buffer.
//注意,這一步即是將從隊列中取出的數據交給headersReader,即請求頭讀取者
//由於之前在Http1Response的構造函數中,headers訂閱了AsyncReceiver,
//delegate即指向headersReader。我們將進入分析該方法。
//該方法返回false,即表示接收解析完成當前數據,即解析完成響應頭,該次flush運行便會停止
//如此,便可保證切換響應體接收的正確性
if (!delegate.tryAsyncReceive(buf)) {
final long remaining = buf.remaining();
if (debug.on()) debug.log(() -> {
// If the scheduler is stopped, the queue may already
// be empty and the reference may already be released.
String remstr = scheduler.isStopped() ? "" :
" remaining in ref: "
+ remaining;
remstr += remstr
+ " total remaining: " + remaining();
return "Delegate done: " + remaining;
});
canRequestMore.set(false);
// The last buffer parsed may have remaining unparsed bytes.
// Don't take it out of the queue.
return; // done.
}
// removed parsed buffer from queue, and continue with next
// if available
ByteBuffer parsed = queue.remove();
canRequestMore.set(queue.isEmpty() && !stopRequested);
assert parsed == buf;
}
//隊列清空後,請求更多響應數據,這一步會觸發讀事件的註冊
//由於flush方法在onSubscribe()中也會被調用,實際上觸發了讀過程
// queue is empty: let's see if we should request more
checkRequestMore();
} catch (Throwable t) {
Throwable x = error;
if (x == null) error = t; // will be handled in the finally block
if (debug.on()) debug.log("Unexpected error caught in flush()", t);
} finally {
// Handles any pending error.
// The most recently subscribed delegate will get the error.
checkForErrors();
}
}
我們看到,作為Http響應內容直接接收者的Http1TubeSubscriber,在onNext(bytes)方法中,促成了headersReader對Http1AsyncReceiver的訂閱的完成,然後將響應數據(先是響應頭,然後是響應體)交給了headersReader。
我們追蹤headersReader的行為,最終調用的是headersReader的handle方法。該方法調用了parser的parse方法來解析響應頭。
@Override
final void handle(ByteBuffer b,
Http1HeaderParser parser,
CompletableFuture<State> cf) {
assert cf != null : "parsing not started";
assert parser != null : "no parser";
try {
count += b.remaining();
if (debug.on())
debug.log("Sending " + b.remaining() + "/" + b.capacity()
+ " bytes to header parser");
//調用了響應頭解析器的parse方法來解析響應頭
//其內部多次循環,使用狀態state記錄解析狀態,最終完成解析時返回true
if (parser.parse(b)) {
count -= b.remaining();
if (debug.on())
debug.log("Parsing headers completed. bytes=" + count);
//解析完成響應頭後,進行狀態的升級,即是讓Http1Response進入到讀取響應體的狀態
onComplete.accept(State.READING_HEADERS);
//將佔位符標誌為完成請求頭讀取
cf.complete(State.READING_HEADERS);
}
} catch (Throwable t) {
if (debug.on())
debug.log("Header parser failed to handle buffer: " + t);
cf.completeExceptionally(t);
}
}
在上面的程式碼中,我們看到,只是調用了一次解析器的parse方法,就完成了響應頭的解析。可是響應數據不是一次到齊的呀?其實,是Http1HeaderParser內部使用了一個狀態位state來支援parse方法多次被調用,在最終解析完成時返回true,否則返回false:
/**
* Parses HTTP/1.X status-line and headers from the given bytes. Must be
* called successive times, with additional data, until returns true.
*
* All given ByteBuffers will be consumed, until ( possibly ) the last one
* ( when true is returned ), which may not be fully consumed.
*
* @param input the ( partial ) header data
* @return true iff the end of the headers block has been reached
*/
boolean parse(ByteBuffer input) throws ProtocolException {
requireNonNull(input, "null input");
while (canContinueParsing(input)) {
//這裡通過state狀態維護了一個初值為initial的狀態機,一次調用parse,會根據讀取的內容,在while循環中更新狀態state,
//直到讀完input中可讀內容,state停在某個狀態。多次調用,state會繼續改變,直至讀取完成,方法返回true
switch (state) {
case INITIAL -> state = State.STATUS_LINE;
case STATUS_LINE -> readResumeStatusLine(input);
case STATUS_LINE_FOUND_CR, STATUS_LINE_FOUND_LF -> readStatusLineFeed(input);
case STATUS_LINE_END -> maybeStartHeaders(input);
case STATUS_LINE_END_CR, STATUS_LINE_END_LF -> maybeEndHeaders(input);
case HEADER -> readResumeHeader(input);
case HEADER_FOUND_CR, HEADER_FOUND_LF -> resumeOrLF(input);
case HEADER_FOUND_CR_LF -> resumeOrSecondCR(input);
case HEADER_FOUND_CR_LF_CR -> resumeOrEndHeaders(input);
default -> throw new InternalError("Unexpected state: " + state);
}
}
//僅在完成時返回true
return state == State.FINISHED;
}
以上,我們就完成了對響應頭讀取和解析的大部分過程。最後的過程是組裝一個Reponse對象。這一步就發生在Exchange類中,發送請求體之後。相關方法如下:
//Exchange類的sendReqeustBody方法,發送請求體然後調用具體實現類的接收並解析響應頭方法
// After sending the request headers, if no ProxyAuthorizationRequired
// was raised and the expectContinue flag is off, we can immediately
// send the request body and proceed.
private CompletableFuture<Response> sendRequestBody(ExchangeImpl<T> ex) {
assert !request.expectContinue();
if (debug.on()) debug.log("sendRequestBody");
CompletableFuture<Response> cf = ex.sendBodyAsync()
//我們跟蹤進入該方法
.thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
//協議升級和日誌,協議升級對http1.1類型的exchange不適用
cf = wrapForUpgrade(cf);
cf = wrapForLog(cf);
return cf;
}
我們如果進入ExchangeImpl的實現類Http1Exchange的getResponseAsync方法,會發現其主要做的就是調用Http1Response的readHeadersAsync方法。於是,我們直接進入Http1Response的readHeadersAsync方法。
該方法定義解析Http響應頭後要進行的操作:
//Http1Response的readHeadersAsync方法
public CompletableFuture<Response> readHeadersAsync(Executor executor) {
if (debug.on())
debug.log("Reading Headers: (remaining: "
+ asyncReceiver.remaining() +") " + readProgress);
//第一次解析
if (firstTimeAround) {
if (debug.on()) debug.log("First time around");
firstTimeAround = false;
} else {
// with expect continue we will resume reading headers + body.
asyncReceiver.unsubscribe(bodyReader);
bodyReader.reset();
hd = new Http1HeaderParser();
readProgress = State.READING_HEADERS;
headersReader.reset();
headersReader.start(hd);
asyncReceiver.subscribe(headersReader);
}
//這一步是什麼呢?它返回了一個響應頭讀取的佔位符。我們稍後分解
CompletableFuture<State> cf = headersReader.completion();
assert cf != null : "parsing not started";
if (debug.on()) {
debug.log("headersReader is %s",
cf == null ? "not yet started"
: cf.isDone() ? "already completed"
: "not yet completed");
}
//定義好解析響應頭完成後的操作:返回一個Response對象
Function<State, Response> lambda = (State completed) -> {
assert completed == State.READING_HEADERS;
if (debug.on())
debug.log("Reading Headers: creating Response object;"
+ " state is now " + readProgress);
//解除響應頭讀取者對Http響應接收者的訂閱
asyncReceiver.unsubscribe(headersReader);
responseCode = hd.responseCode();
headers = hd.headers();
response = new Response(request,
exchange.getExchange(),
headers,
connection,
responseCode,
HTTP_1_1);
if (Log.headers()) {
StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
Log.dumpHeaders(sb, " ", headers);
Log.logHeaders(sb.toString());
}
return response;
};
//在解析完響應頭後執行該操作
if (executor != null) {
return cf.thenApplyAsync(lambda, executor);
} else {
return cf.thenApply(lambda);
}
}
由此,響應頭的接收和解析過程就已經分析完成。採用了非同步編程和非阻塞I/O的編程模式,也是該節的主要特點之一。
7. 響應體的解析
在上一節中看到,在解析響應頭完成時, Http1Response的狀態會升級為READING_BODY,然而,這並不意味著這一響應的響應體會被立刻解析返回。正如上一篇所呈現:一次用戶請求可能會帶來多次的請求——響應交換,只有當HttpClient應用響應頭過濾器,發現沒有新的請求生成,才會解析最終的響應。
我們將看到,對響應體的解析,又將引入新的響應式發布——訂閱過程。
先回顧下我們發起請求時的調用方式:在send()或者sendAsync()中傳入的第二個參數,是調用HttpResponse.BodyHandlers::ofString方法生成的一個對象。這個對象的作用,一句話說,是構造、生成出一個真正的響應體訂閱者。
client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
還是從uml類圖開始:我們可以看到,BodyHandler是一個位於HttpResponse介面內部的函數式介面,它只有一個名為apply的方法,用於生成響應體的真正訂閱者:BodySubscriber的具體實現。HttpResponse里的兩個工具類HttpHandlers和HttpSubscribers,提供了便捷的靜態方法,分別用於生成BodyHandler和BodySubscriber的具體實現。
我們跟蹤一下調用BodyHandlers::ofString,發生了什麼:
public static BodyHandler<String> ofString() {
return (responseInfo) -> BodySubscribers.ofString(charsetFrom(responseInfo.headers()));
}
可以看到,如果該BodyHandler被調用,那麼將會響應頭中獲取字符集。我們再跟蹤BodySubscribers::ofString方法。
public static BodySubscriber<String> ofString(Charset charset) {
Objects.requireNonNull(charset);
return new ResponseSubscribers.ByteArraySubscriber<>(
bytes -> new String(bytes, charset)
);
}
可以看到,如果該BodyHandler被調用,會生成一個BodyHandler的具體實現ByteArraySubscriber。它是一個標準的Flow-api中的訂閱者,其中維持了一個接收響應體數據的緩衝隊列,以及一個用於收集響應數據的函數:finisher。響應數據在onNext方法中被接收後,放入緩衝列表received,再接收完成時,onComplete方法會使用finisher收集響應數據成為最終給用戶的結果。在上面的BodySubscribers::ofString方法中,finisher只是簡單地把收到的byte數組生成了字元串。
//BodyHandler的具體實現
public static class ByteArraySubscriber<T> implements TrustedSubscriber<T> {
private final Function<byte[], T> finisher;
private final CompletableFuture<T> result = new MinimalFuture<>();
//響應體接收列表
private final List<ByteBuffer> received = new ArrayList<>();
private volatile Flow.Subscription subscription;
public ByteArraySubscriber(Function<byte[],T> finisher) {
this.finisher = finisher;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (this.subscription != null) {
subscription.cancel();
return;
}
this.subscription = subscription;
//接受訂閱時,請求接收最大長度的響應體數據
// We can handle whatever you've got
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(List<ByteBuffer> items) {
// incoming buffers are allocated by http client internally,
// and won't be used anywhere except this place.
// So it's free simply to store them for further processing.
assert Utils.hasRemaining(items);
//接收響應數據時,存到緩衝列表中
received.addAll(items);
}
@Override
public void onError(Throwable throwable) {
received.clear();
result.completeExceptionally(throwable);
}
static private byte[] join(List<ByteBuffer> bytes) {
int size = Utils.remaining(bytes, Integer.MAX_VALUE);
byte[] res = new byte[size];
int from = 0;
for (ByteBuffer b : bytes) {
int l = b.remaining();
b.get(res, from, l);
from += l;
}
return res;
}
@Override
public void onComplete() {
try {
//接收完成時,先將byteBuffer數據轉成byte數組,再收集
result.complete(finisher.apply(join(received)));
received.clear();
} catch (IllegalArgumentException e) {
result.completeExceptionally(e);
}
}
@Override
public CompletionStage<T> getBody() {
return result;
}
}
響應體訂閱者就分析完畢了。那麼,發布者是什麼呢?響應體處理的流程又是怎樣的呢?
我們回到HttpClient處理多重響應的後的方法:MultiExchange.responseAsync0(CompletableFuture
private CompletableFuture<HttpResponse<T>>
responseAsync0(CompletableFuture<Void> start) {
//之前分析的全部內容,包括對多次交換的管理、對單次請求的發送和響應接收、響應頭解析
//都發生在這個responseAsyncImpl方法中
return start.thenCompose( v -> responseAsyncImpl())
.thenCompose((Response r) -> {
//獲取當前最終的交換
Exchange<T> exch = getExchange();
//檢查204狀態碼,及無需(不可)有響應體的清清褲
if (bodyNotPermitted(r)) {
if (bodyIsPresent(r)) {
IOException ioe = new IOException(
"unexpected content length header with 204 response");
exch.cancel(ioe);
return MinimalFuture.failedFuture(ioe);
} else
//處理沒有響應體的情況
return handleNoBody(r, exch);
}
//我們著重要分析的方法
return exch.readBodyAsync(responseHandler)
.thenApply((T body) -> {
//解析響應體完成後,返回一個最終要給調用者的響應對象
this.response =
new HttpResponseImpl<>(r.request(), r, this.response, body, exch);
return this.response;
});
}).exceptionallyCompose(this::whenCancelled);
}
我們看到,這過程調用了Exchange類的readBodyAsync(responseHandler)方法。
我們繼續跟蹤,發現直接調用了http1Exchange的responseAsync方法,我們跟隨進入。
@Override
CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,
boolean returnConnectionToPool,
Executor executor)
{
//應用傳入的BodyHandler,生成具體的的bodySubscriber
BodySubscriber<T> bs = handler.apply(new ResponseInfoImpl(response.responseCode(),
response.responseHeaders(),
HTTP_1_1));
//讀取響應,我們將稍後進入
CompletableFuture<T> bodyCF = response.readBody(bs,
returnConnectionToPool,
executor);
return bodyCF;
}
Http1Response.readBody方法時我們分析的重點,我們跟隨進入:
//Http1Response.readBody方法
public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
boolean return2Cache,
Executor executor) {
if (debug.on()) {
debug.log("readBody: return2Cache: " + return2Cache);
if (request.isWebSocket() && return2Cache && connection != null) {
debug.log("websocket connection will be returned to cache: "
+ connection.getClass() + "/" + connection );
}
}
assert !return2Cache || !request.isWebSocket();
this.return2Cache = return2Cache;
//這裡將我們傳入的BodyHandler生成的bodySubscriber包進了一個Http1BodySubscriber對象
//目的是防止發生錯誤時,onError方法被多次調用
//該subscriber做的基本只是代理轉發的作用,會把主要功能交給我們的BodySubscriber
final Http1BodySubscriber<U> subscriber = new Http1BodySubscriber<>(p);
final CompletableFuture<U> cf = new MinimalFuture<>();
//確定content-length,可能為定長/-1(分塊)或-2(未知)
long clen0 = headers.firstValueAsLong("Content-Length").orElse(-1L);
final long clen = fixupContentLen(clen0);
//解除響應體讀取者對Http1響應非同步接收者的訂閱(其實之前解除過)
// expect-continue reads headers and body twice.
// if we reach here, we must reset the headersReader state.
asyncReceiver.unsubscribe(headersReader);
headersReader.reset();
ClientRefCountTracker refCountTracker = new ClientRefCountTracker();
// We need to keep hold on the client facade until the
// tracker has been incremented.
connection.client().reference();
executor.execute(() -> {
try {
//生成Http1響應體對象
content = new ResponseContent(
connection, clen, headers, subscriber,
this::onFinished
);
if (cf.isCompletedExceptionally()) {
// if an error occurs during subscription
connection.close();
return;
}
// increment the reference count on the HttpClientImpl
// to prevent the SelectorManager thread from exiting until
// the body is fully read.
refCountTracker.acquire();
//響應體解析者實例化,入參是完成時的回調,會設置bodyReader的future佔位符為讀取完成
//會根據前面的clen0來確定是實例化那種類型(定長,分塊,不定長)的BodyParser
bodyParser = content.getBodyParser(
(t) -> {
try {
if (t != null) {
try {
subscriber.onError(t);
} finally {
cf.completeExceptionally(t);
}
}
} finally {
bodyReader.onComplete(t);
if (t != null) {
connection.close();
}
}
});
bodyReader.start(bodyParser);
CompletableFuture<State> bodyReaderCF = bodyReader.completion();
//設置響應體讀取者訂閱Http1響應非同步接收者
//裡面會調用flush方法,內部的handlePendingDelegate方法,
//會促使訂閱的完成
asyncReceiver.subscribe(bodyReader);
assert bodyReaderCF != null : "parsing not started";
// Make sure to keep a reference to asyncReceiver from
// within this
CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) -> {
t = Utils.getCompletionCause(t);
try {
if (t == null) {
if (debug.on()) debug.log("Finished reading body: " + s);
assert s == State.READING_BODY;
}
if (t != null) {
subscriber.onError(t);
cf.completeExceptionally(t);
}
} catch (Throwable x) {
// not supposed to happen
asyncReceiver.onReadError(x);
} finally {
// we're done: release the ref count for
// the current operation.
refCountTracker.tryRelease();
}
});
connection.addTrailingOperation(trailingOp);
} catch (Throwable t) {
if (debug.on()) debug.log("Failed reading body: " + t);
try {
subscriber.onError(t);
cf.completeExceptionally(t);
} finally {
asyncReceiver.onReadError(t);
}
} finally {
connection.client().unreference();
}
});
//獲取最終的響應體,對於ByteArraySubscriber而言,只是簡單取個result
ResponseSubscribers.getBodyAsync(executor, p, cf, (t) -> {
cf.completeExceptionally(t);
asyncReceiver.setRetryOnError(false);
asyncReceiver.onReadError(t);
});
return cf.whenComplete((s,t) -> {
if (t != null) {
// If an exception occurred, release the
// ref count for the current operation, as
// it may never be triggered otherwise
// (BodySubscriber ofInputStream)
// If there was no exception then the
// ref count will be/have been released when
// the last byte of the response is/was received
refCountTracker.tryRelease();
}
});
}
我們可以看到:Http1Response的readBody方法設置了響應體讀取者BodyReader對Http1AsyncReceiver的訂閱。在這之後,Http1AsyncReceiver接收到的數據便會源源不斷地交給bodyReader處理。這一過程,便發生在上一節提到的,flush方法中。在所有響應數據都被解析完成後,便會將最終的結果返回。
我們回顧下上節提到的Http1AsyncReceiver的flush方法,它的delegate.tryAsyncReceive方法要求我們關注BodyReader的執行過程。
//我們只截取一小段, 還是這個tryAsyncReceive方法
//由於設置了bodyReader訂閱Http1AsyncReceiver,此處delegate即為bodyReader
//if條件成立的結果,是響應體被接收並解析完成
if (!delegate.tryAsyncReceive(buf)) {
final long remaining = buf.remaining();
if (debug.on()) debug.log(() -> {
// If the scheduler is stopped, the queue may already
// be empty and the reference may already be released.
String remstr = scheduler.isStopped() ? "" :
" remaining in ref: "
+ remaining;
remstr += remstr
+ " total remaining: " + remaining();
return "Delegate done: " + remaining;
});
canRequestMore.set(false);
// The last buffer parsed may have remaining unparsed bytes.
// Don't take it out of the queue.
return; // done.
}
我們一路跟蹤,會發現最終調用的方法是BodyReader.handle重寫方法:
@Override
final void handle(ByteBuffer b,
BodyParser parser,
CompletableFuture<State> cf) {
assert cf != null : "parsing not started";
assert parser != null : "no parser";
try {
if (debug.on())
debug.log("Sending " + b.remaining() + "/" + b.capacity()
+ " bytes to body parser");
//最後還是調用了BodyParser的parse方法,和前面解析響應頭類似
//我們跟蹤進入
parser.accept(b);
} catch (Throwable t) {
if (debug.on())
debug.log("Body parser failed to handle buffer: " + t);
if (!cf.isDone()) {
cf.completeExceptionally(t);
}
}
}
我們跟隨parser.accept(b)這一行程式碼,看看響應體解析器是如何解析響應體的。前文提到:總共有三種類型的響應體解析器,分別是:
FixedLengthBodyParser,ChunkedBodyParser,UnknownLengthBodyParser
這次我們選取分塊的響應體解析器ChunkedBodyParser來看看解析過程。其實也是通過一個類型為ChunkState狀態來支援多次調用間的狀態流轉,每次都解析收到的數據,將解析後的結果交給bodySubscriber。
//下面的程式碼都位於ResponseContent內部類中
//這裡定義了分塊狀態枚舉
static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
//分塊的響應體解析器
class ChunkedBodyParser implements BodyParser {
final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
final Consumer<Throwable> onComplete;
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";
volatile Throwable closedExceptionally;
volatile int partialChunklen = 0; // partially read chunk len
volatile int chunklen = -1; // number of bytes in chunk
volatile int bytesremaining; // number of bytes in chunk left to be read incl CRLF
volatile boolean cr = false; // tryReadChunkLength has found CR
volatile int chunkext = 0; // number of bytes already read in the chunk extension
volatile int digits = 0; // number of chunkLength bytes already read
volatile int bytesToConsume; // number of bytes that still need to be consumed before proceeding
//初始化分塊狀態為讀取分塊長度。通過該變數來完成讀取狀態的切換
volatile ChunkState state = ChunkState.READING_LENGTH; // current state
volatile AbstractSubscription sub;
ChunkedBodyParser(Consumer<Throwable> onComplete) {
this.onComplete = onComplete;
}
@Override
public void accept(ByteBuffer b) {
if (closedExceptionally != null) {
if (debug.on())
debug.log("already closed: " + closedExceptionally);
return;
}
// debugBuffer(b);
boolean completed = false;
try {
List<ByteBuffer> out = new ArrayList<>();
do {
//該方法讀取解析一個byteBuffer,根據換行符等來確定並切換分塊讀取狀態
//當它返回true,僅當所有分塊都被讀取完成
if (tryPushOneHunk(b, out)) {
// We're done! (true if the final chunk was parsed).
if (!out.isEmpty()) {
// push what we have and complete
// only reduce demand if we actually push something.
// we would not have come here if there was no
// demand.
boolean hasDemand = sub.demand().tryDecrement();
assert hasDemand;
//最後一個分塊內容的交付給訂閱者,pusher即是上文提到了包裝了
//傳入的BodyHandler生成的BodySubscriber的Http1BodySubscriber
pusher.onNext(Collections.unmodifiableList(out));
if (debug.on()) debug.log("Chunks sent");
}
if (debug.on()) debug.log("done!");
assert closedExceptionally == null;
assert state == ChunkState.DONE;
//對asyncReceiver的清理
onFinished.run();
//所有分塊傳輸完成,通知訂閱者收集結果
pusher.onComplete();
if (debug.on()) debug.log("subscriber completed");
completed = true;
onComplete.accept(closedExceptionally); // should be null
break;
}
// the buffer may contain several hunks, and therefore
// we must loop while it's not exhausted.
} while (b.hasRemaining());
if (!completed && !out.isEmpty()) {
// push what we have.
// only reduce demand if we actually push something.
// we would not have come here if there was no
// demand.
boolean hasDemand = sub.demand().tryDecrement();
assert hasDemand;
//將下一個分塊內容的交付給訂閱者,pusher即是上文提到了包裝了
//傳入的BodyHandler生成的BodySubscriber的Http1BodySubscribe
pusher.onNext(Collections.unmodifiableList(out));
if (debug.on()) debug.log("Chunk sent");
}
assert state == ChunkState.DONE || !b.hasRemaining();
} catch(Throwable t) {
if (debug.on())
debug.log("Error while processing buffer: %s", (Object)t );
closedExceptionally = t;
if (!completed) onComplete.accept(t);
}
}
}
總結一下,響應體的解析可以分為如下過程:
- SocketTube不間斷地從socket通道讀取響應數據給Http1TubeSubscriber,後者將數據放到外部對象Http1AsyncReceiver維持的一個數據緩衝隊列中
- 響應頭讀取解析結束後,Http1AsyncReceiver暫停從緩衝隊列搬運響應數據交付給下游訂閱者
- 根據用戶提供的響應體處理辦法實例化對應的訂閱者BodySubscriber,根據響應頭資訊實例化對應的BodyParser
- Http1AsyncReceiver接受bodyReader的訂閱,重啟對響應數據的搬運,將響應體交付給bodyReader
- bodyParser接收多次調用,解析響應體,將解析結果交給下游的bodySubscriber
- 響應體解析完成後,bodySubscriber組裝接收到的響應體數據成響應結果內容,返回
響應體數據的流轉,可以用一下數據流圖來描繪:
8. 小結
經過漫長的旅途,我們終於完整見識了單個無加密的Http1.1請求——響應過程的生命歷程。理解響應式流模型,是理解這一流程的關鍵。