Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      This jira proposes to add support for TUMBLE_START() / TUMBLE_END() / HOP_START() / HOP_END() / SESSUIB_START() / SESSION_END() in the planner in Flink.

        Issue Links

          Activity

          Hide
          fhueske Fabian Hueske added a comment -

          Can we generalize this to include the batch side as well?
          We can also open another JIRA and resolve the window start / end for batch SQL there.

          What do you think Haohui Mai?

          Show
          fhueske Fabian Hueske added a comment - Can we generalize this to include the batch side as well? We can also open another JIRA and resolve the window start / end for batch SQL there. What do you think Haohui Mai ?
          Hide
          wheat9 Haohui Mai added a comment -

          I'm okay with either way, but I think we have a decent chance to implement both side in the same code as it can be implemented as a transformation that only happens at the logical plan level.

          Maybe we can start with this plan and revisit after FLINK-6261 is landed? What do you think?

          Show
          wheat9 Haohui Mai added a comment - I'm okay with either way, but I think we have a decent chance to implement both side in the same code as it can be implemented as a transformation that only happens at the logical plan level. Maybe we can start with this plan and revisit after FLINK-6261 is landed? What do you think?
          Hide
          fhueske Fabian Hueske added a comment -

          Sounds good to me. I'm currently updating my PR for FLINK-6261. Should be done soon.

          Show
          fhueske Fabian Hueske added a comment - Sounds good to me. I'm currently updating my PR for FLINK-6261 . Should be done soon.
          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user haohui opened a pull request:

          https://github.com/apache/flink/pull/3693

          FLINK-6012 [table] Support WindowStart / WindowEnd functions in streaming SQL

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/haohui/flink FLINK-6012

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3693.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3693


          commit 01281682f4845bd1fe87dcc9cf29227592e55a28
          Author: Haohui Mai <wheat9@apache.org>
          Date: 2017-04-07T08:08:12Z

          FLINK-6012 [table] Support WindowStart / WindowEnd functions in streaming SQL.


          Show
          githubbot ASF GitHub Bot added a comment - GitHub user haohui opened a pull request: https://github.com/apache/flink/pull/3693 FLINK-6012 [table] Support WindowStart / WindowEnd functions in streaming SQL You can merge this pull request into a Git repository by running: $ git pull https://github.com/haohui/flink FLINK-6012 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3693.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3693 commit 01281682f4845bd1fe87dcc9cf29227592e55a28 Author: Haohui Mai <wheat9@apache.org> Date: 2017-04-07T08:08:12Z FLINK-6012 [table] Support WindowStart / WindowEnd functions in streaming SQL.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3693#discussion_r110362433

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowExpressionRule.scala —
          @@ -0,0 +1,117 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.plan.rules.common
          +
          +import org.apache.calcite.plan.hep.HepRelVertex
          +import org.apache.calcite.plan.

          {RelOptRule, RelOptRuleCall}

          +import org.apache.calcite.rel.RelNode
          +import org.apache.calcite.rel.logical.LogicalProject
          +import org.apache.calcite.rex.

          {RexCall, RexNode}

          +import org.apache.calcite.sql.fun.SqlStdOperatorTable
          +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
          +import org.apache.flink.table.expressions.

          {WindowEnd, WindowStart}

          +import org.apache.flink.table.plan.logical._
          +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
          +
          +import scala.collection.JavaConversions._
          +
          +class WindowExpressionRule
          + extends RelOptRule(
          + WindowExpressionRule.WINDOW_EXPRESSION_RULE_PREDICATE,
          + "WindowExpressionRule") {
          + override def matches(call: RelOptRuleCall): Boolean = {
          + val project = call.rel(0).asInstanceOf[LogicalProject]
          + val found = project.getProjects.find

          { + case c: RexCall => c.getOperator.isGroupAuxiliary + case _ => false + }

          + found.isDefined
          + }
          +
          + override def onMatch(call: RelOptRuleCall): Unit = {
          + val project = call.rel(0).asInstanceOf[LogicalProject]
          + val innerProject = WindowExpressionRule.getInput[LogicalProject](project.getInput)
          + val agg = WindowExpressionRule.getInput[LogicalWindowAggregate](innerProject.getInput)
          +
          + // Create an additional project to conform with types
          + val transformed = call.builder()
          + val rexBuilder = transformed.getRexBuilder
          + transformed.push(LogicalWindowAggregate.create(
          + agg.getWindow,
          + Seq(
          + NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias.get)),
          + NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias.get))
          + ), agg)
          + )
          + val aggSize = transformed.fields().size()
          +
          + transformed
          + .project(innerProject.getProjects ++ Seq(
          — End diff –

          Can be simplified to `Seq(transformed.field("w$start"), transformed.field("w$end"))`. This makes it also independent of the implementation of the window aggregate (does not rely on window properties being the last fields).

          Show
          githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3693#discussion_r110362433 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowExpressionRule.scala — @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.hep.HepRelVertex +import org.apache.calcite.plan. {RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.rex. {RexCall, RexNode} +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.expressions. {WindowEnd, WindowStart} +import org.apache.flink.table.plan.logical._ +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate + +import scala.collection.JavaConversions._ + +class WindowExpressionRule + extends RelOptRule( + WindowExpressionRule.WINDOW_EXPRESSION_RULE_PREDICATE, + "WindowExpressionRule") { + override def matches(call: RelOptRuleCall): Boolean = { + val project = call.rel(0).asInstanceOf [LogicalProject] + val found = project.getProjects.find { + case c: RexCall => c.getOperator.isGroupAuxiliary + case _ => false + } + found.isDefined + } + + override def onMatch(call: RelOptRuleCall): Unit = { + val project = call.rel(0).asInstanceOf [LogicalProject] + val innerProject = WindowExpressionRule.getInput [LogicalProject] (project.getInput) + val agg = WindowExpressionRule.getInput [LogicalWindowAggregate] (innerProject.getInput) + + // Create an additional project to conform with types + val transformed = call.builder() + val rexBuilder = transformed.getRexBuilder + transformed.push(LogicalWindowAggregate.create( + agg.getWindow, + Seq( + NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias.get)), + NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias.get)) + ), agg) + ) + val aggSize = transformed.fields().size() + + transformed + .project(innerProject.getProjects ++ Seq( — End diff – Can be simplified to `Seq(transformed.field("w$start"), transformed.field("w$end"))`. This makes it also independent of the implementation of the window aggregate (does not rely on window properties being the last fields).
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3693#discussion_r110361292

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowExpressionRule.scala —
          @@ -0,0 +1,117 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.plan.rules.common
          +
          +import org.apache.calcite.plan.hep.HepRelVertex
          +import org.apache.calcite.plan.

          {RelOptRule, RelOptRuleCall}

          +import org.apache.calcite.rel.RelNode
          +import org.apache.calcite.rel.logical.LogicalProject
          +import org.apache.calcite.rex.

          {RexCall, RexNode}

          +import org.apache.calcite.sql.fun.SqlStdOperatorTable
          +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
          +import org.apache.flink.table.expressions.

          {WindowEnd, WindowStart}

          +import org.apache.flink.table.plan.logical._
          +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
          +
          +import scala.collection.JavaConversions._
          +
          +class WindowExpressionRule
          + extends RelOptRule(
          + WindowExpressionRule.WINDOW_EXPRESSION_RULE_PREDICATE,
          + "WindowExpressionRule") {
          + override def matches(call: RelOptRuleCall): Boolean = {
          + val project = call.rel(0).asInstanceOf[LogicalProject]
          + val found = project.getProjects.find

          { + case c: RexCall => c.getOperator.isGroupAuxiliary + case _ => false + }

          + found.isDefined
          + }
          +
          + override def onMatch(call: RelOptRuleCall): Unit = {
          + val project = call.rel(0).asInstanceOf[LogicalProject]
          + val innerProject = WindowExpressionRule.getInput[LogicalProject](project.getInput)
          — End diff –

          can be `val innerProject = call.rel(1).asInstanceOf[LogicalProject]`

          Show
          githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3693#discussion_r110361292 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowExpressionRule.scala — @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.hep.HepRelVertex +import org.apache.calcite.plan. {RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.rex. {RexCall, RexNode} +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.expressions. {WindowEnd, WindowStart} +import org.apache.flink.table.plan.logical._ +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate + +import scala.collection.JavaConversions._ + +class WindowExpressionRule + extends RelOptRule( + WindowExpressionRule.WINDOW_EXPRESSION_RULE_PREDICATE, + "WindowExpressionRule") { + override def matches(call: RelOptRuleCall): Boolean = { + val project = call.rel(0).asInstanceOf [LogicalProject] + val found = project.getProjects.find { + case c: RexCall => c.getOperator.isGroupAuxiliary + case _ => false + } + found.isDefined + } + + override def onMatch(call: RelOptRuleCall): Unit = { + val project = call.rel(0).asInstanceOf [LogicalProject] + val innerProject = WindowExpressionRule.getInput [LogicalProject] (project.getInput) — End diff – can be `val innerProject = call.rel(1).asInstanceOf [LogicalProject] `
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3693#discussion_r110361425

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowExpressionRule.scala —
          @@ -0,0 +1,117 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.plan.rules.common
          +
          +import org.apache.calcite.plan.hep.HepRelVertex
          +import org.apache.calcite.plan.

          {RelOptRule, RelOptRuleCall}

          +import org.apache.calcite.rel.RelNode
          +import org.apache.calcite.rel.logical.LogicalProject
          +import org.apache.calcite.rex.

          {RexCall, RexNode}

          +import org.apache.calcite.sql.fun.SqlStdOperatorTable
          +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
          +import org.apache.flink.table.expressions.

          {WindowEnd, WindowStart}

          +import org.apache.flink.table.plan.logical._
          +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
          +
          +import scala.collection.JavaConversions._
          +
          +class WindowExpressionRule
          + extends RelOptRule(
          + WindowExpressionRule.WINDOW_EXPRESSION_RULE_PREDICATE,
          + "WindowExpressionRule") {
          + override def matches(call: RelOptRuleCall): Boolean = {
          + val project = call.rel(0).asInstanceOf[LogicalProject]
          + val found = project.getProjects.find

          { + case c: RexCall => c.getOperator.isGroupAuxiliary + case _ => false + }

          + found.isDefined
          + }
          +
          + override def onMatch(call: RelOptRuleCall): Unit = {
          + val project = call.rel(0).asInstanceOf[LogicalProject]
          + val innerProject = WindowExpressionRule.getInput[LogicalProject](project.getInput)
          + val agg = WindowExpressionRule.getInput[LogicalWindowAggregate](innerProject.getInput)
          +
          + // Create an additional project to conform with types
          + val transformed = call.builder()
          + val rexBuilder = transformed.getRexBuilder
          + transformed.push(LogicalWindowAggregate.create(
          + agg.getWindow,
          + Seq(
          + NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias.get)),
          + NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias.get))
          + ), agg)
          + )
          + val aggSize = transformed.fields().size()
          +
          + transformed
          + .project(innerProject.getProjects ++ Seq(
          + rexBuilder.makeInputRef(transformed.peek(), aggSize - 2),
          + rexBuilder.makeInputRef(transformed.peek(), aggSize - 1)
          + ))
          +
          + val n = transformed.fields().size()
          + val (windowStartExprIdx, windowEndExprIdx) = (n - 2, n - 1)
          +
          + val windowStart = rexBuilder.makeInputRef(transformed.peek(), windowStartExprIdx)
          + val windowEnd = rexBuilder.makeInputRef(transformed.peek(), windowEndExprIdx)
          +
          + transformed.project(
          + project.getProjects.map(x =>
          + WindowExpressionRule.identifyAuxiliaryExpr(x, agg.getWindow) match

          { + case Some(WindowStart(_)) => rexBuilder.makeCast(x.getType, windowStart, false) + case Some(WindowEnd(_)) => rexBuilder.makeCast(x.getType, windowEnd, false) + case _ => x + }

          )
          + )
          + val res = transformed.build()
          + call.transformTo(res)
          + }
          +}
          +
          +object WindowExpressionRule {
          + private val WINDOW_EXPRESSION_RULE_PREDICATE =
          + RelOptRule.operand(classOf[LogicalProject],
          + RelOptRule.operand(classOf[LogicalProject],
          + RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))
          +
          + val INSTANCE = new WindowExpressionRule
          +
          + private def getInput[T](node: RelNode) : T = {
          — End diff –

          not needed. A `RelOptRuleCall` gives access to all matching RelNodes.

          Show
          githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3693#discussion_r110361425 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowExpressionRule.scala — @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.hep.HepRelVertex +import org.apache.calcite.plan. {RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.rex. {RexCall, RexNode} +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.expressions. {WindowEnd, WindowStart} +import org.apache.flink.table.plan.logical._ +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate + +import scala.collection.JavaConversions._ + +class WindowExpressionRule + extends RelOptRule( + WindowExpressionRule.WINDOW_EXPRESSION_RULE_PREDICATE, + "WindowExpressionRule") { + override def matches(call: RelOptRuleCall): Boolean = { + val project = call.rel(0).asInstanceOf [LogicalProject] + val found = project.getProjects.find { + case c: RexCall => c.getOperator.isGroupAuxiliary + case _ => false + } + found.isDefined + } + + override def onMatch(call: RelOptRuleCall): Unit = { + val project = call.rel(0).asInstanceOf [LogicalProject] + val innerProject = WindowExpressionRule.getInput [LogicalProject] (project.getInput) + val agg = WindowExpressionRule.getInput [LogicalWindowAggregate] (innerProject.getInput) + + // Create an additional project to conform with types + val transformed = call.builder() + val rexBuilder = transformed.getRexBuilder + transformed.push(LogicalWindowAggregate.create( + agg.getWindow, + Seq( + NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias.get)), + NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias.get)) + ), agg) + ) + val aggSize = transformed.fields().size() + + transformed + .project(innerProject.getProjects ++ Seq( + rexBuilder.makeInputRef(transformed.peek(), aggSize - 2), + rexBuilder.makeInputRef(transformed.peek(), aggSize - 1) + )) + + val n = transformed.fields().size() + val (windowStartExprIdx, windowEndExprIdx) = (n - 2, n - 1) + + val windowStart = rexBuilder.makeInputRef(transformed.peek(), windowStartExprIdx) + val windowEnd = rexBuilder.makeInputRef(transformed.peek(), windowEndExprIdx) + + transformed.project( + project.getProjects.map(x => + WindowExpressionRule.identifyAuxiliaryExpr(x, agg.getWindow) match { + case Some(WindowStart(_)) => rexBuilder.makeCast(x.getType, windowStart, false) + case Some(WindowEnd(_)) => rexBuilder.makeCast(x.getType, windowEnd, false) + case _ => x + } ) + ) + val res = transformed.build() + call.transformTo(res) + } +} + +object WindowExpressionRule { + private val WINDOW_EXPRESSION_RULE_PREDICATE = + RelOptRule.operand(classOf [LogicalProject] , + RelOptRule.operand(classOf [LogicalProject] , + RelOptRule.operand(classOf [LogicalWindowAggregate] , RelOptRule.none()))) + + val INSTANCE = new WindowExpressionRule + + private def getInput [T] (node: RelNode) : T = { — End diff – not needed. A `RelOptRuleCall` gives access to all matching RelNodes.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3693#discussion_r110361333

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowExpressionRule.scala —
          @@ -0,0 +1,117 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.plan.rules.common
          +
          +import org.apache.calcite.plan.hep.HepRelVertex
          +import org.apache.calcite.plan.

          {RelOptRule, RelOptRuleCall}

          +import org.apache.calcite.rel.RelNode
          +import org.apache.calcite.rel.logical.LogicalProject
          +import org.apache.calcite.rex.

          {RexCall, RexNode}

          +import org.apache.calcite.sql.fun.SqlStdOperatorTable
          +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
          +import org.apache.flink.table.expressions.

          {WindowEnd, WindowStart}

          +import org.apache.flink.table.plan.logical._
          +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
          +
          +import scala.collection.JavaConversions._
          +
          +class WindowExpressionRule
          + extends RelOptRule(
          + WindowExpressionRule.WINDOW_EXPRESSION_RULE_PREDICATE,
          + "WindowExpressionRule") {
          + override def matches(call: RelOptRuleCall): Boolean = {
          + val project = call.rel(0).asInstanceOf[LogicalProject]
          + val found = project.getProjects.find

          { + case c: RexCall => c.getOperator.isGroupAuxiliary + case _ => false + }

          + found.isDefined
          + }
          +
          + override def onMatch(call: RelOptRuleCall): Unit = {
          + val project = call.rel(0).asInstanceOf[LogicalProject]
          + val innerProject = WindowExpressionRule.getInput[LogicalProject](project.getInput)
          + val agg = WindowExpressionRule.getInput[LogicalWindowAggregate](innerProject.getInput)
          — End diff –

          can be `val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]`

          Show
          githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3693#discussion_r110361333 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowExpressionRule.scala — @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.hep.HepRelVertex +import org.apache.calcite.plan. {RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.rex. {RexCall, RexNode} +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.expressions. {WindowEnd, WindowStart} +import org.apache.flink.table.plan.logical._ +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate + +import scala.collection.JavaConversions._ + +class WindowExpressionRule + extends RelOptRule( + WindowExpressionRule.WINDOW_EXPRESSION_RULE_PREDICATE, + "WindowExpressionRule") { + override def matches(call: RelOptRuleCall): Boolean = { + val project = call.rel(0).asInstanceOf [LogicalProject] + val found = project.getProjects.find { + case c: RexCall => c.getOperator.isGroupAuxiliary + case _ => false + } + found.isDefined + } + + override def onMatch(call: RelOptRuleCall): Unit = { + val project = call.rel(0).asInstanceOf [LogicalProject] + val innerProject = WindowExpressionRule.getInput [LogicalProject] (project.getInput) + val agg = WindowExpressionRule.getInput [LogicalWindowAggregate] (innerProject.getInput) — End diff – can be `val agg = call.rel(2).asInstanceOf [LogicalWindowAggregate] `
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3693#discussion_r110362631

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowExpressionRule.scala —
          @@ -0,0 +1,117 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.plan.rules.common
          +
          +import org.apache.calcite.plan.hep.HepRelVertex
          +import org.apache.calcite.plan.

          {RelOptRule, RelOptRuleCall}

          +import org.apache.calcite.rel.RelNode
          +import org.apache.calcite.rel.logical.LogicalProject
          +import org.apache.calcite.rex.

          {RexCall, RexNode}

          +import org.apache.calcite.sql.fun.SqlStdOperatorTable
          +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
          +import org.apache.flink.table.expressions.

          {WindowEnd, WindowStart}

          +import org.apache.flink.table.plan.logical._
          +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
          +
          +import scala.collection.JavaConversions._
          +
          +class WindowExpressionRule
          + extends RelOptRule(
          + WindowExpressionRule.WINDOW_EXPRESSION_RULE_PREDICATE,
          + "WindowExpressionRule") {
          + override def matches(call: RelOptRuleCall): Boolean = {
          + val project = call.rel(0).asInstanceOf[LogicalProject]
          + val found = project.getProjects.find

          { + case c: RexCall => c.getOperator.isGroupAuxiliary + case _ => false + }

          + found.isDefined
          + }
          +
          + override def onMatch(call: RelOptRuleCall): Unit = {
          + val project = call.rel(0).asInstanceOf[LogicalProject]
          + val innerProject = WindowExpressionRule.getInput[LogicalProject](project.getInput)
          + val agg = WindowExpressionRule.getInput[LogicalWindowAggregate](innerProject.getInput)
          +
          + // Create an additional project to conform with types
          + val transformed = call.builder()
          + val rexBuilder = transformed.getRexBuilder
          + transformed.push(LogicalWindowAggregate.create(
          + agg.getWindow,
          + Seq(
          + NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias.get)),
          + NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias.get))
          + ), agg)
          + )
          + val aggSize = transformed.fields().size()
          +
          + transformed
          + .project(innerProject.getProjects ++ Seq(
          + rexBuilder.makeInputRef(transformed.peek(), aggSize - 2),
          + rexBuilder.makeInputRef(transformed.peek(), aggSize - 1)
          + ))
          +
          + val n = transformed.fields().size()
          + val (windowStartExprIdx, windowEndExprIdx) = (n - 2, n - 1)
          +
          + val windowStart = rexBuilder.makeInputRef(transformed.peek(), windowStartExprIdx)
          — End diff –

          can be `val windowStart = transformed.field("w$start")`

          Show
          githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3693#discussion_r110362631 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowExpressionRule.scala — @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.hep.HepRelVertex +import org.apache.calcite.plan. {RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.rex. {RexCall, RexNode} +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.expressions. {WindowEnd, WindowStart} +import org.apache.flink.table.plan.logical._ +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate + +import scala.collection.JavaConversions._ + +class WindowExpressionRule + extends RelOptRule( + WindowExpressionRule.WINDOW_EXPRESSION_RULE_PREDICATE, + "WindowExpressionRule") { + override def matches(call: RelOptRuleCall): Boolean = { + val project = call.rel(0).asInstanceOf [LogicalProject] + val found = project.getProjects.find { + case c: RexCall => c.getOperator.isGroupAuxiliary + case _ => false + } + found.isDefined + } + + override def onMatch(call: RelOptRuleCall): Unit = { + val project = call.rel(0).asInstanceOf [LogicalProject] + val innerProject = WindowExpressionRule.getInput [LogicalProject] (project.getInput) + val agg = WindowExpressionRule.getInput [LogicalWindowAggregate] (innerProject.getInput) + + // Create an additional project to conform with types + val transformed = call.builder() + val rexBuilder = transformed.getRexBuilder + transformed.push(LogicalWindowAggregate.create( + agg.getWindow, + Seq( + NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias.get)), + NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias.get)) + ), agg) + ) + val aggSize = transformed.fields().size() + + transformed + .project(innerProject.getProjects ++ Seq( + rexBuilder.makeInputRef(transformed.peek(), aggSize - 2), + rexBuilder.makeInputRef(transformed.peek(), aggSize - 1) + )) + + val n = transformed.fields().size() + val (windowStartExprIdx, windowEndExprIdx) = (n - 2, n - 1) + + val windowStart = rexBuilder.makeInputRef(transformed.peek(), windowStartExprIdx) — End diff – can be `val windowStart = transformed.field("w$start")`
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3693#discussion_r110360562

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowExpressionRule.scala —
          @@ -0,0 +1,117 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.plan.rules.common
          +
          +import org.apache.calcite.plan.hep.HepRelVertex
          +import org.apache.calcite.plan.

          {RelOptRule, RelOptRuleCall}

          +import org.apache.calcite.rel.RelNode
          +import org.apache.calcite.rel.logical.LogicalProject
          +import org.apache.calcite.rex.

          {RexCall, RexNode}

          +import org.apache.calcite.sql.fun.SqlStdOperatorTable
          +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
          +import org.apache.flink.table.expressions.

          {WindowEnd, WindowStart}

          +import org.apache.flink.table.plan.logical._
          +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
          +
          +import scala.collection.JavaConversions._
          +
          +class WindowExpressionRule
          + extends RelOptRule(
          + WindowExpressionRule.WINDOW_EXPRESSION_RULE_PREDICATE,
          + "WindowExpressionRule") {
          + override def matches(call: RelOptRuleCall): Boolean = {
          + val project = call.rel(0).asInstanceOf[LogicalProject]
          + val found = project.getProjects.find {
          — End diff –

          use `exists()` instead of `find()`

          Show
          githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3693#discussion_r110360562 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowExpressionRule.scala — @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.hep.HepRelVertex +import org.apache.calcite.plan. {RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.rex. {RexCall, RexNode} +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.expressions. {WindowEnd, WindowStart} +import org.apache.flink.table.plan.logical._ +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate + +import scala.collection.JavaConversions._ + +class WindowExpressionRule + extends RelOptRule( + WindowExpressionRule.WINDOW_EXPRESSION_RULE_PREDICATE, + "WindowExpressionRule") { + override def matches(call: RelOptRuleCall): Boolean = { + val project = call.rel(0).asInstanceOf [LogicalProject] + val found = project.getProjects.find { — End diff – use `exists()` instead of `find()`
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3693

          Show
          githubbot ASF GitHub Bot added a comment - Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3693
          Hide
          fhueske Fabian Hueske added a comment -

          Implemented with fa7907ab0e90d182b6386802c97f9b4e001dc440

          Show
          fhueske Fabian Hueske added a comment - Implemented with fa7907ab0e90d182b6386802c97f9b4e001dc440

            People

            • Assignee:
              wheat9 Haohui Mai
              Reporter:
              wheat9 Haohui Mai
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development