代码之家  ›  专栏  ›  技术社区  ›  Igor Piddubnyi

apache flink-使用tumblingprocessingtimewindow和timecharacteristic.eventtime[关闭]

  •  -3
  • Igor Piddubnyi  · 技术社区  · 7 年前

    看起来TumblingProcessingTimeWindow总是使用“摄取时间”。 有没有办法强制事件时间加窗?

    我的用例非常简单,我接收到包含“事件时间戳”的事件,并希望它们基于事件时间进行聚合。

    例如,在以下代码中,我希望有2个输出:

    public class WindowExample {
    
    private static final SimpleDateFormat FORMAT = new SimpleDateFormat("HH:mm:ss");
    
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<Bean> beans = env.fromElements(
            new Bean(1, 1, "12:00:00"),
            new Bean(1, 2, "12:00:03"),
            new Bean(1, 1, "12:00:04"),  //window of 3 sec trigger here
            new Bean(1, 2, "12:00:05"),
            new Bean(1, 3, "12:00:06"),
            new Bean(1, 3, "12:00:07")   //window of 3 sec trigger here
        );
    
        beans.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Bean>() {
            @Override public long extractAscendingTimestamp(Bean element) {
                return element.getTs();
            }
        })
            .keyBy("id")
            .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
            .max("value")
            .addSink(new SinkFunction<Bean>() {
    
                @Override public void invoke(Bean value, Context context) {
                    System.out.println("Sync on: "+value);
                }
            });
        env.execute("Windowing test");
    }
    
    public static class Bean {
    
        private int id;
        private int value;
        private long ts;
    
        public Bean() {
        }
    
        Bean(int id, int value, String time) throws ParseException {
            this.id = id;
            this.value = value;
            this.ts = FORMAT.parse(time).toInstant().toEpochMilli();
        }
    
        long getTs() {
            return ts;
        }
        // other getters and setters
    }
    

    }

    1 回复  |  直到 7 年前
        1
  •  1
  •   David Anderson    7 年前

    Flink允许使用带有事件时间流的处理时间窗口,因为有合法的使用案例。但如果你真的想要事件时间窗口,你需要要求它。在这种情况下,你应该使用 TumblingEventTimeWindows .