Calcite / CALCITE-1578

Druid adapter: wrong semantics of topN query limit with granularity

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.11.0
    • Fix Version/s: 1.12.0
    • Component/s: druid
    • Labels:
      None

      Description

      The semantics of a Druid topN query with limit and granularity are not equivalent to those of the input SQL. In particular, the limit is applied to each granularity value, not to the overall query.

      Currently, the following query will be transformed into a topN query:

      SELECT i_brand_id, floor_day(`__time`), max(ss_quantity), sum(ss_wholesale_cost) as s
      FROM store_sales_sold_time_subset
      GROUP BY i_brand_id, floor_day(`__time`)
      ORDER BY s DESC
      LIMIT 10;
      

      The previous query outputs at most 10 rows. In contrast, a SQL query equivalent to the generated Druid topN query would have to be expressed as:

      SELECT rs.i_brand_id, rs.d, rs.m, rs.s
      FROM (
          SELECT i_brand_id, floor_day(`__time`) as d, max(ss_quantity) as m, sum(ss_wholesale_cost) as s,
                 ROW_NUMBER() OVER (PARTITION BY floor_day(`__time`) ORDER BY sum(ss_wholesale_cost) DESC ) AS rownum
          FROM store_sales_sold_time_subset
          GROUP BY i_brand_id, floor_day(`__time`)
      ) rs
      WHERE rownum <= 10;
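
      To see the difference concretely, here is a minimal sketch using Python's sqlite3 (window functions require SQLite >= 3.25). The table and data are toy stand-ins for store_sales_sold_time_subset, not the real schema:

```python
import sqlite3

# Toy data standing in for store_sales_sold_time_subset; table and
# column names are illustrative, not the real schema.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (brand INTEGER, day TEXT, cost REAL)")
rows = [(b, d, float(b)) for d in ("2017-01-01", "2017-01-02") for b in range(5)]
conn.executemany("INSERT INTO sales VALUES (?, ?, ?)", rows)

# Plain SQL LIMIT: at most 2 rows over the whole result.
overall = conn.execute(
    "SELECT brand, day, SUM(cost) AS s FROM sales"
    " GROUP BY brand, day ORDER BY s DESC LIMIT 2").fetchall()

# topN-with-granularity semantics: at most 2 rows *per day*, expressed
# with ROW_NUMBER as in the rewrite above.
per_day = conn.execute(
    "SELECT brand, day, s FROM ("
    "  SELECT brand, day, s,"
    "         ROW_NUMBER() OVER (PARTITION BY day ORDER BY s DESC) AS rn"
    "  FROM (SELECT brand, day, SUM(cost) AS s FROM sales GROUP BY brand, day))"
    " WHERE rn <= 2").fetchall()

print(len(overall), len(per_day))  # the per-day variant returns twice as many rows
```

      With two distinct days and a per-group limit of 2, the topN-style query returns 4 rows where the plain LIMIT returns 2.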
      


          Activity

          Jesus Camacho Rodriguez added a comment -

          Julian Hyde, could you double-check the alternative SQL formulation? Thanks

          Julian Hyde added a comment -

          The problem of using ROW_NUMBER() is that it will output the same value for ties. Thus if the 10th and 11th rows have the same value you will get 11 rows. But I think it's the best we can do within standard SQL.
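
          A note on tie handling: strictly, ROW_NUMBER() assigns distinct numbers even to tied rows (breaking ties arbitrarily), so the filter returns exactly 10 rows; it is RANK() that repeats values and can return 11 rows when the 10th and 11th tie. A minimal sketch in Python's sqlite3 (window functions require SQLite >= 3.25):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (v INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(1,), (2,), (2,), (3,)])

# RANK repeats values for ties, so "r <= 2" returns 3 rows here
# (the 3 ranks first, then the two tied 2s both rank second).
ranked = conn.execute(
    "SELECT v FROM (SELECT v, RANK() OVER (ORDER BY v DESC) AS r FROM t)"
    " WHERE r <= 2").fetchall()

# ROW_NUMBER breaks the tie arbitrarily and returns exactly 2 rows.
numbered = conn.execute(
    "SELECT v FROM (SELECT v, ROW_NUMBER() OVER (ORDER BY v DESC) AS r FROM t)"
    " WHERE r <= 2").fetchall()

print(len(ranked), len(numbered))  # 3 2
```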

          I think it would be useful to find a way to express this in the algebra, in a way that is more direct than windowed aggregation + filter on row_number. In Calcite's algebra as you know LIMIT and OFFSET are part of the Sort operator. I think we could have Sort([x ASC, y DESC, z DESC], limitWithinGroup: 2, fetch: 10), which means take the top 10 z values within each (x, y) group.

          For current dialects of SQL we would implement using windowed aggregation + filter on row_number, but at least we'd have the full picture in the algebra.
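
          Under that proposal, Sort([x ASC, y DESC, z DESC], limitWithinGroup: 2, fetch: 10) could be read as: order by the full collation, partition on the first limitWithinGroup collation keys, and keep at most fetch rows per partition. A hypothetical Python sketch of those semantics (this operator does not exist in Calcite; all names are illustrative):

```python
from itertools import groupby

def sort_limit_within_group(rows, keys, limit_within_group, fetch):
    # keys: list of (key_fn, descending) pairs forming the collation;
    # numeric keys assumed so DESC can be expressed by negation.
    def sort_key(row):
        return tuple(-fn(row) if desc else fn(row) for fn, desc in keys)
    ordered = sorted(rows, key=sort_key)
    # Partition on the first `limit_within_group` collation keys and keep
    # at most `fetch` rows per partition, in collation order.
    out = []
    for _, group in groupby(ordered, key=lambda r: sort_key(r)[:limit_within_group]):
        out.extend(list(group)[:fetch])
    return out

# Two (x, y) groups of five rows each; keep the top 3 z values per group.
rows = [(1, 1, v) for v in range(5)] + [(1, 2, v) for v in range(5)]
keys = [(lambda r: r[0], False), (lambda r: r[1], True), (lambda r: r[2], True)]
top = sort_limit_within_group(rows, keys, limit_within_group=2, fetch=3)
print(len(top))  # 3 rows from each of the 2 (x, y) groups
```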

          Julian Hyde added a comment -

          Jesus Camacho Rodriguez, I cannot reproduce this issue in current Calcite. In my dev sandbox I added two tests:

          • testGroupBySingleSortLimit groups by one column (brand_name) and generates a topN Druid query;
          • testGroupBySortLimit groups by two columns (brand_name and generate) and generates a "groupBy" query with a "limitSpec".

          Both queries return 3 rows, as they should.

          Jesus Camacho Rodriguez added a comment (edited) -

          Julian Hyde, I will check the queries, but the key ingredient to reproduce the problem is the granularity: a topN query returns at most n rows for each different granularity value.

          Julian Hyde added a comment -

          When we generate a topN query we generate it with granularity: all, so we get the desired result.

          Is it possible that Hive is executing multiple Druid queries in parallel? That would explain why it shows up in Hive and not in Calcite.

          Jesus Camacho Rodriguez added a comment -

          We might recognize floor_day and adjust the query granularity: https://github.com/apache/calcite/blob/master/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java#L474
          This was one of the changes introduced with the recognition for Druid topN queries.

          What we could do to fix this issue is to avoid pushing the SortLimit operator if granularity is not all. That would give us the right semantics, which is the immediate goal.

          Then, in a follow-up, we could focus on enriching the algebra as you proposed (I like your proposal) so we could recognize this kind of query more easily.

          Julian Hyde added a comment -

          I was thinking of something similar: if LIMIT is present, we will set granularity to "all". This will cause us to use a groupBy Druid query with a limitSpec rather than topN. A little bit less efficient, but at least the sort/limit occurs in Druid.

          Julian Hyde added a comment -

          Cancel that. In a groupBy query, limitSpec seems to operate over the granularity rather than globally. That's not what we want.

          So, the best we can do is generate a groupBy query with granularity = all and do the sort/limit outside.

          Jesus Camacho Rodriguez, Please review https://github.com/julianhyde/calcite/tree/1578-druid-aggregate-sort-limit, which has a fix for this and CALCITE-1579.

          Gian Merlino, Let me know if you can think of a way we could be generating more efficient queries (either topN, or groupBy with a limitSpec) for SQL queries that have GROUP BY ... LIMIT.

          Gian Merlino added a comment -

          "granularity" on topN/groupBy behaves differently enough from how SQL wants them to work that I decided to stick to granularity = all for topN/groupBy for the builtin Druid SQL layer if there's an ORDER BY or LIMIT.

          Jesus Camacho Rodriguez added a comment (edited) -

          Julian Hyde, patch LGTM, thanks for pulling this out. A small note: I think the new condition in L540 in DruidQuery.java could be removed? Maybe an assertion granularity == Granularity.ALL within the clause could be added, since we should not have pushed the SortLimit if granularity was not 'all'?

          I think this should fix CALCITE-1580 too, could you confirm that? Thanks

          slim bouguerra added a comment -

          As Gian Merlino suggested, the best way to do this is to keep granularity=ALL and use a Druid extraction function to roll up the data to the desired granularity. This can be achieved by adding an extra Druid dimension that is a projection of the time dimension; in this instance, `day`.

          Nishant Bangarwa added a comment -

          Here is a sample query using an extraction function, in case it helps:

          {
            "queryType": "groupBy",
            "dataSource": "druid_tpcds_ss_sold_time_subset",
            "granularity": "ALL",
            "dimensions": [
              "i_brand_id",
              {
                "type" : "extraction",
                "dimension" : "__time",
                "outputName" :  "year",
                "extractionFn" : {
                  "type" : "timeFormat",
                  "granularity" : "YEAR"
                }
              }
            ],
            "limitSpec": {
              "type": "default",
              "limit": 10,
              "columns": [
                {
                  "dimension": "$f3",
                  "direction": "ascending"
                }
              ]
            },
            "aggregations": [
              {
                "type": "longMax",
                "name": "$f2",
                "fieldName": "ss_quantity"
              },
              {
                "type": "doubleSum",
                "name": "$f3",
                "fieldName": "ss_wholesale_cost"
              }
            ],
            "intervals": [
              "1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"
            ]
          }
          
          slim bouguerra added a comment -

          Julian Hyde, please check the comments on https://github.com/apache/calcite/pull/354/files.

          Julian Hyde added a comment -

          Fixed in http://git-wip-us.apache.org/repos/asf/calcite/commit/a118f821.

          Julian Hyde added a comment -

          Resolved in release 1.12.0 (2017-03-24).


            People

            • Assignee: Julian Hyde
            • Reporter: Jesus Camacho Rodriguez
            • Votes: 0
            • Watchers: 5