Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.12.0
    • Fix Version/s: 1.14.0
    • Component/s: druid
    • Labels:
      None

      Description

      Currently, the Druid adapter does not support the thetaSketch aggregate type, which is used to quickly estimate the cardinality of a column. Many Druid instances support theta sketches, so I think it would be a nice feature to have.

      I've been looking at the Druid adapter, and I propose we add a new DruidType called thetaSketch and then add logic in the getJsonAggregation method in class DruidQuery to generate the thetaSketch aggregate. This will require accessing information about the columns (what data type they are) so that the thetaSketch aggregate is only produced if the column's type is thetaSketch.
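      For concreteness, the JSON in question is the thetaSketch aggregator from Druid's datasketches extension; a minimal sketch of what getJsonAggregation would emit (the name and fieldName values here are illustrative):

      {
        "type": "thetaSketch",
        "name": "unique_users",
        "fieldName": "user_unique"
      }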

      Also, I've noticed that a hyperUnique DruidType is currently defined, but a hyperUnique aggregate is never produced. Since both are approximate aggregators, I could also couple in the logic for hyperUnique.

      I'd love to hear your thoughts on my approach, and any suggestions you have for this feature.

        Issue Links

          Activity

          Julian Hyde added a comment -

          Does it need to be a new type? Can it not just be a new expression (of type varbinary or whatever)? There's some precedent for this. We have an aggregate function called HISTOGRAM_AGG which represents a partially evaluated aggregate containing a sorted set. From this result you can derive other aggregates: min, max, median. Partial aggregates are common in databases so I'd rather have a simpler approach than adding a new type for each one.

          Zain Humayun added a comment - edited

          Apologies if I didn't fully understand your comment, but I have a few questions:
          1. If we were to create a new expression, how would Calcite know that the DB we are connecting to supports partial aggregates before the expression is built? For example, not all Druid instances support thetaSketches (also, not all metrics are of type thetaSketch), and the Druid adapter only gets this information in DruidConnectionImpl#metadata - after (I believe) the expressions have been derived.

          2. Can you give me a sample query where the HISTOGRAM_AGG is used?

          Thanks!

          Julian Hyde added a comment -

          Regarding 1. I'm assuming that you want to be able to write "select thetaSketch(customerId) from sales".

          If so, I think that's not a great idea, because it's not declarative. You don't write "emp MERGE JOIN dept", you write "emp JOIN dept" and let the optimizer decide which algorithm to use.

          I'd prefer we wrote "select count(distinct customerId) approximate (algorithm thetaSketch) from sales" or just "select count(distinct customerId) from sales" (using a session preference that thetaSketch or hyperLogLog can be used).

          Regarding 2. HISTOGRAM_AGG is still in the code (see SqlStdOperatorTable) but it is not currently used. When it was used, we would generate plans like this:

          SELECT orderId, productId, min(quantity) OVER w, max(quantity) OVER w
          FROM Orders
          WINDOW w AS (PARTITION BY productId
            ORDER BY orderTimestamp
            RANGE INTERVAL '1' HOUR PRECEDING)
          
          Project($0, $1, $HistogramMin($2), $HistogramMax($2))
            Window($0, $1, HISTOGRAM_AGG($3) over (partition by $1 order by $2 range interval '1' hour preceding))
              Scan(Orders)
          

          As you can see, we compute one aggregate, a histogram (basically TreeSet on top of a FIFO queue), then we have two extractor functions ($HistogramMin and $HistogramMax) to get the min and max from it.

          Zain Humayun added a comment - edited

          My aim is to be able to write something like

          SELECT COUNT(DISTINCT "col") FROM "table";

          and let Calcite generate one of:

          • thetaSketch aggregate
          • hyperUnique aggregate
          • cardinality aggregate (already in calcite, default aggregate for count distinct queries)

          Calcite can determine which aggregate to generate by looking at the DruidType for "col" from the metadata query. The logic for that would just go in the getJsonAggregation method in DruidQuery.
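          A rough sketch of that dispatch (illustrative only; the columnTypes lookup, the JsonAggregation classes, and their constructors are assumptions, not the actual patch):

          // Inside DruidQuery: choose a Druid aggregation for COUNT(DISTINCT fieldName)
          // based on the column's DruidType, as reported by Druid's metadata query.
          private JsonAggregation countDistinctAggregation(String name, String fieldName) {
            DruidType type = druidTable.columnTypes.get(fieldName); // assumed lookup
            if (type == DruidType.thetaSketch) {
              return new JsonAggregation("thetaSketch", name, fieldName);
            }
            if (type == DruidType.hyperUnique) {
              return new JsonAggregation("hyperUnique", name, fieldName);
            }
            // default: Druid's built-in cardinality aggregator (takes a list of fields)
            return new JsonCardinalityAggregation("cardinality", name,
                ImmutableList.of(fieldName));
          }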

          Zain Humayun added a comment - edited

          I've created two pull requests:
          1) For calcite (https://github.com/apache/calcite/pull/455)
          2) For the test dataset (https://github.com/vlsi/calcite-test-dataset/pull/21)

          2) is needed because the current Druid instance does not include the datasketches extension needed for thetaSketch. I've also added tests for the hyperUnique aggregator.

          slim bouguerra added a comment -

          In my opinion there is something missing here. The missing piece is the post aggregators; in fact, as per the theta sketch docs, you need to use both filtered aggregators and post aggregators to answer queries like

           select count(distinct "sketch_column") from table where conditionA and conditionB 

          I guess we need some groundwork for post aggregators and aggregated filters first. Please let me know if I am missing something.

          Zain Humayun added a comment - edited

          While I agree that post aggregations make theta sketch aggregators more useful, the theta sketch aggregators can still be used independently of post aggregators as the example shows in the docs you linked. CALCITE-1803 is currently open for post aggregations, so the thetaSketch post aggregation functionality should be added there.

          Does Druid currently support aggregated filters?

          slim bouguerra added a comment -

          Zain Humayun yes, Druid does handle that. I am not against partially using theta sketches, but it needs to be done in a way where we will not get wrong results, as per the example I stated; I would love to see testing around that.

          Julian Hyde added a comment -

          Review comments:

          • I think you should remove CalciteTrace.getDruidQueryInfoTracer(). Druid is an optional module, and there will be a ClassNotFoundException if people are running in an environment where calcite-druid is not on classpath.
          • Make DruidType a package-level class.
          • There are several occurrences of the string "thetaSketch" in the code. Replace them with DruidType.thetaSketch.
          • Use "DruidType.valueOf" to look up types.
          • In DruidType.create, combine fieldMap and columnTypeMap into a single `Map<String, Pair<SqlTypeName, DruidType>>`, and check that both types are not null.
          • Update druid_adapter.md

          Overall, looks good, if you can address slim bouguerra's concerns.

          Zain Humayun added a comment -

          slim bouguerra that seems reasonable. So I've taken the liberty of testing two queries, which should have the same semantics:

          1) select count(distinct "user_unique") from "foodmart" where "store_sales" >= 0 and "store_cost" >= 0;

          and

          2) select count(distinct "user_unique") from "foodmart";

          Both produce the same value, 5581. However, for #1 the plan looks something like:

          BindableAggregate(group=[{}], EXPR$0=[COUNT($0)])
            BindableAggregate(group=[{2}])
              BindableFilter(condition=[AND(>=($0, 0), >=($1, 0))])
                DruidQuery(table=[[foodmart, foodmart]], intervals=[...], projects=[[$90, $91, $92]])

          I noticed calcite doesn't push the where clause into the Druid query (in the "filter" field). Is this a missing feature/rule or am I misunderstanding something?

          Zain Humayun added a comment -

          Julian Hyde Those review comments are reasonable; I'll make those changes on my pull request.

          slim bouguerra added a comment -

          Zain Humayun the filter is not pushed because store_sales is a metric; currently Calcite pushes only Druid dimension filters. There is a gap there that can be fixed as well.
          Still, I am not sure I am following your example; what are you trying to test exactly?

          Zain Humayun added a comment - edited

          slim bouguerra Ahh, I see. I didn't realize store_sales was a metric. I was just trying to see if calcite would generate a filtered aggregate, but I suppose I'll need a different query for that.

          slim bouguerra added a comment -

          Zain Humayun the Druid-Calcite adapter does not generate aggregated filters.

          Julian Hyde added a comment -

          Can we all please use precise language? A "filtered aggregate" is not the same as an "aggregated filter". I suggest that we talk about relational operators (filter, aggregate, project, scan) and use prepositions (before, after). That is clearer than using adjectives ("filtered" or "post-") when English is not everyone's first language.

          CALCITE-1206 deals with HAVING (filter after aggregate).

          Zain Humayun added a comment -

          Julian Hyde I've pushed a new commit addressing your review comments. Please let me know if I missed something or if you have any other comments. Addressing slim bouguerra's concerns: I'm currently working with Junxian Wu to add post aggregation support to Calcite (we sit beside each other at work). Once I'm done with CALCITE-1787, I'll switch over to CALCITE-1803. Since Calcite does not currently have support for filtered aggregators, I was wondering if we should bundle that feature in with CALCITE-1803. slim bouguerra, could you comment on use cases for filtered aggregations in Calcite? i.e. queries in which it is advantageous to use filtered aggregators in addition to, or as opposed to, regular filters.

          slim bouguerra added a comment - edited

          Filters are applied to prune the rows before getting to the aggregation. A filtered aggregator is a kind of aggregator that allows you to prune rows while doing the aggregation.
          Thus the filter is the first funnel, and then we can have a more fine-grained filter per aggregator.
          So both play together in scenarios like the following.
          Assume your task is to compute the ratio of sales between two states, let's say CA and NY.
          To do this in an efficient way, the Druid query will have filter = rows that contain CA or NY, and then two filtered aggregators (the first contains filter = CA while the second has filter = NY).
          Thus in one pass over the data we are able to compute the SUM of sales for each state, and we can compute the ratio as a post aggregate.
          I hope you got the idea.
          I am not sure what the equivalent to this is in the realm of relational algebra; maybe Julian Hyde has better examples.
          Also, the Druid docs have a good explanation.
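          For illustration, a minimal sketch of such a Druid query (the datasource, column names, and interval are assumed), with one filter, two filtered aggregators, and an arithmetic post aggregator for the ratio:

          {
            "queryType": "timeseries",
            "dataSource": "sales_datasource",
            "granularity": "all",
            "filter": {
              "type": "or",
              "fields": [
                {"type": "selector", "dimension": "state", "value": "CA"},
                {"type": "selector", "dimension": "state", "value": "NY"}
              ]
            },
            "aggregations": [
              {
                "type": "filtered",
                "filter": {"type": "selector", "dimension": "state", "value": "CA"},
                "aggregator": {"type": "doubleSum", "name": "ca_sales", "fieldName": "sales"}
              },
              {
                "type": "filtered",
                "filter": {"type": "selector", "dimension": "state", "value": "NY"},
                "aggregator": {"type": "doubleSum", "name": "ny_sales", "fieldName": "sales"}
              }
            ],
            "postAggregations": [
              {
                "type": "arithmetic",
                "name": "ratio",
                "fn": "/",
                "fields": [
                  {"type": "fieldAccess", "fieldName": "ca_sales"},
                  {"type": "fieldAccess", "fieldName": "ny_sales"}
                ]
              }
            ],
            "intervals": ["2017-01-01/2017-12-31"]
          }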

          Julian Hyde added a comment -

          I'll say again. Please use precise language. Nouns and prepositions.

          Zain Humayun added a comment - edited

          Apologies Julian Hyde, I did not see your comment earlier when I posted. slim bouguerra thanks. I would also be interested to see what that kind of query looks like in SQL. Also, I know you've expressed concerns about this addition being incomplete; what are your thoughts on adding the missing pieces in another ticket, say CALCITE-1803?

          slim bouguerra added a comment -

          Zain Humayun in my opinion it is better to have something working correctly within one commit/bug_id. This makes code easier to read/maintain and use.

          Jesus Camacho Rodriguez added a comment - edited

          Zain Humayun, slim bouguerra, if I understand correctly, the query you are talking about could be expressed in SQL with CTEs:

          WITH cte AS
          (
              SELECT state, SUM(sales) AS state_sales
              FROM table_x
              WHERE state = 'CA' OR state = 'NY'
              GROUP BY state
          )
          SELECT (cte1.state_sales / cte2.state_sales) as ratio
          FROM cte cte1 CROSS JOIN cte cte2
          WHERE cte1.state = 'CA' AND cte2.state = 'NY'
          

          The problem is that currently CTEs are expanded by the optimizer, so you end up executing the WITH clause twice. It seems that until CALCITE-481 is completed and the Spool operator is introduced in Calcite, we will not be able to push this kind of query to Druid.

          Back to Zain Humayun's work: given the complexity of the extension, I was wondering whether this patch should already be checked in. slim bouguerra, do you have concerns about the correctness, or is it about the filtering extension? If I understand the discussion above correctly, the only issue is that we will not be pushing some of the filters to Druid, but maybe I am mistaken.

          If we do not produce incorrect results and we can get significant gains in some cases, I am inclined to check it in already.

          EDIT:
          Related to CALCITE-1828, I did not realize this could be made much simpler/more efficient without a CTE:

          SELECT (
              SUM(CASE WHEN state = 'CA' THEN sales END) /
              SUM(CASE WHEN state = 'NY' THEN sales END)
          ) as ratio
          FROM table_x
          
          slim bouguerra added a comment -

          As per the comments above, this will produce incorrect results when the user issues a query that has a conjunction of filters.
          In addition, we need to make sure that some tests are in place:
          For instance, I am wondering what happens if the user issues a query that groups by a sketch?
          Or what happens when the user issues a query with Sum(sketch)?

          Jesus Camacho Rodriguez added a comment -

          Thanks slim bouguerra.

          Zain Humayun, could you address slim bouguerra's concerns and add some additional testing? All the correctness issues should be addressed before checking the code in. Thanks

          Zain Humayun added a comment -

          slim bouguerra
          At the moment, an exception is thrown when the following group by query is issued:

          SELECT "user_unique", count("brand_name") FROM "foodmart" GROUP BY "user_unique";
          
          java.lang.IllegalStateException: Unhandled value type: class java.lang.String
          	at org.apache.calcite.avatica.util.AbstractCursor$BinaryAccessor.getString(AbstractCursor.java:813)
          	at org.apache.calcite.avatica.AvaticaResultSet.getString(AvaticaResultSet.java:245)
          	at sqlline.Rows$Row.<init>(Rows.java:183)
          	at sqlline.IncrementalRows.hasNext(IncrementalRows.java:66)
          	at sqlline.TableOutputFormat.print(TableOutputFormat.java:33)
          	at sqlline.SqlLine.print(SqlLine.java:1663)
          	at sqlline.Commands.execute(Commands.java:833)
          	at sqlline.Commands.sql(Commands.java:732)
          	at sqlline.SqlLine.dispatch(SqlLine.java:813)
          	at sqlline.SqlLine.begin(SqlLine.java:686)
          	at sqlline.SqlLine.start(SqlLine.java:398)
          	at sqlline.SqlLine.main(SqlLine.java:291)
          

          It appears that columns with SQL type SqlTypeName.VARBINARY ("user_unique" from above) cause some trouble when being printed. To me, it doesn't seem to make much sense to allow queries that group by columns with type hyperUnique/thetaSketch; they have a very specialized purpose. I'm thinking we should display some sort of error message instead. If that is undesirable, then another solution would be to change the SQL type to SqlTypeName.VARCHAR and internally treat these columns as varchars. I'd be interested to hear your thoughts on this.

          Furthermore, the following sum query

          SELECT SUM("user_unique") FROM "foodmart";
          

          will fail because the SUM function expects a column of type NUMERIC, but "user_unique" is of type VARBINARY. This behavior is correct, and I'll add some test cases for it.

          Lastly, addressing filters: conjunctions of filters work fine when they're pushed into Druid, such as in the simple case below:

          SELECT COUNT(DISTINCT "user_unique") FROM "foodmart" WHERE "the_month" = 'April' AND "store_city" = 'Seattle';
          

          The potential issue I see is when there is a filter that cannot be pushed into Druid, such as trying to filter by another metric. In those cases I'm a little unclear on what the behavior should be, since Calcite will be handling the thetaSketch/hyperUnique objects returned directly.

          Ex:

          SELECT COUNT(DISTINCT "user_unique") FROM "foodmart" WHERE "store_sales" > 0;
          

          Calcite will retrieve raw Druid rows and then internally do a count distinct on the “user_unique” column.

          slim bouguerra added a comment - edited

          Zain Humayun please read the sketch docs.
          1 - I don't agree with the claim that queries like

           SELECT COUNT(DISTINCT "user_unique") FROM "foodmart" WHERE "store_city" = 'Chicago' AND "store_city" = 'Seattle'; 

          work fine.
          Pushing only filters to Druid will produce the wrong results; you need post aggregations and filtered aggregators to do the intersection between sketches. Without the intersection, the result you get is the union, which means you have counted duplicates, so you are not getting unique counts.
          2 - For filters on metrics, or the more general case when we cannot push the filter/query to Druid, Calcite in fact cannot do much; a sketch is a binary blob that needs a ser/deser library. I am not sure what the perfect path to take is; I don't know Calcite well enough to provide an answer to the question.

          slim bouguerra added a comment -

          For instance, if you want to query for how many unique users visited both product A and product B, the query to Druid should look like:

           
          {
            "queryType": "groupBy",
            "dataSource": "test_datasource",
            "granularity": "ALL",
            "dimensions": [],
            "filter": {
              "type": "or",
              "fields": [
                {"type": "selector", "dimension": "product", "value": "A"},
                {"type": "selector", "dimension": "product", "value": "B"}
              ]
            },
            "aggregations": [
              {
                "type" : "filtered",
                "filter" : {
                  "type" : "selector",
                  "dimension" : "product",
                  "value" : "A"
                },
                "aggregator" :     {
                  "type": "thetaSketch", "name": "A_unique_users", "fieldName": "user_id_sketch"
                }
              },
              {
                "type" : "filtered",
                "filter" : {
                  "type" : "selector",
                  "dimension" : "product",
                  "value" : "B"
                },
                "aggregator" :     {
                  "type": "thetaSketch", "name": "B_unique_users", "fieldName": "user_id_sketch"
                }
              }
            ],
            "postAggregations": [
              {
                "type": "thetaSketchEstimate",
                "name": "final_unique_users",
                "field":
                {
                  "type": "thetaSketchSetOp",
                  "name": "final_unique_users_sketch",
                  "func": "INTERSECT",
                  "fields": [
                    {
                      "type": "fieldAccess",
                      "fieldName": "A_unique_users"
                    },
                    {
                      "type": "fieldAccess",
                      "fieldName": "B_unique_users"
                    }
                  ]
                }
              }
            ],
            "intervals": [
              "2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z"
            ]
          }
          
          Julian Hyde added a comment -

          I don't think that "user_unique" should appear in queries. They want the (approximate) number of distinct users, not the number of distinct "user_unique" values. "user_unique" is an implementation detail.

          slim bouguerra, "how many unique users visited both product A and B" is an interesting query. But let's first look at how you'd write that in SQL (hint: no "user_unique") then figure out how to map onto Druid.

          Since we're all meeting this afternoon, can we discuss then?

          slim bouguerra added a comment -

          I will take back my first point; I am sorry, I misread the example. What I had in mind was queries like "how many unique users visited both product A and B", which I guess does not translate into SQL as

           SELECT COUNT(DISTINCT "user_unique") FROM "foodmart" WHERE "store_city" = 'Chicago' AND "store_city" = 'Seattle';  
          Julian Hyde added a comment - edited

          Both of the following queries are valid SQL and we should push them down to Druid and use theta sketches (or HLL, whichever is available):

          select count(distinct "customer_id")
          from "foodmart"
          where "store_city" in ('Chicago', 'Seattle')
          
          select count(distinct "customer_id") filter (where "store_city" in ('Chicago', 'Seattle'))
          from "foodmart"
          

          That's the what. Now let's figure out the how.
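          One possible "how", sketched as the Druid query the adapter might emit for the first statement, assuming it knows that "user_unique" is a sketch over "customer_id" (the interval and output name are illustrative):

          {
            "queryType": "timeseries",
            "dataSource": "foodmart",
            "granularity": "all",
            "filter": {
              "type": "in",
              "dimension": "store_city",
              "values": ["Chicago", "Seattle"]
            },
            "aggregations": [
              {"type": "thetaSketch", "name": "EXPR$0", "fieldName": "user_unique"}
            ],
            "intervals": ["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]
          }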

          Julian Hyde added a comment -

          Zain Humayun, I reviewed your pull request. I think we would be making a mistake to allow count(distinct sketchColumn) in SQL; the syntax should be count(distinct column) and Calcite should automatically rewrite to a sketch if possible. In particular the 3 test cases https://github.com/apache/calcite/pull/455/files#diff-88c776102776d0dfedaf74e4be27854fR2126 are

          select count(distinct \"user_unique\") as users from \"wiki\"
          select count(distinct \"added\") as \"added\" from \"wiki\"
          select count(distinct \"user_unique\") as users from \"foodmart\"
          

          but I think they should be

          select count(distinct \"user_id\") as users from \"wiki\"
          select count(distinct \"added\") as \"added\" from \"wiki\"
          select count(distinct \"customer_id\") as users from \"foodmart\"
          

          This means adding a "user_id" column to the "wiki" table, and Druid (or Calcite's Druid adapter) needs to know that the "user_unique" sketch is a "hyperUnique" sketch of "user_id".

          It is possible that "user_id" is never used directly; I'm no expert on designing Druid schemas, but I imagine it will use fewer resources if it is declared as a dimension rather than a metric.

          In "foodmart", the "customer_id" column already exists; we will need to make the Druid adapter know that "user_unique" is a "thetaSketch" of "customer_id".

          Lastly, all of these rewrites would be enabled only if the approximateDistinctCount property (added in CALCITE-1587) is true. Thus sql(sql) in your tests becomes something like this:

          CalciteAssert.that()
              .enable(enabled())
              .with(ImmutableMap.of("model", FOODMART.getPath()))
              .with(CalciteConnectionProperty.APPROXIMATE_DISTINCT_COUNT.name(), true)
              .query(sql)
          Joshua Walters added a comment - edited

          Julian Hyde There is a problem with this approach due to how Druid schemas are usually designed. This link explains it: http://druid.io/docs/latest/ingestion/schema-design.html#high-cardinality-dimensions-e-g-unique-ids

          In Druid, you don't want to store very high cardinality columns like user_id as dimensions, you want to store those as aggregates (sketches). This is because for each distinct dimension combination, there will be a rollup stored in Druid. If you have a dimension column with cardinality in the billions, then Druid will have to store billions of rows. In practice, if a dimension has a cardinality above a few hundred thousand in Druid, it should be a metric.

          In summary, if you have a column like user_id in Druid, you store it only as a metric, and never as a dimension. You can't filter on it, it can only be an output metric.

          Julian Hyde added a comment -

          Well, maybe in that case we should make the column "abstract". (For some definition of the word "abstract".) SQL's model is relational, so the experience to the user needs to be consistent with them querying a table with N rows of data. Even if those rows may exist only in summary, sampled or otherwise approximated form. (And may have different summaries tomorrow.)

          Julian Hyde added a comment -

          Another thought: would it be possible for the "user_id" column to only exist in Calcite's Druid adapter, not in Druid?

          Joshua Walters added a comment - edited

          The sketch column (theta, HLL, etc) has to exist in Druid as a metric column, Druid needs it when building the segment files.

          We could make the column abstract; the problem is how the user should be informed that this column can't be used for certain things. You can't filter on user_id = 123, for example, or get distinct values with DISTINCT user_id, but you can do COUNT(DISTINCT user_id). Eventually you could also do intersections and other set operations.

          The problem is that this column is a pre-computed binary metric.

          It almost seems like a UDF concept would be best here. The column is a binary type, which would correctly restrict usage in SQL from filters. If we could register a UDF to do COUNT DISTINCT or SET INTERSECTION we wouldn't have to muddle with the syntax of SQL. But then there is the overhead of having to build and register UDFs.

          Edit: Also, if we have a UDF concept, then computation could also be performed locally if the results are pulled in.

          Edit: Now that I think about it, a UDF might pose several more problems. I don't think there is a way to get the raw binary result from Druid, only the COUNT DISTINCT estimate.

          Joshua Walters added a comment -

          Another possibility, we could just do a column rename/mapping in the Calcite layer. user_unique could be mapped to user_id, Calcite would know that user_id is actually user_unique, and that this is a sketch column. There would have to be some way of letting Calcite know that the column is renamed.

          If a filter is performed it will fail with operation not supported. Would this be acceptable?

          slim bouguerra added a comment -

          I am wondering: what will the renaming buy us?

          Joshua Walters added a comment -

          The new user_id column would still be a sketch metric and it would be limited by the relevant type restrictions. It would be a superficial change.

          Zain Humayun added a comment -

          Joshua Walters I think the rename/mapping is essentially the same idea Julian Hyde had of only having a "user_id" column in the Druid adapter. That probably seems like the cleanest solution so far. I'm wondering how Calcite will become aware of such a column. Perhaps through the model definition?

          slim bouguerra added a comment -

          My 2 cents.
          I think renaming adds some complexity, and the outcome is similar-ish to leaving it as it is.
          I think going the UDF route is better. As you can see in the sketch-hive docs https://datasketches.github.io/docs/Theta/ThetaHiveUDFs.html, it is treated as a UDF, and I am assuming SQL/Hive users at Yahoo are already using those functions, so making Calcite inline with this syntax makes perfect sense to me.

          Julian Hyde added a comment -

          Regarding slim bouguerra's proposal to use user-defined aggregate functions. The experience for the end user wouldn't be quite as pleasant: the tool would have to know about the aggregate function, and also know about which sketch columns are available. But I wouldn't object to it.

          My position remains that sketches are an implementation detail, and that if you include them in the query the model is no longer declarative. It's exactly analogous to requiring users to rewrite their queries to reference the hidden table and columns that store a b-tree index if they want to use that index in a query. So the abstract "user_id" column, and a mapping onto its "user_unique" sketch column applied automatically by the planner, would still be the ideal solution.

          Joshua Walters added a comment -

          slim bouguerra: Yes, our Hive users use the UDFs for sketches, so they would be familiar with that approach.

          Julian Hyde: Both approaches sound reasonable, we can move forward with whichever you prefer.

          Just to define the scope of the "alias" solution (so we know what to develop):

          Taking this example: https://calcite.apache.org/docs/druid_adapter.html

          There is a column user, which contains user ids. It is converted to a metric and renamed to user_unique on Druid ingestion. What if, instead of this, it is converted to a metric but keeps the same name, user? Its type would still be hyperUnique, but as a hyperUnique column it could only be used in projections (with some aggregate functions like COUNT DISTINCT). If it is used in a filter or a group by, it would throw an error because the user column does not support such actions.

          Would this work? Or am I misunderstanding what you are looking for?

          Julian Hyde added a comment - edited

          The model currently has "dimensions" and "metrics" collections, and "user" belongs to "dimensions". I was thinking of removing "user" from "dimensions" and adding it to a new collection, "abstractMetrics":

            "abstractMetrics": [
              "user"
            ]
          

          "userUnique" continues to be a metric:

            {
              "name" : "user_unique",
              "type" : "hyperUnique",
              "fieldName" : "user"
            }
          

          Then if you write "select count(distinct user) ..." the adapter will use the "user_unique" sketch in the background; if you write "select user ..." the adapter will give an error because the raw values of "user" are not stored.

          If in future you add a histogram sketch:

            {
              "name" : "user_histogram",
              "type" : "histogram",
              "fieldName" : "user"
            }
          

          then you will be able to do approximate median queries: "select count(distinct user), median(user), count(user) filter (where user > 50), ..." and the adapter will use the appropriate histogram for each aggregate function.

          But notice that "user_histogram" and "user_unique" do not appear in queries. (You could use them in queries, but why would you?)

          bslim slim bouguerra added a comment -

          +1 for the idea of an abstract metric, or what we call in Druid a complex metric.
          Are we saying that for this to work the Druid user has to follow this naming convention for columns?
          Does this still work if we have multiple sketches for user? (It is a pretty common use case for a user to be tracked via multiple streams, hence multiple sketches.)
          How will Calcite be able to know whether a given sketch can be used as a histogram or a count?
          Keep in mind that hyperUnique, like Theta sketches or Quantile-Histogram, is a UDF, so we can have different UDFs that do the same thing in the same table, where each UDF has its own API and capabilities.
          As an example, Theta sketches (Yahoo sketches) and Druid HLL can both be used to compute a unique-user estimate, but a Theta sketch can do intersection/subtraction/union while HLL can only do union.

          julianhyde Julian Hyde added a comment -

          +1 for the idea of an abstract metric, or what we call in Druid a complex metric.

          OK, let's call this collection "complexMetrics".

          Are we saying that for this to work the Druid user has to follow this naming convention for columns?

          If by "druid user" you mean someone writing Druid JSON queries, they would ignore the "user" complex metric and write their queries in terms of the sketches "user_unique" etc.

          The person writing SQL would usually reference "user" in their SQL query. They could reference "user_unique" and "user_histogram" in their query but they would be VARBINARY values so there's not much they can do with them.

          Does this still work if we have multiple sketches for user? (It is a pretty common use case for a user to be tracked via multiple streams, hence multiple sketches.)

          Yes. I'm trying to make it easier to have multiple sketches for "user", and also to make it possible to add or remove sketches without rewriting the queries.

          How will Calcite be able to know whether a given sketch can be used as a histogram or a count?

          The mapping

            {
              "name" : "user_unique",
              "type" : "hyperUnique",
              "fieldName" : "user"
            }
          

          provides sufficient information for the planner to rewrite an approximate count(distinct user) to use Druid's hyperUnique aggregator.
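
          For example, an approximate "count(distinct user)" could become the following aggregation in the generated Druid query JSON (illustrative; the output name is assumed):

            {
              "type" : "hyperUnique",
              "name" : "unique_user_count",
              "fieldName" : "user_unique"
            }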

          Keep in mind that hyperUnique, like Theta sketches or Quantile-Histogram, is a UDF, so we can have different UDFs that do the same thing in the same table, where each UDF has its own API and capabilities. As an example, Theta sketches (Yahoo sketches) and Druid HLL can both be used to compute a unique-user estimate, but a Theta sketch can do intersection/subtraction/union while HLL can only do union.

          Calcite's planner will only be able to use UDFs that it is aware of. For this release, that will be "hyperUnique" and "thetaSketch". We can add more, but that will require a (small) code change to the Druid adapter.
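
          Putting the pieces together, a table definition under this proposal might look as follows (a sketch of the proposed model syntax, not a final spec; the dimension names are illustrative):

            {
              "dimensions": [ "channel", "page" ],
              "complexMetrics": [ "user" ],
              "metrics": [
                {
                  "name" : "user_unique",
                  "type" : "hyperUnique",
                  "fieldName" : "user"
                }
              ]
            }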

          zhumayun Zain Humayun added a comment -

          Looks good to me. I've closed my PR for now, and I'll open a new one once I get this implemented and CALCITE-1819 is ready for a PR. How should Calcite handle the case when two or more sketches point to the same fieldName? Julian Hyde, in your example above you removed "user" from "dimensions"; would it still be allowed to appear in "dimensions" in a valid model definition?

          julianhyde Julian Hyde added a comment -

          Sure, you can have "user" as a dimension, in which case you can query it in SQL, but (as I understand it) you incur extra storage cost in Druid.

          Or you can make "user" a "complexMetric", in which case you can query aggregates of "user" but not "user" directly.

          But either way, from a SQL perspective, "user" is a column.

          zhumayun Zain Humayun added a comment -

          Recap and some implementation questions:

          Columns of type thetaSketch/hyperUnique should be moved from the "metrics" field to a new "complexMetrics" field in the model definition.
          Each complex metric will have the form:

          {
            "name" : <name used in SQL statements>,
            "type" : <type>,
            "meticName" : <name of underlying metric in Druid>
          }
          

          This data will be saved into DruidTable. Note: while this information will be provided by model definitions, Calcite will have to rename any sketch columns in the metadata query (when the model definition isn't available).

          Calcite should reject any SQL statements that use complex metrics incorrectly. Ideally, complex metrics should be able to indicate to the validation code what kinds of statements they can be used in. Any ideas on the best way to do so? Where is the best place to interrupt the validation process and check for this kind of condition? At that point we'll also need access to the DruidTable, because it will hold the information about the columns.

          Once validation has finished, DruidQuery will be responsible for figuring out the actual (sketch) column based on the name and the context in which it's used.

          I believe the most complicated part of this will be validation. Do you have any general suggestions on where to start? I'm not very familiar with the calcite-core code. Thanks.
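
          For discussion, here is a rough Java sketch of the shape I have in mind (hypothetical; the class and method names are not Calcite's actual API):

            /** Hypothetical sketch: how DruidTable might record one complex metric. */
            class ComplexMetric {
              final String metricName; // underlying Druid metric, e.g. "user_unique"
              final String druidType;  // e.g. "hyperUnique" or "thetaSketch"

              ComplexMetric(String metricName, String druidType) {
                this.metricName = metricName;
                this.druidType = druidType;
              }

              /** Returns whether this sketch can implement the given kind of
               * aggregate call. Sketch columns support approximate
               * COUNT(DISTINCT ...) but cannot appear in plain projections,
               * filters or GROUP BY, because the raw values were discarded
               * at ingestion. */
              boolean canBeUsedFor(String aggregateKind) {
                return "COUNT_DISTINCT".equals(aggregateKind);
              }
            }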

          julianhyde Julian Hyde added a comment -

          I would remove the "metricName" field.

          Consider the case of the "user" field. We would allow it to be queried via "count(distinct user)" (if the "hyperUnique" metric exists) and maybe via "where user > 1000" (if some kind of histogram sketch exists) but we would not allow people to write "select user from table" because we do not store the raw data for "user".

          So, the "user" field is virtual. You can query certain expressions derived from "user", but you cannot query it itself. That's why I would remove the "metricName" field. A complex metric isn't derived from any other metric.

          I know a virtual field is a difficult concept for people to get their heads around. But it creates a greater simplicity, because it means we are presenting the data via the relational model. (Even though the relational data, the original rows and columns, has been discarded.)

          If someone tried to execute "select user from table", I would imagine that the adapter would throw. But if people would prefer that "user" evaluates to some expression, say 0, I could support that too.

          zhumayun Zain Humayun added a comment - - edited

          wouldn't the "metricName" field need to be there to tell calcite which sketch to refer to? For example,

          {
            "name" : "user",
            "type" : "hyperUnique",
            "meticName" : "user_unique"
          }
          

          "user_unique" would not be exposed to the person writing the query, and an error would be given if they tried to use it in a query. But, when "user" is used correctly in a statement, say, count(distinct user), then the druid adapter will know to use the "user_unique" column under the hood. In this case, "user" is not actually a column in Druid anywhere, and "user_unique" is defined as a metric in the Druid ingestion spec. Also, is there a specific existing exception the adapter should throw?

          julianhyde Julian Hyde added a comment -

          I can't think of a case where "name" and "metricName" would be different for a complex metric. (Unlike a regular metric, where there might be several metrics that are different aggregate functions applied to the same argument: "select sum(amount) as sum_amount, min(amount) as min_amount, hyperUnique(amount) as hyper_amount, ...".)

          zhumayun Zain Humayun added a comment -

          Ok, it's been a while, but I've submitted my updated PRs: https://github.com/apache/calcite/pull/503 https://github.com/vlsi/calcite-test-dataset/pull/22
          zhumayun Zain Humayun added a comment -

          Julian Hyde, slim bouguerra, or Jesus Camacho Rodriguez, can you please review when you get time? Thanks

          jcamachorodriguez Jesus Camacho Rodriguez added a comment -

          Zain Humayun, I will take a look at this today. Thanks

          jcamachorodriguez Jesus Camacho Rodriguez added a comment -

          Pushed in http://git-wip-us.apache.org/repos/asf/calcite/commit/025eaf1. Thanks for your contribution, Zain Humayun!
          zhumayun Zain Humayun added a comment -

          Jesus Camacho Rodriguez, it appears that mvn site fails because of my changes. The culprit seems to be Table.java line 77.

          {@param call}

          should be changed to

          {@code call}

          or <code>call</code> (not sure which style the Calcite community prefers). Do you want me to make another PR fixing this? Another JIRA ticket?

          jcamachorodriguez Jesus Camacho Rodriguez added a comment -

          Thanks for catching this one, Zain Humayun. I have just pushed the fix in http://git-wip-us.apache.org/repos/asf/calcite/commit/ea4095a.

          michaelmior Michael Mior added a comment -

          Resolved in release 1.14.0 (2017-10-01)


            People

            • Assignee:
              zhumayun Zain Humayun
            • Reporter:
              zhumayun Zain Humayun