
Type mismatch between Mapper and Reducer

  • Cecilia · asked 6 years ago

    public static class RecordMapper extends Mapper<Object, Text, Text, RecordWritable>

    • Input: Text/Text

    public static class JoinSumReducer extends Reducer<Text, RecordWritable, Text, DoubleWritable>

    • Input: Text/enum (RecordWritable, my own class)

    I get the following runtime exception: java.lang.Exception: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable (full stack trace after the code).

    Following the advice in "Type mismatch in value from map: expected org.apache.hadoop.io.NullWritable, recieved org.apache.hadoop.io.Text" (the question linked in the code below), I tried setting the map output key/value classes explicitly, but that causes a different runtime exception: java.io.IOException: wrong value class: class org.apache.hadoop.io.DoubleWritable is not class RecordWritable (full stack trace after the code).

    Here is my code:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableUtils;
    
    /**
     * Writable for enum Record
     */
    public class RecordWritable implements Writable {
        public static enum Record {BUY, CLICK};
        private Record data;
    
        public void set(Record data) {
            this.data = data;
        }
    
        public Record get() {
            return this.data;
        }
    
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            data = WritableUtils.readEnum(dataInput, Record.class);
        }
    
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            WritableUtils.writeEnum(dataOutput, data);
        }
    }
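
    As a quick sanity check (a sketch added here, not part of the original question; RecordWritableRoundTrip is a hypothetical class), the enum survives a write/readFields round trip, which is exactly what the shuffle does with each record:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class RecordWritableRoundTrip {
        public static void main(String[] args) throws IOException {
            // Serialize a RecordWritable to an in-memory buffer...
            RecordWritable out = new RecordWritable();
            out.set(RecordWritable.Record.BUY);
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            out.write(new DataOutputStream(bytes));

            // ...and read it back, as the framework does between map and reduce.
            RecordWritable in = new RecordWritable();
            in.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(in.get()); // prints BUY
        }
    }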
    

    Mapper, Reducer, and main:

    import java.io.IOException;
    import java.util.Scanner;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class SuccessRate {
    
        /**
         * Mapper
         * - Key = ItemID
         * - Value = The type of record is determined by number of columns
         */
        public static class RecordMapper extends Mapper<Object, Text, Text, RecordWritable>{
    
            private Text itemID = new Text();
            private RecordWritable record = new RecordWritable();
            Pattern itemIDpattern = Pattern.compile("^(\\d+),");
            Pattern columnPattern = Pattern.compile(",");
    
            public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
                Scanner itr = new Scanner(value.toString());
                while (itr.hasNextLine()) {
                    String line = itr.nextLine();
    
                    String id = null;
                    Matcher m = itemIDpattern.matcher(line);
                    if(m.find())
                        id = m.group(1);
    
                    RecordWritable.Record fileType;
                    int count = StringUtils.countMatches(line, ",");
                    if(count==4)
                        fileType = RecordWritable.Record.CLICK;
                    else
                        fileType = RecordWritable.Record.BUY;
    
                    if(id != null) {
                        itemID.set(id);
                        record.set(fileType);
                        context.write(itemID, record);
                    }
                }
                itr.close();
            }
        }
    
        /**
         * Reducer
         * - Key : ItemID
         * - Value : sum of buys / sum of clicks
         */
        public static class JoinSumReducer
        extends Reducer<Text, RecordWritable, Text, DoubleWritable> {
            private DoubleWritable result = new DoubleWritable();
    
            public void reduce(Text key, Iterable<RecordWritable> values,
                    Context context
                    ) throws IOException, InterruptedException {
                int sumClick = 0;
                int sumBuy = 0;
                for (RecordWritable val : values) {
                    switch(val.get()) {
                    case CLICK:
                        sumClick += 1;
                        break;
                    case BUY:
                        sumBuy += 1;
                        break;
                    }
                }
                result.set((double)sumBuy/(double)sumClick);
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception {       
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "success rate");
            job.setJarByClass(SuccessRate.class);
            job.setMapperClass(RecordMapper.class);
            job.setCombinerClass(JoinSumReducer.class);
            job.setReducerClass(JoinSumReducer.class);
    //      job.setMapOutputKeyClass(Text.class); // I tried adding these two lines after reading https://stackoverflow.com/q/16926783/3303546
    //      job.setMapOutputValueClass(RecordWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    

    Full exception stack traces

    Original error:

    java.lang.Exception: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable
        at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:492)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:552)
    Caused by: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1093)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:727)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
        at SuccessRate$RecordMapper.map(SuccessRate.java:54)
        at SuccessRate$RecordMapper.map(SuccessRate.java:26)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:799)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    

    With the fix from "Type mismatch in value from map: expected org.apache.hadoop.io.NullWritable, recieved org.apache.hadoop.io.Text" applied (the two setMapOutput* lines uncommented), the runtime exception becomes:

    2018-09-24 11:36:04,423 INFO mapred.MapTask: Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector@5532c2f8
    java.io.IOException: wrong value class: class org.apache.hadoop.io.DoubleWritable is not class RecordWritable
        at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:194)
        at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1562)
        at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1879)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
        at SuccessRate$JoinSumReducer.reduce(SuccessRate.java:86)
        at SuccessRate$JoinSumReducer.reduce(SuccessRate.java:66)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
        at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1900)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1662)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1505)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:735)
        at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:2076)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:809)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    
    2 replies | last activity 6 years ago
    Answer 1 · HbnKing · 6 years ago

    The workflow in MapReduce is as follows:

    (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

    The key/value data types must match at every stage; in particular, a combiner consumes and produces the map output types <k2, v2>.

    Caused by: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable

    This is because you set a combiner: job.setCombinerClass(JoinSumReducer.class)

    The combiner's output (Text/DoubleWritable) does not match what the reducer expects as input (Text/RecordWritable).
    You can remove this line and try again.
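
    A minimal sketch of the corrected main(), assuming the combiner line is simply dropped and the two commented-out map-output declarations are restored (without them the first exception comes back; see the next answer):

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "success rate");
        job.setJarByClass(SuccessRate.class);
        job.setMapperClass(RecordMapper.class);
        // No setCombinerClass: JoinSumReducer emits DoubleWritable values,
        // but the reduce phase still expects RecordWritable values.
        job.setReducerClass(JoinSumReducer.class);
        job.setMapOutputKeyClass(Text.class);             // what RecordMapper emits
        job.setMapOutputValueClass(RecordWritable.class);
        job.setOutputKeyClass(Text.class);                // what JoinSumReducer emits
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }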

    Answer 2 · Community CDub · 5 years ago

    First, you should uncomment those two lines in the body of main(), because:

    Calling job.setOutputKeyClass(NullWritable.class); will set the types expected as output from both the map and reduce phases.

    If your Mapper emits different types than the Reducer, you can set the types emitted by the mapper with the JobConf's setMapOutputKeyClass() and setMapOutputValueClass() methods. These implicitly set the input types expected by the Reducer.
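
    Applied to the job in the question, the two calls would look like the sketch below; if they are omitted, Hadoop assumes the map output types equal the job output types (Text/DoubleWritable here), which is exactly what the first exception complains about:

    job.setMapOutputKeyClass(Text.class);             // RecordMapper emits Text keys...
    job.setMapOutputValueClass(RecordWritable.class); // ...and RecordWritable values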

    Second, the problem is that you really do have to care about the combiner's input and output types: a combiner runs between map and reduce, so its output types must match the map output types (Text/RecordWritable here), while JoinSumReducer produces DoubleWritable values.

    For more information, see: https://stackoverflow.com/a/14225304/2137378 and https://stackoverflow.com/a/30548793/2137378
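
    To make that constraint concrete: a combiner must map <Text, RecordWritable> back to <Text, RecordWritable>, so the only type-correct combiner for this job is a pass-through. A hypothetical sketch (PassThroughCombiner is not from the original code); since it saves no work, dropping the combiner, as the first answer suggests, is the practical fix:

    public static class PassThroughCombiner
            extends Reducer<Text, RecordWritable, Text, RecordWritable> {
        @Override
        public void reduce(Text key, Iterable<RecordWritable> values, Context context)
                throws IOException, InterruptedException {
            // Re-emit each record unchanged; a type-correct combiner here can do no more.
            for (RecordWritable val : values) {
                context.write(key, val);
            }
        }
    }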