Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-10261

INSERT INTO does not work with ORDER BY clause

Agile BoardRank to TopRank to BottomAttach filesAttach ScreenshotBulk Copy AttachmentsBulk Move AttachmentsVotersStop watchingWatchersCreate sub-taskConvert to sub-taskLinkCloneLabelsUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    Description

      It seems that INSERT INTO and ORDER BY do not work well together.

      An AssertionError is thrown and the ORDER BY clause is duplicated. I guess this is a Calcite issue.

      Example:

      @Test
        def testInsertIntoMemoryTable(): Unit = {
          val env = StreamExecutionEnvironment.getExecutionEnvironment
          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
          val tEnv = TableEnvironment.getTableEnvironment(env)
          MemoryTableSourceSinkUtil.clear()
      
          val t = StreamTestData.getSmall3TupleDataStream(env)
              .assignAscendingTimestamps(x => x._2)
            .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
          tEnv.registerTable("sourceTable", t)
      
          val fieldNames = Array("d", "e", "f", "t")
          val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
            .asInstanceOf[Array[TypeInformation[_]]]
          val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
          tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
      
          val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable ORDER BY a"
          tEnv.sqlUpdate(sql)
          env.execute()
      

      Error:

      java.lang.AssertionError: not a query: SELECT `sourceTable`.`a`, `sourceTable`.`b`, `sourceTable`.`c`, `sourceTable`.`rowtime`
      FROM `sourceTable` AS `sourceTable`
      ORDER BY `a`
      ORDER BY `a`
      
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3069)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:557)
      	at org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:104)
      	at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:717)
      	at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
      	at org.apache.flink.table.runtime.stream.sql.SqlITCase.testInsertIntoMemoryTable(SqlITCase.scala:735)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            xueyu xueyu
            twalthr Timo Walther
            Votes:
            0 Vote for this issue
            Watchers:
            4 Stop watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment