Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels: None

      Description

      Syntax:

      table
        .overWindows(
          (Rows|Range [partitionBy value_expression, ... [n]] [orderBy order_by_expression]
            (preceding UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENT_ROW)
            [following UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENT_ROW]
            as alias, ... [n])
        )
        .select([col1, ... [n]], agg(col1) OVER overWindowAlias, ... [n])
      

      Implementation restrictions:

      • All OVER clauses in the same SELECT clause must be exactly the same.
      • The PARTITION BY clause is optional (no partitioning results in single-threaded execution).
      • ORDER BY: until FLINK-5884 is implemented, orderBy may only reference 'rowtime / 'proctime (for streaming) or a specific time field (for batch).
      • FOLLOWING is not supported.
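      For illustration, a minimal sketch of how this syntax reads in the Scala Table API (my example, not from the issue text; columns 'b, 'c and the 'rowtime attribute are assumed, and the Over builder form matches the PR examples below):

        // Bounded OVER window: for each row, count 'b over the 2 preceding rows,
        // partitioned by 'c and ordered by the 'rowtime time indicator.
        table
          .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w)
          .select('c, 'b.count over 'w as 'countB)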

      User interface design document: see https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#

        Issue Links

          Activity

          Fabian Hueske added a comment -

          Hi, sunjincheng,

          Thanks for opening the JIRA and working on this issue.
          I think we need an explicit and mandatory orderBy because it will be required for batch queries.

          Thanks, Fabian

          sunjincheng added a comment -

          Hi Fabian Hueske, regarding the orderBy clause: the time indicators from the FLINK-5884 design can be shared with this JIRA. Until FLINK-5884 is implemented, orderBy may only reference 'rowtime / 'proctime (for streaming) or a specific time field (for batch). What do you think?

          Fabian Hueske added a comment -

          Hi sunjincheng, yes. I would handle that exactly like the group windows. 'rowtime and 'proctime for streaming, and a regular time attribute for batch. The batch case needs to be deactivated for now, because we do not have the operators for that yet.

          sunjincheng added a comment - edited

          That's great, Fabian Hueske.

          Here I have a question for your opinion; the specific situation is this:

          • Statistical requirement: the rank of order amounts per millisecond.
          • OrderData:
            (timestamp, orderId, amount)
            (1L, order1, 2)
            (1L, order2, 5)
            (1L, order3, 3)
            (1L, order4, 1)
            (2L, order5, 20)
            (2L, order6, 11)
            (2L, order7, 30)
            
          • The expected output is:
            (timestamp, orderId, amount, rank)
            (1L, order1, 2, 3)
            (1L, order2, 5, 1)
            (1L, order3, 3, 2)
            (1L, order4, 1, 4)
            (2L, order5, 20, 2)
            (2L, order6, 11, 3)
            (2L, order7, 30, 1)
            

          Do you think we should support the following syntax? (OVER on a group window; IMO we can support it.)

           table
             .window(Tumble over 1.milli as 'w)
             .select('timestamp, 'orderId, 'amount, rank('amount) over 'w as 'rank)
          

          Thanks,
          SunJincheng

          Fabian Hueske added a comment - edited

          I think that use case is not supported yet (but should be at some point).

          The point is that RANK() is applied to all records in a partition, and the order in which ranks are assigned depends on the ORDER BY clause. If you want to reset the rank counter after each millisecond, you need to partition on time and order by amount. RANK also does not take a parameter.

          I think in SQL the query would look as follows:

          SELECT timestamp, orderId, amount, rank() OVER (PARTITION BY CEIL(timestamp TO MILLISECOND) ORDER BY amount) FROM stream
          

          So, we would partition on timestamp and order by amount and compute the RANK() function on each partition.

          I don't think the current OVER window implementations are able to execute your use case.

          sunjincheng added a comment - edited

          IMO the definition of a Table API window can be divided into two parts:
          1. Window size definition, e.g. TumbleWindow, SlideWindow, SessionWindow, OverWindow (added in this JIRA).
          2. Window trigger type definition, e.g. groupBy (emit one result per group) and over (emit one result per row) (added in this JIRA).
          So, in the future we want to support the following two combinations:

             groupBy('windowAlias) --> groupBy + (TumbleWindow, SlideWindow, SessionWindow, OverWindow) with groupAgg
             over 'windowAlias     --> over + (TumbleWindow, SlideWindow, SessionWindow, OverWindow) with overAgg

          In other words, the four window size definitions (Tumble, Slide, Session, Over) and the two window trigger type definitions (groupBy, over) are orthogonal. Moreover, the Table API is more powerful than SQL, i.e., SQL supports a subset of the Table API's functionality. (Although in this JIRA we only implement over + OverWindow.)
          What do you think?

          Best,
          SunJincheng

          Fabian Hueske added a comment -

          I'm not sure whether each window type is useful / required for both groupBy and over.

          How would an OverWindow be defined? What would be the semantics of an OverWindow with groupBy?
          Also OverWindow (as in SQL) can express the semantics that I have in mind for TumbleWindow and SlideWindow for over. Hence, TumbleWindow and SlideWindow would just be syntactic sugar and not add to expressiveness.
          I agree that SessionWindow would be useful for over as well.
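          A sketch to illustrate that point (my example, not from the thread; columns 'user, 'amount and the 'rowtime attribute are assumed): a bounded over-window already produces a per-row sliding aggregate, which is why a dedicated SlideWindow would add no expressiveness for over.

            // Each output row carries the sum of 'amount over the preceding
            // 10 milliseconds of its partition, i.e. sliding semantics via OVER.
            table
              .window(Over partitionBy 'user orderBy 'rowtime preceding 10.milli as 'w)
              .select('user, 'amount.sum over 'w)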

          sunjincheng added a comment - edited

          Yes, you're right. In SQL we should be consistent with standard SQL semantics, so I also think that in SQL we should only support over with OverWindow, just like the current master code. OverWindow with groupBy is a little confusing. The current Table API SlideWindow defines both the window size and the trigger frequency, e.g. Slide over 2.rows every 1.rows, so over with SlideWindow does not make sense at the current time (if we want to do that, we must make some changes). Can you give me an example of using over to describe slide and tumble, for example Tumble over 5.rows and Slide over 6.milli every 2.milli?

          Fabian Hueske added a comment -

          That's true. It is not possible to define Tumble Rows windows with SQL over. However, Tumble Time (which should be much more common) is possible by partitioning on the (ceiled) time attribute (as in my previous example).
          My concern is that we overload the API with too many window options which are hard to understand and rarely used in practice.

          For Flink 1.3, I would suggest to focus on adding syntax to the Table API to specify SQL over windows because these can be currently executed. I don't think we should add many more new window types (including translation and execution) for the next release. Of course the row window syntax of the Table API should be designed such that it can be later easily extended without breaking existing code.

          What do you think sunjincheng?

          sunjincheng added a comment -

          Of course, Fabian Hueske, I agree with you. The 1.3 release is very important and time is pressing, so we can focus on adding syntax to the Table API according to SQL OVER windows. You are right that this discussion feeds into our design considerations. I'll propose the interface design later today.

          Thanks,
          SunJincheng

          ASF GitHub Bot added a comment -

          GitHub user sunjincheng121 opened a pull request:

          https://github.com/apache/flink/pull/3743

          FLINK-6228[table] Integrating the OVER windows in the Table API (st…

          In this PR I integrated the OVER windows into the Table API. The syntax and usage examples are as follows:

          • Syntax:

            ```
            table
              .overWindows(
                (Rows|Range [partitionBy value_expression, ... [n]] [orderBy order_by_expression]
                  (preceding UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENT_ROW)
                  [following UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENT_ROW]
                  as alias, ... [n])
              )
              .select([col1, ... [n]], agg(col1) OVER overWindowAlias, ... [n])
            ```

          • Examples:

            ```
            // Rows clause
            table
              .window(Over partitionBy 'c orderBy 'rowTime preceding 2.rows as 'w1)
              .select(
                'c,
                'b.count over 'w1 as 'countB,
                'e.sum over 'w1 as 'sumE)

            // Range clause
            table
              .window(Over partitionBy 'c orderBy 'rowTime preceding 2.milli as 'w1)
              .select(
                'c,
                'b.count over 'w1 as 'countB,
                'e.sum over 'w1 as 'sumE)
            ```

          More details: https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#

          NOTE: Documentation for the OVER Table API is not included in this PR.

          • [x] General
            • The pull request references the related JIRA issue ("FLINK-6228[table] Integrating the OVER windows in the Table API")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [x] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sunjincheng121/flink FLINK-6228-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3743.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3743


          commit 03d4153be93d505cdb47d174e5aafe10eb93a45f
          Author: sunjincheng121 <sunjincheng121@gmail.com>
          Date: 2017-04-13T09:36:18Z

          FLINK-6228[table] Integrating the OVER windows in the Table API (stream)


          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112456485

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -810,6 +810,47 @@ class Table(
              new WindowedTable(this, window)
            }

          +  /**
          +    * Groups the records of a table by assigning them to windows defined by a time or row interval.
          +    *
          +    * For streaming tables of infinite size, grouping into windows is required to define finite
          +    * groups on which over-based aggregates can be computed.
          +    *
          +    * Over window for batch tables are currently not supported.
          +    *
          +    * @param overWindows windows that specifies how elements are grouped.
          +    * @return Over windowed table
          +    */
          +  def window(overWindows: OverWindow*): OverWindowedTable = {
          +
          +    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
          +      throw TableException("Over window for batch tables are currently not supported.")
          +    } else {
          +      overWindows.foreach { overWindow =>
          +        val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          +        if (!orderName.equalsIgnoreCase("rowtime")
          +            && !orderName.equalsIgnoreCase("proctime")) {
          +          throw ValidationException(
          +            s"OrderBy expression must be ['rowtime] or ['proctime], but got ['${orderName}]")
          +        }
          +      }
          +    }
          +
          +    if (overWindows.size != 1) {
          +      throw TableException("OverWindow only supported single window at current time.")
          +    }
          +
          +    overWindows.foreach { overWindow =>
          +      if (!overWindow.preceding.asInstanceOf[Literal].resultType.getClass
          — End diff –

          This check should be done in `OverCall.validateInput()` as well.
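          A sketch of what hoisting that check into `OverCall.validateInput()` might look like (hypothetical code, not from the PR; it assumes the `ValidationResult` / `ValidationSuccess` / `ValidationFailure` types used by other Table API expressions):

            // Hypothetical: inside case class OverCall, the orderBy check from
            // Table.window(), expressed as expression-level validation.
            override private[flink] def validateInput(): ValidationResult = {
              val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
              if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
                ValidationSuccess
              } else {
                ValidationFailure(
                  s"OrderBy expression must be ['rowtime] or ['proctime], but got ['$orderName]")
              }
            }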

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112459590

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala —
          @@ -190,8 +190,8 @@ object ProjectionTranslator {
          def expandProjectList(
          — End diff –

          I would not change the `expandProjectList()` method which has a dedicated purpose.

          Instead, I would add a new method to `ProjectionTranslator`, which just translates the over windows:

          ```
          def translateOverWindows(
              exprs: Seq[Expression],
              overWindows: Array[OverWindow]): Seq[Expression]
          ```
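          A minimal sketch of how such a helper could be implemented (my assumption, not code from the PR; it reuses the `OverCall` shape shown in the diffs below):

            // Hypothetical: resolve each OverCall's window alias against the
            // defined over windows, leaving all other expressions untouched.
            def translateOverWindows(
                exprs: Seq[Expression],
                overWindows: Array[OverWindow]): Seq[Expression] = {
              exprs.map {
                case oc @ OverCall(_, _, alias, _) =>
                  overWindows.find(_.alias.equals(alias)) match {
                    case Some(window) => oc.copy(overWindow = window)
                    case None => throw ValidationException(s"Over window with alias $alias is not defined.")
                  }
                case e => e
              }
            }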

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112460885

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala —
          @@ -216,6 +216,26 @@ object ProjectionTranslator
                   projectList += unresolved

          +      case OverCall(agg, aggAlias, alias, _) =>
          +        val overWindow = overWindows.find(_.alias.equals(alias))
          +        val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
          +        val childrenOutput = parent.output
          +        val candidates = childrenOutput.filter(_.name.equalsIgnoreCase(aggName))
          +
          +        val resolvedFieldReference = if (candidates.length > 1) {
          — End diff –

          The other expressions are resolved later. We should follow this pattern to keep the code base consistent.

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112457606

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala —
          @@ -190,8 +190,8 @@ object ProjectionTranslator {
             def expandProjectList(
                 exprs: Seq[Expression],
                 parent: LogicalNode,
          -      tableEnv: TableEnvironment)
          -    : Seq[Expression] = {
          +      tableEnv: TableEnvironment,
          +      overWindows: OverWindow*): Seq[Expression] = {
          — End diff –

          Please use `Array` instead of varargs.

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112460318

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala —
          @@ -216,6 +216,26 @@ object ProjectionTranslator
                   projectList += unresolved

          +      case OverCall(agg, aggAlias, alias, _) =>
          +        val overWindow = overWindows.find(_.alias.equals(alias))
          +        val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
          +        val childrenOutput = parent.output
          — End diff –

          `childOutput` or `parentOutput`?

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112482490

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala —
          @@ -22,6 +22,126 @@ import org.apache.flink.table.expressions.{Expression, ExpressionParser}
           import org.apache.flink.table.plan.logical._

          +/**
          + * An over window specification.
          + *
          + * Over window is similar to the traditional OVER SQL.
          + */
          +class OverWindow {
          +
          +  private[flink] var alias: Expression = _
          +  private[flink] var partitionBy: Seq[Expression] = Seq[Expression]()
          +  private[flink] var orderBy: Expression = _
          +  private[flink] var preceding: Expression = _
          +  private[flink] var following: Expression = null
          +
          +  /**
          +   * Assigns an alias for this window that the following `select()` clause can refer to.
          +   *
          +   * @param alias alias for this over window
          +   * @return this over window
          +   */
          +  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
          +
          +  /**
          +   * Assigns an alias for this window that the following `select()` clause can refer to.
          +   *
          +   * @param alias alias for this over window
          +   * @return this over window
          +   */
          +  def as(alias: Expression): OverWindow = {
          — End diff –

          I think it would be good if we could enforce the alias.
          This could be done by first building a window that is not of type `OverWindow` and letting `as()` return the complete `OverWindow`.
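          A sketch of that idea (hypothetical types and names, not from the PR): the partially specified window is its own type, and only `as()` yields an `OverWindow`, so the alias can never be omitted.

            // Hypothetical intermediate builder: it carries no alias, so it
            // cannot be passed to window(...) by accident.
            class OverWindowWithPreceding(
                private val partitionBy: Seq[Expression],
                private val orderBy: Expression,
                private val preceding: Expression) {

              // Only as() produces the final, alias-carrying OverWindow.
              def as(alias: Expression): OverWindow = {
                val w = new OverWindow
                w.alias = alias
                w.partitionBy = partitionBy
                w.orderBy = orderBy
                w.preceding = preceding
                w
              }
            }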

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112476982

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala —
          @@ -364,6 +365,21 @@ trait ImplicitExpressionOperations {
             def position(haystack: Expression) = Position(expr, haystack)

             /**
          +   * For windowing function to config over window
          +   * e.g.:
          +   * table
          +   *   .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
          +   *   .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
          +   */
          +  def over(alias: Expression) = {
          +    expr match {
          +      case _: Aggregation => new OverCall(expr.asInstanceOf[Aggregation], null, alias)
          — End diff –

          rm `new` (case classes should be instantiated with the `apply()` method, i.e., without `new`)
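          For readers less familiar with Scala, the point is simply this (a generic illustration, not PR code):

            // A case class declaration synthesizes a companion apply(), so
            // instances are created without `new`:
            case class Point(x: Int, y: Int)
            val p = Point(1, 2)   // preferred over `new Point(1, 2)`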

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112471307

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala —
          @@ -216,6 +216,26 @@ object ProjectionTranslator
                   projectList += unresolved

          +      case OverCall(agg, aggAlias, alias, _) =>
          +        val overWindow = overWindows.find(_.alias.equals(alias))
          +        val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
          +        val childrenOutput = parent.output
          +        val candidates = childrenOutput.filter(_.name.equalsIgnoreCase(aggName))
          +
          +        val resolvedFieldReference = if (candidates.length > 1) {
          — End diff –

          The aggregations are automatically resolved and validated if we add `agg` to the children of `OverCall`.

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112479103

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala —
          @@ -586,6 +602,13 @@ trait ImplicitExpressionOperations {
            * to [[ImplicitExpressionOperations]].
            */
           trait ImplicitExpressionConversions {
          +
          +  implicit val UNBOUNDED_ROW = toRowInterval(Long.MaxValue)
          +  implicit val UNBOUNDED_RANGE = toMilliInterval(1, Long.MaxValue)
          +
          +  implicit val CURRENT_ROW = toRowInterval(0L)
          +  implicit val CURRENT_RANGE = toMilliInterval(0L, Long.MaxValue)
          — End diff –

          Isn't a range interval of `0` a valid range to use? It would mean all rows that arrived in the same millisecond as the current row, no?
          Should we use `-1` as the constant for current row instead?
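          A minimal sketch of the ambiguity being raised (my illustration, using the constants proposed in the diff above):

            // With CURRENT_RANGE encoded as a 0 ms interval, a user's explicit
            // 0 ms preceding bound and the CURRENT RANGE marker carry the same
            // value and cannot be told apart downstream; a -1 sentinel would.
            val currentRangeMarker: Long = 0L  // proposed encoding of CURRENT RANGE
            val zeroMilliBound: Long = 0L      // "rows in the current millisecond"
            assert(currentRangeMarker == zeroMilliBound)  // indistinguishable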

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112469838

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala —
          @@ -35,6 +37,18 @@ abstract sealed class Aggregation extends UnaryExpression {
            * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
            */
           private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall
          +
          +  /**
          +   * Because SqlAggFunction from Calcite's AggCallImpl is invisible,
          +   * we have to manually create sqlAggFunction in flink code base.
          +   */
          +  private[flink] def toSqlAggFunction()(implicit relBuilder: RelBuilder): SqlAggFunction
          +
          +  /**
          +   * Attach the Resolved Child to aggregation
          +   */
          +  private[flink] def withResolvedChild(child: Expression): Aggregation
          — End diff –

          Remove the `withResolvedChild()` method, as we validate the aggregation argument with the default validation.

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112485134

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          — End diff –

          This class should override `checkInputs()` and check that the interval configuration is valid (unbounded - current row, preceding - current row), as well as perform the checks that were moved out of `table.scala`.
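          One plausible piece of that validation, sketched under my own assumptions (not PR code; `RowIntervalTypeInfo` is the type the diffs use to distinguish row-count from range intervals):

            // Hypothetical: preceding and following bounds must be of the same
            // kind, i.e. both row-count intervals or both time-range intervals.
            private def validateBoundKinds(preceding: Literal, following: Literal): Unit = {
              val precedingIsRowCount = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
              val followingIsRowCount = following.resultType.isInstanceOf[RowIntervalTypeInfo]
              if (precedingIsRowCount != followingIsRowCount) {
                throw ValidationException(
                  "preceding and following must both be row intervals or both be range intervals.")
              }
            }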

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112455555

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -810,6 +810,47 @@ class Table(
          new WindowedTable(this, window)
          }

          + /**
          — End diff –

          Extend docs to

          ```
          /**
            * Defines over-windows on the records of a table.
            *
            * An over-window defines for each record an interval of records over which aggregation
            * functions can be computed.
            *
            * Example:
            *
            * {{{
            *   table
            *     .window(Over partitionBy 'c orderBy 'rowTime preceding 10.seconds as 'ow)
            *     .select('c, 'b.count over 'ow, 'e.sum over 'ow)
            * }}}
            *
            * _Note_: Computing over window aggregates on a streaming table is only a parallel operation
            * if the window is partitioned. Otherwise, the whole stream will be processed by a single
            * task, i.e., with parallelism 1.
            *
            * _Note_: Over-windows for batch tables are currently not supported.
            *
            * @param overWindows windows that specify the record interval over which aggregations are
            *                    computed.
            * @return An OverWindowedTable to specify the aggregations.
            */
          ```
          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112456740

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -928,6 +969,27 @@ class WindowedTable(

           }

          +class OverWindowedTable(
          +    private[flink] val table: Table,
          +    private[flink] val overWindows: OverWindow*) {
          — End diff –

          Please use `Array` instead of varargs for internal methods & classes.

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112473915

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -810,6 +810,47 @@ class Table(
              new WindowedTable(this, window)
            }

          +  /**
          +    * Groups the records of a table by assigning them to windows defined by a time or row interval.
          +    *
          +    * For streaming tables of infinite size, grouping into windows is required to define finite
          +    * groups on which over-based aggregates can be computed.
          +    *
          +    * Over window for batch tables are currently not supported.
          +    *
          +    * @param overWindows windows that specifies how elements are grouped.
          +    * @return Over windowed table
          +    */
          +  def window(overWindows: OverWindow*): OverWindowedTable = {
          +
          +    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
          +      throw TableException("Over window for batch tables are currently not supported.")
          +    } else {
          +      overWindows.foreach { overWindow =>
          +        val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          — End diff –

          This check should be done in `OverCall.validateInput()`

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112461145

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala —
          @@ -216,6 +216,26 @@ object ProjectionTranslator
                   projectList += unresolved

          +      case OverCall(agg, aggAlias, alias, _) =>
          +        val overWindow = overWindows.find(_.alias.equals(alias))
          +        val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
          +        val childrenOutput = parent.output
          +        val candidates = childrenOutput.filter(_.name.equalsIgnoreCase(aggName))
          +
          +        val resolvedFieldReference = if (candidates.length > 1) {
          +          throw new TableException(s"Reference $aggName is ambiguous.")
          +        } else if (candidates.isEmpty) {
          +          throw new TableException(s"Can not resolve [$aggName].")
          +        } else {
          +          Some(candidates.head.withName(aggName))
          +        }
          +
          +        projectList += new OverCall(
          — End diff –

          remove `new`

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112494282

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          +    agg: Aggregation,
          +    var aggAlias: Expression,
          +    overWindowAlias: Expression,
          +    var overWindow: OverWindow = null) extends Expression {
          +
          +  private[flink] def as(aggAlias: Expression): OverCall = {
          — End diff –

          `OverCall` should not implement its own `as` method. We should use the regular `Alias` expression for renaming expressions.

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112555311

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala —
          @@ -83,3 +83,49 @@ object Session {
             */
            def withGap(gap: Expression): SessionWindow = new SessionWindow(gap)
           }
          +
          +/**
          + * Helper object for creating a over window.
          + */
          +object Over {
          — End diff –

          I think we need an equivalent class for the Java Table API, similar to `org.apache.flink.table.api.java.groupWindows.scala`.
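          A sketch of what such a counterpart might look like (hypothetical; the real class would live next to the Java group-window helpers and parse String expressions the way they do):

            // Hypothetical Java-API entry point: same builder surface, but
            // accepting String expressions that are parsed into Expression trees.
            object Over {
              def partitionBy(partitionBy: String): OverWindow = {
                val w = new OverWindow
                w.partitionBy = ExpressionParser.parseExpressionList(partitionBy)
                w
              }
            }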

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112456197

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala —
          @@ -810,6 +810,47 @@ class Table(
              new WindowedTable(this, window)
            }

          +  /**
          +    * Groups the records of a table by assigning them to windows defined by a time or row interval.
          +    *
          +    * For streaming tables of infinite size, grouping into windows is required to define finite
          +    * groups on which over-based aggregates can be computed.
          +    *
          +    * Over window for batch tables are currently not supported.
          +    *
          +    * @param overWindows windows that specifies how elements are grouped.
          +    * @return Over windowed table
          +    */
          +  def window(overWindows: OverWindow*): OverWindowedTable = {
          +
          +    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
          +      throw TableException("Over window for batch tables are currently not supported.")
          +    } else {
          +      overWindows.foreach { overWindow =>
          +        val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          — End diff –

          The other operators are validated later. Can you move this check to `OverCall.validateInput()`?

          Please add tests to validate that the checks work.
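          A sketch of such a test (my assumption of what it could look like, using the harness style of the existing Table API validation tests, i.e. inside a class extending TableTestBase):

            // Hypothetical: ordering on a non-time attribute should fail validation.
            @Test(expected = classOf[ValidationException])
            def testOverWindowWithInvalidOrderBy(): Unit = {
              val util = streamTestUtil()
              val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)

              table
                .window(Over partitionBy 'c orderBy 'b preceding 2.rows as 'w)  // 'b is not a time indicator
                .select('c, 'a.count over 'w)
            }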

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112469357

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + var aggAlias: Expression,
          + overWindowAlias: Expression,
          + var overWindow: OverWindow = null) extends Expression {
          +
          + private[flink] def as(aggAlias: Expression): OverCall = {
          +   this.aggAlias = aggAlias
          +   this
          + }
          +
          + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
          +
          + val rexBuilder = relBuilder.getRexBuilder
          +
          + val operator: SqlAggFunction = agg.toSqlAggFunction()
          +
          + val aggReturnType: TypeInformation[_] = agg.resultType
          +
          + val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
          +
          + val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
          +
          + aggExprs.add(relBuilder.field(aggChildName))
          +
          + val orderKeys: ImmutableList.Builder[RexFieldCollation] =
          + new ImmutableList.Builder[RexFieldCollation]()
          +
          + val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
          + val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          +
          + val rexNode =
          +   if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
          +     // for stream
          +     relBuilder.literal(orderName)
          +   } else {
          +     // for batch
          +     relBuilder.field(orderName)
          +   }
          +
          + orderKeys.add(new RexFieldCollation(rexNode,sets))
          +
          + val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + overWindow.partitionBy.foreach(x=>
          + partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
          +
          + val preceding = overWindow.preceding.asInstanceOf[Literal]
          + val following = overWindow.following.asInstanceOf[Literal]
          +
          + val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
          +
          + val lowerBound = createBound(relBuilder, preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
          + val upperBound = createBound(relBuilder, following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
          +
          + rexBuilder.makeOver(
          + relDataType,
          + operator,
          + aggExprs,
          + partitionKeys,
          + orderKeys.build,
          + lowerBound,
          + upperBound,
          + isPhysical,
          + true,
          + false)
          + }
          +
          + private def createBound(
          + relBuilder: RelBuilder,
          + precedingValue: Long,
          + sqlKind: SqlKind): RexWindowBound = {
          +
          +   if (precedingValue == Long.MaxValue) {
          +     val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
          +     create(unbounded, null)
          +   } else if (precedingValue == 0L) {
          +     val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
          +     create(currentRow, null)
          +   } else {
          +     val returnType = new BasicSqlType(
          +       relBuilder.getTypeFactory.getTypeSystem,
          +       SqlTypeName.DECIMAL)
          +
          +     val sqlOperator = new SqlPostfixOperator(
          +       sqlKind.name,
          +       sqlKind,
          +       2,
          +       new OrdinalReturnTypeInference(0),
          +       null,
          +       null)
          +
          +     val operands: Array[SqlNode] = new Array[SqlNode](1)
          +     operands(0) = SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO)
          +
          +     val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
          +
          +     val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          +     expressions.add(relBuilder.literal(precedingValue))
          +
          +     val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)
          +
          +     create(node, rexNode)
          +   }
          + }
          +
          + override private[flink] def children: Seq[Expression] = Seq()
          — End diff –

          change to `Seq(agg)` to automatically validate the aggregation call and its arguments.
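
          In other words, a one-line change (sketch):

          ```
          // before: the validator never visits the aggregation
          override private[flink] def children: Seq[Expression] = Seq()

          // after: expose the aggregation as a child so its arguments are validated as well
          override private[flink] def children: Seq[Expression] = Seq(agg)
          ```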

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112501222

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/SqlTypeUtils.scala —
          @@ -0,0 +1,47 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.typeutils
          +
          +import org.apache.calcite.rel.`type`.

          {RelDataType, RelDataTypeFactory}

          +import org.apache.calcite.sql.`type`.SqlTypeName
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.table.api.Types._
          +import org.apache.flink.table.api.TableException
          +
          +object SqlTypeUtils {
          — End diff –

          Util class can be removed. We can create a `RelDataType` from a `TypeInformation` as follows:

          ```
          relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory].createTypeFromTypeInfo(typeInfo)
          ```
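
          Applied to `OverCall.toRexNode`, the `SqlTypeUtils.createSqlType(...)` call above would then become something like:

          ```
          val relDataType = relBuilder
            .getTypeFactory.asInstanceOf[FlinkTypeFactory]
            .createTypeFromTypeInfo(aggReturnType)
          ```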

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112495160

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + var aggAlias: Expression,
          + overWindowAlias: Expression,
          + var overWindow: OverWindow = null) extends Expression {
          +
          + private[flink] def as(aggAlias: Expression): OverCall = {
          — End diff –

          The proper translation can be done in `ProjectTranslator.translateOverWindow()`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112488004

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala —
          @@ -35,6 +37,18 @@ abstract sealed class Aggregation extends UnaryExpression {

            * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
            */
          private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall
          +
          + /**
          + * Because SqlAggFunction from Calcite's AggCallImpl is invisible,
          — End diff –

          Thanks for the explanation of this method.
          Can you update the doc to `Returns the SqlAggFunction for this Aggregation` and change the method name to `getSqlAggFunction()`? The method does not convert the Aggregation because the returned SqlAggFunction does not contain the arguments.
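
          That is, roughly (sketch; the implicit `relBuilder` parameter mirrors `toAggCall` above):

          ```
          /**
            * Returns the SqlAggFunction for this Aggregation.
            */
          private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder): SqlAggFunction
          ```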

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112494372

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + var aggAlias: Expression,
          — End diff –

          The `aggAlias` should be removed.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112567381

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + var aggAlias: Expression,
          + overWindowAlias: Expression,
          + var overWindow: OverWindow = null) extends Expression {
          +
          + private[flink] def as(aggAlias: Expression): OverCall = {
          +   this.aggAlias = aggAlias
          +   this
          + }
          +
          + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
          +
          + val rexBuilder = relBuilder.getRexBuilder
          +
          + val operator: SqlAggFunction = agg.toSqlAggFunction()
          +
          + val aggReturnType: TypeInformation[_] = agg.resultType
          +
          + val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
          +
          + val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
          +
          + aggExprs.add(relBuilder.field(aggChildName))
          +
          + val orderKeys: ImmutableList.Builder[RexFieldCollation] =
          + new ImmutableList.Builder[RexFieldCollation]()
          +
          + val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
          + val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          +
          + val rexNode =
          +   if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
          +     // for stream
          +     relBuilder.literal(orderName)
          +   } else {
          +     // for batch
          +     relBuilder.field(orderName)
          +   }
          +
          + orderKeys.add(new RexFieldCollation(rexNode,sets))
          +
          + val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + overWindow.partitionBy.foreach(x=>
          + partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
          +
          + val preceding = overWindow.preceding.asInstanceOf[Literal]
          + val following = overWindow.following.asInstanceOf[Literal]
          +
          + val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
          +
          + val lowerBound = createBound(relBuilder, preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
          + val upperBound = createBound(relBuilder, following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
          +
          + rexBuilder.makeOver(
          + relDataType,
          + operator,
          + aggExprs,
          + partitionKeys,
          + orderKeys.build,
          + lowerBound,
          + upperBound,
          + isPhysical,
          + true,
          + false)
          + }
          +
          + private def createBound(
          + relBuilder: RelBuilder,
          + precedingValue: Long,
          — End diff –

          `precedingValue` -> `value`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112559640

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + var aggAlias: Expression,
          + overWindowAlias: Expression,
          + var overWindow: OverWindow = null) extends Expression {
          +
          + private[flink] def as(aggAlias: Expression): OverCall = {
          +   this.aggAlias = aggAlias
          +   this
          + }
          +
          + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
          +
          + val rexBuilder = relBuilder.getRexBuilder
          +
          + val operator: SqlAggFunction = agg.toSqlAggFunction()
          +
          + val aggReturnType: TypeInformation[_] = agg.resultType
          +
          + val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
          +
          + val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
          +
          + aggExprs.add(relBuilder.field(aggChildName))
          +
          + val orderKeys: ImmutableList.Builder[RexFieldCollation] =
          + new ImmutableList.Builder[RexFieldCollation]()
          +
          + val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
          + val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          +
          + val rexNode =
          + if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
          + // for stream
          + relBuilder.literal(orderName)
          — End diff –

          can we use `relBuilder.call(EventTimeExtractor)` and `relBuilder.call(ProcTimeExtractor)` here to make the logic identical to SQL?
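
          A sketch of that suggestion, assuming `EventTimeExtractor` and `ProcTimeExtractor` are the operators already used by the SQL translation:

          ```
          val rexNode =
            if (orderName.equalsIgnoreCase("rowtime")) {
              relBuilder.call(EventTimeExtractor)   // event-time marker, as in SQL
            } else if (orderName.equalsIgnoreCase("proctime")) {
              relBuilder.call(ProcTimeExtractor)    // processing-time marker, as in SQL
            } else {
              relBuilder.field(orderName)           // batch: a regular time attribute
            }
          ```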

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112564205

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          — End diff –

          We also need to validate other properties of the overWindow:

          • check that the partitionBy expressions are valid fields in the input.
          • If a partitionBy expression is not a field, we would need to push the expression into a Project before the Project with the overWindow and reference the new field. This would need to happen in `Table.window()`. I think for now we can have the restriction that only field references are allowed as expressions.
          • validate that the `preceding` and `following` expressions are literals (see the sketch below).
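
          A sketch of these additional checks (hypothetical placement in `OverCall.validateInput()`, reusing the `ValidationResult` helpers):

          ```
          // Sketch only: restrict partitionBy to plain field references and
          // require literal bounds for preceding/following.
          if (!overWindow.partitionBy.forall(_.isInstanceOf[UnresolvedFieldReference])) {
            ValidationFailure("partitionBy only supports field references for now.")
          } else if (!overWindow.preceding.isInstanceOf[Literal]) {
            ValidationFailure("preceding must be a literal.")
          } else if (!overWindow.following.isInstanceOf[Literal]) {
            ValidationFailure("following must be a literal.")
          } else {
            ValidationSuccess
          }
          ```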
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112495783

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala —
          @@ -54,6 +54,9 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
          case expr if !expr.valid => u
          case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
          case gcf: GetCompositeField => Alias(gcf, gcf.aliasName().getOrElse(s"_c$i"))
          + case over: OverCall if null != over.aggAlias =>
          — End diff –

          We don't need this if we handle the aggregation alias as regular `Alias` expression
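
          That is, once `aggAlias` is removed, an expression like `'a.count over 'w as 'cnt` would go through the normal aliasing path (hypothetical shape, assuming the usual implicit DSL conversions):

          ```
          // `as` wraps the OverCall in a regular Alias expression,
          // so Project needs no OverCall-specific case anymore
          Alias(OverCall(Count('a), overWindowAlias = 'w), "cnt")
          ```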

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112567644

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + var aggAlias: Expression,
          + overWindowAlias: Expression,
          + var overWindow: OverWindow = null) extends Expression {
          +
          + private[flink] def as(aggAlias: Expression): OverCall = {
          +   this.aggAlias = aggAlias
          +   this
          + }
          +
          + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
          +
          + val rexBuilder = relBuilder.getRexBuilder
          +
          + val operator: SqlAggFunction = agg.toSqlAggFunction()
          +
          + val aggReturnType: TypeInformation[_] = agg.resultType
          +
          + val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
          +
          + val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
          +
          + aggExprs.add(relBuilder.field(aggChildName))
          +
          + val orderKeys: ImmutableList.Builder[RexFieldCollation] =
          + new ImmutableList.Builder[RexFieldCollation]()
          +
          + val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
          + val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          +
          + val rexNode =
          +   if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
          +     // for stream
          +     relBuilder.literal(orderName)
          +   } else {
          +     // for batch
          +     relBuilder.field(orderName)
          +   }
          +
          + orderKeys.add(new RexFieldCollation(rexNode,sets))
          +
          + val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + overWindow.partitionBy.foreach(x=>
          + partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
          +
          + val preceding = overWindow.preceding.asInstanceOf[Literal]
          + val following = overWindow.following.asInstanceOf[Literal]
          +
          + val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
          +
          + val lowerBound = createBound(relBuilder, preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
          + val upperBound = createBound(relBuilder, following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
          +
          + rexBuilder.makeOver(
          + relDataType,
          + operator,
          + aggExprs,
          + partitionKeys,
          + orderKeys.build,
          + lowerBound,
          + upperBound,
          + isPhysical,
          + true,
          + false)
          + }
          +
          + private def createBound(
          + relBuilder: RelBuilder,
          + precedingValue: Long,
          + sqlKind: SqlKind): RexWindowBound = {
          +
          + if (precedingValue == Long.MaxValue) {
          — End diff –

          Please use the constants defined in `expressionDsl.scala` for the checks
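
          For example (sketch, assuming `UNBOUNDED_ROW`/`UNBOUNDED_RANGE` and `CURRENT_ROW`/`CURRENT_RANGE` in `expressionDsl.scala` are the constants the user writes in `preceding()`/`following()`):

          ```
          // Sketch only: compare against the DSL constants instead of raw magic numbers.
          overWindow.preceding match {
            case UNBOUNDED_ROW | UNBOUNDED_RANGE =>
              create(SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO), null)
            case CURRENT_ROW | CURRENT_RANGE =>
              create(SqlWindow.createCurrentRow(SqlParserPos.ZERO), null)
            case _ =>
              // bounded bound: construct the RexWindowBound from the literal value as before
          }
          ```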

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112499806

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala —
          @@ -93,28 +96,43 @@ class DataStreamOverAggregate(

          val orderKeys = overWindow.orderKeys.getFieldCollations

          - if (orderKeys.size() != 1) {
          -   throw new TableException(
          -     "Unsupported use of OVER windows. The window can only be ordered by a single time column.")
          - }
          - val orderKey = orderKeys.get(0)
          + val timeType = if (!orderKeys.isEmpty) {
          +   if (orderKeys.size() != 1) {
          +     throw new TableException(
          +       "Unsupported use of OVER windows. The window can only be ordered by a single time " +
          +         "column.")
          +   }
          +
          +   val orderKey = orderKeys.get(0)
          +
          -   if (!orderKey.direction.equals(ASCENDING)) {
          -     throw new TableException(
          -       "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
          +   if (!orderKey.direction.equals(ASCENDING)) {
          +     throw new TableException(
          +       "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
          +   }
          +
          +   inputType
          +     .getFieldList
          +     .get(orderKey.getFieldIndex)
          +     .getValue.asInstanceOf[TimeModeType]
          + } else {
          +   val it = logicWindow.constants.listIterator()
          +   if (it.hasNext) {
          +     val item = it.next().getValue
          +     if (item.isInstanceOf[NlsString]) {
          +       val value = item.asInstanceOf[NlsString].getValue
          +       if (value.equalsIgnoreCase("rowtime")) {
          +         new RowTimeType
          +       } else {
          +         new ProcTimeType
          +       }
          +     }
          +   }
            }

          val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)

          val generator = new CodeGenerator(
          -   tableEnv.getConfig,
          -   false,
          -   inputDS.getType)
          -
          - val timeType = inputType
          -   .getFieldList
          -   .get(orderKey.getFieldIndex)
          -   .getValue
          +   tableEnv.getConfig,
          — End diff –

          indent

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112570409

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala —
          @@ -23,7 +23,9 @@ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
          import org.apache.calcite.rel.core.AggregateCall
          import org.apache.calcite.rel.core.Window.Group
          import org.apache.calcite.rel.core.Window
          -import org.apache.calcite.rex.{RexInputRef}
          +import org.apache.calcite.rex.{RexInputRef, RexLiteral}

          — End diff –

          All these changes could be undone if we don't use a literal for the orderBy() expression

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112561580

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + var aggAlias: Expression,
          + overWindowAlias: Expression,
          + var overWindow: OverWindow = null) extends Expression {
          +
          + private[flink] def as(aggAlias: Expression): OverCall = {
          +   this.aggAlias = aggAlias
          +   this
          + }
          +
          + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
          +
          + val rexBuilder = relBuilder.getRexBuilder
          +
          + val operator: SqlAggFunction = agg.toSqlAggFunction()
          +
          + val aggReturnType: TypeInformation[_] = agg.resultType
          +
          + val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
          +
          + val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
          +
          + aggExprs.add(relBuilder.field(aggChildName))
          +
          + val orderKeys: ImmutableList.Builder[RexFieldCollation] =
          + new ImmutableList.Builder[RexFieldCollation]()
          +
          + val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
          + val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          +
          + val rexNode =
          +   if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
          +     // for stream
          +     relBuilder.literal(orderName)
          +   } else {
          +     // for batch
          +     relBuilder.field(orderName)
          +   }
          +
          + orderKeys.add(new RexFieldCollation(rexNode,sets))
          +
          + val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + overWindow.partitionBy.foreach(x=>
          — End diff –

          +space `foreach(x =>`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112570230

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala —
          @@ -93,28 +96,43 @@ class DataStreamOverAggregate(

          val orderKeys = overWindow.orderKeys.getFieldCollations

          • if (orderKeys.size() != 1) { - throw new TableException( - "Unsupported use of OVER windows. The window can only be ordered by a single time column.") - }
          • val orderKey = orderKeys.get(0)
            + val timeType = if (!orderKeys.isEmpty) {
            + if (orderKeys.size() != 1) { + throw new TableException( + "Unsupported use of OVER windows. The window can only be ordered by a single time " + + "column.") + }

            + val orderKey = orderKeys.get(0)

          • if (!orderKey.direction.equals(ASCENDING)) {
          • throw new TableException(
          • "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
            + if (!orderKey.direction.equals(ASCENDING)) { + throw new TableException( + "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.") + }

            + inputType
            + .getFieldList
            + .get(orderKey.getFieldIndex)
            + .getValue.asInstanceOf[TimeModeType]
            + } else {

              • End diff –

          this could be removed if we don't use literals for the orderBy() field

          Show
          githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112570230 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala — @@ -93,28 +96,43 @@ class DataStreamOverAggregate( val orderKeys = overWindow.orderKeys.getFieldCollations if (orderKeys.size() != 1) { - throw new TableException( - "Unsupported use of OVER windows. The window can only be ordered by a single time column.") - } val orderKey = orderKeys.get(0) + val timeType = if (!orderKeys.isEmpty) { + if (orderKeys.size() != 1) { + throw new TableException( + "Unsupported use of OVER windows. The window can only be ordered by a single time " + + "column.") + } + val orderKey = orderKeys.get(0) if (!orderKey.direction.equals(ASCENDING)) { throw new TableException( "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.") + if (!orderKey.direction.equals(ASCENDING)) { + throw new TableException( + "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.") + } + inputType + .getFieldList + .get(orderKey.getFieldIndex) + .getValue.asInstanceOf [TimeModeType] + } else { End diff – this could be removed if we don't use literals for the orderBy() field
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112557404

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + var aggAlias: Expression,
          + overWindowAlias: Expression,
          + var overWindow: OverWindow = null) extends Expression {
          +
          + private[flink] def as(aggAlias: Expression): OverCall = {
          +   this.aggAlias = aggAlias
          +   this
          + }
          +
          + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
          +
          + val rexBuilder = relBuilder.getRexBuilder
          +
          + val operator: SqlAggFunction = agg.toSqlAggFunction()
          +
          + val aggReturnType: TypeInformation[_] = agg.resultType
          +
          + val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
          +
          + val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
          +
          + aggExprs.add(relBuilder.field(aggChildName))
          +
          + val orderKeys: ImmutableList.Builder[RexFieldCollation] =
          + new ImmutableList.Builder[RexFieldCollation]()
          +
          + val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
          — End diff –

          +space `val sets: util.HashSet`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112570095

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + var aggAlias: Expression,
          + overWindowAlias: Expression,
          + var overWindow: OverWindow = null) extends Expression {
          +
+ private[flink] def as(aggAlias: Expression): OverCall = {
+ this.aggAlias = aggAlias
+ this
+ }

          +
          + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
          +
          + val rexBuilder = relBuilder.getRexBuilder
          +
          + val operator: SqlAggFunction = agg.toSqlAggFunction()
          +
          + val aggReturnType: TypeInformation[_] = agg.resultType
          +
          + val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
          +
          + val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
          +
          + aggExprs.add(relBuilder.field(aggChildName))
          +
          + val orderKeys: ImmutableList.Builder[RexFieldCollation] =
          + new ImmutableList.Builder[RexFieldCollation]()
          +
          + val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
          + val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          +
+ val rexNode =
+ if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
+ // for stream
+ relBuilder.literal(orderName)
+ } else {
+ // for batch
+ relBuilder.field(orderName)
+ }

          +
          + orderKeys.add(new RexFieldCollation(rexNode,sets))
          +
          + val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + overWindow.partitionBy.foreach(x=>
          + partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
          +
          + val preceding = overWindow.preceding.asInstanceOf[Literal]
          + val following = overWindow.following.asInstanceOf[Literal]
          +
          + val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
          +
          + val lowerBound = createBound(relBuilder, preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
          + val upperBound = createBound(relBuilder, following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
          +
          + rexBuilder.makeOver(
          + relDataType,
          + operator,
          + aggExprs,
          + partitionKeys,
          + orderKeys.build,
          + lowerBound,
          + upperBound,
          + isPhysical,
          + true,
          + false)
          + }
          +
          + private def createBound(
          + relBuilder: RelBuilder,
          + precedingValue: Long,
          + sqlKind: SqlKind): RexWindowBound = {
          +
+ if (precedingValue == Long.MaxValue) {
+ val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
+ create(unbounded, null)
+ } else if (precedingValue == 0L) {
+ val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
+ create(currentRow, null)
+ } else {
+
+ val returnType = new BasicSqlType(
+ relBuilder.getTypeFactory.getTypeSystem,
+ SqlTypeName.DECIMAL)
+
+ val sqlOperator = new SqlPostfixOperator(
+ sqlKind.name,
+ sqlKind,
+ 2,
+ new OrdinalReturnTypeInference(0),
+ null,
+ null)
+
+ val operands: Array[SqlNode] = new Array[SqlNode](1)
+ operands(0) = SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO)
+
+ val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
+
+ val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+ expressions.add(relBuilder.literal(precedingValue))
+
+ val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)
+
+ create(node, rexNode)
+ }

          + }
          +
          + override private[flink] def children: Seq[Expression] = Seq()
          — End diff –

Maybe we can check the partitionBy expressions here as well?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112571486

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala —
@@ -22,6 +22,126 @@ import org.apache.flink.table.expressions.{Expression, ExpressionParser}
import org.apache.flink.table.plan.logical._

          /**
          + * An over window specification.
          + *
          + * Over window is similar to the traditional OVER SQL.
          + */
          +class OverWindow {
          +
          + private[flink] var alias: Expression = _
          + private[flink] var partitionBy: Seq[Expression] = Seq[Expression]()
          + private[flink] var orderBy: Expression = _
          + private[flink] var preceding: Expression = _
          + private[flink] var following: Expression = null
          +
          + /**
          + * Assigns an alias for this window that the following `select()` clause can refer to.
          + *
          + * @param alias alias for this over window
          + * @return this over window
          + */
          + def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
          +
          + /**
          + * Assigns an alias for this window that the following `select()` clause can refer to.
          + *
          + * @param alias alias for this over window
          + * @return this over window
          + */
+ def as(alias: Expression): OverWindow = {
+ this.alias = alias
+ this
+ }

          +
          + /**
          + * Partitions the elements on some partition keys.
          + *
          + * @param partitionBy
          + * @return this over window
          + */
+ def partitionBy(partitionBy: String): OverWindow = {
+ this.partitionBy(ExpressionParser.parseExpression(partitionBy))
+ }

          +
          + /**
          + * Partitions the elements on some partition keys.
          + *
          + * @param partitionBy
          + * @return this over window
          + */
+ def partitionBy(partitionBy: Expression*): OverWindow = {
+ this.partitionBy = partitionBy
+ this
+ }

          +
          +
          + /**
          + * Specifies the time mode.
          + *
          + * @param orderBy For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]]
          + * to specify time mode.
          + * @return this over window
          + */
+ def orderBy(orderBy: String): OverWindow = {
+ this.orderBy(ExpressionParser.parseExpression(orderBy))
+ }

          +
          + /**
          + * Specifies the time mode.
          + *
          + * @param orderBy For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]]
          + * to specify time mode.
          + * @return this over window
          + */
+ def orderBy(orderBy: Expression): OverWindow = {
+ this.orderBy = orderBy
+ this
+ }

          +
          + /**
          + * Set the preceding offset (based on time or row-count intervals) for over window
          + *
          + * @param preceding forward offset that relative to the current row
          + * @return this over window
          + */
+ def preceding(preceding: String): OverWindow = {
+ this.preceding(ExpressionParser.parseExpression(preceding))
+ }

          +
          + /**
          + * Set the preceding offset (based on time or row-count intervals) for over window
          + *
          + * @param preceding forward offset that relative to the current row
          + * @return this over window
          + */
+ def preceding(preceding: Expression): OverWindow = {
+ this.preceding = preceding
+ this
+ }

          +
          + /**
          + * Set the following offset (based on time or row-count intervals) for over window
          + *
          + * @param following subsequent offset that relative to the current row
          + * @return this over window
          + */
          + def following(following: String): OverWindow = {
          — End diff –

          Should we make `following` optional and default to CURRENT_ROW / CURRENT_RANGE depending on the type of `preceding`? I think that would be a nice shortcut and also be aligned with SQL.

          What do you think @sunjincheng121 ?
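
For illustration, a minimal sketch of that defaulting rule (a hypothetical `defaultFollowing` helper, assuming the `Literal` and `RowIntervalTypeInfo` types that `OverCall` above already uses to tell row-count intervals from time intervals; not the actual implementation):

```
// Hypothetical default for `following`: a row-count `preceding` implies
// CURRENT_ROW, a time-interval `preceding` implies CURRENT_RANGE,
// mirroring the SQL default frame end.
private def defaultFollowing(preceding: Expression): Expression =
  preceding.asInstanceOf[Literal].resultType match {
    case _: RowIntervalTypeInfo => CURRENT_ROW
    case _ => CURRENT_RANGE
  }
```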

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112560976

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + var aggAlias: Expression,
          + overWindowAlias: Expression,
          + var overWindow: OverWindow = null) extends Expression {
          +
+ private[flink] def as(aggAlias: Expression): OverCall = {
+ this.aggAlias = aggAlias
+ this
+ }

          +
          + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
          +
          + val rexBuilder = relBuilder.getRexBuilder
          +
          + val operator: SqlAggFunction = agg.toSqlAggFunction()
          +
          + val aggReturnType: TypeInformation[_] = agg.resultType
          +
          + val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
          +
          + val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
          +
          + aggExprs.add(relBuilder.field(aggChildName))
          +
          + val orderKeys: ImmutableList.Builder[RexFieldCollation] =
          + new ImmutableList.Builder[RexFieldCollation]()
          +
          + val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
          + val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          +
          + val rexNode =
          + if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
          + // for stream
          + relBuilder.literal(orderName)
          — End diff –

          If we do this, we could remove the special literal handling in `OverAggregate` and `DataStreamOverAggregate`.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112570917

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala —
          @@ -0,0 +1,265 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.api.scala.stream.table
          +
          +import org.apache.flink.api.scala._
          +import org.apache.flink.streaming.api.TimeCharacteristic
          +import org.apache.flink.streaming.api.functions.source.SourceFunction
          +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
          +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
          +import org.apache.flink.streaming.api.watermark.Watermark
          +import org.apache.flink.table.api.TableEnvironment
          +import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.table.OverWindowITCase.{RowTimeSourceFunction}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
          +import org.junit.Assert._
          +import org.junit.Test
          +
          +import scala.collection.mutable
          +
          +class OverWindowITCase extends StreamingWithStateTestBase {
          +
          + @Test
+ def testProcTimeUnBoundedPartitionedRowOver(): Unit = {
+
+ val data = List(
+ (1L, 1, "Hello"),
+ (2L, 2, "Hello"),
+ (3L, 3, "Hello"),
+ (4L, 4, "Hello"),
+ (5L, 5, "Hello"),
+ (6L, 6, "Hello"),
+ (7L, 7, "Hello World"),
+ (8L, 8, "Hello World"),
+ (20L, 20, "Hello World"))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'a, 'b, 'c)
+
+ val windowedTable = table
+ .window(
+ Over partitionBy 'c orderBy 'procTime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w)
+ .select('c, 'b.count over 'w as 'mycount)
+ .select('c, 'mycount)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "Hello World,1", "Hello World,2", "Hello World,3",
+ "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }

          +
          + @Test
+ def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
+ env.setParallelism(1)
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (1, 1L, "Hello")),
+ Left(14000002L, (1, 2L, "Hello")),
+ Left(14000002L, (1, 3L, "Hello world")),
+ Left(14000003L, (2, 2L, "Hello world")),
+ Left(14000003L, (2, 3L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 4L, "Hello world")),
+ Left(14000022L, (1, 5L, "Hello world")),
+ Left(14000022L, (1, 6L, "Hello world")),
+ Left(14000022L, (1, 7L, "Hello world")),
+ Left(14000023L, (2, 4L, "Hello world")),
+ Left(14000023L, (2, 5L, "Hello world")),
+ Right(14000030L)
+ )
+ val table = env
+ .addSource(new RowTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ val windowedTable = table
+ .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE following
+ CURRENT_RANGE as 'w)
+ .select(
+ 'a, 'b, 'c,
+ 'b.sum over 'w,
+ 'b.count over 'w,
+ 'b.avg over 'w,
+ 'b.max over 'w,
+ 'b.min over 'w)
+
+ val result = windowedTable.toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,1,Hello,6,3,2,3,1",
+ "1,2,Hello,6,3,2,3,1",
+ "1,3,Hello world,6,3,2,3,1",
+ "1,1,Hi,7,4,1,3,1",
+ "2,1,Hello,1,1,1,1,1",
+ "2,2,Hello world,6,3,2,3,1",
+ "2,3,Hello world,6,3,2,3,1",
+ "1,4,Hello world,11,5,2,4,1",
+ "1,5,Hello world,29,8,3,7,1",
+ "1,6,Hello world,29,8,3,7,1",
+ "1,7,Hello world,29,8,3,7,1",
+ "2,4,Hello world,15,5,3,5,1",
+ "2,5,Hello world,15,5,3,5,1"
+ )
+
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }

          +
          + @Test
+ def testProcTimeBoundedPartitionedRangeOver(): Unit = {
+
+ val data = List(
+ (1, 1L, 0, "Hallo", 1L),
+ (2, 2L, 1, "Hallo Welt", 2L),
+ (2, 3L, 2, "Hallo Welt wie", 1L),
+ (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
+ (3, 5L, 4, "ABC", 2L),
+ (3, 6L, 5, "BCD", 3L),
+ (4, 7L, 6, "CDE", 2L),
+ (4, 8L, 7, "DEF", 1L),
+ (4, 9L, 8, "EFG", 1L),
+ (4, 10L, 9, "FGH", 2L),
+ (5, 11L, 10, "GHI", 1L),
+ (5, 12L, 11, "HIJ", 3L),
+ (5, 13L, 12, "IJK", 3L),
+ (5, 14L, 13, "JKL", 2L),
+ (5, 15L, 14, "KLM", 2L))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setParallelism(1)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+
+ val windowedTable = table
+ .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
+ .select('a, 'c.sum over 'w, 'c.min over 'w)
+ val result = windowedTable.toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,0,0",
+ "2,1,1",
+ "2,3,1",
+ "3,3,3",
+ "3,7,3",
+ "3,12,3",
+ "4,6,6",
+ "4,13,6",
+ "4,21,6",
+ "4,30,6",
+ "5,10,10",
+ "5,21,10",
+ "5,33,10",
+ "5,46,10",
+ "5,60,10")
+
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }

          +
          + @Test
          + def testRowTimeBoundedPartitionedRowOver(): Unit = {
          — End diff –

          Add a test for rowtime bounded range as well?
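
A hedged sketch of what such a test's window definition could look like, reusing the sources and names of the tests above (the 2-second bound is an arbitrary choice for illustration, not part of the PR):

```
// Rowtime bounded RANGE over window: a time interval as the preceding
// bound, with CURRENT_RANGE as the upper bound.
val windowedTable = table
  .window(
    Over partitionBy 'a orderBy 'rowtime preceding 2.seconds following CURRENT_RANGE as 'w)
  .select('a, 'b, 'c, 'b.sum over 'w, 'b.count over 'w)
```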

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112628642

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala —
          @@ -54,6 +54,9 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
          case expr if !expr.valid => u
case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
          case gcf: GetCompositeField => Alias(gcf, gcf.aliasName().getOrElse(s"_c$i"))
          + case over: OverCall if null != over.aggAlias =>
          — End diff –

If I understand you correctly, you want to add the alias to the AGG, e.g.:
` .select('c, 'b.count over 'w as 'mycount) --> .select('c, 'b.count as 'mycount over 'w)`
but in SQL we add the alias to the OVER expression, e.g.:
`SELECT c, count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1 from T1`
So I want to stay consistent with SQL. What do you think?
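
For illustration, the SQL-consistent form under discussion next to its SQL counterpart (field and window names borrowed from the examples above):

```
// Table API: the alias applies to the whole OVER expression,
// just like "count(a) OVER (...) AS cnt1" in SQL.
table
  .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
  .select('c, 'a.count over 'w as 'cnt1)

// SQL equivalent:
// SELECT c, count(a) OVER (
//   PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) AS cnt1
// FROM T1
```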

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112628964

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + var aggAlias: Expression,
          + overWindowAlias: Expression,
          + var overWindow: OverWindow = null) extends Expression {
          +
          + private[flink] def as(aggAlias: Expression): OverCall = {
          — End diff –

Please tell me more about this.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112605911

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala —
@@ -22,6 +22,126 @@ import org.apache.flink.table.expressions.{Expression, ExpressionParser}
import org.apache.flink.table.plan.logical._

          /**
          + * An over window specification.
          + *
          + * Over window is similar to the traditional OVER SQL.
          + */
          +class OverWindow {
          +
          + private[flink] var alias: Expression = _
          + private[flink] var partitionBy: Seq[Expression] = Seq[Expression]()
          + private[flink] var orderBy: Expression = _
          + private[flink] var preceding: Expression = _
          + private[flink] var following: Expression = null
          +
          + /**
          + * Assigns an alias for this window that the following `select()` clause can refer to.
          + *
          + * @param alias alias for this over window
          + * @return this over window
          + */
          + def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
          +
          + /**
          + * Assigns an alias for this window that the following `select()` clause can refer to.
          + *
          + * @param alias alias for this over window
          + * @return this over window
          + */
+ def as(alias: Expression): OverWindow = {
+ this.alias = alias
+ this
+ }

          +
          + /**
          + * Partitions the elements on some partition keys.
          + *
          + * @param partitionBy
          + * @return this over window
          + */
+ def partitionBy(partitionBy: String): OverWindow = {
+ this.partitionBy(ExpressionParser.parseExpression(partitionBy))
+ }

          +
          + /**
          + * Partitions the elements on some partition keys.
          + *
          + * @param partitionBy
          + * @return this over window
          + */
+ def partitionBy(partitionBy: Expression*): OverWindow = {
+ this.partitionBy = partitionBy
+ this
+ }

          +
          +
          + /**
          + * Specifies the time mode.
          + *
          + * @param orderBy For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]]
          + * to specify time mode.
          + * @return this over window
          + */
+ def orderBy(orderBy: String): OverWindow = {
+ this.orderBy(ExpressionParser.parseExpression(orderBy))
+ }

          +
          + /**
          + * Specifies the time mode.
          + *
          + * @param orderBy For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]]
          + * to specify time mode.
          + * @return this over window
          + */
+ def orderBy(orderBy: Expression): OverWindow = {
+ this.orderBy = orderBy
+ this
+ }

          +
          + /**
          + * Set the preceding offset (based on time or row-count intervals) for over window
          + *
          + * @param preceding forward offset that relative to the current row
          + * @return this over window
          + */
+ def preceding(preceding: String): OverWindow = {
+ this.preceding(ExpressionParser.parseExpression(preceding))
+ }

          +
          + /**
          + * Set the preceding offset (based on time or row-count intervals) for over window
          + *
          + * @param preceding forward offset that relative to the current row
          + * @return this over window
          + */
+ def preceding(preceding: Expression): OverWindow = {
+ this.preceding = preceding
+ this
+ }

          +
          + /**
          + * Set the following offset (based on time or row-count intervals) for over window
          + *
          + * @param following subsequent offset that relative to the current row
          + * @return this over window
          + */
          + def following(following: String): OverWindow = {
          — End diff –

That's a good idea. +1

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112620324

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          — End diff –

Can we check `partitionBy` as follows (in the `toRexNode` method)?
```
overWindow.partitionBy.foreach { x =>
  val partitionKey = relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)
  if (!FlinkTypeFactory.toTypeInfo(partitionKey.getType).isKeyType) {
    throw ValidationException(
      s"expression $partitionKey cannot be used as a partition key expression " +
      "because it's not a valid key type which must be hashable and comparable")
  }
}
```
Because it does not work (the partition key field cannot be resolved) when I add `partitionBy` into `children` and put the check logic into the `validateInput` method:
`def children: Seq[Expression] = Seq(agg) ++ overWindow.partitionBy`
Is there something wrong in the code?

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112604758

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala —
          @@ -0,0 +1,265 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.api.scala.stream.table
          +
          +import org.apache.flink.api.scala._
          +import org.apache.flink.streaming.api.TimeCharacteristic
          +import org.apache.flink.streaming.api.functions.source.SourceFunction
          +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
          +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
          +import org.apache.flink.streaming.api.watermark.Watermark
          +import org.apache.flink.table.api.TableEnvironment
          +import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.table.OverWindowITCase.{RowTimeSourceFunction}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
          +import org.junit.Assert._
          +import org.junit.Test
          +
          +import scala.collection.mutable
          +
          +class OverWindowITCase extends StreamingWithStateTestBase {
          +
          + @Test
+ def testProcTimeUnBoundedPartitionedRowOver(): Unit = {
+
+ val data = List(
+ (1L, 1, "Hello"),
+ (2L, 2, "Hello"),
+ (3L, 3, "Hello"),
+ (4L, 4, "Hello"),
+ (5L, 5, "Hello"),
+ (6L, 6, "Hello"),
+ (7L, 7, "Hello World"),
+ (8L, 8, "Hello World"),
+ (20L, 20, "Hello World"))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'a, 'b, 'c)
+
+ val windowedTable = table
+ .window(
+ Over partitionBy 'c orderBy 'procTime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w)
+ .select('c, 'b.count over 'w as 'mycount)
+ .select('c, 'mycount)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "Hello World,1", "Hello World,2", "Hello World,3",
+ "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }

          +
          + @Test
+ def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
+ env.setParallelism(1)
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (1, 1L, "Hello")),
+ Left(14000002L, (1, 2L, "Hello")),
+ Left(14000002L, (1, 3L, "Hello world")),
+ Left(14000003L, (2, 2L, "Hello world")),
+ Left(14000003L, (2, 3L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 4L, "Hello world")),
+ Left(14000022L, (1, 5L, "Hello world")),
+ Left(14000022L, (1, 6L, "Hello world")),
+ Left(14000022L, (1, 7L, "Hello world")),
+ Left(14000023L, (2, 4L, "Hello world")),
+ Left(14000023L, (2, 5L, "Hello world")),
+ Right(14000030L)
+ )
+ val table = env
+ .addSource(new RowTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ val windowedTable = table
+ .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE following
+ CURRENT_RANGE as 'w)
+ .select(
+ 'a, 'b, 'c,
+ 'b.sum over 'w,
+ 'b.count over 'w,
+ 'b.avg over 'w,
+ 'b.max over 'w,
+ 'b.min over 'w)
+
+ val result = windowedTable.toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,1,Hello,6,3,2,3,1",
+ "1,2,Hello,6,3,2,3,1",
+ "1,3,Hello world,6,3,2,3,1",
+ "1,1,Hi,7,4,1,3,1",
+ "2,1,Hello,1,1,1,1,1",
+ "2,2,Hello world,6,3,2,3,1",
+ "2,3,Hello world,6,3,2,3,1",
+ "1,4,Hello world,11,5,2,4,1",
+ "1,5,Hello world,29,8,3,7,1",
+ "1,6,Hello world,29,8,3,7,1",
+ "1,7,Hello world,29,8,3,7,1",
+ "2,4,Hello world,15,5,3,5,1",
+ "2,5,Hello world,15,5,3,5,1"
+ )
+
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }

          +
          + @Test
+ def testProcTimeBoundedPartitionedRangeOver(): Unit = {
+
+ val data = List(
+ (1, 1L, 0, "Hallo", 1L),
+ (2, 2L, 1, "Hallo Welt", 2L),
+ (2, 3L, 2, "Hallo Welt wie", 1L),
+ (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
+ (3, 5L, 4, "ABC", 2L),
+ (3, 6L, 5, "BCD", 3L),
+ (4, 7L, 6, "CDE", 2L),
+ (4, 8L, 7, "DEF", 1L),
+ (4, 9L, 8, "EFG", 1L),
+ (4, 10L, 9, "FGH", 2L),
+ (5, 11L, 10, "GHI", 1L),
+ (5, 12L, 11, "HIJ", 3L),
+ (5, 13L, 12, "IJK", 3L),
+ (5, 14L, 13, "JKL", 2L),
+ (5, 15L, 14, "KLM", 2L))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setParallelism(1)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+
+ val windowedTable = table
+ .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
+ .select('a, 'c.sum over 'w, 'c.min over 'w)
+ val result = windowedTable.toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,0,0",
+ "2,1,1",
+ "2,3,1",
+ "3,3,3",
+ "3,7,3",
+ "3,12,3",
+ "4,6,6",
+ "4,13,6",
+ "4,21,6",
+ "4,30,6",
+ "5,10,10",
+ "5,21,10",
+ "5,33,10",
+ "5,46,10",
+ "5,60,10")
+
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }

          +
          + @Test
          + def testRowTimeBoundedPartitionedRowOver(): Unit = {
          — End diff –

In fact, I do not want to add more ITCases, and I would even remove this one, because the Table API and SQL share the same process functions. I think the full coverage in `OverWindowTest` is enough, so I would keep the number of test cases in this class to a minimum and only cover the specific function points, such as `rows` / `range` with proc-time / row-time. What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112633124

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          — End diff –

          Hm, not sure. Would need to look into this further. But, IMO it is very important to validate that the `partitionBy` fields exist.
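
For illustration, such a validation could take roughly this shape, assuming the check runs after field resolution so that valid partition keys arrive as `ResolvedFieldReference`s (the error message and placement are illustrative, not the final implementation):

    // hypothetical validateInput() fragment for OverCall
    override private[flink] def validateInput(): ValidationResult = {
      overWindow.partitionBy.foldLeft[ValidationResult](ValidationSuccess) {
        case (failure: ValidationFailure, _) => failure          // keep the first failure
        case (_, _: ResolvedFieldReference) => ValidationSuccess // field exists in the input
        case (_, e) =>
          ValidationFailure(s"Expression $e is not a valid partitionBy field of the input.")
      }
    }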

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112635892

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param aggAlias agg alias for following `select()` clause.
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + var aggAlias: Expression,
          + overWindowAlias: Expression,
          + var overWindow: OverWindow = null) extends Expression {
          +
          + private[flink] def as(aggAlias: Expression): OverCall = {
          — End diff –

If we remove the `as` method from `OverCall`, it will be handled as a regular `Alias` expression which wraps the `OverCall`.
          Since the `OverCall` is wrapped, the `ProjectTranslator.translateOverWindow()` method needs another `case Alias(OverCall(agg, alias, _), aggAlias, _)` when inserting the OverWindow into the call.
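
For illustration, the additional case could look roughly like this (the surrounding method structure and the `resolveWindow` helper are assumptions for the sketch):

    // hypothetical fragment of ProjectionTranslator.translateOverWindow()
    projectList.map {
      // OverCall used directly: attach the resolved over window
      case OverCall(agg, windowAlias, _) =>
        OverCall(agg, windowAlias, resolveWindow(windowAlias))
      // OverCall wrapped in a regular Alias once OverCall.as() is removed
      case Alias(OverCall(agg, windowAlias, _), name, extraNames) =>
        Alias(OverCall(agg, windowAlias, resolveWindow(windowAlias)), name, extraNames)
      case e => e
    }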

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112636652

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala —
          @@ -54,6 +54,9 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
          case expr if !expr.valid => u
case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
          case gcf: GetCompositeField => Alias(gcf, gcf.aliasName().getOrElse(s"_c$i"))
          + case over: OverCall if null != over.aggAlias =>
          — End diff –

          Currently `.select('c, 'b.count over 'w as 'mycount)` would be translated into `OverCall(Count(b), w, mycount, _)`. If we remove `as` from `OverCall`, it will be wrapped in a regular `Alias` expression: `Alias(OverCall(Count(b), w, _), mycount)`.

          The syntax would be the same, but the internal representation would use the existing `Alias` expression, just like any other expression that is renamed.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112636908

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala —
          @@ -0,0 +1,265 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.api.scala.stream.table
          +
          +import org.apache.flink.api.scala._
          +import org.apache.flink.streaming.api.TimeCharacteristic
          +import org.apache.flink.streaming.api.functions.source.SourceFunction
          +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
          +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
          +import org.apache.flink.streaming.api.watermark.Watermark
          +import org.apache.flink.table.api.TableEnvironment
          +import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.table.OverWindowITCase.{RowTimeSourceFunction}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}

          +import org.apache.flink.types.Row
          +import org.junit.Assert._
          +import org.junit.Test
          +
          +import scala.collection.mutable
          +
          +class OverWindowITCase extends StreamingWithStateTestBase {
          +
          + @Test
+  def testProcTimeUnBoundedPartitionedRowOver(): Unit = {
+
+    val data = List(
+      (1L, 1, "Hello"),
+      (2L, 2, "Hello"),
+      (3L, 3, "Hello"),
+      (4L, 4, "Hello"),
+      (5L, 5, "Hello"),
+      (6L, 6, "Hello"),
+      (7L, 7, "Hello World"),
+      (8L, 8, "Hello World"),
+      (20L, 20, "Hello World"))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'a, 'b, 'c)
+
+    val windowedTable = table
+      .window(
+        Over partitionBy 'c orderBy 'procTime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w)
+      .select('c, 'b.count over 'w as 'mycount)
+      .select('c, 'mycount)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hello World,1", "Hello World,2", "Hello World,3",
+      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L)
+    )
+    val table = env
+      .addSource(new RowTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv).as('a, 'b, 'c)
+
+    val windowedTable = table
+      .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE following
+        CURRENT_RANGE as 'w)
+      .select(
+        'a, 'b, 'c,
+        'b.sum over 'w,
+        'b.count over 'w,
+        'b.avg over 'w,
+        'b.max over 'w,
+        'b.min over 'w)
+
+    val result = windowedTable.toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1,Hello,6,3,2,3,1",
+      "1,2,Hello,6,3,2,3,1",
+      "1,3,Hello world,6,3,2,3,1",
+      "1,1,Hi,7,4,1,3,1",
+      "2,1,Hello,1,1,1,1,1",
+      "2,2,Hello world,6,3,2,3,1",
+      "2,3,Hello world,6,3,2,3,1",
+      "1,4,Hello world,11,5,2,4,1",
+      "1,5,Hello world,29,8,3,7,1",
+      "1,6,Hello world,29,8,3,7,1",
+      "1,7,Hello world,29,8,3,7,1",
+      "2,4,Hello world,15,5,3,5,1",
+      "2,5,Hello world,15,5,3,5,1"
+    )
+
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testProcTimeBoundedPartitionedRangeOver(): Unit = {
+
+    val data = List(
+      (1, 1L, 0, "Hallo", 1L),
+      (2, 2L, 1, "Hallo Welt", 2L),
+      (2, 3L, 2, "Hallo Welt wie", 1L),
+      (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
+      (3, 5L, 4, "ABC", 2L),
+      (3, 6L, 5, "BCD", 3L),
+      (4, 7L, 6, "CDE", 2L),
+      (4, 8L, 7, "DEF", 1L),
+      (4, 9L, 8, "EFG", 1L),
+      (4, 10L, 9, "FGH", 2L),
+      (5, 11L, 10, "GHI", 1L),
+      (5, 12L, 11, "HIJ", 3L),
+      (5, 13L, 12, "IJK", 3L),
+      (5, 14L, 13, "JKL", 2L),
+      (5, 15L, 14, "KLM", 2L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+
+    val windowedTable = table
+      .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
+      .select('a, 'c.sum over 'w, 'c.min over 'w)
+    val result = windowedTable.toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,0,0",
+      "2,1,1",
+      "2,3,1",
+      "3,3,3",
+      "3,7,3",
+      "3,12,3",
+      "4,6,6",
+      "4,13,6",
+      "4,21,6",
+      "4,30,6",
+      "5,10,10",
+      "5,21,10",
+      "5,33,10",
+      "5,46,10",
+      "5,60,10")
+
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }

          +
          + @Test
          + def testRowTimeBoundedPartitionedRowOver(): Unit = {
          — End diff –

          Fair enough. The `OverWindowTest` should be sufficient.
          However, I would keep a few integration tests for the Table API as well.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112657301

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala —
          @@ -0,0 +1,528 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.api.scala.stream.table
          +
          +import org.apache.flink.api.scala._
          +import org.apache.flink.table.api.scala._
          +import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}

          +import org.junit.Test
          +
          +class OverWindowTest extends TableTestBase {
          — End diff –

          I think we should add a few more tests here:

          • Add two tests without following for row and range
          • Check that orderBy field is correctly validated (only rowtime, proctime)
          • Check that partitionBy fields are correctly validated (existing fields in the input, no complex expressions)
          • Check that preceding and following are of same type
          • Check that preceding and following are literals
• Any other validation checks? (See the sketch below for one example.)
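
For illustration, one of the suggested validation tests could look like this, in the style of the other tests in this class (the `streamTestUtil` helper and eager validation on `select` are assumptions of the sketch):

    // hypothetical negative test: orderBy must be 'rowtime or 'proctime
    @Test(expected = classOf[ValidationException])
    def testInvalidOrderByField(): Unit = {
      val util = streamTestUtil()
      val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)

      table
        .window(Over partitionBy 'c orderBy 'b preceding UNBOUNDED_ROW as 'w)
        .select('c, 'a.count over 'w) // validation should fail: 'b is not a time indicator
    }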
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112654288

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +59,160 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + overWindowAlias: Expression,
          + var overWindow: OverWindow = null) extends Expression {
          +
          + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
          +
          + val rexBuilder = relBuilder.getRexBuilder
          +
          + val operator: SqlAggFunction = agg.getSqlAggFunction()
          +
          + val relDataType = relBuilder
          + .getTypeFactory.asInstanceOf[FlinkTypeFactory]
          + .createTypeFromTypeInfo(agg.resultType)
          +
          + val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
          +
          + aggExprs.add(relBuilder.field(aggChildName))
          +
          + val orderKeys: ImmutableList.Builder[RexFieldCollation] =
          + new ImmutableList.Builder[RexFieldCollation]()
          +
          + val sets: util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
          + val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          +
+    val rexNode =
+      if (orderName.equalsIgnoreCase("rowtime")) {
+        // for stream event-time
+        relBuilder.call(EventTimeExtractor)
+      } else if (orderName.equalsIgnoreCase("proctime")) {
+        // for stream proc-time
+        relBuilder.call(ProcTimeExtractor)
+      } else {
+        // for batch event-time
+        relBuilder.field(orderName)
+      }

          +
          + orderKeys.add(new RexFieldCollation(rexNode, sets))
          +
          + val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + overWindow.partitionBy.foreach {
          + x =>
          + val partitionKey = relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)
+        if (!FlinkTypeFactory.toTypeInfo(partitionKey.getType).isKeyType) {
+          throw ValidationException(
+            s"expression $partitionKey cannot be used as a partition key expression " +
+              "because it's not a valid key type which must be hashable and comparable")
+        }

          + partitionKeys.add(partitionKey)
          + }
          +
          + val preceding = overWindow.preceding.asInstanceOf[Literal]
          + val following = overWindow.following.asInstanceOf[Literal]
          +
          + val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
          +
          + val lowerBound = createBound(relBuilder, preceding, SqlKind.PRECEDING)
          + val upperBound = createBound(relBuilder, following, SqlKind.FOLLOWING)
          +
          + rexBuilder.makeOver(
          + relDataType,
          + operator,
          + aggExprs,
          + partitionKeys,
          + orderKeys.build,
          + lowerBound,
          + upperBound,
          + isPhysical,
          + true,
          + false)
          + }
          +
          + private def createBound(
          + relBuilder: RelBuilder,
          + bound: Literal,
          + sqlKind: SqlKind): RexWindowBound = {
          +
+    if (bound == UNBOUNDED_RANGE || bound == UNBOUNDED_ROW) {
+      val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
+      create(unbounded, null)
+    } else if (bound == CURRENT_RANGE || bound == CURRENT_ROW) {
+      val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
+      create(currentRow, null)
+    } else {
+      val returnType = relBuilder
+        .getTypeFactory.asInstanceOf[FlinkTypeFactory]
+        .createTypeFromTypeInfo(Types.DECIMAL)
+
+      val sqlOperator = new SqlPostfixOperator(
+        sqlKind.name,
+        sqlKind,
+        2,
+        new OrdinalReturnTypeInference(0),
+        null,
+        null)
+
+      val operands: Array[SqlNode] = new Array[SqlNode](1)
+      operands(0) = SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO)
+
+      val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
+
+      val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+      expressions.add(relBuilder.literal(bound.value))
+
+      val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)
+
+      create(node, rexNode)
+    }

          + }
          +
          + override private[flink] def children: Seq[Expression] = Seq(agg)
          +
          + override def toString = s"$

          {this.getClass.getCanonicalName}

          ($

          {overWindowAlias.toString}

          )"
          +
          + override private[flink] def resultType = agg.resultType
          +
          + override private[flink] def validateInput(): ValidationResult = {
          + var validationResult: ValidationResult = ValidationSuccess
          + val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          + if (!orderName.equalsIgnoreCase("rowtime")
          — End diff –

If we replace the `UnresolvedFieldReference` by a `RowTime` or `ProcTime` expression, we should check here whether it is a `TimeIndicator` or a valid field reference. This is the same check that we need to do for the `partitionBy` fields.
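
For illustration, the shared order-key check could take this shape (the `RowTime` / `ProcTime` indicator expressions are assumed to exist after the proposed rewrite):

    // hypothetical order-key validation, shared by validateInput()
    private def validateOrderKey(orderBy: Expression): ValidationResult = orderBy match {
      case _: RowTime | _: ProcTime  => ValidationSuccess // stream time indicators
      case _: ResolvedFieldReference => ValidationSuccess // batch time attribute
      case e =>
        ValidationFailure(s"OrderBy expression must be a time indicator or a field, but got $e.")
    }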

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112653396

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +59,160 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + overWindowAlias: Expression,
          + var overWindow: OverWindow = null) extends Expression {
          +
          + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
          +
          + val rexBuilder = relBuilder.getRexBuilder
          +
          + val operator: SqlAggFunction = agg.getSqlAggFunction()
          +
          + val relDataType = relBuilder
          + .getTypeFactory.asInstanceOf[FlinkTypeFactory]
          + .createTypeFromTypeInfo(agg.resultType)
          +
          + val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
          +
          + aggExprs.add(relBuilder.field(aggChildName))
          +
          + val orderKeys: ImmutableList.Builder[RexFieldCollation] =
          + new ImmutableList.Builder[RexFieldCollation]()
          +
          + val sets: util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
          + val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          +
          + val rexNode =
          + if (orderName.equalsIgnoreCase("rowtime"))

          { + // for stream event-time + relBuilder.call(EventTimeExtractor) + }

          + else if (orderName.equalsIgnoreCase("proctime"))

          { + // for stream proc-time + relBuilder.call(ProcTimeExtractor) + }

          else

          { + // for batch event-time + relBuilder.field(orderName) + }

          +
          + orderKeys.add(new RexFieldCollation(rexNode, sets))
          +
          + val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + overWindow.partitionBy.foreach {
          + x =>
          + val partitionKey = relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)
          + if (!FlinkTypeFactory.toTypeInfo(partitionKey.getType).isKeyType) {
          — End diff –

          move this check to `validateInput()`
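
For illustration, the relocated check could look roughly like this (a sketch that assumes partition keys are already resolved field references by validation time):

    // hypothetical validateInput() fragment: partition keys must be valid key types
    overWindow.partitionBy.collectFirst {
      case r: ResolvedFieldReference if !r.resultType.isKeyType =>
        ValidationFailure(
          s"Expression ${r.name} cannot be used as a partition key because its type " +
            "is not hashable and comparable.")
    }.getOrElse(ValidationSuccess)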

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112650081

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala —
@@ -82,3 +84,32 @@ object Session {
   */
  def withGap(gap: String): SessionWindow = new SessionWindow(gap)
}

          +
          +/**
+ * Helper object for creating an over window.
          + */
          +object Over {
          +
          + /**
          + * Specifies the time attribute on which rows are grouped.
          + *
          + * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
          + *
          + * For batch tables, refer to a timestamp or long attribute.
          + */
          + def orderBy(orderBy: Expression): OverWindowPredefined = {
          — End diff –

          The Java API is based on `String` not `Expression`
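
For illustration, a String-based entry point could parse the expression the way other Java API methods do (a sketch; the `OverWindowPredefined` constructor shape is an assumption):

    // hypothetical String-based Java API variant
    object Over {
      def orderBy(orderBy: String): OverWindowPredefined = {
        val orderByExpr = ExpressionParser.parseExpression(orderBy)
        new OverWindowPredefined(Seq[Expression](), orderByExpr)
      }
    }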

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112654550

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +59,160 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + overWindowAlias: Expression,
          + var overWindow: OverWindow = null) extends Expression {
          +
          + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
          +
          + val rexBuilder = relBuilder.getRexBuilder
          +
          + val operator: SqlAggFunction = agg.getSqlAggFunction()
          +
          + val relDataType = relBuilder
          + .getTypeFactory.asInstanceOf[FlinkTypeFactory]
          + .createTypeFromTypeInfo(agg.resultType)
          +
          + val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
          +
          + aggExprs.add(relBuilder.field(aggChildName))
          +
          + val orderKeys: ImmutableList.Builder[RexFieldCollation] =
          + new ImmutableList.Builder[RexFieldCollation]()
          +
          + val sets: util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
          + val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          +
          + val rexNode =
          + if (orderName.equalsIgnoreCase("rowtime"))

          { + // for stream event-time + relBuilder.call(EventTimeExtractor) + }

          + else if (orderName.equalsIgnoreCase("proctime"))

          { + // for stream proc-time + relBuilder.call(ProcTimeExtractor) + }

          else

          { + // for batch event-time + relBuilder.field(orderName) + }

          +
          + orderKeys.add(new RexFieldCollation(rexNode, sets))
          +
          + val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + overWindow.partitionBy.foreach {
          + x =>
          + val partitionKey = relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)
+        if (!FlinkTypeFactory.toTypeInfo(partitionKey.getType).isKeyType) {
+          throw ValidationException(
+            s"expression $partitionKey cannot be used as a partition key expression " +
+              "because it's not a valid key type which must be hashable and comparable")
+        }

          + partitionKeys.add(partitionKey)
          + }
          +
          + val preceding = overWindow.preceding.asInstanceOf[Literal]
          + val following = overWindow.following.asInstanceOf[Literal]
          +
          + val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
          +
          + val lowerBound = createBound(relBuilder, preceding, SqlKind.PRECEDING)
          + val upperBound = createBound(relBuilder, following, SqlKind.FOLLOWING)
          +
          + rexBuilder.makeOver(
          + relDataType,
          + operator,
          + aggExprs,
          + partitionKeys,
          + orderKeys.build,
          + lowerBound,
          + upperBound,
          + isPhysical,
          + true,
          + false)
          + }
          +
          + private def createBound(
          + relBuilder: RelBuilder,
          + bound: Literal,
          + sqlKind: SqlKind): RexWindowBound = {
          +
+    if (bound == UNBOUNDED_RANGE || bound == UNBOUNDED_ROW) {
+      val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
+      create(unbounded, null)
+    } else if (bound == CURRENT_RANGE || bound == CURRENT_ROW) {
+      val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
+      create(currentRow, null)
+    } else {
+      val returnType = relBuilder
+        .getTypeFactory.asInstanceOf[FlinkTypeFactory]
+        .createTypeFromTypeInfo(Types.DECIMAL)
+
+      val sqlOperator = new SqlPostfixOperator(
+        sqlKind.name,
+        sqlKind,
+        2,
+        new OrdinalReturnTypeInference(0),
+        null,
+        null)
+
+      val operands: Array[SqlNode] = new Array[SqlNode](1)
+      operands(0) = SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO)
+
+      val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
+
+      val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+      expressions.add(relBuilder.literal(bound.value))
+
+      val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)
+
+      create(node, rexNode)
+    }

          + }
          +
          + override private[flink] def children: Seq[Expression] = Seq(agg)
          +
          + override def toString = s"$

          {this.getClass.getCanonicalName}

          ($

          {overWindowAlias.toString}

          )"
          +
          + override private[flink] def resultType = agg.resultType
          +
          + override private[flink] def validateInput(): ValidationResult = {
          + var validationResult: ValidationResult = ValidationSuccess
          + val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          + if (!orderName.equalsIgnoreCase("rowtime")
          + && !orderName.equalsIgnoreCase("proctime")) {
          + ValidationFailure(
          + s"OrderBy expression must be ['rowtime] or ['proctime], but got ['$

          {orderName}

          ]")
          + }
          + if (!overWindow.preceding.asInstanceOf[Literal].resultType.getClass
          — End diff –

          We need to check if `preceding` and `following` are of type `Literal` first.
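
For illustration, a defensive check before the casts could look like this (a sketch; placement inside validateInput() is assumed):

    // hypothetical fragment: verify both bounds are literals of the same interval type
    (overWindow.preceding, overWindow.following) match {
      case (p: Literal, f: Literal) if p.resultType.getClass == f.resultType.getClass =>
        ValidationSuccess
      case (_: Literal, _: Literal) =>
        ValidationFailure("Preceding and following must be intervals of the same type.")
      case _ =>
        ValidationFailure("Preceding and following must be constant bound literals.")
    }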

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112652207

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala —
          @@ -49,6 +59,160 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
          }

          /**
          + * Over expression for calcite over transform.
          + *
          + * @param agg over-agg expression
          + * @param overWindowAlias over window alias
          + * @param overWindow over window
          + */
          +case class OverCall(
          + agg: Aggregation,
          + overWindowAlias: Expression,
          + var overWindow: OverWindow = null) extends Expression {
          +
          + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
          +
          + val rexBuilder = relBuilder.getRexBuilder
          +
          + val operator: SqlAggFunction = agg.getSqlAggFunction()
          +
          + val relDataType = relBuilder
          + .getTypeFactory.asInstanceOf[FlinkTypeFactory]
          + .createTypeFromTypeInfo(agg.resultType)
          +
          + val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
          + val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
          +
          + aggExprs.add(relBuilder.field(aggChildName))
          +
          + val orderKeys: ImmutableList.Builder[RexFieldCollation] =
          + new ImmutableList.Builder[RexFieldCollation]()
          +
          + val sets: util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
          + val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
          +
          + val rexNode =
          + if (orderName.equalsIgnoreCase("rowtime")) {
          — End diff –

          We cannot properly distinguish batch and streaming here.
          If a batch table has a column `rowtime`, this will fail.

          We could replace the `UnresolvedFieldReference` for `rowtime` / `proctime` with a `RowTime` / `ProcTime` `TimeIndicator` expression in `ProjectionTranslator.translateOverWindow()`.
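          A minimal sketch of that rewrite, assuming `UnresolvedFieldReference` is a case class and that `RowTime()` / `ProcTime()` are the `TimeIndicator` expressions named above; the helper name is illustrative, not the actual Flink code, and it would only be applied when the query is known to be a streaming query (see the following comment):

            // Rewrites the over-window sort key: time indicators for streaming,
            // a plain field reference otherwise.
            private def toTimeIndicator(orderBy: Expression): Expression = orderBy match {
              case UnresolvedFieldReference(name) if name.equalsIgnoreCase("rowtime") =>
                RowTime()  // streaming event time
              case UnresolvedFieldReference(name) if name.equalsIgnoreCase("proctime") =>
                ProcTime() // streaming processing time
              case other =>
                other      // keep the reference to a regular time attribute
            }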

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3743#discussion_r112655769

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala —
          @@ -221,6 +221,23 @@ object ProjectionTranslator

          { projectList }

          + def translateOverWindows(
          + exprs: Seq[Expression],
          + overWindows: Array[OverWindow]): Seq[Expression] = {
          — End diff –

          Add a `tableEnv` parameter to identify whether this is a batch or a streaming query.
          In the case of a streaming query, check that `overWindow.orderBy` is an `UnresolvedFieldReference` and translate `rowtime` to a `RowTime` expression and `proctime` to a `ProcTime` expression.
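          A sketch of how the suggested signature could be used, reusing the hypothetical `RowTime()` / `ProcTime()` expressions from above and assuming `OverWindow` is a case class with an `orderBy` member; everything not quoted in the diffs is an assumption, not the merged code:

            def translateOverWindows(
                exprs: Seq[Expression],
                overWindows: Array[OverWindow],
                tableEnv: TableEnvironment): Seq[Expression] = {

              // Only streaming queries rewrite the sort key to a time indicator;
              // on a batch table a column named "rowtime" stays a plain field reference.
              val resolvedWindows = tableEnv match {
                case _: StreamTableEnvironment =>
                  overWindows.map { w =>
                    w.orderBy match {
                      case UnresolvedFieldReference(n) if n.equalsIgnoreCase("rowtime") =>
                        w.copy(orderBy = RowTime())
                      case UnresolvedFieldReference(n) if n.equalsIgnoreCase("proctime") =>
                        w.copy(orderBy = ProcTime())
                      case other => throw ValidationException(
                        s"OrderBy expression must be 'rowtime or 'proctime, but got $other")
                    }
                  }
                case _ => overWindows // batch: leave the time attribute untouched
              }

              // ... then, as before, attach each OverCall in `exprs` to the window in
              // `resolvedWindows` whose alias it references.
              exprs
            }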

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3743

          @fhueske Thanks a lot for your comments and suggestions. I like the idea of moving all validations into the `validateInput` method. I have updated the PR with the following changes:
          1. Check that the orderBy field is correctly validated (only rowtime, proctime).
          2. Check that the partitionBy fields are correctly validated (existing fields in the input, no complex expressions).
          3. Check that preceding and following are of the same type.
          4. Check that preceding and following are literals.
          5. Check that the preceding value is > 0.
          6. Check that the following value is >= -1.
          7. Move all validations into the `validateInput` method (a condensed sketch of these checks follows after this comment).
          BTW, in this PR we only add the batch check in the `table.scala#window(overWindows: OverWindow*)` method.

          Thanks,
          SunJincheng
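          Sketched below is how checks 1 and 3–6 above could be combined in `validateInput`, using the member names from the diffs earlier in this thread and assuming the literal bound values are `Long`s; the unbounded/current-row literal special cases and the exact error messages are simplified, so this is not the merged implementation:

            override private[flink] def validateInput(): ValidationResult = {
              val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
              if (!orderName.equalsIgnoreCase("rowtime") && !orderName.equalsIgnoreCase("proctime")) {
                // check 1: on streams only the time indicators may be used as sort key
                ValidationFailure(s"OrderBy expression must be 'rowtime or 'proctime, but got '$orderName")
              } else {
                (overWindow.preceding, overWindow.following) match {
                  case (p: Literal, f: Literal) =>                        // check 4: both bounds are literals
                    if (p.resultType.getClass != f.resultType.getClass) { // check 3: same interval type
                      ValidationFailure("Preceding and following must be of the same interval type.")
                    } else if (p.value.asInstanceOf[Long] <= 0) {         // check 5
                      ValidationFailure("Preceding value must be greater than 0.")
                    } else if (f.value.asInstanceOf[Long] < -1) {         // check 6
                      ValidationFailure("Following value must be greater than or equal to -1.")
                    } else {
                      ValidationSuccess
                    }
                  case _ =>
                    ValidationFailure("Preceding and following must be literal expressions.")
                }
              }
            }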

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3743

          Hi @sunjincheng121,

          thanks for the update! I was working on some refactoring of this PR as well to clean up the validation logic, but I think your approach is much nicer.
          I'll rebase my changes on top of yours and open a PR against your PR branch soon.

          Cheers, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3743

          Hi @fhueske, thanks for everything you did. I have merged the changes, moved the `partitionBy` validations into the `validateInput` method, and added two tests for partitionBy:

          • TestPartitionByWithUnresolved
          • TestPartitionByWithNotKeyType

          Thanks,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3743

          Thanks for the update @sunjincheng121!

          Will merge this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3743

          fhueske Fabian Hueske added a comment -

          Implemented with fe018921ed0b24f94ab2139f04293d6074ce4fba
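          For illustration, the streaming Scala Table API call that this feature enables looks roughly as follows; `'a`, `'b`, `'c` are assumed input fields, and the exact keyword set should be checked against the 1.3.0 documentation:

            // Unbounded processing-time over window, partitioned by 'c.
            val result: Table = table
              .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
              .select('c, 'b.count over 'w, 'a.sum over 'w)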


            People

              • Assignee: sunjincheng121 sunjincheng
              • Reporter: sunjincheng121 sunjincheng
              • Votes: 0
              • Watchers: 3
