Details
Type: Bug
Status: Open
Priority: Not a Priority
Resolution: Unresolved
Affects Version/s: 1.8.1, 1.9.0
Fix Version/s: None
$ java -version
openjdk version "1.8.0_222"
OpenJDK Runtime Environment (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10)
OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)
------
$ scala -version
Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL
------
build.sbt
[...]
ThisBuild / scalaVersion := "2.11.12"
val flinkVersion = "1.9.0"
val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided")
[...]
Description
(The project in which I face the error is attached.)
Using: Scala streaming API and the StreamTableEnvironment.
Given the classes:
object EntityType extends Enumeration {
  type EntityType = Value
  val ACTIVITY = Value
}

sealed trait Entity extends Serializable

case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity
What I am trying to do is convert a table, after a selection, to an append stream:
/** activity table **/
val activityDataStream = partialComputation1
  .filter(_._1 == EntityType.ACTIVITY)
  .map(x => x._3.asInstanceOf[Activity])
tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)

val selectedTable = tableEnv.scan("activity").select("card_id, second")
selectedTable.printSchema()
// root
// |-- card_id: BIGINT
// |-- second: BIGINT

// ATTEMPT 1
// val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)
// output.print

// ATTEMPT 2
// val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)
// output.print

// ATTEMPT 3
// val output = tableEnv.toAppendStream[Row](selectedTable)
// output.print

// ATTEMPT 4
case class Test(card_id: Long, second: Long) extends Entity
val output = tableEnv.toAppendStream[Test](selectedTable)
output.print
In every attempt, the error I get is the same:
$ flink run target/scala-2.11/app-assembly-0.1.jar
Starting execution of program
root
 |-- card_id: BIGINT
 |-- second: BIGINT

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9954823e0b55a8140f78be6868c85399)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    at bds_comparison.flink.metrocard.App$.main(App.scala:141)
    at bds_comparison.flink.metrocard.App.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
    ... 23 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
    at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
    at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: Line 26, Column 21: Unexpected selector 'package' after "."
    at org.codehaus.janino.Parser.compileException(Parser.java:3482)
    at org.codehaus.janino.Parser.parseSelector(Parser.java:3147)
    at org.codehaus.janino.Parser.parseUnaryExpression(Parser.java:2761)
    at org.codehaus.janino.Parser.parseMultiplicativeExpression(Parser.java:2717)
    at org.codehaus.janino.Parser.parseAdditiveExpression(Parser.java:2696)
    at org.codehaus.janino.Parser.parseShiftExpression(Parser.java:2675)
    at org.codehaus.janino.Parser.parseRelationalExpression(Parser.java:2599)
    at org.codehaus.janino.Parser.parseEqualityExpression(Parser.java:2573)
    at org.codehaus.janino.Parser.parseAndExpression(Parser.java:2552)
    at org.codehaus.janino.Parser.parseExclusiveOrExpression(Parser.java:2531)
    at org.codehaus.janino.Parser.parseInclusiveOrExpression(Parser.java:2510)
    at org.codehaus.janino.Parser.parseConditionalAndExpression(Parser.java:2489)
    at org.codehaus.janino.Parser.parseConditionalOrExpression(Parser.java:2468)
    at org.codehaus.janino.Parser.parseConditionalExpression(Parser.java:2449)
    at org.codehaus.janino.Parser.parseAssignmentExpression(Parser.java:2428)
    at org.codehaus.janino.Parser.parseExpression(Parser.java:2413)
    at org.codehaus.janino.Parser.parseBlockStatement(Parser.java:1611)
    at org.codehaus.janino.Parser.parseBlockStatements(Parser.java:1544)
    at org.codehaus.janino.Parser.parseMethodDeclarationRest(Parser.java:1381)
    at org.codehaus.janino.Parser.parseClassBodyDeclaration(Parser.java:834)
    at org.codehaus.janino.Parser.parseClassBody(Parser.java:732)
    at org.codehaus.janino.Parser.parseClassDeclarationRest(Parser.java:638)
    at org.codehaus.janino.Parser.parsePackageMemberTypeDeclarationRest(Parser.java:366)
    at org.codehaus.janino.Parser.parseCompilationUnit(Parser.java:237)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
    ... 10 more
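
A possible reading of the final cause, offered as an assumption and not a confirmed diagnosis: the message `Unexpected selector 'package' after "."` would fit generated Java source that refers to a class through a qualified name containing a `package` segment, which is a reserved word in Java. Scala produces such names for classes declared inside a `package object`. Whether the attached project declares `Activity` or `Test` that way is not stated in this report. The sketch below has no Flink dependency; the `demo` package, both case classes, `Probe`, and `hasReservedSegment` are hypothetical names used only to show which class names the JVM reports in each case.

```scala
// Probe.scala: a standalone sketch; every name in this file is hypothetical.

package object demo {
  // Declared inside a package object, so the JVM class is demo.package$InPackageObject.
  case class InPackageObject(card_id: Long, second: Long)
}

package demo {
  // Declared directly in the package, so the JVM class is demo.TopLevel.
  case class TopLevel(card_id: Long, second: Long)

  object Probe {
    // A small subset of Java reserved words; none of them may appear as a
    // segment of a qualified name in Java source.
    private val reserved = Set("package", "class", "import", "new")

    // True when the canonical class name contains a reserved word as a segment.
    def hasReservedSegment(cls: Class[_]): Boolean =
      Option(cls.getCanonicalName).exists(_.split('.').exists(reserved.contains))

    def main(args: Array[String]): Unit = {
      for (cls <- Seq(classOf[InPackageObject], classOf[TopLevel])) {
        println(s"${cls.getName} -> ${cls.getCanonicalName} reserved=${hasReservedSegment(cls)}")
      }
    }
  }
}
```

If this reading applies, it would also be consistent with all four attempts failing identically, since the input type of the registered stream stays the same while only the output type changes. Moving the case classes out of the package object and into the package itself might then be worth trying, though that is untested here.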