FLINK-5722: Implement DISTINCT as dedicated operator

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      DISTINCT is currently implemented for the batch Table API / SQL as an aggregate that groups on all fields. Grouped aggregates are implemented as a GroupReduce with a sort-based combiner.

      This operator can be implemented more efficiently by using a ReduceFunction and hinting a HashCombine strategy. The same ReduceFunction can be used for all DISTINCT operations and can be assigned appropriate forward field annotations.
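
The idea above can be illustrated without any Flink dependency. The following is a minimal sketch in plain Scala collections, not the code from this issue: `Row`, `keepFirst`, and `hashCombine` are hypothetical names chosen for illustration. It shows why one reduce function is enough for every DISTINCT (it returns one of two equal rows unchanged) and how a hash table keyed on the full row combines duplicates without sorting.

```scala
import scala.collection.mutable

object DistinctSketch {
  // Assumption for this sketch: a row is an immutable sequence of field values.
  type Row = Seq[Any]

  // The single reduce function shared by all DISTINCT operations:
  // of two rows with the same key (here: the same row), keep the first.
  // Input and output have the same type and no field is modified.
  val keepFirst: (Row, Row) => Row = (first, _) => first

  // Hash-based combine: one table entry per distinct key, merged with
  // `reduce` whenever the key was seen before. LinkedHashMap is used only
  // to keep first-seen order so that the result is deterministic.
  def hashCombine(rows: Iterable[Row], reduce: (Row, Row) => Row): Seq[Row] = {
    val table = mutable.LinkedHashMap.empty[Row, Row]
    for (row <- rows) {
      val merged = table.get(row) match {
        case Some(existing) => reduce(existing, row)
        case None           => row
      }
      table.update(row, merged)
    }
    table.values.toSeq
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq[Row](Seq(1, "a"), Seq(2, "b"), Seq(1, "a"), Seq(2, "c"))
    // The duplicate (1, "a") is combined away; three rows remain.
    hashCombine(rows, keepFirst).foreach(println)
  }
}
```

Because the reduce function never changes a field, every field can be declared as forwarded, which is what the forward field annotations mentioned above would express in Flink.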

      We would need a custom conversion rule which translates distinct aggregations (grouping on all fields and returning all fields) into a custom DataSetRelNode.

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3471

          twalthr Timo Walther added a comment -

          Fixed in 1.3.0: 7a629fc59ff206ba51f22e1bf35fe50882e63538

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3471

          Thanks @fhueske. LGTM, merging...

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on the issue:

          https://github.com/apache/flink/pull/3471

          Oh, I think maybe I didn't check all the IT cases. 5 tests are enough.
          +1 to merge

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3471

          Thanks for the review @KurtYoung. I think we should be fine regarding the tests. `DistinctReduce` is called in 2 tests in `AggregationsITCase` (`testDistinct()` and `testDistinctAfterAggregate`) and in the union tests in the Table API and SQL `SetOperatorsITCase`. That makes 5 tests. What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3471#discussion_r104613673

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala ---
          @@ -42,6 +41,14 @@ class DataSetAggregateRule
                return false
              }

          +    // distinct is translated into dedicated operator
          +    if (agg.getAggCallList.isEmpty &&
          +      agg.getGroupCount == agg.getRowType.getFieldCount &&
          --- End diff --

          During testing I observed a case where a projection wasn't pushed down.
          Without this check, the grouping would happen on a subset of fields and only those would be emitted. Since it is not possible to change the input and output types of a ReduceFunction (which is used to implement Distinct in a hash-combinable way), I check that the input and output types are identical.
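
To make the reasoning above concrete, here is a small sketch in plain Scala. It is not the rule from the pull request: `AggregateNode` and `isDistinct` are hypothetical stand-ins, the first two conditions mirror the lines visible in the quoted diff, and the third condition is only one reading of "the input and output types are identical", since the diff is cut off after the second `&&`.

```scala
object DistinctRuleSketch {
  // Hypothetical stand-in for the planner's aggregate node,
  // not a Calcite or Flink class.
  final case class AggregateNode(
      inputFields: Seq[String], // field names of the input rows
      groupKeys: Seq[String],   // grouping fields, which are also emitted
      aggCalls: Seq[String]) {  // aggregate calls such as "SUM(c)"
    // An aggregate emits its grouping fields, then one field per aggregate call.
    def outputFields: Seq[String] = groupKeys ++ aggCalls
  }

  def isDistinct(agg: AggregateNode): Boolean =
    agg.aggCalls.isEmpty &&                          // no aggregate functions
      agg.groupKeys.size == agg.outputFields.size && // only grouping fields are emitted
      agg.outputFields == agg.inputFields            // assumption: same fields in and out

  def main(args: Array[String]): Unit = {
    // Grouping on all fields of the input: a true DISTINCT.
    println(isDistinct(AggregateNode(Seq("a", "b"), Seq("a", "b"), Nil)))
    // Projection not pushed down: grouping on a subset of the input fields.
    println(isDistinct(AggregateNode(Seq("a", "b", "c"), Seq("a", "b"), Nil)))
  }
}
```

The second case is the one described in the comment: no aggregate calls are present, yet the node emits fewer fields than it reads, so a reduce function with identical input and output types could not implement it.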

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3471#discussion_r104606232

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala ---
          @@ -42,6 +41,14 @@ class DataSetAggregateRule
                return false
              }

          +    // distinct is translated into dedicated operator
          +    if (agg.getAggCallList.isEmpty &&
          +      agg.getGroupCount == agg.getRowType.getFieldCount &&
          --- End diff --

          Why should we check the group count and row type here?

          githubbot ASF GitHub Bot added a comment -

          GitHub user fhueske opened a pull request:

          https://github.com/apache/flink/pull/3471

          FLINK-5722 [table] Add dedicated DataSetDistinct operator.

          A dedicated DISTINCT operator is more efficient because we can use a `ReduceFunction`, which (optionally) supports hash-based combining.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/fhueske/flink tableDistinct

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3471.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3471




            People

            • Assignee: Fabian Hueske (fhueske)
            • Reporter: Fabian Hueske (fhueske)
            • Votes: 0
            • Watchers: 9
