Flink / FLINK-10261

INSERT INTO does not work with ORDER BY clause

      Description

      INSERT INTO and ORDER BY do not seem to work together.

      An AssertionError ("not a query") is thrown, and the query printed in the error message contains the ORDER BY clause twice. I guess this is a Calcite issue.

      Example:

      @Test
      def testInsertIntoMemoryTable(): Unit = {
        // Set up a streaming environment with event-time semantics.
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val tEnv = TableEnvironment.getTableEnvironment(env)
        MemoryTableSourceSinkUtil.clear()

        // Register the source table; the second tuple field provides the rowtime.
        val t = StreamTestData.getSmall3TupleDataStream(env)
          .assignAscendingTimestamps(x => x._2)
          .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
        tEnv.registerTable("sourceTable", t)

        // Register an in-memory append sink as the INSERT INTO target.
        val fieldNames = Array("d", "e", "f", "t")
        val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
          .asInstanceOf[Array[TypeInformation[_]]]
        val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
        tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)

        // The ORDER BY in the INSERT INTO query triggers the AssertionError in sqlUpdate.
        val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable ORDER BY a"
        tEnv.sqlUpdate(sql)
        env.execute()
      }
      

      Error:

      java.lang.AssertionError: not a query: SELECT `sourceTable`.`a`, `sourceTable`.`b`, `sourceTable`.`c`, `sourceTable`.`rowtime`
      FROM `sourceTable` AS `sourceTable`
      ORDER BY `a`
      ORDER BY `a`
      
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3069)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:557)
      	at org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:104)
      	at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:717)
      	at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
      	at org.apache.flink.table.runtime.stream.sql.SqlITCase.testInsertIntoMemoryTable(SqlITCase.scala:735)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      

              People

              • Assignee: xueyu
              • Reporter: Timo Walther (twalthr)