AVRO-493: Hadoop MapReduce support for Avro data

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.4.0
    • Component/s: java
    • Labels:
      None
    • Hadoop Flags:
      Reviewed

      Description

      Avro should provide support for using Hadoop MapReduce over Avro data files.

      Attachments

      1. AVRO-493.patch
        46 kB
        Doug Cutting
      2. AVRO-493.patch
        45 kB
        Doug Cutting

        Issue Links

          Activity

          Doug Cutting added a comment -

          Here's a patch that implements this.

          Scott Carey added a comment -

          AvroWrapper.java:

            public boolean equals(Object o) {
              return (datum == null) ? o == null : datum.equals(o);
            }

          The above looks odd. Is equals() expected to compare the AvroWrapper to another AvroWrapper or only to the datum? If the former, then o needs to be cast to AvroWrapper and o.datum used. If not, then equals() does not adhere to the equals() contract and should be well documented. (a.equals(b) is not always the same result as b.equals(a))

          Eclipse's auto-generate equals() tool generates this code for equals():

            public boolean equals(Object obj) {
              if (this == obj)
                return true;
              if (obj == null)
                return false;
              if (getClass() != obj.getClass())
                return false;
              AvroWrapper other = (AvroWrapper) obj;
              if (datum == null) {
                if (other.datum != null)
                  return false;
              } else if (!datum.equals(other.datum))
                return false;
              return true;
            }
          

          AvroOutputFormat:
          It would be nice if the compression level were configurable. If not, I lean towards level 1 since it is closest to LZO in speed and still has good compression. But I suppose intermediate outputs versus final outputs have different requirements here.

          AvroMapper:
          This creates a new AvroWrapper for each output.collect(). Is this necessary? AvroReducer does not do this, but it doesn't have the combiner behind it. If it must be a new instance, a comment might be appropriate to avoid an improper optimization attempt later.

          AvroKeySerialization:
          I am a bit confused about this class. It is only referenced in the configuration of AvroJob.
          When is this used in the big picture? It appears that the Serialization interface contract requires no buffering (a performance issue if heavily used, for all serializations). The decoder here doesn't have access to the avro file and can't use the file format's reader or writer – so what is it for? The javadoc on the Hadoop Serialization API doesn't have any information on the purpose of the interfaces. Is it for serializing data to/from SequenceFiles and other such use cases?

          General:
          Deprecated APIs are used – are the replacements not appropriate or insufficient? I'm no expert on the details of the new API.

          Doug Cutting added a comment -

          Scott, thanks for the careful review!

          > The above looks odd.

          Yes, you're right, it was a buggy equals implementation. I replaced it with the one you provided. hashCode() is required to support hash-based MapReduce partitioning (the default) and I only provide an equals implementation to be consistent: it's not otherwise required here. Good catch.
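
          For reference, a hashCode() consistent with the corrected equals() could be as simple as the following sketch (not necessarily the exact code in the committed patch):

            public int hashCode() {
              // Delegate to the wrapped datum so that equal wrappers hash equally,
              // which hash-based partitioning relies on.
              return (datum == null) ? 0 : datum.hashCode();
            }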

          > It would be nice if the compression level was configurable.

          Yes, I meant to get to that but forgot. I've now added it. Thanks.
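
          As a rough sketch of what that could look like in a job driver (the property name "avro.mapred.deflate.level" is only an assumption for illustration, not necessarily the key the patch introduces):

            JobConf job = new JobConf();
            // Level 1 trades compression ratio for speed, as suggested above.
            job.setInt("avro.mapred.deflate.level", 1);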

          > This creates a new AvroWrapper for each output.collect().

          Oops. I originally wrote it that way, but reverted it while debugging to remove a possibility but forgot to restore it. I've now restored it.

          > AvroKeySerialization: I am a bit confused about this class.

          It's used to serialize map outputs and deserialize reduce inputs. The mapreduce framework uses the job's specified map output key class to find the serialization implementation it uses to read and write intermediate keys and values.
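
          A rough sketch of that wiring in a driver, based on the description above (AvroJob presumably sets this up automatically; the explicit calls here are only illustrative):

            JobConf job = new JobConf();
            // The framework looks up a Serialization for the map output key class
            // via the "io.serializations" property.
            job.setMapOutputKeyClass(AvroWrapper.class);
            // SerializationFactory picks the first registered Serialization whose
            // accept() returns true for AvroWrapper, i.e. AvroKeySerialization.
            job.setStrings("io.serializations",
                AvroKeySerialization.class.getName(),
                org.apache.hadoop.io.serializer.WritableSerialization.class.getName());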

          > Deprecated APIs are used - are the replacements not appropriate or insufficient?

          Good question. Hadoop 0.20 deprecated the "old" org.apache.hadoop.mapred APIs to encourage folks to try the new org.apache.hadoop.mapreduce APIs. However the org.apache.hadoop.mapreduce APIs are not fully functional in 0.20, and folks primarily continue to use the org.apache.hadoop.mapred APIs. 0.20 is used here since it's in Maven repos, but this code should also work against 0.19 and perhaps even 0.18, and I'd compile against one of those instead if it were in a Maven repo.

          Scott Carey added a comment -

          JIRA's "You can reply to this email to add a comment to the issue online." doesn't appear to work via the Apache mailing lists, so I have put the email exchange in the quote below:

          Scott Carey wrote:
          > That's too bad that the intermediate files can't use the Avro file format; the performance will suffer until that API changes to either allow custom file formats or to support a feature like the decoder's inputStream() method to allow buffering of chained or interleaved readers.

          The intermediate files are part of the mapreduce kernel. The buffering, sorting, transmission and merging of this data is a critical part of mapreduce. So I don't think it is as simple as just permitting a pluggable file format.

          > FYI, Avro does not work with Hadoop 0.20 for CDH2 or CDH3 (I have not tried plain 0.20) because they include jackson 1.0.1 and you'll get an exception like this:

          Can't one update the version of Jackson in one's Hadoop cluster to fix this? However that might not work with Amazon's Elastic MapReduce, where you don't get to update the cluster (which runs Hadoop 0.18).

          Should we avoid using org.codehaus.jackson.JsonFactory.enable() to make Avro compatible with older versions of Jackson?

          Doug

          Jackson is in Hadoop due to HADOOP-6184 ("Provide a configuration dump in JSON format").
          In my case, I just removed the jar completely from Hadoop because I don't use that feature. We could make sure our use of the Jackson API is 1.0.1 compatible, but at some point we probably will require the newer version. There might be bugs in that version that affect Avro, and it will be troublesome if 1.0.1 is silently used and causes bugs or other issues.

          In the short term we could run our unit tests with 1.0.1 and stop using enable() and anything else that we are using that is not 1.0.1 compatible.
          We could even declare a range of supported versions in Maven (for example, [1.0.1,2.0), meaning 1.0.1 inclusive up to 2.0 exclusive).

          In the long run Hadoop needs to keep its libraries more up to date, given that they share a classloader with user code, and/or implement some classloader partitioning to prevent conflicts between Hadoop system classes and user code, especially over small features like HADOOP-6184.

          Doug Cutting added a comment -

          > In the short term we could run our unit tests with 1.0.1 [ ...]

          That sounds reasonable, but should be a separate issue, no?

          Did my answers & patch modifications otherwise satisfy you?

          Scott Carey added a comment -

          > Did my answers & patch modifications otherwise satisfy you?

          Yes, the patch looks good.

          The other items are potential issues that can't be addressed in the patch content itself.

          Doug Cutting added a comment -

          I just committed this.

          Iván de Prado added a comment -

          Writing a custom DeserializerComparator is needed if you want this patch to be useful in many projects. Otherwise you would need a different Avro schema with a different sorting for each kind of grouping you want to do in the reducer. I'm failing to create a custom DeserializerComparator:

            public static class CustomComparator
                extends DeserializerComparator<AvroWrapper<GenericRecord>> {

              public CustomComparator() throws IOException {
                super(new AvroKeySerialization().getDeserializer(AvroWrapper.class));
              }

              @Override
              public int compare(AvroWrapper<GenericRecord> o1,
                                 AvroWrapper<GenericRecord> o2) {
                return o1.datum().get("word").toString().charAt(1)
                    - o2.datum().get("word").toString().charAt(1);
              }
            }

          It raises the following exception:

          Caused by: java.lang.NullPointerException
          	at org.apache.avro.mapred.AvroJob.getMapOutputSchema(AvroJob.java:98)
          	at org.apache.avro.mapred.AvroKeySerialization.getDeserializer(AvroKeySerialization.java:55)
                  ....
          

          The problem is in this line:

              Schema schema = AvroJob.getMapOutputSchema(getConf());
          

          It looks for the datum schema in the job configuration, but unsurprisingly it is not there.

          Any ideas or workarounds for creating custom comparators for Avro?

          Harsh J added a comment -

          Have you tried setting a Map-Output Schema in your job driver manually?

          AvroJob.setMapOutputSchema(...);
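
          For example, in the job driver (the record schema shown here is purely illustrative):

            JobConf job = new JobConf();
            Schema keySchema = Schema.parse(
                "{\"type\":\"record\",\"name\":\"WordRecord\",\"fields\":"
                + "[{\"name\":\"word\",\"type\":\"string\"}]}");
            AvroJob.setMapOutputSchema(job, keySchema);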
          
          Iván de Prado added a comment -

          Yes, I've tried, but it doesn't work. I was able to dodge this issue by creating my own AvroWrapperDeserializer class that receives the schema in the constructor, but it doesn't seem too elegant.

          In any case, I found a bigger issue: it seems impossible to use a custom group comparator with the current AvroReducer implementation, because the reducer receives the datum as the key. From AvroReducer:

            public void reduce(AvroWrapper<IN> wrapper, Iterator<NullWritable> ignore,
                               OutputCollector<AvroWrapper<OUT>,NullWritable> output,
                               Reporter reporter) throws IOException {
              if (this.out == null) {
                this.out = output;
                this.reporter = reporter;
              }
              reduce(wrapper.datum());
            }

          If you use your own group comparator, the child reducer will only receive the first datum of each group. Any ideas about how to solve that?

          Doug Cutting added a comment -

          > Yes, I've tried, but it doesn't work.

          Is that because of AVRO-534, or something else?

          > Otherwise you would need a different Avro schema with a different sorting for each kind of grouping you want to do in the reducer.

          Is that so bad?

          > Any ideas about how to solve that?

          Would AVRO-581 help?

          Iván de Prado added a comment -

          > Is that because of AVRO-534, or something else?

          No. The problem is that the Configuration is not available to DeserializerComparator, so it is not possible to instantiate a SerializationFactory with "new SerializationFactory(jobConf)".

          >> Otherwise you would need a different Avro schema with a different sorting for each kind of grouping you want to do in the reducer.
          > Is that so bad?

          Consider the following use case: you manage a data file of people Profiles, and each Profile has many fields. Common operations could be grouping by address, grouping by name, and other combinations. Maintaining a separate Avro schema file for each needed sort order is not reasonable (it would also mean several specific Java classes).

          > Would AVRO-581 help?

          Well, I couldn't say; it would depend on the implementation details. It would be useful if Avro's Hadoop implementation supported <key, value> pairs, including ways to provide your own partitioner, group comparator and key comparator.


            People

            • Assignee:
              Doug Cutting
            • Reporter:
              Doug Cutting
            • Votes:
              0
            • Watchers:
              9
