Tajo / TAJO-1010

Improve multiple DISTINCT aggregation.

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.0
    • Component/s: Planner/Optimizer
    • Labels:
      None

      Description

      Currently, Tajo provides a three-stage plan for optimizing DISTINCT aggregation queries, but it supports only one DISTINCT column, as in the following query:

      Query1
      select a.flag, count(distinct a.id) as cnt, sum(distinct a.id) as total
      from table1 a
      group by a.flag
      

      If you use two or more columns in DISTINCT aggregations, the optimized distinct aggregation cannot be applied, as in the following query:

      Query2
      select a.flag, count(distinct a.id) as cnt, sum(distinct a.id) as total
      , count(distinct a.name) as cnt2, count(distinct a.code) as cnt3
      from table1 a
      group by a.flag
      

      In this case, the query may perform poorly. Thus, we need to improve multiple DISTINCT aggregation; specifically, the three-stage plan should also support multiple DISTINCT columns.
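
      As a rough illustration of how several DISTINCT columns can share one staged plan, the sketch below tags each value with the sequence number of its DISTINCT column (the DistinctGroupbyThirdAggregationExec diff quoted in this issue reads such a tag from tuple position 0), removes duplicates, and then counts per tag. The class and method names are hypothetical, and this is a simplified in-memory model under those assumptions, not Tajo's actual executors.

      ```java
      import java.util.*;

      // Hypothetical in-memory model of multi-column DISTINCT counting; not Tajo code.
      final class MultiDistinctSketch {

          // Early stage: emit one (groupKey, distinctSeq, value) record per DISTINCT
          // column and drop duplicates. Column 0 of each row is the grouping column;
          // columns 1..numDistinctCols are the DISTINCT columns.
          static Set<List<Object>> expandAndDedup(List<Object[]> rows, int numDistinctCols) {
              Set<List<Object>> tagged = new LinkedHashSet<>();
              for (Object[] row : rows) {
                  Object groupKey = row[0];
                  for (short seq = 0; seq < numDistinctCols; seq++) {
                      tagged.add(Arrays.<Object>asList(groupKey, seq, row[1 + seq]));
                  }
              }
              return tagged;
          }

          // Final stage: every surviving record is a distinct value, so counting the
          // records per (groupKey, distinctSeq) yields count(distinct col) per column.
          static Map<Object, long[]> countDistinct(Set<List<Object>> tagged, int numDistinctCols) {
              Map<Object, long[]> counts = new LinkedHashMap<>();
              for (List<Object> t : tagged) {
                  short seq = (Short) t.get(1);
                  counts.computeIfAbsent(t.get(0), k -> new long[numDistinctCols])[seq]++;
              }
              return counts;
          }

          public static void main(String[] args) {
              // Rows of (flag, id, name), mirroring the shape of Query2.
              List<Object[]> rows = Arrays.<Object[]>asList(
                  new Object[]{"x", 1, "a"},
                  new Object[]{"x", 1, "b"},
                  new Object[]{"x", 2, "b"},
                  new Object[]{"y", 3, "c"});
              Map<Object, long[]> counts = countDistinct(expandAndDedup(rows, 2), 2);
              for (Map.Entry<Object, long[]> e : counts.entrySet()) {
                  System.out.println(e.getKey() + " " + Arrays.toString(e.getValue()));
              }
          }
      }
      ```

      Because every DISTINCT column travels through the same tagged stream, adding another DISTINCT column only adds another tag value rather than another pass over the input.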

          Activity

          hudson Hudson added a comment -

          SUCCESS: Integrated in Tajo-block_iteration-branch-build #15 (See https://builds.apache.org/job/Tajo-block_iteration-branch-build/15/)
          TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and jaehwa) (blrunner: rev 0dfa3972c6a52d785b8e55f91d0906456a3926b3)

          • tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result
          • tajo-core/src/main/proto/TajoWorkerProtocol.proto
          • tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
          • tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
          • tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
          • tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql
          • tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
          • tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
          • tajo-common/src/main/java/org/apache/tajo/SessionVars.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
          • tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
          • tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
          • CHANGES
          • tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
          • tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
          • tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql
          • tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result
          hudson Hudson added a comment -

          SUCCESS: Integrated in Tajo-master-CODEGEN-build #38 (See https://builds.apache.org/job/Tajo-master-CODEGEN-build/38/)
          TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and jaehwa) (blrunner: rev 0dfa3972c6a52d785b8e55f91d0906456a3926b3)

          • tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
          • tajo-common/src/main/java/org/apache/tajo/SessionVars.java
          • tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql
          • tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
          • tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
          • tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result
          • tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
          • tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result
          • tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql
          • tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql
          • tajo-core/src/main/proto/TajoWorkerProtocol.proto
          • tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
          • tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
          • tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
          • CHANGES
          • tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
          • tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
          hudson Hudson added a comment -

          SUCCESS: Integrated in Tajo-master-build #396 (See https://builds.apache.org/job/Tajo-master-build/396/)
          TAJO-1010: Improve multiple DISTINCT aggregation. (Hyoungjun Kim and jaehwa) (blrunner: rev 0dfa3972c6a52d785b8e55f91d0906456a3926b3)

          • tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
          • tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
          • tajo-core/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
          • CHANGES
          • tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
          • tajo-common/src/main/java/org/apache/tajo/SessionVars.java
          • tajo-core/src/main/proto/TajoWorkerProtocol.proto
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/DistinctGroupbyNode.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
          • tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case10.sql
          • tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
          • tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case9.result
          • tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result
          • tajo-core/src/test/resources/results/TestGroupByQuery/testDistinctAggregation8.result
          • tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
          • tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation_case9.sql
          • tajo-core/src/test/resources/queries/TestGroupByQuery/testDistinctAggregation8.sql
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
          • tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
          • tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/tajo/pull/136

          githubbot ASF GitHub Bot added a comment -

          Github user hyunsik commented on the pull request:

          https://github.com/apache/tajo/pull/136#issuecomment-58210574

          +1 Ship it.

          githubbot ASF GitHub Bot added a comment -

          Github user blrunner commented on the pull request:

          https://github.com/apache/tajo/pull/136#issuecomment-58142276

          Hi @hyunsik

          Thank you for your review. I also agree with your opinion.
          I've just rebased the patch and removed unnecessary comments.

          githubbot ASF GitHub Bot added a comment -

          Github user hyunsik commented on the pull request:

          https://github.com/apache/tajo/pull/136#issuecomment-58134795

          Although I intended to give some advice on the comments, I can't spend time on it right now.

          However, this issue was scheduled for 0.9.0, and I think this improvement is important for 0.9.0, so it would be hard to delay committing it to the master branch.

          This patch already looks good and is ready to be committed to master. So, I propose that we commit it now and revise the comments later.

          Could you rebase it against the latest patch? If so, I'll finish the review on this patch.

          githubbot ASF GitHub Bot added a comment -

          Github user hyunsik commented on the pull request:

          https://github.com/apache/tajo/pull/136#issuecomment-57928001

          I'll give more comments soon.

          githubbot ASF GitHub Bot added a comment -

          Github user hyunsik commented on a diff in the pull request:

          https://github.com/apache/tajo/pull/136#discussion_r18433188

          — Diff: tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java —
          @@ -1044,20 +1046,80 @@ public PhysicalExec createWindowAgg(TaskAttemptContext context,WindowAggNode win
          public PhysicalExec createDistinctGroupByPlan(TaskAttemptContext context,
          DistinctGroupbyNode distinctNode, PhysicalExec subOp)
          throws IOException {
          +// Enforcer enforcer = context.getEnforcer();
          — End diff –

          Please remove the commented out lines.

          githubbot ASF GitHub Bot added a comment -

          Github user blrunner commented on a diff in the pull request:

          https://github.com/apache/tajo/pull/136#discussion_r18262290

          — Diff: tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java —
          @@ -816,9 +820,30 @@ public static int calculateShuffleOutputNum(SubQuery subQuery, DataChannel chann
          if (grpNode.getType() == NodeType.GROUP_BY) {
          hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0;
          } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) {
          - hasGroupColumns = ((DistinctGroupbyNode)grpNode).getGroupingColumns().length > 0;
          + // Find current distinct stage node.
          + DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
          + if (distinctNode == null) {
          + LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode");
          + distinctNode = (DistinctGroupbyNode)grpNode;
          — End diff –

          I found a bug on a production cluster, so I had to add the code above.
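
          The lookup-with-fallback pattern in the diff above can be sketched as follows. This is a minimal model that assumes a plan is a simple chain of unary nodes; the Node class and the method bodies here are hypothetical stand-ins and do not reproduce Tajo's PlannerUtil or LogicalNode classes.

          ```java
          // Hypothetical stand-in for a logical plan; not Tajo's classes.
          final class PlanNodeSketch {
              enum NodeType { SCAN, GROUP_BY, DISTINCT_GROUP_BY }

              static final class Node {
                  final NodeType type;
                  final Node child; // null for a leaf

                  Node(NodeType type, Node child) {
                      this.type = type;
                      this.child = child;
                  }
              }

              // Walks from the root towards the leaf and returns the deepest node of
              // the requested type, or null when the plan contains no such node.
              static Node findMostBottomNode(Node root, NodeType type) {
                  Node found = null;
                  for (Node n = root; n != null; n = n.child) {
                      if (n.type == type) {
                          found = n;
                      }
                  }
                  return found;
              }

              // Mirrors the shape of the patch: prefer the node found in the block's
              // plan, and fall back to the node already in hand when the search fails.
              static Node currentDistinctStage(Node blockPlan, Node grpNode) {
                  Node distinct = findMostBottomNode(blockPlan, NodeType.DISTINCT_GROUP_BY);
                  return distinct != null ? distinct : grpNode;
              }
          }
          ```

          The fallback keeps the caller working when the search returns null, which is the situation the warning in the patch logs.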

          githubbot ASF GitHub Bot added a comment -

          Github user hyunsik commented on the pull request:

          https://github.com/apache/tajo/pull/136#issuecomment-55750897

          Could you elaborate more on the three phases in each physical executor?

          githubbot ASF GitHub Bot added a comment -

          Github user hyunsik commented on a diff in the pull request:

          https://github.com/apache/tajo/pull/136#discussion_r17584429

          — Diff: tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java —
          @@ -816,9 +820,30 @@ public static int calculateShuffleOutputNum(SubQuery subQuery, DataChannel chann
          if (grpNode.getType() == NodeType.GROUP_BY) {
          hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0;
          } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) {
          - hasGroupColumns = ((DistinctGroupbyNode)grpNode).getGroupingColumns().length > 0;
          + // Find current distinct stage node.
          + DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
          + if (distinctNode == null) {
          + LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode");
          + distinctNode = (DistinctGroupbyNode)grpNode;
          — End diff –

          Is this a normal case? Or is it a potential bug that you currently cannot rule out?

          githubbot ASF GitHub Bot added a comment -

          Github user blrunner commented on the pull request:

          https://github.com/apache/tajo/pull/136#issuecomment-55363575

          Hi @hyunsik

          Thank you for your review.
          I've just added descriptions to the operators.

          githubbot ASF GitHub Bot added a comment -

          Github user blrunner commented on a diff in the pull request:

          https://github.com/apache/tajo/pull/136#discussion_r17463756

          — Diff: tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java —
          @@ -0,0 +1,300 @@
          +/**
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.tajo.engine.planner.physical;
          +
          +import org.apache.commons.logging.Log;
          +import org.apache.commons.logging.LogFactory;
          +import org.apache.tajo.catalog.Column;
          +import org.apache.tajo.datum.NullDatum;
          +import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
          +import org.apache.tajo.engine.function.FunctionContext;
          +import org.apache.tajo.engine.planner.Target;
          +import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
          +import org.apache.tajo.engine.planner.logical.GroupbyNode;
          +import org.apache.tajo.storage.Tuple;
          +import org.apache.tajo.storage.VTuple;
          +import org.apache.tajo.worker.TaskAttemptContext;
          +
          +import java.io.IOException;
          +import java.util.*;
          +
          +public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
          + private static Log LOG = LogFactory.getLog(DistinctGroupbyThirdAggregationExec.class);
          + private DistinctGroupbyNode plan;
          + private PhysicalExec child;
          +
          + private boolean finished = false;
          +
          + private DistinctFinalAggregator[] aggregators;
          + private DistinctFinalAggregator nonDistinctAggr;
          +
          + private int resultTupleLength;
          + private int numGroupingColumns;
          +
          + private int[] resultTupleIndexes;
          +
          + public DistinctGroupbyThirdAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec)
          + throws IOException {
          + super(context, plan.getInSchema(), plan.getOutSchema(), sortExec);
          + this.plan = plan;
          + this.child = sortExec;
          + }
          +
          + @Override
          + public void init() throws IOException {
          + this.child.init();
          +
          + numGroupingColumns = plan.getGroupingColumns().length;
          + resultTupleLength = numGroupingColumns;
          +
          + List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
          +
          + List<DistinctFinalAggregator> aggregatorList = new ArrayList<DistinctFinalAggregator>();
          + int inTupleIndex = 1 + numGroupingColumns;
          + int outTupleIndex = numGroupingColumns;
          + int distinctSeq = 0;
          +
          + for (GroupbyNode eachGroupby : groupbyNodes) {
          + if (eachGroupby.isDistinct()) {
          + aggregatorList.add(new DistinctFinalAggregator(distinctSeq, inTupleIndex, outTupleIndex, eachGroupby));
          + distinctSeq++;
          +
          + Column[] distinctGroupingColumns = eachGroupby.getGroupingColumns();
          + inTupleIndex += distinctGroupingColumns.length;
          + outTupleIndex += eachGroupby.getAggFunctions().length;
          + } else {
          + nonDistinctAggr = new DistinctFinalAggregator(-1, inTupleIndex, outTupleIndex, eachGroupby);
          + outTupleIndex += eachGroupby.getAggFunctions().length;
          + }
          + resultTupleLength += eachGroupby.getAggFunctions().length;
          + }
          + aggregators = aggregatorList.toArray(new DistinctFinalAggregator[]{});
          +
          + // make output schema mapping index
          + resultTupleIndexes = new int[outSchema.size()];
          + Map<Column, Integer> groupbyResultTupleIndex = new HashMap<Column, Integer>();
          + int resultTupleIndex = 0;
          + for (Column eachColumn: plan.getGroupingColumns()) {
          + groupbyResultTupleIndex.put(eachColumn, resultTupleIndex);
          + resultTupleIndex++;
          + }
          + for (GroupbyNode eachGroupby : groupbyNodes) {
          + Set<Column> groupingColumnSet = new HashSet<Column>();
          + for (Column column: eachGroupby.getGroupingColumns()) {
          + groupingColumnSet.add(column);
          + }
          + for (Target eachTarget: eachGroupby.getTargets()) {
          + if (!groupingColumnSet.contains(eachTarget.getNamedColumn())) {
          + //aggr function
          + groupbyResultTupleIndex.put(eachTarget.getNamedColumn(), resultTupleIndex);
          + resultTupleIndex++;
          + }
          + }
          + }
          +
          + int index = 0;
          + for (Column eachOutputColumn: outSchema.getColumns()) {
          + // If column is avg aggregation function, outschema's column type is float
          + // but groupbyResultTupleIndex's column type is protobuf
          +
          + int matchedIndex = -1;
          + for (Column eachIndexColumn: groupbyResultTupleIndex.keySet()) {
          + if (eachIndexColumn.getQualifiedName().equals(eachOutputColumn.getQualifiedName())) {
          + matchedIndex = groupbyResultTupleIndex.get(eachIndexColumn);
          + break;
          + }
          + }
          + if (matchedIndex < 0) {
          + throw new IOException("Can't find proper output column mapping: " + eachOutputColumn);
          + }
          + resultTupleIndexes[matchedIndex] = index++;
          + }
          + }
          +
          + Tuple prevKeyTuple = null;
          + Tuple prevTuple = null;
          +
          + @Override
          + public Tuple next() throws IOException {
          + if (finished) {
          + return null;
          + }
          +
          + Tuple resultTuple = new VTuple(resultTupleLength);
          +
          + while (!context.isStopped()) {
          + Tuple childTuple = child.next();
          + // Last tuple
          + if (childTuple == null) {
          + finished = true;
          +
          + if (prevTuple == null) {
          + // Empty case
          + if (numGroupingColumns == 0) {
          + // No grouping column, return null tuple
          + return makeEmptyTuple();
          + } else {
          + return null;
          + }
          + }
          +
          + for (int i = 0; i < numGroupingColumns; i++) {
          + resultTuple.put(resultTupleIndexes[i], prevTuple.get(i + 1));
          + }
          + for (DistinctFinalAggregator eachAggr: aggregators) {
          + eachAggr.terminate(resultTuple);
          + }
          + break;
          + }
          +
          + Tuple tuple = null;
          + try {
          + tuple = childTuple.clone();
          + } catch (CloneNotSupportedException e) {
          + throw new IOException(e.getMessage(), e);
          + }
          +
          + int distinctSeq = tuple.get(0).asInt2();
          — End diff –

          I expect that the number of distinct aggregation functions will not exceed the range of a short int. If it becomes necessary, we should document the limit for users later.

          githubbot ASF GitHub Bot added a comment -

          Github user hyunsik commented on the pull request:

          https://github.com/apache/tajo/pull/136#issuecomment-55274187

          It's great work, and the patch looks good to me. The algorithm is well defined, the code is clean, and its readability is very good.

          However, documentation explaining the overall three steps of distinct aggregation is lacking. Only someone who already knows the algorithm can understand the source code. DistinctGroupbyBuilder may be a good place for that description.

          Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user hyunsik commented on a diff in the pull request:

          https://github.com/apache/tajo/pull/136#discussion_r17423223

          — Diff: tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java —
          @@ -0,0 +1,300 @@
          +/**
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.tajo.engine.planner.physical;
          +
          +import org.apache.commons.logging.Log;
          +import org.apache.commons.logging.LogFactory;
          +import org.apache.tajo.catalog.Column;
          +import org.apache.tajo.datum.NullDatum;
          +import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
          +import org.apache.tajo.engine.function.FunctionContext;
          +import org.apache.tajo.engine.planner.Target;
          +import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
          +import org.apache.tajo.engine.planner.logical.GroupbyNode;
          +import org.apache.tajo.storage.Tuple;
          +import org.apache.tajo.storage.VTuple;
          +import org.apache.tajo.worker.TaskAttemptContext;
          +
          +import java.io.IOException;
          +import java.util.*;
          +
          +public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
          + private static Log LOG = LogFactory.getLog(DistinctGroupbyThirdAggregationExec.class);
          + private DistinctGroupbyNode plan;
          + private PhysicalExec child;
          +
          + private boolean finished = false;
          +
          + private DistinctFinalAggregator[] aggregators;
          + private DistinctFinalAggregator nonDistinctAggr;
          +
          + private int resultTupleLength;
          + private int numGroupingColumns;
          +
          + private int[] resultTupleIndexes;
          +
          + public DistinctGroupbyThirdAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec)
          + throws IOException {
          + super(context, plan.getInSchema(), plan.getOutSchema(), sortExec);
          + this.plan = plan;
          + this.child = sortExec;
          + }
          +
          + @Override
          + public void init() throws IOException {
          + this.child.init();
          +
          + numGroupingColumns = plan.getGroupingColumns().length;
          + resultTupleLength = numGroupingColumns;
          +
          + List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
          +
          + List<DistinctFinalAggregator> aggregatorList = new ArrayList<DistinctFinalAggregator>();
          + int inTupleIndex = 1 + numGroupingColumns;
          + int outTupleIndex = numGroupingColumns;
          + int distinctSeq = 0;
          +
          + for (GroupbyNode eachGroupby : groupbyNodes) {
          + if (eachGroupby.isDistinct()) {
          + aggregatorList.add(new DistinctFinalAggregator(distinctSeq, inTupleIndex, outTupleIndex, eachGroupby));
          + distinctSeq++;
          +
          + Column[] distinctGroupingColumns = eachGroupby.getGroupingColumns();
          + inTupleIndex += distinctGroupingColumns.length;
          + outTupleIndex += eachGroupby.getAggFunctions().length;
          + } else {
          + nonDistinctAggr = new DistinctFinalAggregator(-1, inTupleIndex, outTupleIndex, eachGroupby);
          + outTupleIndex += eachGroupby.getAggFunctions().length;
          + }
          + resultTupleLength += eachGroupby.getAggFunctions().length;
          + }
          + aggregators = aggregatorList.toArray(new DistinctFinalAggregator[]{});
          +
          + // make output schema mapping index
          + resultTupleIndexes = new int[outSchema.size()];
          + Map<Column, Integer> groupbyResultTupleIndex = new HashMap<Column, Integer>();
          + int resultTupleIndex = 0;
          + for (Column eachColumn: plan.getGroupingColumns()) {
          + groupbyResultTupleIndex.put(eachColumn, resultTupleIndex);
          + resultTupleIndex++;
          + }
          + for (GroupbyNode eachGroupby : groupbyNodes) {
          + Set<Column> groupingColumnSet = new HashSet<Column>();
          + for (Column column: eachGroupby.getGroupingColumns()) {
          + groupingColumnSet.add(column);
          + }
          + for (Target eachTarget: eachGroupby.getTargets()) {
          + if (!groupingColumnSet.contains(eachTarget.getNamedColumn())) {
          + //aggr function
          + groupbyResultTupleIndex.put(eachTarget.getNamedColumn(), resultTupleIndex);
          + resultTupleIndex++;
          + }
          + }
          + }
          +
          + int index = 0;
          + for (Column eachOutputColumn: outSchema.getColumns()) {
          + // If column is avg aggregation function, outschema's column type is float
          + // but groupbyResultTupleIndex's column type is protobuf
          +
          + int matchedIndex = -1;
          + for (Column eachIndexColumn: groupbyResultTupleIndex.keySet()) {
          + if (eachIndexColumn.getQualifiedName().equals(eachOutputColumn.getQualifiedName())) {
          + matchedIndex = groupbyResultTupleIndex.get(eachIndexColumn);
          + break;
          + }
          + }
          + if (matchedIndex < 0) {
          + throw new IOException("Can't find proper output column mapping: " + eachOutputColumn);
          + }
          + resultTupleIndexes[matchedIndex] = index++;
          + }
          + }
          +
          + Tuple prevKeyTuple = null;
          + Tuple prevTuple = null;
          +
          + @Override
          + public Tuple next() throws IOException {
          + if (finished) {
          + return null;
          + }
          +
          + Tuple resultTuple = new VTuple(resultTupleLength);
          +
          + while (!context.isStopped()) {
          + Tuple childTuple = child.next();
          + // Last tuple
          + if (childTuple == null) {
          + finished = true;
          +
          + if (prevTuple == null) {
          + // Empty case
          + if (numGroupingColumns == 0) {
          + // No grouping column, return null tuple
          + return makeEmptyTuple();
          + } else {
          + return null;
          + }
          + }
          +
          + for (int i = 0; i < numGroupingColumns; i++) {
          + resultTuple.put(resultTupleIndexes[i], prevTuple.get(i + 1));
          + }
          + for (DistinctFinalAggregator eachAggr: aggregators) {
          + eachAggr.terminate(resultTuple);
          + }
          + break;
          + }
          +
          + Tuple tuple = null;
          + try {
          + tuple = childTuple.clone();
          + } catch (CloneNotSupportedException e) {
          + throw new IOException(e.getMessage(), e);
          + }
          +
          + int distinctSeq = tuple.get(0).asInt2();
          — End diff –

          Is the maximum number of distinct aggregation functions in a SQL block 2^16-1? I'm just wondering. If so, I'll describe it in the Tajo user guide later.
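
          For reference on the range question: the sketch below assumes that asInt2() yields a Java short, which the thread does not state explicitly. Under that assumption the sequence number is a signed 16-bit value, so the ceiling would be 2^15-1 (32767) rather than 2^16-1; the quoted patch also passes -1 as the sequence number of the non-distinct aggregator, which suggests that negative values are in use. The class and the fitsInShort helper are hypothetical and are not part of Tajo.

```java
public class DistinctSeqRange {
    /**
     * Hypothetical helper (not a Tajo API): returns true if `count` sequence
     * numbers, numbered 0 .. count-1, all fit in a signed 16-bit short.
     */
    public static boolean fitsInShort(int count) {
        return count >= 0 && count - 1 <= Short.MAX_VALUE;
    }

    public static void main(String[] args) {
        // Largest value a Java short can hold: 2^15 - 1.
        System.out.println(Short.MAX_VALUE);      // 32767
        // Narrowing a larger int wraps around instead of failing.
        System.out.println((short) 40000);        // -25536
        // 32768 sequence numbers (0..32767) still fit; one more does not.
        System.out.println(fitsInShort(32768));   // true
        System.out.println(fitsInShort(32769));   // false
    }
}
```

          If the limit ever matters in practice, a check of this kind at planning time would fail early instead of letting the sequence number wrap silently.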

          githubbot ASF GitHub Bot added a comment -

          GitHub user blrunner opened a pull request:

          https://github.com/apache/tajo/pull/136

          TAJO-1010: Improve multiple DISTINCT aggregation. (hyoungjun, jaehwa)

          Tajo supports several options for count distinct. The current option executes a count distinct query with two execution blocks, which are built by DistinctGroupbyBuilder::buildPlan. The new option executes the query with three execution blocks. You can enable it by setting SessionVars.COUNT_DISTINCT_ALGORITHM to three_stages.

          • In the first stage, the Tajo operator expands each input row into multiple rows by grouping columns. In addition, the operator must create a row for aggregating the non-distinct columns.
          • In the second stage, the Tajo operator aggregates the output of the first stage. For reference, the data is shuffled by the grouping columns and the aggregation columns.
          • In the third stage, the Tajo operator merges the output of the second stage. For reference, the data is shuffled by the grouping columns only.
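
          The three stages above can be illustrated with a minimal in-memory sketch. It is not the Tajo implementation: the class name, the Row type, and the methods are hypothetical, shuffling is replaced by plain collections, and only count(distinct ...) over two columns is modeled (no non-distinct aggregates). The leading integer in each expanded row plays the role of the distinct sequence number that the quoted diff reads from column 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class ThreeStageDistinctSketch {
    /** Hypothetical input row: one grouping column (flag) and two DISTINCT columns. */
    public static final class Row {
        final String flag;
        final int id;
        final String name;

        public Row(String flag, int id, String name) {
            this.flag = flag;
            this.id = id;
            this.name = name;
        }
    }

    /** Stage 1: expand every input row into one tagged row per DISTINCT column. */
    public static List<List<Object>> expand(List<Row> rows) {
        List<List<Object>> out = new ArrayList<>();
        for (Row r : rows) {
            out.add(Arrays.<Object>asList(0, r.flag, r.id));   // sequence 0: id
            out.add(Arrays.<Object>asList(1, r.flag, r.name)); // sequence 1: name
        }
        return out;
    }

    /** Stage 2: remove duplicates on (sequence, grouping key, distinct value). */
    public static Set<List<Object>> deduplicate(List<List<Object>> expanded) {
        return new LinkedHashSet<>(expanded);
    }

    /** Stage 3: regroup by the grouping key only and count the surviving rows per sequence. */
    public static Map<String, long[]> merge(Set<List<Object>> deduplicated) {
        Map<String, long[]> result = new TreeMap<>();
        for (List<Object> tuple : deduplicated) {
            int seq = (Integer) tuple.get(0);
            String flag = (String) tuple.get(1);
            result.computeIfAbsent(flag, k -> new long[2])[seq]++;
        }
        return result;
    }

    /** Runs all three stages: flag -> {count(distinct id), count(distinct name)}. */
    public static Map<String, long[]> countDistinct(List<Row> rows) {
        return merge(deduplicate(expand(rows)));
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
            new Row("A", 1, "x"), new Row("A", 1, "y"),
            new Row("A", 2, "x"), new Row("B", 3, "z"));
        // Prints "A 2 2" then "B 1 1".
        countDistinct(rows).forEach((flag, counts) ->
            System.out.println(flag + " " + counts[0] + " " + counts[1]));
    }
}
```

          Tagging each expanded row with a sequence number is what lets a single pass in the last stage route every row to the aggregator for its DISTINCT column.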

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/blrunner/tajo TAJO-1010

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/tajo/pull/136.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #136


          commit 615d84f13e8dd496c9c096cf2eeb6f7e3e16dfa2
          Author: Jaehwa Jung <blrunner@apache.org>
          Date: 2014-09-11T06:30:31Z

          TAJO-1010: Improve multiple DISTINCT aggregation. (hyoungjun, jaehwa)


          githubbot ASF GitHub Bot added a comment -

          Github user blrunner closed the pull request at:

          https://github.com/apache/tajo/pull/126

          githubbot ASF GitHub Bot added a comment -

          Github user blrunner commented on the pull request:

          https://github.com/apache/tajo/pull/126#issuecomment-55224128

          Hi @hyunsik

          I'll create this issue again.

          githubbot ASF GitHub Bot added a comment -

          Github user hyunsik commented on the pull request:

          https://github.com/apache/tajo/pull/126#issuecomment-55072740

          Is it ready to be reviewed?

          githubbot ASF GitHub Bot added a comment -

          GitHub user blrunner opened a pull request:

          https://github.com/apache/tajo/pull/126

          TAJO-1010: Improve multiple DISTINCT aggregation.

          Tajo supports several options for count distinct. The current option executes a count distinct query with two execution blocks, which are built by DistinctGroupbyBuilder::buildPlan. The new option executes the query with three execution blocks. You can enable it by setting SessionVars.COUNT_DISTINCT_ALGORITHM to three_stages.

          • In the first stage, the Tajo operator expands each input row into multiple rows by grouping columns. In addition, the operator must create a row for aggregating the non-distinct columns.
          • In the second stage, the Tajo operator aggregates the output of the first stage. For reference, the data is shuffled by the grouping columns and the aggregation columns.
          • In the third stage, the Tajo operator merges the output of the second stage. For reference, the data is shuffled by the grouping columns only.

          For reference, this patch still needs to implement empty input data handling and union with distinct count.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/blrunner/tajo TAJO-1010

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/tajo/pull/126.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #126


          commit 7c98709f0fcb06dfb675acae3d6489a6126f55b5
          Author: jinossy <jinossy@gmail.com>
          Date: 2014-08-06T08:43:35Z

          TAJO-995: HiveMetaStoreClient wrapper should retry the connection

          commit 415d0867ae4a4543f47360294bead1fc7f41e292
          Author: Jaehwa Jung <blrunner@apache.org>
          Date: 2014-08-10T06:07:24Z

          Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo

          commit 7a7b4fd26f61df89cacdb4fc41faf9c2abe456b2
          Author: Jaehwa Jung <blrunner@apache.org>
          Date: 2014-08-11T02:28:48Z

          Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo

          commit 45f5ed3adba931f4706f26dda1d3c03240ee11d3
          Author: Jaehwa Jung <blrunner@apache.org>
          Date: 2014-08-11T05:40:25Z

          Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo

          commit aa01e83859ef553ac4eb90c1678e3bc6be20c6c9
          Author: Jaehwa Jung <blrunner@apache.org>
          Date: 2014-08-18T09:56:24Z

          Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo

          commit 18cc27a64ee081dcd02af389e44db5d7ecfa1017
          Author: Jaehwa Jung <blrunner@apache.org>
          Date: 2014-08-20T03:15:37Z

          Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-1010

          commit 68b593b9462534951e3a8f7f4b0a7d2a3a16ae0d
          Author: Jaehwa Jung <blrunner@apache.org>
          Date: 2014-08-22T06:58:52Z

          Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-1010

          commit 2a97e00b70e4300067ab6758525ee7a541bff14a
          Author: Jaehwa Jung <blrunner@apache.org>
          Date: 2014-08-22T07:16:48Z

          Added DistinctNullDatum

          commit b14d25a894cfcd4db566058a21b1a5762e39a525
          Author: Jaehwa Jung <blrunner@apache.org>
          Date: 2014-08-22T07:54:34Z

          Fixed DistinctNullDatum Error.

          commit e3ab71ef11e3400c665ba687e0e26d4c7b424888
          Author: Jaehwa Jung <blrunner@apache.org>
          Date: 2014-08-22T08:04:51Z

          Added MetaDataTuple::IsDistinctNull.

          commit 7323279a138502b840c912536dd2f9f7658fee4d
          Author: Jaehwa Jung <blrunner@apache.org>
          Date: 2014-08-25T07:05:59Z

          Implemented DistinctGroupbyIntermediateAggregationExec.

          commit 27fa0c6a7b17296b7cad03d5d00ed081c54bac26
          Author: Jaehwa Jung <blrunner@apache.org>
          Date: 2014-08-25T07:07:00Z

          Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-1010

          commit ccd01c5bdfb15c713102c1742c0763e177efc45b
          Author: Jaehwa Jung <blrunner@apache.org>
          Date: 2014-08-25T08:35:46Z

          Remove unused code.

          commit 13bc50bde92f4bf309d94e623cea5eda5cda69db
          Author: Jaehwa Jung <blrunner@apache.org>
          Date: 2014-08-25T09:21:59Z

          Implemented DistinctGroupbyInitWriterExec.

          commit 82edd24ced63484e7d50a79540ef50e0d1f0b5db
          Author: Jaehwa Jung <blrunner@apache.org>
          Date: 2014-08-26T18:59:51Z

          Implemented operators for count distinct three stages.



            People

            • Assignee:
              blrunner Jaehwa Jung
              Reporter:
              blrunner Jaehwa Jung
            • Votes:
              0
              Watchers:
              3
