FLINK-16068: table with keyword-escaped columns and computed_column_expression columns

Details

    Description

      I used sql-client to create a table that has both a keyword-escaped column and a computed column (computed_column_expression), like this:

      CREATE TABLE source_kafka (
          log STRING,
          `time` BIGINT,
          pt as proctime()
      ) WITH (
        'connector.type' = 'kafka',       
        'connector.version' = 'universal',
        'connector.topic' = 'k8s-logs',
        'connector.startup-mode' = 'latest-offset',
        'connector.properties.zookeeper.connect' = 'zk-1.zk:2181,zk-2.zk:2181,zk-3.zk:2181/kafka',
        'connector.properties.bootstrap.servers' = 'kafka.default:9092',
        'connector.properties.group.id' = 'testGroup',
        'format.type'='json',
        'format.fail-on-missing-field' = 'true',
        'update-mode' = 'append'
      );
      

      Then I simply queried it:

      SELECT * from source_kafka limit 10;

      and got an exception:

      java.io.IOException: Fail to run stream sql job
      	at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:164)
      	at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callSelect(FlinkStreamSqlInterpreter.java:108)
      	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:203)
      	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:151)
      	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:104)
      	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:103)
      	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:676)
      	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:569)
      	at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
      	at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:121)
      	at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "time" at line 1, column 12.
      Was expecting one of:
          "ABS" ...
          "ARRAY" ...
          "AVG" ...
          "CARDINALITY" ...
          "CASE" ...
          "CAST" ...
          "CEIL" ...
          "CEILING" ...
          ......
          
      	at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
      	at org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
      	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
      	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
      	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
      	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
      	at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
      	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
      	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
      	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
      	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
      	at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:104)
      	... 13 more
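
A note on the reported position, as a hedged sketch rather than a description of Flink's actual code: the trace fails inside `SqlExprToRexConverterImpl.convertToRexNodes`, and the parser reports `"time"` at line 1, column 12. That position is what one would see if the column names were joined with commas into a generated `SELECT` without backticks. The function names and the exact statement shape below are assumptions for illustration; the report does not show the generated statement.

```python
# Hypothetical reconstruction: the statement Flink really generates is not
# shown in the report. The comma separator (no space) is an assumption chosen
# because it reproduces the reported column number.

def build_select(columns):
    """Join column expressions into a SELECT list without any quoting."""
    return "SELECT " + ",".join(columns)


def column_of(statement, token):
    """Return the 1-based column at which token first appears."""
    return statement.index(token) + 1


stmt = build_select(["log", "time", "proctime()"])
print(stmt)                     # SELECT log,time,proctime()
print(column_of(stmt, "time"))  # 12
```

Under that assumption, the unquoted `time` is read as a keyword rather than an identifier, which matches the parser's list of expected tokens.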
      

      I also ran some tests. The following definitions work:

      CREATE TABLE source_kafka (
          log STRING,
          `aaaaa` BIGINT,
          pt as proctime()
      )
      
      CREATE TABLE source_kafka (
          log STRING,
          `time` BIGINT
      )
      
      CREATE TABLE source_kafka (
          log STRING,
          pt as proctime()
      )

      These escaped column names fail when combined with a computed column:

      `time`, `select`, `string`
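
The pattern in these tests (a keyword column name fails only when a computed column is also present) would be consistent with column names being placed unescaped into a generated expression. A minimal sketch of the usual remedy, assuming backtick quoting with embedded backticks doubled; `escape_identifier` and `build_select_escaped` are hypothetical helpers, not Flink APIs:

```python
# Hypothetical helpers for illustration only; not taken from the Flink code base.

def escape_identifier(name):
    """Wrap a column name in backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


def build_select_escaped(columns, computed):
    """Quote physical column names; leave computed expressions untouched."""
    parts = [escape_identifier(c) for c in columns] + list(computed)
    return "SELECT " + ",".join(parts)


stmt = build_select_escaped(["log", "time"], ["proctime()"])
print(stmt)  # SELECT `log`,`time`,proctime()
```

Quoting every physical column, rather than only known keywords, avoids having to maintain a keyword list.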

       

            People

              libenchao Benchao Li
              pangliang pangliang

                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 20m