Connecting a producer flow to a graph

  • igx · 7 years ago

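    I am new to akka-streams-kafka (and to akka-streams in general). I am trying to build a graph that publishes messages to different topics. The part I cannot get working is committing the offset with commitScaladsl after producing (see the comment near the end of the code):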

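    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.kafka.{ConsumerMessage, ConsumerSettings, ProducerSettings, Subscriptions}
    import akka.kafka.ConsumerMessage.CommittableOffset
    import akka.kafka.scaladsl.{Consumer, Producer}
    import akka.stream.{ActorMaterializer, ClosedShape}
    import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph}
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
    // JsonDeserializer is not imported here; the question does not say where it comes from.

    object TestFoo {
      import akka.kafka.ProducerMessage.Message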
      implicit val system = ActorSystem("test-kafka")
      implicit val materializer = ActorMaterializer()
      val evenNumbersTopic = "even_numbers"
      val allNumbersTopic = "all_numbers"
      lazy val consumerSettings = ConsumerSettings(system, new StringDeserializer(), new JsonDeserializer[Int])
        .withBootstrapServers("localhost:9092")
        .withGroupId("group1")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      lazy val source =  Consumer.committableSource(consumerSettings, Subscriptions.topics(Set(evenNumbersTopic, allNumbersTopic)))
      val producerSettings = ProducerSettings(system,  new StringSerializer(), new StringSerializer())
        .withBootstrapServers("localhost:9092")
      val flow: RunnableGraph[NotUsed] = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
        import akka.stream.scaladsl.GraphDSL.Implicits._
        type TypedMessage =  Message[String, Int,CommittableOffset]
        val bcast = b.add(Broadcast[TypedMessage](2))
        val merge = b.add(Merge[TypedMessage](2))
    
        val evenFilter = Flow[TypedMessage].filter(c => c.record.value() % 2 == 0)
        val justEven = Flow[TypedMessage].map {
          case Message(pr, offset) =>
            val r = new ProducerRecord[String, Int]("general", pr.value())
            Message(r, offset)
        }
        val allNumbers = Flow[TypedMessage].map {
          case Message(pr, offset) =>
            val r = new ProducerRecord[String, Int](allNumbersTopic, pr.value())
            Message(r, offset)
        }
    
        val toMsg = Flow[ConsumerMessage.CommittableMessage[String, Int]].map{ msg =>
          val r = new ProducerRecord[String, Int]("general", msg.record.value())
          Message(r, msg.committableOffset)
        }
        source ~> toMsg ~> bcast
    
        bcast ~> evenFilter ~> justEven ~> merge
        bcast ~> allNumbers ~> merge
        merge ~> Producer.flow(producerSettings).mapAsync(producerSettings.parallelism) { result =>
          result.message.passThrough.commitScaladsl() //this doesn't compile, cannot get the .commitScaladsl()
        }
        ClosedShape 
      })}
    
    1 Answer

  • Stefano Bonetti · 7 years ago

    The PassThrough type is not inferred from the previous stage, so you have to give Producer.flow its type parameters explicitly:

    merge ~> Producer.flow[K, V, CommittableOffset](producerSettings).mapAsync(producerSettings.parallelism) { result =>
      result.message.passThrough.commitScaladsl()
    }
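To see why the explicit type parameters matter, here is a minimal sketch that uses simplified stand-ins instead of the real akka-stream-kafka classes. Every name in it (`Offset`, `Msg`, `Settings`, `Stage`, `producerStage`) is hypothetical and exists only for illustration. It assumes the real `Producer.flow` behaves like `producerStage` in one respect: the settings argument determines `K` and `V`, but nothing in the argument list determines the pass-through type.

```scala
// Simplified stand-ins for the akka-stream-kafka types; all names are hypothetical.
object PassThroughInference {
  final case class Offset(value: Long) {
    // Plays the role of CommittableOffset.commitScaladsl()
    def commit(): String = s"committed offset $value"
  }

  // Plays the role of ProducerMessage.Message[K, V, PassThrough]
  final case class Msg[K, V, P](key: K, value: V, passThrough: P)

  // Plays the role of ProducerSettings[K, V]: it fixes K and V but says nothing about P
  final case class Settings[K, V](bootstrapServers: String)

  // Plays the role of a Flow[In, Out, _]
  final class Stage[In, Out](f: In => Out) {
    def map[T](g: Out => T): Stage[In, T] = new Stage[In, T](in => g(f(in)))
    def run(in: In): Out = f(in)
  }

  // Plays the role of Producer.flow: P appears only in the result type
  def producerStage[K, V, P](settings: Settings[K, V]): Stage[Msg[K, V, P], Msg[K, V, P]] =
    new Stage[Msg[K, V, P], Msg[K, V, P]](m => m)

  val settings: Settings[String, Int] = Settings[String, Int]("localhost:9092")

  // Rejected by the compiler: with no hint, P is inferred as Nothing,
  // and commit() is not a member of Nothing.
  // val broken = producerStage(settings).map(_.passThrough.commit())

  // Accepted: P is pinned to Offset before map is called
  val committing: Stage[Msg[String, Int, Offset], String] =
    producerStage[String, Int, Offset](settings).map(_.passThrough.commit())
}
```

In the graph from the question the same thing happens: `.mapAsync` is called on the result of `Producer.flow(producerSettings)` before `merge ~>` can contribute any type information, so the pass-through type has to be written out by hand.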
    

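    I have left K and V as unbound type parameters; put whatever key/value types your producer is bound to there. If you want to wire the code above together correctly, the types of producerSettings also have to match what comes out of the merge stage. You need something like this: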

    val producerSettings = ProducerSettings(system,  new StringSerializer(), new JsonSerializer[Int])
        .withBootstrapServers("localhost:9092")
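The requirement that the settings match the merge output can be illustrated with another small model. This is only a sketch under stated assumptions: `Ser`, `Msg`, `Settings` and `producerStage` are hypothetical stand-ins, not the akka-stream-kafka API, and the "producer" just renders each record as a string.

```scala
// Hypothetical stand-ins showing that the settings' K and V must match the upstream elements.
object SettingsMustMatch {
  // Plays the role of a Kafka Serializer[T]
  trait Ser[T] { def write(t: T): String }

  val stringSer: Ser[String] = new Ser[String] { def write(t: String): String = t }
  val intSer: Ser[Int] = new Ser[Int] { def write(t: Int): String = t.toString }

  // Plays the role of ProducerMessage.Message (pass-through omitted for brevity)
  final case class Msg[K, V](key: K, value: V)

  // Plays the role of ProducerSettings[K, V]
  final case class Settings[K, V](keySer: Ser[K], valueSer: Ser[V])

  // Plays the role of Producer.flow: it only accepts Msg[K, V] for the settings' K and V
  def producerStage[K, V](s: Settings[K, V]): Msg[K, V] => String =
    m => s"${s.keySer.write(m.key)}=${s.valueSer.write(m.value)}"

  // Plays the role of the merge stage output: String keys, Int values
  val upstream: List[Msg[String, Int]] = List(Msg("a", 2), Msg("b", 4))

  // Rejected by the compiler: a String/String producer cannot consume Msg[String, Int]
  // val wrong = upstream.map(producerStage(Settings(stringSer, stringSer)))

  // Accepted: the value serializer now matches the Int values coming from upstream
  val sent: List[String] = upstream.map(producerStage(Settings(stringSer, intSer)))
}
```

The question's original settings used a StringSerializer for values while the merged messages carry Int values, which corresponds to the commented-out `wrong` line here.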