在使用debezium作为源连接器和Kafka-Connect(Jdbc-Sink连接器)作为接收器连接器时,我面临着“由于序列化错误,将byte[]转换为Kafka-Connect数据失败”。我正在处理以下文件:
-
connect-standalone-file.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/dir/kafka_2.12-3.2.3/libs
-
Source Connator
name=inventory-connector-test31
connector.class=io.debezium.connector.oracle.OracleConnector
tasks.max=1
database.server.name=server1
database.hostname=***.**.*.*
database.port=1521
database.user=username
database.password=password
database.dbname=dbname
database.pdb.name=ORCLPDB1
database.connection.adapter=logminer
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=schema-changes.inventory
#snapshot.mode=SCHEMA_ONLY_RECOVERY
transforms=filter,route
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.source.table == 'CUSTOMERS' && ((value.before != null && value.before.ID >= 1000 && value.before.ID <= 2000) || (value.after != null && value.after.ID >= 1000 && value.after.ID <= 2000))
transforms.filter.topic.regex=server1.DEBEZIUM.*
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.]+)\\.([^.]+)\\.([^.]+)
transforms.route.replacement=$3
-
接收器连接器:
name=students-sink14
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=CUSTOMERS
connection.url=jdbc:postgresql://ipaddr:5432/productorcl?user=username&password=pass
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
auto.create=true
insert.mode=upsert
delete.enabled=true
pk.mode=record_key
我面临的完全错误是:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"Struct{ID=1001}"; line: 1, column: 8]
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:66)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)
... 17 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"Struct{ID=1001}"; line: 1, column: 8]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3635)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2734)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:902)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794)
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4703)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3090)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
... 18 more
你能指出我哪里错了吗?