CASSANDRA-4421

Support cql3 table definitions in Hadoop InputFormat

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Fix Version/s: 1.2.6
    • Component/s: API
    • Labels:
    • Environment:

      Debian Squeeze

      Description

      Hello,

      I faced a bug while writing composite column values and the subsequent validation on the server side.

      This is the setup for reproduction:

      1. create a keyspace

      create keyspace test with strategy_class = 'SimpleStrategy' and strategy_options:replication_factor = 1;

      2. create a cf via cql (3.0)

      create table test1 (
      a int,
      b int,
      c int,
      primary key (a, b)
      );

      Looking at the schema in the CLI, I noticed that there is no column metadata for columns that are not part of the primary key.

      create column family test1
      with column_type = 'Standard'
      and comparator = 'CompositeType(org.apache.cassandra.db.marshal.Int32Type,org.apache.cassandra.db.marshal.UTF8Type)'
      and default_validation_class = 'UTF8Type'
      and key_validation_class = 'Int32Type'
      and read_repair_chance = 0.1
      and dclocal_read_repair_chance = 0.0
      and gc_grace = 864000
      and min_compaction_threshold = 4
      and max_compaction_threshold = 32
      and replicate_on_write = true
      and compaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
      and caching = 'KEYS_ONLY'
      and compression_options =

      {'sstable_compression' : 'org.apache.cassandra.io.compress.SnappyCompressor'}

      ;

      Please notice the default validation class: UTF8Type

      Now I would like to insert a value > 127 via the Cassandra Thrift client (no CQL; this is part of MR jobs). Have a look at the attachment.

      Batch mutate fails:

      InvalidRequestException(why:(String didn't validate.) [test][test1][1:c] failed validation)

      A validator for the column value is fetched in ThriftValidation::validateColumnData, which always returns the default validator, UTF8Type, as described above (the ColumnDefinition for the given column name "c" is always null).

      In UTF8Type there is a check for

      if (b > 127)
      return false;
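
      To illustrate why such a value fails UTF-8 validation, here is a small standalone Java sketch (illustrative only, not Cassandra code; the class name is made up): the big-endian Int32 encoding of a value such as 200 is 0x00 0x00 0x00 0xC8, and a dangling byte with the high bit set is not well-formed UTF-8, so a strict decoder rejects it while 127 still passes.

      import java.nio.ByteBuffer;
      import java.nio.charset.CharacterCodingException;
      import java.nio.charset.Charset;
      import java.nio.charset.CodingErrorAction;

      public class Utf8ValidationDemo
      {
          public static void main(String[] args)
          {
              for (int value : new int[] { 127, 200 })
              {
                  // 4-byte big-endian encoding of the Int32 value, as Int32Type would store it.
                  ByteBuffer bytes = ByteBuffer.allocate(4).putInt(0, value);
                  try
                  {
                      // Strict UTF-8 decoding, roughly what a UTF8Type validator requires.
                      Charset.forName("UTF-8").newDecoder()
                             .onMalformedInput(CodingErrorAction.REPORT)
                             .decode(bytes.duplicate());
                      System.out.println(value + ": passes UTF-8 validation");
                  }
                  catch (CharacterCodingException e)
                  {
                      // 200 ends in 0xC8, which has no continuation byte, so decoding fails.
                      System.out.println(value + ": rejected (" + e + ")");
                  }
              }
          }
      }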

      Anyway, maybe I'm doing something wrong, but I used CQL 3.0 for table creation. I assigned data types to all columns, yet I cannot set values for a composite column because the default validation class is used.

      I think the schema should know the correct validator even for composite columns. The usage of the default validation class does not make sense.

      Best Regards

      Bert Passek

      1. 4421.txt
        162 kB
        Alex Liu
      2. 4421-1.txt
        167 kB
        Alex Liu
      3. 4421-10-patch-for-1.2.txt
        171 kB
        Alex Liu
      4. 4421-10-patch-for-trunk.txt
        169 kB
        Alex Liu
      5. 4421-2.txt
        167 kB
        Alex Liu
      6. 4421-3.txt
        168 kB
        Alex Liu
      7. 4421-4.txt
        169 kB
        Alex Liu
      8. 4421-5.txt
        169 kB
        Alex Liu
      9. 4421-6.cb.txt
        169 kB
        Colin B.
      10. 4421-6-je.txt
        20 kB
        Alex Liu
      11. 4421-7-je.txt
        20 kB
        Alex Liu
      12. 4421-8-cb.txt
        166 kB
        Colin B.
      13. 4421-8-je.txt
        167 kB
        Alex Liu
      14. 4421-9-je.txt
        169 kB
        Alex Liu

        Activity

        Jonathan Ellis added a comment -

        You need to use cqlsh to interact with cql3 table definitions.

        bert Passek added a comment -

        Oh, I got it, so that means we cannot use MR jobs with ColumnFamilyOutputFormat or BulkOutputFormat?

        Jonathan Ellis added a comment -

        I should say: you need to use cqlsh, or cql from the client (see comments on CASSANDRA-4377).

        Jonathan Ellis added a comment -

        You can still create appropriate composites from m/r, but the schema design doesn't fit in the structures thrift knows about. So just create a (int, 'c') composite value and an int column name and you'll be fine. The cli can't display this metadata because it only knows how to deal with named non-composite columns.
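
        For illustration only: a rough sketch of what such a composite column name could look like on the wire, assuming the standard CompositeType layout of a 2-byte big-endian length, the component bytes, and an end-of-component byte per component. The class and helper below are hypothetical, and as the next comment notes, this route did not actually work against CQL3 tables at the time.

        import java.nio.ByteBuffer;
        import java.nio.charset.Charset;

        public class CompositeNameSketch
        {
            // Encode components using the CompositeType layout:
            // <2-byte length><component bytes><end-of-component byte 0x00> for each component.
            static ByteBuffer compose(ByteBuffer... components)
            {
                int size = 0;
                for (ByteBuffer c : components)
                    size += 2 + c.remaining() + 1;
                ByteBuffer out = ByteBuffer.allocate(size);
                for (ByteBuffer c : components)
                {
                    out.putShort((short) c.remaining());
                    out.put(c.duplicate());
                    out.put((byte) 0);
                }
                out.flip();
                return out;
            }

            public static void main(String[] args)
            {
                // Column name for CQL3 column "c" in the row where clustering column b = 42.
                ByteBuffer bComponent = ByteBuffer.allocate(4).putInt(0, 42);
                ByteBuffer cLabel = ByteBuffer.wrap("c".getBytes(Charset.forName("UTF-8")));
                ByteBuffer columnName = compose(bComponent, cLabel);
                System.out.println("composite name length: " + columnName.remaining());
            }
        }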

        Jonathan Ellis added a comment -

        Turns out that's not true yet. Sylvain says:

        Currently you do have a problem inserting data from thrift to a CQL3 cf, even if you know exactly what to insert. The reason is that ThriftValidation would need to be updated to use CQL3 metadata.

        bert Passek added a comment -

        Ah, well, the issue was already closed, so I didn't bother with the attachment. But you are right: I had problems inserting data from thrift (via client.batch_mutate) into a cql3 cf. I just noticed in the debugger that ThriftValidation doesn't know about the metadata, so the default validator is always used, which results in the data being rejected on the server side.

        Sometimes it can be confusing when talking about CQL 2, CQL 3, the CLI, Thrift, etc.

        Thanks.

        Jonathan Ellis added a comment -

        CASSANDRA-3647 makes map/reduce from Thrift more painful as well.

        We can either:

        • add a CqlInputFormat, which is painful (see https://issues.apache.org/jira/browse/CASSANDRA-2878?focusedCommentId=13189138&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13189138), or
        • add collections support to Thrift via String serialization (which has the added benefit of allowing clients to support collections w/o doing a full rewrite to cql3).

        Sylvain Lebresne added a comment -

        Add collections support to Thrift via String serialization

        I'm not really sure what you have in mind, but I'm not sure I'm a fan of that idea. Supporting collections on writes is one thing, supporting it on reads is another. Do we really want to add a new pass over the resulting columns in thrift to group those belonging to the same collection? That sounds very messy.

        Add a CqlInputFormat

        While this might be painful, I have a strong feeling that long term this is probably the right solution (one reason being if we ever want to support CASSANDRA-2478 for the input format). On the painful part of this:

        • on the short term, can't we use the CQL processor client side to convert the select statement to a thrift query (since we know how to do this for thrift queries)?
        • on the slightly longer term, we will need general paging for CQL queries for CASSANDRA-4415. Once that's in, that should lift the main difficulty, shouldn't it?

        But for this specific issue, I suppose the patch I've attached to CASSANDRA-4377 should solve Bert's problem.

        Jonathan Ellis added a comment -

        on the short term, can't we use the CQL processor client side to convert the select statement to a thrift query (since we know how to do this for thrift queries)?

        Well, we know how to convert select to StorageProxy queries, which isn't quite the same thing. So we'd probably need to shove an abstraction layer in there, which is already some pretty thick code.

        on the slightly longer term, we will need general paging for CQL queries for CASSANDRA-4415. Once that's in, that should lift the main difficulty, shouldn't it?

        Yes, I think it would.

        Sylvain Lebresne added a comment -

        Unassigned myself for now because I'm really not an expert on the InputFormat. So if someone else wants to pick it up, feel free to do it.

        Ondřej Černoš added a comment -

        Is this ticket really planned for 1.2.5?

        Alex Liu added a comment -

        I have done some work on it. The pull request is at https://github.com/riptano/cassandra/pull/45

        Jonathan Ellis added a comment -

        Can you explain the approach you took? In particular, how do you handle paging?

        Alex Liu added a comment - - edited

        There is no auto paging for CQL3, so I use a workaround method to page through CQL wide rows. The basic idea is to use a CQL3 query on the partition key and clustering columns,

        e.g. PRIMARY KEY (m, n, o, p) where the partition key is m and the clustering columns are n, o, p
        

        For the query

        Select * from test limit 1000;
        

        We store the last values of m, n, o and p into m_end, n_end, o_end and p_end after the initial 1000 rows,

        so the next page query is

        Select * from test 
        where token(m) = token(m_end)
              and n = n_end
              and o = o_end
              and p > p_end
        Limit 1000
        
        

        If it reaches the end of o, then the next query is

        Select * from test 
        where token(m) = token(m_end)
              and n = n_end
              and o > o_end
        Limit 1000
        

        otherwise

        Select * from test 
        where token(m) = token(m_end)
              and n = n_end
              and o = o_end
              and p > p_end1
        Limit 1000
        

        Once it reaches the end of the current partition and moves to the next row, the query is

        Select * from test 
        where token(m) > token(m_end) 
        Limit 1000
        

        For a table that has more than one column in the partition key, e.g.

        PRIMARY KEY ((m, n), o, p) where the partition key is (m, n) and the clustering columns are o, p,
        

        we use the following query

        Select * from test 
        where token(m, n) > token(m_end, n_end) 
        Limit 1000
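
        As a loose sketch of the scheme above (illustrative only, not the actual patch code; all names are made up), the next-page WHERE clause can be derived from how many clustering columns of the last-seen row stay pinned with equality before the final greater-than:

        import java.util.Arrays;
        import java.util.List;

        public class PagingClauseSketch
        {
            // fixedPrefix = number of clustering columns pinned with '=' before the final '>';
            // -1 means the current partition is exhausted and we advance by token.
            static String nextPageWhere(String partitionKey, List<String> clusteringKeys, int fixedPrefix)
            {
                StringBuilder where = new StringBuilder();
                if (fixedPrefix < 0)
                {
                    where.append("token(").append(partitionKey).append(") > token(?)");
                }
                else
                {
                    where.append("token(").append(partitionKey).append(") = token(?)");
                    for (int i = 0; i < fixedPrefix; i++)
                        where.append(" AND ").append(clusteringKeys.get(i)).append(" = ?");
                    where.append(" AND ").append(clusteringKeys.get(fixedPrefix)).append(" > ?");
                }
                return where.toString();
            }

            public static void main(String[] args)
            {
                List<String> clustering = Arrays.asList("n", "o", "p");
                System.out.println(nextPageWhere("m", clustering, 2));  // pin n and o, advance p
                System.out.println(nextPageWhere("m", clustering, 1));  // p exhausted: pin n, advance o
                System.out.println(nextPageWhere("m", clustering, -1)); // partition exhausted: next token
            }
        }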
        
        Alex Liu added a comment -

        User needs to pass the following settings to the job.

        1. Keyspace and column family name
        2. Initial host, port and partitioner
        3. Column names to retrieve (optional); the default is all columns
        4. Number of CQL rows per page (optional); the default is 1000
        5. User-defined where clauses on indexed columns (optional)
        

        The input format yields pairs of List<IColumn>, Map<ByteBuffer, IColumn>,
        where the List<IColumn> holds the key columns (partition keys and clustering keys)
        and the Map<ByteBuffer, IColumn> maps each CQL query output column name to its column.

        Internally, we use the following CQL query

          SELECT <columns> 
          FROM   <Column_family_name> 
          WHERE  <where_clause>
            AND  <user_defined_WhereClauses_on_indexed_column> 
          LIMIT  <page_row_size>
          ALLOW  FILTERING
        
        <where_clause> could be any of the following formats:
         WHERE token(<partition_key>) >= <start_token> 
           AND token(<partition_key>) <= <end_token>
        or
         WHERE token(<partition_key>) > token(<partition_key_value>) 
           AND token(<partition_key>) <= <end_token>
        or
         WHERE token(<partition_key>) = token(<partition_key_value>) 
           AND <clustering_key1> = <key_value1>
           AND <clustering_key2> > <key_value2>
           AND token(<partition_key>) <= <end_token>
        or
         WHERE token(<partition_key>) = token(<partition_key_value>) 
           AND <clustering_key1> = <key_value1>
           AND <clustering_key2> = <key_value2>
           AND <clustering_key3> > <key_value3>
           AND token(<partition_key>) <= <end_token> 
        
        Alex Liu added a comment -

        If we could get auto paging through the native protocol for CQL3, then we could easily implement the CQL record reader. Until auto paging is available, we can use this record reader to page through the data.

        Cyril Scetbon added a comment -

        Can we consider this code functional but not efficient?

        Jonathan Ellis added a comment -

        Elaborate?

        Cyril Scetbon added a comment - - edited

        I'm talking about the patches provided by Alex Liu. If it works (not tested, and that's part of the question), can we use it until it's made more efficient with native paging? Am I making a bad assumption about the lack of performance of this paging strategy?

        Alex Liu added a comment - - edited

        It does a few extra CQL requests to get to the next page of CF rows. Other than that, it's efficient on the Thrift server.

        Because native paging could keep using the same connection and internally keep reading the data from where the page left off, native auto paging could be more efficient.

        I don't know how difficult it is to implement native auto paging, or the timeline. When it's available, we can add a new CQL native reader, so users can choose between this one and the new native reader.

        Cyril Scetbon added a comment -

        Alex, can you provide a patch that can be applied to the cassandra-1.2.3 branch? I would like to test it. Thanks.

        Alex Liu added a comment -

        I attached the patch for cassandra-1.2.3.

        Cyril Scetbon added a comment -

        Thanks, but I had an issue when compiling: the TClientTransportFactory interface was not found, so I copied the file src/java/org/apache/cassandra/thrift/TClientTransportFactory.java from your DSP-1954 branch and the compilation succeeded. I'll give it a try. Tell me if this copy was a mistake.

        Mike Schrag added a comment -

        You'll probably also need TFramedTransportFactory (the implementation of that)... I'm going through this same exercise now as well.

        Cyril Scetbon added a comment -

        I didn't need that file to compile. Cassandra is starting well. I'll create and load a CQL3 CF and test a Hadoop Job in the next hour.

        Mike Schrag added a comment -

        Yeah, it's not necessary to compile, but it will be necessary to run.

        Cyril Scetbon added a comment - - edited

        I didn't have to add it because it was already there.

        Alex Liu added a comment -

        Cyril Scetbon: sorry about the missing TClientTransportFactory file. Let me know your testing results.

        Mike Schrag added a comment -

        My first trivial test was a one-node localhost Cassandra with a table with one row that has a text key, and it ended in an infinite loop of:

        2013-05-07 10:47:41,459 [main] DEBUG org.apache.cassandra.hadoop.cql3.ColumnFamilyRecordReader - query type: 0
        2013-05-07 10:47:41,459 [main] DEBUG org.apache.cassandra.hadoop.cql3.ColumnFamilyRecordReader - set tail to null
        ...repeated...

        I'm going to try some multi-node clusters, some multi-row tables, and some different key types to see how that's impacted

        Mike Schrag added a comment - - edited

        I added 10k rows to that simple table, and the infinite loop went away (maybe it's just a problem when you have a single row; I'll have to dig into that in a little bit). I got a few rows out, then:

        Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
        	at java.util.ArrayList.RangeCheck(ArrayList.java:547)
        	at java.util.ArrayList.get(ArrayList.java:322)
        	at org.apache.cassandra.hadoop.cql3.ColumnFamilyRecordReader$RowIterator.preparedQueryBindValues(ColumnFamilyRecordReader.java:596)
        
        Cyril Scetbon added a comment -

        Mike Schrag, you were right: I have a bad TFramedTransportFactory because it was not included in the patch.
        Alex Liu: both TFramedTransportFactory and TClientTransportFactory were missing. I'll copy them and retry.

        Mike Schrag added a comment -

        This is a stock 1.2.4 Cassandra install. For full disclosure, I DID pull your code out of Cassandra and into our project, and made a few tweaks to build against newer Hadoop libs, but I'm actually not even using Hadoop here – I'm just calling the ColumnFamilyInputFormat from a simple Java main method, so it's possible I'm skewing something with a busted test, but I don't THINK so:

            Configuration conf = new Configuration();
        
            ConfigHelper.setInputInitialAddress(conf, "127.0.0.1");
            ConfigHelper.setInputRpcPort(conf, "9160");
            ConfigHelper.setInputPartitioner(conf, "Murmur3Partitioner");
            ConfigHelper.setInputColumnFamily(conf, "whatever", "branch");
            CQLConfigHelper.setInputCQLPageRowSize(conf, "3");
            //CQLConfigHelper.setInputWhereClauses(conf, "title='A'");
        
            JobContext jobContext = new JobContextImpl(conf, new JobID());
            TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
            
            ColumnFamilyInputFormat inputFormat = new ColumnFamilyInputFormat();
            List<InputSplit> splits = inputFormat.getSplits(jobContext);
            for (InputSplit split : splits) {
              ColumnFamilySplit columnFamilySplit = (ColumnFamilySplit) split;
              System.out.printf("split: %s\n", split);
        
              ColumnFamilyRecordReader reader = new ColumnFamilyRecordReader();
              reader.initialize(split, context);
              // now read out all the values...
              while (reader.nextKeyValue()) {
        
                List<IColumn> keys = reader.getCurrentKey();
                System.out.println("CassandraBulkTest.main: " + ByteBufferUtil.string(keys.get(0).value()));
                Map<ByteBuffer, IColumn> columns = reader.getCurrentValue();
                for (IColumn column : columns.values()) {
                  String name  = ByteBufferUtil.string(column.name());
                  String value = "skipped";//column.value() != null ? ByteBufferUtil.string(column.value()) : "null value";
                  System.out.println("CassandraBulkTest.main: " + name + "=>" + value);
                }
              }
            }
          }
        
        Cyril Scetbon added a comment -

        Alex Liu: something must be missing too in my newly patched cassandra-1.2.3, as the column family (CQL3 Storage) is not visible through the Thrift API:

        15:28:03,073 DEBUG [main] (QueryParserDriver.java:283) - Resulting macro AST:
        (QUERY (STATEMENT data (LOAD 'cassandra://ks1/t1' (FUNC CassandraStorage))))

        15:28:04,426 ERROR [main] (LogUtils.java:125) - ERROR 1200: Column family 't1' not found in keyspace 'ks1'

        Mike Schrag added a comment -

        Cyril – I get past initialization, so that code appears to work. Are you sure you're pointing at the right server?

        Mike Schrag added a comment -

        Oh, your case is loading data, so that might be different ... I'm only testing fetching right now.

        Alex Liu added a comment -

        Mike Schrag: did you miss the keyspace and column family settings? I will do some debugging and testing today. The code hasn't been fully tested; it did pass the example in the patch for me.

        Cyril Scetbon added a comment - - edited

        I need to test Cassandra 1.2.3, so this is the method I used:

        • git checkout cassandra-1.2.3
        • patch -p0 < patch_from_Alex
        • copy TFramedTransportFactory and TClientTransportFactory from git checkout remotes/origin/DSP-1954 (from https://github.com/riptano/cassandra)
        • ant and deploy everything to my test cluster

        Mike Schrag: it's a Pig LOAD call, so it's actually fetching data from Cassandra.

        Mike Schrag added a comment -

        Alex - In my case, I get 2 rows out of my CF before it blows up with that exception, so my basic config is definitely right. That response might have been intended for Cyril, though.

        Mike Schrag added a comment -

        I'm still digging into this, but one thing I noticed in RowIterator.computeNext, in the "if (pageRows >= pageRowSize || !iterator.hasNext())" condition, it looks like if you hit the end of the page, and there isn't another iterator (i.e. you're done), you return endOfData() there, which I think would mean that you're always losing the last row of results, because the final row would be in the Pair.create(keyColumns, valueColumns) that you'd ordinarily return from this method?

        Mike Schrag added a comment -

        Ah, I think all the clusterKeys.get(0).value == null checks need to be:

        clusterKeys.size() == 0 || clusterKeys.get(0).value == null

        My clusterKeys is always empty.

        Mike Schrag added a comment - - edited

        and setTailNull needs to return -1 if the values list is empty:

                private int setTailNull(List<Key> values)
                {
                    if (values.size() == 0)
                        return -1;
        

        That fixes the infinite loop... this lets me get through the rows now. I'm going to check whether all the rows are actually there, but it's not dying or hanging at least, so that's a start.

        Mike Schrag added a comment -

        I do get all the rows back in my tests (even when the last row falls on a page boundary), so it was a false alarm on the comment about eating the last row.

        Alex Liu added a comment - - edited

        I attached the fixed version, 4421-1.txt. It now fixes the issue with no clustering keys and also removes the endOfData() for the condition mentioned by Mike Schrag.

        BTW it applies on top of the C* 1.2.3 version; you can review it at https://github.com/alexliu68/cassandra/pull/1

        Alex Liu added a comment -

        Attached the third version

        Cyril Scetbon added a comment -

        Alex Liu: compilation works like a charm with your last patch on the cassandra-1.2.3 tag. I'm still having the same issue of the column family being invisible to the Thrift API: http://pastebin.com/qGwGMa7r
        However, as it's linked to issue https://issues.apache.org/jira/browse/CASSANDRA-5234, I don't know if your code fixed it (it seems it didn't). But if CQL3 tables are now working with Hadoop, they should be returned through the Thrift describe_keyspace call.

        Jonathan Ellis added a comment -

        if CQL3 tables are now working with Hadoop they should be returned through thrift describe_keyspace

        That doesn't follow at all.

        Alex Liu added a comment -

        Cyril Scetbon: the patches are for Hadoop only. For Pig to work, we need to modify org.apache.cassandra.hadoop.pig.CassandraStorage to support CQL3. Unfortunately I only implemented it for Hadoop. I will implement it later, after my other assignments are done, if no one else has implemented it by then.

        Cyril Scetbon added a comment - - edited

        Alex Liu: oh, I was thinking that only Thrift was affected by the CQL3 issue and that this was why Pig was not working. If Pig needs to be updated too (as you say), I'll wait for that and hope it will come soon… Can you change the status of CASSANDRA-5234? Otherwise it won't be assigned, unless there is another JIRA for that.
        Jonathan Ellis: I thought it was ONE whole block.

        Mike Schrag added a comment -

        Alex - would you expect to get duplicate rows out of this code? I wasn't sure if I need to dedupe multiple results due to replication factor > 1. I'm seeing some weird stuff with a large table (nominally about 50 million rows) that has been truncated and recreated, and we're up to 500 million rows dumped out of it so far. I'm not quite sure what I'm seeing yet, but I just wanted to explore what the expected behavior is.

        Mike Schrag added a comment -

        Also curious if there's any way I would get previously deleted rows back from this?

        Alex Liu added a comment -

        Mike Schrag: if the CQL3 statement provided in the Hadoop job doesn't return duplicate rows, then the Hadoop job shouldn't return duplicate rows. But CQL3 doesn't have DISTINCT keyword support, so some CQL3 queries return duplicate rows.

        You can test it on one node to see whether the duplicate rows occur, then test it on multiple nodes with RF > 1.

        RF shouldn't create duplicate rows. Let's trace it or reproduce it in a simple way.

        Alex Liu added a comment -

        If you have snapshots of the nodes, you can restore them; otherwise you will lose the deleted rows after compaction.

        Mike Schrag added a comment -

        Sorry, I wasn't clear. I'm not trying to recover deleted rows; I'm just trying to explain why a table that we THINK should have 50M rows returns far more than that when we iterate over it in bulk.

        One possibly interesting tidbit is that the table has a compound row key (text, text), so maybe there's something with that detail.

        I've attempted a test with a small CF, and I don't get dupes. I've tested on a 4.5M row CF, and that doesn't appear to get dupes. But this (what we THINK is) 50M row CF does. It takes millions of rows before we start seeing them, though.

        Jeremy Hanna added a comment -

        When you mapreduce over the column family, you need to filter out tombstones to get an accurate count. Also, make sure you're setting your consistency level to ensure you have a consistent number.

        Mike Schrag added a comment -

        I thought CQL3 filtered tombstones automagically? In my case, all the rows I get back actually have data in them. A tombstone would manifest as an empty row, right? I'm beginning to think I'm hitting an edge-case bug in this patch. The row count on this one particular input split got up to 400M before I killed it... It sure looks like it's stuck in some sort of infinite loop. It doesn't happen on every split, just this particular one. I'm going to try to do some more diagnostics.

        Mike Schrag added a comment - - edited

        I've tracked down the bug ... If the token value of the last row of the page == the end value of the split, it ends up trying to fetch the next page using the query:

        SELECT * FROM [cf] WHERE token(key) > token(?) AND token(key) <= ? LIMIT 1000 ALLOW FILTERING

        If you fill this in ... Assume your split is 1000-2000, and the last row of the page happened to actually be the max value 2000, that would be:

        SELECT * FROM [cf] WHERE token(key) > 2000 AND token(key) <= 2000 LIMIT 1000 ALLOW FILTERING

        It looks like Cass freaks out here with the impossible predicate, and where it should be returning an empty result, it ACTUALLY returns bogus values that fall outside the specified range. Once you get a token outside of the split range, you're totally screwed, and everything goes off the rails.
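
        One illustrative way to guard against this on the reader side (a sketch only, not the fix that was eventually committed; the names are hypothetical) is to stop paging as soon as a page comes back short or the last row's token already equals the end of the split:

        import java.math.BigInteger;

        public class SplitBoundaryGuard
        {
            // Decide whether another page query should be issued for this split.
            static boolean hasMorePages(BigInteger lastRowToken, BigInteger splitEndToken, int rowsInPage, int pageSize)
            {
                if (rowsInPage < pageSize)
                    return false; // short page: nothing left in this split
                if (lastRowToken.compareTo(splitEndToken) >= 0)
                    return false; // "token > last AND token <= end" would be an empty range
                return true;
            }

            public static void main(String[] args)
            {
                System.out.println(hasMorePages(BigInteger.valueOf(2000), BigInteger.valueOf(2000), 1000, 1000)); // false
                System.out.println(hasMorePages(BigInteger.valueOf(1500), BigInteger.valueOf(2000), 1000, 1000)); // true
            }
        }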

        Mike Schrag added a comment -

        I've filed https://issues.apache.org/jira/browse/CASSANDRA-5573 for the Cass issue.

        Alex Liu added a comment -

        Attached the third version, which fixes the issue where, if columns are passed to the input format and there are no clustering keys, the composed query has "null" as one of the columns.

        Jeremy Hanna added a comment -

        Mike: you're right, CQL3 does filter range ghosts. FWIW, I have seen that if I use the default consistency level of ONE (for the CFRR) when counting rows, an inconsistent number may come back.

        Alex Liu added a comment -

        Version 4 is attached, which fixes the issue described in CASSANDRA-5573.

        Mike Schrag added a comment -

        Looks good. Thanks, Alex.

        Mike Schrag added a comment -

        I spoke too soon (I accidentally had my override classes still in place). reachEndRange is consuming the partition key ByteBuffers, so they're basically empty values every time, so the split just keeps repeating starting at the beginning.

        At the top of reachEndRange, if you:

        for (Key k : partitionKeys) k.value.mark();

        and at the bottom, you can reset them:

        for (Key k : partitionKeys) k.value.reset();

        that will fix it.
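
        For anyone less familiar with the java.nio semantics involved, a tiny standalone sketch (illustrative only) of why relative reads consume a ByteBuffer, and how mark()/reset() or duplicate() avoids that:

        import java.nio.ByteBuffer;

        public class ByteBufferReuseDemo
        {
            public static void main(String[] args)
            {
                ByteBuffer key = ByteBuffer.allocate(4).putInt(0, 42);

                key.mark();               // remember the current position
                int first = key.getInt(); // relative read: position moves from 0 to 4
                System.out.println(first + ", remaining after read: " + key.remaining()); // 42, 0

                key.reset();              // rewind to the mark so the value can be read again
                System.out.println("remaining after reset: " + key.remaining());          // 4

                // Alternative that never disturbs the original buffer's position:
                System.out.println(key.duplicate().getInt()); // 42
            }
        }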

        Alex Liu added a comment -

        Version 5 is attached, adding the fix by Mike.

        Jonathan Ellis added a comment -

        I dug in and started cleaning things up: https://github.com/jbellis/cassandra/commits/4421. (NB: there are a bunch more redundant @Override annotations that I did not clean up. Those should go too: http://wiki.apache.org/cassandra/CodeStyle.)

        But when I looked up from the code I realized that we have an important discussion to have first: What API should we present? I don't think RecordReader<List<IColumn>, Map<ByteBuffer, IColumn>> and RecordWriter<ByteBuffer, List<List<ByteBuffer>>> are it.

        Jonathan Ellis added a comment -

        I think there are two sane alternatives for the reader. We could expose RecordReader<List<ByteBuffer>, List<ByteBuffer>> and assume the caller can figure out what his PK definition is, and what columns he asked for and therefore what the List items correspond to.

        Alternatively we could expose RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>>, which makes it a lot harder for the caller to screw things up, while also making it more convenient. (The only reason the original CFRR presents a Map as the value is for convenience in referring to columns by name.)

        (The Map should be a LinkedHashMap to preserve order as well.)

        The best argument for sticking with the List is that we're basically forced to use a List for the Writer's bind variables, since we don't support named parameters in CQL. Which would imply RecordWriter<List<ByteBuffer>, List<ByteBuffer>>. The key should be a List, since we can have compound PKs and we don't want to force people to turn those into a single BB via CompositeType. And the value should just be a single list of bind variables, because the list-of-lists is a holdover from the original CFRW. (Where TBH I don't think it made sense either, but we're kind of stuck with it now for backwards compatibility.)

        Or, we could do RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>>, for consistency with a Map-based Reader.

        Thoughts?
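
        To make the trade-off concrete, here is a hypothetical mapper written against the Map-based signatures (the class, column names and output types are assumptions based on the test1 table from the description, not part of any attached patch). Column names are plain Strings, so job code can refer to them directly instead of tracking positions in a List:

        import java.io.IOException;
        import java.nio.ByteBuffer;
        import java.util.Map;

        import org.apache.cassandra.utils.ByteBufferUtil;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;

        public class Test1Mapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, LongWritable>
        {
            @Override
            protected void map(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> columns, Context context)
                    throws IOException, InterruptedException
            {
                int a = ByteBufferUtil.toInt(keys.get("a")); // partition key
                int b = ByteBufferUtil.toInt(keys.get("b")); // clustering key
                ByteBuffer c = columns.get("c");             // regular column
                if (c != null)
                    context.write(new Text(a + ":" + b), new LongWritable(ByteBufferUtil.toInt(c)));
            }
        }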

        Alex Liu added a comment -

        RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> and RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> are better than RecordReader<List<ByteBuffer>, List<ByteBuffer>> and RecordWriter<List<ByteBuffer>, List<ByteBuffer>>.

        RecordReader<List<ByteBuffer>, List<ByteBuffer>> and RecordWriter<List<ByteBuffer>, List<ByteBuffer>> are more concise, but the user needs to be careful not to screw up the order. If we go this route, we should document it clearly.

        Jonathan Ellis added a comment -

        All right, let's go with the Map versions then.

        Alex Liu added a comment -

        Jonathan Ellis I attached a patch for PIG CQL3 support to CASSANDRA-5234. It works on top of the patch for CASSANDRA-4421.

        Jonathan Ellis added a comment -

        I'm confused, is there a patch w/ updated signatures somewhere for this?

        Alex Liu added a comment -

        The patch is attached to CASSANDRA-5234, which fixes the issue that CQL3 tables are not supported in CassandraStorage; it also creates CQL3Storage for CQL3 tables only. The patch is built on top of 4421-5.txt.

        Jonathan Ellis added a comment -

        What we need next here is for you to build on the branch I posted, to fix the CFIF/CFOF signatures as discussed above. I don't see that in the 5234 code, but in any case it's best to not entangle the two. (What you can do locally is copy your 4421 branch to work on 5234 on top of it, but keep the 4421 branch itself "pure.")

        Alex Liu added a comment -

        Sure. I will work on the branch you posted to change the signatures.

        Colin B. added a comment (edited) -

        If anyone cares, attached is 4421-6.cb.txt, which is the 4421-5 patch applied on top of commit 2f72f8b in cassandra-1.2.

        Alex Liu added a comment -

        I attached patch 4421-6-je.txt to fix the CFIF/CFOF signatures.

        Alex Liu added a comment -

        Patch 4421-6-je.txt is on top of Jonathan Ellis's branch. Colin B., can you describe what you fixed in your patch?

        Colin B. added a comment (edited) -

        Obviously your new patch is better than mine. The major differences between the two are:

        • lots of little formatting changes; 4421-6-je is better here
        • 4421-6-je seems to leave out the example code
        • the ClientHolder changes, which 4421-6-je doesn't include
        • conflict fixes for CASSANDRA-5536
        • conflict fixes for CASSANDRA-5529

        The conflict resolution was the only difficulty I noticed while putting the patch onto the head of cassandra-1.2. Overall, not a big deal.

        Jonathan Ellis added a comment -

        final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteBuffer>, List<List<ByteBuffer>>>

        We don't need a list-of-lists for the values.
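
        With the flattened contract of RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>>, a reducer would emit one bound-variable list per row, roughly as sketched below (the column names and the CQL statement they would bind to are assumptions for the example, not code from the patch):

        import java.io.IOException;
        import java.nio.ByteBuffer;
        import java.util.ArrayList;
        import java.util.LinkedHashMap;
        import java.util.List;
        import java.util.Map;

        import org.apache.cassandra.utils.ByteBufferUtil;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Reducer;

        // Hypothetical reducer emitting one bound-variable list per output row,
        // matching RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>>.
        public class SumReducer
                extends Reducer<Text, LongWritable, Map<String, ByteBuffer>, List<ByteBuffer>>
        {
            @Override
            protected void reduce(Text key, Iterable<LongWritable> counts, Context context)
                    throws IOException, InterruptedException
            {
                long sum = 0;
                for (LongWritable count : counts)
                    sum += count.get();

                // Primary key columns go into the (ordered) Map; the bind variables
                // for the prepared update go into the List, in CQL statement order.
                Map<String, ByteBuffer> keys = new LinkedHashMap<String, ByteBuffer>();
                keys.put("a", ByteBufferUtil.bytes(Integer.parseInt(key.toString())));

                List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
                variables.add(ByteBufferUtil.bytes(sum));

                context.write(keys, variables);
            }
        }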

        Alex Liu added a comment -

        The 4421-7-je.txt patch is attached; it changes List<List<ByteBuffer>> to List<ByteBuffer>.

        Jonathan Ellis added a comment -

        Added to my github branch, with some extra cleanup. (Noticed I forgot to add the examples/ directory before. This probably needs to be updated.)

        I think we're ready for you to rebase to 1.2 now. It may or may not be easiest to take Colin's rebase and apply the patches from my branch on top. ("git remote add" is probably the easiest way to grab my branch.)

        Mike Schrag added a comment -

        I just noticed that if executeQuery times out (or hits any of the other failure conditions), you simply lose rows. The exception doesn't bubble up to the runtime, so the consumer can't respond to the failure; you end up with a null result, which I believe makes the page appear to be done. It would be much better to either support retry here, or throw the exception all the way up so that clients can retry on their own. I haven't checked the new round of patches to see if they behave the same way.

        Mike Schrag added a comment -

        Note that to recover from some of those failures, you have to reconnect the ColumnFamilyRecordReader. It would be helpful to move the connection code into a connect() method and have close() null out the client, so you can cycle the underlying connection in the event of a more serious failure. Ideally, the iterators and tokens would all be left alone so that you can pick up where you left off.
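
        A rough sketch of that split, purely to illustrate the suggestion (the class, the defaults and the method names below are made up, not the reader's actual code):

        import org.apache.cassandra.thrift.Cassandra;
        import org.apache.thrift.protocol.TBinaryProtocol;
        import org.apache.thrift.transport.TFramedTransport;
        import org.apache.thrift.transport.TSocket;
        import org.apache.thrift.transport.TTransport;
        import org.apache.thrift.transport.TTransportException;

        // Sketch: connection setup lives in connect(), and close() nulls the client
        // so a caller can cycle the underlying connection while keeping any
        // iterator/token state it holds elsewhere.
        class ReconnectableClient
        {
            private TTransport transport;
            private Cassandra.Client client;

            void connect(String host, int port) throws TTransportException
            {
                TSocket socket = new TSocket(host, port);
                transport = new TFramedTransport(socket);
                transport.open();
                client = new Cassandra.Client(new TBinaryProtocol(transport));
            }

            Cassandra.Client client() throws TTransportException
            {
                if (client == null)
                    connect("127.0.0.1", 9160);   // hypothetical defaults for the sketch
                return client;
            }

            void close()
            {
                if (transport != null)
                    transport.close();
                transport = null;
                client = null;   // forces the next call to reconnect
            }
        }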

        Mike Schrag added a comment -

        Ugh, kind of nasty: AbstractIterator (which RowIterator extends) poisons itself on failure with no apparent way to recover. It gets stuck in State.FAILED and you can't reset it. That seems overly aggressive.
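
        A minimal illustration of that failure mode against Guava's contract (standalone demo code, not anything from Cassandra):

        import java.util.Iterator;
        import com.google.common.collect.AbstractIterator;

        // Once computeNext() throws, the iterator is left in its FAILED state and
        // every later hasNext()/next() call throws IllegalStateException.
        public class FailedIteratorDemo
        {
            public static void main(String[] args)
            {
                Iterator<Integer> it = new AbstractIterator<Integer>()
                {
                    private boolean failedOnce = false;

                    @Override
                    protected Integer computeNext()
                    {
                        if (!failedOnce)
                        {
                            failedOnce = true;
                            throw new RuntimeException("simulated timeout");
                        }
                        return endOfData();   // never reached after the failure
                    }
                };

                try { it.hasNext(); } catch (RuntimeException e) { /* first failure */ }
                it.hasNext();   // throws IllegalStateException: the iterator is poisoned
            }
        }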

        Colin B. added a comment -

        Attached is patch 4421-8-cb, containing the changes from jbellis's branch. It applies cleanly onto cassandra-1.2 (11eb352).

        Mike Schrag added a comment -

        Yeah, recycling the RowIterator is probably too complicated. However, I do think the exception should bubble up. I can then catch it and reread the split at the app level to pick back up where I left off.

        Alex Liu added a comment -

        I am fixing the infinite-loop issue in the newly merged code and making the example work. I will post the final merge later.

        Alex Liu added a comment -

        Mike Schrag For any CQL timeout or other uncaught exception, we have three choices:

        1. Catch it in your client code, so you can handle it after the job is done.

        2. Write it to the log, so you can check where it went wrong.

        3. As you said, retry at the point where it fails.

        Choice 1 is the common solution, 2 is the easiest to implement, and 3 is quite a bit of work to make reliable and robust.

        Alex Liu added a comment -

        4421-8-je.txt is attached; it merges the final code plus some fixes to the example, on top of trunk commit 9e8691c26283f2532be3101486a8290ed5128c18.

        Mike Schrag added a comment -

        Alex - I vote for #1, definitely. I need to be able to handle these conditions inline, not after the job is done. If you implement #1, the user can still choose #2 if they want; if the library chooses #2 for you, you're just out of luck. Particularly in the case of a timeout, that's a relatively straightforward situation to resolve in many cases.

        Jonathan Ellis added a comment -

        +1 for #1.

        We'll want a patch against 1.2 as well as trunk, btw.

        Alex Liu added a comment -

        The 4421-9-je.txt patch is attached on top of trunk commit 9e8691c26283f2532be3101486a8290ed5128c18 to add exception handling.

        It retries three times for TimedOutException and UnavailableException; any other exception is thrown back to the client as an IOException with the original cause attached.

        The client side can catch it and handle it there; see this link for an example:
        http://stackoverflow.com/questions/14920236/how-to-prevent-hadoop-job-to-fail-on-corrupted-input-file
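
        As a rough illustration of that approach (a sketch only, with an invented helper interface; this is not the patch code itself):

        import java.io.IOException;

        import org.apache.cassandra.thrift.CqlResult;
        import org.apache.cassandra.thrift.TimedOutException;
        import org.apache.cassandra.thrift.UnavailableException;

        // Sketch: retry a page query a bounded number of times for transient
        // failures, and surface everything else to the caller as an IOException
        // so client code can decide how to react.
        final class RetryingQuery
        {
            private static final int MAX_ATTEMPTS = 3;

            static CqlResult executeWithRetry(PageQuery query) throws IOException
            {
                for (int attempt = 1; ; attempt++)
                {
                    try
                    {
                        return query.execute();   // hypothetical callback wrapping the CQL query
                    }
                    catch (TimedOutException e)
                    {
                        if (attempt >= MAX_ATTEMPTS)
                            throw new IOException("query failed after " + attempt + " attempts", e);
                    }
                    catch (UnavailableException e)
                    {
                        if (attempt >= MAX_ATTEMPTS)
                            throw new IOException("query failed after " + attempt + " attempts", e);
                    }
                    catch (Exception e)
                    {
                        throw new IOException(e);   // original cause preserved for the client
                    }
                }
            }

            interface PageQuery
            {
                CqlResult execute() throws Exception;
            }
        }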

        Alex Liu added a comment (edited) -

        Attached are the final version-10 patches for trunk and the 1.2 branch. They fix some issues with the writer.

        Jonathan Ellis added a comment -

        Committed!

        Jonathan Ellis added a comment -

        (Post-commit, I renamed TClientTransportFactory to ITransportFactory.)


          People

          • Assignee: Alex Liu
          • Reporter: bert Passek
          • Reviewer: Jonathan Ellis
          • Tester: Ryan McGuire
          • Votes: 15
          • Watchers: 18
