Hadoop Map/Reduce
MAPREDUCE-815

Add AvroInputFormat and AvroOutputFormat so that Hadoop can use Avro serialization

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      MapReduce needs an AvroInputFormat, similar to other InputFormats such as TextInputFormat, so that Avro serialization can be used in Hadoop. Similarly, an AvroOutputFormat is needed.

      Attachments

      1. MAPREDUCE-815.5.patch
        88 kB
        Aaron Kimball
      2. MAPREDUCE-815.4.patch
        78 kB
        Aaron Kimball
      3. MAPREDUCE-815.3.patch
        75 kB
        Aaron Kimball
      4. MAPREDUCE-815.2.patch
        74 kB
        Aaron Kimball
      5. MAPREDUCE-815.patch
        74 kB
        Aaron Kimball

          Activity

          Ravi Gummadi added a comment -

          This could have something like

          public class AvroInputFormat
              extends FileInputFormat<AvroReflectSerializable, AvroReflectSerializable> {

            @Override
            public RecordReader<AvroReflectSerializable, AvroReflectSerializable>
                createRecordReader(InputSplit split, TaskAttemptContext context) {
              return new AvroRecordReader();
            }

            // ...
          }

          and

          public class AvroRecordReader
              extends RecordReader<AvroReflectSerializable, AvroReflectSerializable> {
            // implements the RecordReader methods for Avro-typed keys and values
          }

          Does this look fine?

          Doug Cutting added a comment -

          This issue depends on the shuffle using the metadata-based API (MAPREDUCE-1126).

          Jacob Rideout added a comment -

          What is the current line of thought on how keys and values will interact with the schema for an Avro file? Is the intention that there would be a master schema that encapsulates the key/values, similar to:

          { "type" : "record",
            "fields" : [
            { "name" : "KEY", "type" : "record" },
            { "name" : "VALUE", "type" : "record" }
          ]}
          

          What about files created without this "master" schema: would the key return a null object, or the byte offset with a schema of type "long"?

          Doug Cutting added a comment -

          > What is the current line of thought on how keys and values will interact with the schema for an avro file?

          I think, similar to TextInputFormat and TextOutputFormat, only one of key and value will be useful and the other will be null. It doesn't matter much which. If input keys hold the Avro datum and values are null, then the default identity mapper can be used to sort data; if input values hold the Avro datum and keys are null, then InverseMapper must be specified.
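          For illustration, a minimal job-setup sketch of the value-carries-the-datum convention described above. AvroInputFormat and AvroOutputFormat are the hypothetical classes discussed in this issue, not existing Hadoop classes; the rest are stock Hadoop classes.

          import org.apache.hadoop.conf.Configuration;
          import org.apache.hadoop.mapreduce.Job;
          import org.apache.hadoop.mapreduce.Reducer;
          import org.apache.hadoop.mapreduce.lib.map.InverseMapper;

          public class SortAvroData {
            public static void main(String[] args) throws Exception {
              Job job = new Job(new Configuration(), "sort avro data");
              job.setInputFormatClass(AvroInputFormat.class);   // hypothetical: datum in the value, key null/ignored
              job.setMapperClass(InverseMapper.class);          // swap (key, value) so the datum becomes the key
              job.setReducerClass(Reducer.class);               // identity reduce; output emerges sorted by datum
              job.setOutputFormatClass(AvroOutputFormat.class); // hypothetical output format from this issue
              // If the datum arrived in the key instead, the default identity Mapper would suffice
              // and InverseMapper could be dropped.
              job.waitForCompletion(true);
            }
          }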

          Jacob Rideout added a comment -

          > only one of key and value will be useful and the other will be null.
          Thanks Doug, that makes sense.

          What about the AvroOutputFormat? Does the same condition apply? I can see ignoring keys for the output of the reducer, but what about the output of a map?

          Doug Cutting added a comment -

          > What about the AvroOutputFormat?

          I suggest we treat it similarly: require either keys or values to be null.

          An output format could combine output keys and values into a compound record, and one could define an input format that splits each input datum into a separate key and value, but I don't think the basic AvroOutputFormat should do this. If we add a MapFile-like abstraction for Avro, then its input and output formats should probably do this.

          Aaron Kimball added a comment -

          Now that MAPREDUCE-1126 is in, I'm going to attack this and complete the loop.

          Given that TextInputFormat yields a semi-arbitrary key and encapsulates the file contents in the value, I plan to follow suit here – the value produced by the AvroRecordReader will contain the next object in the file.

          As for output: I think that it's best to leave the output format accepting a single value only (rather than explicitly making a hybrid of key and value pair). Users can implement their own UnionAvroOutputFormat (or whatever) if they need both, but I think the basic version should only do the most straightforward thing. I plan to make this write the user's key to the file, and drop the value. That way InverseMapper -> IdentityReducer should emit it all in sorted order.

          Doug Cutting added a comment -

          Aaron, this sounds good. A few questions:

          • If, in the InputFormat we populated the key rather than the value, then one would not even need to specify InverseMapper: by default, MapReduce would simply partition and sort Avro data. Making values optional in both input and output seems more consistent, but does break compatibility with TextInputFormat. Thoughts?
          • In the OutputFormat, should we check if values are non-null or just drop them? Just dropping them may cause some confusion, but is probably useful in many cases, so I guess we err towards utility?
          Tom White added a comment -

          > If, in the InputFormat we populated the key rather than the value, then one would not even need to specify InverseMapper: by default, MapReduce would simply partition and sort Avro data.

          I like this. It is the approach I was taking on MAPREDUCE-252.

          Doug Cutting added a comment -

          FWIW, the file-position-as-text-map-input-key convention came from the original Google MapReduce paper, but I don't think it's ever proven useful.

          Aaron Kimball added a comment -

          Doug:

          • I agree; I'll make this be the key. The value will be the byte offset.
          • My current implementation logs a message at level WARN the first time a non-null value is received; it then ignores the value and continues operating (see the sketch below).
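          A sketch of the warn-once behavior described above. This is an assumption, not the attached patch's code; the field names and the surrounding AvroRecordWriter<T> type are illustrative.

          import java.io.IOException;
          import org.apache.commons.logging.Log;
          import org.apache.commons.logging.LogFactory;

          private static final Log LOG = LogFactory.getLog(AvroRecordWriter.class);
          private boolean warnedOnValue = false;   // ensures the warning is logged only once

          public void write(T key, Object value) throws IOException {   // T is the writer's key type parameter
            if (value != null && !warnedOnValue) {
              LOG.warn("AvroOutputFormat writes keys only; non-null values are ignored.");
              warnedOnValue = true;
            }
            dataFileWriter.append(key);   // dataFileWriter is an Avro DataFileWriter<T> held by the writer
          }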
          Aaron Kimball added a comment -

          Attaching a patch that provides AvroInputFormat/AvroOutputFormat.

          AvroInputFormat allows you to set its input schema in the job configuration. It provides static methods for this functionality. Depending on the input serialization metadata it can choose to deserialize to generic, reflect, or specific-based classes.

          This patch includes unit tests for both of these classes.

          I have also extended the jobdata API to allow you to set output serialization metadata (vs. simple class-name-only metadata) in the same fashion as MAPREDUCE-1126 allowed you to set intermediate serialization metadata. This deprecates the old methods like JobConf.setOutputKeyClass(). Note that now the PipesMapRunner/PipesReducer, MapFileOutputFormat, and SequenceFileOutputFormat rely on these deprecated APIs. MAPREDUCE-1360 will require a Hadoop-core-project JIRA that allows SequenceFile to handle non-class-based serialization; that will update at least the SequenceFile IF/OF APIs. Handling Pipes is a separate issue.

          This cannot be submitted to the patch queue until a small change is made to the Hadoop-core API (issue is linked), and Hadoop is upgraded across the board to Avro 1.3. I'll mark this patch-available when that happens.
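          To make the configuration step concrete, a hypothetical usage sketch: the static helper name setInputSchema() and the schema file name below are placeholders, not necessarily the patch's actual API.

          import java.io.File;
          import org.apache.avro.Schema;
          import org.apache.hadoop.conf.Configuration;
          import org.apache.hadoop.mapreduce.Job;

          public class AvroJobSetup {
            public static void main(String[] args) throws Exception {
              Schema schema = Schema.parse(new File("record.avsc"));   // Avro 1.3-era parsing API; file name is an assumption
              Job job = new Job(new Configuration(), "avro input job");
              job.setInputFormatClass(AvroInputFormat.class);          // class added by this patch
              AvroInputFormat.setInputSchema(job, schema);             // hypothetical static helper storing the schema in the job configuration
              job.waitForCompletion(true);
            }
          }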

          Doug Cutting added a comment -

          This looks great! A few nits:

          • In javadoc comments, use "@deprecated use #foo()" to link to the new implementation (see the sketch after this list).
          • AvroSeekableStream is likely to be reused by other applications that use Avro with HDFS. It might be named AvroFSInput, and it might better belong in common than in mapreduce.
          • Why use LongWritable? Could we instead use java.lang.Long? Or perhaps just null for these values? Does anyone ever make use of the position? If not, let's use null. If we can avoid a dependency on Writable here, that would be good. Or does this provide some important compatibility?
          • I don't think SYNC_DISTANCE is needed: DataFileWriter syncs automatically every 100k or so.
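          For illustration, how such a javadoc tag might look. The replacement method name and the container class are placeholders, not the patch's actual API.

          public class JobDataExample {   // illustrative container class, not part of the patch

            /**
             * @deprecated Use {@link #setOutputKeySchema(String)} instead, which carries
             *             serialization metadata rather than a bare class name.
             */
            @Deprecated
            public void setOutputKeyClass(Class<?> theClass) {
              setOutputKeySchema(theClass.getName());
            }

            /** Placeholder replacement method that would store schema metadata for the output key. */
            public void setOutputKeySchema(String schemaJson) {
              // store schemaJson in the job configuration (illustrative only)
            }
          }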
          Aaron Kimball added a comment -

          The only reason I could think of to use the position would be building some sort of index over an Avro file. I think this probably doesn't make much sense here. That having been said, we can't use null or we'll break the identity mapper. (The MapOutputBuffer expects non-null keys and values only; a context.write(k, null) from the mapper will throw NullPointerException.)

          This is why Writables included NullWritable, I think. We could add a type, e.g. "Empty", which implements AvroReflectSerializable and whose toString() method returns the empty string; this would work fairly transparently, I think, and be entirely Avro-based.
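          A sketch of the "Empty" placeholder type described above (an assumption for illustration, not code from any attached patch):

          import org.apache.hadoop.io.serializer.avro.AvroReflectSerializable;

          /** A contentless, Avro-reflect-serializable stand-in for null keys or values. */
          public class Empty implements AvroReflectSerializable {

            public Empty() { }   // public no-arg constructor so reflect-based deserialization can instantiate it

            @Override
            public String toString() {
              return "";         // renders as nothing in text output
            }
          }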

          Doug Cutting added a comment -

          > That having been said, we can't use null or we'll break the identity mapper.

          It seems to me that we should be able to pass null end-to-end as a value. If we can't, then we perhaps haven't yet removed all of the Writable assumptions, no?

          Aaron Kimball added a comment -

          New patch per code-review.

          • HADOOP-6492 has been updated with org.apache.hadoop.fs.AvroFSInput; AvroRecordReader now makes use of this.
          • AvroInputFormat/AvroRecordReader now always returns a null value (while Writables can't handle nulls, I've confirmed that Avro can accept a null schema for a value and this works in-shuffle)
          • AvroRecordReader now traps java.util.NoSuchElementException in nextKeyValue() to gracefully handle EOF (see the sketch after this list)
          • Tests now include a test which does not set an explicit schema, but instead uses the Generic interface to read it from the file
          • javadoc comments updated
          • SYNC_DISTANCE removed.
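          A minimal sketch of the EOF handling mentioned above. This is an assumption, not the patch's code; fileReader and currentKey are illustrative fields of the record reader.

          import java.io.IOException;
          import java.util.NoSuchElementException;

          @Override
          public boolean nextKeyValue() throws IOException, InterruptedException {
            try {
              currentKey = fileReader.next();   // fileReader is an Avro DataFileReader over this split
              return true;
            } catch (NoSuchElementException eof) {
              return false;                     // no more records: end of input reached gracefully
            }
          }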
          Doug Cutting added a comment -
          • Those javadoc @deprecated lines don't look right. Did you run javadoc and look at the output?
          • AvroInputFormat's value field isn't used and could be removed.
          • Should we add an end-to-end test somewhere that runs an MR job with Avro input, Avro comparison, and Avro output? New issue? The local runner would probably suffice.
          Aaron Kimball added a comment -

          New patch; fixed javadoc. Also includes an Avro end-to-end test in TestAvroInputFormat.

          Aaron Kimball added a comment -

          The end-to-end test was accidentally excluded from patch #4; the new patch includes it.

          Doug Cutting added a comment -

          In the end-to-end-test validation, you don't need a seekable stream; you can read the entire file with something like:

          reader = new DataFileStream<Integer>(istream, datumReader);
          for (int value : reader) { ... }
          

          This will save a number of lines and provide a better example of non-split Avro file usage.
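          For reference, a self-contained sketch of that pattern. The file name is an assumption, and the file is taken to contain plain "int" data.

          import java.io.FileInputStream;
          import java.io.IOException;
          import org.apache.avro.file.DataFileStream;
          import org.apache.avro.generic.GenericDatumReader;
          import org.apache.avro.io.DatumReader;

          public class ReadAvroInts {
            public static void main(String[] args) throws IOException {
              DatumReader<Integer> datumReader = new GenericDatumReader<Integer>();   // reads the writer's schema from the file header
              DataFileStream<Integer> reader =
                  new DataFileStream<Integer>(new FileInputStream("ints.avro"), datumReader);
              try {
                for (int value : reader) {    // DataFileStream is Iterable over its data
                  System.out.println(value);
                }
              } finally {
                reader.close();
              }
            }
          }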

          Aaron Kimball added a comment -

          Now uses DataFileStream.

          Chris Douglas added a comment -

          It looks like some changes to JobTracker were accidentally included in the latest patch.

          Doug Cutting added a comment -

          I'd like to close this as redundant with AVRO-493. This will be included in the upcoming 1.4.0 release of Avro, which will support MapReduce over Avro data files using Hadoop 0.20 or greater. Objections?

          Doug Cutting added a comment -

          Marking this a duplicate of AVRO-493.

          Doug Cutting added a comment -

          Closing this as a duplicate. This can be re-opened if someone objects.


            People

            • Assignee: Aaron Kimball
            • Reporter: Ravi Gummadi
            • Votes: 2
            • Watchers: 28
