Apache Drill
DRILL-13

Storage Engine: Define Java Interface

    Details

    • Type: Task
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Labels: None

      Description

      We're going to need to define a storage engine API. At a minimum, we'll need to generate a Java one. We will probably also need to create a C++ one. This task is for the former. Things that are likely to be included in the Java interface are: reader (scanner), writer, capabilities interface, schema interface, statistics interface, data layout and ordering.
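      A very rough, hypothetical sketch of what such a Java interface could look like, based only on the components listed above (all type and method names here are placeholders, not an agreed design):

      public interface StorageEngine {

        // Reader (scanner): iterate over the engine's data as records.
        interface RecordReader {
          boolean next();                 // advance to the next record; false at end of data
          Object getField(String name);   // field access on the current record
        }

        // Writer: persist intermediate or final results back into the engine.
        interface RecordWriter {
          void write(Object record);
          void close();
        }

        RecordReader reader(String source);

        RecordWriter writer(String sink);

        // Capabilities: what the engine can handle internally (pushdown, schema support, ...).
        Capabilities capabilities();

        // Optional metadata: schema, statistics, data layout and ordering.
        Schema schema(String source);
        Statistics statistics(String source);
        DataLayout layout(String source);

        // Placeholder marker types for the metadata interfaces.
        interface Capabilities {}
        interface Schema {}
        interface Statistics {}
        interface DataLayout {}
      }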

        Issue Links

          Activity

          David Alves added a comment -

          Julian: I misinterpreted your comment. You're basically saying that we can add the bloom filter stuff to the iface later, in some other JIRA.

          I guess that is fair enough, although in the case I'm suggesting, the join operator gets broken down into two portions: one above the SE layer that is common to any store that uses bloom filters on one of the joined fields (if the bloom filter is kept opaque), and one below the SE layer that is specific to each SE.

          Timothy Chen added a comment -

          I have the same question as David, which is where this storage engine fits and what its input/output is.
          Does the storage engine only optimize the logical plan, take part in the logical plan -> physical plan resolution, or merely execute a physical plan?

          Julian Hyde added a comment -

          To expand on Jacques' comments. It's interesting to declare that a storage engine supports bloom filters, but I don't think it's useful, because each engine functions so differently. I think the code to push down bloom filters would have to be written separately for each engine. The way I'd solve it for Optiq would be to have a bloom filter rule for each engine.

          To put it another way. The real use cases are "I want Drill to use bloom filters to work against HBase" and "I want Drill to use bloom filters to work against Cassandra". Log those as jira cases. Whoever fixes them can extend the storage engine interface if they see fit. But my hunch is that they won't.

          David Alves added a comment -

          Question: What gets pushed below the SE layer? Is that a rewritten logical plan that the SE will transform into something it can run, or is the plan in some other form (physical?)?

          Because certain operators get broken down into a portion that executes below the SE and another that executes above the SE (like aggregation into partial aggregation + merge, or joins into a local join + distributed join), it would make sense to make the plan passed to the SE the physical one, correct?

          Also, the plan that is passed as an argument to the SE should only include the relevant portions, correct? (I mean the header and the ops being executed by the SE.)

          Thoughts?

          David Alves added a comment -

          For instance, bloom filters can be used to perform a semi-join phase before a distributed join.
          Since bloom filters are usually associated with partitions, I think bloom filter capabilities would go along nicely with the partition information, as a sub-api of the SE, e.g.:

          interface PartitionInfo {

            public enum PartitioningType {
              NONE, ORDERED, RANDOM;
            }

            public enum BloomFilterType {
              NONE, KEY, COMPOUND;
            }

            public PartitioningType partitioningType();

            public PartitionFunction function();

            public BloomFilterType bloomFilterType();
          }

          That being said, for this particular issue maybe we could try and focus on the main interfaces required and leave these details to other issues.

          Jacques Nadeau added a comment -

          I think we need to keep the concepts a bit more abstract. I've been struggling with the capabilities interface since you mentioned partial aggregation. After thinking about it further, I think we need to stay in the logical realm. However, we should attach a scope to the capabilities. For example, a group-by logical capability could have partition or global scope. As far as bloom filters go, can you share a possible optimization that you think would be valuable that couldn't be revealed as a higher-level capability?
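          A minimal hypothetical sketch of the scoped-capability idea above (the names are invented for illustration, not part of any agreed interface):

          public final class ScopedCapability {

            public enum Scope { PARTITION, GLOBAL }

            public enum LogicalCapability { GROUP_BY, FILTER, PROJECT }

            private final LogicalCapability capability;
            private final Scope scope;

            public ScopedCapability(LogicalCapability capability, Scope scope) {
              this.capability = capability;
              this.scope = scope;
            }

            public LogicalCapability capability() { return capability; }

            public Scope scope() { return scope; }
          }

          An engine that can only pre-aggregate within a partition would then advertise new ScopedCapability(LogicalCapability.GROUP_BY, Scope.PARTITION), while one that can finish the aggregation itself would advertise global scope.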

          David Alves added a comment -

          I know this might be a bit too much detail for now, but we should track the bloom filter capabilities of the underlying engine, e.g. HBase can have ROW and ROW_COL bloom filters and Cassandra has key bloom filters.
          Being able to know they exist, which points in the schema they pertain to, and how to extract them from the underlying store is very interesting for optimizing a series of use cases.

          David Alves added a comment -

          I like the rule advertising idea.

          I don't know how decisive the optional schemaless-ness will be for the main iface, but it will definitely be for the capability sub-api. Maybe that doesn't have to be completely set in stone before we make progress with this particular issue.

          Lisen Mu added a comment -
          • about schema info

          Maybe it's not the goal of Drill to strongly manage data schemas, but at least for HBase the SE must know the mapping from a record's fields to row keys or CFs.

          Besides, partition info potentially requires part of the schema info, right?

          Lisen Mu added a comment -

          Questions and thoughts, considering queries across heterogeneous SEs:

          • about push down

          It would be simpler if the capability API could be declarative. But is it necessary for the SE to provide some stats info about the current sub-DAG for Drill to do further optimization or further execution?

          • about partition & distribution and query dispatch

          What are your thoughts about partition information?

          It would be good if partition info could be declarative and unified too. The master Drill daemon (which received the query directly from the user) could use partition information from different SEs to optimize better, e.g. aggregation over join, before dispatching subqueries. But it seems more difficult than the capability API to make an abstraction of data distribution: distributed-managed list-based (Cassandra), centralized-managed list-based (HBase), simple rule-based (Redis), user-handcrafted maybe rule-based (MySQL).
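          A purely illustrative sketch of how such a declarative distribution taxonomy could be expressed (the enum and its names are invented here):

          public enum DataDistribution {
            DISTRIBUTED_LIST_BASED,    // e.g. Cassandra: token ranges managed by the nodes themselves
            CENTRALIZED_LIST_BASED,    // e.g. HBase: region assignments held by a master
            RULE_BASED,                // e.g. Redis: a simple hashing/sharding rule
            USER_DEFINED_RULE_BASED    // e.g. MySQL: hand-crafted sharding rules
          }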

          Miguel Branco added a comment -

          I don't have concrete thoughts on this, but a few points:

          • JOIN-pushdowns are useful, where the storage engine is smart enough to locally join data before shipping it up.
          • The PostgreSQL foreign data wrappers API might be a useful reference: http://www.postgresql.org/docs/9.2/static/fdw-functions.html. It is a work in progress, but e.g. join-pushdowns were something being discussed.
          • Jacques mentioned incremental schema support. That's critical and I'm guessing it will be decisive for this interface.
          • Perhaps something along the lines of rule-based query rewrites would be useful. Engines advertise rules that get applied to the plan.
          David Alves added a comment -

          With regard to the storage engine and push down:

          I don't know if pushing logical plan rewriting into the storage engine impl is the best idea, but I definitely agree with the goal (i.e., "In the ultimate case (select into a single system), the data would never actually leave the datastore/storage engine.").
          My main pain points are the following:

          • engines would have to be aware of more than they need to be with regard to logical plans
          • logical plan processing logic would have to be replicated/used in multiple places
          • a simplification triggered by one engine could allow simplifications from other engines, making us have to cycle through the intervening engines multiple times
          • it's really an external capability of the SE itself (e.g. we wouldn't want to call a live SE proxy, making a remote call to simplify a plan)

          Do you see an issue in generalizing this logic in a query pre-processor (not calling it an optimizer since there's really only the one rule)?
          I mean have something simple like having SEs return a StorageEngineCapability such as:

          public class HBaseStorageEngine {

            public Capabilities capabilities() {
              return new Capabilities() {

                public List<Class<? extends LogicalOperator>> internalProcessingAbility() {
                  return ImmutableList.of(Filter.class, Project.class, PartialAggregation.class, Sink.class);
                }

                ...
              };
            }
            ...
          }

          and then have the query pre-processor push nodes in the DAG that sit above the SE's scanner below it if they match the SE's ability.
          If all the data sources (leaves) are from the SE, and the SE supports Sink and all the operators in between, then we'd push the whole plan below the engine's scanner.
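          A minimal sketch of that single pre-processor rule, assuming hypothetical operator and capability types (none of these classes exist yet):

          import java.util.List;

          public final class PushdownPreProcessor {

            public interface LogicalOperator {
              LogicalOperator parent();   // the operator consuming this one's output, or null at the root
            }

            public interface Capabilities {
              List<Class<? extends LogicalOperator>> internalProcessingAbility();
            }

            // Walk upward from the SE's scan, absorbing every operator the engine
            // advertises it can handle; return the root of the absorbed subtree,
            // i.e. the boundary below which the plan is pushed into the engine.
            public LogicalOperator pushdownBoundary(LogicalOperator scan, Capabilities caps) {
              LogicalOperator boundary = scan;
              LogicalOperator candidate = scan.parent();
              while (candidate != null
                  && caps.internalProcessingAbility().contains(candidate.getClass())) {
                boundary = candidate;
                candidate = candidate.parent();
              }
              return boundary;
            }
          }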

          Now, if I'm oversimplifying and there are cases I haven't thought of where this doesn't work, then I think we might want to have the engine provide a MyStorageEnginePreProcessor as the entity that does the modifications to the LogicalPlan.

          wdyt?

          Jacques Nadeau added a comment -
          • Storage engines have local interface accessible within the Drill daemon that is colocated with the underlying system's daemons, correct?
          • specifically I mean that for NoSQL stores like Cassandra or HBase there will be a local daemon in each node that does inter-process communication with the underlying store and provides information on the local partitions so that the query planner can take that into account.

          >> Correct

          • We will need a meta store for ad-hoc schema matching/schema caching correct?
          • While Cassandra has a schema that is easy to use and read when queried with CQL3 we could probably use storing the schema of certain HBase tables so that the values in it can be returned in some form other than byte[]s, the user would be responsible for maintaining this.

          >> Yes(ish). A couple of things are different from a traditional vision. 1) We want to support pure schemaless queries, e.g. point at an HBase ZK address and do select * from tablename. This means the entire infrastructure should work with minimal schema management. (Especially true when you think of future sources like MongoDB.) 2) Our hope is that schema management comes out of views on top of schemaless queries (as opposed to something more like external table definitions) so that query-level data provenance is very clear for analysts. 3) When we do store schema data, hopefully we can leverage the Hive metastore (as well as support data sources that are already configured in the Hive metastore without much complexity).

          >>> RE: Push down. In the ideal world, I've been thinking that the storage engine interface should be able to receive a logical plan and return an updated logical plan with a more complex scan node and a simplified plan. Basically, the storage engine would use the opaque selection property to encapsulate removed logical nodes that the storage engine can take care of internally. The storage engine implementation would thus be internally responsible for converting the portions of the logical plan that it took ownership of into whatever its native formats are. In the ultimate case (select into a single system), the data would never actually leave the datastore/storage engine.
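          A hypothetical sketch of what that hand-off could look like in Java (every type here is a placeholder, not an existing Drill class):

          public interface PushdownAwareStorageEngine {

            // Opaque, engine-private description of the logical nodes the engine absorbed.
            interface OpaqueSelection {}

            interface LogicalPlan {}

            // The rewritten (possibly simplified) plan plus the opaque selection
            // now carried by its scan node.
            final class RewriteResult {
              public final LogicalPlan simplifiedPlan;
              public final OpaqueSelection scanSelection;

              public RewriteResult(LogicalPlan simplifiedPlan, OpaqueSelection scanSelection) {
                this.simplifiedPlan = simplifiedPlan;
                this.scanSelection = scanSelection;
              }
            }

            // Offer the logical plan to the engine; it returns the plan unchanged or
            // with some operators folded into its scan's opaque selection.
            RewriteResult rewrite(LogicalPlan plan);
          }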

          David Alves added a comment - edited

          My thoughts/doubts on this:

          Meta APIs:
          Agree with Statistics and Metadata, but I think Serialization and Deserialization should be encapsulated within the storage engine itself.

          Engine Api:

          • Takes over a subtree of the query plan (probably of the physical plan, when logical plan operators have been divided into partitions, for instance) based on the engine capability.
            ex. HBase can take filters, projections and partial aggregations, and push them down below the HBase Scan operator.
            ex: if we add an RDBMS engine later, we can rebuild the portion of the plan that can be pushed down in SQL form and run it as is.
          • Provides a Reader/Scanner interface for scanning the results of executing the subtree.
          • Provides a Writer interface for outputting intermediate or final results.

          Questions/Doubts:

          • Storage engines have local interface accessible within the Drill daemon that is colocated with the underlying system's daemons, correct?
          • specifically I mean that for NoSQL stores like Cassandra or HBase there will be a local daemon in each node that does inter-process communication with the underlying store and provides information on the local partitions so that the query planner can take that into account.
          • We will need a meta store for ad-hoc schema matching/schema caching correct?
          • While Cassandra has a schema that is easy to use and read when queried with CQL3 we could probably use storing the schema of certain HBase tables so that the values in it can be returned in some form other than byte[]s, the user would be responsible for maintaining this.
          Jacques Nadeau added a comment -

          New re-thoughts...

          The primary interface is the Storage Engine Capabilities API. It should describe everything that the particular storage engine supports. This includes whether the storage engine supports serialization, deserialization, and what types of logical operator capabilities it supports internally. It also needs to include a description of statistics capabilities (e.g. supports approximate row keys, average row size, total data size, data distribution statistics, etc.) and metadata capabilities.

          Statistics API: Provide the actual statistics information that is utilized during query planning.
          Metadata API: Provide information about the available sub data sources (tables, keyspaces, etc.) along with locality information, schema information, type information, primary and secondary index types, partitioning information, etc. Portions of this information are used in query parsing, others in query planning, and other portions in execution planning.
          Deserialization API: Convert a particular data source into one of our two canonical in-memory formats (row-based or column-based). Additionally, support particular types of logical operation pushdown.
          Serialization API: Serialize the in-memory format back into the persistent storage format.
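          A hypothetical sketch of that split into sub-APIs (all names invented for illustration):

          import java.util.List;

          public interface StorageEngineCapabilities {

            boolean supportsSerialization();
            boolean supportsDeserialization();

            enum Format { ROW_BASED, COLUMN_BASED }   // the two canonical in-memory formats

            interface StatisticsApi {
              long approximateRowCount(String source);
              long averageRowSizeBytes(String source);
              long totalDataSizeBytes(String source);
            }

            interface MetadataApi {
              List<String> subDataSources();              // tables, keyspaces, ...
              List<String> localityHosts(String source);  // where the data lives
            }

            interface DeserializationApi {
              // Convert a data source into one of the canonical in-memory formats.
              Object readAs(String source, Format format);
            }

            interface SerializationApi {
              // Write the in-memory format back into the persistent storage format.
              void write(String sink, Object data, Format format);
            }
          }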

          Jacques Nadeau added a comment -

          To clarify the original description:

          Input/Output (at least one)
          Record Reading
          Record Writing

          Metadata:
          capabilities interface (what can be pushed down, what types of schemas might be available)
          schema interface (optional, ability to provide schema about various data sources)
          statistics interface (optional, provide information about …)
          data layout and ordering (optional, location of data, e.g. which disks and nodes, information about primary and secondary sorts... maybe this is an extension of the schema interface)

          Ted Dunning added a comment -

          I think that this misses what we need. The actual storage substrate is something that Drill shouldn't care about.

          What Drill cares about is:

          • how are records read?
          • is the data stored in columnar fashion?
          • what is the schema for the data?
          • are there multiple views of the data (for instance if the scanner does side effecting columnization)?
          • is there locality information available?

          I think that the reference interpreter will be clarifying many of these issues.

          goog cheng added a comment - edited

          Protocol buffers and a NoSQL store like CouchDB? Do we really need a database to store the columnar representation data? It seems a distributed filesystem like GFS would be OK. Is the Storage Engine a read engine, i.e. something that reads data from other data sources? That is what I got from the paper; I would be sorry if I have made an error.


            People

            • Assignee: Jacques Nadeau
            • Reporter: Jacques Nadeau
            • Votes: 1
            • Watchers: 16
