I'm using Azure Event Hubs for Kafka with Spring Kafka 1.3.5 (for compatibility reasons) on the consumer side. Here is my configuration:
@EnableKafka
@Configuration
class EventHubsKafkaConfig(@Value("\${eventhubs.broker}") val eventHubsBroker: String,
                           @Value("\${eventhubs.new-mails.shared-access-key}") val newMailsEventHubSharedKey: String,
                           @Value("\${eventhubs.consumer-group}") val consumerGroup: String) {

    @Bean
    fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Int, NewMailEvent>):
            ConcurrentKafkaListenerContainerFactory<Int, NewMailEvent> {
        val factory = ConcurrentKafkaListenerContainerFactory<Int, NewMailEvent>()
        factory.consumerFactory = consumerFactory
        return factory
    }

    @Bean
    fun consumerFactory(consumerConfigs: Map<String, Any>) =
            DefaultKafkaConsumerFactory<Int, NewMailEvent>(consumerConfigs, IntegerDeserializer(),
                    JsonDeserializer(NewMailEvent::class.java, jacksonObjectMapper()))

    @Bean
    fun consumerConfigs(): Map<String, Any> {
        val connectionString = "Endpoint=sb://${eventHubsBroker}/;" +
                "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=${newMailsEventHubSharedKey}"

        return mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "${eventHubsBroker}:9093",
                ConsumerConfig.GROUP_ID_CONFIG to consumerGroup,
                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
                SaslConfigs.SASL_MECHANISM to "PLAIN",
                SaslConfigs.SASL_JAAS_CONFIG to "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                        "username=\"\$ConnectionString\" password=\"$connectionString\";",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java
        )
    }
}
And the consuming component:
@Component
class NewMailEventConsumer {

    @KafkaListener(topics = ["\${eventhubs.new-mails.topic-name}"])
    fun newMails(newMailEvent: NewMailEvent) {
        logger.info { "new mail event: $newMailEvent" }
    }

    companion object : KLogging()
}

data class NewMailEvent(val mailbox: String, val mailUuid: String)
When I start 2 consumer applications with this code, I see strange warnings that never stop:
Successfully joined group offer-application-bff-local with generation 5
web_1 | 2018-07-09 11:20:42.950 INFO 1 --- [ntainer
web_1 | 2018-07-09 11:20:42.983 INFO 1 --- [ntainer
web_1 | 2018-07-09 11:21:28.686 WARN 1 --- [ntainer
web_1 | 2018-07-09 11:21:28.687 WARN 1 --- [ntainer
web_1 | 2018-07-09 11:21:28.687 INFO 1 --- [ntainer
web_1 | 2018-07-09 11:21:28.687 INFO 1 --- [ntainer
web_1 | 2018-07-09 11:21:28.688 INFO 1 --- [ntainer
web_1 | 2018-07-09 11:21:29.670 INFO 1 --- [ntainer
web_1 | 2018-07-09 11:21:43.099 INFO 1 --- [ntainer
web_1 | 2018-07-09 11:21:43.131 INFO 1 --- [ntainer
web_1 | 2018-07-09 11:21:43.344 INFO 1 --- [ntainer
web_1 | 2018-07-09 11:21:43.345 INFO 1 --- [ntainer
web_1 | 2018-07-09 11:21:43.375 INFO 1 --- [ntainer
web_1 | 2018-07-09 11:21:46.377 WARN 1 --- [ntainer
Periodically, the following exception also appears:
2018-07-09 11:36:21.602 WARN 1 --- [ntainer
web_1 | 2018-07-09 11:36:21.603 ERROR 1 --- [ntainer
web_1 |
web_1 | org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:504) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:628) ~[spring-kafka-1.3.5.RELEASE.jar!/:na]
web_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
web_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
And this one, also periodically:
Failed to send SSL Close message
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.8.0_162]
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.8.0_162]
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.8.0_162]
at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.8.0_162]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[na:1.8.0_162]
at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194) ~[kafka-clients-0.11.0.2.jar:na]
With a single consumer it works like a charm: no warnings, nothing.
Does anyone know what is going wrong here?