如果我运行下面的select,则不会产生任何结果
SELECT
TUMBLE_START(pt, INTERVAL '4' second),
sum(price)
FROM sourceTable
GROUP BY TUMBLE(pt, INTERVAL '4' second)
如果我运行下面的select,那么结果是正确的。
tenv.sqlQuery("select key, price, pt from sourceTable").toAppendStream[Row].print()
有人能帮忙看看吗?
package org.example.sql4
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableResult
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
object Sql017_ProcessTimeAttributeDDLTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val ddl =
"""
create table sourceTable(
key STRING,
`date` STRING, ---date is a key word
price DOUBLE,
pt as PROCTIME() ---processing time
) with (
'connector' = 'filesystem',
'path' = 'd:/stock.csv',
'format' = 'csv'
)
""".stripMargin(' ')
val result: TableResult = tenv.executeSql(ddl)
result.print()
val sql =
"""
SELECT
TUMBLE_START(pt, INTERVAL '4' second),
sum(price)
FROM sourceTable
GROUP BY TUMBLE(pt, INTERVAL '4' second)
""".stripMargin(' ')
tenv.sqlQuery(sql).toAppendStream[Row].print()
env.execute()
Thread.sleep(20000)
}
}
这个股票.csv是:
key1,2020-09-16 20:50:15,1
key1,2020-09-16 20:50:12,2
key1,2020-09-16 20:50:11,3
key1,2020-09-16 20:50:18,4
key1,2020-09-16 20:50:13,5
key1,2020-09-16 20:50:20,6
key1,2020-09-16 20:50:14,7
key1,2020-09-16 20:50:22,8
key1,2020-09-16 20:50:40,9