代码之家  ›  专栏  ›  技术社区  ›  sobychacko

KTable状态存储持久性

  •  0
  • sobychacko  · 技术社区  · 6 年前

    如果在具体化ktable时使用持久存储,那么在应用程序重新启动时,状态存储是否持久?例如,如果我使用以下内容:

    StreamsBuilder builder = new StreamsBuilder();
    KeyValueBytesStoreSupplier storeSupplier =      Stores.persistentKeyValueStore("queryable-store-name");
     KTable<Long,String> table = builder.table(
       "foo",
       Materialized.as(storeSupplier)
                   .withKeySerde(Serdes.Long())
                   .withValueSerde(Serdes.String())
    

    状态存储“可查询存储名称”是否可以通过重新启动时以前运行的状态访问?比方说,我将50条记录发送到主题foo,它在状态存储中实现。然后重新启动应用程序,状态存储区中还有50条记录吗?如果没有,有没有办法做到这一点?

    谢谢!

    1 回复  |  直到 6 年前
        1
  •  3
  •   Matus Cimerman    6 年前

    application-id

    final KStream<CustomerKey, ViewPage> viewPagesStream=builder.stream(INPUT_TOPIC);
    
    final KTable<Windowed<ViewPageCountKey>,Long>uniqueViewPageCount=viewPagesStream
            .map((key,value)->{
                ViewPageCountKey newKey=new ViewPageCountKey(key.getProjectId(),value.getUrl());
                return new KeyValue<>(newKey,value);
            })
            .filter((key,value)->key!=null)
            .groupByKey()
            .count(TimeWindows.of(WINDOW_SIZE).advanceBy(WINDOW_ADVANCE),STORE_NAME);