Flink / FLINK-4557 Table API Stream Aggregations / FLINK-6200

Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

    Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

The goal of this issue is to add support for OVER RANGE aggregations on event time streams to the SQL interface.
      Queries similar to the following should be supported:

      SELECT
        a,
        SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sumB,
        MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS minB
      FROM myStream
The following restrictions should initially apply:
      • All OVER clauses in the same SELECT clause must be exactly the same.
      • The PARTITION BY clause is optional (no partitioning results in single-threaded execution).
      • The ORDER BY clause may only have rowTime() as parameter. rowTime() is a parameterless scalar function that just indicates event time mode.
      • Bounded PRECEDING is not supported (see FLINK-5655).
      • FOLLOWING is not supported.
      These restrictions will be resolved in follow-up issues. If we find that some of them are trivial to address, we can add the functionality in this issue as well.
      This issue includes:
      • Design of the DataStream operator to compute OVER RANGE aggregates.
      • Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).
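A minimal, self-contained sketch of the intended frame semantics (in Java rather than the Flink Scala runtime; class and method names here are illustrative, not from the codebase): with RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rows that share the same event timestamp are peers in the same frame, so they all receive the same aggregate value, unlike a ROWS frame, which grows by exactly one row per result.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeUnboundedPreceding {
    // A (timestamp, value) pair standing in for a row of the stream.
    record Row(long ts, long value) {}

    // For input rows sorted by event timestamp, emit SUM(value) over the frame
    // RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: all rows whose timestamp
    // is <= the current row's timestamp. Peer rows share one result.
    static List<Long> rangeUnboundedSum(List<Row> rows) {
        List<Long> out = new ArrayList<>();
        long runningSum = 0;
        int i = 0;
        while (i < rows.size()) {
            long ts = rows.get(i).ts();
            int j = i;
            // First fold every peer row (same timestamp) into the accumulator ...
            while (j < rows.size() && rows.get(j).ts() == ts) {
                runningSum += rows.get(j).value();
                j++;
            }
            // ... then emit the identical aggregate for each peer.
            for (int k = i; k < j; k++) {
                out.add(runningSum);
            }
            i = j;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row(1, 10), new Row(2, 5), new Row(2, 7), new Row(3, 1));
        System.out.println(rangeUnboundedSum(rows)); // [10, 22, 22, 23]: both ts=2 peers see 22
    }
}
```

A ROWS variant would instead emit 10, 15, 22, 23 for the same input, which is exactly why the operator has to treat same-timestamp batches specially.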

        Issue Links

          Activity

sunjincheng121 sunjincheng added a comment -

          Hi hongyuhong, do you want to take this JIRA? If you have no time for it, I'll solve this issue tomorrow. I hope for your feedback.
          Best,
          SunJincheng
Yuhong_kyo hongyuhong added a comment -

          Hi sunjincheng, I have implemented it based on FLINK-5658.
          githubbot ASF GitHub Bot added a comment -

          GitHub user hongyuhong opened a pull request:

          https://github.com/apache/flink/pull/3649

          FLINK-6200[SQL] support unbounded event time range window

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [ ] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [ ] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/hongyuhong/flink flink-6200

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3649.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3649


          commit 7815fd7f4cb41b3ce64e0e679579302073307d69
          Author: hongyuhong 00223286 <hongyuhong@huawei.com>
          Date: 2017-03-29T02:29:17Z

          FLINK-6200[SQL] support unbounded event time range window


sunjincheng121 sunjincheng added a comment -

          hongyuhong cool, I have assigned this JIRA to you.
Yuhong_kyo hongyuhong added a comment -

          Hi sunjincheng, thanks very much. I'd appreciate it if you and Fabian Hueske could have a look.
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3649#discussion_r108983942

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala —
          @@ -204,21 +183,145 @@ class UnboundedEventTimeOverProcessFunction(
              * If timestamps arrive in order (as in case of using the RocksDB state backend) this is just
              * an append with O(1).
              */
          -  private def insertToSortedList(recordTimeStamp: Long) = {
          +  private def insertToSortedList(recordTimestamp: Long) = {
               val listIterator = sortedTimestamps.listIterator(sortedTimestamps.size)
               var continue = true
               while (listIterator.hasPrevious && continue) {
                 val timestamp = listIterator.previous
          -      if (recordTimeStamp >= timestamp) {
          +      if (recordTimestamp >= timestamp) {
                   listIterator.next
          -        listIterator.add(recordTimeStamp)
          +        listIterator.add(recordTimestamp)
                   continue = false
                 }
               }
               if (continue) {
          -      sortedTimestamps.addFirst(recordTimeStamp)
          +      sortedTimestamps.addFirst(recordTimestamp)
               }
             }

          +  /**
          +   * Process the same timestamp datas, the mechanism is different between
          +   * rows and range window.
          +   */
          +  def processElementsWithSameTimestamp(
          +    curRowList: JList[Row],
          +    lastAccumulator: Row,
          +    out: Collector[Row]): Unit
          +
          +}
          +
          +/**
          + * A ProcessFunction to support unbounded ROWS window.
          + * With the ROWS option you define on a physical level how many rows are included in your window frame
          — End diff –

          This line violates the 100 character limit of the Scala code style.
          Please run a local build before opening a PR to capture such problems (`mvn clean install` inside of the `./flink-libraries/flink-table` folder is usually sufficient and takes ~5 mins).

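The insertion strategy in the diff above can be sketched standalone (a Java transcription of the quoted Scala, with an illustrative class name, not tied to the Flink codebase): walk the sorted list back to front and insert the new timestamp after the first element it is greater than or equal to. When timestamps arrive in ascending order, the first comparison succeeds immediately and the insert degenerates to an O(1) append, which is the property the Scala comment calls out.

```java
import java.util.LinkedList;
import java.util.ListIterator;

public class SortedTimestampList {
    // Insert recordTimestamp so the list stays sorted ascending, scanning
    // from the tail. In-order arrivals hit the first comparison: O(1) append.
    static void insertToSortedList(LinkedList<Long> sortedTimestamps, long recordTimestamp) {
        ListIterator<Long> it = sortedTimestamps.listIterator(sortedTimestamps.size());
        boolean searching = true;
        while (it.hasPrevious() && searching) {
            long timestamp = it.previous();
            if (recordTimestamp >= timestamp) {
                it.next();               // step forward over the element we just compared
                it.add(recordTimestamp); // insert directly after it
                searching = false;
            }
        }
        if (searching) {
            // Smaller than every element (or the list was empty).
            sortedTimestamps.addFirst(recordTimestamp);
        }
    }

    public static void main(String[] args) {
        LinkedList<Long> ts = new LinkedList<>();
        for (long t : new long[] {3, 1, 4, 1, 5}) {
            insertToSortedList(ts, t);
        }
        System.out.println(ts); // [1, 1, 3, 4, 5]
    }
}
```

Note that duplicates land after their equal peers, so the insert is stable, which matters when several rows share one event timestamp.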
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3649

          Hi @hongyuhong, the PR looks very good (except for the style error).
          I'll fix that and merge the PR.

          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3649

          fhueske Fabian Hueske added a comment -

          Implemented with 44f9c76a9ff50e95947c9f78a86b485f564e3796

          Thanks for the contribution hongyuhong!

          githubbot ASF GitHub Bot added a comment -

          Github user hongyuhong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3649#discussion_r109072158

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala —
          @@ -204,21 +183,145 @@ class UnboundedEventTimeOverProcessFunction(
              * If timestamps arrive in order (as in case of using the RocksDB state backend) this is just
              * an append with O(1).
              */
          -  private def insertToSortedList(recordTimeStamp: Long) = {
          +  private def insertToSortedList(recordTimestamp: Long) = {
               val listIterator = sortedTimestamps.listIterator(sortedTimestamps.size)
               var continue = true
               while (listIterator.hasPrevious && continue) {
                 val timestamp = listIterator.previous
          -      if (recordTimeStamp >= timestamp) {
          +      if (recordTimestamp >= timestamp) {
                   listIterator.next
          -        listIterator.add(recordTimeStamp)
          +        listIterator.add(recordTimestamp)
                   continue = false
                 }
               }
               if (continue) {
          -      sortedTimestamps.addFirst(recordTimeStamp)
          +      sortedTimestamps.addFirst(recordTimestamp)
               }
             }

          +  /**
          +   * Process the same timestamp datas, the mechanism is different between
          +   * rows and range window.
          +   */
          +  def processElementsWithSameTimestamp(
          +    curRowList: JList[Row],
          +    lastAccumulator: Row,
          +    out: Collector[Row]): Unit
          +
          +}
          +
          +/**
          + * A ProcessFunction to support unbounded ROWS window.
          + * With the ROWS option you define on a physical level how many rows are included in your window frame
          — End diff –

Thanks for reminding me, I will pay attention next time.


People

            • Assignee: Yuhong_kyo hongyuhong
            • Reporter: sunjincheng121 sunjincheng
            • Votes: 0
            • Watchers: 4

              Dates

              • Created:
                Updated:
                Resolved:
