Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      When we run the tableAPI as follows:

      val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'date,'pojo, 'string)
          val windowedTable = table
            .join(udtf2('string) as ('a, 'b))
            .window(Slide over 5.milli every 2.milli on 'long as 'w)
            .groupBy('w)
            .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end)
      

      We will get the error message:

      org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
      	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
      	at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
      	at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35)
      	at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59)
      	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
      	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
      	at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:377)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: org.codehaus.commons.compiler.CompileException: Line 77, Column 62: Unknown variable or type "in2"
      	at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11523)
      	at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6292)
      	at org.codehaus.janino.UnitCompiler.access$12900(UnitCompiler.java:209)
      	at org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5904)
      	at org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5901)
      	at org.codehaus.janino.Java$Package.accept(Java.java:4074)
      	at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5901)
      	at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6287)
      	at org.codehaus.janino.UnitCompiler.access$13500(UnitCompiler.java:209)
      

      The reason is val generator = new CodeGenerator(config, false, inputSchema.physicalTypeInfo) `physicalTypeInfo` will remove the TimeIndicator.
      I think we should fix this. What do you think Fabian Hueske Timo Walther , And hope your suggestions.

        Issue Links

          Activity

          Hide
          fhueske Fabian Hueske added a comment -

          Yes, this looks like a bug that needs to be fixed, IMO.
          If your analysis is correct, this would mean that we cannot use a window after a TableFunction was applied.

          Although this is a serious limitation of the Table API, I'm not sure if this qualifies as a release blocker.
          it would be good to fix it ASAP nonetheless, so we can include it if there is another RC or in the first bugfix release of 1.3.x.

          Show
          fhueske Fabian Hueske added a comment - Yes, this looks like a bug that needs to be fixed, IMO. If your analysis is correct, this would mean that we cannot use a window after a TableFunction was applied. Although this is a serious limitation of the Table API, I'm not sure if this qualifies as a release blocker. it would be good to fix it ASAP nonetheless, so we can include it if there is another RC or in the first bugfix release of 1.3.x.
          Hide
          sunjincheng121 sunjincheng added a comment - - edited

          Actually, we can use a window after a TableFunction was applied when we add new column for `rowtime`. as follows:

          • Add a new `rowtime` column:(works well)
            val table = stream.toTable(tEnv,'long, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string, 'rt.rowtime)
             table
                .join(udtf2('string))
                .window(Slide over 5.milli every 2.milli on 'rt as 'w)
                .groupBy('w)
                .select('int.count, 'w.start, 'w.end)
            

          But If we using already existing column as `rowtime`, UDTF will not work.

          • Using already existing column as `rowtime` cannot work)
             val table = stream.toTable(tEnv,'long.rowtime, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string) 
             table
                .join(udtf2('string))
                .window(Slide over 5.milli every 2.milli on 'long as 'w)
                .groupBy('w)
                .select('int.count, 'w.start, 'w.end)
            

          So I think the problem is how to handle the `rowtime` column when we using already existing column.

          Show
          sunjincheng121 sunjincheng added a comment - - edited Actually, we can use a window after a TableFunction was applied when we add new column for `rowtime`. as follows: Add a new `rowtime` column:(works well) val table = stream.toTable(tEnv,' long , ' int , ' double , ' float , 'bigdec, 'ts, 'date,'pojo, 'string, 'rt.rowtime) table .join(udtf2('string)) .window(Slide over 5.milli every 2.milli on 'rt as 'w) .groupBy('w) .select(' int .count, 'w.start, 'w.end) But If we using already existing column as `rowtime`, UDTF will not work. Using already existing column as `rowtime` cannot work) val table = stream.toTable(tEnv,' long .rowtime, ' int , ' double , ' float , 'bigdec, 'ts, 'date,'pojo, 'string) table .join(udtf2('string)) .window(Slide over 5.milli every 2.milli on ' long as 'w) .groupBy('w) .select(' int .count, 'w.start, 'w.end) So I think the problem is how to handle the `rowtime` column when we using already existing column.
          Hide
          twalthr Timo Walther added a comment -

          I think I found the bug that caused this exception. I will open a PR for it, it is just a one liner.

          Show
          twalthr Timo Walther added a comment - I think I found the bug that caused this exception. I will open a PR for it, it is just a one liner.
          Hide
          sunjincheng121 sunjincheng added a comment -

          Sounds very good to me!!

          Show
          sunjincheng121 sunjincheng added a comment - Sounds very good to me!!
          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user twalthr opened a pull request:

          https://github.com/apache/flink/pull/4008

          FLINK-6736 [table] Fix UDTF field access with time attribute record

          A wrong physical index caused the code generation to fail.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/twalthr/flink FLINK-6736

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4008.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4008


          commit 0ba039f2555c916088fe71743002e395989701ea
          Author: twalthr <twalthr@apache.org>
          Date: 2017-05-29T12:48:14Z

          FLINK-6736 [table] Fix UDTF field access with time attribute record


          Show
          githubbot ASF GitHub Bot added a comment - GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/4008 FLINK-6736 [table] Fix UDTF field access with time attribute record A wrong physical index caused the code generation to fail. You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-6736 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4008.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4008 commit 0ba039f2555c916088fe71743002e395989701ea Author: twalthr <twalthr@apache.org> Date: 2017-05-29T12:48:14Z FLINK-6736 [table] Fix UDTF field access with time attribute record
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/4008

          @sunjincheng121 does it fix your issue?

          Show
          githubbot ASF GitHub Bot added a comment - Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4008 @sunjincheng121 does it fix your issue?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4008#discussion_r118935465

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala —
          @@ -391,8 +391,8 @@ object RelTimeIndicatorConverterTest {

          class TableFunc extends TableFunction[String] {
          val t = new Timestamp(0L)

          • def eval(time1: Long, time2: Timestamp): Unit = {
          • collect(time1.toString + time2.after(t))
            + def eval(time1: Long, time2: Timestamp, string: String): Unit = {
              • End diff –

          Does this change cover changes in functionality?

          Show
          githubbot ASF GitHub Bot added a comment - Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4008#discussion_r118935465 — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala — @@ -391,8 +391,8 @@ object RelTimeIndicatorConverterTest { class TableFunc extends TableFunction [String] { val t = new Timestamp(0L) def eval(time1: Long, time2: Timestamp): Unit = { collect(time1.toString + time2.after(t)) + def eval(time1: Long, time2: Timestamp, string: String): Unit = { End diff – Does this change cover changes in functionality?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4008#discussion_r118938233

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala —
          @@ -391,8 +391,8 @@ object RelTimeIndicatorConverterTest {

          class TableFunc extends TableFunction[String] {
          val t = new Timestamp(0L)

          • def eval(time1: Long, time2: Timestamp): Unit = {
          • collect(time1.toString + time2.after(t))
            + def eval(time1: Long, time2: Timestamp, string: String): Unit = {
              • End diff –

          Yes, it is used in `TimeAttributeITCase`.

          Show
          githubbot ASF GitHub Bot added a comment - Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4008#discussion_r118938233 — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala — @@ -391,8 +391,8 @@ object RelTimeIndicatorConverterTest { class TableFunc extends TableFunction [String] { val t = new Timestamp(0L) def eval(time1: Long, time2: Timestamp): Unit = { collect(time1.toString + time2.after(t)) + def eval(time1: Long, time2: Timestamp, string: String): Unit = { End diff – Yes, it is used in `TimeAttributeITCase`.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/4008

          @sunjincheng121 I will add another ITCase just to be sure.

          Show
          githubbot ASF GitHub Bot added a comment - Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4008 @sunjincheng121 I will add another ITCase just to be sure.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4008

          Show
          githubbot ASF GitHub Bot added a comment - Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4008
          Hide
          twalthr Timo Walther added a comment -

          Fixed in 1.4.0: 64ca0fa22ac0af21b3a0a193b769a24d44947f18
          Fixed in 1.3.0: 5d3bb6475650df01abefa65205bd118c044e5eb4

          Show
          twalthr Timo Walther added a comment - Fixed in 1.4.0: 64ca0fa22ac0af21b3a0a193b769a24d44947f18 Fixed in 1.3.0: 5d3bb6475650df01abefa65205bd118c044e5eb4

            People

            • Assignee:
              twalthr Timo Walther
              Reporter:
              sunjincheng121 sunjincheng
            • Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development