代码之家  ›  专栏  ›  技术社区  ›  Davide Icardi i3arnon

akka stream kafka:找不到密钥“kafka clients”的配置设置

  •  1
  • Davide Icardi i3arnon  · 技术社区  · 7 年前

    我正在尝试创建一个简单的原型 Alpakka Kafka connector (Akka Stream Kafka) .

    运行应用程序时,出现以下错误:

    com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'kafka-clients'
    

    我有以下代码 ./src/main/scala/App.scala :

    import akka.Done
    import akka.actor.ActorSystem
    import akka.kafka.ProducerSettings
    import akka.kafka.scaladsl.Producer
    import akka.stream.{ActorMaterializer, Materializer}
    import akka.stream.scaladsl.Source
    import com.typesafe.config.ConfigFactory
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.StringSerializer
    
    import scala.concurrent.Future
    
    object App {
      def main(args: Array[String]): Unit = {
        println("Hello from producer")
    
        implicit val system = ActorSystem("fakeProducer")
        implicit val materializer: Materializer = ActorMaterializer()
    
        val config = system.settings.config // ConfigFactory.load()
    
        val producerSettings =
          ProducerSettings(config, new StringSerializer, new StringSerializer)
          .withBootstrapServers("localhost:9092")
    
        val done: Future[Done] =
          Source(1 to 100)
            .map(_.toString)
            .map(value => new ProducerRecord[String, String]("test-basic-numbers", value))
            .runWith(Producer.plainSink(producerSettings))
    
    
        println("Done")
      }
    }
    

    以下 build.sbt :

    name := "test-akka-stream"
    
    version := "0.1"
    
    scalaVersion := "2.11.8"
    
    libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"
    

    我使用运行应用程序 sbt run . 我没有配置任何uber/assembly jar。

    我可能漏掉了一些显而易见的东西,但我看不见…我怀疑Akka依赖关系有问题。

    更新

    正如“终结者的寒冷呼唤”所建议的 ProducerSettings(system, new StringSerializer, new StringSerializer) (通过 ActorSystem 而不是配置)解决问题。我只是不明白这是设计的还是个错误。

    更新2

    我创造了一个 github issue 已经修好了。现在文档更准确,并解释了正确的创建方法。 ProducerSettings / ConsumerSettings .

    val config = system.settings.config.getConfig("akka.kafka.producer")
    val producerSettings =
          ProducerSettings(config, new StringSerializer, new StringSerializer)
          .withBootstrapServers("localhost:9092")
    

    或者你可以通过 小精灵 如上所述。

    4 回复  |  直到 7 年前
        1
  •  2
  •   Enno    7 年前

    感谢您在alpakka-kafka连接器项目中发现并提交问题。 文档现在已更新: https://doc.akka.io/docs/akka-stream-kafka/current/producer.html

        2
  •  1
  •   emran    7 年前

    通常我将所有配置保存在akkasystem实例中。我不使用alpakka来实现kafka,但我的许多实现都是基于源代码的。

    使用加载typesafe配置对象 val config = ConfigFactory.load() 然后通过 config val system = ActorSystem("fakeProducer", config) .

    最后,通过 system.settings.config ProducerSettings .

    当前代码不传递任何设置,因为您尚未将配置加载到AKKA系统中。你的 val config = system.settings.config 正在引用一个空配置,该配置没有kafka clients部分(最佳猜测)。

        3
  •  1
  •   Murray Todd Williams    7 年前

    我想我遇到了和你一样的问题(几乎是在同一时间),尽管我试图创建一个基本的“你好世界”卡夫卡消费者,而不是生产者。我猜你只是在看 Alpakka Kafka connector documentation 并遵循他们最初定义的示例

    val config = system.settings.config
    

    然后将其传递到新的consumersettings对象中。我想在线文档可能有缺陷,但我对akka streams(这是我第一次尝试通过示例学习)还不够熟悉,我没有资格搞清楚到底是什么对错。

    我试图创建和application.conf文件,然后执行configfactory.load(),然后在创建时手动将其传递给actorsystem,然后将该系统传递给consumersettings构造函数,关于丢失的“kafka客户端”的错误消失了,但显然我根本不需要这么做。正如您所说,只需传递'system'变量而不是'config'变量就可以了。

    希望这能帮助那些在黑暗中摸索的人。我不得不说,尽管Akka流周围有这么多的传言,但似乎确实缺少文档。我可能要写一篇博客文章,一旦我搞清楚这些东西!

        4
  •  1
  •   Davide Icardi i3arnon    7 年前

    谢谢你的回答。我已经做了一些进一步的研究,我试图总结如下:

    两个 ConsumerSettings ProducerSettings apply 需要 Config (见 here ActorSystem (见 here )

    问题是当使用 小精灵 代码是:

    val config = system.settings.config.getConfig("akka.kafka.consumer")
    apply(config, keyDeserializer, valueDeserializer) // call the other overload
    

    使用时 配置 代码是:

    val properties = ConfigSettings.parseKafkaClientsProperties(config.getConfig("kafka-clients"))
    

    所以当直接传递配置时,代码搜索 kafka-clients 属性,而不是在传递 听觉系统 代码检查 akka.kafka.consumer/akka.kafka.producer .

    最后,在创建 听觉系统 默认情况下,大多数设置都是从嵌入的 reference.conf 文件并与 application.conf 文件(如果有)。更多信息 here . 所以基本上,唯一需要设置的属性通常是 bootstrap.servers .

    所以你现在可以理解为什么 system.settings.config 代码不起作用。此配置实例已加载 reference.conf文件 (对于所有默认值,请参见 here 和习俗 application.conf文件 . 这个 卡夫卡客户 财产在里面 akka.kafka.consumer/akka.kafka.producer ,但代码直接检查 卡夫卡客户 .

    一些可能的解决方案:

    • 直接通过 听觉系统 使用另一个过载
    • 使用 system.settings.config.getConfig("akka.kafka.consumer")
    • 手动构造 配置 实例 卡夫卡客户 部分

    对我来说,问题是提供的官方文件 here 没有提到这些差异,并且提供的示例不完整和/或不精确。可能对于akka专家来说,这是很清楚的,但是对于新开发人员来说,这可能非常令人困惑。

    我已经创建了一个更“随时可用”的示例 in this gist 打开一个 issue .

    推荐文章