Azure Event Hubs for Kafka: two consumers from the same group rebalance endlessly

  •  1
  • Roman T  ·  7 years ago

    I am using Azure Event Hubs for Kafka with Spring Kafka 1.3.5 on the consumer side (for compatibility reasons). This is my configuration:

    @EnableKafka
    @Configuration
    class EventHubsKafkaConfig(@Value("\${eventhubs.broker}") val eventHubsBroker: String,
                               @Value("\${eventhubs.new-mails.shared-access-key}") val newMailsEventHubSharedKey: String,
                               @Value("\${eventhubs.consumer-group}") val consumerGroup: String) {
        @Bean
        fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Int, NewMailEvent>):
                ConcurrentKafkaListenerContainerFactory<Int, NewMailEvent> {
            val factory = ConcurrentKafkaListenerContainerFactory<Int, NewMailEvent>()
            factory.consumerFactory = consumerFactory
            return factory
        }
    
        @Bean
        fun consumerFactory(consumerConfigs: Map<String, Any>) =
                DefaultKafkaConsumerFactory<Int, NewMailEvent>(consumerConfigs, IntegerDeserializer(),
                        JsonDeserializer(NewMailEvent::class.java, jacksonObjectMapper()))
    
        @Bean
        fun consumerConfigs(): Map<String, Any> {
            val connectionString = "Endpoint=sb://${eventHubsBroker}/;" +
                    "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=${newMailsEventHubSharedKey}"
    
            return mapOf(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "${eventHubsBroker}:9093",
                    ConsumerConfig.GROUP_ID_CONFIG to consumerGroup,
                    CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
                    SaslConfigs.SASL_MECHANISM to "PLAIN",
                    SaslConfigs.SASL_JAAS_CONFIG to "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                            "username=\"\$ConnectionString\" password=\"$connectionString\";",
                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java
            )
        }
    }
    

    And the consuming component:

    @Component
    class NewMailEventConsumer {
        @KafkaListener(topics = ["\${eventhubs.new-mails.topic-name}"])
        fun newMails(newMailEvent: NewMailEvent) {
            logger.info { "new mail event: $newMailEvent" }
        }
    
        companion object : KLogging()
    }
    
    data class NewMailEvent(val mailbox: String, val mailUuid: String)
    

    When I start two consumer applications with this code, I see strange warnings that never stop:

    Successfully joined group offer-application-bff-local with generation 5
    web_1  | 2018-07-09 11:20:42.950  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [offer-mail-crawler-new-mails-0] for group offer-application-bff-local
    web_1  | 2018-07-09 11:20:42.983  INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[offer-mail-crawler-new-mails-0]
    web_1  | 2018-07-09 11:21:28.686  WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Auto-commit of offsets {offer-mail-crawler-new-mails-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group offer-application-bff-local: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    web_1  | 2018-07-09 11:21:28.687  WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Auto-commit of offsets {offer-mail-crawler-new-mails-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group offer-application-bff-local: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    web_1  | 2018-07-09 11:21:28.687  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [offer-mail-crawler-new-mails-0] for group offer-application-bff-local
    web_1  | 2018-07-09 11:21:28.687  INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[offer-mail-crawler-new-mails-0]
    web_1  | 2018-07-09 11:21:28.688  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group offer-application-bff-local
    web_1  | 2018-07-09 11:21:29.670  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Marking the coordinator bap-event-hubs-dev.servicebus.windows.net:9093 (id: 2147483647 rack: null) dead for group offer-application-bff-local
    web_1  | 2018-07-09 11:21:43.099  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator bap-event-hubs-dev.servicebus.windows.net:9093 (id: 2147483647 rack: null) for group offer-application-bff-local.
    web_1  | 2018-07-09 11:21:43.131  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group offer-application-bff-local
    web_1  | 2018-07-09 11:21:43.344  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group offer-application-bff-local with generation 7
    web_1  | 2018-07-09 11:21:43.345  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [offer-mail-crawler-new-mails-0] for group offer-application-bff-local
    web_1  | 2018-07-09 11:21:43.375  INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[offer-mail-crawler-new-mails-0]
    web_1  | 2018-07-09 11:21:46.377  WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Auto-commit of offsets {offer-mail-crawler-new-mails-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group offer-application-bff-local: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    

    The following exception shows up periodically:

    2018-07-09 11:36:21.602  WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.common.protocol.Errors  : Unexpected error code: 60.
    web_1  | 2018-07-09 11:36:21.603 ERROR 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Container exception
    web_1  |
    web_1  | org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
    web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:504) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) ~[kafka-clients-0.11.0.2.jar!/:na]
    web_1  |    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:628) ~[spring-kafka-1.3.5.RELEASE.jar!/:na]
    web_1  |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
    web_1  |    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
    

    And this one, also periodically:

    Failed to send SSL Close message 
    
    java.io.IOException: Broken pipe
        at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.8.0_162]
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.8.0_162]
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.8.0_162]
        at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.8.0_162]
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[na:1.8.0_162]
        at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194) ~[kafka-clients-0.11.0.2.jar:na]
    

    With a single consumer it works like a charm: no warnings, nothing. Does anybody have an idea what is going wrong here?

    2 Answers  |  7 years ago
        1
  •  2
  •   Roman T    7 years ago

    Finally, I found what the problem was. As you can see in the code, I did not specify a client.id property for the Kafka consumer. That turned out to be crucial with Spring Kafka, because it used the same auto-generated client.id = consumer-0 for both consumers in the consumer group, which caused the endless rebalancing of the partition between the two identically named consumers. I had to set it to a partly random string, ConsumerConfig.CLIENT_ID_CONFIG to "bff-${UUID.randomUUID()}", to make it work:

        @Bean
        fun consumerConfigs(): Map<String, Any> {
            val connectionString = "Endpoint=sb://${eventHubsBroker}/;" +
                    "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=${newMailsEventHubSharedKey}"
    
            return mapOf(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "${eventHubsBroker}:9093",
                    ConsumerConfig.CLIENT_ID_CONFIG to "bff-${UUID.randomUUID()}",
                    ConsumerConfig.GROUP_ID_CONFIG to consumerGroup,
                    CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
                    SaslConfigs.SASL_MECHANISM to "PLAIN",
                    SaslConfigs.SASL_JAAS_CONFIG to "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                            "username=\"\$ConnectionString\" password=\"$connectionString\";",
                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java
            )
        }
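
    A note on this choice (not part of the original answer): anything that is unique per consumer instance works, and the random UUID is simply the easiest way to guarantee that. As a hedged alternative sketch, assuming each consumer instance runs on its own host or container (which the question does not state), the suffix could be derived from the hostname so that it stays stable across restarts:

    import java.net.InetAddress

    // Hypothetical alternative: a client.id that is unique per instance but stable across
    // restarts, assuming one consumer instance per host/container.
    fun stableClientId(prefix: String = "bff"): String =
            "$prefix-${InetAddress.getLocalHost().hostName}"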
    
        2
  •  0
  •   kkflf    7 years ago

    You cannot have more consumers with the same group ID than there are partitions in a given topic.

    For example, a topic with 3 partitions can have 1 to 3 consumers sharing the same group ID.

    I assume your topic has only one partition and the two consumers keep fighting over this single resource. Either remove one of the consumers or add an extra partition to the topic.
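
    For reference, here is a minimal sketch (not part of the original answer) of how the partition count could be checked with the plain kafka-clients AdminClient, assuming the Event Hubs Kafka endpoint answers the describe-topics request; the broker, key and topic names are placeholders taken from the logs in the question:

    import org.apache.kafka.clients.CommonClientConfigs
    import org.apache.kafka.clients.admin.AdminClient
    import org.apache.kafka.clients.admin.AdminClientConfig
    import org.apache.kafka.common.config.SaslConfigs

    fun main() {
        val eventHubsBroker = "bap-event-hubs-dev.servicebus.windows.net"  // namespace from the logs above
        val connectionString = "Endpoint=sb://$eventHubsBroker/;" +
                "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<key>"  // <key> is a placeholder

        val config = mapOf<String, Any>(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "$eventHubsBroker:9093",
                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
                SaslConfigs.SASL_MECHANISM to "PLAIN",
                SaslConfigs.SASL_JAAS_CONFIG to "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                        "username=\"\$ConnectionString\" password=\"$connectionString\";"
        )

        val admin = AdminClient.create(config)
        try {
            val topic = "offer-mail-crawler-new-mails"  // topic name from the logs above
            val partitionCount = admin.describeTopics(listOf(topic)).all().get()[topic]!!.partitions().size
            // At most `partitionCount` consumers with the same group ID are assigned partitions;
            // any additional consumers in the group stay idle.
            println("Topic '$topic' has $partitionCount partition(s)")
        } finally {
            admin.close()
        }
    }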