Cassandra
  CASSANDRA-1497

Add input support for Hadoop Streaming

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Fix Version/s: None
    • Component/s: Hadoop
    • Labels: None

      Description

      Related to CASSANDRA-1368: create similar functionality for input streaming.


          Activity

          Brandyn White added a comment -

          Made a new ticket here CASSANDRA-3134

          Jonathan Ellis added a comment -

          I'd recommend creating a new ticket for this since it's a completely different approach than the old one.

          I'm not familiar with TypedBytes, but now that Cassandra's AbstractBytes have to/from string support (getString/fromString), that would probably be the natural way to go for us.

          Brandyn White added a comment - edited

          Good point. It'll be easier to update the Cassandra Hadoop API to support the old-style Hadoop interface. After that we can add in the Cassandra IO and command line switches with a small patch.

          Jeremy Hanna added a comment -

          I don't think anyone was ever particularly against allowing hadoop streaming functionality. I think there just wasn't the interest for a while. On the input side, it will also require CASSANDRA-2799 which should be trivial.

          Brandyn White added a comment -

          I am certainly interested in streaming and have talked to others who are. I'm the author of the Hadoopy http://bwhite.github.com/hadoopy/ Python library and I'm interested in taking another stab at streaming support. Hadoopy and Dumbo both use the TypedBytes format that is in CDH for communication with the streaming jar. A simple way to get this to work is to modify the streaming code (make hadoop-cassandra-streaming.jar) so that it uses the same TypedBytes communication with streaming programs, but the actual job IO uses the Cassandra IO. The user would have the exact same streaming interface but would specify the keyspace, etc. using environment variables.

          The benefits of this are:
          1. Easy implementation: take the Cloudera-patched version of streaming, change the IO, and add environment variable reading.
          2. Client side only: as the streaming jar is included in the job, no server-side changes are required.
          3. Simple maintenance: if the Hadoop Cassandra interface changes, this would require the same simple fixup as any other Hadoop job.
          4. The TypedBytes format supports all of the necessary Cassandra types (https://issues.apache.org/jira/browse/HADOOP-5450).
          5. Compatible with existing streaming libraries: Hadoopy and Dumbo would only need to know the path of this new streaming jar.
          6. No need for Avro.

          The negatives of this are:
          1. Duplicative code: this would be a dupe and patch of the streaming jar. It can itself be stored as a patch.
          2. I'd have to check, but this solution should work on stock Hadoop (cluster side); it requires TypedBytes (client side), which can be included in the jar.

          I can code this up but I wanted to get some feedback from the community first.
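          As a rough illustration of the streaming contract from the script's point of view, here is a minimal mapper sketch. It uses Hadoop Streaming's plain-text protocol (tab-separated key/value pairs on stdin/stdout) for readability rather than the TypedBytes protocol proposed above, and the CASSANDRA_* environment variable names are invented for this example only.

            #!/usr/bin/env python
            # Minimal streaming mapper sketch (illustrative only, not from a patch).
            # The plain-text protocol is used here for readability; the proposal above
            # would speak TypedBytes with the streaming jar instead.
            import os
            import sys

            # Under the proposal, job configuration such as the keyspace and column
            # family would come from environment variables (names are hypothetical).
            KEYSPACE = os.environ.get('CASSANDRA_KEYSPACE', 'Keyspace1')
            COLUMN_FAMILY = os.environ.get('CASSANDRA_COLUMNFAMILY', 'Standard1')

            def mapper(stdin=sys.stdin, stdout=sys.stdout):
                for line in stdin:
                    # key/value pair handed to us by the streaming jar
                    key, _, value = line.rstrip('\n').partition('\t')
                    # emit one (word, 1) pair per word, WordCount-style
                    for word in value.split():
                        stdout.write('%s\t%d\n' % (word, 1))

            if __name__ == '__main__':
                sys.stderr.write('reading %s/%s\n' % (KEYSPACE, COLUMN_FAMILY))
                mapper()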

          Jeremy Hanna added a comment -

          It turns out that currently there's not much interest in streaming. The way to go is something like Pig or Hive if Java is not an option.

          Jeremy Hanna added a comment - edited

          The patch that I had submitted moves everything about the RecordReader into an abstract class except the actual marshalling/unmarshalling of the data. So it could be used to build a typed JSON impl. It would also make it so people could write their own serialization mechanisms - say for Dumbo, a Python way to do Hadoop MR.
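          As a rough sketch of that split (written in Python for brevity; the actual patch is Java, and all names below are invented for illustration), the iteration logic lives in the abstract base class and subclasses only supply the unmarshalling step:

            from abc import ABC, abstractmethod
            import json

            class AbstractRecordReader(ABC):
                """Owns iteration/paging; subclasses only decide how a row is decoded."""

                def __init__(self, rows):
                    self._rows = iter(rows)  # stand-in for the real split/range iteration

                def next_record(self):
                    raw_key, raw_columns = next(self._rows)
                    return self.unmarshal(raw_key, raw_columns)

                @abstractmethod
                def unmarshal(self, raw_key, raw_columns):
                    """Turn raw bytes into whatever the job consumes (Avro, typed JSON, ...)."""

            class TypedJsonRecordReader(AbstractRecordReader):
                def unmarshal(self, raw_key, raw_columns):
                    return raw_key, json.dumps([{"name": n.decode(), "value": v.decode()}
                                                for n, v in raw_columns])

            reader = TypedJsonRecordReader([(b"row1", [(b"age", b"37")])])
            print(reader.next_record())  # (b'row1', '[{"name": "age", "value": "37"}]')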

          Jonathan Ellis added a comment -

          The main reason for going with a binary format initially instead of plaintext was that, at the time, we had limited information about column types.

          Now that we have column_metadata we've been moving towards more human-readable formats (e.g. https://issues.apache.org/jira/browse/CASSANDRA-1933). Key types are newer (https://issues.apache.org/jira/browse/CASSANDRA-2311) but the same principle applies.

          Sure feels like (typed) JSON would be a better fit today than Avro or Thrift.
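          The ticket does not pin down a concrete typed-JSON encoding; purely as an illustration, a row could carry its comparator/validator types alongside string-rendered values, along these lines (all field names are invented for this example):

            import json

            # Hypothetical typed-JSON rendering of one row; not a format defined here.
            row = {
                "key": {"type": "UTF8Type", "value": "user-42"},
                "columns": [
                    {"name": "age", "name_type": "UTF8Type",
                     "value": "37", "value_type": "LongType"},
                    {"name": "email", "name_type": "UTF8Type",
                     "value": "x@example.com", "value_type": "UTF8Type"},
                ],
            }
            print(json.dumps(row))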

          Stu Hood added a comment - edited

          contrib/hadoop_streaming_input/bin/mapper.py

          • Mentions the original source multiple times, and claims to be both a mapper and reducer
          • I suspect that extract_text can be turned into a one-liner somehow

          contrib/hadoop_streaming_input/bin/reducer.py

          • Needs an Apache header

          contrib/hadoop_streaming_input/[input/]README.txt

          • Mentions "-input": bin/streaming should fake the input, and explain why
          • There is an extra copy of README.txt in an unused 'input' subdirectory

          .../hadoop/ColumnFamilyRecordReader.java

          • Indentation

          .../hadoop/streaming/AvroResolver.java

          • Updated javadoc

          I looked a little bit into the immediate runtime failure, but didn't come to any conclusions. One suspicious aspect is that Streaming appears to use the result of Resolver.getInputWriterClass to write to both the mapper and reducer scripts: see http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=markup#l783

          Jeremy Hanna added a comment -

          Stu - btw, the change to contrib/hadoop_streaming_output/bin/streaming in the patch is wrong; that was for when I had an AvroColumnFamilyRecordReader. Also, you just need to make sure Hadoop's bin (0.20.2+320) is in your PATH.

          Stu Hood added a comment -

          I'll take a look tonight.

          Jeremy Hanna added a comment -

          Updated to use Avro. Since there is no non-string-keyed map in Avro, I just used a key/value pair list for the input values. A binary search (since the column names are ordered) is then only O(log n) to get each column out of the results. If they use a slice predicate too, it shouldn't be bad at all.

          The current patch is mostly done - there are some errors with the streaming script. The docs also need updating, and the Python side needs to use bisect or something similar to do the binary search over the list of columns; currently it just iterates and searches.
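          A minimal sketch of that client-side lookup (illustrative only, not taken from the patch): since the columns arrive as a list of (name, value) pairs already sorted by name, the standard bisect module finds a column in O(log n); alternatively, dict(columns) turns the pair list into a map in one pass.

            from bisect import bisect_left

            def get_column(names, columns, wanted):
                """names: sorted column names; columns: the matching (name, value) pairs."""
                i = bisect_left(names, wanted)
                if i < len(names) and names[i] == wanted:
                    return columns[i][1]
                raise KeyError(wanted)

            columns = [("age", "37"), ("email", "x@example.com"), ("name", "Ada")]
            names = [n for n, _ in columns]  # extracted once per row
            print(get_column(names, columns, "email"))  # -> x@example.com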

          Jeremy Hanna added a comment -

          Based on a conversation with Jonathan, it sounds like he doesn't want several different implementations, nor a mixture of Avro for output and Thrift for input.

          For the long term I've logged this ticket with Avro: https://issues.apache.org/jira/browse/AVRO-680

          For the short term, we could probably get away with Avro - having a list of columns (k/v pairs) that the client could scan through or convert to a map on their own.

          Jeremy Hanna added a comment - edited

          In addition to the previous comment, I've also left the original ColumnFamilyRecordReader and ColumnFamilyInputFormat undeprecated. So there will be three extensions of the abstract classes - CFRR/CFIF, TCFRR/TCFIF, and ACFRR/ACFIF. The only code in the extension classes has to do with data type marshalling, so it's not duplicated code.

          Jeremy Hanna added a comment -

          Just as a status update: I ran into some issues with using Avro as an input format for now - Avro maps currently require strings as their keys. There is talk about facilitating other types of keys in the future - http://search-hadoop.com/m/b2Wqk1HvNbT1/+%2522type+array%2522&subj=Re+Using+bytes+as+keys+in+a+map+equivalent - and I should probably create a ticket on that.

          However, for now it looks like I'll need to go with Thrift for the input streaming solution. So far I've created an AbstractColumnFamilyRecordReader and AbstractColumnFamilyInputFormat that pull out a lot of the code, and then created Thrift- and Avro-specific extensions of each. I've created a Thrift input writer, required for input streaming. I've also created an initial mapper.py that uses the Thrift deserialization.

          Anyway, that's where things stand. I'm getting the kinks out and will hopefully have patches soon. I'll probably create a separate ticket for a Thrift-based output format for the sake of completeness. The Avro-specific CFRR and CFIF will just be there and functional for when the Avro support becomes available - I already had it done and it works in Java.
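          For illustration, the string-key limitation can be worked around with an array of {name, value} records instead of an Avro map. The schema below is an assumption sketched for this comment, not the schema shipped in any patch here:

            import json

            # Hypothetical Avro schema using an array of records in place of a map,
            # since Avro maps only allow string keys.
            column = {
                "type": "record", "name": "StreamingColumn",
                "fields": [{"name": "name", "type": "bytes"},
                           {"name": "value", "type": "bytes"}],
            }
            row = {
                "type": "record", "name": "StreamingRow",
                "fields": [{"name": "key", "type": "bytes"},
                           {"name": "columns",
                            "type": {"type": "array", "items": column}}],
            }
            print(json.dumps(row, indent=2))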

          Jeremy Hanna added a comment - edited

          No - I'm still getting the o.a.c.hadoop.streaming.AvroInputWriter done. That may change this patch slightly as it turns out, but the foundation patch won't change too much. I'm hoping to get the complete patch finished today.

          Jonathan Ellis added a comment -

          Is this ready for review?

          Jeremy Hanna added a comment -

          For the foundation, I added an AbstractColumnFamilyRecordReader with most of the functionality and deprecated the existing CFRR and CFIF by renaming them to LegacyCFRR and LegacyCFIF. The new way paves the way for enabling streaming.

          I had thought to move the current CFRR/CFIF to LegacyX so that people using the forward-looking stuff wouldn't have to change it once the deprecated stuff goes away. Those upgrading to 0.7 would have to modify the class they're using, though.

          Stu Hood added a comment -
          • CFIF should be able to delegate almost everything to a private LegacyCFIF: no point in duplicating things
          • Other parts of the code use com.google.common.base.Charsets.UTF_8 as our reference to the UTF8 Charset
          • Pig contrib should use LegacyCFIF until we can get around to changing it

          Thanks!

          Jeremy Hanna added a comment -

          Attaching 0001:

          Deprecates the old way of doing ColumnFamilyRecordReader and ColumnFamilyInputFormat - renames them to LegacyX. Reused code where I could (in the InputFormat).

          Adds the new ColumnFamilyRecordReader and ColumnFamilyInputFormat that use ByteBuffers and Avro ColumnOrSuperColumns instead of byte[] and db.IColumn. This makes for easier Hadoop Streaming support and mirrors what was done in the RecordWriter/OutputFormat that we recently added.

          Updated the WordCount/WordCountSetup to use the newer code.


            People

            • Assignee: Unassigned
            • Reporter: Jeremy Hanna
            • Votes: 0
            • Watchers: 4
