
java.lang.ClassNotFoundException: io.apicurio.registry.serde.avro.AvroKafkaSerializer in Kafka Connect

Asked by Hongbo Miao · 3 years ago

    I have a Kafka cluster deployed by Strimzi, and an Apicurio Registry serving as the Kafka schema registry.

    I would like to use AvroConverter in a JDBC sink connector to sink data from Kafka into TimescaleDB.

    Here is my Kafka Connect Dockerfile:

    FROM docker.io/alpine:3.17.3 AS builder
    USER root:root
    RUN mkdir -p /opt/kafka/plugins/ \
      # jdbc-connector-for-apache-kafka
      # https://github.com/aiven/jdbc-connector-for-apache-kafka
      && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
      && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
      && rm -f jdbc-connector-for-apache-kafka.zip
    USER 1001
    
    FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
    USER root:root
    COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
    USER 1001
    

    My KafkaConnect:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: hm-kafka-iot-kafka-connect
      namespace: hm-kafka
      annotations:
        strimzi.io/use-connector-resources: "true"
    spec:
      image: ghcr.io/hongbo-miao/hm-kafka-iot-kafka-connect:latest
      replicas: 1
      bootstrapServers: hm-kafka-kafka-bootstrap:9093
      tls:
        trustedCertificates:
          - secretName: hm-kafka-cluster-ca-cert
            certificate: ca.crt
      config:
        config.storage.replication.factor: 1
        offset.storage.replication.factor: 1
        status.storage.replication.factor: 1
        config.providers: file
        config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
      externalConfiguration:
        volumes:
          - name: hm-iot-db-credentials-volume
            secret:
              secretName: hm-iot-db-credentials
    

    My JDBC sink connector:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: hm-motor-jdbc-sink-kafka-connector
      namespace: hm-kafka
      labels:
        strimzi.io/cluster: hm-kafka-iot-kafka-connect
    spec:
      class: io.aiven.connect.jdbc.JdbcSinkConnector
      tasksMax: 32
      config:
        connector.class: io.aiven.connect.jdbc.JdbcSinkConnector
        tasks.max: 32
        topics: hm.motor
        connection.url: jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db
        connection.user: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}"
        connection.password: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}"
        insert.mode: multi
        batch.size: 100000
    
        # table
        table.name.format: motor
    
        # timestamp
        transforms: convertTimestamp
        transforms.convertTimestamp.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
        transforms.convertTimestamp.field: timestamp
        transforms.convertTimestamp.target.type: Timestamp
    
        # value
        value.converter: io.apicurio.registry.utils.converter.AvroConverter
        value.converter.apicurio.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2
        value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
        value.converter.apicurio.registry.as-confluent: true
    

    (Note: the config related to apicurio.registry is very likely wrong as well.)
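
    For reference, for Apicurio Registry 2.x I suspect the converter options should look more like the sketch below; apicurio.registry.auto-register is my guess at the 2.x replacement for the 1.x global-id / AutoRegisterIdStrategy setting, so treat it as an assumption:

    # value (sketch, unverified)
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2
    # assumption: 2.x uses a boolean auto-register flag instead of the 1.x AutoRegisterIdStrategy class
    value.converter.apicurio.registry.auto-register: true
    value.converter.apicurio.registry.as-confluent: true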

    However, I ran into this error (let's call it Error 1):

    Error 1

    2023-05-01 07:23:23,849 ERROR [hm-motor-jdbc-sink-kafka-connector|worker] [Worker clientId=connect-1, groupId=connect-cluster] Failed to start connector 'hm-motor-jdbc-sink-kafka-connector' (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [StartAndStopExecutor-connect-1-1]
    org.apache.kafka.connect.errors.ConnectException: Failed to start connector: hm-motor-jdbc-sink-kafka-connector
      at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$34(DistributedHerder.java:1800)
      at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:320)
      at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1821)
      at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getConnectorStartingCallable$36(DistributedHerder.java:1827)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      at java.base/java.lang.Thread.run(Thread.java:833)
    Caused by: org.apache.kafka.common.config.ConfigException: Invalid value io.apicurio.registry.utils.converter.AvroConverter for configuration value.converter: Class io.apicurio.registry.utils.converter.AvroConverter could not be found.
      at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:744)
      at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:490)
      at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
      at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
      at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:133)
      at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:232)
      at org.apache.kafka.connect.runtime.SinkConnectorConfig.<init>(SinkConnectorConfig.java:85)
      at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:299)
      ... 6 more
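
    To double check which plugins the worker image actually contains, the plugin directory can be listed inside one of the Connect pods (a quick sketch; it relies on the strimzi.io/cluster label that Strimzi adds to the Connect pods):

    kubectl --namespace hm-kafka exec \
      "$(kubectl --namespace hm-kafka get pod --selector strimzi.io/cluster=hm-kafka-iot-kafka-connect --output name | head -n 1)" \
      -- ls -R /opt/kafka/plugins/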
    

    Attempt 1 (to fix Error 1: succeeded)

    Based on this error, I added apicurio-registry-utils-converter to my Kafka Connect Dockerfile:

    FROM docker.io/alpine:3.17.3 AS builder
    USER root:root
    RUN mkdir -p /opt/kafka/plugins/ \
      # jdbc-connector-for-apache-kafka
      # https://github.com/aiven/jdbc-connector-for-apache-kafka
      && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
      && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
      && rm -f jdbc-connector-for-apache-kafka.zip \
    
      # apicurio-registry-utils-converter
      # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-utils-converter
      && wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-utils-converter/2.4.2.Final/apicurio-registry-utils-converter-2.4.2.Final.jar \
      && mkdir -p /opt/kafka/plugins/apicurio-registry-utils-converter/ \
      && mv apicurio-registry-utils-converter-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-utils-converter/
    USER 1001
    
    FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
    USER root:root
    COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
    USER 1001
    

    Now io.apicurio.registry.utils.converter.AvroConverter can be located successfully, but I got a new error (let's call it Error 2).

    Error 2

    2023-05-01 06:58:11,129 INFO [hm-motor-jdbc-sink-kafka-connector|task-0] TaskConfig values: 
      task.class = class io.aiven.connect.jdbc.sink.JdbcSinkTask
     (org.apache.kafka.connect.runtime.TaskConfig) [StartAndStopExecutor-connect-1-5]
    2023-05-01 06:58:11,129 INFO [hm-motor-jdbc-sink-kafka-connector|task-0] Instantiated task hm-motor-jdbc-sink-kafka-connector-0 with version null of type io.aiven.connect.jdbc.sink.JdbcSinkTask (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-5]
    2023-05-01 06:58:11,129 ERROR [hm-motor-jdbc-sink-kafka-connector|task-0] Failed to start task hm-motor-jdbc-sink-kafka-connector-0 (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-5]
    java.lang.NoClassDefFoundError: io/apicurio/registry/serde/avro/AvroKafkaSerializer
      at io.apicurio.registry.utils.converter.AvroConverter.configure(AvroConverter.java:69)
      at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:324)
      at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:618)
      at org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:521)
      at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1723)
      at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$31(DistributedHerder.java:1773)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      at java.base/java.lang.Thread.run(Thread.java:833)
    Caused by: java.lang.ClassNotFoundException: io.apicurio.registry.serde.avro.AvroKafkaSerializer
      ... 10 more
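
    A quick local check against the downloaded artifact (a sketch) suggests the converter JAR alone does not bundle the missing serializer class:

    # no output from grep means the class is not inside this JAR
    unzip -l apicurio-registry-utils-converter-2.4.2.Final.jar | grep -i AvroKafkaSerializer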
    

    Attempt 2 (to fix Error 2: failed)

    Based on this error, I added apicurio-registry-serdes-avro-serde to my Kafka Connect Dockerfile:

    FROM docker.io/alpine:3.17.3 AS builder
    USER root:root
    RUN mkdir -p /opt/kafka/plugins/ \
      # jdbc-connector-for-apache-kafka
      # https://github.com/aiven/jdbc-connector-for-apache-kafka
      && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
      && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
      && rm -f jdbc-connector-for-apache-kafka.zip \
    
      # apicurio-registry-utils-converter
      # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-utils-converter
      && wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-utils-converter/2.4.2.Final/apicurio-registry-utils-converter-2.4.2.Final.jar \
      && mkdir -p /opt/kafka/plugins/apicurio-registry-utils-converter/ \
      && mv apicurio-registry-utils-converter-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-utils-converter/ \
    
      # apicurio-registry-serdes-avro-serde
      # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-serdes-avro-serde
      && wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-serdes-avro-serde/2.4.2.Final/apicurio-registry-serdes-avro-serde-2.4.2.Final.jar \
      && mkdir -p /opt/kafka/plugins/apicurio-registry-serdes-avro-serde/ \
      && mv apicurio-registry-serdes-avro-serde-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-serdes-avro-serde/
    USER 1001
    
    FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
    USER root:root
    COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
    USER 1001
    

    But this time, Error 2 still persists.

    apicurio-registry-serdes-avro-serde does not seem to be the right dependency to fix Error 2. What is the correct dependency? Thanks!

    Attempt 3 (a different direction)

    I followed @OneCricketeer's suggestion and switched to kafka-connect-avro-converter, used together with Apicurio Registry's Confluent-compatible REST API endpoint /apis/ccompat/v6/ now.

    Here is the Kafka Connect Dockerfile for using io.confluent.connect.avro.AvroConverter:

    FROM docker.io/alpine:3.17.3 AS builder
    USER root:root
    RUN mkdir -p /opt/kafka/plugins/ \
      # kafka-connect-avro-converter
      # https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter
      && wget --no-verbose --output-document=kafka-connect-avro-converter.zip https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.3.3/confluentinc-kafka-connect-avro-converter-7.3.3.zip \
      && mkdir -p /opt/kafka/plugins/kafka-connect-avro-converter/ \
      && unzip kafka-connect-avro-converter.zip -d /opt/kafka/plugins/kafka-connect-avro-converter/ \
      && rm -f kafka-connect-avro-converter.zip \
    
      # jdbc-connector-for-apache-kafka
      # https://github.com/aiven/jdbc-connector-for-apache-kafka
      && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
      && mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
      && tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
      && rm -f jdbc-connector-for-apache-kafka.tar
    USER 1001
    
    FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
    USER root:root
    COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
    USER 1001
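
    The value converter part of the sink connector config paired with this image looks roughly like this (a sketch; schema.registry.url is the standard Confluent converter property, pointed at Apicurio Registry's ccompat endpoint):

    # value
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6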
    

    However, with the corresponding JDBC sink connector configuration I ran into a different issue: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id -xxx

    UPDATE: I found that the Confluent Avro format is different from vanilla Apache Avro, which causes some inconvenience for Spark and other tools. So they are really two different directions. Besides the Confluent direction, I will keep looking for a solution in this (Apicurio) direction as well.

    Answer from Hongbo Miao (3 years ago):

    The issue was that earlier I added the dependency apicurio-registry-utils-converter.

    However, the correct one is apicurio-registry-distro-connect-converter.

    Here is my final Kafka Connect Dockerfile for using io.apicurio.registry.utils.converter.AvroConverter:

    FROM docker.io/alpine:3.17.3 AS builder
    USER root:root
    RUN mkdir -p /opt/kafka/plugins/ \
      # apicurio-registry-distro-connect-converter
      # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-distro-connect-converter
      && wget --no-verbose --output-document=apicurio-registry-distro-connect-converter.tar.gz https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/2.4.2.Final/apicurio-registry-distro-connect-converter-2.4.2.Final.tar.gz \
      && mkdir -p /opt/kafka/plugins/apicurio-registry-distro-connect-converter/ \
      && tar -x -f apicurio-registry-distro-connect-converter.tar.gz -C /opt/kafka/plugins/apicurio-registry-distro-connect-converter/ \
      && rm -f apicurio-registry-distro-connect-converter.tar.gz \
    
      # jdbc-connector-for-apache-kafka
      # https://github.com/aiven/jdbc-connector-for-apache-kafka
      && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
      && mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
      && tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
      && rm -f jdbc-connector-for-apache-kafka.tar
    USER 1001
    
    FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
    USER root:root
    COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
    USER 1001
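
    As an optional sanity check (a sketch; the exact file names inside the archive are an assumption), the distro tarball should bundle the serde JARs that contain io.apicurio.registry.serde.avro.AvroKafkaSerializer, which is why the NoClassDefFoundError goes away:

    # expect to see an apicurio-registry-serdes-avro-serde JAR in the listing
    tar -tzf apicurio-registry-distro-connect-converter-2.4.2.Final.tar.gz | grep -i serde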
    

    For comparison, here is the version using io.confluent.connect.avro.AvroConverter:

    FROM docker.io/alpine:3.17.3 AS builder
    USER root:root
    RUN mkdir -p /opt/kafka/plugins/ \
      # kafka-connect-avro-converter
      # https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter
      && wget --no-verbose --output-document=kafka-connect-avro-converter.zip https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.3.3/confluentinc-kafka-connect-avro-converter-7.3.3.zip \
      && mkdir -p /opt/kafka/plugins/kafka-connect-avro-converter/ \
      && unzip kafka-connect-avro-converter.zip -d /opt/kafka/plugins/kafka-connect-avro-converter/ \
      && rm -f kafka-connect-avro-converter.zip \
    
      # jdbc-connector-for-apache-kafka
      # https://github.com/aiven/jdbc-connector-for-apache-kafka
      && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
      && mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
      && tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
      && rm -f jdbc-connector-for-apache-kafka.tar
    USER 1001
    
    FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
    USER root:root
    COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
    USER 1001
    
    Answer from OneCricketeer (3 years ago):

    The apicurio-registry-serdes-avro-serde dependency is correct for that class. But it should already be part of the Avro converter package.

    However, (De)Serializer classes are not Connect "plugins" the way Converters are. You would need to export the CLASSPATH environment variable to include the extra directories where you placed the JAR files.

    I would also suggest not using a multi-stage build here, unless wget and unzip are missing from the Strimzi image. Also, Apicurio Registry is compatible with the Confluent converter, so I would suggest using the plugins feature anyway (see the sketch below).
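
    For example, Strimzi's KafkaConnect build feature can assemble the plugin image for you instead of a hand-written Dockerfile. A rough sketch, where the output image and push secret names are placeholders:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: hm-kafka-iot-kafka-connect
      namespace: hm-kafka
      annotations:
        strimzi.io/use-connector-resources: "true"
    spec:
      # ... bootstrapServers, tls, config as in the question (no spec.image when build is used) ...
      build:
        output:
          type: docker
          image: ghcr.io/hongbo-miao/hm-kafka-iot-kafka-connect:latest  # placeholder target image
          pushSecret: hm-container-registry-credentials                 # hypothetical registry push secret
        plugins:
          - name: kafka-connect-avro-converter
            artifacts:
              - type: zip
                url: https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.3.3/confluentinc-kafka-connect-avro-converter-7.3.3.zip
          - name: jdbc-connector-for-apache-kafka
            artifacts:
              - type: zip
                url: https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip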
