Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-19735

TableFunction can not work in Flink View

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.11.1
    • Fix Version/s: None
    • Component/s: Table SQL / API
    • Labels:
      None

      Description

      TableFunction can't be work in Flink Sql. Here is my code:

      CREATE TABLE test (
        myField   STRING,
        name      STRING
      ) WITH (
        'connector' = 'kafka-0.11',
        'topic' = 'xxxx',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'mygroup',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'csv'
      );
      CREATE TABLE print (
        myField   STRING,
        newWord   STRING,
        newLength INT
      ) WITH (
        'connector' = 'print'
      );
      
      CREATE VIEW test_view AS
      SELECT myField, newWord, newLength FROM test, LATERAL TABLE(SplitFunction(myField));
      
      INSERT INTO print
      SELECT * FROM test_view;
      

      And the function code as this:

      @FunctionHint(output = @DataTypeHint("ROW<newWord  STRING, newLength INT>"))
      public class SplitFunction extends TableFunction<Row> {
      
          public void eval(String str) {
              for (String s : str.split(" ")) {
                  collect(Row.of(s, s.length()));
              }
          }
      }
      

      run the sql,cause an error:

      Unable to find source-code formatter for language: log. Available languages are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, perl, php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, yamlException in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 2, column 17 to line 2, column 23: Column 'newWord' not found in any table
      	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
      	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
      	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
      	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
      	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
      	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
      	at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52)
      	at com.fcbox.streaming.sql.submit.StreamingJob.callInsertInto(StreamingJob.java:208)
      	at com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:200)
      	at com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:129)
      	at com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:73)
      Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, column 17 to line 2, column 23: Column 'newWord' not found in any table
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
      	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
      	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
      	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
      	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
      	at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
      	at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
      	at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
      	at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6154)
      	at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6140)
      	at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
      	at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5574)
      	at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:452)
      	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255)
      	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523)
      	at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
      	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
      	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
      	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
      	at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
      	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
      	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
      	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
      	... 10 more
      Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'newWord' not found in any table
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
      	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
      	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
      	at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
      	... 30 more
      

      But it work effect in "INSERT INTO" statement:

      INSERT INTO print
      SELECT myField, newWord, newLength FROM test, LATERAL TABLE(SplitFunction(myField));
      

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                Unassigned
                Reporter:
                tinny shizhengchao
              • Votes:
                0 Vote for this issue
                Watchers:
                2 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: