代码之家  ›  专栏  ›  技术社区  ›  codependent

如何创建一个KTable,指定一个json序列化器和一个存储名称及其具体化定义

  •  0
  • codependent  · 技术社区  · 7 年前

    val builder = StreamsBuilder()
            builder.table(CUSTOMERS_TOPIC, Materialized.`as`<String, Customer, KeyValueStore<Bytes, ByteArray>>(CUSTOMERS_STORE))
    

    但是,为了序列化我的客户值类,我需要指定一个Json序列化程序。我可以使用StreamsBuilder中的此方法执行此操作:

    public synchronized <K, V> KTable<K, V> table(final String topic,
                                                      final Consumed<K, V> consumed) {
    

    消耗了这个:

    Consumed.with(Serdes.String(), Serdes.serdeFrom(JsonPojoSerializer<Customer>(), JsonPojoDeserializer(Customer::class.java)

    具有内部存储名称。注意,存储名称可能不可查询 通过交互式查询

    1 回复  |  直到 7 年前
        1
  •  1
  •   Matthias J. Sax    7 年前

    在Scala API中,Serdes是通过隐式来解析的。这就是为什么没有超载可以通过 Consumed 参数。囊性纤维变性。 https://github.com/apache/kafka/blob/trunk/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala#L88-L129

    对于Java,有一个方法重载允许您传入两个参数:

    public synchronized <K, V> KTable<K, V> table(final String topic,
                                                  final Consumed<K, V> consumed,
                                                  final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
    

    囊性纤维变性。 https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/StreamsBuilder.html#table-java.lang.String-org.apache.kafka.streams.kstream.Consumed-org.apache.kafka.streams.kstream.Materialized-

    推荐文章