  1. Tajo
  2. TAJO-601

Improve distinct aggregation query processing

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: Planner/Optimizer
    • Labels: None

      Description

      Currently, distinct aggregation queries are executed in two stages:

      • The first stage only shuffles tuples by hashing the grouping keys.
      • The second stage sorts them and executes sort aggregation.

      This approach executes queries that include distinct aggregation functions in only two stages. However, it produces a large volume of intermediate data during the shuffle phase, because no tuples are aggregated before they are shuffled.
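As a rough illustration of the two stages described above, the following Python sketch hashes every input tuple to a partition by its grouping keys, then sorts each partition and computes count(*) and count(distinct) in one pass over the sorted run. The function names, the partition count, and the sample rows are assumptions made for illustration, and NULL values are not handled; this is not Tajo's executor code.

```python
from itertools import groupby

def shuffle_by_group_keys(rows, num_partitions):
    """Stage 1: route every tuple to a partition by hashing its grouping keys."""
    partitions = [[] for _ in range(num_partitions)]
    for grp1, grp2, grp3 in rows:
        partitions[hash((grp1, grp2)) % num_partitions].append((grp1, grp2, grp3))
    return partitions

def sort_aggregate(partition):
    """Stage 2: sort, then compute count(*) and count(distinct grp3) per group."""
    result = {}
    ordered = sorted(partition)  # orders by (grp1, grp2, grp3); assumes no NULLs
    for key, group in groupby(ordered, key=lambda r: (r[0], r[1])):
        total = 0
        distinct = 0
        previous = object()  # sentinel that equals no real value
        for _, _, grp3 in group:
            total += 1
            if grp3 != previous:  # sorted input: a change means a new distinct value
                distinct += 1
                previous = grp3
        result[key] = (total, distinct)
    return result

# Hypothetical sample data: (grp1, grp2, grp3)
rows = [("a", 1, "x"), ("a", 1, "x"), ("a", 1, "y"), ("b", 2, "z"), ("b", 2, "z")]
partitions = shuffle_by_group_keys(rows, num_partitions=2)
shuffled_tuples = sum(len(p) for p in partitions)

final = {}
for p in partitions:
    final.update(sort_aggregate(p))
```

In this sketch `shuffled_tuples` always equals the number of input rows, which is the intermediate data cost this issue is about.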

      This kind of query can be rewritten as two nested queries:

      original query
      SELECT grp1, grp2, count(*) as total, count(distinct grp3) as distinct_col from rel1 group by grp1, grp2;

      rewritten query
      SELECT grp1, grp2, sum(cnt) as total, count(grp3) as distinct_col from (
        SELECT grp1, grp2, grp3, count(*) as cnt from rel1 group by grp1, grp2, grp3
      ) tmp1 group by grp1, grp2;
      

      I'm expecting that this rewrite will significantly reduce the intermediate data volume and query response time in most cases.
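One way to convince yourself that the original and rewritten forms return the same result is to run both against a small table. The sketch below does this with Python's built-in sqlite3 module rather than Tajo, so it only checks the SQL semantics, not Tajo's planner. The sample rows and the ORDER BY clauses (added to make the comparison deterministic) are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rel1 (grp1 TEXT, grp2 INTEGER, grp3 TEXT)")
conn.executemany(
    "INSERT INTO rel1 VALUES (?, ?, ?)",
    [("a", 1, "x"), ("a", 1, "x"), ("a", 1, "y"), ("b", 2, "z"), ("b", 2, "z")],
)

# Original form: one aggregation with a DISTINCT function.
original = conn.execute(
    "SELECT grp1, grp2, count(*) AS total, count(DISTINCT grp3) AS distinct_col "
    "FROM rel1 GROUP BY grp1, grp2 ORDER BY grp1, grp2"
).fetchall()

# Rewritten form: the inner query removes duplicates, the outer query aggregates.
rewritten = conn.execute(
    "SELECT grp1, grp2, sum(cnt) AS total, count(grp3) AS distinct_col FROM ("
    "  SELECT grp1, grp2, grp3, count(*) AS cnt FROM rel1 GROUP BY grp1, grp2, grp3"
    ") tmp1 GROUP BY grp1, grp2 ORDER BY grp1, grp2"
).fetchall()

# Number of rows the inner query hands to the outer aggregation.
inner_rows = conn.execute(
    "SELECT count(*) FROM ("
    "  SELECT grp1, grp2, grp3 FROM rel1 GROUP BY grp1, grp2, grp3"
    ") t"
).fetchone()[0]
```

Here the inner query passes 3 rows to the outer aggregation instead of the 5 input rows. The saving grows with the number of duplicate (grp1, grp2, grp3) combinations in the data.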

      1. TAJO-601_140220_142800.patch
        83 kB
        Hyunsik Choi
      2. TAJO-601.patch
        83 kB
        Hyunsik Choi

          Activity

          hudson Hudson added a comment -

          SUCCESS: Integrated in Tajo-master-build #72 (See https://builds.apache.org/job/Tajo-master-build/72/)
          TAJO-601: Improve distinct aggregation query processing. (hyunsik: https://git-wip-us.apache.org/repos/asf?p=incubator-tajo.git&a=commit&h=6053ad11efc31ed25c6ea8be10d4ce967c34dda3)

          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
          • tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation1.sql
          • tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testCountDistinct2.sql
          • tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testDistinctAggregation4.result
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumLongDistinct.java
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
          • tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testDistinctAggregationWithHaving1.result
          • tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation3.sql
          • tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
          • tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testDistinctAggregationWithHaving1.sql
          • tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
          • tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testCountDistinct.sql
          • tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testDistinctAggregation1.result
          • tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testCountDistinct2.result
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
          • tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
          • tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testDistinctAggregationWithUnion1.sql
          • tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testDistinctAggregationWithUnion1.result
          • tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumIntDistinct.java
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloatDistinct.java
          • tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation4.sql
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
          • tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
          • tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testDistinctAggregation3.result
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
          • CHANGES.txt
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
          • tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloat.java
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
          • tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testCountDistinct.result
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
          • tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testDistinctAggregation2.result
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumDoubleDistinct.java
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
          • tajo-core/tajo-core-backend/src/test/resources/results/TestGroupByQuery/testDistinctAggregation5.result
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
          • tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation2.sql
          • tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation5.sql
          • tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java
          • tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
          hyunsik Hyunsik Choi added a comment -

          This issue got +1 on RB. Committed to the master branch.

          hyunsik Hyunsik Choi added a comment -

          Rebased against the latest revision.

          Updated the review request against the master branch on Review Board:
          https://reviews.apache.org/r/18210/

          hyunsik Hyunsik Choi added a comment -

          The current approach has shown poor performance. It is explained in the description of this issue.

          This patch improves the performance of distinct aggregation. Unlike the current approach, with this patch GlobalPlanner builds a three-phase plan using two hash shuffles. Then, GlobalPlanner adds an enforcer of sort aggregation to the final execution block. As a result, it can significantly reduce the intermediate data volume, depending on the cardinality of the grouping columns.
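The comment does not spell out the shuffle keys, so the following Python sketch is only one plausible reading of a three-phase plan with two hash shuffles. It assumes the first shuffle is keyed on the grouping key plus the distinct column and the second on the grouping key alone. All function names and the sample splits are hypothetical, and the final phase uses a dictionary instead of the sort aggregation that the patch enforces.

```python
from collections import Counter, defaultdict

def local_dedup(rows):
    """Phase 1 (per input split): aggregate on (grouping key, distinct value)."""
    return Counter(rows)  # {(group_key, distinct_val): partial count(*)}

def hash_shuffle(items, key_of, num_partitions):
    """Route each item to a partition by hashing the given key."""
    partitions = [[] for _ in range(num_partitions)]
    for item in items:
        partitions[hash(key_of(item)) % num_partitions].append(item)
    return partitions

def merge_partials(partition):
    """Phase 2: merge partial counts so each (group, distinct value) pair appears once."""
    merged = Counter()
    for pair, cnt in partition:
        merged[pair] += cnt
    return list(merged.items())

def final_aggregate(partition):
    """Phase 3: per group, sum the counts and count the now-unique distinct values."""
    totals = defaultdict(lambda: [0, 0])
    for (group_key, _distinct_val), cnt in partition:
        totals[group_key][0] += cnt  # count(*)
        totals[group_key][1] += 1    # count(distinct ...)
    return {k: tuple(v) for k, v in totals.items()}

# Hypothetical input splits of (group_key, distinct_val) tuples.
splits = [
    [("a", "x"), ("a", "x"), ("a", "y")],
    [("a", "x"), ("b", "z"), ("b", "z")],
]

# Phase 1 runs per split; the first shuffle is keyed on (group, distinct value).
phase1 = [item for split in splits for item in local_dedup(split).items()]
shuffle1 = hash_shuffle(phase1, key_of=lambda it: it[0], num_partitions=2)

# Phase 2 merges duplicates; the second shuffle is keyed on the group only.
phase2 = [item for part in shuffle1 for item in merge_partials(part)]
shuffle2 = hash_shuffle(phase2, key_of=lambda it: it[0][0], num_partitions=2)

# Phase 3 produces the final rows.
result = {}
for part in shuffle2:
    result.update(final_aggregate(part))

input_tuples = sum(len(s) for s in splits)
first_shuffle_tuples = len(phase1)
```

With this sample, 6 input tuples shrink to 4 before the first shuffle and to 3 before the second. The reduction depends on how often the same (group, distinct value) pair repeats within a split.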

          This patch also allows Tajo to support multiple distinct functions. For example, the following query works well.

           
          select l_orderkey, count(distinct l_partkey), sum(distinct l_partkey) from lineitem group by l_orderkey;
          

          However, the current patch still has some limitations. The above query includes two distinct aggregation functions: count(distinct) and sum(distinct). They use the same distinct column 'l_partkey', so it works well. In contrast, the following case, where there are two or more distinct columns, is not supported yet.

          select l_orderkey, count(distinct l_partkey), sum(distinct l_linenumber) from lineitem group by l_orderkey;
          

          If you submit such a query, you will see the following message: "different DISTINCT columns are not supported yet: l_partkey, l_linenumber". To support this kind of query, we need additional physical executors. I'll add this feature later in another Jira issue.
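The restriction can be pictured as a simple check over the aggregation functions of a query. The sketch below is a hypothetical Python stand-in, not the verifier code from the patch: the function name and the tuple shape are assumptions, and only the error message text comes from this comment.

```python
def check_distinct_columns(aggregates):
    """Reject queries whose DISTINCT aggregates use more than one column.

    aggregates: list of (function_name, column, is_distinct) tuples
    (a hypothetical shape chosen for this example).
    """
    distinct_columns = []
    for _func, column, is_distinct in aggregates:
        if is_distinct and column not in distinct_columns:
            distinct_columns.append(column)
    if len(distinct_columns) > 1:
        raise ValueError(
            "different DISTINCT columns are not supported yet: "
            + ", ".join(distinct_columns)
        )

# count(distinct l_partkey), sum(distinct l_partkey): same column, accepted.
supported = [("count", "l_partkey", True), ("sum", "l_partkey", True)]
# count(distinct l_partkey), sum(distinct l_linenumber): two columns, rejected.
unsupported = [("count", "l_partkey", True), ("sum", "l_linenumber", True)]

check_distinct_columns(supported)

try:
    check_distinct_columns(unsupported)
    message = None
except ValueError as err:
    message = str(err)
```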

          hyunsik Hyunsik Choi added a comment - - edited

          Created a review request against branch master in reviewboard:
          https://reviews.apache.org/r/18210/


            People

            • Assignee:
              hyunsik Hyunsik Choi
              Reporter:
              hyunsik Hyunsik Choi