代码之家  ›  专栏  ›  技术社区  ›  user1870400

如何在Spark Streaming 2.3.1中将每条记录写入多个kafka主题?

  •  0
  • user1870400  · 技术社区  · 7 年前

    问题 here 不谈结构化流媒体案例。我正在寻找特定的结构化流媒体。

    1 回复  |  直到 7 年前
        1
  •  1
  •   Rishi Saraf    7 年前

    不确定您使用的是java还是scala。下面是生成两个不同主题的消息的代码。你得打电话给你

    dataset.foreachPartition(partionsrows => {
          val props = new util.HashMap[String, Object]()
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer)
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String, String](props)
          partionsrows.foreach(row => {
            val offerId = row.get(0).toString.replace("[", "").replace("]", "")
            val message1 = new ProducerRecord[String, String]("topic1", "message")
            producer.send(message1)
            val message2 = new ProducerRecord[String, String]("topic2",  "message")
            producer.send(message2)
          })
        })
    
    推荐文章