  1. Flink
  2. FLINK-10261

INSERT INTO does not work with ORDER BY clause

Details

    Description

      INSERT INTO does not work when the query contains an ORDER BY clause.

      sqlUpdate throws an AssertionError ("not a query"), and the ORDER BY clause appears twice in the query printed with the error. I guess this is a Calcite issue.

      Example:

      @Test
      def testInsertIntoMemoryTable(): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val tEnv = TableEnvironment.getTableEnvironment(env)
        MemoryTableSourceSinkUtil.clear()

        // Source table with an event-time attribute taken from the second tuple field.
        val t = StreamTestData.getSmall3TupleDataStream(env)
          .assignAscendingTimestamps(x => x._2)
          .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
        tEnv.registerTable("sourceTable", t)

        // Sink table whose schema matches the four selected columns.
        val fieldNames = Array("d", "e", "f", "t")
        val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
          .asInstanceOf[Array[TypeInformation[_]]]
        val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
        tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)

        // sqlUpdate is the call that throws the AssertionError.
        val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable ORDER BY a"
        tEnv.sqlUpdate(sql)
        env.execute()
      }


      Error:

      java.lang.AssertionError: not a query: SELECT `sourceTable`.`a`, `sourceTable`.`b`, `sourceTable`.`c`, `sourceTable`.`rowtime`
      FROM `sourceTable` AS `sourceTable`
      ORDER BY `a`
      ORDER BY `a`
      
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3069)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:557)
      	at org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:104)
      	at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:717)
      	at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
      	at org.apache.flink.table.runtime.stream.sql.SqlITCase.testInsertIntoMemoryTable(SqlITCase.scala:735)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      

            People

              xueyu xueyu
              twalthr Timo Walther