代码之家  ›  专栏  ›  技术社区  ›  ForeverLearner

Kafka使用者总是给出java.nio.channels.ClosedChannelException

  •  0
  • ForeverLearner  · 技术社区  · 7 年前

    我正在尝试执行以下命令:

    ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 101.10.51.1:9092,101.10.51.4:9092 --topic namespace_deep_archive_d_billing_transaction --time -2
    

    程序总是会出现以下错误:

    [2018-08-23 12:36:58,604] WARN Fetching topic metadata with correlation id 0 for topics [Set(namespace_deep_archive_d_billing_transaction)] from broker [BrokerEndPoint(0,101.10.51.1,9092)] failed (kafka.client.ClientUtils$)
    java.nio.channels.ClosedChannelException
            at kafka.network.BlockingChannel.send(BlockingChannel.scala:124)
            at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)
            at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:81)
            at kafka.producer.SyncProducer.send(SyncProducer.scala:126)
            at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63)
            at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:99)
            at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:98)
            at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala)
    [2018-08-23 12:36:59,616] WARN Fetching topic metadata with correlation id 0 for topics [Set(namespace_deep_archive_d_billing_transaction)] from broker [BrokerEndPoint(1,101.10.51.4,9092)] failed (kafka.client.ClientUtils$)
    

    我在管理 getOffset 从另一个服务器。但是,这个服务器可以远程连接到卡夫卡经纪人。

    1 回复  |  直到 7 年前
        1
  •  1
  •   ForeverLearner    7 年前

    我查看了GetOffsetShell,并将问题追溯到 /etc/hosts

    这是GetOffsetShell.scala中的片段

       val url = new URI(options.valueOf(urlOpt))
        val topic = options.valueOf(topicOpt)
        val partition = options.valueOf(partitionOpt).intValue
        var time = options.valueOf(timeOpt).longValue
        val nOffsets = options.valueOf(nOffsetsOpt).intValue
        val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000)
    

    即使我传递代理的IP地址,它们也会被解析为相应的主机名。 消费者代码查找 如果找不到主机名和IP地址之间的映射,则会引发异常。

    在/etc/hosts中添加服务器名和IP后,代码现在能够从kafka代理获取偏移量并使用记录。

    参考: https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/tools/GetOffsetShell.scala https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/consumer/SimpleConsumer.scala https://github.com/spujadas/elk-docker/issues/54

    推荐文章