首页 技术 正文
技术 2022年11月15日
0 收藏 564 点赞 3,832 浏览 4647 个字

问题情形

使用Java SDK编写的Event Hub消费端应用,随机性遇见了某个分区没有消费消息的情况,在检查日志时候,有发现IdelTimeExpired的错误记录。在重启应用后,连接EventHub正常,并又能正常消费数据。比较怀疑的方面,在又开启Retry机制的情况下,为什么分区(Partition)连接断掉后没有重连呢?

错误消息:

{"time":"2020-09-21 05:11:19.578", "level":"ERROR", "thread":"bounded-71", "appName":"events-service", "traceId":"", "spanId":"", "url":"", "clientIp":"", 
"method":"", "elapse":"", "code":"", "message":"", "class":"c.h.socialhub.eventhub.EventHub",
"line":"EventHub.java:150",
"msg":"Error occurred while processing events The connection was inactive for more than the allowed 240000 milliseconds and is closed by container 'cd8a74181e68151dde4_G28'.,
errorContext[NAMESPACE: shprod-member.servicebus.chinacloudapi.cn,
PATH: xxxx/ConsumerGroups/$default/Partitions/1, REFERENCE_ID: 2_xxxxxxxx LINK_CREDIT: 253]"}

消费端代码:

        eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.connectionString(currentEventHubProperty.getConnectionString(), this.topic)
.retry(retryOptions)
.checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
.processEvent(eventContext -> {
String currentData = "";
try {
EventData event = eventContext.getEventData();
PartitionContext partitionContext = eventContext.getPartitionContext(); EventMessage eventMessage = new EventMessage();
currentData = new String(event.getBody(), Charset.defaultCharset());
eventMessage.setContent(currentData);
eventMessage.setPartitionId(partitionContext.getPartitionId());
eventMessage.setSequenceNumber(event.getSequenceNumber());
log.info("Topic: {} - Partition: {} - Sequence: {} - EnqueuedTime: {}", this.topic, partitionContext.getPartitionId(), event.getSequenceNumber(),event.getEnqueuedTime()); eventContext.updateCheckpoint();
} catch (Exception e) {
String msg = e.getMessage();
if (StringUtils.isBlank(msg)) {
msg = e.getStackTrace().toString();
}
log.error("Error occurred while do works with events[{}] : {}, data: {} ", this.topic, msg, currentData);
}
})
.processError(errorContext -> log.error("Error occurred while processing events " + errorContext.getThrowable().getMessage()))
.buildEventProcessorClient();

分析原因

第一步,需要根据日志来判断当前分区是否在问题时间点闲置了240秒,在此期间没有数据进入该分区中,如日志中有关于每一天消息进入Queue的时间(enqueued time),则可以通过日志分析,如果没有,这可以在代码日志中添加:(这是为了下一次发生问题时候,可以直接在日志中分析)

 log.info("Topic: {} - Partition: {} - Sequence: {} - EnqueuedTime: {}", this.topic,  partitionContext.getPartitionId(), event.getSequenceNumber(),event.getEnqueuedTime());

而对于已经发生的问题,根据EventHub数据保留的设置,如果Event等信息还在保留时间期内,则可以通过SDK的receiveFromPartition方法来指定需要获取的数据范围,来查看其进入Queue的时间。(注:需要建一个不同的consumer group,不要用$Default,免得连不上),示例代码:https://azuresdkdocs.blob.core.windows.net/$web/java/azure-messaging-eventhubs/5.2.0/index.html

Consume events from an Event Hub partition

To consume events, create an EventHubConsumerAsyncClient or EventHubConsumerClient for a specific consumer group. In addition, a consumer needs to specify where in the event stream to begin receiving events.

Consume events with EventHubConsumerAsyncClient

In the snippet below, we create an asynchronous consumer that receives events from partitionId and only listens to newest events that get pushed to the partition. Developers can begin receiving events from multiple partitions using the same EventHubConsumerAsyncClient by calling receiveFromPartition(String, EventPosition) with another partition id.

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
.connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.buildAsyncConsumerClient();// Receive newly added events from partition with id "0". EventPosition specifies the position
// within the Event Hub partition to begin consuming events.
consumer.receiveFromPartition("0", EventPosition.latest()).subscribe(event -> {
// Process each event as it arrives.
});
// add sleep or System.in.read() to receive events before exiting the process.

Consume events with EventHubConsumerClient

Developers can create a synchronous consumer that returns events in batches using an EventHubConsumerClient. In the snippet below, a consumer is created that starts reading events from the beginning of the partition’s event stream.

EventHubConsumerClient consumer = new EventHubClientBuilder()
.connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.buildConsumerClient();String partitionId = "<< EVENT HUB PARTITION ID >>";// Get the first 15 events in the stream, or as many events as can be received within 40 seconds.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 15,
EventPosition.earliest(), Duration.ofSeconds(40));
for (PartitionEvent event : events) {
System.out.println("Event: " + event.getData().getBodyAsString());
}

以上。 并没有发现问题是否是应用端逻辑问题还是是SDK端问题,在借鉴了GitHub上的很多相类似的情况后,大部分倾向于Java SDK问题。需要等待Github中的进一步更新:

AmqpEventHubConsumer.IdleTimerExpired in Java EventHubConsumer SDK:https://github.com/Azure/azure-sdk-for-java/issues/11233

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:8,964
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,486
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,331
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,114
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,747
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,781