Flink / FLINK-6124

support max/min aggregations for string type

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      Recently, while porting some queries to Flink SQL, I found that min/max aggregations on the string type are currently not supported; they should be added.
      When min/max aggregations are used on a string column, they should return the minimum/maximum value in lexicographic order.
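A minimal sketch of the requested semantics in plain Scala (not Flink code): MIN/MAX over a string column pick the lexicographically smallest/largest value. It assumes "lexicographic order" means `java.lang.String.compareTo`, which compares UTF-16 code units, so uppercase letters sort before lowercase ones. The sample values are made up for illustration.

```scala
// Plain Scala sketch (not Flink code): MIN/MAX over strings in lexicographic order.
// The implicit Ordering[String] delegates to java.lang.String.compareTo, which
// compares UTF-16 code units, so uppercase letters sort before lowercase ones.
val column = Seq("pear", "apple", "Banana", "cherry") // made-up sample values

val minValue = column.min // uses the implicit Ordering[String]
val maxValue = column.max

// The same result with explicit pairwise comparisons, the way an incremental
// aggregate would update its running minimum/maximum one row at a time.
val minByCompare = column.reduce((a, b) => if (a.compareTo(b) <= 0) a else b)
val maxByCompare = column.reduce((a, b) => if (a.compareTo(b) >= 0) a else b)

println(s"min=$minValue, max=$maxValue") // prints "min=Banana, max=pear"
```

Note that "Banana" wins MIN only because 'B' (66) sorts before 'a' (97); a case-insensitive or locale-aware collation would give a different answer.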


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user docete opened a pull request:

          https://github.com/apache/flink/pull/3579

          FLINK-6124 [Table API & SQL] support max/min aggregations for strin…

          Currently, min/max aggregations on the string type are not supported; they should be added.
          When min/max aggregations are used on a string column, they should return the minimum/maximum value in lexicographic order.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/docete/flink master

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3579.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3579


          commit d4c8efd34c2b1b26318452082d8c2ab3925a0411
          Author: Zhenghua Gao <docete@gmail.com>
          Date: 2017-03-20T11:29:31Z

          FLINK-6124 [Table API & SQL] support max/min aggregations for string type


          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3579#discussion_r107069266

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -916,6 +916,10 @@ object AggregateUtil {
          new DecimalMinAggFunction
          case BOOLEAN =>
          new BooleanMinAggFunction
          + case VARCHAR =>
          + new StringMinAggFunction
          + case CHAR =>
          — End diff –

          I do not think we currently support the CHAR type. For instance, if you try 'CHAR.max, I think you will see problems.

          githubbot ASF GitHub Bot added a comment -

          Github user docete commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3579#discussion_r107081399

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -916,6 +916,10 @@ object AggregateUtil {
          new DecimalMinAggFunction
          case BOOLEAN =>
          new BooleanMinAggFunction
          + case VARCHAR =>
          + new StringMinAggFunction
          + case CHAR =>
          — End diff –

          For 1 and 2, I will add Max/MinWithRetractAggFunction, unit tests, and an integration test.
          For 3, we do support the CHAR type in SQL, but not in the Table API.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3579#discussion_r107104451

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -916,6 +916,10 @@ object AggregateUtil {
          new DecimalMinAggFunction
          case BOOLEAN =>
          new BooleanMinAggFunction
          + case VARCHAR =>
          + new StringMinAggFunction
          + case CHAR =>
          — End diff –

          The CHAR type occurs if you use literals in SQL, e.g. "SELECT 'hello'", but the runtime should treat CHAR and VARCHAR the same, since both are strings.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3579

          Thanks @docete. The changes look good. I will add the new aggregations to an existing IT case and merge this afterwards.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3579

          twalthr Timo Walther added a comment -

          Fixed in 1.3.0: 17dd915e8a18e60fa32ada9500d3632ba162720a

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on the issue:

          https://github.com/apache/flink/pull/3579

          @twalthr, I saw that you removed the CHAR type when you merged this. I just want to clarify: in FLINK-3916, in the method typeInfoToSqlTypeName, you added
          ```
          case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
            throw TableException("Character type is not supported.")
          ```
          However, the Table API lets users pass CHAR type data, e.g.:
          ```
          val data = List(
            (1L, 1, 1d, 1f, new BigDecimal("1"), 'a', "Hi")
          ...
          .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'char, 'string)
          ...
          select('char.count)
          ```
          I am curious why we do not support the CHAR type explicitly.

          Also, as I suggested earlier, all built-in aggregates should support retraction. We should add the string type to Max/MinWithRetractAggFunction. What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3579

          @shaoxuan-wang I haven't removed the CHAR type; I just inlined the two cases. There is a difference between Flink's char type information and Calcite's CHAR type. We don't support single characters such as 'a', but since the SQL parser translates fixed-length strings to the Calcite CHAR type, we have to make the case distinction in the aggregate classes. Both CHAR and VARCHAR are treated as strings.

          I agree that we should also support the retraction case. Thanks for the hint.
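To illustrate what "inlining the two cases" means, here is a minimal sketch of an aggregate factory in which CHAR and VARCHAR share one branch. `SqlTypeName`, `MinAgg`, `StringMinAgg`, `BooleanMinAgg`, and `createMinAggFunction` are hypothetical stand-ins defined locally; they are not Calcite's or Flink's classes, and only the shape of the match is the point.

```scala
// Hypothetical stand-ins, defined here only for illustration: they are NOT
// Calcite's SqlTypeName or Flink's aggregate function classes.
sealed trait SqlTypeName
case object VARCHAR extends SqlTypeName
case object CHAR extends SqlTypeName
case object BOOLEAN extends SqlTypeName

sealed trait MinAgg
final class StringMinAgg extends MinAgg
final class BooleanMinAgg extends MinAgg

def createMinAggFunction(sqlType: SqlTypeName): MinAgg = sqlType match {
  case BOOLEAN => new BooleanMinAgg
  // The SQL parser types fixed-length string literals as CHAR, while string
  // columns are VARCHAR. Both are treated as strings at runtime, so a single
  // alternative pattern replaces two identical branches.
  case VARCHAR | CHAR => new StringMinAgg
}
```

The distinction still has to be visible in the match because the planner hands over two different SQL type names, even though the runtime behavior is the same.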

          githubbot ASF GitHub Bot added a comment -

          GitHub user twalthr opened a pull request:

          https://github.com/apache/flink/pull/3593

          FLINK-6124 [table] Add min/max string aggregation with retraction

          This PR adds the missing retraction aggregations for strings.
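Retraction means an aggregate must also be able to take a previously accumulated value back out, which is harder for MIN/MAX than for SUM or COUNT. The sketch below shows one way a retractable string MIN can work, assuming a multiset of values (value to occurrence count) is kept so that retracting the current minimum can fall back to the next smallest remaining value. `StringMinWithRetractSketch` and its methods are hypothetical and simplified; this is not the code of Flink's `StringMinWithRetractAggFunction`, and a MAX variant would be symmetric.

```scala
import scala.collection.mutable

// Hypothetical, simplified accumulator for illustration only; not Flink's class.
final class StringMinWithRetractSketch {
  // Multiset of the values currently in the group: value -> occurrence count.
  private val counts = mutable.Map.empty[String, Long]
  // Cached minimum so that accumulate() never needs a full scan.
  private var currentMin: Option[String] = None

  def accumulate(value: String): Unit =
    if (value != null) { // SQL aggregates skip NULL inputs
      counts.update(value, counts.getOrElse(value, 0L) + 1L)
      if (currentMin.forall(m => value.compareTo(m) < 0)) currentMin = Some(value)
    }

  def retract(value: String): Unit =
    if (value != null) {
      counts.get(value) match {
        case Some(1L) =>
          counts.remove(value)
          // Only removing the last copy of the current minimum forces a rescan.
          if (currentMin.contains(value))
            currentMin = if (counts.isEmpty) None else Some(counts.keys.min)
        case Some(n) =>
          counts.update(value, n - 1L)
        case None =>
          () // retracting a value that was never accumulated: ignored here
      }
    }

  // An empty group yields null, like SQL MIN over zero rows.
  def getValue: String = currentMin.orNull
}
```

For example, after accumulating "b", "a", "a", "c", the value is "a"; one retraction of "a" leaves it at "a", a second moves it to "b". Keeping counts rather than a plain set is what makes duplicate values retract correctly.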

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/twalthr/flink FLINK-6124_2

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3593.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3593


          commit c1aa7b234c21fbc847159b049196df4d6d0d8ea5
          Author: twalthr <twalthr@apache.org>
          Date: 2017-03-22T09:03:56Z

          FLINK-6124 [table] Add min/max string aggregation with retraction


          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3579

          @shaoxuan-wang I have implemented the missing retraction functions.

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3593#discussion_r107410733

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala —
          @@ -212,3 +212,11 @@ class DecimalMinWithRetractAggFunction extends MinWithRetractAggFunction[BigDeci
          override def getInitValue: BigDecimal = BigDecimal.ZERO
          override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO
          }
          +
          +/**
          + * Built-in String Min with retraction aggregate function
          + */
          +class StringMinWithRetractAggFunction extends MinWithRetractAggFunction[String] {
          + override def getInitValue: String = ""
          — End diff –

          remove ": String"?

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3593#discussion_r107410682

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala —
          @@ -201,14 +201,22 @@ class DoubleMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Double]

          /**
           * Built-in Boolean Max with retraction aggregate function
           */
          class BooleanMaxWithRetractAggFunction extends MaxWithRetractAggFunction[Boolean] {
          - override def getInitValue = false
          + override def getInitValue: Boolean = false
          override def getValueTypeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO
          }

          /**
           * Built-in Big Decimal Max with retraction aggregate function
           */
          class DecimalMaxWithRetractAggFunction extends MaxWithRetractAggFunction[BigDecimal] {
          - override def getInitValue = BigDecimal.ZERO
          + override def getInitValue: BigDecimal = BigDecimal.ZERO
          override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO
          }
          +
          +/**
          + * Built-in String Max with retraction aggregate function
          + */
          +class StringMaxWithRetractAggFunction extends MaxWithRetractAggFunction[String] {
          + override def getInitValue: String = ""
          — End diff –

          remove ": String"?

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on the issue:

          https://github.com/apache/flink/pull/3579

          @twalthr thanks for the reply and explanation.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3593

          Thanks for the hint @shaoxuan-wang. I added the tests and will merge this.

          twalthr Timo Walther added a comment -

          Added retraction support in: c5282cbcf898b99593ca753fce7557bae1ae09aa

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3593


            People

            • Assignee:
              docete Zhenghua Gao
            • Reporter:
              docete Zhenghua Gao
            • Votes:
              0
            • Watchers:
              4
