  1. Calcite
  2. CALCITE-1828

Push the FILTER clause into Druid as a Filtered Aggregator

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.12.0
    • Fix Version/s: 1.14.0
    • Component/s: druid
    • Labels:
      None

      Description

      Druid supports a special aggregator, the Filtered Aggregator, which applies a filter to a single aggregation independently of the other filters in the Druid query.

      An example where the filtered aggregator is useful:

      SELECT 
      sum("col1") FILTER (WHERE <condition1>),
      sum("col2") FILTER (WHERE <condition2>)
      FROM "table"; 
      

      Currently, Calcite scans Druid and then does the filtering and aggregation itself. With filtered aggregators, both the filter and the aggregation can be pushed into Druid.
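
For illustration, the query above might map onto a Druid timeseries query whose aggregations are wrapped in `filtered` aggregators. The sketch below builds such a payload in Python. The data source, column names, selector filters, output names, and interval are placeholders chosen for the example, and the JSON that Calcite actually emits may differ.

```python
import json


def selector(dimension, value):
    """Druid selector filter: matches rows where dimension == value."""
    return {"type": "selector", "dimension": dimension, "value": value}


def filtered_aggregator(filter_spec, aggregator_spec):
    """Wrap an aggregator so that it only sees rows matching filter_spec."""
    return {"type": "filtered", "filter": filter_spec, "aggregator": aggregator_spec}


# Placeholder names: "table", "col1", "col2", "dim1", "dim2" are illustrative only.
query = {
    "queryType": "timeseries",
    "dataSource": "table",
    "granularity": "all",
    "intervals": ["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"],
    "aggregations": [
        # sum("col1") FILTER (WHERE "dim1" = 'a')
        filtered_aggregator(
            selector("dim1", "a"),
            {"type": "doubleSum", "name": "EXPR$0", "fieldName": "col1"},
        ),
        # sum("col2") FILTER (WHERE "dim2" = 'b')
        filtered_aggregator(
            selector("dim2", "b"),
            {"type": "doubleSum", "name": "EXPR$1", "fieldName": "col2"},
        ),
    ],
}

print(json.dumps(query, indent=2))
```

Each aggregation carries its own filter, so the two sums can use different conditions within a single Druid query.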

      A few comments/questions:

      1) If all conditions in the FILTER clauses are the same, then instead of pushing filtered aggregators individually, it would make more sense to push a single filter into the Druid query, i.e. the filters can be factored out into one filter. I don't see Calcite doing this currently; does it already have such a rule in place?

      2) The filters can/should only be pushed if they are filtering on dimension columns

      3) Currently, the above query would create the following relation:
      DruidQuery -> Project -> Aggregate. There is already a rule called DruidAggregateProjectRule which matches the previous relation. Is it better to add logic to that rule, or to create a new rule that also matches that relation?
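
Points 1 and 2 can be sketched as a small decision function. This is only a toy model, not Calcite code: conditions are represented as `(column, value)` equality pairs, and `split_filters` and its return shape are hypothetical names invented for the example.

```python
def split_filters(agg_filters, dimension_columns):
    """Decide where each FILTER condition should go.

    agg_filters maps an aggregate's output name to a (column, value) equality
    condition. Returns None when nothing can be pushed, otherwise a dict with
    a query-level filter and per-aggregator filters.
    """
    # Point 2: only push conditions that filter on dimension columns.
    if any(column not in dimension_columns for column, _ in agg_filters.values()):
        return None

    distinct = set(agg_filters.values())
    if len(distinct) == 1:
        # Point 1: every aggregate shares the same condition, so hoist it
        # into a single query-level filter.
        return {"query_filter": distinct.pop(), "aggregator_filters": {}}

    # Otherwise each aggregate keeps its own filtered aggregator.
    return {"query_filter": None, "aggregator_filters": dict(agg_filters)}


dimensions = {"store_city", "the_year"}

same = split_filters(
    {"EXPR$0": ("store_city", "Seattle"), "EXPR$1": ("store_city", "Seattle")},
    dimensions,
)
different = split_filters(
    {"EXPR$0": ("the_year", "1997"), "EXPR$1": ("store_city", "Seattle")},
    dimensions,
)
on_metric = split_filters({"EXPR$0": ("store_sales", "10")}, dimensions)

print(same)
print(different)
print(on_metric)
```

One caveat with hoisting: for a query with a GROUP BY, a query-level filter removes groups that have no matching rows, whereas FILTER keeps those groups with an empty aggregate, so the factoring is only straightforward when there is no grouping.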

          Activity

          michaelmior Michael Mior added a comment -

          Resolved in release 1.14.0 (2017-10-01)

          jcamachorodriguez Jesus Camacho Rodriguez added a comment -

          Fixed in http://git-wip-us.apache.org/repos/asf/calcite/commit/551b562, thanks Zain Humayun!

          julianhyde Julian Hyde added a comment -

          Jesus Camacho Rodriguez or slim bouguerra, can you please review, test and commit? I am swamped with PRs right now.

          zhumayun Zain Humayun added a comment -

          Created a PR: https://github.com/apache/calcite/pull/472

          zhumayun Zain Humayun added a comment -

          Jesus Camacho Rodriguez here are some sample queries with their final plans

          SELECT COUNT("brand_name") FILTER (WHERE "the_year" >= 1997) FROM "foodmart";
          
           EnumerableInterpreter
            BindableAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $1])
              BindableProject(brand_name=[$0], $f1=[IS TRUE(>=(CAST($1):BIGINT, 1997))])
                DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$2, $83]])
          
          SELECT COUNT(DISTINCT "brand_name") FILTER (where "the_year" >= 1997) FROM "foodmart";
          
           EnumerableInterpreter
            BindableAggregate(group=[{}], EXPR$0=[COUNT($0)])
              BindableAggregate(group=[{0}])
                BindableProject(i$brand_name=[CASE(IS TRUE(>=(CAST($1):BIGINT, 1997)), $0, null)])
                  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$2, $83]])
          

          Notice that with this plan there are two aggregates on top of each other because of the distinct clause. We'll probably need a special rule to catch this relation.

          SELECT COUNT("brand_name") FILTER (WHERE "the_year" >= 1997), SUM("store_sales") FILTER (WHERE "store_city" = 'Seattle') FROM "foodmart";
          
           EnumerableInterpreter
            BindableAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $1], EXPR$1=[SUM($2) FILTER $3])
              BindableProject(brand_name=[$0], $f1=[IS TRUE(>=(CAST($2):BIGINT, 1997))], store_sales=[$3], $f3=[IS TRUE(=($1, 'Seattle'))])
                DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$2, $62, $83, $90]])
          
          SELECT SUM(CASE WHEN "store_city" = 'Seattle' THEN "store_sales" END) FROM "foodmart";
          
           EnumerableInterpreter
            BindableAggregate(group=[{}], EXPR$0=[SUM($0)])
              BindableProject($f0=[CASE(=($0, 'Seattle'), $1, null)])
                DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$62, $90]])
          

          Looks like this produces something slightly different than the FILTER (WHERE <condition>) syntax.

          SELECT COUNT(DISTINCT CASE WHEN "the_year" >= 1997 THEN "brand_name" END) FROM "foodmart";
          
           EnumerableInterpreter
            BindableAggregate(group=[{}], EXPR$0=[COUNT($0)])
              BindableAggregate(group=[{0}])
                BindableProject($f0=[CASE(>=(CAST($1):BIGINT, 1997), $0, null)])
                  DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], projects=[[$2, $83]])
          

          slim bouguerra Adding the OR of all filters is a good idea; I'll take a look and see if that can also be added.

          jcamachorodriguez Jesus Camacho Rodriguez added a comment -

          Zain Humayun, it would help if you could post a sample query together with the plan generated by Calcite, to show what the filter predicates for the query look like, etc. Extending DruidAggregateProjectRule should be fine, but it would be good to understand the scope of the extension first: the logical plan will help with that.

          FWIW, many databases do not support the sum("col1") FILTER (WHERE <condition1>) construct. In those cases, the same behavior is often reproduced with CASE expressions, e.g., sum(CASE WHEN <condition1> THEN "col1" END). It would be good to know the plan that is generated in those cases too, mainly to see whether it is similar (since CASE might be transformed into AND/OR clauses by Calcite) or whether there is a gap that we could close in a follow-up JIRA.
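
The equivalence between the two forms can be checked with a small model of SQL's SUM, which skips NULLs and returns NULL for an empty input. This is a plain-Python sketch of the semantics only, not of how Calcite or Druid evaluate the query; the rows and the helper names `sum_filter` and `sum_case` are made up for the example.

```python
def sql_sum(values):
    """SQL SUM: ignores NULL (None) and returns NULL when nothing is left."""
    kept = [v for v in values if v is not None]
    return sum(kept) if kept else None


def sum_filter(rows, column, predicate):
    """SUM(column) FILTER (WHERE predicate): drop non-matching rows first."""
    return sql_sum(row[column] for row in rows if predicate(row))


def sum_case(rows, column, predicate):
    """SUM(CASE WHEN predicate THEN column END): non-matching rows become NULL."""
    return sql_sum(row[column] if predicate(row) else None for row in rows)


# Illustrative rows; the values are not taken from the foodmart data set.
rows = [
    {"store_city": "Seattle", "store_sales": 10.0},
    {"store_city": "Tacoma", "store_sales": 7.0},
    {"store_city": "Seattle", "store_sales": 5.5},
    {"store_city": "Seattle", "store_sales": None},
]


def in_seattle(row):
    return row["store_city"] == "Seattle"


def in_portland(row):
    return row["store_city"] == "Portland"


print(sum_filter(rows, "store_sales", in_seattle))
print(sum_case(rows, "store_sales", in_seattle))
```

Both forms agree, including when no row matches, because SUM treats the NULL produced by the CASE exactly like a row that was filtered out.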

          bslim slim bouguerra added a comment -

          I agree on 1 if it is doable. Also, if possible, I would like an OR of the filters to be added up front as the first filter. It will act as a pruning filter that selects the rows needed for the aggregation, and the rest will then be done by the filtered aggregators. Taking a simple case where cond1 is country=US and cond2 is country=CA, the first filter will be country = US OR country = CA.
          Again, I feel that such a filter might be hard to construct for arbitrary filter expressions, but maybe Calcite can do that.

          2 -> yes, I thought we already do this.

          3 -> not sure; maybe Julian Hyde or Jesus Camacho Rodriguez can answer.
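
The OR pre-filter suggested under point 1 could look roughly like the sketch below, which uses Druid's `selector` and `or` filter JSON. The helper `or_prefilter` is a hypothetical name, and the `country` dimension comes from the example in the comment rather than from a real schema.

```python
def selector(dimension, value):
    """Druid selector filter: matches rows where dimension == value."""
    return {"type": "selector", "dimension": dimension, "value": value}


def or_prefilter(filters):
    """Combine the per-aggregate filters into one query-level pruning filter.

    Returns None if any aggregate is unfiltered (it needs every row), a single
    filter if all of them are identical, and otherwise an OR of the distinct
    filters.
    """
    if not filters or any(f is None for f in filters):
        return None
    unique = []
    for f in filters:
        if f not in unique:
            unique.append(f)
    if len(unique) == 1:
        return unique[0]
    return {"type": "or", "fields": unique}


cond1 = selector("country", "US")
cond2 = selector("country", "CA")

prefilter = or_prefilter([cond1, cond2])
print(prefilter)
```

The pre-filter only prunes rows that no aggregate needs; each aggregate still has to keep its own filtered aggregator, since a row that passes the OR may match only one of the conditions.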


            People

            • Assignee:
              zhumayun Zain Humayun
              Reporter:
              zhumayun Zain Humayun