Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
None
Description
It seems that INSERT INTO and ORDER BY do not work well together.
An AssertionError is thrown and the ORDER BY clause is duplicated. I guess this is a Calcite issue.
Example:
@Test def testInsertIntoMemoryTable(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) MemoryTableSourceSinkUtil.clear() val t = StreamTestData.getSmall3TupleDataStream(env) .assignAscendingTimestamps(x => x._2) .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) tEnv.registerTable("sourceTable", t) val fieldNames = Array("d", "e", "f", "t") val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP) .asInstanceOf[Array[TypeInformation[_]]] val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink) val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable ORDER BY a" tEnv.sqlUpdate(sql) env.execute()
Error:
java.lang.AssertionError: not a query: SELECT `sourceTable`.`a`, `sourceTable`.`b`, `sourceTable`.`c`, `sourceTable`.`rowtime` FROM `sourceTable` AS `sourceTable` ORDER BY `a` ORDER BY `a` at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3069) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:557) at org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:104) at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:717) at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683) at org.apache.flink.table.runtime.stream.sql.SqlITCase.testInsertIntoMemoryTable(SqlITCase.scala:735) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Attachments
Issue Links
- links to