代码之家  ›  专栏  ›  技术社区  ›  Muhammad Affan

Kafka Connect中面临“由于序列化错误,将byte[]转换为Kafka Connect数据失败”

  •  0
  • Muhammad Affan  · 技术社区  · 3 年前

    在使用debezium作为源连接器和Kafka-Connect(Jdbc-Sink连接器)作为接收器连接器时,我面临着“由于序列化错误,将byte[]转换为Kafka-Connect数据失败”。我正在处理以下文件:

    1. connect-standalone-file.properties
    bootstrap.servers=localhost:9092
    
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=true
    
    offset.storage.file.filename=/tmp/connect.offsets
    
    plugin.path=/dir/kafka_2.12-3.2.3/libs
    
    
    1. Source Connator
    name=inventory-connector-test31
    connector.class=io.debezium.connector.oracle.OracleConnector
    tasks.max=1
    database.server.name=server1
    database.hostname=***.**.*.*
    database.port=1521
    database.user=username
    database.password=password
    database.dbname=dbname
    database.pdb.name=ORCLPDB1
    database.connection.adapter=logminer
    database.history.kafka.bootstrap.servers=localhost:9092
    database.history.kafka.topic=schema-changes.inventory
    #snapshot.mode=SCHEMA_ONLY_RECOVERY
    transforms=filter,route
    transforms.filter.type=io.debezium.transforms.Filter
    transforms.filter.language=jsr223.groovy
    transforms.filter.condition=value.source.table == 'CUSTOMERS' && ((value.before != null && value.before.ID >= 1000 && value.before.ID <= 2000) || (value.after != null && value.after.ID >= 1000 && value.after.ID <= 2000))
    transforms.filter.topic.regex=server1.DEBEZIUM.*
    transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
    transforms.route.regex=([^.]+)\\.([^.]+)\\.([^.]+)
    transforms.route.replacement=$3
    
    1. 接收器连接器:
    name=students-sink14
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    tasks.max=1
    topics=CUSTOMERS
    connection.url=jdbc:postgresql://ipaddr:5432/productorcl?user=username&password=pass
    transforms=unwrap
    transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.drop.tombstones=false
    auto.create=true
    insert.mode=upsert
    delete.enabled=true
    pk.mode=record_key
    

    我面临的完全错误是:

    org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
    Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
        ... 13 more
    Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
     at [Source: (byte[])"Struct{ID=1001}"; line: 1, column: 8]
        at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:66)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)
        ... 17 more
    Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
     at [Source: (byte[])"Struct{ID=1001}"; line: 1, column: 8]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3635)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2734)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:902)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794)
        at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4703)
        at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3090)
        at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
        ... 18 more
    

    你能指出我哪里错了吗?

    0 回复  |  直到 3 年前