代码之家  ›  专栏  ›  技术社区  ›  DilTeam

Kafka流:会话窗口服务器与时间窗口服务器。模糊的隐式值

  •  0
  • DilTeam  · 技术社区  · 4 年前

    我一直得到' 模糊的隐式值 '以下代码中的消息。我尝试了几件事(从我评论的几行可以看出)。有什么办法解决这个问题吗?这是在 Scala .

      def createTopology(conf: Config, properties: Properties): Topology = {
    //    implicit val sessionSerde = Serde[WindowedSerdes.SessionWindowedSerde[String]]
    //    implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
        implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
        implicit val consumed: Consumed[String, String] = Consumed.`with`[String, String]
    
        val builder: StreamsBuilder = new StreamsBuilder()
        builder.stream("streams-plaintext-input")
            .groupBy((_, word) => word)
            .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
            .count()
            .toStream.to("streams-pipe-output")
    
        builder.build()
    
      }
    

    编译器错误:

    Error:(52, 78) ambiguous implicit values:
     both method timeWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
     and method sessionWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
     match expected type org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
        implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
    
    Error:(52, 78) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
        implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
    
    Error:(52, 78) not enough arguments for method with: (implicit keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]], implicit valueSerde: org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].
    Unspecified value parameters keySerde, valueSerde.
        implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
    
    0 回复  |  直到 4 年前
        1
  •  0
  •   Boris Azanov    4 年前

    我只是添加了一些 implicit 通过添加导入,它编译:

    import org.apache.kafka.common.serialization.Serde
    import org.apache.kafka.streams.Topology
    import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.kstream.{Consumed, Produced}
    
    import java.time.Duration
    import java.util.Properties
    
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.Serdes
    import org.apache.kafka.streams.scala.Serdes.{Long, String}
    
    def createTopology(conf: Config, properties: Properties): Topology = {
      // here we have two implicits to choose, I pick the sessionWindowedSerde because it was in your code
      // implicit val timeWindowedSerde: Serde[Windowed[String]] = Serdes.timeWindowedSerde[String]
      implicit val sessionSerde: Serde[Windowed[String]] = Serdes.sessionWindowedSerde[String]
      implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
      implicit val consumed: Consumed[String, String] = Consumed.`with`[String, String]
    
      val builder: StreamsBuilder = new StreamsBuilder()
      builder.stream("streams-plaintext-input")
        .groupBy((_, word) => word)
        .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
        .count()
        .toStream.to("streams-pipe-output")
    
      builder.build()
    }
    

    如果您看到错误:

    模糊的隐式值

    这意味着在你的范围内定义了多个 隐性的 满足所需类型。例如对象 org.apache.kafka.streams.scala.Serdes 有两个含义:

    implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] =
        new WindowedSerdes.TimeWindowedSerde[T](tSerde)
    
    implicit def sessionWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.SessionWindowedSerde[T] =
        new WindowedSerdes.SessionWindowedSerde[T](tSerde)
    

    哪里 TimeWindowedSerde 延伸 Serdes.WrapperSerde<Windowed<T>> :

    static public class TimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>>
    

    SessionWindowedSerde 延伸 塞尔德斯。WrapperSerde<窗口<T>> :

    static public class SessionWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>>
    

    它们都扩展了相同的类型 塞尔德斯。WrapperSerde<窗口<T>> , 行中:

    implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
    

    根据 with 函数签名:

    def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): ProducedJ[K, V] =
        ProducedJ.`with`(keySerde, valueSerde)
    

    我们期待一些 隐性的 价值为 Serde[Windowed[String]] 编译器不能选择一个,因为它们都是 Serde[窗口化[字符串]] .

    因此,如果你只是试图将两者添加到同一范围:

    implicit val timeWindowedSerde: Serde[Windowed[String]] = Serdes.timeWindowedSerde[String]
    implicit val sessionSerde: Serde[Windowed[String]] = Serdes.sessionWindowedSerde[String]
    

    你会看到的

    ambiguous implicit values
    

    再一次。

    结论 :进口大捆时要小心 implicits 最佳做法是只导入 隐含 你需要的。