
Group by processing-time window produces no results

  • Tom  · Tech Community  · 4 years ago

    If I run the following SELECT, it produces no results:

     SELECT
           TUMBLE_START(pt, INTERVAL '4' second),
           sum(price)
       FROM sourceTable
       GROUP BY TUMBLE(pt, INTERVAL '4' second)
    

    If I run the following SELECT instead, the result is correct:

    tenv.sqlQuery("select key, price, pt from sourceTable").toAppendStream[Row].print()

    Can someone help me look into this?

    package org.example.sql4
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableResult
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.types.Row
    
    object Sql017_ProcessTimeAttributeDDLTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val tenv = StreamTableEnvironment.create(env)
    
        val ddl =
          """
          create table sourceTable(
          key STRING,
      `date` STRING, --- date is a reserved keyword
          price DOUBLE,
          pt as PROCTIME() ---processing time
          ) with (
            'connector' = 'filesystem',
            'path' = 'd:/stock.csv',
            'format' = 'csv'
          )
          """.stripMargin(' ')
        val result: TableResult = tenv.executeSql(ddl)
        result.print()
    
    //   The following query produces correct result
    //    tenv.sqlQuery("select key, price, pt from sourceTable").toAppendStream[Row].print()
    
        val sql =
          """
           SELECT
               TUMBLE_START(pt, INTERVAL '4' second),
               sum(price)
           FROM sourceTable
           GROUP BY TUMBLE(pt, INTERVAL '4' second)
         """.stripMargin(' ')
    
        //this query doesn't produce any result
        tenv.sqlQuery(sql).toAppendStream[Row].print()
    
        env.execute()
    
        Thread.sleep(20000)
      }
    
    }
    

    The stock.csv file is:

    key1,2020-09-16 20:50:15,1
    key1,2020-09-16 20:50:12,2
    key1,2020-09-16 20:50:11,3
    key1,2020-09-16 20:50:18,4
    key1,2020-09-16 20:50:13,5
    key1,2020-09-16 20:50:20,6
    key1,2020-09-16 20:50:14,7
    key1,2020-09-16 20:50:22,8
    key1,2020-09-16 20:50:40,9
    
  •   David Anderson  · 4 years ago

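    I think the problem is that your job doesn't run long enough for the window to have a chance to fire. Processing-time windows fire whenever the time since the epoch is evenly divisible by the window interval, which here means at every multiple of 4 seconds. Unless your job runs for at least 4 seconds, it won't necessarily see a window fire.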
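    To make the timing argument concrete, here is a minimal plain-Scala sketch (not Flink code) of epoch-aligned tumbling windows. It assumes a zero window offset, non-negative epoch-millisecond timestamps, and, as with Flink's TimeWindow, that a window's last timestamp is end - 1. The object and helper names (ProcTimeWindowAlignment, windowStart, windowMaxTimestamp, firesBefore) are hypothetical, and the timestamps are small values chosen for readability.

    ```scala
    // Plain-Scala model (not Flink code) of epoch-aligned tumbling windows.
    // Assumptions: zero window offset, non-negative epoch-millis timestamps,
    // and a window whose last timestamp is end - 1 (as in Flink's TimeWindow).
    object ProcTimeWindowAlignment {
      // Start of the tumbling window containing `ts`: the largest multiple
      // of `size` that is <= ts.
      def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

      // Last millisecond of that window; in this model the window's
      // processing-time timer fires when the wall clock reaches this value.
      def windowMaxTimestamp(ts: Long, size: Long): Long =
        windowStart(ts, size) + size - 1

      // True if the window of a row processed at `ts` fires before the job
      // stops at `jobEnd` (exclusive).
      def firesBefore(ts: Long, jobEnd: Long, size: Long): Boolean =
        windowMaxTimestamp(ts, size) < jobEnd

      def main(args: Array[String]): Unit = {
        val size = 4000L // INTERVAL '4' second

        // A row read at 10500 ms falls into window [8000, 12000).
        println(windowStart(10500L, size))        // prints 8000
        println(windowMaxTimestamp(10500L, size)) // prints 11999

        // Job stops 100 ms after reading the row: the window never fires.
        println(firesBefore(10500L, 10600L, size)) // prints false

        // Equally short job, but the row arrives just before a boundary.
        println(firesBefore(11950L, 12050L, size)) // prints true

        // Job keeps running a full window length after the row: always fires.
        println(firesBefore(10500L, 10500L + size, size)) // prints true
      }
    }
    ```

    This matches the answer's point: with a 4-second window, a job that keeps running for at least 4 seconds after a row arrives is guaranteed to see that row's window fire, while a shorter run only sometimes does. Note also that in the question's program env.execute() does not return until the bounded job has finished, so the Thread.sleep(20000) placed after it does not extend the job's lifetime.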