代码之家  ›  专栏  ›  技术社区  ›  Spons

如何读取无效的JSON格式amazon firehose

  •  2
  • Spons  · 技术社区  · 6 年前

    我有一个最可怕的场景,我想读取kinesis firehose在S3上创建的文件。

    Kinesis firehose创建的文件不是每一个json对象都在一个新行上,而是一个json对象连接的文件。

    {"param1":"value1","param2":numericvalue2,"param3":"nested {bracket}"}{"param1":"value1","param2":numericvalue2,"param3":"nested {bracket}"}{"param1":"value1","param2":numericvalue2,"param3":"nested {bracket}"}

    现在这是一个普通JSON.parse不支持的场景,我已经尝试使用以下正则表达式:.scan(/({(\”)。 ?\":.

    但扫描似乎只在没有嵌套括号的情况下工作。

    有人知道解决这个问题的有效/更好/更优雅的方法吗?

    3 回复  |  直到 6 年前
        1
  •  2
  •   Spons    6 年前

    初始anwser中的一个用于不带引号的JSON,这种情况有时会发生。这个:

    ({((\\?\".*?\\?\")*?)})

    除此之外,为了使它更简单,这使它有了一些改进。。因为可以有整数值和正常值。。由于双重捕获组,字符串文本中的任何内容都将被忽略。

    https://regex101.com/r/kPSc0i/1

        2
  •  1
  •   Lars Haugseth    6 年前

    将输入修改为一个大型JSON数组,然后解析:

    input = File.read("input.json")
    json = "[#{input.rstrip.gsub(/\}\s*\{/, '},{')}]"
    data = JSON.parse(json)
    

    您可能希望将前两种方法结合起来以节省一些内存:

    json = "[#{File.read('input.json').rstrip.gsub(/\}\s*\{/, '},{')}]"
    data = JSON.parse(json)
    

    这是假设 } { 决不会在JSON编码数据的键或值内发生。

        3
  •  0
  •   mowienay    6 年前

    正如您在最近的评论中总结的那样,firehose中的put_records_批次要求您在记录中手动放置分隔符,以便消费者轻松解析。您可以添加一个新行或一些专门用于解析的特殊字符,例如,不应在有效负载中使用。

    import json
    def send_to_firehose(records):
      firehose_client = boto3.client('firehose')
      for record in records:
        data = json.dumps(record)
        firehose_client.put_record(DeliveryStreamName=<your stream>,
                                   Record={
                                           'Data': data
                                          }
                                  )
    

    默认情况下,Firehose在将数据发送到您的bucket之前会对数据进行缓冲,并且最终应该是这样的。这将很容易解析并加载到首选数据结构中的内存中。

    [
        {
            "metadata": {
                "schema_id": "4096"
            },
            "payload": {
                "zaza": 12,
                "price": 20,
                "message": "Testing sendnig the data in message attribute",
                "source": "coming routing to firehose"
            }
        },
        {
            "metadata": {
                "schema_id": "4096"
            },
            "payload": {
                "zaza": 12,
                "price": 20,
                "message": "Testing sendnig the data in message attribute",
                "source": "coming routing to firehose"
            }
        }
    ]