[Azure Event Hubs] Cannot connect to Event Hub: "Did not observe any item or terminal signal within 60000ms in 'flatMapMany'" error

Problem Description

When connecting to Azure Event Hub with the Java SDK, the application keeps throwing java.util.concurrent.TimeoutException with the message: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 60000ms in 'flatMapMany' (and no fallback has been configured).

The exception itself carries no further useful information:

 ERROR .e.r.OpportunityResourceEventhubReceiver []: com.cbs.message.facade.eventhub.receive.OpportunityResourceEventhubReceiver.onError.partition:NONE. Exception:{}
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 60000ms in 'flatMapMany' (and no fallback has been configured)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:294)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:279)
    at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:418)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:270)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:285)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

 

Solution

If you use the official Azure Event Hub SDK, set the log output level to INFO to get more detailed logs. For example:

2022-11-03 10:57:49.410  INFO  ---  [           main] c.a.m.eventhubs.EventHubClientBuilder    []: {"az.sdk.message":"Emitting a single connection.","connectionId":"MF_22f21s_6940767444216"}
2022-11-03 10:57:49.602  INFO  ---  [           main] c.a.m.e.i.EventHubConnectionProcessor    []: {"az.sdk.message":"Setting next AMQP channel.","entityPath":"eh01"}
2022-11-03 10:57:49.603  INFO  ---  [           main] c.a.m.e.i.EventHubConnectionProcessor    []: {"az.sdk.message":"Next AMQP channel received, updating 0 current subscribers","entityPath":"eh01"}
2022-11-03 10:57:49.612  INFO  ---  [           main] c.a.m.eventhubs.EventProcessorClient     []: {"az.sdk.message":"Starting a new event processor instance.","eventProcessorId":"02690c22-21be-4b39-b976-efcf3ce3819a"}
2022-11-03 10:57:49.629  INFO  ---  [           main] c.a.m.eventhubs.EventHubClientBuilder    []: {"az.sdk.message":"Emitting a single connection.","connectionId":"MF_93d32o_12dcdedeadcfe33"}
2022-11-03 10:57:49.630  INFO  ---  [           main] c.a.m.e.i.EventHubConnectionProcessor    []: {"az.sdk.message":"Setting next AMQP channel.","entityPath":"eh02"}
2022-11-03 10:57:49.698  INFO  ---  [           main] c.a.m.e.i.EventHubConnectionProcessor    []: {"az.sdk.message":"Next AMQP channel received, updating 0 current subscribers","entityPath":"eh02"}
2022-11-03 10:57:49.699  INFO  ---  [           main] c.a.m.eventhubs.EventProcessorClient     []: {"az.sdk.message":"Starting a new event processor instance.","eventProcessorId":"c8d655c2-d12d-4d14-a85e-e333273293d9"}
2022-11-03 10:57:49.712  INFO  ---  [           main] c.a.m.eventhubs.EventHubClientBuilder    []: {"az.sdk.message":"Emitting a single connection.","connectionId":"MF_0346b3_1667444269712"}
2022-11-03 10:57:49.713  INFO  ---  [           main] c.a.m.e.i.EventHubConnectionProcessor    []: {"az.sdk.message":"Setting next AMQP channel.","entityPath":"eh02"}
2022-11-03 10:57:49.713  INFO  ---  [           main] c.a.m.e.i.EventHubConnectionProcessor    []: {"az.sdk.message":"Next AMQP channel received, updating 0 current subscribers","entityPath":"eh02"}
2022-11-03 10:57:49.714  INFO  ---  [           main] c.a.m.eventhubs.EventProcessorClient     []: {"az.sdk.message":"Starting a new event processor instance.","eventProcessorId":"2fd3a905-c39c-47ff-bc8d-e4b21301eeb3"}
2022-11-03 10:57:50.127  INFO  ---  [pool-6-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Starting load balancer.","ownerId":"02690c22-21be-4b39-b976-efcf3ce3819a"}
2022-11-03 10:57:50.136  INFO  ---  [pool-6-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Getting partitions from Event Hubs service.","entityPath":"eh01"}
2022-11-03 10:57:51.016  INFO  ---  [pool-6-thread-1] c.a.c.a.i.ReactorConnection              []: {"az.sdk.message":"Creating and starting connection.","connectionId":"MF_22f21s_6940767444216","hostName":"test-eventhub.servicebus.chinacloudapi.cn","port":5671}
2022-11-03 10:57:51.052  INFO  ---  [pool-6-thread-1] c.a.c.a.implementation.ReactorExecutor   []: {"az.sdk.message":"Starting reactor.","connectionId":"MF_22f21s_6940767444216"}
2022-11-03 10:57:51.060  INFO  ---  [ctor-executor-1] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionInit","connectionId":"MF_22f21s_6940767444216","hostName":"test-eventhub.servicebus.chinacloudapi.cn","namespace":"test-eventhub.servicebus.chinacloudapi.cn"}
2022-11-03 10:57:51.061  INFO  ---  [ctor-executor-1] c.a.c.a.i.handler.ReactorHandler         []: {"az.sdk.message":"reactor.onReactorInit","connectionId":"MF_22f21s_6940767444216"}
2022-11-03 10:57:51.061  INFO  ---  [ctor-executor-1] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionLocalOpen","connectionId":"MF_22f21s_6940767444216","errorCondition":null,"errorDescription":null,"hostName":"test-eventhub.servicebus.chinacloudapi.cn"}
2022-11-03 10:57:51.220  INFO  ---  [ctor-executor-1] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionBound","connectionId":"MF_22f21s_6940767444216","hostName":"test-eventhub.servicebus.chinacloudapi.cn","peerDetails":"test-eventhub.servicebus.chinacloudapi.cn:5671"}
2022-11-03 10:57:51.257  INFO  ---  [ctor-executor-1] c.a.c.a.i.handler.StrictTlsContextSpi    []: SSLv2Hello was an enabled protocol. Filtering out.
2022-11-03 10:57:51.360  INFO  ---  [pool-8-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Starting load balancer.","ownerId":"2fd3a905-c39c-47ff-bc8d-e4b21301eeb3"}
2022-11-03 10:57:51.360  INFO  ---  [pool-8-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Getting partitions from Event Hubs service.","entityPath":"eh02"}
2022-11-03 10:57:51.361  INFO  ---  [pool-8-thread-1] c.a.c.a.i.ReactorConnection              []: {"az.sdk.message":"Creating and starting connection.","connectionId":"MF_0346b3_1667444269712","hostName":"test-eventhub.servicebus.chinacloudapi.cn","port":5671}
2022-11-03 10:57:51.363  INFO  ---  [pool-8-thread-1] c.a.c.a.implementation.ReactorExecutor   []: {"az.sdk.message":"Starting reactor.","connectionId":"MF_0346b3_1667444269712"}
2022-11-03 10:57:51.364  INFO  ---  [ctor-executor-2] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionInit","connectionId":"MF_0346b3_1667444269712","hostName":"test-eventhub.servicebus.chinacloudapi.cn","namespace":"test-eventhub.servicebus.chinacloudapi.cn"}
2022-11-03 10:57:51.366  INFO  ---  [ctor-executor-2] c.a.c.a.i.handler.ReactorHandler         []: {"az.sdk.message":"reactor.onReactorInit","connectionId":"MF_0346b3_1667444269712"}
2022-11-03 10:57:51.366  INFO  ---  [ctor-executor-2] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionLocalOpen","connectionId":"MF_0346b3_1667444269712","errorCondition":null,"errorDescription":null,"hostName":"test-eventhub.servicebus.chinacloudapi.cn"}
2022-11-03 10:57:51.368  INFO  ---  [ctor-executor-2] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionBound","connectionId":"MF_0346b3_1667444269712","hostName":"test-eventhub.servicebus.chinacloudapi.cn","peerDetails":"test-eventhub.servicebus.chinacloudapi.cn:5671"}
2022-11-03 10:57:51.369  INFO  ---  [ctor-executor-2] c.a.c.a.i.handler.StrictTlsContextSpi    []: SSLv2Hello was an enabled protocol. Filtering out.
2022-11-03 10:57:51.470  INFO  ---  [pool-7-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Starting load balancer.","ownerId":"c8d655c2-d12d-4d14-a85e-e333273293d9"}
2022-11-03 10:57:51.470  INFO  ---  [pool-7-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Getting partitions from Event Hubs service.","entityPath":"eh02"}
2022-11-03 10:57:51.471  INFO  ---  [pool-7-thread-1] c.a.c.a.i.ReactorConnection              []: {"az.sdk.message":"Creating and starting connection.","connectionId":"MF_93d32o_12dcdedeadcfe33","hostName":"test-eventhub.servicebus.chinacloudapi.cn","port":5671}
2022-11-03 10:57:51.472  INFO  ---  [pool-7-thread-1] c.a.c.a.implementation.ReactorExecutor   []: {"az.sdk.message":"Starting reactor.","connectionId":"MF_93d32o_12dcdedeadcfe33"}
2022-11-03 10:57:51.473  INFO  ---  [ctor-executor-3] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionInit","connectionId":"MF_93d32o_12dcdedeadcfe33","hostName":"test-eventhub.servicebus.chinacloudapi.cn","namespace":"test-eventhub.servicebus.chinacloudapi.cn"}
2022-11-03 10:57:51.473  INFO  ---  [ctor-executor-3] c.a.c.a.i.handler.ReactorHandler         []: {"az.sdk.message":"reactor.onReactorInit","connectionId":"MF_93d32o_12dcdedeadcfe33"}
2022-11-03 10:57:51.473  INFO  ---  [ctor-executor-3] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionLocalOpen","connectionId":"MF_93d32o_12dcdedeadcfe33","errorCondition":null,"errorDescription":null,"hostName":"test-eventhub.servicebus.chinacloudapi.cn"}
2022-11-03 10:57:51.475  INFO  ---  [ctor-executor-3] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionBound","connectionId":"MF_93d32o_12dcdedeadcfe33","hostName":"test-eventhub.servicebus.chinacloudapi.cn","peerDetails":"test-eventhub.servicebus.chinacloudapi.cn:5671"}
2022-11-03 10:57:51.475  INFO  ---  [ctor-executor-3] c.a.c.a.i.handler.StrictTlsContextSpi    []: SSLv2Hello was an enabled protocol. Filtering out.
2022-11-03 10:58:21.099  INFO  ---  [pool-6-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: Load balancer already running
2022-11-03 10:58:21.367  INFO  ---  [pool-8-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: Load balancer already running
2022-11-03 10:58:21.474  INFO  ---  [pool-7-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: Load balancer already running
2022-11-03 10:58:51.014  WARN  ---  [     parallel-2] c.a.m.e.PartitionBasedLoadBalancer       []: Load balancing for event processor failed. Did not observe any item or terminal signal within 60000ms in 'flatMapMany' (and no fallback has been configured)
2022-11-03 10:58:51.017 ERROR  ---  [     parallel-2] c.c.m.f.e.r.OpportunityEventhubReceiver  []: com.cbs.message.facade.eventhub.receive.OpportunityEventhubReceiver.onError.partition:NONE. Exception:{}
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 60000ms in 'flatMapMany' (and no fallback has been configured)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:294)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:279)
    at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:418)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:270)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:285)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2022-11-03 10:58:51.058  WARN  ---  [ctor-executor-1] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onTransportError","connectionId":"MF_93d32o_12dcdedeadcfe33","errorCondition":"amqp:connection:framing-error","errorDescription":"org.apache.qpid.proton.engine.TransportException: connection aborted","hostName":"test-eventhub.servicebus.chinacloudapi.cn"}
2022-11-03 10:58:51.062  INFO  ---  [ctor-executor-1] c.a.c.a.i.ReactorConnection              []: {"az.sdk.message":"Disposing of ReactorConnection.","connectionId":"MF_93d32o_12dcdedeadcfe33","isTransient":false,"isInitiatedByClient":false,"shutdownMessage":"org.apache.qpid.proton.engine.TransportException: connection aborted, errorContext[NAMESPACE: test-eventhub.servicebus.chinacloudapi.cn. ERROR CONTEXT: N/A]"}
2022-11-03 10:58:51.075  INFO  ---  [ctor-executor-1] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionUnbound","connectionId":"MF_93d32o_12dcdedeadcfe33","hostName":"test-eventhub.servicebus.chinacloudapi.cn","state":"CLOSED","remoteState":"UNINITIALIZED"}
2022-11-03 10:58:51.077  INFO  ---  [ctor-executor-1] c.a.c.a.i.ReactorConnection              []: {"az.sdk.message":"Closing executor.","connectionId":"MF_93d32o_12dcdedeadcfe33"}
2022-11-03 10:58:51.099  INFO  ---  [pool-6-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Starting load balancer.","ownerId":"02690c22-21be-4b39-b976-efcf3ce3819a"}
2022-11-03 10:58:51.100  INFO  ---  [pool-6-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Getting partitions from Event Hubs service.","entityPath":"eh02"}
2022-11-03 10:58:51.101 ERROR  ---  [pool-6-thread-1] c.a.m.e.i.EventHubReactorAmqpConnection  []: {"az.sdk.message":"connectionId[MF_93d32o_12dcdedeadcfe33]: Connection is disposed. Cannot get management instance","exception":"connectionId[MF_22f21s_6940767444216]: Connection is disposed. Cannot get management instance","connectionId":"MF_22f21s_6940767444216"}
2022-11-03 10:58:51.104  WARN  ---  [pool-6-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: Load balancing for event processor failed.connectionId[MF_93d32o_12dcdedeadcfe33]: Connection is disposed. Cannot get management instance
2022-11-03 10:58:51.105 ERROR  ---  [pool-6-thread-1] c.c.m.f.e.r.OpportunityEventhubReceiver  []: com.cbs.message.facade.eventhub.receive.OpportunityEventhubReceiver.onError.partition:NONE. Exception:{}
java.lang.IllegalStateException: connectionId[MF_22f21s_6940767444216]: Connection is disposed. Cannot get management instance
    at com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection.getManagementNode(EventHubReactorAmqpConnection.java:90)
    at com.azure.messaging.eventhubs.EventHubAsyncClient.lambda$getProperties$0(EventHubAsyncClient.java:73)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
    at com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:267)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
    at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
    at reactor.core.publisher.MonoIgnorePublisher.subscribe(MonoIgnorePublisher.java:56)
    at reactor.core.publisher.FluxRepeatPredicate$RepeatPredicateSubscriber.resubscribe(FluxRepeatPredicate.java:119)
    at reactor.core.publisher.MonoRepeatPredicate.subscribeOrReturn(MonoRepeatPredicate.java:47)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8221)
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:8408)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8205)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8129)
    at com.azure.messaging.eventhubs.PartitionBasedLoadBalancer.loadBalance(PartitionBasedLoadBalancer.java:154)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

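From the logs above, we can see that:

1) Just before the exception, a WARN entry appears: Load balancing for event processor failed.

2) The connection details show that the application connects on port 5671.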
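
When the log is long, these two observations can be pulled out programmatically. The following is a minimal sketch, not part of the Azure SDK: the class name ConnectionLogScan and its methods are hypothetical, and the regular expression assumes the "hostName" and "port" fields appear adjacent to each other, as they do in the "Creating and starting connection." entries above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for scanning SDK log lines; not an Azure SDK class.
class ConnectionLogScan {
    // Assumes "hostName" is immediately followed by "port" in the JSON payload.
    private static final Pattern ENDPOINT =
            Pattern.compile("\"hostName\":\"([^\"]+)\",\"port\":(\\d+)");

    // Returns "host:port" if the line records a connection attempt.
    static Optional<String> endpointOf(String logLine) {
        Matcher m = ENDPOINT.matcher(logLine);
        return m.find() ? Optional.of(m.group(1) + ":" + m.group(2)) : Optional.empty();
    }

    // True for the WARN entry that precedes the TimeoutException.
    static boolean isLoadBalancingFailure(String logLine) {
        return logLine.contains("Load balancing for event processor failed");
    }

    public static void main(String[] args) {
        String[] lines = {
            "INFO c.a.c.a.i.ReactorConnection []: {\"az.sdk.message\":\"Creating and starting connection.\","
                + "\"connectionId\":\"MF_22f21s_6940767444216\","
                + "\"hostName\":\"test-eventhub.servicebus.chinacloudapi.cn\",\"port\":5671}",
            "WARN c.a.m.e.PartitionBasedLoadBalancer []: Load balancing for event processor failed."
        };
        for (String line : lines) {
            endpointOf(line).ifPresent(e -> System.out.println("endpoint: " + e));
            if (isLoadBalancingFailure(line)) {
                System.out.println("load balancing failure found");
            }
        }
    }
}
```

The endpoint extracted this way is the host and port to test in the connectivity check.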

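Because the connection never succeeds, follow the official client connectivity troubleshooting guide. The first step is to check whether port 5671 is reachable from the host that runs the Java code. See: //docs.azure.cn/zh-cn/event-hubs/troubleshooting-guide#run-the-command-to-check-dropped-packets

Run the following command to check for dropped packets or an unreachable port: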

.\psping.exe -n 25 -i 1 -q <yournamespacename>.servicebus.chinacloudapi.cn:5671 -nobanner     

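In this case, the check showed that the client environment could not reach ports 5671 and 5672. After both ports were opened in the firewall, the connection succeeded.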
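
Where psping is not available (for example on a Linux host), the same reachability check can be approximated from Java. This is a minimal sketch using only the standard library; the class name PortCheck and the 5-second timeout are assumptions for illustration. It only confirms that a TCP connection can be opened, so a success does not prove that the TLS or AMQP handshake will work.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
class PortCheck {
    // Returns true if a TCP connection to host:port opens within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused connections, timeouts, and DNS failures all end up here.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("usage: java PortCheck <yournamespacename>.servicebus.chinacloudapi.cn");
            return;
        }
        // The two AMQP ports that had to be opened in the firewall in this case.
        for (int port : new int[] {5671, 5672}) {
            boolean ok = isReachable(args[0], port, 5000);
            System.out.printf("%s:%d reachable=%b%n", args[0], port, ok);
        }
    }
}
```

Run it on the same host as the application, passing the Event Hubs namespace host name as the only argument. A result of reachable=false for either port points to a firewall or network rule between the client and the service.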

 

References

Troubleshoot connectivity issues - Azure Event Hubs: //docs.azure.cn/zh-cn/event-hubs/troubleshooting-guide#run-the-command-to-check-dropped-packets