代码之家  ›  专栏  ›  技术社区  ›  skrshn

JAVA尝试在Flink中创建Kafka使用者时出现lang.NoClassDefFoundError(CheckpointedRestoring)

  •  1
  • skrshn  · 技术社区  · 8 年前

    我试图在Apache Flink中创建一个卡夫卡消费者。我一直在遵循Apache Flink文档中的设置和指南。但我得到了一个 JAVAlang.NoClassDefFoundError:org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring 错误

    错误是

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at com.agt.examples.KafkaConsumer.main(KafkaConsumer.java:34)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 30 more
    

    卡夫卡康萨默尔。类别为

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        // For Apache Kafka Consumer
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
    
        DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<String>("my-flink-topic", new SimpleStringSchema(), properties));
    
        stream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;
    
            @Override
            public String map(String value) throws Exception {
                return value;
            }
        }).print();
        env.execute();
    

    pom。xml是

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
            <version>1.2.1</version>
        </dependency>
    

    Flink版本是1.4.2,java版本是1.8,我使用的是intellij IDEA。我想这一定是因为依赖关系没有正确链接,但我无法理解这里的问题是什么。我检查了上面提到的类,但它不在我的库中的流/api/检查点中,当我在线检查时,它似乎是一个不推荐使用的类。我尝试了mvn清理安装、无效缓存和重新启动、生成源代码和更新文件夹,但仍然出现了相同的错误。我按照apache flink安装页面中的说明创建了该项目 https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/java_api_quickstart.html 但在我从不同的来源进行开发时,添加了一些其他依赖项。它是否与其中一个依赖项的版本相关?我在这里完全不知所措。

    非常感谢您的帮助:)

    1 回复  |  直到 8 年前
        1
  •  1
  •   Alex Chermenin    8 年前

    您的版本似乎不一致 flink-clients_2.10 flink-connector-kafka-0.8_2.10 依赖项。尝试使用 ${flink.version} 而不是 1.2.1 在以下部分:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>