Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

        githubbot ASF GitHub Bot added a comment -

        GitHub user KurtYoung opened a pull request:

        https://github.com/apache/flink/pull/3594

        FLINK-6149 [table] Add additional flink logical relation nodes and separate current optimization to logical and physical optimize

        This PR introduces an additional layer named the Flink logical plan, which consists of a couple of Flink logical relation nodes. The reason why we want to add this is explained in the parent JIRA: https://issues.apache.org/jira/browse/FLINK-6066. This is the first sub-task: it adds all the needed data structures and makes all the tests pass.

        The core changes are as follows:

        • Add a bunch of `FlinkLogicalxxxxx` nodes to represent Flink logical nodes, one for each basic Calcite rel node such as `Aggregate` and `TableScan`.
        • Make the current `DataSetxxxx` and `DataStreamxxxx` nodes the first version of the physical nodes.
        • Include almost all active rules in the logical optimization, except the ones that translate logical nodes to the corresponding `DataSetxxxx` or `DataStreamxxxx` nodes. The latter become the first version of the physical optimization rules.
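        The split described in these bullets can be pictured as two passes over the plan tree. The sketch below is a self-contained toy model, not Flink or Calcite code: `Conv`, `Node`, `logicalPhase` and `physicalPhase` are hypothetical names, and the "rules" simply relabel a node's convention (NONE to LOGICAL, then LOGICAL to a DataSet or DataStream convention).

```scala
// Hypothetical toy model of the logical/physical optimization split.
// None of these types are Calcite or Flink classes.
sealed trait Conv
case object NoneConv extends Conv       // stands in for Calcite's Convention.NONE
case object Logical extends Conv        // stands in for FlinkConventions.LOGICAL
case object DataSetConv extends Conv    // physical: batch
case object DataStreamConv extends Conv // physical: streaming

final case class Node(op: String, conv: Conv, inputs: List[Node] = Nil)

// Apply a conversion bottom-up: children first, then the node itself.
def convertAll(n: Node, f: Node => Node): Node =
  f(n.copy(inputs = n.inputs.map(convertAll(_, f))))

// Phase 1 (logical optimization): every NONE node becomes a logical node,
// e.g. LogicalAggregate -> FlinkLogicalAggregate.
def logicalPhase(plan: Node): Node =
  convertAll(plan, n => if (n.conv == NoneConv) n.copy(conv = Logical) else n)

// Phase 2 (physical optimization): every logical node is translated to the
// chosen physical convention, e.g. FlinkLogicalAggregate -> DataSetAggregate.
def physicalPhase(plan: Node, target: Conv): Node =
  convertAll(plan, n => if (n.conv == Logical) n.copy(conv = target) else n)

val calcitePlan = Node("Aggregate", NoneConv, List(Node("TableScan", NoneConv)))
val batchPlan = physicalPhase(logicalPhase(calcitePlan), DataSetConv)
println(batchPlan)
```

        In the real planner each phase runs a rule set rather than a blanket relabeling; the point of the model is only that physical rules never see NONE nodes.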

        The follow-up task will be to reuse as much code as possible, since some logic, like `computeSelfCost`, is almost the same for some logical and physical nodes.
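        One possible way to do that reuse is a mixin trait that both the logical and the physical node extend. The sketch below is a hypothetical, dependency-free illustration: `Cost`, `AggregateCostModel`, `LogicalAgg` and `PhysicalAgg` are made-up stand-ins, and the formula mirrors the `FlinkLogicalAggregate.computeSelfCost` shown in the review below (rows, rows * aggCnt, rows * rowSize).

```scala
// Hypothetical stand-in for Calcite's RelOptCost (rows, cpu, io).
final case class Cost(rows: Double, cpu: Double, io: Double)

// Shared cost logic, written once and mixed into both node kinds.
trait AggregateCostModel {
  def inputRowCount: Double
  def inputRowSize: Double
  def aggCallCount: Int

  // Same shape as FlinkLogicalAggregate.computeSelfCost in this PR.
  def computeSelfCost: Cost =
    Cost(inputRowCount, inputRowCount * aggCallCount, inputRowCount * inputRowSize)
}

// Hypothetical logical and physical aggregate nodes sharing the trait.
final class LogicalAgg(val inputRowCount: Double, val inputRowSize: Double, val aggCallCount: Int)
  extends AggregateCostModel

final class PhysicalAgg(val inputRowCount: Double, val inputRowSize: Double, val aggCallCount: Int)
  extends AggregateCostModel

val logical = new LogicalAgg(1000, 16, 2)
val physical = new PhysicalAgg(1000, 16, 2)
println(logical.computeSelfCost) // Cost(1000.0,2000.0,16000.0)
```

        A physical node that needs a different cost (for example, to account for a shuffle) can still override `computeSelfCost`; the trait only removes the copies that are identical.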

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/KurtYoung/flink flink-6149

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/3594.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #3594


        commit 75c488bc05011717896de33569ffed8ddd835118
        Author: Kurt Young <ykt836@gmail.com>
        Date: 2017-03-22T08:42:20Z

        FLINK-6149 [table] Add additional flink logical relation nodes and separate current optimization to logical and physical optimize


        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r109261751

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala —
        @@ -0,0 +1,92 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.rel.logical
        +
        +import java.util.{List => JList}
        +
        +import org.apache.calcite.plan._
        +import org.apache.calcite.rel.RelNode
        +import org.apache.calcite.rel.convert.ConverterRule
        +import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
        +import org.apache.calcite.rel.logical.LogicalAggregate
        +import org.apache.calcite.rel.metadata.RelMetadataQuery
        +import org.apache.calcite.util.ImmutableBitSet
        +import org.apache.flink.table.rel.FlinkConventions
        +
        +class FlinkLogicalAggregate(
        + cluster: RelOptCluster,
        + traitSet: RelTraitSet,
        + child: RelNode,
        + indicator: Boolean,
        + groupSet: ImmutableBitSet,
        + groupSets: JList[ImmutableBitSet],
        + aggCalls: JList[AggregateCall])
        + extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls)
        + with FlinkLogicalRel {
        +
        + override def copy(
        + traitSet: RelTraitSet,
        + input: RelNode,
        + indicator: Boolean,
        + groupSet: ImmutableBitSet,
        + groupSets: JList[ImmutableBitSet],
        + aggCalls: JList[AggregateCall]): Aggregate = {
        + new FlinkLogicalAggregate(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls)
        + }
        +
        + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
        + val child = this.getInput
        + val rowCnt = metadata.getRowCount(child)
        + val rowSize = this.estimateRowSize(child.getRowType)
        + val aggCnt = this.aggCalls.size
        + planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
        + }
        +}
        +
        +private class FlinkLogicalAggregateConverter
        + extends ConverterRule(
        + classOf[LogicalAggregate],
        + Convention.NONE,
        + FlinkConventions.LOGICAL,
        + "FlinkLogicalAggregateConverter") {
        +
        + override def matches(call: RelOptRuleCall): Boolean = {
        + val agg = call.rel(0).asInstanceOf[LogicalAggregate]
        + !agg.containsDistinctCall()
        — End diff –

        Should we apply translation restrictions on the logical level? DataSet and DataStream might differ in their support for different types of operations. It might be better to let the physical optimization decide what is supported and what is not.
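        The reviewer's suggestion amounts to moving the `!agg.containsDistinctCall()` style of check out of the logical converter and into each physical rule. A hypothetical sketch of that shape follows; `AggNode`, `Target` and the `supportsDistinct` flags are placeholders invented for illustration and say nothing about what DataSet or DataStream actually supported.

```scala
// Hypothetical stand-ins only; not Calcite or Flink types.
final case class AggNode(hasDistinctCall: Boolean)

// Placeholder capabilities; the real support matrix is NOT asserted here.
sealed trait Target { def supportsDistinct: Boolean }
case object BatchTarget extends Target { val supportsDistinct: Boolean = true }
case object StreamTarget extends Target { val supportsDistinct: Boolean = false }

// Logical phase: no translation restriction, every aggregate becomes a logical node.
def logicalMatches(agg: AggNode): Boolean = true

// Physical phase: the target-specific rule decides. None means "no physical plan
// for this target", which would surface as an unsupported-operation error there only.
def physicalTranslate(agg: AggNode, target: Target): Option[String] =
  if (!agg.hasDistinctCall || target.supportsDistinct) Some(s"$target aggregate")
  else None
```

        With this split, adding DISTINCT support to one target only touches that target's physical rule, and the logical plan stays the same for batch and streaming.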

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r109262449

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalTableSourceScan.scala —
        @@ -0,0 +1,109 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.rel.logical
        +
        +import org.apache.calcite.plan._
        +import org.apache.calcite.rel.`type`.RelDataType
        +import org.apache.calcite.rel.convert.ConverterRule
        +import org.apache.calcite.rel.core.TableScan
        +import org.apache.calcite.rel.logical.LogicalTableScan
        +import org.apache.calcite.rel.metadata.RelMetadataQuery
        +import org.apache.calcite.rel.{RelNode, RelWriter}
        +import org.apache.flink.table.api.TableEnvironment
        +import org.apache.flink.table.calcite.FlinkTypeFactory
        +import org.apache.flink.table.plan.schema.TableSourceTable
        +import org.apache.flink.table.rel.FlinkConventions
        +import org.apache.flink.table.sources.TableSource
        +
        +import scala.collection.JavaConverters._
        +
        +class FlinkLogicalTableSourceScan(
        + cluster: RelOptCluster,
        + traitSet: RelTraitSet,
        + table: RelOptTable,
        + val tableSource: TableSource[_])
        + extends TableScan(cluster, traitSet, table)
        + with FlinkLogicalRel {
        +
        + def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): FlinkLogicalTableSourceScan = {
        + new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource)
        + }
        +
        + override def deriveRowType(): RelDataType = {
        + val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
        + flinkTypeFactory.buildRowDataType(
        + TableEnvironment.getFieldNames(tableSource),
        + TableEnvironment.getFieldTypes(tableSource.getReturnType))
        + }
        +
        + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
        + val rowCnt = metadata.getRowCount(this)
        + planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
        + }
        +
        + override def explainTerms(pw: RelWriter): RelWriter = {
        — End diff –

        Other `FlinkLogical` nodes do not implement `explainTerms()`.
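        The consistency point matters because an `explainTerms()` override is what adds node-specific attributes to the printed plan. In Calcite, overrides append terms through `RelWriter.item(String, Object)`; the `PlanWriter` below is a hypothetical, much simpler stand-in used only to show the visible difference between a node that overrides and one that does not.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical simplification of a plan writer; not Calcite's RelWriter.
final class PlanWriter(name: String) {
  private val terms = ListBuffer.empty[String]
  def item(key: String, value: Any): PlanWriter = { terms += s"$key=[$value]"; this }
  def render: String = if (terms.isEmpty) name else terms.mkString(s"$name(", ", ", ")")
}

trait PlanNode {
  def name: String
  // Default: contributes no extra terms.
  def explainTerms(pw: PlanWriter): PlanWriter = pw
  def explain: String = explainTerms(new PlanWriter(name)).render
}

// Overrides explainTerms, so its plan line carries table and field details.
final class ScanNode(table: String, fields: Seq[String]) extends PlanNode {
  val name: String = "FlinkLogicalTableSourceScan"
  override def explainTerms(pw: PlanWriter): PlanWriter =
    pw.item("table", table).item("fields", fields.mkString(", "))
}

// Does not override, so only the node name is printed.
final class CalcNode extends PlanNode { val name: String = "FlinkLogicalCalc" }

println(new ScanNode("T", Seq("a", "b")).explain)
println(new CalcNode().explain)
```

        Mixing both styles inside one logical plan makes plan-based test expectations harder to read, which is one reason to pick a single convention for all `FlinkLogical` nodes.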

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r109395373

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala —
        @@ -23,13 +23,17 @@ import org.apache.calcite.util.BuiltInMethod
        import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetSort}
        import java.lang.Double

        +import org.apache.flink.table.rel.logical.FlinkLogicalCalc
        +
        object FlinkRelMdRowCount extends RelMdRowCount {

        - val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
        - BuiltInMethod.ROW_COUNT.method,
        - this)
        + val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
        + BuiltInMethod.ROW_COUNT.method,
        + this)
        +
        + def getRowCount(rel: FlinkLogicalCalc, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq)
        — End diff –

        Can we change this to `def getRowCount(rel: Calc, mq: RelMetadataQuery): Double` to cover both `DataSetCalc` and `FlinkLogicalCalc`, or would that be too generic?
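        The idea behind the question is ordinary subtype polymorphism: one handler typed on the common supertype serves every subclass. The model below is hypothetical; `Calc`, `DataSetCalcNode`, `FlinkLogicalCalcNode` and `rowCountOf` are stand-ins, and Calcite's reflective metadata dispatch is only loosely imitated by a pattern match.

```scala
// Hypothetical stand-ins, not the real Calcite/Flink classes.
trait Calc { def estimateRowCount: Double }
final class DataSetCalcNode(rows: Double) extends Calc { def estimateRowCount: Double = rows }
final class FlinkLogicalCalcNode(rows: Double) extends Calc { def estimateRowCount: Double = rows }
final class OtherNode

// One generic handler: any Calc delegates to its own estimate.
def getRowCount(rel: Calc): Double = rel.estimateRowCount

// Loose imitation of a metadata provider choosing a handler by runtime type.
def rowCountOf(rel: AnyRef): Option[Double] = rel match {
  case c: Calc => Some(getRowCount(c))
  case _       => None
}
```

        The "too generic" concern is the flip side: a handler on `Calc` also applies to any other `Calc` subclass, including ones that were not meant to use `estimateRowCount`.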

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r109262692

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala —
        @@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {

        val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)

        - val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table")
        + val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
        + s"UNION ALL SELECT a, b, c FROM $table")
        — End diff –

        Why did you change `UNION` to `UNION ALL`?
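        The question matters because the two operators are not interchangeable: `UNION ALL` keeps every row of both inputs, while `UNION` also removes duplicates, so the test would exercise a different query and plan. (Calcite, for instance, has a `UnionToDistinctRule` that can rewrite a distinct `UNION` into an aggregate over a `UNION ALL`.) A minimal sketch of the semantics with plain Scala collections, using made-up rows shaped like the test's `(Long, Int, String)` table:

```scala
type Row = (Long, Int, String)

// UNION ALL: bag semantics, keep all rows.
def unionAll(a: Seq[Row], b: Seq[Row]): Seq[Row] = a ++ b

// UNION: set semantics, drop duplicates across (and within) both inputs.
def union(a: Seq[Row], b: Seq[Row]): Seq[Row] = (a ++ b).distinct

// Illustrative data: one row appears in both inputs.
val t1: Seq[Row] = Seq((1L, 1, "x"), (2L, 2, "y"))
val t2: Seq[Row] = Seq((1L, 1, "x"), (3L, 3, "z"))

println(unionAll(t1, t2).size)
println(union(t1, t2).size)
```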

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r109262207

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalSort.scala —
        @@ -0,0 +1,148 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.rel.logical
        +
        +import org.apache.calcite.plan._
        +import org.apache.calcite.rel.RelFieldCollation.Direction
        +import org.apache.calcite.rel.convert.ConverterRule
        +import org.apache.calcite.rel.core.Sort
        +import org.apache.calcite.rel.logical.LogicalSort
        +import org.apache.calcite.rel.metadata.RelMetadataQuery
        +import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
        +import org.apache.calcite.rex.{RexLiteral, RexNode}
        +import org.apache.flink.api.common.operators.Order
        +import org.apache.flink.table.rel.FlinkConventions
        +
        +import scala.collection.JavaConverters._
        +
        +class FlinkLogicalSort(
        + cluster: RelOptCluster,
        + traits: RelTraitSet,
        + child: RelNode,
        + collation: RelCollation,
        + offset: RexNode,
        + fetch: RexNode)
        + extends Sort(cluster, traits, child, collation, offset, fetch)
        + with FlinkLogicalRel {
        +
        + private val limitStart: Long = if (offset != null) {
        + RexLiteral.intValue(offset)
        + } else {
        + 0L
        + }
        +
        + private val limitEnd: Long = if (fetch != null) {
        + RexLiteral.intValue(fetch) + limitStart
        + } else {
        + Long.MaxValue
        + }
        +
        + val getOffset: RexNode = offset
        +
        + val getFetch: RexNode = fetch
        +
        + override def copy(
        + traitSet: RelTraitSet,
        + newInput: RelNode,
        + newCollation: RelCollation,
        + offset: RexNode,
        + fetch: RexNode): Sort = {
        +
        + new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch)
        + }
        +
        + override def estimateRowCount(metadata: RelMetadataQuery): Double = {
        + val inputRowCnt = metadata.getRowCount(this.getInput)
        + if (inputRowCnt == null) {
        + inputRowCnt
        + } else {
        + val rowCount = (inputRowCnt - limitStart).max(1.0)
        + if (fetch != null) {
        + val limit = RexLiteral.intValue(fetch)
        + rowCount.min(limit)
        + } else {
        + rowCount
        + }
        + }
        + }
        +
        + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
        + // by default, assume cost is proportional to number of rows
        + val rowCount: Double = mq.getRowCount(this)
        + planner.getCostFactory.makeCost(rowCount, rowCount, 0)
        + }
        +
        + override def explainTerms(pw: RelWriter) : RelWriter = {
        — End diff –

        The other `FlinkLogical` nodes do not implement `explainTerms()`.
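        For reference, the row-count estimate in the `FlinkLogicalSort` diff reduces to a small pure function: skip OFFSET rows (never going below one row), then cap at FETCH. The restatement below is a sketch that swaps Calcite's nullable `Double` and `RexNode` literals for `Option` values; `estimateSortRowCount` is a hypothetical name.

```scala
// Pure restatement of FlinkLogicalSort.estimateRowCount from the diff.
// Option replaces the nullable inputs (an assumption for illustration).
def estimateSortRowCount(
    inputRows: Option[Double],
    offset: Option[Int],
    fetch: Option[Int]): Option[Double] =
  inputRows.map { rows =>
    val limitStart = offset.getOrElse(0).toLong
    // Rows left after skipping OFFSET, never estimated below one row.
    val afterOffset = math.max(rows - limitStart, 1.0)
    // FETCH caps the result; without FETCH the remaining rows pass through.
    fetch.fold(afterOffset)(limit => math.min(afterOffset, limit.toDouble))
  }

println(estimateSortRowCount(Some(100.0), Some(10), Some(5)))
println(estimateSortRowCount(Some(3.0), Some(10), None))
```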

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r109261556

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalJoin.scala —
        @@ -0,0 +1,130 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.rel.logical
        +
        +import org.apache.calcite.plan._
        +import org.apache.calcite.plan.volcano.RelSubset
        +import org.apache.calcite.rel.RelNode
        +import org.apache.calcite.rel.convert.ConverterRule
        +import org.apache.calcite.rel.core._
        +import org.apache.calcite.rel.logical.LogicalJoin
        +import org.apache.calcite.rel.metadata.RelMetadataQuery
        +import org.apache.calcite.rex.RexNode
        +import org.apache.flink.table.rel.FlinkConventions
        +
        +import scala.collection.JavaConverters._
        +
        +class FlinkLogicalJoin(
        + cluster: RelOptCluster,
        + traitSet: RelTraitSet,
        + left: RelNode,
        + right: RelNode,
        + condition: RexNode,
        + joinType: JoinRelType)
        + extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType)
        + with FlinkLogicalRel {
        +
        + override def copy(
        + traitSet: RelTraitSet,
        + conditionExpr: RexNode,
        + left: RelNode,
        + right: RelNode,
        + joinType: JoinRelType,
        + semiJoinDone: Boolean): Join = {
        +
        + new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
        + }
        +
        + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
        + val leftRowCnt = metadata.getRowCount(getLeft)
        + val leftRowSize = estimateRowSize(getLeft.getRowType)
        +
        + val rightRowCnt = metadata.getRowCount(getRight)
        + val rightRowSize = estimateRowSize(getRight.getRowType)
        +
        + val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
        + val cpuCost = leftRowCnt + rightRowCnt
        + val rowCnt = leftRowCnt + rightRowCnt
        +
        + planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
        + }
        +}
        +
        +private class FlinkLogicalJoinConverter
        + extends ConverterRule(
        + classOf[LogicalJoin],
        + Convention.NONE,
        + FlinkConventions.LOGICAL,
        + "FlinkLogicalJoinConverter") {
        +
        + override def matches(call: RelOptRuleCall): Boolean = {
        + val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
        + val joinInfo = join.analyzeCondition
        +
        + hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)
        — End diff –

        Do we need these checks here? Whether the join can be translated depends also on stream and batch. So we could just create a join and let the physical optimization decide whether this can be translated or not.
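For readers weighing the cost side of this class: the `computeSelfCost` quoted above charges IO for the bytes of both inputs, one CPU unit per input row, and reports the sum of the two input row counts as the row count. Below is a minimal restatement of that arithmetic in plain Scala. It assumes that row counts and row sizes are already known numbers; in the PR they come from `RelMetadataQuery.getRowCount` and `estimateRowSize`. `Cost` and `JoinCostSketch` are hypothetical names, not Calcite or Flink classes.

```scala
// Hypothetical stand-in for a Calcite RelOptCost: (rows, cpu, io).
final case class Cost(rows: Double, cpu: Double, io: Double)

object JoinCostSketch {
  // Same arithmetic as FlinkLogicalJoin.computeSelfCost in the diff above.
  def joinSelfCost(
      leftRowCnt: Double,
      leftRowSize: Double,
      rightRowCnt: Double,
      rightRowSize: Double): Cost = {
    // IO: bytes read from both inputs
    val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
    // CPU: one unit per input row
    val cpuCost = leftRowCnt + rightRowCnt
    // Row count: sum of both input row counts
    val rowCnt = leftRowCnt + rightRowCnt
    Cost(rowCnt, cpuCost, ioCost)
  }

  def main(args: Array[String]): Unit = {
    // 1000 rows of 16 bytes joined with 10 rows of 32 bytes
    println(joinSelfCost(1000, 16, 10, 32)) // Cost(1010.0,1010.0,16320.0)
  }
}
```

The formula never looks at the join condition. At the logical level, an equi-join and a join on an arbitrary predicate therefore get the same cost, and only the input sizes matter.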

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r109257615

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala —
        @@ -18,58 +18,57 @@
        package org.apache.flink.table.plan.rules.dataSet

        import org.apache.calcite.plan.volcano.RelSubset
        -import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
        +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
        import org.apache.calcite.rel.RelNode
        import org.apache.calcite.rel.convert.ConverterRule
        -import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
        import org.apache.calcite.rex.RexNode
        -import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate}
        +import org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate
        +import org.apache.flink.table.rel.FlinkConventions
        +import org.apache.flink.table.rel.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}

        /**
          * Rule to convert a LogicalCorrelate into a DataSetCorrelate.
        — End diff –

        Update this comment: the rule now converts a `FlinkLogicalCorrelate` rather than a `LogicalCorrelate`.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r109262356

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalSort.scala —
        @@ -0,0 +1,148 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.rel.logical
        +
        +import org.apache.calcite.plan._
        +import org.apache.calcite.rel.RelFieldCollation.Direction
        +import org.apache.calcite.rel.convert.ConverterRule
        +import org.apache.calcite.rel.core.Sort
        +import org.apache.calcite.rel.logical.LogicalSort
        +import org.apache.calcite.rel.metadata.RelMetadataQuery
        +import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
        +import org.apache.calcite.rex.{RexLiteral, RexNode}

        +import org.apache.flink.api.common.operators.Order
        +import org.apache.flink.table.rel.FlinkConventions
        +
        +import scala.collection.JavaConverters._
        +
        +class FlinkLogicalSort(
        + cluster: RelOptCluster,
        + traits: RelTraitSet,
        + child: RelNode,
        + collation: RelCollation,
        + offset: RexNode,
        + fetch: RexNode)
        + extends Sort(cluster, traits, child, collation, offset, fetch)
        + with FlinkLogicalRel {
        +
        + private val limitStart: Long = if (offset != null) {
        + RexLiteral.intValue(offset)
        + } else {
        + 0L
        + }
        +
        + private val limitEnd: Long = if (fetch != null) {
        + RexLiteral.intValue(fetch) + limitStart
        + } else {
        + Long.MaxValue
        + }
        +
        + val getOffset: RexNode = offset
        +
        + val getFetch: RexNode = fetch
        +
        + override def copy(
        + traitSet: RelTraitSet,
        + newInput: RelNode,
        + newCollation: RelCollation,
        + offset: RexNode,
        + fetch: RexNode): Sort = {
        +
        + new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch)
        + }
        +
        + override def estimateRowCount(metadata: RelMetadataQuery): Double = {
        + val inputRowCnt = metadata.getRowCount(this.getInput)
        + if (inputRowCnt == null) {
        + inputRowCnt
        + } else {
        + val rowCount = (inputRowCnt - limitStart).max(1.0)
        + if (fetch != null) {
        + val limit = RexLiteral.intValue(fetch)
        + rowCount.min(limit)
        + } else {
        + rowCount
        + }
        + }
        + }
        +
        + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
        + // by default, assume cost is proportional to number of rows
        + val rowCount: Double = mq.getRowCount(this)
        + planner.getCostFactory.makeCost(rowCount, rowCount, 0)
        + }
        +
        + override def explainTerms(pw: RelWriter) : RelWriter = {
        — End diff –

        some of the other methods could be removed as well
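The limit handling in the `FlinkLogicalSort` diff quoted above reduces to a few lines of arithmetic. Skip `offset` rows, never estimating fewer than one row, and then cap the result at `fetch`. `limitEnd` is `offset + fetch`, or `Long.MaxValue` when there is no fetch. The following is a minimal sketch of the same logic. It assumes that `Option[Long]` stands in for the nullable `offset`/`fetch` literals and that a plain `Double` stands in for the input row count. The quoted code's `null` row-count branch is left out, and `SortRowCountSketch` is a hypothetical name.

```scala
object SortRowCountSketch {
  // limitStart: the OFFSET, or 0 when there is none
  def limitStart(offset: Option[Long]): Long = offset.getOrElse(0L)

  // limitEnd: OFFSET + FETCH, or Long.MaxValue when there is no FETCH
  def limitEnd(offset: Option[Long], fetch: Option[Long]): Long =
    fetch.map(_ + limitStart(offset)).getOrElse(Long.MaxValue)

  // Mirrors estimateRowCount: skip OFFSET rows (estimate at least 1),
  // then cap at FETCH if present.
  def estimateRowCount(inputRows: Double, offset: Option[Long], fetch: Option[Long]): Double = {
    val rowCount = math.max(inputRows - limitStart(offset), 1.0)
    fetch.map(f => math.min(rowCount, f.toDouble)).getOrElse(rowCount)
  }

  def main(args: Array[String]): Unit = {
    println(estimateRowCount(100, Some(10L), Some(20L))) // 20.0
    println(estimateRowCount(100, Some(95L), Some(20L))) // 5.0
    println(estimateRowCount(100, Some(200L), None))     // 1.0
    println(limitEnd(Some(10L), Some(20L)))              // 30
  }
}
```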

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r109262780

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala —
        @@ -85,8 +85,10 @@ class QueryDecorrelationTest extends TableTestBase {
        term("joinType", "InnerJoin")
        ),
        term("select", "empno0", "salary")
        - ),
        - term("groupBy", "empno0"),
        + )
        + ,
        + term("groupBy", "empno0")
        — End diff –

        revert these changes?

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r109253071

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala —
        @@ -228,15 +229,24 @@ abstract class BatchTableEnvironment(
        }

        // 3. optimize the logical Flink plan
        - val optRuleSet = getOptRuleSet
        - val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
        - val optimizedPlan = if (optRuleSet.iterator().hasNext) {
        - runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps)
        + val logicalOptRuleSet = getLogicalOptRuleSet
        + val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
        + val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
        + runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
        — End diff –

        Isn't the plan to use the rule-based `HepPlanner` instead of the `VolcanoPlanner` for logical optimization, or should switching be a follow-up step?
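The diff above splits optimization into two phases. The logical phase produces output traits that use `FlinkConventions.LOGICAL`, and a later physical phase only has to translate those logical nodes into `DataSetxxxx` nodes. The toy model below illustrates that split under simplifying assumptions: conventions are plain Scala objects, each rule is a function that converts one node, and rules are applied bottom-up in a single pass. `Convention`, `Node`, `converter`, `optimize` and `twoPhase` are hypothetical names. This is neither Calcite's `VolcanoPlanner` nor its `HepPlanner`.

```scala
object TwoPhaseSketch {
  // Hypothetical conventions mirroring Convention.NONE, FlinkConventions.LOGICAL
  // and the DataSet convention.
  sealed trait Convention
  case object NONE extends Convention
  case object LOGICAL extends Convention
  case object DATASET extends Convention

  final case class Node(op: String, convention: Convention, inputs: List[Node] = Nil)

  // A converter rule rewrites a node if it matches, otherwise returns None.
  type Rule = Node => Option[Node]

  def converter(from: Convention, to: Convention): Rule =
    n => if (n.convention == from) Some(n.copy(convention = to)) else None

  // Single bottom-up pass: convert the inputs first, then apply the first matching rule.
  def optimize(root: Node, rules: Seq[Rule]): Node = {
    val withInputs = root.copy(inputs = root.inputs.map(optimize(_, rules)))
    val converted = rules.iterator.map(rule => rule(withInputs)).collectFirst { case Some(n) => n }
    converted.getOrElse(withInputs)
  }

  // Phase 1 targets LOGICAL; phase 2 only translates LOGICAL nodes to DATASET.
  def twoPhase(plan: Node): Node =
    optimize(optimize(plan, Seq(converter(NONE, LOGICAL))), Seq(converter(LOGICAL, DATASET)))

  val samplePlan: Node = Node("Join", NONE, List(Node("Scan", NONE), Node("Scan", NONE)))

  def main(args: Array[String]): Unit = {
    println(twoPhase(samplePlan))
    // Physical rules alone do not fire on a plan that skipped phase 1
    println(optimize(samplePlan, Seq(converter(LOGICAL, DATASET))) == samplePlan)
  }
}
```

The second `println` shows the point of the split: physical rules match only nodes that the logical phase has already converted, so each rule set can stay focused on one job.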

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r109262077

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala —
        @@ -0,0 +1,92 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.rel.logical
        — End diff –

        move the `FlinkLogicalRels` to `org.apache.flink.table.plan.nodes.flinklogical`?

        githubbot ASF GitHub Bot added a comment -

        Github user KurtYoung commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111856357

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala —
        @@ -228,15 +229,24 @@ abstract class BatchTableEnvironment(
        }

        // 3. optimize the logical Flink plan
        - val optRuleSet = getOptRuleSet
        - val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
        - val optimizedPlan = if (optRuleSet.iterator().hasNext) {
        - runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps)
        + val logicalOptRuleSet = getLogicalOptRuleSet
        + val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
        + val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
        + runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
        — End diff –

        Yes. I will first test how `HepPlanner` works in some test and production environments, and then decide whether and how to switch to the rule-based planner. So this will be a follow-up task.
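For context on the `HepPlanner` question: a rule-based planner keeps applying rewrite rules until none of them changes the plan. By contrast, the cost-based `VolcanoPlanner` explores alternatives and picks the cheapest one. The toy below illustrates only that fixpoint loop, on a tiny expression tree. `Expr`, the two rules and `toFixpoint` are hypothetical. They mimic the idea and do not reflect Calcite's `HepPlanner` API.

```scala
object FixpointSketch {
  sealed trait Expr
  final case class Lit(v: Int) extends Expr
  final case class Field(name: String) extends Expr
  final case class Add(l: Expr, r: Expr) extends Expr

  type Rule = PartialFunction[Expr, Expr]

  // Constant folding: Add(Lit(a), Lit(b)) => Lit(a + b)
  val foldConstants: Rule = { case Add(Lit(a), Lit(b)) => Lit(a + b) }
  // Simplification: x + 0 => x
  val dropAddZero: Rule = { case Add(e, Lit(0)) => e }

  val rules: Seq[Rule] = Seq(foldConstants, dropAddZero)

  // One bottom-up pass: rewrite children, then apply the first rule that matches.
  def rewriteOnce(e: Expr, rules: Seq[Rule]): Expr = {
    val withChildren = e match {
      case Add(l, r) => Add(rewriteOnce(l, rules), rewriteOnce(r, rules))
      case other     => other
    }
    rules.find(rule => rule.isDefinedAt(withChildren)) match {
      case Some(rule) => rule(withChildren)
      case None       => withChildren
    }
  }

  // Repeat passes until a pass leaves the tree unchanged (fixpoint); no cost model.
  @annotation.tailrec
  def toFixpoint(e: Expr, rules: Seq[Rule]): Expr = {
    val next = rewriteOnce(e, rules)
    if (next == e) e else toFixpoint(next, rules)
  }

  // a + (1 + -1)
  val example: Expr = Add(Field("a"), Add(Lit(1), Lit(-1)))

  def main(args: Array[String]): Unit =
    println(toFixpoint(example, rules)) // Field(a)
}
```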

        githubbot ASF GitHub Bot added a comment -

        Github user KurtYoung commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111857715

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalJoin.scala —
        @@ -0,0 +1,130 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.rel.logical
        +
        +import org.apache.calcite.plan._
        +import org.apache.calcite.plan.volcano.RelSubset
        +import org.apache.calcite.rel.RelNode
        +import org.apache.calcite.rel.convert.ConverterRule
        +import org.apache.calcite.rel.core._
        +import org.apache.calcite.rel.logical.LogicalJoin
        +import org.apache.calcite.rel.metadata.RelMetadataQuery
        +import org.apache.calcite.rex.RexNode
        +import org.apache.flink.table.rel.FlinkConventions
        +
        +import scala.collection.JavaConverters._
        +
        +class FlinkLogicalJoin(
        + cluster: RelOptCluster,
        + traitSet: RelTraitSet,
        + left: RelNode,
        + right: RelNode,
        + condition: RexNode,
        + joinType: JoinRelType)
        + extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType)
        + with FlinkLogicalRel {
        +
        + override def copy(
        + traitSet: RelTraitSet,
        + conditionExpr: RexNode,
        + left: RelNode,
        + right: RelNode,
        + joinType: JoinRelType,
        + semiJoinDone: Boolean): Join = {
        +
        + new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
        + }
        +
        + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
        + val leftRowCnt = metadata.getRowCount(getLeft)
        + val leftRowSize = estimateRowSize(getLeft.getRowType)
        +
        + val rightRowCnt = metadata.getRowCount(getRight)
        + val rightRowSize = estimateRowSize(getRight.getRowType)
        +
        + val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
        + val cpuCost = leftRowCnt + rightRowCnt
        + val rowCnt = leftRowCnt + rightRowCnt
        +
        + planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
        + }
        +}
        +
        +private class FlinkLogicalJoinConverter
        + extends ConverterRule(
        + classOf[LogicalJoin],
        + Convention.NONE,
        + FlinkConventions.LOGICAL,
        + "FlinkLogicalJoinConverter") {
        +
        + override def matches(call: RelOptRuleCall): Boolean = {
        + val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
        + val joinInfo = join.analyzeCondition
        +
        + hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)
        — End diff –

        Currently, yes, because the physical join only supports equality conditions on simple keys.
        If a join key is an expression like `a + 1 = b - 2` and the logical layer does not enforce this restriction, the join condition stays in that form and cannot be translated into a physical join. There are 2 possible solutions:
        1. Keep it this way.
        2. Re-introduce rules that extract expressions out of the join condition and add an additional "Calc" node, so that each join key is a simple field.
        I prefer 1 for now and will keep 2 in mind. After all, it's not very nice for the logical layer to know about all these restrictions. What do you think?
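Option 2 works as follows for a condition like `a + 1 = b - 2`. Compute `a + 1` as a new column in a projection (the extra "Calc") on the left input, compute `b - 2` the same way on the right input, and then join on the two new plain fields. The sketch below models only that rewrite of the condition, using a hypothetical expression ADT. `JoinKeySketch`, `EquiCond`, `extractKeys` and the generated column names `lkey`/`rkey` are illustrative assumptions, not Flink or Calcite classes.

```scala
object JoinKeySketch {
  sealed trait Expr
  final case class Field(name: String) extends Expr
  final case class Lit(v: Int) extends Expr
  final case class Plus(l: Expr, r: Expr) extends Expr
  final case class Minus(l: Expr, r: Expr) extends Expr

  // Equality join condition: left-input expression = right-input expression
  final case class EquiCond(left: Expr, right: Expr)

  // The restriction discussed above: both join keys must be plain fields
  def hasSimpleKeys(c: EquiCond): Boolean = (c.left, c.right) match {
    case (Field(_), Field(_)) => true
    case _                    => false
  }

  // Extra columns to compute in a Calc below each input, plus the new condition
  final case class Rewritten(
      leftCalc: Map[String, Expr],
      rightCalc: Map[String, Expr],
      cond: EquiCond)

  // Option 2: move non-field key expressions into Calc projections
  def extractKeys(c: EquiCond): Rewritten = {
    def keyFor(e: Expr, newName: String): (Map[String, Expr], Field) = e match {
      case f: Field => (Map.empty[String, Expr], f)           // already simple
      case other    => (Map(newName -> other), Field(newName)) // compute in a Calc
    }
    val (leftCalc, leftKey) = keyFor(c.left, "lkey")
    val (rightCalc, rightKey) = keyFor(c.right, "rkey")
    Rewritten(leftCalc, rightCalc, EquiCond(leftKey, rightKey))
  }

  // a + 1 = b - 2
  val sample: EquiCond = EquiCond(Plus(Field("a"), Lit(1)), Minus(Field("b"), Lit(2)))

  def main(args: Array[String]): Unit = {
    val r = extractKeys(sample)
    println(hasSimpleKeys(sample)) // false
    println(hasSimpleKeys(r.cond)) // true
    println(r.leftCalc)            // Map(lkey -> Plus(Field(a),Lit(1)))
  }
}
```

After such a rewrite, the simple-key check holds by construction. That property is what would allow the `matches` check in `FlinkLogicalJoinConverter` to be relaxed if option 2 were adopted.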

        githubbot ASF GitHub Bot added a comment -

        Github user KurtYoung commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111857813

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala —
        @@ -0,0 +1,92 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.rel.logical
        — End diff –

        How about renaming `org.apache.flink.table.plan.nodes` to `org.apache.flink.table.plan.rel` and having three sub-packages: `logical`, `dataset`, and `datastream`?

        githubbot ASF GitHub Bot added a comment -

        Github user KurtYoung commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111859756

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalSort.scala —
        @@ -0,0 +1,148 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.rel.logical
        +
        +import org.apache.calcite.plan._
        +import org.apache.calcite.rel.RelFieldCollation.Direction
        +import org.apache.calcite.rel.convert.ConverterRule
        +import org.apache.calcite.rel.core.Sort
        +import org.apache.calcite.rel.logical.LogicalSort
        +import org.apache.calcite.rel.metadata.RelMetadataQuery
        +import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
        +import org.apache.calcite.rex.{RexLiteral, RexNode}
        +import org.apache.flink.api.common.operators.Order
        +import org.apache.flink.table.rel.FlinkConventions
        +
        +import scala.collection.JavaConverters._
        +
        +class FlinkLogicalSort(
        + cluster: RelOptCluster,
        + traits: RelTraitSet,
        + child: RelNode,
        + collation: RelCollation,
        + offset: RexNode,
        + fetch: RexNode)
        + extends Sort(cluster, traits, child, collation, offset, fetch)
        + with FlinkLogicalRel {
        +
        + private val limitStart: Long = if (offset != null) {
        + RexLiteral.intValue(offset)
        + } else {
        + 0L
        + }
        +
        + private val limitEnd: Long = if (fetch != null) {
        + RexLiteral.intValue(fetch) + limitStart
        + } else {
        + Long.MaxValue
        + }
        +
        + val getOffset: RexNode = offset
        +
        + val getFetch: RexNode = fetch
        +
        + override def copy(
        + traitSet: RelTraitSet,
        + newInput: RelNode,
        + newCollation: RelCollation,
        + offset: RexNode,
        + fetch: RexNode): Sort = {
        +
        + new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch)
        + }
        +
        + override def estimateRowCount(metadata: RelMetadataQuery): Double = {
        + val inputRowCnt = metadata.getRowCount(this.getInput)
        + if (inputRowCnt == null) {
        + inputRowCnt
        + } else {
        + val rowCount = (inputRowCnt - limitStart).max(1.0)
        + if (fetch != null) {
        + val limit = RexLiteral.intValue(fetch)
        + rowCount.min(limit)
        + } else {
        + rowCount
        + }
        + }
        + }
        +
        + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
        + // by default, assume cost is proportional to number of rows
        + val rowCount: Double = mq.getRowCount(this)
        + planner.getCostFactory.makeCost(rowCount, rowCount, 0)
        + }
        +
        + override def explainTerms(pw: RelWriter) : RelWriter = {
        — End diff –

        Yes, these can be removed.
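For reference, the `estimateRowCount` override in the `FlinkLogicalSort` diff above skips `offset` rows, never estimates fewer than one row, and caps the result at `fetch`. The following is a dependency-free Scala model of that arithmetic, not Flink or Calcite code: `SortRowCountSketch` is hypothetical, `Option` stands in for the nullable `java.lang.Double` returned by `RelMetadataQuery.getRowCount`, and the `Int` parameters stand in for the literal values of the OFFSET and FETCH `RexNode`s.

```scala
// Dependency-free model of the row-count arithmetic in FlinkLogicalSort.estimateRowCount.
// Hypothetical: Option replaces the nullable row count, Ints replace the OFFSET/FETCH literals.
object SortRowCountSketch {

  /** Estimated number of rows produced by ORDER BY ... OFFSET offset FETCH fetch. */
  def estimateRowCount(
      inputRowCnt: Option[Double],
      offset: Option[Int],
      fetch: Option[Int]): Option[Double] =
    inputRowCnt.map { input =>
      val limitStart = offset.getOrElse(0).toLong
      // rows left after skipping the offset, but never fewer than one
      val rowCount = (input - limitStart).max(1.0)
      // FETCH caps the result; without FETCH all remaining rows are returned
      fetch.map(limit => rowCount.min(limit.toDouble)).getOrElse(rowCount)
    }

  def main(args: Array[String]): Unit = {
    println(estimateRowCount(Some(1000.0), Some(100), Some(50))) // Some(50.0)
    println(estimateRowCount(Some(1000.0), Some(995), Some(50))) // Some(5.0)
    println(estimateRowCount(Some(10.0), Some(20), None))        // Some(1.0)
    println(estimateRowCount(None, Some(20), Some(5)))           // None
  }
}
```

The clamp to one row keeps the estimate positive even when the offset exceeds the input size, which avoids a zero-row estimate propagating into downstream cost calculations.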

        githubbot ASF GitHub Bot added a comment -

        Github user KurtYoung commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111860136

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalTableSourceScan.scala —
        @@ -0,0 +1,109 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.rel.logical
        +
        +import org.apache.calcite.plan._
        +import org.apache.calcite.rel.`type`.RelDataType
        +import org.apache.calcite.rel.convert.ConverterRule
        +import org.apache.calcite.rel.core.TableScan
        +import org.apache.calcite.rel.logical.LogicalTableScan
        +import org.apache.calcite.rel.metadata.RelMetadataQuery
        +import org.apache.calcite.rel.{RelNode, RelWriter}
        +import org.apache.flink.table.api.TableEnvironment
        +import org.apache.flink.table.calcite.FlinkTypeFactory
        +import org.apache.flink.table.plan.schema.TableSourceTable
        +import org.apache.flink.table.rel.FlinkConventions
        +import org.apache.flink.table.sources.TableSource
        +
        +import scala.collection.JavaConverters._
        +
        +class FlinkLogicalTableSourceScan(
        + cluster: RelOptCluster,
        + traitSet: RelTraitSet,
        + table: RelOptTable,
        + val tableSource: TableSource[_])
        + extends TableScan(cluster, traitSet, table)
        + with FlinkLogicalRel {
        +
        + def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): FlinkLogicalTableSourceScan = {
        + new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource)
        + }
        +
        + override def deriveRowType(): RelDataType = {
        + val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
        + flinkTypeFactory.buildRowDataType(
        + TableEnvironment.getFieldNames(tableSource),
        + TableEnvironment.getFieldTypes(tableSource.getReturnType))
        + }
        +
        + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
        + val rowCnt = metadata.getRowCount(this)
        + planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
        + }
        +
        + override def explainTerms(pw: RelWriter): RelWriter = {
        — End diff –

        We need to override `explainTerms` and `toString` so that this RelNode's digest can be distinguished from other `TableSourceScan`s. The default implementation only compares the table name, which is not enough once filters or projections are pushed into the `TableSource`.

        There is actually a bug in `toString` that was fixed in https://github.com/apache/flink/commit/697cc96106846547ff856aa5e478fee037ffde1a; I will backport the fix here.
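A minimal sketch of why a name-only digest is insufficient, assuming (as described above) that the default digest only reflects the table name. Calcite's planner uses a node's digest to recognize equivalent nodes, so two scans of the same table with different push-downs must produce different digests. Everything here (`ScanDigestSketch`, `SourceState`, and the digest formats) is hypothetical; in the actual node this information would be appended in an `explainTerms` override.

```scala
// Hypothetical model: why a scan digest must reflect what was pushed into the source.
object ScanDigestSketch {

  // Stand-in for a table source after projection/filter push-down.
  final case class SourceState(
      tableName: String,
      projectedFields: Seq[String],
      pushedFilter: Option[String])

  // Digest that only reflects the table name.
  def nameOnlyDigest(s: SourceState): String = s"Scan(table=[${s.tableName}])"

  // Digest that also reflects the projected fields and any pushed-down filter.
  def fullDigest(s: SourceState): String = {
    val filter = s.pushedFilter.map(f => s", filter=[$f]").getOrElse("")
    s"Scan(table=[${s.tableName}], fields=[${s.projectedFields.mkString(", ")}]$filter)"
  }

  def main(args: Array[String]): Unit = {
    val plain  = SourceState("Orders", Seq("id", "amount", "ts"), None)
    val pushed = SourceState("Orders", Seq("id", "amount"), Some("amount > 10"))
    // Identical name-only digests would let a planner mistake the two scans for each other.
    println(nameOnlyDigest(plain) == nameOnlyDigest(pushed)) // true
    println(fullDigest(plain) == fullDigest(pushed))         // false
  }
}
```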

        githubbot ASF GitHub Bot added a comment -

        Github user KurtYoung commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111860657

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala —
        @@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {

        val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)

        - val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table")
        + val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
        + s"UNION ALL SELECT a, b, c FROM $table")
        — End diff –

        Actually, the old test was wrong to use `UNION` instead of `UNION ALL`. If I understand correctly, `UNION` performs a global distinct over all fields, while `UNION ALL` simply concatenates the two data sets or data streams. The behavior of `DataStream.union` corresponds to `UNION ALL`, not `UNION`.
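To make the distinction concrete, here is a small sketch using plain Scala collections rather than the Flink API (`UnionSemanticsSketch` is hypothetical): `UNION ALL` keeps every row of both inputs, while `UNION` additionally removes duplicate rows compared over all fields.

```scala
// Plain Scala collections standing in for two inputs; not the Flink API.
object UnionSemanticsSketch {
  type Row = (Long, Int, String)

  // UNION ALL: concatenate both inputs and keep duplicates (like DataStream.union).
  def unionAll(left: Seq[Row], right: Seq[Row]): Seq[Row] = left ++ right

  // UNION: concatenate, then drop duplicate rows compared over all fields.
  def union(left: Seq[Row], right: Seq[Row]): Seq[Row] = (left ++ right).distinct

  def main(args: Array[String]): Unit = {
    val t1: Seq[Row] = Seq((1L, 1, "a"), (2L, 2, "b"))
    val t2: Seq[Row] = Seq((2L, 2, "b"), (3L, 3, "c"))
    println(unionAll(t1, t2).size) // 4
    println(union(t1, t2).size)    // 3
  }
}
```

Because the distinct in `UNION` needs to see all rows, it is a much heavier operation than a plain concatenation, which is why a test that only needs concatenation should use `UNION ALL`.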

        githubbot ASF GitHub Bot added a comment -

        Github user KurtYoung commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111861037

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/cost/FlinkRelMdRowCount.scala —
        @@ -23,13 +23,17 @@ import org.apache.calcite.util.BuiltInMethod
        import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetSort}
        import java.lang.Double

        +import org.apache.flink.table.rel.logical.FlinkLogicalCalc
        +
        object FlinkRelMdRowCount extends RelMdRowCount {

        - val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
        - BuiltInMethod.ROW_COUNT.method,
        - this)
        + val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
        + BuiltInMethod.ROW_COUNT.method,
        + this)
        +
        + def getRowCount(rel: FlinkLogicalCalc, mq: RelMetadataQuery): Double = rel.estimateRowCount(mq)
        — End diff –

        I'm OK with either; I will change it to `Calc`.
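Assuming the change is to declare the row-count handler on Calcite's shared `Calc` base class instead of on `FlinkLogicalCalc`, the idea is that a single handler typed against the base class serves every subclass. The sketch below uses a toy class hierarchy; these classes only borrow the names and are not the real Calcite or Flink classes, and `estimatedRows` is a hypothetical stand-in for `rel.estimateRowCount(mq)`.

```scala
// Toy hierarchy that only borrows the names Calc, FlinkLogicalCalc and DataSetCalc;
// these are not the real Calcite/Flink classes.
object CalcHandlerSketch {
  abstract class Calc(val estimatedRows: Double)
  final class FlinkLogicalCalc(rows: Double) extends Calc(rows)
  final class DataSetCalc(rows: Double) extends Calc(rows)

  // A single handler typed against the base class covers every Calc subclass,
  // so no extra overload is needed per logical or physical node.
  def getRowCount(rel: Calc): Double = rel.estimatedRows

  def main(args: Array[String]): Unit = {
    println(getRowCount(new FlinkLogicalCalc(10.0))) // 10.0
    println(getRowCount(new DataSetCalc(20.0)))      // 20.0
  }
}
```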

        githubbot ASF GitHub Bot added a comment -

        Github user KurtYoung commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111861591

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala —
        @@ -18,58 +18,57 @@
        package org.apache.flink.table.plan.rules.dataSet

        import org.apache.calcite.plan.volcano.RelSubset
        -import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
        +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
        import org.apache.calcite.rel.RelNode
        import org.apache.calcite.rel.convert.ConverterRule
        -import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
        import org.apache.calcite.rex.RexNode
        -import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate}
        +import org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate
        +import org.apache.flink.table.rel.FlinkConventions
        +import org.apache.flink.table.rel.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}

        /**
          * Rule to convert a LogicalCorrelate into a DataSetCorrelate.
        — End diff –

        I will remove this simple comment to be consistent with the other converter rules.

        githubbot ASF GitHub Bot added a comment -

        Github user KurtYoung commented on the issue:

        https://github.com/apache/flink/pull/3594

        Hi @fhueske, sorry for taking so long to update this PR.
        It looks like it now has many conflicts with master; once you are OK with all the changes, I will rebase it onto master.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111917910

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalJoin.scala —
        @@ -0,0 +1,130 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.rel.logical
        +
        +import org.apache.calcite.plan._
        +import org.apache.calcite.plan.volcano.RelSubset
        +import org.apache.calcite.rel.RelNode
        +import org.apache.calcite.rel.convert.ConverterRule
        +import org.apache.calcite.rel.core._
        +import org.apache.calcite.rel.logical.LogicalJoin
        +import org.apache.calcite.rel.metadata.RelMetadataQuery
        +import org.apache.calcite.rex.RexNode
        +import org.apache.flink.table.rel.FlinkConventions
        +
        +import scala.collection.JavaConverters._
        +
        +class FlinkLogicalJoin(
        + cluster: RelOptCluster,
        + traitSet: RelTraitSet,
        + left: RelNode,
        + right: RelNode,
        + condition: RexNode,
        + joinType: JoinRelType)
        + extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType)
        + with FlinkLogicalRel {
        +
        + override def copy(
        + traitSet: RelTraitSet,
        + conditionExpr: RexNode,
        + left: RelNode,
        + right: RelNode,
        + joinType: JoinRelType,
        + semiJoinDone: Boolean): Join = {
        +
        + new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
        + }
        +
        + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
        + val leftRowCnt = metadata.getRowCount(getLeft)
        + val leftRowSize = estimateRowSize(getLeft.getRowType)
        +
        + val rightRowCnt = metadata.getRowCount(getRight)
        + val rightRowSize = estimateRowSize(getRight.getRowType)
        +
        + val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
        + val cpuCost = leftRowCnt + rightRowCnt
        + val rowCnt = leftRowCnt + rightRowCnt
        +
        + planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
        + }
        +}
        +
        +private class FlinkLogicalJoinConverter
        + extends ConverterRule(
        + classOf[LogicalJoin],
        + Convention.NONE,
        + FlinkConventions.LOGICAL,
        + "FlinkLogicalJoinConverter") {
        +
        + override def matches(call: RelOptRuleCall): Boolean = {
        + val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
        + val joinInfo = join.analyzeCondition
        +
        + hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)
        — End diff –

        I see, thanks for the explanation. I agree that we need to keep the restriction here to push the logical plan in the right direction.

        We might need different sets of logical optimization rules for batch and streaming at some point.
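The join-key restriction discussed in this thread can be sketched with a simplified expression model (hypothetical; not Calcite's `RexNode` or `JoinInfo` API). The check accepts a condition only if some top-level conjunct equates a plain field of the left input with a plain field of the right input, so `a = b` yields a key pair while `a + 1 = b - 2` yields none. Extracting such expressions into a `Calc` that projects `a + 1` and `b - 2` as new fields would turn the second condition into the first form.

```scala
// Simplified, hypothetical expression model; not Calcite's RexNode or JoinInfo API.
object EquiJoinKeySketch {
  sealed trait Expr
  final case class FieldRef(index: Int) extends Expr
  final case class Literal(value: Int) extends Expr
  final case class Plus(a: Expr, b: Expr) extends Expr
  final case class Minus(a: Expr, b: Expr) extends Expr
  final case class Equals(a: Expr, b: Expr) extends Expr
  final case class And(a: Expr, b: Expr) extends Expr

  // Flattens nested ANDs into the list of top-level conjuncts.
  def conjuncts(e: Expr): List[Expr] = e match {
    case And(a, b) => conjuncts(a) ++ conjuncts(b)
    case other     => List(other)
  }

  // Field indices refer to the concatenated row (left fields first, then right fields).
  // Returns (leftKey, rightKey) pairs for conjuncts of the form leftField = rightField;
  // an empty result means there is no simple equi-join key.
  def simpleEquiKeys(cond: Expr, leftFieldCount: Int): List[(Int, Int)] = {
    def isLeft(i: Int): Boolean = i < leftFieldCount
    conjuncts(cond).collect {
      case Equals(FieldRef(l), FieldRef(r)) if isLeft(l) && !isLeft(r) => (l, r)
      case Equals(FieldRef(r), FieldRef(l)) if isLeft(l) && !isLeft(r) => (l, r)
    }
  }

  def main(args: Array[String]): Unit = {
    // left input: a (index 0); right input: b (index 1)
    val simple   = Equals(FieldRef(0), FieldRef(1))
    val computed = Equals(Plus(FieldRef(0), Literal(1)), Minus(FieldRef(1), Literal(2)))
    println(simpleEquiKeys(simple, leftFieldCount = 1))   // List((0,1))
    println(simpleEquiKeys(computed, leftFieldCount = 1)) // List()
  }
}
```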

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111918681

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala —
        @@ -0,0 +1,92 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.rel.logical
        — End diff –

        I'd like to keep the package name at `org.apache.flink.table.plan.nodes` to avoid another refactoring, but `logical` sounds better than `flinklogical`, IMO.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111918822

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala —
        @@ -0,0 +1,92 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.rel.logical
        +
        +import java.util.{List => JList}
        +
        +import org.apache.calcite.plan._
        +import org.apache.calcite.rel.RelNode
        +import org.apache.calcite.rel.convert.ConverterRule
        +import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
        +import org.apache.calcite.rel.logical.LogicalAggregate
        +import org.apache.calcite.rel.metadata.RelMetadataQuery
        +import org.apache.calcite.util.ImmutableBitSet
        +import org.apache.flink.table.rel.FlinkConventions
        +
        +class FlinkLogicalAggregate(
        + cluster: RelOptCluster,
        + traitSet: RelTraitSet,
        + child: RelNode,
        + indicator: Boolean,
        + groupSet: ImmutableBitSet,
        + groupSets: JList[ImmutableBitSet],
        + aggCalls: JList[AggregateCall])
        + extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls)
        + with FlinkLogicalRel {
        +
        + override def copy(
        + traitSet: RelTraitSet,
        + input: RelNode,
        + indicator: Boolean,
        + groupSet: ImmutableBitSet,
        + groupSets: JList[ImmutableBitSet],
        + aggCalls: JList[AggregateCall]): Aggregate = {
        + new FlinkLogicalAggregate(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls)
        + }
        +
        + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
        + val child = this.getInput
        + val rowCnt = metadata.getRowCount(child)
        + val rowSize = this.estimateRowSize(child.getRowType)
        + val aggCnt = this.aggCalls.size
        + planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
        + }
        +}
        +
        +private class FlinkLogicalAggregateConverter
        + extends ConverterRule(
        + classOf[LogicalAggregate],
        + Convention.NONE,
        + FlinkConventions.LOGICAL,
        + "FlinkLogicalAggregateConverter") {
        +
        + override def matches(call: RelOptRuleCall): Boolean = {
        + val agg = call.rel(0).asInstanceOf[LogicalAggregate]
        + !agg.containsDistinctCall()
        — End diff –

        +1
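        For reference, the `computeSelfCost` override in the `FlinkLogicalAggregate` diff quoted above charges the input row count as rows, input rows * number of aggregate calls as CPU, and input rows * estimated input row size as I/O. A minimal sketch of that arithmetic, assuming a hypothetical `Cost` case class in place of Calcite's `RelOptCost` and a caller-supplied row size in place of `estimateRowSize`:

```scala
object AggregateCostSketch {
  // Hypothetical stand-in for Calcite's RelOptCost: (row count, CPU, I/O).
  final case class Cost(rows: Double, cpu: Double, io: Double)

  // Same arithmetic as FlinkLogicalAggregate.computeSelfCost in the diff:
  // rows = input rows, cpu = input rows * aggregate calls,
  // io = input rows * estimated input row size.
  def aggregateCost(inputRowCnt: Double, inputRowSize: Double, aggCallCnt: Int): Cost =
    Cost(inputRowCnt, inputRowCnt * aggCallCnt, inputRowCnt * inputRowSize)

  def main(args: Array[String]): Unit = {
    // 1000 input rows of an assumed 24 bytes each, with two aggregate calls.
    println(aggregateCost(1000.0, 24.0, 2)) // Cost(1000.0,2000.0,24000.0)
  }
}
```

        In this model the CPU term grows with the number of aggregate calls, so two aggregates over the same input are ranked by how much work they do per row.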

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111918921

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalTableSourceScan.scala —
        @@ -0,0 +1,109 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.rel.logical
        +
        +import org.apache.calcite.plan._
        +import org.apache.calcite.rel.`type`.RelDataType
        +import org.apache.calcite.rel.convert.ConverterRule
        +import org.apache.calcite.rel.core.TableScan
        +import org.apache.calcite.rel.logical.LogicalTableScan
        +import org.apache.calcite.rel.metadata.RelMetadataQuery
        +import org.apache.calcite.rel.{RelNode, RelWriter}
        +import org.apache.flink.table.api.TableEnvironment
        +import org.apache.flink.table.calcite.FlinkTypeFactory
        +import org.apache.flink.table.plan.schema.TableSourceTable
        +import org.apache.flink.table.rel.FlinkConventions
        +import org.apache.flink.table.sources.TableSource
        +
        +import scala.collection.JavaConverters._
        +
        +class FlinkLogicalTableSourceScan(
        + cluster: RelOptCluster,
        + traitSet: RelTraitSet,
        + table: RelOptTable,
        + val tableSource: TableSource[_])
        + extends TableScan(cluster, traitSet, table)
        + with FlinkLogicalRel {
        +
        + def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): FlinkLogicalTableSourceScan = {
        + new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource)
        + }
        +
        + override def deriveRowType(): RelDataType = {
        + val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
        + flinkTypeFactory.buildRowDataType(
        + TableEnvironment.getFieldNames(tableSource),
        + TableEnvironment.getFieldTypes(tableSource.getReturnType))
        + }
        +
        + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
        + val rowCnt = metadata.getRowCount(this)
        + planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
        + }
        +
        + override def explainTerms(pw: RelWriter): RelWriter = {
        — End diff –

        I see, +1

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111919363

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala —
        @@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {

        val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)

        - val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table")
        + val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
        + s"UNION ALL SELECT a, b, c FROM $table")
        — End diff –

        Yes, you are right. DataStream can only do `UNION ALL`.
        We should prevent the translation of `UNION` and also change the explainString to `union all`, IMO.
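        The distinction behind this comment: `UNION ALL` only concatenates its inputs, while a plain `UNION` also removes duplicates, which on an unbounded stream means remembering every distinct row seen so far. A self-contained sketch of both semantics and of a rule-style guard that refuses to translate non-`ALL` unions; `SetOp` and `streamUnionMatches` are hypothetical simplifications, not the actual Calcite `LogicalUnion` or Flink rule classes (Calcite's set operations do carry a comparable `all` flag):

```scala
object UnionSketch {
  // Hypothetical, simplified stand-in for a set-operation node.
  final case class SetOp(all: Boolean)

  // Rule-style guard: only UNION ALL may be translated to a streaming union.
  def streamUnionMatches(union: SetOp): Boolean = union.all

  // UNION ALL: plain concatenation, no state needed.
  def unionAll[T](left: Seq[T], right: Seq[T]): Seq[T] = left ++ right

  // UNION: concatenation plus duplicate elimination, which requires
  // remembering every distinct row seen so far.
  def union[T](left: Seq[T], right: Seq[T]): Seq[T] = (left ++ right).distinct

  def main(args: Array[String]): Unit = {
    val a = Seq(1, 2, 2)
    val b = Seq(2, 3)
    println(unionAll(a, b))                         // List(1, 2, 2, 2, 3)
    println(union(a, b))                            // List(1, 2, 3)
    println(streamUnionMatches(SetOp(all = false))) // false
  }
}
```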

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on the issue:

        https://github.com/apache/flink/pull/3594

        Thanks for the update @KurtYoung. I'll have a detailed look at the changes later today.

        githubbot ASF GitHub Bot added a comment -

        Github user KurtYoung commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111930527

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalAggregate.scala —
        @@ -0,0 +1,92 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.rel.logical
        — End diff –

        OK, will rename the packages.

        githubbot ASF GitHub Bot added a comment -

        Github user KurtYoung commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111931445

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala —
        @@ -335,7 +335,8 @@ class TableEnvironmentTest extends TableTestBase {

        val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)

        - val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table")
        + val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " +
        + s"UNION ALL SELECT a, b, c FROM $table")
        — End diff –

        +1, will change.

        githubbot ASF GitHub Bot added a comment -

        Github user KurtYoung commented on the issue:

        https://github.com/apache/flink/pull/3594

        @fhueske I have renamed the packages and updated the union description.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3594#discussion_r111953023

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala —
        @@ -228,15 +229,24 @@ abstract class BatchTableEnvironment(
        }

        // 3. optimize the logical Flink plan

        - val optRuleSet = getOptRuleSet
        - val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
        - val optimizedPlan = if (optRuleSet.iterator().hasNext) {
        - runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps)
        + val logicalOptRuleSet = getLogicalOptRuleSet
        + val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
        + val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
        + runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
        — End diff –

        +1
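        The quoted `BatchTableEnvironment` change splits the single Volcano pass into a logical pass (target `FlinkConventions.LOGICAL`) followed by a physical pass, and each pass only runs when its rule set is non-empty. A toy, self-contained model of that pipeline; `Conv`, `Node`, `Rule`, `runPass` and the renaming rules are hypothetical simplifications, and the real code hands each rule set to Calcite's Volcano planner instead of applying one converter directly:

```scala
object TwoPhaseOptimizeSketch {
  // Hypothetical conventions standing in for Convention.NONE,
  // FlinkConventions.LOGICAL and the DataSet physical convention.
  sealed trait Conv
  case object NoneConv extends Conv
  case object LogicalConv extends Conv
  case object DataSetConv extends Conv

  final case class Node(name: String, conv: Conv)

  // A converter from one convention to another, loosely like a ConverterRule.
  final case class Rule(from: Conv, to: Conv, rename: String => String)

  // One optimization pass. An empty rule set returns the plan unchanged, like
  // the `if (ruleSet.iterator().hasNext)` guard in the diff; a plan that no
  // rule can convert fails, like "Cannot generate a valid execution plan".
  def runPass(rules: Seq[Rule], plan: Node, target: Conv): Node =
    if (rules.isEmpty || plan.conv == target) plan
    else
      rules.find(r => r.from == plan.conv && r.to == target) match {
        case Some(r) => Node(r.rename(plan.name), target)
        case None =>
          throw new IllegalStateException(s"Cannot convert ${plan.name} to $target")
      }

  val logicalRules: Seq[Rule] =
    Seq(Rule(NoneConv, LogicalConv, _.replace("Logical", "FlinkLogical")))
  val physicalRules: Seq[Rule] =
    Seq(Rule(LogicalConv, DataSetConv, _.replace("FlinkLogical", "DataSet")))

  // Logical optimization first, then physical optimization on its output.
  def optimize(normalized: Node): Node = {
    val logicalPlan = runPass(logicalRules, normalized, LogicalConv)
    runPass(physicalRules, logicalPlan, DataSetConv)
  }

  def main(args: Array[String]): Unit =
    println(optimize(Node("LogicalJoin", NoneConv))) // Node(DataSetJoin,DataSetConv)
}
```

        Keeping the passes separate means the physical rules only ever see nodes in the logical convention, which is the layering the PR description aims for.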

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on the issue:

        https://github.com/apache/flink/pull/3594

        Hi @KurtYoung, thanks for the update!
        I think this PR is good to merge.

        githubbot ASF GitHub Bot added a comment -

        Github user KurtYoung commented on the issue:

        https://github.com/apache/flink/pull/3594

        Rebased onto master; will merge after the build check.

        githubbot ASF GitHub Bot added a comment -

        Github user asfgit closed the pull request at:

        https://github.com/apache/flink/pull/3594

        dshkvyra Dmytro Shkvyra added a comment -

        This change makes SQL statements with null parts fail, for example:

            val sqlQuery =
              "SELECT a, cnt " +
                "FROM" +
                " (SELECT cnt FROM (SELECT COUNT(*) AS cnt FROM B) WHERE cnt < 0) " +
                "RIGHT JOIN A " +
                "ON a < cnt"
        

        Error:

        
        org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 
        
        LogicalProject(a=[$1], cnt=[$0])
          LogicalJoin(condition=[<($1, $0)], joinType=[right])
            LogicalProject(cnt=[$0])
              LogicalFilter(condition=[<($0, 0)])
                LogicalAggregate(group=[{}], cnt=[COUNT()])
                  LogicalProject($f0=[0])
                    LogicalTableScan(table=[[B]])
            LogicalTableScan(table=[[A]])
        
        This exception indicates that the query uses an unsupported SQL feature.
        Please check the documentation for the set of currently supported SQL features.
        
        	at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:267)
        	at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:235)
        	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:265)
        	at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:140)
        	at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:40)
        
        githubbot ASF GitHub Bot added a comment -

        Github user DmytroShkvyra commented on the issue:

        https://github.com/apache/flink/pull/3594

        @KurtYoung, @fhueske This PR breaks queries with null nodes:

        org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

        LogicalProject(a=[$1], cnt=[$0])
          LogicalJoin(condition=[<($1, $0)], joinType=[right])
            LogicalProject(cnt=[$0])
              LogicalFilter(condition=[<($0, 0)])
                LogicalAggregate(group=[{}], cnt=[COUNT()])
                  LogicalProject($f0=[0])
                    LogicalTableScan(table=[[B]])
            LogicalTableScan(table=[[A]])

        This exception indicates that the query uses an unsupported SQL feature.
        Please check the documentation for the set of currently supported SQL features.

        	at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:267)
        	at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:235)
        	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:265)
        	at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:140)
        	at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:40)

        ykt836 Kurt Young added a comment -

        Dmytro Shkvyra Thanks for reporting this, I will look into it.

        ykt836 Kurt Young added a comment -

        Hi Dmytro Shkvyra, I tried to reproduce the problem without this change, and it raises the same exception.
        I believe the actual root cause is that LogicalJoin(condition=[<($1, $0)], joinType=[right]) doesn't have an equality condition and the join type is not inner. The DataSetJoinRule explicitly forbids this kind of join since the change in https://issues.apache.org/jira/browse/FLINK-5520
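        This explanation matches the condition used by `FlinkLogicalJoinConverter.matches` in the diff at the top of this section: a join is accepted only if it has an equality predicate or is a single-row inner join. A simplified, self-contained sketch of that check; `JoinKind`, `Conjunct`, `JoinNode` and `singleRowInput` are hypothetical stand-ins for Calcite's `JoinRelType`, `JoinInfo` and the real row-count analysis:

```scala
object JoinEligibilitySketch {
  // Hypothetical stand-ins for Calcite's JoinRelType values.
  sealed trait JoinKind
  case object InnerJoin extends JoinKind
  case object LeftJoin extends JoinKind
  case object RightJoin extends JoinKind
  case object FullJoin extends JoinKind

  // Hypothetical conjuncts of a join condition.
  sealed trait Conjunct
  final case class Eq(lhs: String, rhs: String) extends Conjunct
  final case class Lt(lhs: String, rhs: String) extends Conjunct

  // `singleRowInput` simplifies "one input produces exactly one row".
  final case class JoinNode(kind: JoinKind, condition: Seq[Conjunct], singleRowInput: Boolean)

  def hasEqualityPredicates(join: JoinNode): Boolean =
    join.condition.exists {
      case Eq(_, _) => true
      case _        => false
    }

  def isSingleRowInnerJoin(join: JoinNode): Boolean =
    join.kind == InnerJoin && join.singleRowInput

  // Same shape as `hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)`.
  def matches(join: JoinNode): Boolean =
    hasEqualityPredicates(join) || isSingleRowInnerJoin(join)

  def main(args: Array[String]): Unit = {
    // The reported query: RIGHT JOIN ... ON a < cnt with a single-row COUNT(*) input.
    println(matches(JoinNode(RightJoin, Seq(Lt("a", "cnt")), singleRowInput = true)))  // false
    // In this model, the same condition as an INNER JOIN passes the single-row branch.
    println(matches(JoinNode(InnerJoin, Seq(Lt("a", "cnt")), singleRowInput = true)))  // true
    // Any join with an equality predicate passes.
    println(matches(JoinNode(RightJoin, Seq(Eq("a", "cnt")), singleRowInput = false))) // true
  }
}
```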

        dshkvyra Dmytro Shkvyra added a comment -

        OK Kurt Young, I will close it. Thanks for the clarification.


          People

          • Assignee:
            ykt836 Kurt Young
            Reporter:
            ykt836 Kurt Young
          • Votes:
            0
            Watchers:
            3
