AVRO-669

Avro Mapreduce Doesn't Work With Reflect Schemas

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.5.0
    • Component/s: java
    • Labels: None

      Description

      I'm trying to get the Avro trunk code (from Subversion) to work with a simple example of a reflection-defined schema, using a class I created. I use a ReflectDatumWriter to write a set of records to a file, e.g.,

      DatumWriter<Record> writer = new ReflectDatumWriter<Record>(Record.class);
      DataFileWriter<Record> file = new DataFileWriter<Record>(writer);

      However, when I try to read that data in using an AvroMapper it fails with an exception as shown below. It turns out that the mapreduce implementation hard-codes a dependence on SpecificDatum readers and writers.

      I've tested switching to use ReflectDatum instead in five places to try to get it to work for an end-to-end reflect data example:
      AvroFileInputFormat
      AvroFileOutputFormat
      AvroSerialization (getDeserializer and getSerializer)
      AvroKeyComparator

      However, switching to use reflection for AvroKeyComparator doesn't work:
      java.lang.UnsupportedOperationException
      at org.apache.avro.reflect.ReflectData.compare(ReflectData.java:427)
      at org.apache.avro.mapred.AvroKeyComparator.compare(AvroKeyComparator.java:46)

      It should be possible to implement compare on reflect data, much like GenericData's implementation, but using the field name (or, better yet, a cached java.lang.reflect.Field) to access field values...
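The field-caching comparison suggested above might be sketched as follows. This is an illustrative, stand-alone version, not the Avro implementation: the `ReflectComparator` and `Pair` names are hypothetical, fields are compared in declaration order, and nulls, nested records, arrays, and maps are not handled (a real implementation would follow the schema's field order, as GenericData.compare does).

```java
import java.lang.reflect.Field;
import java.util.Comparator;

// Illustrative sketch: compare two instances of the same class by reading
// their fields reflectively, caching the accessible Field handles once.
class ReflectComparator<T> implements Comparator<T> {
  private final Field[] fields;

  ReflectComparator(Class<T> c) {
    this.fields = c.getDeclaredFields();   // fields in declaration order
    for (Field f : fields)
      f.setAccessible(true);               // cache accessible handles
  }

  @SuppressWarnings("unchecked")
  public int compare(T a, T b) {
    try {
      for (Field f : fields) {             // first differing field decides
        int d = ((Comparable<Object>) f.get(a)).compareTo(f.get(b));
        if (d != 0) return d;
      }
      return 0;
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e);
    }
  }
}

// Hypothetical example record (not from the Avro codebase).
class Pair {
  public String name;
  public Integer count;
  Pair(String name, Integer count) { this.name = name; this.count = count; }
}
```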

      Original exception:
      java.lang.ClassCastException: tba.mr.sample.avro.Record cannot be cast to org.apache.avro.generic.IndexedRecord
      at org.apache.avro.generic.GenericDatumReader.setField(GenericDatumReader.java:152)
      at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:142)
      at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:114)
      at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:105)
      at org.apache.avro.file.DataFileStream.next(DataFileStream.java:198)
      at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:63)
      at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:33)
      at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:192)
      at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:176)
      at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
      at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
      at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
      at org.apache.hadoop.mapred.Child.main(Child.java:170)

      1. AVRO-669.patch
        20 kB
        Doug Cutting
      2. AVRO-669.patch
        19 kB
        Doug Cutting
      3. AVRO-669.patch.2
        5 kB
        Ron Bodkin
      4. AVRO-669.patch
        15 kB
        Doug Cutting

          Activity

          Doug Cutting added a comment -

          I committed this.

          Doug Cutting added a comment -

          Unless there are objections, I will commit this soon.

          Doug Cutting added a comment -

          Here's a version of this patch that applies to trunk after the AVRO-647 re-org.

          Doug Cutting added a comment -

          Note that, combined with other issues, this nearly completes AVRO-638 too. The representation that strings are read into is the only remaining incompatibility.

          Doug Cutting added a comment -

          Here's a patch that makes Avro's MapReduce API work with reflection-based data, including a test.

          Jobs that wish to use reflection should call AvroJob.setReflect(job).

          Note that this also makes the reflect data representation almost a complete superset of specific, which is itself a superset of generic. The only exception is that, in reflect, strings are read as java.lang.String while in specific and generic, strings are read as org.apache.avro.util.Utf8. Either class may be used when writing strings.

          Spike Gronim added a comment -

          I have also experienced this issue. In my case I could work around it by disabling the Combiner.

          Doug Cutting added a comment -

          I moved my patch from here to a new issue, AVRO-678, since it solves a distinct sub-problem.

          Ron Bodkin added a comment -

          From discussion on IRC, it'd be better to change the default values that ReflectData uses when reading in types to be the values that are generated by the Avro code generator (e.g., it should use GenericData.Array rather than a Java array by default).

          This will also require encoding reflection data for arrays that used to be a default, e.g., the ReflectData.createSchema method should emit a CLASS_PROP of GenericArrayType.class, and the reflect data reader should honor that.

          Ron Bodkin added a comment -

          That makeWriter method was a hold-over from an earlier attempt at allowing the use of either reflect or specific. If we can let it always use reflect that would be a lot better (and I'd just eliminate makeWriter). Likewise for that constructor flag - hopefully something to just drop.

          To fit the approach of converting Utf8 to CharSequence, there would also need to be a similar change in the Avro-generated classes, e.g., if I try to read an array of strings with reflection using Avro-generated code I get this exception:

          java.lang.ClassCastException: [Ljava.lang.String; cannot be cast to java.util.List

          This got generated as
          public java.util.List<java.lang.CharSequence> key;

          The back trace is:
          at org.apache.avro.generic.GenericData.setField(GenericData.java:377)
          at org.apache.avro.reflect.ReflectData.setField(ReflectData.java:79)
          at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:149)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:121)
          at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:178)

          The problem here is that the system can't detect the type of null fields. Since it's possible to have fields that should be null, we can't just initialize an empty collection. Instead, it looks to me like the generated schema for a generated class needs to emit the right hints in the form of these schema properties:
          static final String CLASS_PROP = "java-class";
          static final String ELEMENT_PROP = "java-element-class";
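As an illustration of such hints, the schema for a field like the generated `key` above could carry the extra properties as plain JSON attributes. This is a hypothetical fragment (field name and classes chosen for the example, not taken from the issue):

```json
{
  "name": "key",
  "type": {
    "type": "array",
    "items": "string",
    "java-class": "java.util.List",
    "java-element-class": "java.lang.String"
  }
}
```

A reflect-aware reader could then consult "java-class" and "java-element-class" to pick the right container and element representation instead of guessing from a possibly-null field.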

          Doug Cutting added a comment -

          > What do you think is the right way to handle this inconsistency?

          In TestWordCount.java, we should change all non-constructor references to Utf8 to CharSequence, the common interface between String and Utf8.
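To illustrate why CharSequence works here, the following stand-alone sketch uses a minimal stand-in for Utf8 (the real class is org.apache.avro.util.Utf8 and is not reproduced here; `FakeUtf8` and `WordLength` are hypothetical names for the example): code declared against CharSequence accepts either string representation.

```java
// Minimal stand-in for org.apache.avro.util.Utf8, for illustration only:
// just a CharSequence wrapper around a String.
class FakeUtf8 implements CharSequence {
  private final String value;
  FakeUtf8(String value) { this.value = value; }
  public int length() { return value.length(); }
  public char charAt(int i) { return value.charAt(i); }
  public CharSequence subSequence(int s, int e) { return value.subSequence(s, e); }
  public String toString() { return value; }
}

class WordLength {
  // Declared against CharSequence, so it works whether a reader produced
  // a java.lang.String (reflect) or a Utf8 (specific/generic).
  static int length(CharSequence word) { return word.length(); }
}
```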

          Looking at your patch, you've added a few features without explanation. You've added an AvroSerialization#makeWriter() method. This might be reasonable, but since it's protected and will appear in javadoc, it deserves a javadoc comment. To AvroRecordReader you add a new constructor with a flag indicating whether reflection is to be used. Should we rather be consistent here in how we specify this?

          An early version of this API had a job parameter, avro.input.api, avro.mapout.api and avro.output.api or somesuch, that could have values "reflect", "specific", or "generic". Perhaps we should revive that approach? The default would be "reflect", and mapout would default to output. Thoughts?
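The parameter scheme floated above might look like the following sketch. The parameter names (avro.input.api, avro.mapout.api, avro.output.api) are the tentative ones from the comment; the enum, the `ApiConfig` helper, and the use of java.util.Properties in place of Hadoop's JobConf are assumptions made to keep the example self-contained.

```java
import java.util.Properties;

// Hypothetical sketch of the proposed per-stage API selector.
enum AvroApi { REFLECT, SPECIFIC, GENERIC }

class ApiConfig {
  static AvroApi get(Properties conf, String key, AvroApi dflt) {
    String v = conf.getProperty(key);
    return v == null ? dflt : AvroApi.valueOf(v.toUpperCase());
  }

  // Per the proposal above: avro.mapout.api defaults to avro.output.api,
  // which in turn defaults to "reflect".
  static AvroApi mapOutputApi(Properties conf) {
    AvroApi output = get(conf, "avro.output.api", AvroApi.REFLECT);
    return get(conf, "avro.mapout.api", output);
  }
}
```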

          Ron Bodkin added a comment -

          This change does break the word count test:

          [junit] java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.util.Utf8
          [junit] at org.apache.avro.mapred.TestWordCount$MapImpl.map(TestWordCount.java:43)
          [junit] at org.apache.avro.mapred.HadoopMapper.map(HadoopMapper.java:80)
          [junit] at org.apache.avro.mapred.HadoopMapper.map(HadoopMapper.java:34)
          [junit] at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
          [junit] at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
          [junit] at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
          [junit] at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)

          This was because ReflectDatumReader.readString returns a String, whereas GenericDatumReader.readString returns a Utf8.

          I tested making ReflectDatumReader return a Utf8, but there are a number of places in the code that expect it to return a String - e.g., a protocol test and TestDataFileReflect also break.

          What do you think is the right way to handle this inconsistency?

          Ron Bodkin added a comment -

          Additional patch, to be applied after AVRO-669.patch, to allow reflect data to be used with MapReduce.

          I haven't got the map/reduce tests for Avro working in my development environment, so they still need to be verified and extended to test reflect data as well.

          Ron Bodkin added a comment -

          Thanks Doug, that does help. With those changes (and a few more that I'll attach as a patch to be applied in addition) I was able to run my sample code.

          Doug Cutting added a comment -

          Here's a patch that implements ReflectData.compare() for all types but Object[] and byte[]. It moves the getField() and setField() methods from the reader/writers to GenericData, so that the generic implementation of record comparison can be shared. This also eliminates a little duplicate code.

          It doesn't add full end-to-end MapReduce tests for reflect-based data, so I have not verified whether that works yet, but it should address the proximal cause.

          AVRO-638 is related. When that's complete, it should be possible to switch the mapred code to always use ReflectDatumReader, ReflectDatumWriter and ReflectData, since these should then work for specific and generic data as well.

          Please tell me if this helps.


            People

            • Assignee: Doug Cutting
            • Reporter: Ron Bodkin
            • Votes: 1
            • Watchers: 3