代码之家  ›  专栏  ›  技术社区  ›  r2evans

在不同类型的JSON值主题上定义KSQL结构

  •  0
  • r2evans  · 技术社区  · 7 年前

    ( :为更好地反映意图而进行的轻微编辑,但由于所取得的进展而进行的较大编辑。)

    "t_raw" 给定了多种类型的消息,其中所有消息都包含一个公共 "type" 关键:

    {"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
    {"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
    {"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
    {"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
    

    STRUCT 尽管如此,但我目前的努力让我做到了:

    create stream raw (type varchar, data varchar) \
      with (kafka_topic='t_raw', value_format='JSON');
    

    那么第一级呢

    create stream key1 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as \
      select \
        extractjsonfield(data, '$.ts') as ts, \
        extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.b') as b \
      from raw where type='key1';
    create stream key2 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as \
      select \
        extractjsonfield(data, '$.ts') as ts, \
        extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.c') as c, \
        extractjsonfield(data, '$.d') as d \
      from raw where type='key2';
    

    这似乎是可行的,但最近增加了 结构 extractjsonfield 如上所述?

    ksql> select * from key1;
    1542741621100 | null | 2018-11-20 19:20:21.1 | 1 | hello
    1542741623300 | null | 2018-11-20 19:20:23.3 | 2 | hello2
    ^CQuery terminated
    ksql> select * from key2;
    1542741622200 | null | 2018-11-20 19:20:22.2 | 1 | 11 | goodbye
    1542741624400 | null | 2018-11-20 19:20:24.4 | 3 | 22 | goodbye2
    

    如果没有 结构 ksql 因此 标签)?

    有没有一种更为卡夫卡式/高效/优雅的方式来解析这一点? 我不能把它定义为空的 STRUCT<>

    ksql> CREATE STREAM some_input ( type VARCHAR, data struct<> ) \
          WITH (KAFKA_TOPIC='t1', VALUE_FORMAT='JSON');
    line 1:52: extraneous input '<>' expecting {',', ')'}
    

    some (not-so-recent) discussion

    CREATE STREAM key1 ( a INT, b VARCHAR ) AS \
      SELECT data->* from some_input where type = 'key1';
    

    仅供参考:上述解决方案在confluent-5.0.0中不起作用, a recent patch 修正了 错误并启用此解决方案。

    “类型” "data" 键(没有其他顶级键),并且几乎所有键都具有 "ts" 嵌套在 “数据” .

    1 回复  |  直到 7 年前
        1
  •  2
  •   Robin Moffatt    7 年前

    是的,您可以这样做-KSQL不介意列不存在,您只需要得到一个 null

    测试数据设置

    将一些测试数据填充到主题中:

    kafkacat -b kafka:29092 -t t_raw -P <<EOF
    {"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
    {"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
    {"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
    {"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
    EOF
    

    ksql> PRINT 't_raw' FROM BEGINNING;
    Format:JSON
    {"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
    {"ROWTIME":1542965737436,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
    {"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
    {"ROWTIME":1542965737437,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
    ^CTopic printing ceased
    ksql>
    

    对数据源流进行建模

    STRUCT 以及每个可能列的参考:

    CREATE STREAM T (TYPE VARCHAR, \
                     DATA STRUCT< \
                          TS VARCHAR, \
                          A INT, \
                          B VARCHAR, \
                          C INT, \
                          D VARCHAR>) \
            WITH (KAFKA_TOPIC='t_raw',\
                  VALUE_FORMAT='JSON');
    

    将offset设置为“最早”,以便查询整个主题,然后使用KSQL访问整个流:

    ksql> SET 'auto.offset.reset' = 'earliest';
    Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
    ksql>
    ksql> SELECT * FROM T;
    1542965737436 | null | key1 | {TS=2018-11-20 19:20:21.1, A=1, B=hello, C=null, D=null}
    1542965737436 | null | key2 | {TS=2018-11-20 19:20:22.2, A=1, B=null, C=11, D=goodbye}
    1542965737436 | null | key1 | {TS=2018-11-20 19:20:23.3, A=2, B=hello2, C=null, D=null}
    1542965737437 | null | key2 | {TS=2018-11-20 19:20:24.4, A=3, B=null, C=22, D=goodbye2}
    ^CQuery terminated
    

    -> 用于访问嵌套元素的运算符:

    ksql> SELECT DATA->A,DATA->B FROM T WHERE TYPE='key1'  LIMIT 2;
    1 | hello
    2 | hello2
    
    ksql> SELECT DATA->A,DATA->C,DATA->D FROM T WHERE TYPE='key2' LIMIT 2;
    1 | 11 | goodbye
    3 | 22 | goodbye2
    

    将数据保存在单独的卡夫卡主题中:

    ksql> CREATE STREAM TYPE_1 AS SELECT DATA->TS, DATA->A, DATA->B FROM T WHERE TYPE='key1';
    
    Message
    ----------------------------
    Stream created and running
    ----------------------------
    ksql> CREATE STREAM TYPE_2 AS SELECT DATA->TS, DATA->A, DATA->C, DATA->D FROM T WHERE TYPE='key2';
    
    Message
    ----------------------------
    Stream created and running
    ----------------------------
    

    新流的架构:

    ksql> DESCRIBE TYPE_1;
    
    Name                 : TYPE_1
    Field    | Type
    --------------------------------------
    ROWTIME  | BIGINT           (system)
    ROWKEY   | VARCHAR(STRING)  (system)
    DATA__TS | VARCHAR(STRING)
    DATA__A  | INTEGER
    DATA__B  | VARCHAR(STRING)
    --------------------------------------
    For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
    ksql> DESCRIBE TYPE_2;
    
    Name                 : TYPE_2
    Field    | Type
    --------------------------------------
    ROWTIME  | BIGINT           (system)
    ROWKEY   | VARCHAR(STRING)  (system)
    DATA__TS | VARCHAR(STRING)
    DATA__A  | INTEGER
    DATA__C  | INTEGER
    DATA__D  | VARCHAR(STRING)
    --------------------------------------
    

    主题是每个KSQL流的基础:

    ksql> LIST TOPICS;
    
    Kafka Topic                 | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
    ---------------------------------------------------------------------------------------------------------
    t_raw                       | true       | 1          | 1                  | 2         | 2
    TYPE_1                      | true       | 4          | 1                  | 0         | 0
    TYPE_2                      | true       | 4          | 1                  | 0         | 0
    ---------------------------------------------------------------------------------------------------------
    
    推荐文章