Details

    • Type: Improvement
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels: None

Description

The current UDTF leverages the table.join(expression) interface, which is not a proper interface in terms of semantics. We would like to refactor this so that UDTFs use the table.join(table) interface. In brief, a UDTF's apply method will return a Table, so join(UDTF('a, 'b, ...) as 'c) can be treated as join(Table).
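
For illustration, a minimal Scala sketch of the intended usage, assuming a table with columns 'a, 'b, 'c (the Split function is a made-up example; the point is that the call resolves to join(Table)):

```
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableFunction

// a simple UDTF that emits one row per '#'-delimited token
class Split extends TableFunction[String] {
  def eval(str: String): Unit = str.split("#").foreach(collect)
}

val split = new Split
// split('c) is implicitly converted into a Table, so this call
// is join(Table) rather than join(expression)
table.join(split('c) as 's).select('a, 'b, 's)
```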

Issue Links

Activity

          fhueske Fabian Hueske added a comment -

          Fixed for 1.3 with c969237fce4fe5394e1cfdbb1186db63333d73d0

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3791

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114590580

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -417,13 +507,45 @@ class Table(
          }

          private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {

           - // check that right table belongs to the same TableEnvironment
           - if (right.tableEnv != this.tableEnv) {
           + if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(this)) {
           +   throw new ValidationException(
           +     "TableFunctions can only be followed by Alias. e.g table.join(split('c) as ('a, 'b))")
           + }
           +
           + // check that the TableEnvironment of right table is not null
           + // and right table belongs to the same TableEnvironment
           + if (right.tableEnv != null && right.tableEnv != this.tableEnv) {

           — End diff –

          I would change this method as follows:
           First we check whether `right` is a table function call. If not, we translate the join as before. Otherwise, we translate it as a join with a table function.
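
           A rough sketch of that structure (the `joinWithTable` and `joinWithTableFunction` helpers are hypothetical, just to show the dispatch):

           ```
           private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
             right.logicalPlan match {
               // right side is a table function call: translate as a correlate join
               case udtf: LogicalTableFunctionCall =>
                 joinWithTableFunction(udtf, joinPredicate, joinType)
               // otherwise: a regular table join, exactly as before
               case _ =>
                 if (right.tableEnv != this.tableEnv) {
                   throw new ValidationException(
                     "Only tables from the same TableEnvironment can be joined.")
                 }
                 joinWithTable(right, joinPredicate, joinType)
             }
           }
           ```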

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114556553

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala —
          @@ -35,6 +36,10 @@ class TableConversions(table: Table) {
          /** Converts the [[Table]] to a [[DataSet]] of the specified type. */
          def toDataSet[T: TypeInformation]: DataSet[T] = {

          + if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(table)) {
          — End diff –

          We should put this check into `Table.getRelNode`. This will catch all cases when we try to translate a table.
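
           A minimal sketch of that idea, assuming the `verifyTableFunctionCallExistence` helper from this PR (the error message and the translation call are illustrative):

           ```
           private[flink] def getRelNode: RelNode = {
             // one central check that rejects plans which still contain an
             // unresolved table function call when the table is translated
             if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(this)) {
               throw new ValidationException(
                 "TableFunctions can only be used in join and leftOuterJoin.")
             }
             logicalPlan.toRelNode(relBuilder)
           }
           ```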

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114590134

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -93,6 +103,11 @@ class Table(

            * }}}
            */
           def select(fields: Expression*): Table = {
           + if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(this)) {

           — End diff –

          I think we can add these checks without touching all methods of `Table`.
           We could implement a method that recursively traverses a `LogicalNode` and checks if one of its children is an unbounded table function call. This check is performed in the constructor of `Table` and throws an exception unless the `logicalNode` itself is a `LogicalTableFunctionCall` (which is the case if it was created with the new constructor or `as()` was applied on it).

          That way we can remove all checks in the methods.
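
           A sketch of such a traversal, assuming (as elsewhere in this PR) that an unbounded call is a `LogicalTableFunctionCall` whose `child` is still null:

           ```
           // recursively look for an unresolved table function call in the plan
           private def containsUnboundedUDTFCall(node: LogicalNode): Boolean = node match {
             case call: LogicalTableFunctionCall if call.child == null => true
             case _ => node.children.exists(containsUnboundedUDTFCall)
           }

           // in the constructor of Table: allow only a bare (possibly aliased) call
           if (!logicalPlan.isInstanceOf[LogicalTableFunctionCall] &&
               containsUnboundedUDTFCall(logicalPlan)) {
             throw new ValidationException(
               "TableFunction can only be used in join and leftOuterJoin.")
           }
           ```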

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114553215

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -49,11 +49,11 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl
           import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
           import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
           import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
           -import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
           +import org.apache.flink.table.expressions.{Alias, Call, Expression, ExpressionParser, TableFunctionCall, UnresolvedFieldReference}

          — End diff –

          Remove the unnecessary imports

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114558126

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
           @@ -358,4 +360,45 @@ object UserDefinedFunctionUtils {
             InstantiationUtil.deserializeObject[UserDefinedFunction](byteData, Thread.currentThread.getContextClassLoader)
           }

           +
           + /**
           +  * this method is used for create a [[LogicalTableFunctionCall]]
           +  * @param tableEnv
           +  * @param udtf a String represent a TableFunction Call e.g "split(c)"
           +  * @return
           +  */
           + def createLogicalFunctionCall(tableEnv: TableEnvironment, udtf: String) = {
          — End diff –

          Add return type to method
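
           That is, the signature would become something like this (sketch; the return type is taken from the scaladoc above):

           ```
           def createLogicalFunctionCall(
               tableEnv: TableEnvironment,
               udtf: String): LogicalTableFunctionCall = {
             // ... body unchanged ...
           }
           ```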

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114204295

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala —
          @@ -0,0 +1,56 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.api.scala
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.Table
          +import org.apache.flink.table.expressions._
          +import org.apache.flink.table.functions.TableFunction
          +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall
          +
          +/**
          + * Holds methods to convert a [[TableFunction]] (provided by scala user) into a [[Table]]
          + *
          + * @param tf The tableFunction to convert.
          + */
          +class TableFunctionConversions[T](tf: TableFunction[T]) {
          +
          — End diff –

          I think the name `TableFunctionConversions` is OK. A `TableFunction` object is implicitly converted into a `TableFunctionConversions` and the `apply` method converts it into a `Table`.
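
           For reference, a simplified sketch of that conversion path (the `LogicalTableFunctionCall` arguments follow the constructor usage quoted elsewhere in this PR; details are illustrative):

           ```
           class TableFunctionConversions[T](tf: TableFunction[T]) {

             // turns a call like split('c) into a Table whose logical plan is an
             // unresolved LogicalTableFunctionCall; child/tableEnv are set on join
             final def apply(args: Expression*)(implicit typeInfo: TypeInformation[T]): Table = {
               new Table(
                 null, // no TableEnvironment yet
                 new LogicalTableFunctionCall(
                   tf.getClass.getCanonicalName, // functionName
                   tf,                           // tableFunction
                   args.toList,                  // parameters
                   typeInfo,                     // resultType
                   Array.empty[String],          // fieldNames, filled by as(...)
                   null))                        // child, set when joined
             }
           }
           ```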

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114063517

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala —
          @@ -56,6 +57,8 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
          case gcf: GetCompositeField => Alias(gcf, gcf.aliasName().getOrElse(s"_c$i"))
          case other => Alias(other, s"_c$i")
          }
          + // This happens when TableFunction.apply returns a Table, ex: t1.join(split('a) as 'b)
          + case alias: Alias => alias
          — End diff –

           Can we merge the `Alias` into the `LogicalTableFunctionCall` beforehand (maybe in `table.join()`)?
           We should add as few special cases to the validation as possible, IMO.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114199929

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala —
          @@ -60,10 +60,14 @@ class DataSetCorrelateRule
          case rel: RelSubset =>
          convertToCorrelate(rel.getRelList.get(0), condition)

           - case calc: FlinkLogicalCalc =>
           + case calc: FlinkLogicalCalc => {

           — End diff –

           We should (later?) add optimization rules to prevent expressions from being pushed below the join towards the table function (and to push them above the join if they are already below it).
          For now we should just ensure that we do not compute incorrect results.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114062529

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -417,12 +452,33 @@ class Table(

          private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
          // check that right table belongs to the same TableEnvironment

           - if (right.tableEnv != this.tableEnv) {
           + if (right.tableEnv != null && right.tableEnv != this.tableEnv) {

           — End diff –

          Can we add a check that the root of `right.logicalPlan` is a `LogicalTableFunctionCall` if `right.tableEnv == null`?
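
           Something along these lines (sketch; the message text is illustrative):

           ```
           // a Table without a TableEnvironment may only wrap a table function call
           if (right.tableEnv == null &&
               !right.logicalPlan.isInstanceOf[LogicalTableFunctionCall]) {
             throw new ValidationException(
               "Only a TableFunction call (optionally aliased) can be joined without a TableEnvironment.")
           }
           ```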

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114183462

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala —
          @@ -60,10 +60,14 @@ class DataSetCorrelateRule
          case rel: RelSubset =>
          convertToCorrelate(rel.getRelList.get(0), condition)

           - case calc: FlinkLogicalCalc =>
           + case calc: FlinkLogicalCalc => {

           — End diff –

           We have to add a check that the Calc only filters but does not modify the input attributes (besides renaming fields). Otherwise, we might lose the projection information (e.g., if one of the table function's attributes is changed by an expression).

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114061983

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -309,6 +308,42 @@ class Table(
          }

          /**
          + * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
          + * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
          + * the rows from the outer table (table on the left of the operator), and rows that do not match
          + * the condition from the table function (which is defined in the expression on the right
          + * side of the operator). Rows with no matching condition are filled with null values.
          + *
          + * Scala Example:
          + * {{{
           + * class MySplitUDTF extends TableFunction[String] {
           + *   def eval(str: String): Unit = {
           + *     str.split("#").foreach(collect)
           + *   }
           + * }
           + *
           + * val split = new MySplitUDTF()
           + * table.leftOuterJoin(split('c) as ('s)).select('a,'b,'c,'s)
           + * }}}
           + *
           + * Java Example:
           + * {{{
           + * class MySplitUDTF extends TableFunction<String> {
           + *   public void eval(String str) {
           + *     str.split("#").forEach(this::collect);
           + *   }
           + * }
           + *
           + * TableFunction<String> split = new MySplitUDTF();
           + * tableEnv.registerFunction("split", split);
           + * table.leftOuterJoin(tableEnv.tableApply("split(c)").as("s"))).select("a, b, c, s");
          — End diff –

          I think the Java Syntax could be improved. Could we do something like:
          ```
          table.leftOuterJoin(new Table(tableEnv, "split(c) as s")).select("a, b, c, s");
          ```

           by adding a constructor to `Table` like `Table(tEnv: TableEnvironment, tableFunc: String)`?
          IMO, this would make it more clear that the TableFunction creates a table which is joined.
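
           A sketch of the suggested constructor, reusing the `createLogicalFunctionCall` helper added in this PR (wiring is illustrative):

           ```
           class Table(
               private[flink] val tableEnv: TableEnvironment,
               private[flink] val logicalPlan: LogicalNode) {

             // parses "split(c) as s" into an unresolved LogicalTableFunctionCall
             def this(tEnv: TableEnvironment, tableFunc: String) =
               this(tEnv, UserDefinedFunctionUtils.createLogicalFunctionCall(tEnv, tableFunc))
           }
           ```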

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114062550

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -417,12 +452,33 @@ class Table(

          private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
          // check that right table belongs to the same TableEnvironment

           - if (right.tableEnv != this.tableEnv) {
           + if (right.tableEnv != null && right.tableEnv != this.tableEnv) {
           +   throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
           + }
           +
           + val rule: PartialFunction[LogicalNode, LogicalNode] = {
           +   case udtf: LogicalTableFunctionCall if udtf.child == null =>
           +     new LogicalTableFunctionCall(
           +       udtf.functionName,
           +       udtf.tableFunction,
           +       udtf.parameters,
           +       udtf.resultType,
           +       udtf.fieldNames,
           +       this.logicalPlan
           +     ).validate(tableEnv)
           +   case other: LogicalNode => other.validate(tableEnv)
           + }
           +
           + val newRightPlan = right.logicalPlan.postOrderTransform(rule)
           + /**
           +  * if right plan has an unresolved LogicalTableFunctionCall, correlated shall be true

           — End diff –

          format comment into one line `// if right plan ...`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114188108

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala —
          @@ -50,72 +50,85 @@ class UserDefinedTableFunctionTest extends TableTestBase {

          // Java environment
          val javaEnv = mock(classOf[JavaExecutionEnv])

           - val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
           - val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
           + val jtEnv = TableEnvironment.getTableEnvironment(javaEnv)
           + val in2 = jtEnv.fromDataStream(jDs).as("a, b, c")

           // test cross join
           val func1 = new TableFunc1
           - javaTableEnv.registerFunction("func1", func1)
           + jtEnv.registerFunction("func1", func1)
           var scalaTable = in1.join(func1('c) as 's).select('c, 's)
           - var javaTable = in2.join("func1(c).as(s)").select("c, s")
           + var javaTable = in2.join(jtEnv.tableApply("func1(c)")
           +   .as("s")).select("c, s")
           verifyTableEquals(scalaTable, javaTable)

           // test left outer join
           scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
           - javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
           + javaTable = in2.leftOuterJoin(
           +   jtEnv.tableApply("func1(c)")
           +     .as("s")
           + ).select("c, s")
           verifyTableEquals(scalaTable, javaTable)

           // test overloading
           scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
           - javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
           + javaTable = in2.join(jtEnv.tableApply("func1(c, '$')")
           +   .as("s")).select("c, s")
           verifyTableEquals(scalaTable, javaTable)

           // test custom result type
           val func2 = new TableFunc2
           - javaTableEnv.registerFunction("func2", func2)
           + jtEnv.registerFunction("func2", func2)
           scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
           - javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
           + javaTable = in2.join(
           +   jtEnv.tableApply("func2(c)")
           +     .as("name, len"))
           +   .select("c, name, len")
           verifyTableEquals(scalaTable, javaTable)

           // test hierarchy generic type
           val hierarchy = new HierarchyTableFunction
           - javaTableEnv.registerFunction("hierarchy", hierarchy)
           - scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
           + jtEnv.registerFunction("hierarchy", hierarchy)
           + scalaTable = in1.join(hierarchy('c) as ('name, 'len, 'adult))
             .select('c, 'name, 'len, 'adult)
           - javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
           + javaTable = in2.join(jtEnv.tableApply("hierarchy(c)")
           +   .as("name, len, adult"))
             .select("c, name, len, adult")
           verifyTableEquals(scalaTable, javaTable)

           // test pojo type
           val pojo = new PojoTableFunc
           - javaTableEnv.registerFunction("pojo", pojo)
           + jtEnv.registerFunction("pojo", pojo)
           scalaTable = in1.join(pojo('c))
             .select('c, 'name, 'age)
           - javaTable = in2.join("pojo(c)")
           + javaTable = in2.join(jtEnv.tableApply("pojo(c)"))
             .select("c, name, age")
           verifyTableEquals(scalaTable, javaTable)

           // test with filter
           scalaTable = in1.join(func2('c) as ('name, 'len))
             .select('c, 'name, 'len).filter('len > 2)
           - javaTable = in2.join("func2(c) as (name, len)")
           + javaTable = in2.join(jtEnv.tableApply("func2(c)") as ("name, len"))
             .select("c, name, len").filter("len > 2")
           verifyTableEquals(scalaTable, javaTable)

           // test with scalar function
           scalaTable = in1.join(func1('c.substring(2)) as 's)
             .select('a, 'c, 's)
           - javaTable = in2.join("func1(substring(c, 2)) as (s)")
           + javaTable = in2.join(jtEnv.tableApply("func1(substring(c, 2))") as ("s"))
             .select("a, c, s")
           verifyTableEquals(scalaTable, javaTable)

           // check scala object is forbidden
           expectExceptionThrown(
             tableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
           expectExceptionThrown(
           - javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
           + jtEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
           expectExceptionThrown(
           - in1.join(ObjectTableFunction('a, 1)), "Scala object")
           + {

           — End diff –

          Please avoid unnecessary refactorings. They make PRs harder to review and might be reverted by the next person going over this code.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114198491

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -417,12 +452,33 @@ class Table(

          private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
          // check that right table belongs to the same TableEnvironment

           - if (right.tableEnv != this.tableEnv) {
           + if (right.tableEnv != null && right.tableEnv != this.tableEnv) {
           +   throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
           + }
           +
           + val rule: PartialFunction[LogicalNode, LogicalNode] = {
           +   case udtf: LogicalTableFunctionCall if udtf.child == null =>
           +     new LogicalTableFunctionCall(
           +       udtf.functionName,
           +       udtf.tableFunction,
           +       udtf.parameters,
           +       udtf.resultType,
           +       udtf.fieldNames,
           +       this.logicalPlan
           +     ).validate(tableEnv)
           +   case other: LogicalNode => other.validate(tableEnv)
           + }
           +
           + val newRightPlan = right.logicalPlan.postOrderTransform(rule)

           — End diff –

           Before, this was done by unwrapping the TableFunctionCall and adding the alias. This step also ensured that there are no other operations except Alias and the function call. This logic is still used in the TableFunctionParser. I think we should do similar steps here.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114197458

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala —
           @@ -90,4 +91,7 @@ package object scala extends ImplicitExpressionConversions

             tableEnv.toDataStream[Row](table)
           }

           + implicit def tableFunctionCall2Table[T](tf: TableFunction[T]): TableFunctionConversions[T] = {
          — End diff –

           This allows table functions to be used everywhere a Table is expected, e.g., in `union`, `intersect`, `minus`, `rightOuterJoin` and `fullOuterJoin`. We should check these cases and throw a good exception explaining that TableFunctions are currently only supported for join and leftOuterJoin.
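
           One way to do this (sketch; the helper name and message are illustrative):

           ```
           // called at the top of union, intersect, minus, rightOuterJoin, fullOuterJoin
           private def checkNoTableFunction(other: Table, operation: String): Unit = {
             // a Table created from a bare TableFunction call has no TableEnvironment yet
             if (other.tableEnv == null) {
               throw new ValidationException(
                 s"TableFunctions are only supported in join and leftOuterJoin, not in $operation.")
             }
           }
           ```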

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114198865

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala —
           @@ -90,4 +91,7 @@ package object scala extends ImplicitExpressionConversions

             tableEnv.toDataStream[Row](table)
           }

           + implicit def tableFunctionCall2Table[T](tf: TableFunction[T]): TableFunctionConversions[T] = {
          — End diff –

          We should also add tests to validate that these cases are properly handled.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114190116

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala —
          @@ -174,7 +209,8 @@ class UserDefinedTableFunctionTest extends TableTestBase {
          def testCrossJoin(): Unit = {
          val util = streamTestUtil()
          val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)

           - val function = util.addFunction("func1", new TableFunc1)
           + val function = new TableFunc1

           — End diff –

          Please revert these (and similar) unnecessary changes. Thanks

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114197865

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -417,12 +452,33 @@ class Table(

          private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
          // check that right table belongs to the same TableEnvironment

           - if (right.tableEnv != this.tableEnv) {
           + if (right.tableEnv != null && right.tableEnv != this.tableEnv) {

           — End diff –

          I think we need to better distinguish the regular table join cases from the table function join cases.
          We can only join and leftOuterJoin with a table function. Also, the changes below are only required for joins with a table function.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114063647

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -417,12 +452,33 @@ class Table(

          private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
          // check that right table belongs to the same TableEnvironment

           - if (right.tableEnv != this.tableEnv) {
           + if (right.tableEnv != null && right.tableEnv != this.tableEnv) {
           +   throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
           + }
           +
           + val rule: PartialFunction[LogicalNode, LogicalNode] = {
           +   case udtf: LogicalTableFunctionCall if udtf.child == null =>
           +     new LogicalTableFunctionCall(
           +       udtf.functionName,
           +       udtf.tableFunction,
           +       udtf.parameters,
           +       udtf.resultType,
           +       udtf.fieldNames,
           +       this.logicalPlan
           +     ).validate(tableEnv)
           +   case other: LogicalNode => other.validate(tableEnv)
           + }
           +
           + val newRightPlan = right.logicalPlan.postOrderTransform(rule)

           — End diff –

          I think we have to make sure at this point that the right plan is only a `LogicalTableFunctionCall` and an `Alias`. We cannot translate anything else yet and should give a meaningful error message.
          Also, it would be good, if we could merge the `Alias` into the `LogicalTableFunctionCall` to get rid of many special cases in the validation process.
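
           A sketch of both checks combined (the alias case is illustrative; the exact node shape depends on how `as` is represented in the logical plan):

           ```
           // accept only a (possibly aliased) table function call on the right side
           val udtfCall: LogicalTableFunctionCall = right.logicalPlan match {
             case call: LogicalTableFunctionCall =>
               call
             case AliasNode(aliases, call: LogicalTableFunctionCall) =>
               // merge the alias into the call so validation sees a single node
               val names = aliases.map { case UnresolvedFieldReference(n) => n }
               call.copy(fieldNames = names.toArray)
             case _ =>
               throw new ValidationException(
                 "A table function call may only be aliased, e.g. table.join(split('c) as ('a, 'b)).")
           }
           ```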

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114183388

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala —
          @@ -59,10 +59,14 @@ class DataStreamCorrelateRule
          case rel: RelSubset =>
          convertToCorrelate(rel.getRelList.get(0), condition)

           - case calc: FlinkLogicalCalc =>
           + case calc: FlinkLogicalCalc => {

           — End diff –

           We have to add a check that the Calc only filters but does not modify the input attributes (besides renaming fields). Otherwise, we might lose the projection information (e.g., if one of the table function's attributes is changed by an expression).

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114063404

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -417,12 +452,33 @@ class Table(

          private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
          // check that right table belongs to the same TableEnvironment

           - if (right.tableEnv != this.tableEnv) {
           + if (right.tableEnv != null && right.tableEnv != this.tableEnv) {
           +   throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
           + }
           +
           + val rule: PartialFunction[LogicalNode, LogicalNode] = {
           +   case udtf: LogicalTableFunctionCall if udtf.child == null =>
           +     new LogicalTableFunctionCall(
           +       udtf.functionName,
           +       udtf.tableFunction,
           +       udtf.parameters,
           +       udtf.resultType,
           +       udtf.fieldNames,
           +       this.logicalPlan
           +     ).validate(tableEnv)
           +   case other: LogicalNode => other.validate(tableEnv)

           — End diff –

          Split rule into one rule to set the logicalPlan to the root and another rule to validate all nodes
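
           For example (sketch; assumes `LogicalTableFunctionCall` is a case class so `copy` is available):

           ```
           // rule 1: attach the left plan as child of the unresolved call
           val attachChild: PartialFunction[LogicalNode, LogicalNode] = {
             case udtf: LogicalTableFunctionCall if udtf.child == null =>
               udtf.copy(child = this.logicalPlan)
           }
           // rule 2: validate every node
           val validateAll: PartialFunction[LogicalNode, LogicalNode] = {
             case node: LogicalNode => node.validate(tableEnv)
           }

           val newRightPlan = right.logicalPlan
             .postOrderTransform(attachChild)
             .postOrderTransform(validateAll)
           ```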

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114063055

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalNode.scala —
          @@ -79,15 +79,21 @@ abstract class LogicalNode extends TreeNode[LogicalNode] {
          protected[logical] def construct(relBuilder: RelBuilder): RelBuilder

          def validate(tableEnv: TableEnvironment): LogicalNode = {

           - val resolvedNode = resolveExpressions(tableEnv)
           - resolvedNode.expressionPostOrderTransform {
           -   case a: Attribute if !a.valid =>
           -     val from = children.flatMap(_.output).map(_.name).mkString(", ")
           -     failValidation(s"Cannot resolve [${a.name}] given input [$from].")
           -
           -   case e: Expression if e.validateInput().isFailure =>
           -     failValidation(s"Expression $e failed on input check: " +
           -       s"${e.validateInput().asInstanceOf[ValidationFailure].message}")
           + // A tableFunction may not contain the tableEnv when created by scala user
           + // We do not validate operators (select, as etc.)
           + // if they are applied on such TableFunction with empty tableEnv.
           + if (tableEnv == null) this

           — End diff –

           This looks like a potentially problematic condition which relies on the fact that `tableEnv` may only be `null` in case of a `TableFunction`. It would be much better if we could avoid such special casing in methods that are called from many different places.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r114061878

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -309,6 +308,42 @@ class Table(
          }

          /**
          + * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
          + * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
          — End diff –

          I think the description of the outer join is confusing.
          In the sentence "... all the rows from the outer table (table on the left of the operator), and rows that do not match the condition from the table function"
          1. it should be "that do match the condition"
          2. there is no condition
          3. it reads as the rows of the left side and table function would be unioned.

           I think we can assume that the user knows how a left join works. We should simply say that each row of the outer table is joined with each row produced by the table function call and that the outer row is padded with nulls if the table function returns an empty table.
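
           For example, the scaladoc could read (suggested wording only):

           ```
           /**
            * Joins this [[Table]] with a user-defined [[TableFunction]]. Each row of the
            * table is joined with all rows produced by the table function call. If the
            * table function returns an empty table, the outer row is padded with nulls.
            */
           ```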

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3791

          Hi @Xpray, thanks for the update. I think the PR is in pretty good shape now, except for the name of `TableFunctionConversions`. Looking forward to @fhueske's opinion.

          Best,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r113858925

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala —
          @@ -0,0 +1,56 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.api.scala
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.Table
          +import org.apache.flink.table.expressions._
          +import org.apache.flink.table.functions.TableFunction
          +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall
          +
          +/**
          + * Holds methods to convert a [[TableFunction]] (provided by scala user) into a [[Table]]
          + *
          + * @param tf The tableFunction to convert.
          + */
          +class TableFunctionConversions[T](tf: TableFunction[T]) {
          +
          — End diff –

          In fact, I think it's a `Type`, not a `Conversion`. I'm not sure; I think @fhueske can give us the best suggestion.

          githubbot ASF GitHub Bot added a comment -

          Github user Xpray commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r113855406

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala —
          @@ -0,0 +1,56 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.api.scala
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.Table
          +import org.apache.flink.table.expressions._
          +import org.apache.flink.table.functions.TableFunction
          +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall
          +
          +/**
          + * Holds methods to convert a [[TableFunction]] (provided by scala user) into a [[Table]]
          + *
          + * @param tf The tableFunction to convert.
          + */
          +class TableFunctionConversions[T](tf: TableFunction[T]) {
          +
          — End diff –

          I found that most of the existing implicit conversion classes share a common suffix like XXXConversions, so I think this naming should be clear.
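          For reference, the XXXConversions pattern referred to here is a wrapper class plus an implicit that activates it. A sketch, assuming the implicit lives in the `org.apache.flink.table.api.scala` package object; the conversion's name is illustrative:

            // Wraps a TableFunction so that a call like split('c) as ('a, 'b)
            // can be turned into a Table for the new join(Table) interface.
            implicit def tableFunction2Conversions[T](tf: TableFunction[T]): TableFunctionConversions[T] =
              new TableFunctionConversions[T](tf)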

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r113839148

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala —
          @@ -20,8 +20,8 @@ package org.apache.flink.table.api.java
          import org.apache.flink.api.common.typeinfo.TypeInformation
          import org.apache.flink.api.java.typeutils.TypeExtractor
          import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}

          -import org.apache.flink.table.expressions.ExpressionParser
          — End diff –

          Please restore this line.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r113839137

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala —
          @@ -20,8 +20,8 @@ package org.apache.flink.table.api.java
          import org.apache.flink.api.common.typeinfo.TypeInformation
          import org.apache.flink.api.java.typeutils.TypeExtractor
          import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}

          -import org.apache.flink.table.expressions.ExpressionParser
          import org.apache.flink.table.api._
          +import org.apache.flink.table.expressions.ExpressionParser
          — End diff –

          Please remove this line.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r113833504

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -107,6 +107,11 @@ abstract class TableEnvironment(val config: TableConfig) {
          // registered external catalog names -> catalog
          private val externalCatalogs = new HashMap[String, ExternalCatalog]

          + private lazy val tableFunctionParser = new TableFunctionParser(this)
          +
          + // the method for converting a udtf String to Table for Java API
          + final def tableApply(udtf: String): Table = tableFunctionParser(udtf)
          — End diff –

          `TableFunctionParser` only has one method, named `apply`. IMO it's a util method, so here are 3 suggestions (a sketch of the first follows below):

          • If a class only contains util methods, I suggest changing `class` to `object`; `tableEnv` can then be a parameter of the method.
          • If `TableFunctionParser#apply` is only used for `TableFunction`, I suggest moving the `apply` method into `UserDefinedFunctionUtils`, because all of the functional methods of `UDF/UDTF/UDAF` are in that file.
          • If the method is only used by `TableEnvironment`, could it be implemented internally in `TableEnvironment`?

          What do you think? @Xpray
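          A rough sketch of that first suggestion, with `tableEnv` moved into the method signature; the body is elided and only the shape is shown:

            object TableFunctionParser {

              // tableEnv becomes a parameter instead of constructor state,
              // so the one-method class can be a stateless util object.
              def apply(tableEnv: TableEnvironment, udtf: String): Table = {
                val call = ExpressionParser.parseExpression(udtf)
                // ... look up the function in tableEnv's catalog and wrap the
                // call in a LogicalTableFunctionCall, as the class version does ...
                ???
              }
            }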
          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r113834406

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala —
          @@ -19,11 +19,12 @@ package org.apache.flink.table.api.java

          import org.apache.flink.api.common.typeinfo.TypeInformation
          import org.apache.flink.api.java.typeutils.TypeExtractor
          -import org.apache.flink.table.api._
          -import org.apache.flink.table.functions.TableFunction
          -import org.apache.flink.table.expressions.ExpressionParser
          import org.apache.flink.streaming.api.datastream.DataStream
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
          +import org.apache.flink.table.api._
          +import org.apache.flink.table.api.scala.TableFunctionConversions
          — End diff –

          Please remove the useless import.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r113835656

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala —
          @@ -0,0 +1,56 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.api.scala
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.Table
          +import org.apache.flink.table.expressions._
          +import org.apache.flink.table.functions.TableFunction
          +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall
          +
          +/**
          + * Holds methods to convert a [[TableFunction]] (provided by scala user) into a [[Table]]
          + *
          + * @param tf The tableFunction to convert.
          + */
          +class TableFunctionConversions[T](tf: TableFunction[T]) {
          +
          — End diff –

          I think before applying the `TableFunction`, it's just a definition, and once it's applied, it's a table. So I'd prefer the name `AppliedTableFunction`. That gives us a clear flow for using a `UDTF`: define -> apply -> join (see the example below).
          What do you think?
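          As an example of that flow (the `Split` function and field names are illustrative):

            class Split extends TableFunction[String] {
              def eval(s: String): Unit = s.split(" ").foreach(collect)
            }

            val split = new Split             // define
            val call = split('c) as 'word     // apply: the call becomes a Table
            val result = table.join(call)     // join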

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r113833924

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableFunctionParser.scala —
          @@ -0,0 +1,66 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.api
          +
          +import org.apache.flink.table.expressions._
          +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall
          +
          +
          +/**
          + * A parser to convert a udtf String (for Java user) to [[Table]]
          + *
          + * @param tableEnv a [[TableEnvironment]] which is used for looking up a function
          + */
          +class TableFunctionParser(tableEnv: TableEnvironment) {
          +
          — End diff –

          This class only contains a util method, so I suggest changing it to an `object` or moving the method into `UserDefinedFunctionUtils`; `tableEnv` can then be a parameter of the method.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3791#discussion_r113836005

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -417,12 +452,33 @@ class Table(

          private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
          // check that right table belongs to the same TableEnvironment
          — End diff –

          // check that the TableEnvironment of right table is not null and right table belongs to the same TableEnvironment

          githubbot ASF GitHub Bot added a comment -

          GitHub user Xpray opened a pull request:

          https://github.com/apache/flink/pull/3791

          FLINK-6334 [table] Refactoring UDTF interface

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [x] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [x] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/Xpray/flink f6334-ready

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3791.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3791


          commit 588781f3267d77c3a8ef80f971c6741fd3aa9a00
          Author: Xpray <leonxpray@gmail.com>
          Date: 2017-04-27T15:18:19Z

          FLINK-6334 [table] Refactoring UDTF interface



            People

            • Assignee: Ruidong Li
            • Reporter: Ruidong Li
            • Votes: 0
            • Watchers: 5

              Dates

              • Created:
              • Updated:
              • Resolved: