Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Not A Problem
Affects Version/s: 1.4.2
Environment: java 8, flink 1.4.2, scala 2.11
Description
I have a stream like this: <_time(timestamp), uri(string), userId(int)>. The _time attribute is the rowtime attribute, and I register the stream as a table:
tableEnv.registerDataStream("userVisitPage", stream, "_time.rowtime, uri,userId");
Then I query the table:
final String sql = "SELECT tumble_start(_time, interval '10' second) as timestart, "
    + " count(distinct userId) as uv, "
    + " uri as uri, "
    + " count(1) as pv "
    + "FROM userVisitPage "
    + "GROUP BY tumble(_time, interval '10' second), uri";
final Table table = tableEnv.sqlQuery(sql);
tableEnv.toRetractStream(table, Row.class);
but the following exception occurs:
2018-03-19 19:30:53,881 ERROR [com.qunhe.logcomplex.oceanus.util.TaskSubmitter] - main - submit task failed
org.apache.flink.table.codegen.CodeGenException: Unsupported call: TUMBLE
If you think this function should be supported, you can create an issue and start a discussion for it.
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1006)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1006)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1006)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
    at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:234)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$7.apply(CodeGenerator.scala:321)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$7.apply(CodeGenerator.scala:321)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:321)
    at org.apache.flink.table.plan.nodes.CommonCalc$class.generateFunction(CommonCalc.scala:44)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.generateFunction(DataStreamCalc.scala:43)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:116)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupAggregate.translateToPlan(DataStreamGroupAggregate.scala:113)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupAggregate.translateToPlan(DataStreamGroupAggregate.scala:113)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
    at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:837)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:764)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:734)
    at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)
    at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:357
How can I implement this query?
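For reference, the result the query above is meant to produce can be sketched in plain Java, with no Flink involved: for every 10-second tumbling window and uri, the number of distinct users (uv) and the number of visits (pv). This is only an illustration of the intended semantics, not a fix for the CodeGenException. The names TumbleSketch, Visit, windowStart and aggregate are hypothetical, and the sketch assumes windows are aligned to epoch 0 and that _time is given in milliseconds.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class TumbleSketch {

    /** Hypothetical record type mirroring <_time, uri, userId>. */
    static final class Visit {
        final long timeMillis;
        final String uri;
        final int userId;

        Visit(long timeMillis, String uri, int userId) {
            this.timeMillis = timeMillis;
            this.uri = uri;
            this.userId = userId;
        }
    }

    /** Start of the tumbling window containing the timestamp (assumes alignment to epoch 0). */
    static long windowStart(long timeMillis, long sizeMillis) {
        return timeMillis - Math.floorMod(timeMillis, sizeMillis);
    }

    /**
     * Groups visits by (window start, uri) and returns {uv, pv} per group,
     * keyed as "windowStart|uri".
     */
    static Map<String, long[]> aggregate(List<Visit> visits, long sizeMillis) {
        Map<String, Set<Integer>> users = new TreeMap<>();
        Map<String, long[]> result = new TreeMap<>();
        for (Visit v : visits) {
            String key = windowStart(v.timeMillis, sizeMillis) + "|" + v.uri;
            Set<Integer> seen = users.computeIfAbsent(key, k -> new HashSet<>());
            seen.add(v.userId);
            long[] counts = result.computeIfAbsent(key, k -> new long[2]);
            counts[0] = seen.size(); // uv: count(distinct userId)
            counts[1]++;             // pv: count(1)
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample data, for illustration only.
        List<Visit> visits = Arrays.asList(
                new Visit(1_000L, "/home", 1),
                new Visit(2_000L, "/home", 1),
                new Visit(9_999L, "/home", 2),
                new Visit(10_000L, "/home", 1));
        for (Map.Entry<String, long[]> e : aggregate(visits, 10_000L).entrySet()) {
            System.out.println(e.getKey() + " uv=" + e.getValue()[0] + " pv=" + e.getValue()[1]);
        }
    }
}
```

The sketch keeps a set of user ids per (window, uri) group, which is the state a distinct count needs. A streaming implementation would additionally have to discard that state once a window closes.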