Details
-
Sub-task
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
1.3.0
-
None
Description
When we run a Table API program as follows:
val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'date, 'pojo, 'string)
val windowedTable = table
  .join(udtf2('string) as ('a, 'b))
  .window(Slide over 5.milli every 2.milli on 'long as 'w)
  .groupBy('w)
  .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end)
We get the following error:
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
    at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:377)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.codehaus.commons.compiler.CompileException: Line 77, Column 62: Unknown variable or type "in2"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11523)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6292)
    at org.codehaus.janino.UnitCompiler.access$12900(UnitCompiler.java:209)
    at org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5904)
    at org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5901)
    at org.codehaus.janino.Java$Package.accept(Java.java:4074)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5901)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6287)
    at org.codehaus.janino.UnitCompiler.access$13500(UnitCompiler.java:209)
The cause is the following line:
val generator = new CodeGenerator(config, false, inputSchema.physicalTypeInfo)
`physicalTypeInfo` removes the TimeIndicator from the input type.
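To illustrate why dropping the time indicator can matter, here is a minimal toy model. It is a sketch only: `Field`, `ToySchema`, and `indexOf` are hypothetical names made up for this example and are not Flink classes, and the sketch does not reproduce the actual code generation path. It only shows that once a time indicator field is filtered out of a schema, that field can no longer be resolved by name and the positions of later fields shift.

```scala
// Toy model only: these types are hypothetical and are NOT part of Flink.
final case class Field(name: String, isTimeIndicator: Boolean = false)

final case class ToySchema(logicalFields: Seq[Field]) {
  // Physical view: time indicator fields are dropped, mirroring what the
  // issue says `physicalTypeInfo` does.
  def physicalFields: Seq[Field] = logicalFields.filterNot(_.isTimeIndicator)
}

object ToySchemaDemo {
  // Resolve a field name to its position, if the field is present.
  def indexOf(fields: Seq[Field], name: String): Option[Int] = {
    val i = fields.indexWhere(_.name == name)
    if (i >= 0) Some(i) else None
  }

  def main(args: Array[String]): Unit = {
    val schema = ToySchema(Seq(
      Field("long", isTimeIndicator = true), // stands in for 'long.rowtime
      Field("int"),
      Field("string")
    ))

    println(indexOf(schema.logicalFields, "long"))    // Some(0)
    println(indexOf(schema.physicalFields, "long"))   // None: the field is gone
    println(indexOf(schema.logicalFields, "string"))  // Some(2)
    println(indexOf(schema.physicalFields, "string")) // Some(1): position shifted
  }
}
```

Any component that plans against the logical fields but generates code against the physical fields would see this kind of mismatch. Whether that is exactly what produces the `in2` compile error here is an assumption, not something confirmed by the stack trace.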
I think we should fix this. What do you think, fhueske and twalthr? I would appreciate your suggestions.