
ExtractField and parse JSON in a Kafka Connect sink

Peter Lyons  ·  7 years ago

    I have a Kafka Connect pipeline of MongoDB -> Kafka Connect -> Elasticsearch sending data end to end OK, but the payload document arrives JSON-encoded as a string. Here's my source MongoDB document:

    {
      "_id": "1541527535911",
      "enabled": true,
      "price": 15.99,
      "style": {
        "color": "blue"
      },
      "tags": [
        "shirt",
        "summer"
      ]
    }
    

    Here's my MongoDB source connector config:

    {
      "name": "redacted",
      "config": {
        "connector.class": "com.teambition.kafka.connect.mongo.source.MongoSourceConnector",
        "databases": "redacted.redacted",
        "initial.import": "true",
        "topic.prefix": "redacted",
        "tasks.max": "8",
        "batch.size": "1",
        "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer": "org.apache.kafka.common.serialization.JSONSerializer",
        "key.serializer.schemas.enable": false,
        "value.serializer.schemas.enable": false,
        "compression.type": "none",
        "mongo.uri": "mongodb://redacted:27017/redacted",
        "analyze.schema": false,
        "schema.name": "__unused__",
        "transforms": "RenameTopic",
        "transforms.RenameTopic.type":
          "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.RenameTopic.regex": "redacted.redacted_Redacted",
        "transforms.RenameTopic.replacement": "redacted"
      }
    }
    

    In Elasticsearch, the result looks like this:

    {
      "_index" : "redacted",
      "_type" : "kafka-connect",
      "_id" : "{\"schema\":{\"type\":\"string\",\"optional\":true},\"payload\":\"1541527535911\"}",
      "_score" : 1.0,
      "_source" : {
        "ts" : 1541527536,
        "inc" : 2,
        "id" : "1541527535911",
        "database" : "redacted",
        "op" : "i",
        "object" : "{ \"_id\" : \"1541527535911\", \"price\" : 15.99,
          \"enabled\" : true, \"tags\" : [\"shirt\", \"summer\"],
          \"style\" : { \"color\" : \"blue\" } }"
      }
    }
    

    I'd like to use two single message transforms:

    1. ExtractField to grab object, which is a string of JSON
    2. Something to parse that JSON into an object, or just let the normal JsonConverter handle it, as long as it ends up structured properly in Elasticsearch.

    I attempted to use just ExtractField in my sink config, but I see this error logged by Kafka:

    kafka-connect_1       | org.apache.kafka.connect.errors.ConnectException:
    Bulk request failed: [{"type":"mapper_parsing_exception",
    "reason":"failed to parse", 
    "caused_by":{"type":"not_x_content_exception",
    "reason":"Compressor detection can only be called on some xcontent bytes or
    compressed xcontent bytes"}}]
    

    Here's my Elasticsearch sink connector config. In this version, I have things working, but I had to code a custom ParseJson SMT (a sketch of what that transform might look like is shown after the config below). It works well, but if there's a better way, or a way to do this with some combination of built-in things (converters, SMTs, whatever works), I'd love to see that.

    {
      "name": "redacted",
      "config": {
        "connector.class":
          "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "batch.size": 1,
        "connection.url": "http://redacted:9200",
        "key.converter.schemas.enable": true,
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "schema.ignore": true,
        "tasks.max": "1",
        "topics": "redacted",
        "transforms": "ExtractFieldPayload,ExtractFieldObject,ParseJson,ReplaceId",
        "transforms.ExtractFieldPayload.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
        "transforms.ExtractFieldPayload.field": "payload",
        "transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
        "transforms.ExtractFieldObject.field": "object",
        "transforms.ParseJson.type": "reaction.kafka.connect.transforms.ParseJson",
        "transforms.ReplaceId.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.ReplaceId.renames": "_id:id",
        "type.name": "kafka-connect",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false
      }
    }
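
    For reference, here is a minimal sketch of what such a custom ParseJson SMT might look like, written against Kafka Connect's standard Transformation interface. This is a hypothetical reconstruction, not the actual reaction.kafka.connect.transforms.ParseJson code; it assumes a Jackson-based parse of the string value into a schemaless Map:

    package reaction.kafka.connect.transforms;

    import java.io.IOException;
    import java.util.Map;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.ConnectRecord;
    import org.apache.kafka.connect.errors.DataException;
    import org.apache.kafka.connect.transforms.Transformation;

    // Hypothetical sketch only: parses a record value that is a JSON string
    // into a schemaless Map, so downstream converters see a structured object.
    public class ParseJson<R extends ConnectRecord<R>> implements Transformation<R> {
      private static final ObjectMapper MAPPER = new ObjectMapper();

      @Override
      public R apply(R record) {
        if (!(record.value() instanceof String)) {
          return record; // pass through anything that is not a JSON string
        }
        try {
          @SuppressWarnings("unchecked")
          Map<String, Object> parsed =
              MAPPER.readValue((String) record.value(), Map.class);
          // Re-emit the record with the parsed Map and no value schema (schemaless)
          return record.newRecord(record.topic(), record.kafkaPartition(),
              record.keySchema(), record.key(), null, parsed, record.timestamp());
        } catch (IOException e) {
          throw new DataException("Value is not parseable JSON: " + record.value(), e);
        }
      }

      @Override
      public ConfigDef config() {
        return new ConfigDef(); // this sketch takes no configuration options
      }

      @Override
      public void configure(Map<String, ?> configs) {}

      @Override
      public void close() {}
    }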
    
    1 Answer

OneCricketeer  ·  7 years ago

    I'm not sure about your Mongo connector. I don't recognize that class or its configs… Most people probably use the Debezium Mongo connector.

    In any case, I would set it up like so:

    "connector.class": "com.teambition.kafka.connect.mongo.source.MongoSourceConnector",
    
    "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer": "org.apache.kafka.common.serialization.JSONSerializer",
    "key.serializer.schemas.enable": false,
    "value.serializer.schemas.enable": true,
    

    The schemas.enable setting is important; that way the internal Connect Data classes know how to convert to and from other formats.
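
    With schemas enabled, Connect's JSON format wraps every serialized key and value in a schema/payload envelope. That shape is already visible in the _id of the Elasticsearch result above, which is a schema'd string key serialized as:

    {
      "schema": { "type": "string", "optional": true },
      "payload": "1541527535911"
    }

    With schemas.enable set to false, only the bare payload is written, and the type information Connect needs to convert records to other formats is lost.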

    Then, in the sink, you again need to use the JSON deserializer (via the converter) so that it creates a full object rather than a plaintext string, as you currently see in Elasticsearch ( {\"schema\":{\"type\":\"string\" ... ).

    "connector.class":
      "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": true
    

    If that doesn't work, then you may have to manually create your index mapping in Elasticsearch ahead of time so that it knows how to actually parse the strings you are sending.
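
    For example, a hand-created mapping for the sample document might look like this (a hypothetical sketch assuming Elasticsearch 6.x and the index and type names from the sink config; the field types are guesses based on the sample values, and id reflects the ReplaceId rename):

    PUT /redacted
    {
      "mappings": {
        "kafka-connect": {
          "properties": {
            "id":      { "type": "keyword" },
            "enabled": { "type": "boolean" },
            "price":   { "type": "float" },
            "style":   { "properties": { "color": { "type": "keyword" } } },
            "tags":    { "type": "keyword" }
          }
        }
      }
    }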
