Cassandra / CASSANDRA-1101

A Hadoop Output Format That Targets Cassandra

    Details

      Description

      Currently, there exists a Hadoop-specific input format (viz., ColumnFamilyInputFormat) that allows one to iterate over the rows in a given Cassandra column family and treat them as the input to a Hadoop map task. By the same token, one may need to feed the output of a Hadoop reduce task into a Cassandra column family, for which no mechanism exists today. This calls for a Hadoop-specific output format which accepts a pair of key and columns, and writes it out to a given column family.
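      (To make the intended usage concrete, here is a minimal sketch of how a job might wire in such an output format. Apart from the ColumnFamilyOutputFormat and ColumnFamilyOutputReducer class names taken from this ticket, everything here, the configuration keys in particular, is an illustrative assumption rather than the patch's actual API.)

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
        import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

        public class WriteToCassandra
        {
            public static void main(String[] args) throws Exception
            {
                Configuration conf = new Configuration();
                conf.set("cassandra.output.keyspace", "Keyspace1");       // assumed key
                conf.set("cassandra.output.columnfamily", "Standard1");   // assumed key

                Job job = new Job(conf, "write-to-cassandra");
                job.setInputFormatClass(SequenceFileInputFormat.class);
                job.setReducerClass(ColumnFamilyOutputReducer.class);     // default reducer, described below
                job.setOutputFormatClass(ColumnFamilyOutputFormat.class); // the format this ticket adds
                FileInputFormat.addInputPath(job, new Path(args[0]));
                System.exit(job.waitForCompletion(true) ? 0 : 1);
            }
        }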

      Here, we describe an output format known as ColumnFamilyOutputFormat, which allows reduce tasks to persist keys and their associated columns as Cassandra rows in a given column family. By default, it prevents overwriting existing rows in the column family by ensuring, at initialization time, that it contains no rows matching the given slice predicate. For the sake of speed, it employs a lazy write-back caching mechanism, whereby its record writer batches the mutations derived from the reduce task's inputs (in a task-specific map) but stops short of actually mutating the rows. The latter responsibility falls to its output committer, which makes the changes official by sending a batch mutate request to Cassandra.

      The record writer, which is called ColumnFamilyRecordWriter, maps the input <key, value> pairs to a Cassandra column family. In particular, it creates a mutation for each column in the value, associates it with the key, and in turn with the endpoint responsible for that key. Given that round trips to the server are fairly expensive, it merely batches the mutations in memory, and leaves it to the output committer to send them to the server. Furthermore, the writer groups the mutations by the endpoint responsible for the rows being affected, which allows the output committer to execute the mutations in parallel, on an endpoint-by-endpoint basis.
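      (A rough sketch of that grouping follows; the types and the ring-lookup helper are simplified assumptions, with Mutation standing in for Cassandra's Thrift mutation structure and String row keys as in the Thrift API of the day.)

        // Sketch of the per-endpoint batching described above. Nothing is sent
        // to the server here; the buffered map is drained later, at commit time.
        Map<InetAddress, Map<String, List<Mutation>>> batches =
            new HashMap<InetAddress, Map<String, List<Mutation>>>();

        void write(String key, List<Mutation> mutations)
        {
            InetAddress endpoint = endpointFor(key); // assumed helper: ring lookup for the key
            Map<String, List<Mutation>> rows = batches.get(endpoint);
            if (rows == null)
                batches.put(endpoint, rows = new HashMap<String, List<Mutation>>());
            List<Mutation> queued = rows.get(key);
            if (queued == null)
                rows.put(key, queued = new ArrayList<Mutation>());
            queued.addAll(mutations); // buffer only; the committer sends the batch
        }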

      The output committer, which is called ColumnFamilyOutputCommitter, traverses the mutations collected by the record writer and sends them to the endpoints responsible for them. Because the total set of mutations is partitioned by endpoint, the committer can apply them in parallel, using one thread per endpoint. As a result, it reduces the time it takes to propagate the mutations to the server, since (a) the client eliminates one network hop that the server would otherwise have had to make, and (b) each endpoint node handles only a subset of the total set of mutations.
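      (The parallel commit might look like the following sketch, continuing the batching sketch above; the Thrift client plumbing and the map conversion are elided behind assumed helpers, and the consistency level is arbitrary.)

        // Sketch of the endpoint-parallel commit: one task per endpoint, each
        // issuing a single batch_mutate round trip against its own node.
        ExecutorService executor = Executors.newFixedThreadPool(batches.size());
        List<Future<Void>> futures = new ArrayList<Future<Void>>();
        for (final Map.Entry<InetAddress, Map<String, List<Mutation>>> entry : batches.entrySet())
        {
            futures.add(executor.submit(new Callable<Void>()
            {
                public Void call() throws Exception
                {
                    Cassandra.Client client = openClient(entry.getKey()); // assumed helper
                    try
                    {
                        // toMutationMap: assumed helper wrapping rows into the
                        // {key -> {columnFamily -> mutations}} shape Thrift expects
                        client.batch_mutate(keyspace, toMutationMap(entry.getValue()),
                                            ConsistencyLevel.ONE);
                    }
                    finally
                    {
                        closeClient(client); // assumed helper: closes the Thrift transport
                    }
                    return null;
                }
            }));
        }
        for (Future<Void> f : futures)
            f.get(); // propagate any per-endpoint failure to the task
        executor.shutdown();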

      For convenience, we also define a default reduce task, called ColumnFamilyOutputReducer, which collects the columns in the input value and maps them to a data structure expected by Cassandra. By default, it assumes the input value to be in the form of a ColumnWritable, which denotes a name/value pair corresponding to a certain column. This reduce task is in turn used by the attached test case, which maps every <key, value> pair in a sample input sequence file to a <key, column> pair, and then reduces them by aggregating columns corresponding to the same key. Eventually, the batched <key, columns> pairs are written to the column family associated with the output format.
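      (A reducer along those lines might look like this sketch. The output value type and the ColumnWritable copy constructor are assumptions; the copy matters because Hadoop reuses the Writable instance across iterations.)

        import java.io.IOException;
        import java.util.ArrayList;
        import java.util.List;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Reducer;

        // Sketch of a default reducer that aggregates a row's columns, in the
        // spirit of the ColumnFamilyOutputReducer described above.
        public class ColumnFamilyOutputReducer
                extends Reducer<Text, ColumnWritable, Text, List<ColumnWritable>>
        {
            @Override
            protected void reduce(Text key, Iterable<ColumnWritable> values, Context context)
                    throws IOException, InterruptedException
            {
                List<ColumnWritable> columns = new ArrayList<ColumnWritable>();
                for (ColumnWritable column : values)
                    columns.add(new ColumnWritable(column)); // copy: Hadoop reuses the instance
                context.write(key, columns); // handed to ColumnFamilyRecordWriter
            }
        }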

      Attachments

      1. 1101-clock-fix.diff
        3 kB
        Stu Hood
      2. CASSANDRA-1101.patch
        49 kB
        Karthick Sankarachary
      3. CASSANDRA-1101-V1.patch
        43 kB
        Karthick Sankarachary
      4. CASSANDRA-1101-V2.patch
        42 kB
        Karthick Sankarachary
      5. CASSANDRA-1101-V3.patch
        50 kB
        Karthick Sankarachary
      6. CASSANDRA-1101-V4.patch
        52 kB
        Karthick Sankarachary
      7. CASSANDRA-1101-V5.patch
        49 kB
        Karthick Sankarachary

          Activity

          Karthick Sankarachary added a comment -

          Dear Committers,

          Please take a look at the attached patch, and see if it's something you would like to commit. Needless to say, it comes with a test case, albeit a basic one.

          Best Regards,
          Karthick Sankarachary

          Stu Hood added a comment -

          Hey Karthick, thanks a lot for the contribution! I'll review this during the next few days.

          Karthick Sankarachary added a comment - edited

          Thanks Stu, I look forward to your review comments. As you can probably tell, this output format was written along the lines of the existing input format. That said, if you feel that a revision is warranted, please let me know.

          Stu Hood added a comment -

          After looking through the code, I'm not sure I'm convinced that the OutputCommitter is necessary. I understand that the intention is that if a Map task fails, you'd want to avoid writing its output to Cassandra, but I think a committer is only really useful for OutputFormats where you get atomic behaviour (via RDBMS transactions, or via an atomic HDFS rename). It is just as likely (probably more likely) that the OutputCommitter fails halfway through its mutations as it is that the RecordWriter fails halfway through writing, and then your column family is left in an in-between state anyway.

          Perhaps another approach (which still disconnects the round trip to the Cassandra servers from RecordWriter.write()), would be to make your Executor a member of ColumnFamilyRecordWriter, still with a thread/callable per endpoint, but with a per-endpoint queue of Mutations. RecordWriter.write() would add to the queue for the appropriate endpoint, and each thread in the Executor would batch writes to its endpoint.
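          (For concreteness, the suggested design might look something like the sketch below: a bounded queue per endpoint, drained in batches by a long-lived worker. Every name in it is illustrative, not code from the patch.)

            import java.util.ArrayList;
            import java.util.List;
            import java.util.concurrent.BlockingQueue;
            import java.util.concurrent.LinkedBlockingQueue;

            // RecordWriter.write() only enqueues; the worker owns the round trips.
            class EndpointWorker implements Runnable
            {
                private final BlockingQueue<Mutation> queue =
                    new LinkedBlockingQueue<Mutation>(4096); // bounds the task's memory use

                void enqueue(Mutation mutation) throws InterruptedException
                {
                    queue.put(mutation); // called from write(); blocks if the worker falls behind
                }

                public void run()
                {
                    List<Mutation> batch = new ArrayList<Mutation>();
                    try
                    {
                        while (true)
                        {
                            batch.clear();
                            batch.add(queue.take());    // block until there is work
                            queue.drainTo(batch, 1023); // then batch up whatever else is queued
                            sendBatchMutate(batch);     // assumed helper: one batch_mutate call
                        }
                    }
                    catch (InterruptedException e)
                    {
                        Thread.currentThread().interrupt(); // close() would interrupt after a final drain
                    }
                }
            }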

          Also, could you make the SlicePredicate and get_range_slices call in checkOutputSpecs optional, and if the predicate isn't present in the Configuration, skip the check? I expect that a lot of people will be fine with overwriting whatever exists, and needing to subclass to do it would be an unnecessary roadblock.

          Finally, some minor nitpicks:

          • Third paragraph in ColumnFamilyOutputFormat javadoc is stale
          • EndpointCallable.call doesn't close the Thrift socket it opens

          Thanks a bunch for your work here: this seems like a really solid base!

          Karthick Sankarachary added a comment -

          After looking through the code, I'm not sure I'm convinced that the OutputCommitter is necessary. I understand that the intention is that if a Map task fails, you'd want to avoid writing its output to Cassandra, but I think a committer is only really useful for OutputFormats where you get atomic behaviour (via RDBMS transactions, or via an atomic HDFS rename). It is just as likely (probably more likely) that the OutputCommitter fails halfway through its mutations as it is that the RecordWriter fails halfway through writing, and then your column family is left in an in-between state anyway.

          Agreed. Considering that Cassandra doesn't qualify as a truly transactional system, at least not yet, bringing the OutputCommitter into the mix might be a tad misleading. So, as you suggested, I've moved the pseudo-commit logic that used to be in ColumnFamilyOutputCommitter#commitTask into ColumnFamilyRecordWriter#close.

          Also, could you make the SlicePredicate and get_range_slices call in checkOutputSpecs optional, and if the predicate isn't present in the Configuration, skip the check? I expect that a lot of people will be fine with overwriting whatever exists, and needing to subclass to do it would be an unnecessary roadblock.

          Good point. Now, if no slice predicate is present in the Configuration, then it won't bail out.
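          (In outline, the skip might look like this sketch; the predicate lookup and the probe are assumed helpers rather than the patch's exact code.)

            // Sketch of the now-optional overwrite check in checkOutputSpecs.
            public void checkOutputSpecs(JobContext context)
                    throws IOException, InterruptedException
            {
                Configuration conf = context.getConfiguration();
                SlicePredicate predicate = readPredicate(conf); // assumed helper; null if unset
                if (predicate == null)
                    return; // no predicate configured: callers may overwrite existing rows
                if (hasMatchingRows(predicate, conf)) // assumed helper: get_range_slices probe
                    throw new IOException("output column family already contains matching rows");
            }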

          Third paragraph in ColumnFamilyOutputFormat javadoc is stale

          Updated that paragraph to reflect the changes above.

          EndpointCallable.call doesn't close the Thrift socket it opens

          Good catch.

          The revised patch has been attached at the same location as before.

          Finally, going back to your first point about Cassandra not being transactional, is that something that's worth looking into? I've done some work in that area, such as the atomic feature described in ODE-396, among other things.

          Stu Hood added a comment -

          Thanks for the quick update Karthick!

          I'm still a bit worried about storing all mutations until each task finishes though, because the entire output for a task needs to fit in memory. Would you be comfortable updating this code to use separate threads per endpoint to perform the batch commits, as I suggested? Or should we wait until after this is committed to improve that?

          Karthick Sankarachary added a comment -

          Ah, I guess I conveniently glossed over the memory issue. To address that concern, I took a slightly different approach. Basically, I defined a configuration property called "mapreduce.output.columnfamilyoutputformat.batch.threshold" (see ColumnFamilyOutputFormat#BATCH_THRESHOLD), which will now force the ColumnFamilyRecordWriter to flush its cache of mutations if it contains more entries than the specified threshold. Ideally, I'd have liked to have an absolute threshold, but that's trickier to implement, so I didn't.
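          (The write path might now look roughly like this; apart from the property name quoted above, the names are illustrative assumptions.)

            // Sketch of the threshold-triggered flush in ColumnFamilyRecordWriter.
            // batchThreshold is read from
            // "mapreduce.output.columnfamilyoutputformat.batch.threshold".
            public void write(Text key, List<ColumnWritable> columns)
                    throws IOException, InterruptedException
            {
                queueMutationsFor(key, columns);  // buffer, as before (assumed helper)
                if (++queuedCount >= batchThreshold)
                {
                    flush();                      // send the queued batch_mutate calls now
                    queuedCount = 0;              // start a fresh batch
                }
            }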

          As far as having separate threads per endpoint, I'm not convinced that that will necessarily buy us anything (it might be a case of too little too late). Right now, we kind of make sure that each reduce task gets its own cache, so at least in the context of a given node, they won't step over each other. At any rate, if you'd like to improve on that part after this is committed, that's fine with me.

          Stu Hood added a comment - edited

          It doesn't look like the JobContext.IO_SORT_MB property you use in the test exists in Hadoop 0.20.1, which trunk is using. Is that property necessary?

          ColumnFamilyOutputCommitter is still in the latest patch: can you remove it?

          ColumnFamilyOutputFormatTest throws a ClassNotFoundException, since not all of Hadoop's dependencies are being pulled in. Can you add the dependencies to ivy.xml, or record the minimal set of dependencies which we can add manually to the lib/ folder?

          Also, one more nitpick: can you update the code to follow http://wiki.apache.org/cassandra/CodeStyle ?

          Thanks!

          Karthick Sankarachary added a comment -

          It doesn't look like the JobContext.IO_SORT_MB property you use in the test exists in Hadoop 0.20.1, which trunk is using. Is that property necessary?

          It's gone. At one point, I was running out of memory, and copped out by setting that limit lower, but I now believe that it's a non-issue.

          ColumnFamilyOutputCommitter is still in the latest patch: can you remove it?

          Done, sorry about that.

          ColumnFamilyOutputFormatTest throws a ClassNotFoundException, since not all of Hadoop's dependencies are being pulled in. Can you add the dependencies to ivy.xml, or record the minimal set of dependencies which we can add manually to the lib/ folder?

          As it turns out, the missing dependency was commons-logging, which I've added to ivy.xml. Just to clarify, this feature works with the existing Hadoop 0.20.1 library, and appears to be forward compatible with future versions as well.

          Also, one more nitpick: can you update the code to follow http://wiki.apache.org/cassandra/CodeStyle ?

          Done.

          On a related note, I had to modify my test case so that it works in standalone mode, and more importantly in the context of the ant script. Before, the test case assumed that a Cassandra server would be running locally. Now, it automatically kicks off an embedded Cassandra server (see org.apache.cassandra.EmbeddedServer), which by default uses a Thrift-based daemon, but may be overridden to use an Avro-based daemon, if so desired. Note that there's a similar service called EmbeddedCassandraService, but it appears to be locked into Thrift, and moreover, does not follow the exact same lifecycle that the daemons define in their main method.

          Regards,
          Karthick

          Stu Hood added a comment -

          Hey Karthick, sorry for the additional delay reviewing this. I'm attaching one patch I had to apply to get it to build against trunk, but this should be the last round of review.

          • ColumnWritable implements byte[] comparison, but should use o.a.c.utils.FBUtilities.compareByteArrays
          • ColumnWritable implements equality in terms of reference equality: is that intentional?
          • ColumnWritable assumes both name and value can be converted to Strings, which is not safe: use FBUtilities.(write|read)ByteArray instead
          • Why add EmbeddedServer rather than using EmbeddedCassandraService?

          Thanks a ton for your work!

          Karthick Sankarachary added a comment - edited
          • ColumnWritable implements byte[] comparison, but should use o.a.c.utils.FBUtilities.compareByteArrays

          That totally makes sense.

          • ColumnWritable implements equality in terms of reference equality: is that intentional?

          My intent was to comply with the Comparable interface's recommendation that (x.compareTo(y) == 0) == (x.equals(y)). To that end, I rewrote the equality in terms of the compareTo method.
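          (In outline, and assuming ColumnWritable keeps its column name in a byte array, that rewrite would look like the following sketch.)

            // Equality rewritten in terms of compareTo, keeping the Comparable
            // contract quoted above; hashCode follows along for consistency.
            @Override
            public boolean equals(Object other)
            {
                return other instanceof ColumnWritable
                    && compareTo((ColumnWritable) other) == 0;
            }

            @Override
            public int hashCode()
            {
                return Arrays.hashCode(name); // assumed field: the column's name bytes
            }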

          • ColumnWritable assumes both name and value can be converted to Strings, which is not safe: use FBUtilities.(write|read)ByteArray instead

          Good idea.

          • Why add EmbeddedServer rather than using EmbeddedCassandraService?

          There were two reasons why I chose to write EmbeddedServer: (a) the EmbeddedCassandraService appears to be locked into the Thrift CassandraDaemon, whereas EmbeddedServer works for both the Thrift and Avro daemons (and hopefully any future remoting framework); (b) the EmbeddedCassandraService does not follow the exact same lifecycle as is prescribed in the main method of the CassandraDaemon, which the EmbeddedServer does (to the letter). In fact, I believe that we should try and define an abstraction for the CassandraDaemon (as I've attempted to do in CASSANDRA-1131), because it'll make it easier for tools and test frameworks to launch the Cassandra service in a transport-agnostic way.

          The latest patch incorporates the changes described above, and includes your patch to boot.

          Stu Hood added a comment -

          +1 on getting this merged... maybe one of the committers will have an opinion on EmbeddedCassandraService vs EmbeddedServer.

          Thanks a lot for your work Karthick!

          Karthick Sankarachary added a comment -

          Thanks Stu for your review comments - it was a learning experience for me. Needless to say, +1 from me on committing this as well.

          Jonathan Ellis added a comment -

          Please update with formatting following http://wiki.apache.org/cassandra/CodeStyle

          Karthick Sankarachary added a comment -

          Updated the formatting as requested. In addition, I revised the EmbeddedServer test class, so that it leverages CASSANDRA-1131, which just got resolved. Essentially, you can use that class to start (stop) the Cassandra Daemon of your choice (Avro or Thrift), before (after) your test case runs.
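          (Per this description, a test would presumably bracket itself with the embedded daemon's lifecycle as in the sketch below; the method names on EmbeddedServer are assumptions based on this comment.)

            // Sketch of a JUnit test run against the embedded daemon.
            public class MyCassandraTest
            {
                @BeforeClass
                public static void startServer() throws Exception
                {
                    EmbeddedServer.startCassandra(); // assumed method: boots the chosen daemon
                }

                @AfterClass
                public static void stopServer() throws Exception
                {
                    EmbeddedServer.stopCassandra(); // assumed method: shuts it down afterwards
                }

                // test methods run against the embedded Thrift (or Avro) daemon
            }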

          Thanks for your patience. As always, please let me know if you need me to make any changes.

          Jonathan Ellis added a comment -

          Reformatted and committed with minor changes.


  People

            • Assignee: Karthick Sankarachary
            • Reporter: Karthick Sankarachary
            • Votes: 2
            • Watchers: 5