Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.2.0, 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      As in SQL, a window clause is defined "as" a symbol, and that symbol is then referenced explicitly in groupBy/over. We propose to refactor how groupBy + window is written in the Table API as follows:

      tab //Table('a,'b,'c)
      .window(Slide over 10.milli every 5.milli as 'w1) // WindowedTable
      .groupBy('w1,'a,'b) // WindowGroupedTable
      .select('a,'b,'c.count as 'mycount) // Table
      .window(Tumble over 5.milli on 'b as 'w2)
      .groupBy('w2)
      .select('a.count, 'w2.start, 'w2.end)
      

      A row-window needs its window clause defined as a symbol in any case, so this change makes the window and row-window APIs consistent. Example for row-window:

        .window(RowXXXWindow as 'x, RowYYYWindow as 'y) // WindowedTable
        .select('a, 'b.count over 'x as 'xcnt, 'c.count over 'y as 'ycnt, 'x.start, 'x.end)
      

      What do you think? Fabian Hueske Timo Walther

          Activity

          fhueske Fabian Hueske added a comment -

          Done for 1.2.0 with be09143cd323d478811447b10807ae7f7d6a4b7b
          Done for 1.3.0 with 6bf556e60148917794f81088fa20c5cc7465823a

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3046

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3046

          Thanks for the update @sunjincheng121!
          I'll do one more pass over the docs and merge this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3046

          Hi @fhueske, thanks a lot for the review. I have updated the PR according to your comments. Let me know if I missed something. Thanks again!

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3046#discussion_r96860219

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -930,3 +921,46 @@ class GroupWindowedTable(
          }

          }
          +
          +
          +class WindowedTable(
          + private[flink] val table: Table,
          + private[flink] val window: Window) {
          +
          + /**
          + * Groups the elements on some keys (window alias or group keys). It should be noted that one
          + * window alias MUST be included in the key list. Use this function before a selection with
          + * aggregations to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY
          + * statement.
          + *
          + * Example:
          + *
          + * {{{
          + * tab.groupBy('windowAlias, 'key).select('key, 'value.avg)
          — End diff –

          add the `window()` call to the example to show where the `'windowAlias` comes from?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3046#discussion_r96860442

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -855,57 +858,45 @@ class GroupedTable(
          val fieldExprs = ExpressionParser.parseExpressionList(fields)
          select(fieldExprs: _*)
          }
          -
          - /**
          - * Groups the records of a table by assigning them to windows defined by a time or row interval.
          - *
          - * For streaming tables of infinite size, grouping into windows is required to define finite
          - * groups on which group-based aggregates can be computed.
          - *
          - * For batch tables of finite size, windowing essentially provides shortcuts for time-based
          - * groupBy.
          - *
          - * @param groupWindow group-window that specifies how elements are grouped.
          - * @return A windowed table.
          - */
          - def window(groupWindow: GroupWindow): GroupWindowedTable = {
          - new GroupWindowedTable(table, groupKey, groupWindow)
          - }
          }

          -class GroupWindowedTable(
          +class WindowGroupedTable(
          private[flink] val table: Table,
          private[flink] val groupKey: Seq[Expression],
          - private[flink] val window: GroupWindow) {
          + private[flink] val window: Window) {

          /**
          - * Performs a selection operation on a windowed table. Similar to an SQL SELECT statement.
          + * Performs a selection operation on a window grouped table. Similar to an SQL SELECT statement.
          * The field expressions can contain complex expressions and aggregations.
          *
          * Example:
          *
          * {{{
          - * groupWindowTable.select('key, 'window.start, 'value.avg + " The average" as 'average)
          + * windowGroupedTable.select('key, 'window.start, 'value.avg + " The average" as 'average)
          * }}}
          */
          def select(fields: Expression*): Table = {
          + // get group keys by removing window column
          — End diff –

          window column -> window alias

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3046#discussion_r96864474

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala —
          @@ -116,4 +119,21 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
          val results = windowedTable.toDataSet[Row].collect()
          TestBaseUtils.compareResultAsText(results.asJava, expected)
          }
          +
          + @Test(expected = classOf[ValidationException])
          + def testMutilGroupWindow(): Unit = {
          — End diff –

          Mutil -> Multi

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3046#discussion_r96862582

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala —
          @@ -60,7 +60,7 @@ abstract class EventTimeWindow(val timeField: Expression) extends GroupWindow {
          * @return this window
          */
          def as(alias: Expression): EventTimeWindow = {
          — End diff –

          I think the `as` methods of windows could be moved to `Window`.
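
One way to read this suggestion is sketched below. It is a minimal, self-contained model and not the Flink code: the classes `Window`, `TumblingWindow`, and `SlidingWindow` here are hypothetical stand-ins, the alias is a plain `String` instead of an `Expression`, and returning `this.type` is an assumption about how a shared `as` could keep the concrete window type.

```scala
// Hypothetical stand-in for a common window base class (not Flink's Window).
abstract class Window {
  private var aliasOpt: Option[String] = None

  /** The alias assigned via `as`, if any. */
  def alias: Option[String] = aliasOpt

  /** Defined once in the base class; `this.type` preserves the caller's concrete type. */
  def as(name: String): this.type = {
    aliasOpt = Some(name)
    this
  }
}

// Hypothetical concrete windows; they no longer need their own `as` methods.
final class TumblingWindow(val sizeMillis: Long) extends Window
final class SlidingWindow(val sizeMillis: Long, val slideMillis: Long) extends Window
```

With this shape, `new TumblingWindow(5L).as("w2")` is still typed as `TumblingWindow`, so window-specific members remain accessible after aliasing.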

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3046#discussion_r96860112

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -930,3 +921,46 @@ class GroupWindowedTable(
          }

          }
          +
          +
          +class WindowedTable(
          + private[flink] val table: Table,
          + private[flink] val window: Window) {
          +
          + /**
          + * Groups the elements on some keys (window alias or group keys). It should be noted that one
          + * window alias MUST be included in the key list. Use this function before a selection with
          + * aggregations to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY
          + * statement.
          + *
          + * Example:
          + *
          + * {{{
          + * tab.groupBy('windowAlias, 'key).select('key, 'value.avg)
          + * }}}
          + */
          + def groupBy(fields: Expression*): WindowGroupedTable = {
          + if (fields.filter(window.alias.get.equals(_)).length != 1) {
          + throw new ValidationException("Group by must contain only one window column.")
          — End diff –

          "GroupBy must contain exactly one window alias.".
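
The check under review can be sketched in isolation as follows. This is a simplified model under stated assumptions: field references are plain strings rather than `Expression`s, `ValidationException` is a local stand-in so the snippet runs without Flink, and `GroupByCheck.groupKeys` is a hypothetical helper name.

```scala
// Local stand-in for Flink's ValidationException (assumption, so the sketch runs standalone).
final class ValidationException(msg: String) extends RuntimeException(msg)

object GroupByCheck {
  /**
   * Returns the non-window group keys.
   * Throws if the window alias is not referenced exactly once in `fields`.
   */
  def groupKeys(windowAlias: String, fields: Seq[String]): Seq[String] = {
    if (fields.count(_ == windowAlias) != 1) {
      throw new ValidationException("GroupBy must contain exactly one window alias.")
    }
    // The remaining fields are the regular grouping keys.
    fields.filterNot(_ == windowAlias)
  }
}
```

Using `count` instead of `filter(...).length` avoids building an intermediate collection; the behavior is otherwise the same as the condition in the diff.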

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3046#discussion_r96859413

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -798,11 +798,14 @@ class Table(
          * _Note_: window on non-grouped streaming table is a non-parallel operation, i.e., all data
          — End diff –

          This comment should be removed. Since `window` and `groupBy` are now switched, the windows can still be computed in parallel if `groupBy` is called on `WindowedTable`.

          sunjincheng121 sunjincheng added a comment -

          Hi Fabian Hueske, sorry for the delayed reply.
          I have updated the PR according to our discussion. Let me know if I missed something. Thanks a lot.

          fhueske Fabian Hueske added a comment -

          I think it would be good to get a fix for this soon to include it in the 1.2.0 release.
          Otherwise we would break the API in the next version (1.3.0) which I would like to avoid.
          sunjincheng, do you think you can contribute a PR today or tomorrow?

          Thank you very much, Fabian

          ShaoxuanWang Shaoxuan Wang added a comment -

          Yes Fabian Hueske, we should not support groupby() without keys and window alias.

          sunjincheng121 sunjincheng added a comment -

          The first approach +1

          fhueske Fabian Hueske added a comment -

          Yes, that's what I would propose.

          GroupedTable has an optional window attribute (optional because you can also say tab.groupBy().select() which results in a grouped non-windowed aggregation).
          When we later add row windows (SQL OVER-style) to the Table API, we add a select() method to WindowedTable.

          I think there is one open question: how do we handle the case where WindowedTable.groupBy() does not reference a window alias?
          I see three options:

          1. Fail and ask for a window alias
          2. Accept and use the window defined in the WindowedTable (this makes the window alias optional)
          3. Accept and compute a non-windowed grouped aggregate

          I think failing might be the best approach (together with enforcing window aliases in window()).
          This would be the most explicit approach. The other two cases might result in behavior which is unexpected by users.

          What do you think sunjincheng and Shaoxuan Wang?

          sunjincheng121 sunjincheng added a comment -

          Hi Fabian Hueske, thank you for the detailed explanation!
          If I understand you correctly, you mean that we should rely on type restrictions to guide users to do the right thing, as in this case:

          tab //('a,'b,'c)
          .window( ... on 'a as 'w1) //WindowedTable
          .groupBy('w1,'a,'b) //GroupedTable
          .select('a,'b,'c.count as 'mycount) //Table
          .window(...  on 'b as 'w2)
          .groupBy('w2)
          .select(...)
          
          

          is this correct? If not, please let me know ... thanks a lot.

          fhueske Fabian Hueske added a comment -

          Hi sunjincheng, thanks for the reply.

          My concerns are about your first example, i.e. "#1 windows are defined at the start and used later". Here you could do something like

          val windowedTable = table // table: (a, b, c)
           .window(Slide over 10.milli every 5.milli on 'a as 'w1)
           .window(Tumble over 5.milli on 'b as 'w2)
           .groupBy('w1, 'c)
           .select('c, 'b.count as 'bcnt, 'b.sum as 'bsum, 'w1.start)
           .groupBy( 'w2, 'bcnt)
           .select('bcnt, 'bsum.avg as 'bsumavg)
          

          Here, the second window 'w2 is defined on attribute b. However, by the time window 'w2 is used, the input table no longer has the attribute b.
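
The kind of validation this scenario would require can be sketched as below. This is a hypothetical, self-contained model: `WindowDef` and `WindowScope.validate` are illustrative names, schemas are plain lists of field names, and nothing here is taken from the Flink code base.

```scala
// Hypothetical description of a window: its alias and the time attribute it is defined on.
final case class WindowDef(alias: String, timeField: String)

object WindowScope {
  /**
   * Checks that the window's time attribute is still part of the input schema
   * at the point where the window alias is used in groupBy.
   */
  def validate(window: WindowDef, inputFields: Seq[String]): Either[String, WindowDef] =
    if (inputFields.contains(window.timeField)) Right(window)
    else Left(
      s"Window '${window.alias}' is defined on field '${window.timeField}', " +
        s"which is not in the input: ${inputFields.mkString(", ")}")
}
```

In the example above, 'w2 is defined on b while the table is (a, b, c), but the second groupBy sees only (c, bcnt, bsum, plus the window start), so a check like this would have to reject the query at that point.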

          I am much more in favor of variant "#2 windows are defined with groupBy", which ties window() and groupBy() together. Actually, I think this is a very nice syntax.

          I am also not convinced that removing the internal table classes such as GroupedTable and GroupWindowedTable improves the code structure.
          Instead of having multiple well scoped implementations of select (each with its appropriate documentation) we would have one select() method with many case distinctions (simple select, non-grouped aggregate, grouped aggregate, group-windowed aggregate).
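
The argument for keeping separate, well-scoped table classes can be illustrated with a small model. Everything in it is a simplifying assumption: `Table`, `WindowedTable`, and `WindowGroupedTable` are hypothetical stand-ins that only track field names, expressions are plain strings, and validation of the window alias is omitted.

```scala
// Hypothetical, heavily simplified model; nothing is actually computed.
final case class Table(fields: Seq[String]) {
  /** Plain projection: no grouping and no window involved. */
  def select(exprs: String*): Table = Table(exprs)

  /** Attaches a window; its alias is expected to appear in a later groupBy. */
  def window(alias: String): WindowedTable = new WindowedTable(this, alias)
}

final class WindowedTable(val table: Table, val windowAlias: String) {
  /** Splits the referenced fields into the window alias and the regular group keys. */
  def groupBy(keys: String*): WindowGroupedTable =
    new WindowGroupedTable(table, keys.filterNot(_ == windowAlias), windowAlias)
}

final class WindowGroupedTable(
    val table: Table,
    val groupKeys: Seq[String],
    val windowAlias: String) {
  /** Group-windowed aggregation: the one place that has to deal with window properties. */
  def select(exprs: String*): Table = Table(exprs)
}
```

Each `select` has a single responsibility and its own documentation, and the return types steer the call order: in this sketch `WindowedTable` has no `select`, so a windowed table must be grouped before it can be projected.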

          sunjincheng121 sunjincheng added a comment -

          Hi Fabian Hueske and Shaoxuan Wang, thanks for the reply.
          Fabian Hueske, you are right: whether it is a stream table or a batch table, we need to ensure correctness. As you said, we must check the window's properties in the implementation phase. I agree with you.

          BTW, "groupBy('w)" is consistent not only with the row-window but also with Calcite SQL. For instance:

          GroupBy:

          SELECT STREAM TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,
            productId,
            COUNT(*) AS c,
            SUM(units) AS units
          FROM Orders
          GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId;
          

          Over:

          SELECT STREAM *
          FROM (
            SELECT STREAM rowtime,
              productId,
              units,
              AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,
              AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
            FROM Orders
            WINDOW product AS (
              ORDER BY rowtime
              PARTITION BY productId))
          WHERE m10 > d7;
          

          The following two statements are supported by the current changes:
          #1. windows are defined at the start and used later:

          val windowedTable = table
           .window(Slide over 10.milli every 5.milli as 'w1)
           .window(Tumble over 5.milli  as 'w2)
           .groupBy('w1, 'key)
           .select('string, 'int.count as 'count, 'w1.start)
           .groupBy( 'w2, 'key)
           .select('string, 'count.sum as 'sum2)
          

          #2. windows are defined with groupBy:

           val windowedTable = table
           .window(Slide over 10.milli every 5.milli as 'w1)
           .groupBy('w1, 'key)
           .select('string, 'int.count as 'count, 'w1.start)
           .window(Tumble over 5.milli  as 'w2)
           .groupBy( 'w2, 'key)
           .select('string, 'count.sum as 'sum2)
          

          I hope this makes sense to you.
          When you said "by tying window and groupBy together, we could avoid such situations", did you mean something like #2, or must it be written as "groupBy().window()"?

          reference:
          Azure: https://msdn.microsoft.com/en-us/library/azure/dn835051.aspx
          Calcite: http://calcite.apache.org/docs/stream.html#tumbling-windows

          ShaoxuanWang Shaoxuan Wang added a comment - - edited

          sunjincheng, thanks for the updates.

          Hi Fabian Hueske,
          The main reason we propose this change is that, from a semantic point of view, a window is a clause and not an operator.
          This change gives users flexibility: they can still keep the window clause and groupBy close together (by putting the window definition right before groupBy) if they want.
          I think you have raised a good question about the "scope of a window" for a batch window on a certain column (which could be removed by some operators). We should make sure this will still work. We will check the design and add test cases for this.

          fhueske Fabian Hueske added a comment -

          Hi, sorry for the delayed reply.

          I like the idea of making the use of group and row windows consistent. Also referencing the window alias in groupBy is OK for me.

          However, I am not so sure about the proposal to define windows anywhere in the Table API query as in the example above where w1 and w2 are defined at the start and used later.
          For batch tables, windows can be defined on arbitrary time attributes (e.g., orderTime). It might happen that we define a window on a time attribute, perform some other operations that remove this attribute, and then try to groupBy the defined window even though the window attribute is no longer present in the input.

          Of course we can validate that the attribute exists and give a meaningful error message. However, by tying window and groupBy together, we could avoid such situations.

          What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3046#discussion_r95315326

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala —
          @@ -150,7 +147,7 @@ class TumblingWindow(size: Expression) extends GroupWindow {
          def as(alias: String): TumblingWindow = as(ExpressionParser.parseExpression(alias))

          override private[flink] def toLogicalWindow: LogicalWindow =

          - ProcessingTimeTumblingGroupWindow(alias, size)
          + ProcessingTimeTumblingGroupWindow(name, size)
          — End diff –

          Hi @shaoxuan-wang, thanks a lot for the review. I have updated the PR according to your comments. The change list:
          1. Remove GroupWindowedTable.
          2. Change "name" to "alias".
          3. Add testMultiWindow.
          4. Rebase the code and fix the conflicts.

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3046#discussion_r95244683

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala —
          @@ -150,7 +147,7 @@ class TumblingWindow(size: Expression) extends GroupWindow {
          def as(alias: String): TumblingWindow = as(ExpressionParser.parseExpression(alias))

          override private[flink] def toLogicalWindow: LogicalWindow =
          - ProcessingTimeTumblingGroupWindow(alias, size)
          + ProcessingTimeTumblingGroupWindow(name, size)
          --- End diff --

          Better to keep using "alias" here. By the way, I found that some functions take "name" as input, like ProcessingTimeTumblingGroupWindow, while others take "alias", like TumblingEventTimeWindow. It would be better to change them all to "alias" for consistency. What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3046#discussion_r95219196

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -785,13 +793,19 @@ class Table(

            * will be processed by a single operator.
            *
            * @param groupWindow group-window that specifies how elements are grouped.
            * @return A windowed table.
            */
          - def window(groupWindow: GroupWindow): GroupWindowedTable = {
          + def window(groupWindow: GroupWindow): Table = {
            if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
              throw new ValidationException(s"Windows on batch tables are currently not supported.")
            }
          - new GroupWindowedTable(this, Seq(), groupWindow)
          + if (None == groupWindow.name) {
          +   throw new ValidationException("An alias must be specified for the window.")
          + }
          + if (windowPool.contains(groupWindow.name.get)) {
          +   throw new ValidationException("The window alias can not be duplicated.")
          --- End diff --

          s/"The window alias can not be duplicated."/"The same window alias has been defined"/
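The two checks in the diff above (an alias is mandatory, and an alias may be defined only once) can be sketched in isolation. This is a minimal, self-contained illustration and not the code from the pull request: `WindowAliasSketch`, `GroupWindowSketch`, `ValidationError`, and `register` are hypothetical stand-ins, and the pool of aliases is modelled as an immutable `Set` rather than whatever `windowPool` is in the actual patch. The second message uses the wording suggested in this review comment.

```scala
// Minimal sketch of the alias validation discussed in the review.
// All names are hypothetical; this does not depend on Flink.
object WindowAliasSketch {

  // Stand-in for Flink's ValidationException.
  final class ValidationError(message: String) extends RuntimeException(message)

  // Stand-in for a group window whose alias may or may not have been set via `as`.
  final case class GroupWindowSketch(name: Option[String])

  // Returns the pool extended with the new alias, or throws if the window
  // has no alias or the alias is already taken.
  def register(pool: Set[String], window: GroupWindowSketch): Set[String] =
    window.name match {
      case None =>
        throw new ValidationError("An alias must be specified for the window.")
      case Some(alias) if pool.contains(alias) =>
        throw new ValidationError("The same window alias has been defined")
      case Some(alias) =>
        pool + alias
    }
}
```

Matching on the `Option` avoids the `None == ...` comparison followed by `.get` and makes the three cases explicit.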

          githubbot ASF GitHub Bot added a comment -

          GitHub user sunjincheng121 opened a pull request:

          https://github.com/apache/flink/pull/3046

          FLINK-5386[Table API & SQL] refactoring Window Clause

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [×] General
            • The pull request references the related JIRA issue ("FLINK-5386[Table API & SQL] refactoring Window Clause")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [×] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sunjincheng121/flink FLINK-5386-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3046.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3046


          commit 5f5b8789b6c833d73ed189f606f0e81095205990
          Author: Jincheng Sun <sunjincheng121@gmail.com>
          Date: 2016-12-15T02:31:33Z

          up

          commit decb18ad2a6b83a78bc9d4ef4acacfc9c877d866
          Author: Jincheng Sun <sunjincheng121@gmail.com>
          Date: 2016-12-15T02:31:33Z

          up

          commit 538af431d054fe9da4ee30f1c6b7e46c5c20181a
          Author: Jincheng Sun <sunjincheng121@gmail.com>
          Date: 2016-12-23T03:20:00Z

          Merge branch 'master' of https://github.com/sunjincheng121/flink

          commit c0e0c4245cf6c5d842a91b91e40929912346c755
          Author: Jincheng Sun <sunjincheng121@gmail.com>
          Date: 2016-12-23T06:11:42Z

          FLINK-5386[Table API & SQL] refactoring Window Clause



            People

            • Assignee:
              sunjincheng121 sunjincheng
              Reporter:
              sunjincheng121 sunjincheng
            • Votes:
              0
              Watchers:
              5
