Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: Future
    • Component/s: None
    • Labels: None

    Attachments

    1. DRILL-92.patch (132 kB) - Yash Sharma
    2. DRILL-92-CassandraStorage.patch (136 kB) - Yash Sharma
    3. DRILL-92-Cassandra-Storage.patch (136 kB) - Yash Sharma
    4. DRILL-CASSANDRA.patch (120 kB) - Yash Sharma


        Activity

        yash360@gmail.com Yash Sharma added a comment -

        Review board: https://reviews.apache.org/r/29816/

        Cassandra storage engine implementation using the Datastax Java driver.
        Storage plugin format:

        { "type": "cassandra", "host": "localhost", "port": 9042, "enabled": true }
        yash360@gmail.com Yash Sharma added a comment -

        Updated patch.
        Review board: https://reviews.apache.org/r/29816/

        snazy Robert Stupp added a comment -

        I went over your patch just to see how the actual C* integration has been implemented. Tbh - I don’t know how Drill works - but I know how C* and the Java Driver work.

        Please let me explain some things in advance. A Cassandra cluster consists of many nodes, some of which might be down without affecting the integrity of the whole cluster. Because hosts might be down, the Datastax Java Driver lets you specify multiple initial contact points (say, three per data center), which should be passed to Cluster.Builder. All connections to a C* cluster are managed by the Cluster instance, not directly by the Session. That means: to effectively close connections to a cluster, you have to close the Cluster instance. Furthermore, the Cluster instance learns about all other nodes in the C* cluster - i.e. it knows every node in the cluster and which token ranges each serves, and it makes a best-effort attempt to route direct DML statements (SELECT/INSERT/UPDATE/DELETE) to the nodes that hold the relevant replicas. A typical application does not care where data actually lives - the Java Driver handles that for you.
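
        As an illustration (a minimal sketch against the driver-2.x API; the host addresses are placeholders):

            import com.datastax.driver.core.Cluster;
            import com.datastax.driver.core.Session;

            // Pass several contact points per data center; the driver discovers the
            // remaining nodes of the cluster from these.
            Cluster cluster = Cluster.builder()
                .addContactPoints("10.0.0.1", "10.0.0.2", "10.0.0.3")
                .withPort(9042)
                .build();
            Session session = cluster.connect();
            // ... execute statements; the driver routes them to replica nodes where possible ...
            // Closing the Cluster (not just the Session) releases all connections and threads.
            cluster.close();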

        The lines in CassandraGroupScan calling com.datastax.driver.core.Metadata#getReplicas are wrong. Which nodes are replicas for a keyspace is defined by the replication strategy and the per-keyspace configuration. The method you're calling determines the hosts for a specific partition key - but you're passing in the class name of the partitioner. Those are completely different things.
        Although it is not completely wrong, I'd encourage you not to assume which nodes hold the tokens you intend to request (in CassandraUtil). Several other things influence where data "lives" - e.g. datacenter and rack awareness.

        CassandraSchemaFactory contains a keyspace cache and a table cache. That's completely superfluous, since the Java Driver already holds that information in the Cluster instance, and it is automagically updated when the cluster topology and/or the schema changes. That kind of metadata is essential for the Java Driver to work and is always present.
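
        For example, the schema metadata can be read straight from the driver (a sketch; the contact point and the "ks"/"tbl" names are placeholders):

            import com.datastax.driver.core.Cluster;
            import com.datastax.driver.core.KeyspaceMetadata;
            import com.datastax.driver.core.Metadata;
            import com.datastax.driver.core.TableMetadata;

            Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
            Metadata metadata = cluster.getMetadata();   // kept up to date by the driver
            for (KeyspaceMetadata ks : metadata.getKeyspaces()) {
                System.out.println("keyspace: " + ks.getName());
                for (TableMetadata table : ks.getTables()) {
                    System.out.println("  table: " + table.getName());
                }
            }
            TableMetadata tbl = metadata.getKeyspace("ks").getTable("tbl");   // direct lookup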

        I'd recommend starting with a different approach and considering the current patch a proof of concept (you may of course carry over working code):

        1. Learn a bit more about C* and the Java Driver architecture.
        2. Forget about accessing the "nearest" node in an initial attempt - you can add that later anyway. BTW, that only makes sense if you have Drill slaves (I don't know whether such exist) running on each C* node.
        3. Start with a simple cluster to work against. Take a look at ccm - it's a neat tool that spawns a C* cluster with multiple nodes on your local machine: https://github.com/pcmanus/ccm/.
        4. Once you have a basic implementation running, you can improve it by adding datacenter awareness to your client (basically just a simple configuration via Cluster.Builder), authentication against the C* cluster, and some other fine tuning - see the sketch after this list.
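
        A rough sketch of what I mean in point 4 (driver-2.x style; the DC name, credentials and hosts are placeholders):

            import com.datastax.driver.core.Cluster;
            import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;

            Cluster cluster = Cluster.builder()
                .addContactPoints("10.0.0.1", "10.0.0.2", "10.0.0.3")
                .withLoadBalancingPolicy(new DCAwareRoundRobinPolicy("DC1"))   // prefer the local data center
                .withCredentials("drill", "secret")                            // plain-text authentication
                .build();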

        Feel free to ask questions on the C* user mailing list user@cassandra.apache.org or on freenode IRC #cassandra. There are many people happy to answer individual questions. Just ask - don't ask to ask.

        yash360@gmail.com Yash Sharma added a comment -

        Thanks Robert Stupp: I have definitely missed a lot in this patch, which started as a proof of concept.
        There is a lot of work to be done along the lines of your points to make Drill and Cassandra interoperable.
        Thanks

        yash360@gmail.com Yash Sharma added a comment -

        Implemented review comments + New storage plugin format

        yash360@gmail.com Yash Sharma added a comment -

        Robert Stupp + all: On the points mentioned in the review comments -

        • Multiple initial contact points: The patch now handles multiple initial contact points and passes all of them when connecting to the cluster. Tested by bringing down certain nodes.
        • Session handling: The patch now uses the Cluster instance rather than the Session.
        • Endpoint affinity & partition token: The code does consider endpoint affinity, in that if one of the Drill endpoints is also a Cassandra node it gets an affinity. The partition token, however, serves a different purpose: the partition scheme just ensures that different Drill sub-scans do not fetch the same range of keys again. It works as a range restriction for each sub-scan. We are not using it to determine where the actual partition data lies.
        • Tables & keyspace cache: This is cached not for querying Cassandra but as schema information, which we need when the user wants to describe a table, list the tables in a keyspace, etc. We just cache it.

        Thanks for the review comments. Please share your thoughts on the new patch.

        yash360@gmail.com Yash Sharma added a comment -

        New changes to test suite.

        snazy Robert Stupp added a comment -

        Yash Sharma sorry for the late response. Here are my comments:

        Tables & keyspace cache - you don't need to cache this. It is already cached, so it's duplicate effort. Removing the cache makes your code less complex - e.g. all that cache-related catch ExecutionException boilerplate goes away.

        I'm not sure whether the connection cache in CassandraConnectionManager really works. For example, if hosts 127.0.0.1 and 127.0.0.2 share the same Cluster instance and the cache decides to evict 127.0.0.1, the instance for 127.0.0.2 no longer works.
        Besides that, you query the cache using a List<String> but add entries using a String key.
        As a proposal: use the cluster name as the cache key and query for that.
        I'm not sure whether you can always close the Cluster instance - with respect to whether such an instance is still in use by a long-running operation.
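
        A rough sketch of the cluster-name-keyed cache I have in mind, inside something like CassandraConnectionManager (CassandraStorageConfig and its getHosts()/getPort() accessors are assumed here, not taken from the patch):

            import java.util.concurrent.ConcurrentHashMap;
            import java.util.concurrent.ConcurrentMap;
            import com.datastax.driver.core.Cluster;

            private final ConcurrentMap<String, Cluster> clusters = new ConcurrentHashMap<>();

            Cluster getCluster(CassandraStorageConfig config) {
                // Build one Cluster from all configured contact points ...
                Cluster candidate = Cluster.builder()
                    .addContactPoints(config.getHosts().toArray(new String[0]))
                    .withPort(config.getPort())
                    .build();
                // ... and cache it under the cluster name reported by the driver,
                // so all hosts of the same C* cluster share a single instance.
                String name = candidate.getMetadata().getClusterName();
                Cluster existing = clusters.putIfAbsent(name, candidate);
                if (existing != null) {
                    candidate.close();   // another thread already registered this cluster
                    return existing;
                }
                return candidate;
            }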

        Both CassandraConnectionManager and CassandraSchemaFactory create their own Cluster instances and therefore hold independent resources (connections, threads, etc.). They should be merged. If both classes are used in completely different contexts, please ignore this comment.
        The Cluster instance in CassandraSchemaFactory is never closed.

        Endpoint affinity & Partition token

        If you're using the code just to assign ranges to Drill hosts, then that should be fine.
        But do not assume anything about the tokens assigned to a C* host. That depends heavily on the individual cluster configuration (partitioner, topology, node placement (DC, rack)) and the keyspace configuration. It's not that easy, but it is manageable.

        In org.apache.drill.exec.store.cassandra.CassandraRecordReader#setup you're using QueryBuilder.token for paging/slicing. Unfortunately, that will not work. Assume you have vnodes in the C* cluster (the default is 256 vnodes per C* node). Vnode tokens are assigned randomly to endpoints (= nodes) - it's not like the old-fashioned single token per node. You simply cannot slice using the token() function. Furthermore, it's quite difficult to nicely split slices matching both C* nodes/vnodes and Drill sub-scan ranges (is that the correct wording?). Nice slicing across all nodes/vnodes is one of the weak sides of C*. That's why Hadoop-on-C* recommends avoiding vnodes - they have the same problem. Let me think a bit about that - maybe I can provide a solution or at least a workaround.
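
        For reference, this is the kind of token-range slice I mean (a sketch using QueryBuilder; "ks"/"tbl"/"pk" and the Murmur3 token bounds are placeholders). With vnodes enabled, such a slice generally spans several virtual nodes, so it cannot be used to pin a sub-scan to a single C* host:

            import com.datastax.driver.core.Statement;
            import com.datastax.driver.core.querybuilder.QueryBuilder;

            long startToken = Long.MIN_VALUE;       // example Murmur3 token range bounds
            long endToken   = Long.MIN_VALUE / 2;

            // SELECT * FROM ks.tbl WHERE token(pk) >= startToken AND token(pk) < endToken
            Statement slice = QueryBuilder.select().all()
                .from("ks", "tbl")
                .where(QueryBuilder.gte(QueryBuilder.token("pk"), startToken))
                .and(QueryBuilder.lt(QueryBuilder.token("pk"), endToken));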

        For unit tests: take a look at https://github.com/doanduyhai/Achilles - it has some nice support for unit tests, which may make all that manual work to set up and fill keyspaces/tables superfluous.

        Is it a Drill requirement that CassandraRecordReader#updateValueVector only mutates using Strings?

        General code comments:

        • there's some unused code that can be safely removed
        • in CassandraRecordReader: you can safely replace the clazz.isInstance() sequence with clazz == ClassName.class

        Note: the patch does not apply to the current master, but it does apply to master as of March 31st. There have been some breaking API changes in Drill.

        For the future: I don’t know whether the current code supports Cassandra’s User-Defined-Types or collections (maps, sets, lists). If not, it might be a nice feature for later.

        snazy Robert Stupp added a comment -

        FYI there's a ticket for an interesting enhancement to C* to read a token range: https://issues.apache.org/jira/browse/CASSANDRA-9259

        santoshkulkarni SK (Inactive) added a comment -

        Any new update or timeline on Cassandra storage engine availability in Drill?


          People

          • Assignee: Unassigned
          • Reporter: sphillips Steven Phillips
          • Votes: 1
          • Watchers: 9
