Flink / FLINK-16361

FLIP-84: Improve & Refactor API of TableEnvironment & Table

Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.11.0
    • Component/s: Table SQL / API
    • Labels: None

    Description

      As described in the FLIP-84 document, we propose to deprecate the following methods:

      TableEnvironment.sqlUpdate(String)
      TableEnvironment.insertInto(String, Table)
      TableEnvironment.execute(String)
      TableEnvironment.explain(boolean)
      TableEnvironment.fromTableSource(TableSource<?>)
      Table.insertInto(String)
      

      Meanwhile, we propose to introduce the following new methods:

      interface TableEnvironment {
          // execute the given single statement, and return the execution result.
          TableResult executeSql(String statement);
           
          // get the AST and the execution plan for the given single statement (DQL, DML)
          String explainSql(String statement, ExplainDetail... extraDetails);
       
          // create a StatementSet instance which can add DML statements or Tables
          // to the set and explain or execute them as a batch.
          StatementSet createStatementSet();
      }
      
      interface Table {
          // write the Table to a TableSink that was registered
          // under the specified path.
          TableResult executeInsert(String tablePath);
           
          // write the Table to a TableSink that was registered
          // under the specified path; if overwrite is true,
          // existing data in the sink is overwritten.
          TableResult executeInsert(String tablePath, boolean overwrite);
       
          // return the AST and the execution plan to compute
          // the result of the current Table.
          String explain(ExplainDetail... extraDetails);
       
          // get the contents of the current table.
          TableResult execute();
      }
      
      interface TableResult {
          // return the JobClient if a Flink job was submitted
          // (for DML/DQL statements), otherwise return empty (e.g. for DDL).
          Optional<JobClient> getJobClient();
       
          // return the schema of the result
          TableSchema getTableSchema();
           
          // return the ResultKind which can avoid custom parsing of
          // an "OK" row in programming
          ResultKind getResultKind();
       
          // get the row contents as an iterator of rows
          Iterator<Row> collect();
       
          // print the result contents
          void print();
      }
      
      public enum ResultKind {
          // for DDL, DCL and statements with a simple "OK"
          SUCCESS,
       
          // rows with important content are available (DML, DQL)
          SUCCESS_WITH_CONTENT
      }
      
      interface StatementSet {
          // add a single INSERT statement to the set
          StatementSet addInsertSql(String statement);
       
          // add the Table to the set, to be written to the sink table at the given path
          StatementSet addInsert(String targetPath, Table table);
       
          // add the Table to the set, to be written to the sink table at the given path;
          // if overwrite is true, existing data in the sink is overwritten
          StatementSet addInsert(String targetPath, Table table, boolean overwrite);
       
          // returns the AST and the execution plan to compute
          // the result of all statements and Tables
          String explain(ExplainDetail... extraDetails);
       
          // execute all statements and Tables as a batch
          TableResult execute();
      }
      
      public enum ExplainDetail {
         STATE_SIZE_ESTIMATE,
         UID,
         HINTS,
         ...
      }
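The following minimal, self-contained sketch illustrates the ResultKind, TableResult and StatementSet contracts described above. It does not use Flink. SketchTableResult, SketchStatementSet and StatementSetSketch are hypothetical stand-ins, and DDL detection is a simplified prefix check assumed for illustration. The sketch shows two points from the proposal. First, a caller can branch on ResultKind instead of parsing an "OK" row. Second, every INSERT added to a statement set is executed as one batch by a single execute() call, which is modeled here as one job submission.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Hypothetical local copy of the proposed enum; not the Flink class.
enum ResultKind { SUCCESS, SUCCESS_WITH_CONTENT }

// Hypothetical stand-in for TableResult: a kind plus placeholder rows.
final class SketchTableResult {
    final ResultKind kind;
    final List<String> rows;

    SketchTableResult(ResultKind kind, List<String> rows) {
        this.kind = kind;
        // Copy so later changes to the caller's list do not leak into the result.
        this.rows = Collections.unmodifiableList(new ArrayList<>(rows));
    }
}

// Hypothetical stand-in for StatementSet: buffers INSERTs, executes them as one batch.
final class SketchStatementSet {
    private final List<String> inserts = new ArrayList<>();
    private int submittedJobs = 0;

    // Mirrors addInsertSql: only INSERT statements are accepted (simplified prefix check).
    SketchStatementSet addInsertSql(String statement) {
        if (!statement.trim().toUpperCase(Locale.ROOT).startsWith("INSERT")) {
            throw new IllegalArgumentException("not an INSERT statement: " + statement);
        }
        inserts.add(statement);
        return this; // fluent, so calls can be chained
    }

    // Mirrors execute(): everything added so far is submitted together, counted as one job.
    SketchTableResult execute() {
        if (inserts.isEmpty()) {
            throw new IllegalStateException("no INSERT statement has been added");
        }
        submittedJobs++;
        // Placeholder content: one row per INSERT; real row contents are not modeled.
        SketchTableResult result = new SketchTableResult(ResultKind.SUCCESS_WITH_CONTENT, inserts);
        inserts.clear();
        return result;
    }

    int submittedJobs() {
        return submittedJobs;
    }
}

final class StatementSetSketch {
    // Mirrors the executeSql ResultKind contract: DDL yields SUCCESS,
    // anything else is (simplistically) treated as DML/DQL with content.
    static SketchTableResult executeSql(String statement) {
        String head = statement.trim().toUpperCase(Locale.ROOT);
        if (head.startsWith("CREATE") || head.startsWith("DROP") || head.startsWith("ALTER")) {
            return new SketchTableResult(ResultKind.SUCCESS, Collections.singletonList("OK"));
        }
        return new SketchTableResult(ResultKind.SUCCESS_WITH_CONTENT, Collections.singletonList(statement));
    }

    // Client code branches on ResultKind instead of parsing an "OK" row.
    static String describe(SketchTableResult result) {
        switch (result.kind) {
            case SUCCESS:
                return "OK";
            case SUCCESS_WITH_CONTENT:
                return result.rows.size() + " row(s)";
            default:
                throw new IllegalStateException("unknown kind: " + result.kind);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(executeSql("CREATE TABLE src (x INT)")));
        SketchStatementSet set = new SketchStatementSet()
                .addInsertSql("INSERT INTO sink_a SELECT x FROM src")
                .addInsertSql("INSERT INTO sink_b SELECT x FROM src");
        System.out.println(describe(set.execute()) + ", jobs submitted: " + set.submittedJobs());
    }
}
```

Running main prints "OK" and then "2 row(s), jobs submitted: 1". In real Flink, the contents of a DML result differ from these placeholder rows. Only the ResultKind distinction and the batching carry over.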
      

      We also unify how Flink table programs are triggered, and propose the following rules.

      For TableEnvironment and StreamTableEnvironment, use TableEnvironment.execute() to trigger execution of a table program. Once the table program has been converted to a DataStream program (through the toAppendStream or toRetractStream method), use StreamExecutionEnvironment.execute() to trigger the DataStream program.

      The same rule applies to BatchTableEnvironment. Use TableEnvironment.execute() to trigger execution of a batch table program. Once the table program has been converted to a DataSet program (through the toDataSet method), use ExecutionEnvironment.execute() to trigger the DataSet program.
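The trigger rule above can be summarized as a small decision function. TriggerRule and TableEnvKind are hypothetical helpers, not Flink types. The function only returns the name of the method the proposal says to call and does not start any job. It assumes that a plain TableEnvironment offers no DataStream or DataSet conversion, since only StreamTableEnvironment and BatchTableEnvironment provide toAppendStream/toRetractStream and toDataSet.

```java
// Hypothetical enum naming the three table environments mentioned in the rule.
enum TableEnvKind { TABLE_ENV, STREAM_TABLE_ENV, BATCH_TABLE_ENV }

// Hypothetical helper encoding the proposed trigger rule as a decision table.
final class TriggerRule {
    private TriggerRule() {}

    /**
     * @param env                which table environment built the program
     * @param convertedToDataApi true once the program was converted via
     *                           toAppendStream/toRetractStream (streaming) or toDataSet (batch)
     * @return the name of the method that triggers execution under the proposal
     */
    static String triggerMethod(TableEnvKind env, boolean convertedToDataApi) {
        if (!convertedToDataApi) {
            // Pure table program: always triggered through the table environment.
            return "TableEnvironment.execute()";
        }
        switch (env) {
            case STREAM_TABLE_ENV:
                return "StreamExecutionEnvironment.execute()";
            case BATCH_TABLE_ENV:
                return "ExecutionEnvironment.execute()";
            default:
                // Assumption: a plain TableEnvironment cannot convert to DataStream/DataSet.
                throw new IllegalArgumentException(env + " has no DataStream/DataSet conversion");
        }
    }

    public static void main(String[] args) {
        System.out.println(triggerMethod(TableEnvKind.STREAM_TABLE_ENV, false));
        System.out.println(triggerMethod(TableEnvKind.STREAM_TABLE_ENV, true));
        System.out.println(triggerMethod(TableEnvKind.BATCH_TABLE_ENV, true));
    }
}
```

The key design point is that the trigger is chosen by whether the program has left the Table API, not by whether it is streaming or batch. The streaming/batch distinction only matters after conversion.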

    People

      Assignee: godfreyhe (godfrey he)
      Reporter: godfreyhe (godfrey he)
      Votes: 0
      Watchers: 5

    Time Tracking

      Original Estimate: Not Specified
      Remaining Estimate: 0h
      Time Spent: 2h 20m