val builder = StreamsBuilder()
builder.table(CUSTOMERS_TOPIC, Materialized.`as`<String, Customer, KeyValueStore<Bytes, ByteArray>>(CUSTOMERS_STORE))
但是,为了序列化我的客户值类,我需要指定一个Json序列化程序。我可以使用StreamsBuilder中的此方法执行此操作:
public synchronized <K, V> KTable<K, V> table(final String topic,
final Consumed<K, V> consumed) {
消耗了这个:
Consumed.with(Serdes.String(), Serdes.serdeFrom(JsonPojoSerializer<Customer>(), JsonPojoDeserializer(Customer::class.java)
具有内部存储名称。注意,存储名称可能不可查询
通过交互式查询