
Why doesn't the MapState variable in Flink persist previous values?

  • Felipe · 7 years ago

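    I am implementing a Flink program in Java that computes the average temperature of a source using a `MapStateDescriptor`. For some reason, the `MapState` `averageTemp` is always empty inside `map()`, and I never find the key. What is my implementation missing?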

    import java.util.Map;
    
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.sense.flink.mqtt.MqttTemperature;
    import org.sense.flink.mqtt.TemperatureMqttConsumer;
    
    public class SensorsMultipleReadingMqttEdgentQEP {
    
        public SensorsMultipleReadingMqttEdgentQEP() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    
            DataStream<MqttTemperature> temperatureStream01 = env.addSource(new TemperatureMqttConsumer("topic-edgent-01"));
            DataStream<MqttTemperature> temperatureStream02 = env.addSource(new TemperatureMqttConsumer("topic-edgent-02"));
            DataStream<MqttTemperature> temperatureStream03 = env.addSource(new TemperatureMqttConsumer("topic-edgent-03"));
            DataStream<MqttTemperature> temperatureStreams = temperatureStream01.union(temperatureStream02)
                    .union(temperatureStream03);
    
            DataStream<Tuple2<String, Double>> average = temperatureStreams.keyBy(new TemperatureKeySelector())
                    .map(new AverageTempMapper());
            average.print();
    
            env.execute("SensorsMultipleReadingMqttEdgentQEP");
        }
    
        public static class TemperatureKeySelector implements KeySelector<MqttTemperature, Integer> {
    
            private static final long serialVersionUID = 5905504239899133953L;
    
            @Override
            public Integer getKey(MqttTemperature value) throws Exception {
                return value.getId();
            }
        }
    
        public static class AverageTempMapper extends RichMapFunction<MqttTemperature, Tuple2<String, Double>> {
    
            private static final long serialVersionUID = -5489672634096634902L;
            private MapState<String, Double> averageTemp;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                averageTemp = getRuntimeContext()
                        .getMapState(new MapStateDescriptor<>("average-temperature", String.class, Double.class));
            }
    
            @Override
            public Tuple2<String, Double> map(MqttTemperature value) throws Exception {
                String key = "no-room";
                Double temp = value.getTemp();
    
                if (value.getId().equals(1) || value.getId().equals(2) || value.getId().equals(3)) {
                    key = "room-A";
                } else if (value.getId().equals(4) || value.getId().equals(5) || value.getId().equals(6)) {
                    key = "room-B";
                } else if (value.getId().equals(7) || value.getId().equals(8) || value.getId().equals(9)) {
                    key = "room-C";
                }
                // NEVER ITERATES ON THE averageTemp
                for (Map.Entry<String, Double> entry: averageTemp.entries()) {
                    System.out.println(entry.getKey() + " - " + entry.getValue());
                }
    
                System.out.println("value: " + value);
                if (averageTemp.contains(key)) { // NEVER CONTAINS A KEY
                    System.out.println("yes: " + key);
                    temp = (averageTemp.get(key) + value.getTemp()) / 2;
                } else {
                    averageTemp.put(key, temp);
                }
                return Tuple2.of(key, temp);
            }
        }
    }
    

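    **Edit:** OK, I misunderstood the problem. The code does keep the previous state in the `MapState`; I was wrong because of the way I was debugging it. The actual problem is that it starts several threads, and my map value gets overwritten at least three times before it starts computing the average.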

  • kkrugler · 7 years ago

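    State in a map function is kept on a per-key basis. So when `map()` is called and accesses the map state, it gets the state for the key of the `MqttTemperature` record currently being processed. Because `TemperatureKeySelector` keys by the sensor id, every sensor id has its own separate `MapState`: the "room-A" entry written while processing sensor 1 is not visible while processing sensor 2 or 3.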
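To see why the state looks empty, here is a minimal plain-Java model of keyed state. It is a simulation only, not Flink's API: the class `KeyedMapStateModel` and its methods are hypothetical. It keeps one independent map per sensor id, the way Flink scopes state to the current `keyBy()` key, and repeats the question's `contains()`/`put()` logic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of keyed state: NOT Flink API code.
// It only illustrates the scoping rule: every key gets its own independent state.
class KeyedMapStateModel {
    // One independent "MapState" per key, as Flink keeps one per keyBy() key.
    private final Map<Integer, Map<String, Double>> stateByKey = new HashMap<>();

    // The map that a map() call for a record with this sensor id would see.
    Map<String, Double> stateFor(int sensorId) {
        return stateByKey.computeIfAbsent(sensorId, id -> new HashMap<>());
    }

    // Same room assignment as the question's map() method.
    static String roomFor(int sensorId) {
        if (sensorId >= 1 && sensorId <= 3) return "room-A";
        if (sensorId >= 4 && sensorId <= 6) return "room-B";
        if (sensorId >= 7 && sensorId <= 9) return "room-C";
        return "no-room";
    }

    // Mirrors the question's contains()/put() logic and reports whether the
    // room key was already present in the state visible to this sensor id.
    boolean process(int sensorId, double temp) {
        Map<String, Double> averageTemp = stateFor(sensorId);
        String room = roomFor(sensorId);
        if (averageTemp.containsKey(room)) {
            return true;
        }
        averageTemp.put(room, temp);
        return false;
    }
}
```

Calling `process(1, 20.0)`, `process(2, 21.0)` and `process(3, 22.0)` all return `false`, because sensors 1, 2 and 3 each write "room-A" into their own empty state; only a second reading from sensor 1 finds the key. That is the "overwritten at least three times" effect from the question's edit.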

    Given that you want the average temperature per room, I would approach it like this:

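    1. Change `TemperatureKeySelector` to return `room-A`, `room-B` or `room-C` based on the id field (so the key type becomes `String`).
    2. In `AverageTempMapper`, use two `ValueState` variables: one for the sum of the temperatures (a double) and one for the count. When `map()` is called and a `ValueState` variable is null, initialize it to 0, then add the temperature to the sum and increment the count. The average is then sum / count.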
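The two steps above can be sketched as another plain-Java model. This is again a simulation, not Flink code: `RoomAverageModel`, `roomKey` and `map` are hypothetical names, and the two `HashMap`s stand in for the per-key `ValueState` variables. A missing entry reads as null, similar to a Flink `ValueState` created without a default value before its first update.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of the suggested approach: NOT Flink API code.
// Each room key has its own "sum" and "count" state; a missing entry reads as
// null at first, like a ValueState that has not been updated yet.
class RoomAverageModel {
    private final Map<String, Double> sumState = new HashMap<>();
    private final Map<String, Long> countState = new HashMap<>();

    // Step 1: the key selector returns the room instead of the sensor id.
    static String roomKey(int sensorId) {
        if (sensorId >= 1 && sensorId <= 3) return "room-A";
        if (sensorId >= 4 && sensorId <= 6) return "room-B";
        if (sensorId >= 7 && sensorId <= 9) return "room-C";
        return "no-room";
    }

    // Step 2: per-key map() logic. Null state is initialized to 0, then the
    // sum is increased and the count incremented; the running average is returned.
    double map(int sensorId, double temp) {
        String key = roomKey(sensorId);
        Double sum = sumState.get(key);
        Long count = countState.get(key);
        if (sum == null) sum = 0.0;
        if (count == null) count = 0L;
        sum += temp;
        count += 1;
        sumState.put(key, sum);
        countState.put(key, count);
        return sum / count;
    }
}
```

Feeding readings 20.0 (sensor 1), 22.0 (sensor 2) and 24.0 (sensor 3) yields 20.0, 21.0 and 22.0, a true running mean for room-A, whereas the question's `(old + new) / 2` weights the latest reading at one half. In an actual Flink job, the maps would become `ValueState<Double>` and `ValueState<Long>` fields obtained in `open()` via `getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Double.class))` (and likewise for the count), read with `value()` and written with `update()`; Flink then scopes them to the current room key automatically.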