代码之家  ›  专栏  ›  技术社区  ›  Soheil Pourbafrani

Flink-在远程集群上运行WordCount示例时出错

  •  1
  • Soheil Pourbafrani  · 技术社区  · 8 年前

    我在VirtualBox上有一个Flink集群,包括三个节点,一个主节点和两个从节点。我定制了WordCount示例,并创建了一个胖jar文件来使用VirtualBox Flink远程集群运行它,但我遇到了错误。

    注意

    更多详细信息如下:

    这是我的Java代码:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.util.Collector;
    
    public class WordCount {
    
    // *************************************************************************
    //     PROGRAM
    // *************************************************************************
    
    public static void main(String[] args) throws Exception {
    
        final ParameterTool params = ParameterTool.fromArgs(args);
        final int port;
        final String ip;
        DataSet<String> text;
        try {
            ip = params.get("ip");
            port = params.getInt("port");
            final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(ip, port, 2);
            text = env.readTextFile(params.get("input"));
        } catch (Exception e) {
            System.err.println("No port or input or ip specified. Please run 'SocketWindowWordCount --ip <ip> --port <port>'" +
                    " --input <input>");
            return;
        }
    
        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);
    
        System.out.println("Printing result to stdout. Use --output to specify output path.");
        counts.print();
    
    }
    
    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************
    
    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");
    
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
    
    }
    

    我使用命令创建了ExecutionEnvironment对象:

    final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(ip, port, 2);
    

    ,我在主机上使用以下命令运行代码(该主机连接到集群节点,VirtualBox正在该主机上运行)

    java -cp FlinkWordCountClusetr.jar WordCount --ip 192.168.101.10 --port 6123 --input /usr/local/flink/LICENSE
    

    Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Could not start the ActorSystem needed to talk to the JobManager.
    Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.remote.log-received-messages'
    

    我怎样才能解决这个问题?

    1 回复  |  直到 8 年前
        1
  •  0
  •   Soheil Pourbafrani    8 年前

    在尝试了许多设置之后,都是因为maven依赖项与远程集群上安装的Flink版本不匹配。Maven依赖项是Flink版本 1.3.2 基于Scala构建 2.10 ,而安装在远程群集上的Flink 1.3.2 基于Scala构建 2.11 . 只是一个微小的差异,但很重要!