代码之家  ›  专栏  ›  技术社区  ›  Fizi

Kafka连接转换:从json字段中提取一个长值并插入为键

  •  1
  • Fizi  · 技术社区  · 6 年前

    {"APP_SETTING_ID":9,"APP_SETTING_NAME":"my_name","SETTING_KEY":"my_setting_key"}
    

    这是我的连接器文件

    name=data.app_setting
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    poll.interval.ms=500
    tasks.max=4
    mode=timestamp
    query=SELECT APP_SETTING_ID, APP_SETTING_NAME, SETTING_KEY,FROM MY_TABLE with (nolock)
    timestamp.column.name=LAST_MOD_DATE
    topic.prefix=data.app_setting
    
    key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    
    transforms=InsertKey
    transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
    transforms.InsertKey.fields=APP_SETTING_ID
    

    这确实添加了一个键,但它也是一个json格式,比如

    {"APP_SETTING_ID":9}
    

    我只想把9当作钥匙而不是地图。在数据库中,它作为长值存储。

    1 回复  |  直到 6 年前
        1
  •  5
  •   OneCricketeer Gabriele Mariotti    6 年前

    ValueToKey 获取值中的字段列表,并返回这些字段到其值的映射。

    我认为你需要第二次变换来只提取这些字段中的一个。

    transforms=ReplaceKey,ExtractKey
    
    # Replaces the key with fields in the value. Creates a map for all listed fields
    transforms.ReplaceKey.type=org.apache.kafka.connect.transforms.ValueToKey
    transforms.ReplaceKey.fields=APP_SETTING_ID
    
    # Extracts a specfic field from the key, assuming it's a map/struct type
    transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
    transforms.ExtractKey.field=APP_SETTING_ID