FLINK-13944

Table.toAppendStream: InvalidProgramException: Table program cannot be compiled.

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.8.1, 1.9.0
    • Fix Version/s: None
    • Labels: None

      Description

      The project that reproduces the error is attached.

      I am using the Scala streaming API together with the StreamTableEnvironment.

      Given the classes:

      import java.sql.Timestamp

      object EntityType extends Enumeration {
        type EntityType = Value
        val ACTIVITY = Value
      }

      sealed trait Entity extends Serializable

      case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity
      

      I select two columns from a registered table and then try to convert the resulting table into an append stream:

      /** activity table **/
      val activityDataStream = partialComputation1
        .filter(_._1 == EntityType.ACTIVITY)
        .map(x => x._3.asInstanceOf[Activity])
      tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)
      
      
      val selectedTable = tableEnv.scan("activity").select("card_id, second")
      selectedTable.printSchema()
      // root
      //   |-- card_id: BIGINT
      //   |-- second: BIGINT
      
      // ATTEMPT 1
      //    val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)
      //    output.print
      
      // ATTEMPT 2
      //    val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)
      //    output.print
      
      // ATTEMPT 3
      //    val output = tableEnv.toAppendStream[Row](selectedTable)
      //    output.print
      
      // ATTEMPT 4
      case class Test(card_id: Long, second: Long) extends Entity
      val output = tableEnv.toAppendStream[Test](selectedTable)
      output.print
      

      Every one of these attempts fails with the same error:

      $ flink run target/scala-2.11/app-assembly-0.1.jar 
      
      Starting execution of program
      root
       |-- card_id: BIGINT
       |-- second: BIGINT
      
      
      ------------------------------------------------------------
       The program finished with the following exception:
      
      org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9954823e0b55a8140f78be6868c85399)
      	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
      	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
      	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
      	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
      	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
      	at bds_comparison.flink.metrocard.App$.main(App.scala:141)
      	at bds_comparison.flink.metrocard.App.main(App.scala)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
      	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
      	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
      	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
      	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
      	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
      	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
      	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:422)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
      	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
      	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
      	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
      	... 23 more
      Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
      	at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
      	at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
      	at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50)
      	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
      	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
      	at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: org.codehaus.commons.compiler.CompileException: Line 26, Column 21: Unexpected selector 'package' after "."
      	at org.codehaus.janino.Parser.compileException(Parser.java:3482)
      	at org.codehaus.janino.Parser.parseSelector(Parser.java:3147)
      	at org.codehaus.janino.Parser.parseUnaryExpression(Parser.java:2761)
      	at org.codehaus.janino.Parser.parseMultiplicativeExpression(Parser.java:2717)
      	at org.codehaus.janino.Parser.parseAdditiveExpression(Parser.java:2696)
      	at org.codehaus.janino.Parser.parseShiftExpression(Parser.java:2675)
      	at org.codehaus.janino.Parser.parseRelationalExpression(Parser.java:2599)
      	at org.codehaus.janino.Parser.parseEqualityExpression(Parser.java:2573)
      	at org.codehaus.janino.Parser.parseAndExpression(Parser.java:2552)
      	at org.codehaus.janino.Parser.parseExclusiveOrExpression(Parser.java:2531)
      	at org.codehaus.janino.Parser.parseInclusiveOrExpression(Parser.java:2510)
      	at org.codehaus.janino.Parser.parseConditionalAndExpression(Parser.java:2489)
      	at org.codehaus.janino.Parser.parseConditionalOrExpression(Parser.java:2468)
      	at org.codehaus.janino.Parser.parseConditionalExpression(Parser.java:2449)
      	at org.codehaus.janino.Parser.parseAssignmentExpression(Parser.java:2428)
      	at org.codehaus.janino.Parser.parseExpression(Parser.java:2413)
      	at org.codehaus.janino.Parser.parseBlockStatement(Parser.java:1611)
      	at org.codehaus.janino.Parser.parseBlockStatements(Parser.java:1544)
      	at org.codehaus.janino.Parser.parseMethodDeclarationRest(Parser.java:1381)
      	at org.codehaus.janino.Parser.parseClassBodyDeclaration(Parser.java:834)
      	at org.codehaus.janino.Parser.parseClassBody(Parser.java:732)
      	at org.codehaus.janino.Parser.parseClassDeclarationRest(Parser.java:638)
      	at org.codehaus.janino.Parser.parsePackageMemberTypeDeclarationRest(Parser.java:366)
      	at org.codehaus.janino.Parser.parseCompilationUnit(Parser.java:237)
      	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
      	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
      	at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
      	at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
      	at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
      	... 10 more
      

       

    Attachments

    1. app.zip (13 kB, Stefano)

    People

    • Assignee: Unassigned
    • Reporter: nyxgear Stefano
    • Votes: 0
    • Watchers: 4