Flink / FLINK-5624

Support tumbling window on streaming tables in the SQL API

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels: None

      Description

      This is a follow up of FLINK-4691.

      FLINK-4691 adds support for group windows on streaming tables. This JIRA proposes exposing that functionality in the SQL layer via GROUP BY clauses, as described in http://calcite.apache.org/docs/stream.html#tumbling-windows.


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user haohui opened a pull request:

          https://github.com/apache/flink/pull/3252

          FLINK-5624 Support tumbling window on streaming tables in the SQL API.

          This is a POC to add tumbling window support for streaming tables in SQL.

          Essentially, it recognizes the `LogicalWindow` construct in Calcite and transforms it into the `LogicalWindowAggregate` in Flink.

          Feedback is highly appreciated.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/haohui/flink FLINK-5624

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3252.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3252


          commit a8d4b5042e8bcd1b149f8915118c116e419690e0
          Author: Haohui Mai <wheat9@apache.org>
          Date: 2017-02-01T22:03:44Z

          FLINK-5624 Support tumbling window on streaming tables in the SQL API.


          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3252

          Hi @haohui, thanks for your contribution!

          The referenced JIRA is about adding support for group windows to SQL, not OVER (or row) windows. It should enable queries such as:

          ```
          SELECT a, sum(b) as sumB, TUMBLE_END(rowtime(), INTERVAL '1' HOUR) AS t
          FROM myT
          GROUP BY TUMBLE(rowtime(), INTERVAL '1' HOUR), a;
          ```
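To make the intended semantics of this query concrete, here is a minimal Scala sketch that computes the same result on plain collections. It is a toy model, not Flink's runtime: it assumes event timestamps in epoch milliseconds and half-open windows `[start, start + 1h)`, and the names `Rec`, `Out`, and `query` are hypothetical.

```scala
// Toy model of a tumbling-window GROUP BY on plain collections; not Flink's runtime.
object TumblingGroupBy {
  // Input record: event timestamp in epoch millis, key `a`, value `b`.
  final case class Rec(rowtime: Long, a: String, b: Long)
  // Output row: (a, sumB, t) where t is the window end, like TUMBLE_END.
  final case class Out(a: String, sumB: Long, t: Long)

  val HourMs: Long = 60L * 60L * 1000L

  // Start of the half-open window [start, start + size) that contains ts.
  def windowStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  // SELECT a, SUM(b) AS sumB, TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS t
  // FROM myT GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), a
  def query(input: Seq[Rec], size: Long = HourMs): Seq[Out] =
    input
      .groupBy(r => (windowStart(r.rowtime, size), r.a)) // one group per (window, key)
      .toSeq
      .map { case ((start, a), rows) => Out(a, rows.map(_.b).sum, start + size) }
      .sortBy(o => (o.t, o.a))
}
```

A record stamped exactly on the hour opens the next window rather than closing the previous one.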

          I saw that you only recently contributed `TUMBLE` to Calcite, so this feature is not yet available in a Calcite release that we could link against. Until then, we could add support for the more manual version of SQL tumbling windows:

          ```
          SELECT a, SUM(b) AS sumB, CEIL(rowtime() TO HOUR) AS t
          FROM myT
          GROUP BY CEIL(rowtime() TO HOUR), a
          ```
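The manual version agrees with `TUMBLE` for most timestamps but not at window boundaries: a timestamp that falls exactly on the hour is its own ceiling, so grouping by `CEIL(... TO HOUR)` effectively forms windows `(end - 1h, end]`, whereas tumbling windows are usually half-open `[start, start + 1h)` (this is how Flink's `TimeWindow` defines them). The toy sketch below assumes UTC epoch-millisecond timestamps; the object and method names are hypothetical.

```scala
// Toy comparison of FLOOR/CEIL(... TO HOUR) with 1-hour tumbling windows, on UTC epoch millis.
object CeilVsTumble {
  val HourMs: Long = 3600000L

  // FLOOR(ts TO HOUR): start of the hour containing ts.
  def floorHour(ts: Long): Long = ts - Math.floorMod(ts, HourMs)

  // CEIL(ts TO HOUR): smallest whole hour >= ts, so an exact hour maps to itself.
  def ceilHour(ts: Long): Long = {
    val f = floorHour(ts)
    if (f == ts) ts else f + HourMs
  }

  // TUMBLE_END for half-open 1-hour windows [start, start + 1h).
  def tumbleEnd(ts: Long): Long = floorHour(ts) + HourMs
}
```

For a timestamp of exactly one hour, `ceilHour` returns that same hour while `tumbleEnd` returns the following one, so `FLOOR(... TO HOUR)` (which matches the window start) is the closer match for tumbling windows.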

          We would also need to find a way to reference the `rowtime`. We do not want to expose this as an actual attribute in Flink's SQL (internally, Flink treats record timestamps as metadata which may not be modified by a query). The current approach would be to implement a built-in function which serves as a marker and is replaced during the translation.

          Best, Fabian

          wheat9 Haohui Mai added a comment -

          Supporting GROUP BY via rowtime makes sense. From a semantic point of view, do you see any differences between specifying the window with the OVER clause versus through GROUP BY? Do you plan to support both?

          We would also need to find a way to reference the `rowtime`.

          +1. I think implementing a built-in function will make a lot of sense.

          fhueske Fabian Hueske added a comment -

          Hi Haohui Mai, windowing is a way to define groups of records. It is used to define both GROUP BY and OVER aggregates, and the two types of aggregates are semantically different:

          • GROUP BY aggregation: Groups multiple records and emits one row for the whole group. The emitted row may only have fields which are identical in all records of the group (grouping columns) or aggregated over all records of the group.
          • OVER aggregation: Emits one row for each input row, but the aggregate is computed over multiple records which are defined by the OVER clause.

          So, these are actually two different operators.
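A small Scala sketch of the distinction, using plain collections. The record type and function names are hypothetical, and the OVER variant assumes a running sum per partition ordered by timestamp.

```scala
// Toy comparison of GROUP BY vs OVER aggregation on plain collections.
object GroupByVsOver {
  final case class Rec(ts: Long, a: String, b: Long)

  // GROUP BY a: one row per group; only the grouping key and the aggregate survive.
  def groupBySum(in: Seq[Rec]): Map[String, Long] =
    in.groupBy(_.a).map { case (a, rows) => a -> rows.map(_.b).sum }

  // SUM(b) OVER (PARTITION BY a ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):
  // one row per input row, each paired with the running sum of its partition.
  def overSum(in: Seq[Rec]): Seq[(Rec, Long)] = {
    val running = scala.collection.mutable.Map.empty[String, Long]
    in.sortBy(_.ts).map { r =>
      val s = running.getOrElse(r.a, 0L) + r.b
      running(r.a) = s
      (r, s)
    }
  }
}
```

Three input rows produce two GROUP BY rows but three OVER rows, each OVER row keeping all of its input fields.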

          Regarding the built-in function: FLINK-5710 defines a procTime() function that serves a similar purpose.
          For now, adding a rowTime() function is fine, IMO. However, we will need to redesign this to support time-based joins, which require associating the function with a table.

          wheat9 Haohui Mai added a comment -

          Thanks for the explanation.

          I took a closer look at the built-in function route. Recognizing the function is relatively straightforward. The problem is that it is difficult for the rowtime() function to refer to the corresponding DataStream during code generation. It might be easier to add system/virtual columns instead.

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on the issue:

          https://github.com/apache/flink/pull/3252

          Updated the PR to recognize the `GROUP BY` clause instead of the `OVER` clause.

          fhueske Fabian Hueske added a comment -

          Copying a comment from FLINK-5710.

          The idea is to apply pattern matching to certain expressions such as GROUP BY FLOOR(procTime() TO HOUR).
          The challenge is that this expression is spread across several LogicalRel nodes.
          The expression FLOOR(procTime() TO HOUR) will be moved into a LogicalProject (or LogicalCalc) to create a new attribute. A following LogicalAggregate will then use that attribute as a grouping column.

          Once we detect such a pattern, we have to rewrite the plan and replace the LogicalAggregate and parts of the LogicalProject with a LogicalWindowAggregate. The LogicalWindowAggregate includes a window definition. Depending on which marker function is used (rowtime or proctime), the window definition is for either an event-time or a processing-time window. After the translation, the function is no longer available.

          With this approach, we can only translate very specific queries. However, I don't think we can easily provide a generic translation for SQL window queries.
          With Calcite's special window functions TUMBLE, HOP, and SESSION (see CALCITE-1345 and http://calcite.apache.org/docs/stream.html), detecting windowing patterns will be much easier.
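The pattern matching described above can be sketched with a toy plan model. This is not Calcite's API: `Project`, `Aggregate`, `WindowAggregate`, `RowTimeMarker`, and `ProcTimeMarker` are hypothetical stand-ins for `LogicalProject`, `LogicalAggregate`, `LogicalWindowAggregate`, `rowtime()`, and `procTime()`. The rule looks for a grouping column whose projected expression is `FLOOR(marker TO unit)` and turns it into a tumbling-window definition whose time mode depends on the marker. For simplicity it leaves the `Project` untouched, so the marker column is still computed.

```scala
// Toy plan model; NOT Calcite's classes. All names are hypothetical stand-ins.
object WindowRewrite {
  sealed trait Expr
  case object RowTimeMarker extends Expr                        // stands in for rowtime()
  case object ProcTimeMarker extends Expr                       // stands in for procTime()
  final case class Field(name: String) extends Expr
  final case class Floor(arg: Expr, unitMs: Long) extends Expr  // FLOOR(arg TO unit), unit in millis

  sealed trait TimeMode
  case object EventTime extends TimeMode
  case object ProcessingTime extends TimeMode
  final case class TumbleWindow(mode: TimeMode, sizeMs: Long)

  sealed trait Rel
  final case class Scan(table: String) extends Rel
  final case class Project(exprs: Seq[Expr], input: Rel) extends Rel
  // groupIdx refers to columns of the input (here: the Project's output).
  final case class Aggregate(groupIdx: Seq[Int], aggs: Seq[String], input: Rel) extends Rel
  final case class WindowAggregate(window: TumbleWindow, groupIdx: Seq[Int],
                                   aggs: Seq[String], input: Rel) extends Rel

  // Recognize FLOOR(marker TO unit); the marker decides the time mode.
  private def windowOf(e: Expr): Option[TumbleWindow] = e match {
    case Floor(RowTimeMarker, unit)  => Some(TumbleWindow(EventTime, unit))
    case Floor(ProcTimeMarker, unit) => Some(TumbleWindow(ProcessingTime, unit))
    case _                           => None
  }

  // Aggregate(Project(...)) with a windowing grouping key becomes a WindowAggregate.
  def rewrite(rel: Rel): Rel = rel match {
    case Aggregate(groups, aggs, p @ Project(exprs, _)) =>
      groups.flatMap(i => windowOf(exprs(i)).map(w => (i, w))).headOption match {
        case Some((i, w)) => WindowAggregate(w, groups.filterNot(_ == i), aggs, p)
        case None         => rel
      }
    case other => other
  }
}
```

Plans that do not match the pattern are returned unchanged, which reflects the limitation noted above: only very specific query shapes are translated.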

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on the issue:

          https://github.com/apache/flink/pull/3252

          Added `ROWTIME()` as an expression so that users can specify event-time windows.

          After trying multiple approaches, I settled on translating `LogicalAggregate` directly to `DataStreamAggregate`. The translation removes the group-by expression from the aggregate and adds the same expression as a window.

          Note that `ROWTIME()` is actually translated to a call that returns the local timestamp. The expression has to be executable because Calcite creates a new project operator to compute the group-by expression, and that operator actually evaluates it. For example, the following query

          ```
          SELECT COUNT(*) FROM table GROUP BY FLOOR(ROWTIME() TO HOUR)
          ```

          will be translated to:

          ```
          LogicalAggregate(group={$0}, agg={COUNT(*)})
            LogicalProject($0=FLOOR(ROWTIME() TO HOUR))
              ...
          ```

          It's tempting to remove the group-by expression from the logical plan. However, this cannot be done with Calcite's optimization frameworks, which expect the output types of the operators to stay the same before and after a transformation. Removing the field changes the types, so Calcite will complain.
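The constraint can be illustrated with a toy check. The types below are hypothetical and this is not Calcite code: a rewrite is accepted only if the replacement produces exactly the same row type as the node it replaces, so dropping the computed group-by column is refused.

```scala
// Toy model of the "same row type" constraint; not Calcite code.
object RowTypeCheck {
  final case class Column(name: String, sqlType: String)
  final case class Node(op: String, rowType: Seq[Column])

  // Accept a rewritten node only if it is type-equivalent to the node it replaces.
  def register(original: Node, rewritten: Node): Either[String, Node] =
    if (original.rowType == rewritten.rowType) Right(rewritten)
    else Left(s"row type mismatch: ${original.rowType} vs ${rewritten.rowType}")
}
```

Replacing a project that outputs `($0: TIMESTAMP, b: BIGINT)` with one that outputs only `b` is rejected, while swapping the operator for an equivalent one with the same output columns is accepted.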

          The downside of this approach is that it might be difficult for Flink to catch malformed queries such as `SELECT COUNT(*) FROM table GROUP BY FLOOR(ROWTIME() / 2 TO HOUR)` at compile time. Any ideas to improve the situation?
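One possible direction, sketched below under the assumption that the group-by expression is available as a small expression tree (all names are hypothetical), is to accept `ROWTIME()` in a grouping key only in the exact shape `FLOOR(ROWTIME() TO unit)` and to reject every other expression that references it during validation.

```scala
// Hypothetical validator over a toy expression tree; not Flink or Calcite code.
object WindowValidator {
  sealed trait Expr
  case object RowTime extends Expr
  final case class Lit(v: Long) extends Expr
  final case class Div(l: Expr, r: Expr) extends Expr
  final case class Floor(arg: Expr, unit: String) extends Expr

  private val units = Set("SECOND", "MINUTE", "HOUR", "DAY")

  // True if ROWTIME() occurs anywhere inside the expression.
  def usesRowTime(e: Expr): Boolean = e match {
    case RowTime     => true
    case Lit(_)      => false
    case Div(l, r)   => usesRowTime(l) || usesRowTime(r)
    case Floor(a, _) => usesRowTime(a)
  }

  // Only FLOOR(ROWTIME() TO unit) may use ROWTIME(); anything else that does is an error.
  def validateGroupKey(e: Expr): Either[String, String] = e match {
    case Floor(RowTime, u) if units(u) => Right(s"tumbling window of 1 $u")
    case other if usesRowTime(other)   => Left(s"unsupported use of ROWTIME() in GROUP BY: $other")
    case _                             => Right("regular grouping key")
  }
}
```

With this check, `FLOOR(ROWTIME() / 2 TO HOUR)` is rejected before any code is generated instead of silently producing a meaningless window.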

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3252#discussion_r100844487

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/streaming.scala —
          @@ -0,0 +1,42 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.expressions
          +
          +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeName}

          +import org.apache.calcite.sql.validate.SqlMonotonicity
          +import org.apache.calcite.sql._
          +import org.apache.calcite.tools.RelBuilder
          +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}

          +import org.apache.flink.table.api.TableException
          +
          +object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
          — End diff –

          Can you move this to a `TimeModeIndicatorFunctions` class, as suggested in `FlinkStreamingFunctionCatalog` of PR #3271?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3252#discussion_r100845516

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/streaming.scala —
          @@ -0,0 +1,42 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.expressions
          +
          +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeName}

          +import org.apache.calcite.sql.validate.SqlMonotonicity
          +import org.apache.calcite.sql._
          +import org.apache.calcite.tools.RelBuilder
          +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}

          +import org.apache.flink.table.api.TableException
          +
          +object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
          + ReturnTypes.explicit(SqlTypeName.TIMESTAMP), null, OperandTypes.NILADIC,
          + SqlFunctionCategory.SYSTEM) {
          + override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
          +
          + override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity =
          + SqlMonotonicity.INCREASING
          +}
          +
          +case class RowTime() extends LeafExpression {
          — End diff –

          Use `CurrentTimestamp`, as PR #3271 does.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3252#discussion_r100837291

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala —
          @@ -290,6 +291,15 @@ object FunctionGenerator {
          Seq(),
          new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))

          + // Make ROWTIME() return the local timestamp
          + // The function has to be executable as in windowed queries it is used
          + // in the GroupBy expression. The results of the function, however, does
          + // not matter.
          + addSqlFunction(
          + EventTimeExtractor,
          + Seq(),
          + new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
          — End diff –

          This function should not be called. So I would suggest creating a `CallGenerator` that throws an exception, ideally when the code is generated, or otherwise from within the generated code. PR #3271 will need the same call generator.
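The two options can be sketched as follows. This is a hypothetical toy, not Flink's `CallGenerator` interface: `generateEager` fails while the code is being generated, and `generateLazy` returns a snippet of generated Java code that fails only if it is actually executed.

```scala
// Hypothetical toy; not Flink's CallGenerator interface.
object MarkerCallGen {
  final class MarkerFunctionCalledException(msg: String) extends RuntimeException(msg)

  // Option 1: reject at code-generation time, before the job runs.
  def generateEager(function: String): String =
    throw new MarkerFunctionCalledException(s"$function() is a marker function and cannot be evaluated")

  // Option 2: emit generated Java code that throws only if this expression is executed.
  def generateLazy(function: String): String =
    s"""throw new RuntimeException("$function() is a marker function and cannot be evaluated");"""
}
```

The eager variant gives earlier feedback, but it also rejects plans that still contain the call without ever evaluating its result, which is the situation discussed in the following comments.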

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3252#discussion_r100917108

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala —
          @@ -290,6 +291,15 @@ object FunctionGenerator {
          Seq(),
          new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))

          + // Make ROWTIME() return the local timestamp
          + // The function has to be executable as in windowed queries it is used
          + // in the GroupBy expression. The results of the function, however, does
          + // not matter.
          + addSqlFunction(
          + EventTimeExtractor,
          + Seq(),
          + new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
          — End diff –

          It turns out that the function is evaluated all the way into runtime; the translated plan looks like the following:

          ```
          LogicalAggregate(group={0}, ...)
            LogicalProject($0=FLOOR(ROWTIME() TO HOUR))
          ```

          The expression is used in the projection. Unfortunately, there is no trivial way to exclude it in Calcite, as mentioned in my previous comment.

          The result of the expression is not used in the query, though.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3252#discussion_r100918614

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala —
          @@ -290,6 +291,15 @@ object FunctionGenerator {
          Seq(),
          new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))

          + // Make ROWTIME() return the local timestamp
          + // The function has to be executable as in windowed queries it is used
          + // in the GroupBy expression. The results of the function, however, does
          + // not matter.
          + addSqlFunction(
          + EventTimeExtractor,
          + Seq(),
          + new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
          — End diff –

          Ah, yes. You are right. It is still called in the DataStreamCalc and cannot be easily removed as you noted.

          Alright, then I'd suggest just emitting a cast `null`. This is not very nice, as the function might also be called elsewhere, but since we will remove the marker function soon, it should not be a big issue.
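A minimal sketch of the cast-`null` idea, assuming the generated code is Java and that a SQL `TIMESTAMP` maps to `java.sql.Timestamp`. The mapping table and object name are hypothetical. The cast keeps the generated expression well-typed for the Java compiler even though its value is never used.

```scala
// Hypothetical sketch; the SQL-to-Java type mapping below is an assumption.
object NullMarkerCallGen {
  private val javaClassOf = Map(
    "TIMESTAMP" -> "java.sql.Timestamp",
    "BIGINT"    -> "java.lang.Long")

  // Returns a generated Java expression of the right static type whose value is null.
  def generate(sqlType: String): String =
    s"((${javaClassOf(sqlType)}) null)"
}
```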

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on the issue:

          https://github.com/apache/flink/pull/3252

          The v3 PR gets the best of both worlds: the code generator throws an exception if a query actually executes `rowtime()`.

          Essentially, it rewrites the project and aggregate operators before passing them to the Volcano planner. @fhueske please take another look.
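Conceptually, such a pre-planning phase applies rewrite rules repeatedly until none of them matches any more (roughly what Calcite's `HepPlanner` does) and only then hands the plan to the cost-based Volcano planner. Below is a generic toy version of that fixpoint loop; the `Rule` type is hypothetical and the iteration cap is a safety assumption.

```scala
// Generic toy of a heuristic rewrite phase; not Calcite's HepPlanner API.
object HepLikePhase {
  // A rule either rewrites the plan or reports that it does not match.
  type Rule[A] = A => Option[A]

  // Apply the first matching rule; repeat until no rule matches or the cap is reached.
  def applyToFixpoint[A](plan: A, rules: Seq[Rule[A]], maxIterations: Int = 100): A = {
    @annotation.tailrec
    def loop(p: A, n: Int): A =
      if (n >= maxIterations) p
      else rules.iterator.map(rule => rule(p)).collectFirst { case Some(q) => q } match {
        case Some(q) => loop(q, n + 1)
        case None    => p
      }
    loop(plan, 0)
  }
}
```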

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3252#discussion_r100991050

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala —
          @@ -225,12 +231,29 @@ abstract class StreamTableEnvironment(
          // decorrelate
          val decorPlan = RelDecorrelator.decorrelateQuery(relNode)

          + val prePlanner = createHepPlanner
          — End diff –

          I'll merge PR #3101 later today, which adds a normalization phase before optimization by adding a HepPlanner.
          Could you integrate your changes with #3101?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3252#discussion_r100990608

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala —
          @@ -33,10 +34,18 @@ object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
          SqlMonotonicity.INCREASING
          }

          -case class RowTime() extends LeafExpression {
          +case class TimeIndicator() extends LeafExpression {
          — End diff –

          Should be `RowTime` (`ProcTime` will be added later).

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3252#discussion_r100995825

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala —
          @@ -159,9 +152,25 @@ class LogicalWindowAggregateRule
          }
          false
          }
          +
          + private def rewriteTimeIndicatorOperators(agg: LogicalAggregate, groupExprIdx: Int) = {
          + val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
          + val newProjectExpr = mutable.ArrayBuffer[RexNode]()
          + newProjectExpr.appendAll(project.getChildExps)
          + val rexBuilder = agg.getCluster.getRexBuilder
          + newProjectExpr(groupExprIdx) = rexBuilder.makeTimestampLiteral(
          + DataStreamAggregateRule.TIMESTAMP_ZERO, 3)
          + val newProject = project.copy(project.getTraitSet, project.getInput,
          + newProjectExpr, project.getRowType)
          +
          + agg.copy(agg.getTraitSet, List(newProject)).asInstanceOf[LogicalAggregate]
          — End diff –

          Can we create a `LogicalAggregate` with an adapted `groupSet` here?
          I think adding `windowingGroupSet` to `LogicalWindowAggregate` is not a very nice solution. It would be better to keep the existing code as it is, without introducing workarounds for the SQL case.

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on the issue:

          https://github.com/apache/flink/pull/3252

          Discussed with @fhueske offline. Thanks a lot for the comments. The V4 PR implements the following:

          • Rebased on top of #3101
          • `LogicalWindowAggregateRule` extends `RelOptRule` instead of `ConverterRule`.
          • Instead of adding a new field to `LogicalWindowAggregate`, the implementation now transforms the original `LogicalAggregate(LogicalProject(...))` expression into `LogicalProject(LogicalWindowAggregate(LogicalProject(...)))`. The outermost projection ensures that the row type is the same before and after the transformation.

          Please take another look.
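The V4 plan shape can be sketched with a hypothetical miniature plan algebra (plain Scala case classes, not Calcite's `RelNode` API). It assumes the window expression is a projected field referenced by the grouping set, and that an aggregate outputs its remaining grouping keys in ascending order followed by its aggregate calls; the string expressions and `$i` field references are illustrative only.

```scala
// Hypothetical miniature logical plan; not Calcite's RelNode classes.
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Project(exprs: Seq[String], input: Plan) extends Plan
final case class Aggregate(groupSet: Seq[Int], aggCalls: Seq[String], input: Plan) extends Plan
final case class WindowAggregate(
    window: String, groupSet: Seq[Int], aggCalls: Seq[String], input: Plan) extends Plan

object WindowRewriteSketch {
  val Zero = "TIMESTAMP '1970-01-01 00:00:00'"

  /**
   * Rewrites Aggregate(Project(...)) into Project(WindowAggregate(Project(...)))
   * when the grouping key at windowIdx is the window expression. The outer
   * Project re-creates the dropped window key as a constant so that the
   * row layout matches the original aggregate.
   */
  def rewrite(plan: Plan, windowIdx: Int, window: String): Plan = plan match {
    case Aggregate(groupSet, aggCalls, Project(exprs, input)) if groupSet.contains(windowIdx) =>
      val keys = groupSet.sorted
      val newGroupSet = keys.filterNot(_ == windowIdx)
      // Inner project: the window expression is replaced by a constant.
      val inner = Project(exprs.updated(windowIdx, Zero), input)
      val windowAgg = WindowAggregate(window, newGroupSet, aggCalls, inner)
      // Window aggregate output: remaining keys, then aggregate calls.
      val refs = (0 until newGroupSet.size + aggCalls.size).map(i => s"$$$i")
      // Put the constant back where the window key used to be.
      val (before, after) = refs.splitAt(keys.indexOf(windowIdx))
      Project((before :+ Zero) ++ after, windowAgg)
    case other => other
  }
}
```

For example, `GROUP BY id, FLOOR(rowtime() TO HOUR)` with a `COUNT(*)` becomes an outer projection `($0, <zero>, $1)` over a window aggregate grouped only by `id`.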

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3252#discussion_r101248097

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala —
          @@ -0,0 +1,139 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.plan.rules.datastream
          +
          +import java.util.Calendar
          +
          +import com.google.common.collect.ImmutableList
          +import org.apache.calcite.avatica.util.TimeUnitRange
          +import org.apache.calcite.plan._
          +import org.apache.calcite.plan.hep.HepRelVertex
          +import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
          +import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
          +import org.apache.calcite.sql.fun.SqlFloorFunction
          +import org.apache.calcite.util.ImmutableBitSet
          +import org.apache.flink.table.api.scala.Tumble
          +import org.apache.flink.table.api.{TableException, TumblingWindow, Window}
          +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
          +import org.apache.flink.table.expressions._
          +import org.apache.flink.table.functions.EventTimeExtractor
          +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
          +
          +import scala.collection.JavaConversions._
          +
          +class LogicalWindowAggregateRule
          + extends RelOptRule(
          + LogicalWindowAggregateRule.LOGICAL_WINDOW_PREDICATE,
          + "LogicalWindowAggregateRule") {
          +
          + override def matches(call: RelOptRuleCall): Boolean = {
          + val agg = call.rel(0).asInstanceOf[LogicalAggregate]
          +
          + val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
          + val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
          +
          + val windowClause = recognizeWindow(agg)
          + !distinctAggs && !groupSets && !agg.indicator && windowClause.isDefined
          + }
          +
          + /**
          + * Transform LogicalAggregate with windowing expression to LogicalProject
          + * + LogicalWindowAggregate + LogicalProject.
          + *
          + * The transformation adds an additional LogicalProject at the top to ensure
          + * that the types are equivalent.
          + */
          + override def onMatch(call: RelOptRuleCall): Unit = {
          + val agg = call.rel[LogicalAggregate](0)
          + val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
          + val (windowExprIdx, window) = recognizeWindow(agg).get
          + val newGroupSet = agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx))
          +
          + val builder = call.builder()
          + val rexBuilder = builder.getRexBuilder
          + val zero = rexBuilder.makeTimestampLiteral(LogicalWindowAggregateRule.TIMESTAMP_ZERO, 3)
          +
          + val newAgg = builder
          + .push(project.getInput)
          + .project(project.getChildExps.updated(windowExprIdx, zero))
          + .aggregate(builder.groupKey(
          + newGroupSet,
          + agg.indicator, ImmutableList.of(newGroupSet)), agg.getAggCallList)
          + .build().asInstanceOf[LogicalAggregate]
          +
          + // Create an additional project to conform with types
          + val transformed = call.builder()
          + transformed.push(LogicalWindowAggregate.create(
          + window.toLogicalWindow,
          + Seq[NamedWindowProperty](),
          + newAgg))
          + .project(List(zero) ++ transformed.fields())
          — End diff –

          The `zero` element must be injected at the position of the window attribute in the grouping set.
          If you change the order of the grouping attributes in the SQL query in `testMultiGroup()` to `GROUP BY id, FLOOR(rowtime() TO HOUR)`, planning fails.
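To make the ordering problem concrete, here is a Calcite-free Scala sketch that tracks only column types. It assumes (1) the projection below the aggregate lists the grouping expressions in `GROUP BY` order and (2) an aggregate emits its grouping keys in ascending field order followed by the aggregate results, which matches the layout of Calcite's `Aggregate` row type. All object and method names are hypothetical.

```scala
object ZeroPlacementSketch {
  /** Row type of an aggregate: grouping keys in ascending field order, then aggregate results. */
  def aggRowType(projTypes: Seq[String], groupSet: Seq[Int], aggTypes: Seq[String]): Seq[String] =
    groupSet.sorted.map(projTypes) ++ aggTypes

  /** Outer-projection types when the constant is always put first (as in the reviewed code). */
  def prepended(windowAggType: Seq[String]): Seq[String] =
    "TIMESTAMP" +: windowAggType

  /** Outer-projection types when the constant goes where the window key used to be. */
  def injected(windowAggType: Seq[String], groupSet: Seq[Int], windowIdx: Int): Seq[String] = {
    val (before, after) = windowAggType.splitAt(groupSet.sorted.indexOf(windowIdx))
    (before :+ "TIMESTAMP") ++ after
  }
}
```

With `GROUP BY FLOOR(rowtime() TO HOUR), id` the window key is the first grouping field, so prepending happens to produce the right row type; with `GROUP BY id, FLOOR(rowtime() TO HOUR)` prepending yields `(TIMESTAMP, INT, BIGINT)` instead of `(INT, TIMESTAMP, BIGINT)`, which is the kind of row-type mismatch that would make planning fail.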

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3252#discussion_r101239149

          — Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/WindowAggregateITCase.java —
          @@ -0,0 +1,135 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.api.java.stream.sql;
          +
          +import org.apache.flink.api.java.tuple.Tuple4;
          +import com.google.common.collect.ImmutableList;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          +import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
          +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
          +import org.apache.flink.table.api.Table;
          +import org.apache.flink.table.api.TableEnvironment;
          +import org.apache.flink.table.api.TableException;
          +import org.apache.flink.table.api.java.StreamTableEnvironment;
          +import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
          +import org.apache.flink.types.Row;
          +import org.junit.Before;
          +import org.junit.Test;
          +
          +import java.sql.Timestamp;
          +import java.util.ArrayList;
          +import java.util.List;
          +
          +public class WindowAggregateITCase extends StreamingMultipleProgramsTestBase {
          — End diff –

          I don't think we need integration tests for this feature. We try to keep those to a minimum to avoid blowing up the build time. Since window aggregates are already tested for the Table API, it should be sufficient to test the resulting execution plan.

          We have the `TableTestBase`, which can be extended to validate the result of the optimization. This should be enough to test this feature. Since these tests are cheap, we can also test more queries, including ones with differently ordered grouping expressions.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3252

          Thanks for the update @haohui.
          PR is good to merge.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3252

          Merging

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3252

          fhueske Fabian Hueske added a comment -

          Implemented for 1.3.0 with 8304f3e159851d29691e66cacfcb4278d73a8b97

          githubbot ASF GitHub Bot added a comment -

          GitHub user tony810430 opened a pull request:

          https://github.com/apache/flink/pull/3650

          FLINK-5624 [kinesis] Let Date format for timestamp-based start position in Kinesis consumer be configurable

          The patch lets users define their own pattern for parsing date strings.
          If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is specified, `FlinkKinesisConsumer` uses only this format to parse the date string.
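A minimal sketch of the "use only the configured format" rule, written with `java.text.SimpleDateFormat`. The object and method names are hypothetical, and both the default pattern and the epoch-seconds fallback used when no format is configured are assumptions for illustration, not the consumer's documented behavior.

```scala
import java.text.{ParseException, SimpleDateFormat}
import java.util.Date

object InitialTimestampSketch {
  // Assumed default pattern for illustration; not taken from the patch.
  val AssumedDefaultFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"

  /**
   * If the user configured a date format, only that format is tried and a
   * mismatch fails. Otherwise the assumed default pattern is tried first,
   * then (also an assumption) a value in seconds since the epoch.
   */
  def parse(value: String, userFormat: Option[String]): Date = userFormat match {
    case Some(pattern) =>
      new SimpleDateFormat(pattern).parse(value)
    case None =>
      try new SimpleDateFormat(AssumedDefaultFormat).parse(value)
      catch {
        case _: ParseException => new Date((value.toDouble * 1000).toLong)
      }
  }
}
```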

          • [v] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [v] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [v] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tony810430/flink FLINK-5625

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3650.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3650


          commit 88c874e966d55092649c47ed54d4c52387eb1db9
          Author: Tony Wei <tony19920430@gmail.com>
          Date: 2017-03-30T01:48:43Z

          FLINK-5624 Let Date format for timestamp-based start position in Kinesis consumer be configurable


          tonywei Wei-Che Wei added a comment -

          Sorry for using the wrong JIRA task ID. > <


            People

            • Assignee:
              wheat9 Haohui Mai
              Reporter:
              wheat9 Haohui Mai
            • Votes:
              0
              Watchers:
              4
