Calcite / CALCITE-1803

Push Project that follows Aggregate down to Druid

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.11.0
    • Fix Version/s: 1.14.0
    • Component/s: druid
    • Labels:
      None

      Description

      Druid post-aggregations are not supported when parsing SQL queries. By implementing post-aggregations, we can offload some computation to the Druid cluster rather than aggregating on the client side.

      Example usage:
      SELECT SUM("column1") - SUM("column2") FROM "table";
      This query is parsed into two separate Druid aggregations under the current rules, and the results are then subtracted in Calcite. By using the postAggregations field in the Druid query, the subtraction could be done in the Druid cluster instead. Although this example is simple, the difference becomes significant when the number of result rows is large (multiple result rows occur when GROUP BY is used).
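For reference, the subtraction in the example could be expressed via the postAggregations field of a Druid query roughly as follows (a sketch only; the timeseries query type, the aggregator names sum1/sum2, and the interval are illustrative, not what Calcite actually generates):

```json
{
  "queryType": "timeseries",
  "dataSource": "table",
  "granularity": "all",
  "aggregations": [
    {"type": "doubleSum", "name": "sum1", "fieldName": "column1"},
    {"type": "doubleSum", "name": "sum2", "fieldName": "column2"}
  ],
  "postAggregations": [
    {
      "type": "arithmetic",
      "name": "diff",
      "fn": "-",
      "fields": [
        {"type": "fieldAccess", "fieldName": "sum1"},
        {"type": "fieldAccess", "fieldName": "sum2"}
      ]
    }
  ],
  "intervals": ["1900-01-01/3000-01-01"]
}
```

With this, Druid returns the `diff` column directly and Calcite no longer needs to compute the subtraction per row.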
      Questions:
      After I push the post-aggregation into the Druid query, what should I change in the Project relational operator? In the example above, the BindableProject holds the expression representing the subtraction. If I push the post-aggregation into the Druid query, that subtraction expression should be replaced by a reference to the post-aggregation result. For now, the Project expressions seem only able to point to aggregation results. Since post-aggregations must themselves point to aggregation results, they cannot be placed at the same level as the aggregations. Where should I put the post-aggregations?

        Issue Links

          Activity

          julianhyde Julian Hyde added a comment -

          Do you mind if we change the terminology? "Post aggregation" suggests aggregation that happens after something. But I think you mean "Post-aggregation projects". Or in simpler English, "Projects after aggregation".

          To answer your question: You will need to have a DruidQuery that contains a Scan followed by an Aggregate followed by a Project.

          Currently DruidProjectRule will not allow the Project to be pushed in, because "sap" (scan, aggregate, project) is not a valid signature according to DruidQuery.VALID_SIG. But you should make it valid.

          I'm curious:

          • Does Druid allow filters after aggregation (i.e. HAVING)?
          • I know that Druid allows sort after aggregation. But is this before or after the post-aggregation projects?
          axeisghost Junxian Wu added a comment -

          Yes, "Post Aggregation" means doing something after aggregation. Here, I am referring to the "post-aggregator" in a Druid query (http://druid.io/docs/0.10.0/querying/post-aggregations.html). By implementing this, Calcite can push more work into the Druid query.
          The post-aggregation in Druid is similar to the idea of "projects after aggregation" you mentioned in Calcite, but there are complications when I try to implement it.

          If I change DruidProjectRule to allow the Project to be pushed into the DruidQuery as a post-aggregator, the result of the Druid query will contain the result of the Project (the minus-operation project). The Project should then point to the result of the post-aggregator in the Druid query, but for now it seems the Project can only point to the result of an aggregator in a Druid query.

          For your questions:
          1. A filter cannot be applied after aggregation, because "fields" in a Druid filter cannot refer to aggregated columns.
          2. "ordering" should be the implementation of sort in Druid, and both aggregators and post-aggregators have an "ordering" field. I think the sort could be done at post-aggregation time, because that is what the "ordering" field in a post-aggregator does.

          julianhyde Julian Hyde added a comment -

          Somewhat off-topic, but I'll note that we're being held back by Druid's query language. Druid can (I assume) implement a pipeline of (project, filter, aggregate, sort, limit) relational operators, and Calcite can represent that, but Druid's query language can't express it.

          MongoDB added an aggregation pipeline capability a while back, and it was really useful.

          bslim slim bouguerra added a comment -

          Junxian Wu, Druid supports the equivalent of SQL HAVING, as per the docs.

          bslim slim bouguerra added a comment -

          In fact we can support pushing a Project over an Aggregate to Druid by using Druid's post-aggregate operators. Junxian Wu, you are on the right track: using Druid post-aggregates we can cover more cases like the one you mentioned, or even something like AVG, which can be decomposed into SUM and COUNT with the SUM/COUNT division done at the post-aggregate level.

          julianhyde Julian Hyde added a comment -

          We already have a JIRA case for HAVING (what I guess you Druid folks would call a post-aggregation filter): CALCITE-1206.

          I am going to re-title this case to just focus on post-aggregation project.

          julianhyde Julian Hyde added a comment -

          slim bouguerra, if you specify both a project and a filter post-aggregation, which happens first?

          In SQL, the filter (HAVING clause) occurs before project (SELECT), but the filter and project can contain arbitrary expressions. E.g.

          SELECT deptno, COUNT(*) FROM Emp GROUP BY deptno HAVING SUM(sal) + 3 * SUM(commission) > 1000

          Since Druid's expression language is more limited (and the limitations differ by context), we might have trouble with expressions like that.

          bslim slim bouguerra added a comment -

          Julian Hyde, the HAVING is applied to individual group-by results before we trigger the post-aggregate.
          In fact, HAVING is a simple filter on top of single aggregate results, so it cannot be used for arbitrary expressions. There is some work under way to enable such arbitrary expressions via virtual columns, but we are not there yet.
          Hence, bottom line: we can push some of the projects after aggregates by using the post-aggregate functions.
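To illustrate the limitation described above: Druid's having spec is a simple comparison against a single named aggregator's output, so a compound expression like `SUM(sal) + 3 * SUM(commission) > 1000` has no direct equivalent. A sketch (the data source `emp` and aggregator name `sum_sal` are illustrative):

```json
{
  "queryType": "groupBy",
  "dataSource": "emp",
  "dimensions": ["deptno"],
  "granularity": "all",
  "aggregations": [
    {"type": "doubleSum", "name": "sum_sal", "fieldName": "sal"}
  ],
  "having": {"type": "greaterThan", "aggregation": "sum_sal", "value": 1000},
  "intervals": ["1900-01-01/3000-01-01"]
}
```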

          julianhyde Julian Hyde added a comment - - edited

          Since Calcite rewrites AVG(x) to SUM(x) / COUNT(x), this change should enable

          SELECT "store_state", AVG("unit_sales") FROM "foodmart" GROUP BY "store_state"

          to be pushed down to Druid in its entirety.
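A sketch of the Druid query fragment such a pushed-down AVG might produce, once SUM/COUNT are aggregators and the division is a post-aggregator (aggregator names sum_unit_sales/count_rows are illustrative, not the names Calcite actually generates):

```json
{
  "aggregations": [
    {"type": "doubleSum", "name": "sum_unit_sales", "fieldName": "unit_sales"},
    {"type": "count", "name": "count_rows"}
  ],
  "postAggregations": [
    {
      "type": "arithmetic",
      "name": "avg_unit_sales",
      "fn": "/",
      "fields": [
        {"type": "fieldAccess", "fieldName": "sum_unit_sales"},
        {"type": "fieldAccess", "fieldName": "count_rows"}
      ]
    }
  ]
}
```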

          axeisghost Junxian Wu added a comment - - edited

          I updated the ticket with a new pull request in the public GitHub repo.
          The description and detailed solution can be viewed at https://github.com/apache/calcite/pull/471.
          Please let me know if you have any comments.

          axeisghost Junxian Wu added a comment -

          I updated the ticket after CALCITE-1805 was merged. Basically, since COUNT(X) will no longer be pushed down to DruidQuery as an Aggregate, the average function will always be handled outside the DruidQuery, so the current implementation of AVG() no longer involves the post-aggregator. I changed the test cases according to the new logic.
          Please review anytime.

          jcamachorodriguez Jesus Camacho Rodriguez added a comment -

          Junxian Wu, sorry I was working on other issues and it took a bit of time to get back to this and check the latest changes.

          Latest PR LGTM, I have pushed it in http://git-wip-us.apache.org/repos/asf/calcite/commit/9a5cd27. Thanks for your contribution!

          julianhyde Julian Hyde added a comment -

          Junxian Wu, Coverity discovered an issue with your code. Please review and correct if necessary.

          *** CID 150320:  Control flow issues  (MISSING_BREAK)
          /druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java: 353 in org.apache.calcite.adapter.druid.DruidConnectionImpl.parseField(java.util.List, java.util.List, int, org.apache.calcite.interpreter.Row$RowBuilder, com.fasterxml.jackson.core.JsonParser)()
          347             case PRIMITIVE_SHORT:
          348             case INTEGER:
          349             case PRIMITIVE_INT:
          350               if (s.equals("Infinity") || s.equals("-Infinity") || s.equals("NaN")) {
          351                 throw new RuntimeException("/ by zero");
          352               }
             CID 150320:  Control flow issues  (MISSING_BREAK)
             The above case falls through to this one.
          353             case FLOAT:
          354             case PRIMITIVE_FLOAT:
          355             case PRIMITIVE_DOUBLE:
          356             case NUMBER:
          357             case DOUBLE:
          358               if (s.equals("Infinity")) {
          
          
          axeisghost Junxian Wu added a comment -

          Julian Hyde Thank you for letting me know. There should be a // fall through comment here, like the one above. When the returned value is a String that does not represent one of the special non-numeric values, it should be processed as Druid did before, which is what the default branch does. I added the comment on the case below but not here.
          Should I open another PR with this comment? I am not sure how to push a commit to a closed PR.

          julianhyde Julian Hyde added a comment -

          Probably a "// fall through", yes. But also clean up the logic; it doesn't make sense to check the same conditions twice.

          Either a new PR, or attach a patch to this case. (We usually require a PR, but this is a small change.)

          axeisghost Junxian Wu added a comment - - edited

          I think the problem here is that if the check in the INT case passes (does not throw an exception), execution falls through to the DOUBLE case, and it is not right to check the same conditions again.
          So for the fix, I might do this:

          switch (type) {
            // cases for INT:
              if (Infinity || -Infinity || NaN) {
                throw
              }
              rowBuilder.set(i, s)
              break
            // cases for DOUBLE:
              handle Infinity / -Infinity / NaN as before
              rowBuilder.set(i, s)
              break
            default:
              break
          }

          Moving rowBuilder.set inside each switch case instead of inside default makes it possible to break in each branch.
          Will that work?
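The proposal above could be sketched in plain Java as follows. This is a hypothetical, self-contained stand-in, not Calcite's actual DruidConnectionImpl code: the ColumnType enum, the Map used in place of Row.RowBuilder, and the parse logic are all illustrative.

```java
import java.util.HashMap;
import java.util.Map;

public class ParseFieldSketch {
  public enum ColumnType { INTEGER, DOUBLE, STRING }

  // Stand-in for Calcite's Row.RowBuilder: column index -> value.
  public static final Map<Integer, Object> row = new HashMap<>();

  public static void parseField(ColumnType type, int i, String s) {
    switch (type) {
    case INTEGER:
      // An integer column cannot represent these special values.
      if (s.equals("Infinity") || s.equals("-Infinity") || s.equals("NaN")) {
        throw new RuntimeException("/ by zero");
      }
      row.put(i, Long.parseLong(s));
      break; // no fall-through into the DOUBLE case
    case DOUBLE:
      if (s.equals("Infinity")) {
        row.put(i, Double.POSITIVE_INFINITY);
      } else if (s.equals("-Infinity")) {
        row.put(i, Double.NEGATIVE_INFINITY);
      } else if (s.equals("NaN")) {
        row.put(i, Double.NaN);
      } else {
        row.put(i, Double.parseDouble(s));
      }
      break;
    default:
      row.put(i, s); // non-numeric column: keep the raw string
      break;
    }
  }

  public static void main(String[] args) {
    parseField(ColumnType.INTEGER, 0, "42");
    parseField(ColumnType.DOUBLE, 1, "NaN");
    parseField(ColumnType.STRING, 2, "Infinity");
    System.out.println(row.get(0) + " " + row.get(1) + " " + row.get(2));
    // prints "42 NaN Infinity"
  }
}
```

Each case sets the row value itself and ends with an explicit break, so the special-value checks run exactly once per branch.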

          julianhyde Julian Hyde added a comment -

          Or maybe move rowBuilder.set outside of the switch.

          axeisghost Junxian Wu added a comment -

          It probably will not work with rowBuilder.set outside the switch. Each branch inside the DOUBLE case sets the i-th column of the row to the corresponding Double value, but rowBuilder.set(i, s) sets the i-th column to s, which is a String. Since we cannot assign a Double value to s, we need to call rowBuilder inside the case; if the call were outside, rowBuilder.set would run a second time after the DOUBLE case and the String form of s would be set.
          Or is it possible to give s a more generic type that can hold either a String or a Double? Then I could assign the Double value to s, and rowBuilder.set could move outside the switch.

          julianhyde Julian Hyde added a comment -

          I agree, rowBuilder.set(i, Double.NaN) and the 2 similar statements need to remain. Do whatever you think is clearest. What we currently have is not clear IMHO because it looks as if the author might have forgotten a "break".

          axeisghost Junxian Wu added a comment - - edited

          Thanks. I just pushed the control-flow fix as a new PR. Please check.

          julianhyde Julian Hyde added a comment -

          Thanks for the clean-ups; I've committed them as http://git-wip-us.apache.org/repos/asf/calcite/commit/cbbe627f.


  People

  • Assignee: axeisghost Junxian Wu
  • Reporter: axeisghost Junxian Wu
  • Votes: 0
  • Watchers: 5


  Time Tracking

  • Original Estimate: 336h
  • Remaining Estimate: 336h
  • Time Spent: Not Specified
