
Need to filter Kafka records based on a certain keyword

Sid · 7 years ago

Our Kafka topic contains records like this:

    {
      "header": {
        "schemaVersionNo": "1"
      },
      "payload": {
        "modifiedDate": 1552334325212,
        "createdDate": 1552334325212,
        "createdBy": "A",
        "successful": true,
        "source_order_id": "3411976933214"
      }
    }
    

Now I want to filter out the record with a specific source_order_id, but I can't find the right way to do it. We have tried Lenses and Kafka Tool.

One sample query we tried in Lenses:

    SELECT * FROM `TEST`
    WHERE _vtype='JSON' AND _ktype='BYTES'
    AND _sample=2 AND _sampleWindow=200 AND payload.createdBy='fms'
    

This query works, but if we filter on source_order_id as shown below, we get an error:

    SELECT * FROM `TEST`
    WHERE _vtype='JSON' AND _ktype='BYTES'
    AND _sample=2 AND _sampleWindow=200 AND payload.source_order_id='3411976911924'
    
    
    
     Error : "Invalid syntax at line=3 and column=41.Invalid syntax for 'payload.source_order_id'. Field 'payload' resolves to primitive type STRING.
    

Answer by Robin Moffatt · 7 years ago

Since you said you're open to other solutions, here's one using KSQL.

First, let's put some sample records onto the source topic:

    $ kafkacat -P -b localhost:9092 -t TEST <<EOF
    { "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325212, "createdDate": 1552334325212, "createdBy": "A", "successful": true, "source_order_id": "3411976933214" } }
    { "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325412, "createdDate": 1552334325412, "createdBy": "B", "successful": true, "source_order_id": "3411976933215" } }
    { "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325612, "createdDate": 1552334325612, "createdBy": "C", "successful": true, "source_order_id": "3411976933216" } }
    EOF
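If you'd rather generate the sample records programmatically, keep in mind that kafkacat in producer mode (`-P`) reads stdin and, by default, treats each newline-delimited line as one message. Every record therefore has to be serialized onto a single line. Below is a minimal Python sketch under that assumption. The helper names `sample_record` and `to_ndjson` are hypothetical.

```python
import json

def sample_record(created_by, created_date, source_order_id):
    """Build one record with the same shape as the examples in the question."""
    return {
        "header": {"schemaVersionNo": "1"},
        "payload": {
            "modifiedDate": created_date,
            "createdDate": created_date,
            "createdBy": created_by,
            "successful": True,
            "source_order_id": source_order_id,
        },
    }

def to_ndjson(records):
    """Serialize records as newline-delimited JSON: one compact record per line."""
    # json.dumps escapes any newline inside string values, so each record stays on one line.
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in records) + "\n"

records = [
    sample_record("A", 1552334325212, "3411976933214"),
    sample_record("B", 1552334325412, "3411976933215"),
    sample_record("C", 1552334325612, "3411976933216"),
]

if __name__ == "__main__":
    print(to_ndjson(records), end="")
```

The output could then be piped straight in, e.g. `python make_records.py | kafkacat -P -b localhost:9092 -t TEST`. Here `make_records.py` is just a hypothetical file name for the sketch.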
    

With KSQL we can inspect the topic using PRINT:

    ksql> PRINT 'TEST' FROM BEGINNING;
    Format:JSON
    {"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325212,"createdDate":1552334325212,"createdBy":"A","successful":true,"source_order_id":"3411976933214"}}
    {"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325412,"createdDate":1552334325412,"createdBy":"B","successful":true,"source_order_id":"3411976933215"}}
    {"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325612,"createdDate":1552334325612,"createdBy":"C","successful":true,"source_order_id":"3411976933216"}}
    

Then declare a schema on the topic, which enables us to run SQL against it:

    ksql> CREATE STREAM TEST (header STRUCT<schemaVersionNo VARCHAR>, 
                              payload STRUCT<modifiedDate BIGINT, 
                                            createdDate BIGINT, 
                                            createdBy VARCHAR, 
                                            successful BOOLEAN, 
                                            source_order_id VARCHAR>) 
                              WITH (KAFKA_TOPIC='TEST', 
                                    VALUE_FORMAT='JSON');
    
    Message
    ----------------
    Stream created
    ----------------
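Declaring `payload` as a STRUCT is what lets KSQL address its nested fields. By contrast, the Lenses error in the question says `payload` was resolved to a plain STRING, and a plain string has no nested fields to reference. As a rough analogue only, here is the same schema mirrored as Python dataclasses. All class and function names are hypothetical. In this sketch, missing fields become `None` and undeclared fields are ignored.

```python
import json
from dataclasses import dataclass, fields

# Mirrors: header  STRUCT<schemaVersionNo VARCHAR>
@dataclass
class Header:
    schemaVersionNo: str

# Mirrors: payload STRUCT<modifiedDate BIGINT, createdDate BIGINT, createdBy VARCHAR,
#                         successful BOOLEAN, source_order_id VARCHAR>
@dataclass
class Payload:
    modifiedDate: int
    createdDate: int
    createdBy: str
    successful: bool
    source_order_id: str

@dataclass
class Record:
    header: Header
    payload: Payload

def _build(cls, obj):
    # Keep only the declared fields: missing ones become None, extras are ignored.
    return cls(**{f.name: obj.get(f.name) for f in fields(cls)})

def parse_record(raw):
    """Decode one JSON message value (bytes or str) into typed, nested objects."""
    doc = json.loads(raw)
    return Record(
        header=_build(Header, doc.get("header") or {}),
        payload=_build(Payload, doc.get("payload") or {}),
    )
```

Once parsed this way, `record.payload.source_order_id` is an ordinary attribute access. That is roughly what `PAYLOAD->source_order_id` gives you in KSQL.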
    

Tell KSQL to read all the data in the topic, starting from the earliest offset:

    ksql> SET 'auto.offset.reset' = 'earliest';
    Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
    

Now we can select all the data:

    ksql> SELECT * FROM TEST;
    1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325212, CREATEDDATE=1552334325212, CREATEDBY=A, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933214}
    1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325412, CREATEDDATE=1552334325412, CREATEDBY=B, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933215}
    1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325612, CREATEDDATE=1552334325612, CREATEDBY=C, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933216}
    ^CQuery terminated
    

Or we can use the -> notation to access nested fields in the schema:

    ksql> SELECT * FROM TEST 
            WHERE PAYLOAD->CREATEDBY='A';
    1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325212, CREATEDDATE=1552334325212, CREATEDBY=A, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933214}
    

    ksql> SELECT payload FROM TEST 
            WHERE PAYLOAD->source_order_id='3411976933216';
    {MODIFIEDDATE=1552334325612, CREATEDDATE=1552334325612, CREATEDBY=C, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933216}
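Conceptually, the `WHERE PAYLOAD->source_order_id=...` predicate is just a nested-field comparison applied to each decoded message value. You may need the same filter outside KSQL, for example in a consumer you already run. A minimal sketch over raw message values might look like the following. `filter_by_source_order_id` is a hypothetical name. `source_order_id` is compared as a string because that is how it appears in these records.

```python
import json
from typing import Iterable, Iterator

def filter_by_source_order_id(values: Iterable[bytes], order_id: str) -> Iterator[dict]:
    """Yield the payload of each message whose payload.source_order_id equals order_id.

    Rough client-side analogue of:
      SELECT payload FROM TEST WHERE PAYLOAD->source_order_id = '<order_id>';
    """
    for raw in values:
        try:
            doc = json.loads(raw)
        except ValueError:  # includes JSONDecodeError and bad UTF-8 in bytes input
            continue  # skip messages that are not valid JSON
        payload = doc.get("payload") if isinstance(doc, dict) else None
        # The field holds a string in these records, so compare string to string.
        if isinstance(payload, dict) and payload.get("source_order_id") == order_id:
            yield payload
```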
    

With KSQL you can write the results of any SELECT statement to a new topic:

    ksql> CREATE STREAM TEST_CREATED_BY_A AS
            SELECT * FROM TEST WHERE PAYLOAD->CREATEDBY='A';
    
    Message
    ----------------------------
    Stream created and running
    ----------------------------
    

List the topics on the Kafka cluster:

    ksql> SHOW TOPICS;
    
    Kafka Topic            | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
    ----------------------------------------------------------------------------------------------------
    orders                 | true       | 1          | 1                  | 1         | 1
    pageviews              | false      | 1          | 1                  | 0         | 0
    products               | true       | 1          | 1                  | 1         | 1
    TEST                   | true       | 1          | 1                  | 1         | 1
    TEST_CREATED_BY_A      | true       | 4          | 1                  | 0         | 0
    

Print the contents of the new topic:

    ksql> PRINT 'TEST_CREATED_BY_A' FROM BEGINNING;
    Format:JSON
    {"ROWTIME":1552475910106,"ROWKEY":"null","HEADER":{"SCHEMAVERSIONNO":"1"},"PAYLOAD":{"MODIFIEDDATE":1552334325212,"CREATEDDATE":1552334325212,"CREATEDBY":"A","SUCCESSFUL":true,"SOURCE_ORDER_ID":"3411976933214"}}
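The field names in the new topic are upper-cased (`HEADER`, `PAYLOAD`, `CREATEDBY`, ...) because KSQL upper-cases unquoted identifiers such as those in the CREATE STREAM declaration. The persistent query then behaves like a continuous filter-and-copy from TEST to TEST_CREATED_BY_A. As an illustration only, this hypothetical in-memory Python sketch reproduces that per-record logic, including the key casing seen in the output above. It does not talk to Kafka.

```python
import json
from typing import Iterable, List

def upper_keys(value):
    """Recursively upper-case dict keys, mimicking the column names KSQL writes out."""
    if isinstance(value, dict):
        return {k.upper(): upper_keys(v) for k, v in value.items()}
    if isinstance(value, list):
        return [upper_keys(v) for v in value]
    return value

def route_created_by(values: Iterable[bytes], created_by: str) -> List[str]:
    """Hypothetical in-memory analogue of
       CREATE STREAM TEST_CREATED_BY_A AS SELECT * FROM TEST WHERE PAYLOAD->CREATEDBY='A';
    Returns the serialized values that would be written to the output topic.
    """
    out = []
    for raw in values:
        doc = json.loads(raw)
        payload = doc.get("payload")
        if isinstance(payload, dict) and payload.get("createdBy") == created_by:
            out.append(json.dumps(upper_keys(doc), separators=(",", ":")))
    return out
```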
    