我只是添加了一些
implicit
通过添加导入,它编译:
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{Consumed, Produced}
import java.time.Duration
import java.util.Properties
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes.{Long, String}
def createTopology(conf: Config, properties: Properties): Topology = {
// here we have two implicits to choose, I pick the sessionWindowedSerde because it was in your code
// implicit val timeWindowedSerde: Serde[Windowed[String]] = Serdes.timeWindowedSerde[String]
implicit val sessionSerde: Serde[Windowed[String]] = Serdes.sessionWindowedSerde[String]
implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
implicit val consumed: Consumed[String, String] = Consumed.`with`[String, String]
val builder: StreamsBuilder = new StreamsBuilder()
builder.stream("streams-plaintext-input")
.groupBy((_, word) => word)
.windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
.count()
.toStream.to("streams-pipe-output")
builder.build()
}
如果您看到错误:
模糊的隐式值
这意味着在你的范围内定义了多个
隐性的
满足所需类型。例如对象
org.apache.kafka.streams.scala.Serdes
有两个含义:
implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] =
new WindowedSerdes.TimeWindowedSerde[T](tSerde)
implicit def sessionWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.SessionWindowedSerde[T] =
new WindowedSerdes.SessionWindowedSerde[T](tSerde)
哪里
TimeWindowedSerde
延伸
Serdes.WrapperSerde<Windowed<T>>
:
static public class TimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>>
和
SessionWindowedSerde
延伸
塞尔德斯。WrapperSerde<窗口<T>>
:
static public class SessionWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>>
它们都扩展了相同的类型
塞尔德斯。WrapperSerde<窗口<T>>
,
行中:
implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
根据
with
函数签名:
def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): ProducedJ[K, V] =
ProducedJ.`with`(keySerde, valueSerde)
我们期待一些
隐性的
价值为
Serde[Windowed[String]]
编译器不能选择一个,因为它们都是
Serde[窗口化[字符串]]
.
因此,如果你只是试图将两者添加到同一范围:
implicit val timeWindowedSerde: Serde[Windowed[String]] = Serdes.timeWindowedSerde[String]
implicit val sessionSerde: Serde[Windowed[String]] = Serdes.sessionWindowedSerde[String]
你会看到的
ambiguous implicit values
再一次。
结论
:进口大捆时要小心
implicits
最佳做法是只导入
隐含
你需要的。