Avro / AVRO-969

Make possible usage of SpecificDatumWriter in avro-mapred

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.6.1
    • Fix Version/s: None
    • Component/s: java
    • Labels:
      None

      Description

      I realized that ReflectDatumWriter is always used when running a mapred job (in AvroOutputFormat.java). This sometimes leads to bugs such as AVRO-966.
      Why not provide a property such as WRITER_IS_REFLECT = "avro.map.writer.is.reflect" to decide which DatumWriter should be used?
      I created a small patch to solve this:

      avro-mapred.patch
      Index: lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java
      ===================================================================
      --- lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java  (revision 1209417)
      +++ lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java  (revision )
      @@ -53,6 +53,8 @@
         /** The configuration key for reflection-based map output representation. */
         public static final String MAP_OUTPUT_IS_REFLECT = "avro.map.output.is.reflect";
       
      +  public static final String WRITER_IS_REFLECT = "avro.map.writer.is.reflect";
      +
         /** Configure a job's map input schema. */
         public static void setInputSchema(JobConf job, Schema s) {
           job.set(INPUT_SCHEMA, s.toString());
      Index: lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
      ===================================================================
      --- lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java (revision 1209417)
      +++ lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java (revision )
      @@ -23,6 +23,7 @@
       import java.util.Map;
       import java.net.URLDecoder;
       
      +import org.apache.avro.specific.SpecificDatumWriter;
       import org.apache.hadoop.io.NullWritable;
       import org.apache.hadoop.fs.FileSystem;
       import org.apache.hadoop.fs.Path;
      @@ -102,8 +103,9 @@
             ? AvroJob.getMapOutputSchema(job)
             : AvroJob.getOutputSchema(job);
       
      -    final DataFileWriter<T> writer =
      -      new DataFileWriter<T>(new ReflectDatumWriter<T>());
      +    final DataFileWriter<T> writer = job.getBoolean(AvroJob.WRITER_IS_REFLECT, false) ?
      +      new DataFileWriter<T>(new ReflectDatumWriter<T>()) :
      +      new DataFileWriter<T>(new SpecificDatumWriter<T>());
           
           configureDataFileWriter(writer, job);
      

      Does it make sense?
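      The proposal amounts to reading a boolean job property, defaulting to false, and choosing the writer from it. The sketch below models that decision with JDK types only, as an illustration under stated assumptions: a Map<String, String> stands in for Hadoop's JobConf, and the hypothetical WriterKind enum stands in for ReflectDatumWriter and SpecificDatumWriter. Only the key "avro.map.writer.is.reflect" comes from the patch, and since this issue is unresolved it is a proposal, not an existing Avro property.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for ReflectDatumWriter and SpecificDatumWriter.
enum WriterKind { REFLECT, SPECIFIC }

class WriterChooser {
  // Property key proposed in the patch (not an existing Avro key).
  static final String WRITER_IS_REFLECT = "avro.map.writer.is.reflect";

  // Roughly mirrors JobConf.getBoolean(key, defaultValue): a missing key yields the default.
  static boolean getBoolean(Map<String, String> conf, String key, boolean defaultValue) {
    String value = conf.get(key);
    return value == null ? defaultValue : Boolean.parseBoolean(value.trim());
  }

  // Same decision as the patched AvroOutputFormat: reflect only when the flag is set.
  static WriterKind choose(Map<String, String> conf) {
    return getBoolean(conf, WRITER_IS_REFLECT, false) ? WriterKind.REFLECT : WriterKind.SPECIFIC;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    System.out.println(choose(conf));   // SPECIFIC (flag absent)
    conf.put(WRITER_IS_REFLECT, "true");
    System.out.println(choose(conf));   // REFLECT
  }
}
```

      Note the effect of the false default: any existing job that never sets the flag would switch from ReflectDatumWriter to SpecificDatumWriter, which is presumably why the follow-up patch in the comments also sets the flag from AvroJob.setReflect.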

        Activity

        Vyacheslav Zholudev added a comment -

        With this updated patch, the TestReflectJob test now passes:

        Bar.java
        Index: lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java
        ===================================================================
        --- lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java	(revision 1209417)
        +++ lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java	(revision )
        @@ -53,6 +53,8 @@
           /** The configuration key for reflection-based map output representation. */
           public static final String MAP_OUTPUT_IS_REFLECT = "avro.map.output.is.reflect";
         
        +  public static final String WRITER_IS_REFLECT = "avro.map.writer.is.reflect";
        +
           /** Configure a job's map input schema. */
           public static void setInputSchema(JobConf job, Schema s) {
             job.set(INPUT_SCHEMA, s.toString());
        @@ -117,6 +119,7 @@
           public static void setReflect(JobConf job) {
             setInputReflect(job);
             setMapOutputReflect(job);
        +    setWriterReflect(job);
           }
           
           /** Indicate that a job's input data should use reflect representation.*/
        @@ -129,6 +132,10 @@
             job.setBoolean(MAP_OUTPUT_IS_REFLECT, true);
           }
         
        +  public static void setWriterReflect(JobConf job) {
        +    job.setBoolean(WRITER_IS_REFLECT, true);
        +  }
        +
           /** Return a job's output key schema. */
           public static Schema getOutputSchema(Configuration job) {
             return Schema.parse(job.get(OUTPUT_SCHEMA));
        Index: lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
        ===================================================================
        --- lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java	(revision 1209417)
        +++ lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java	(revision )
        @@ -23,6 +23,7 @@
         import java.util.Map;
         import java.net.URLDecoder;
         
        +import org.apache.avro.specific.SpecificDatumWriter;
         import org.apache.hadoop.io.NullWritable;
         import org.apache.hadoop.fs.FileSystem;
         import org.apache.hadoop.fs.Path;
        @@ -102,8 +103,9 @@
               ? AvroJob.getMapOutputSchema(job)
               : AvroJob.getOutputSchema(job);
         
        -    final DataFileWriter<T> writer =
        -      new DataFileWriter<T>(new ReflectDatumWriter<T>());
        +    final DataFileWriter<T> writer = job.getBoolean(AvroJob.WRITER_IS_REFLECT, false) ?
        +      new DataFileWriter<T>(new ReflectDatumWriter<T>()) :
        +      new DataFileWriter<T>(new SpecificDatumWriter<T>());
             
             configureDataFileWriter(writer, job);
        
        Doug Cutting added a comment -

        ReflectDatumWriter should be able to correctly write a superset of the types that SpecificDatumWriter can write, so this property should not be needed. That said, it might be good to be able to override the DatumWriter and/or DatumReader classes used by Avro's mapred API. This might permit, e.g., ThriftDatumWriter to be used. So the best patch might be to switch to "avro.input.datumReader", "avro.map_output.datumWriter", and "avro.output.datumWriter" properties that name classes whose constructors accept a schema parameter.
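        The alternative above replaces the boolean with a property naming a class whose constructor takes a schema. A JDK-only sketch of that lookup follows, under stated assumptions: the Writer interface and the DefaultWriter/CustomWriter classes are hypothetical, a String stands in for org.apache.avro.Schema, and a Map stands in for JobConf. The key "avro.output.datumWriter" is only the name proposed in the comment.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Avro's DatumWriter; a String plays the role of Schema.
interface Writer {
  String describe();
}

class DefaultWriter implements Writer {
  private final String schema;
  public DefaultWriter(String schema) { this.schema = schema; }
  public String describe() { return "default:" + schema; }
}

class CustomWriter implements Writer {
  private final String schema;
  public CustomWriter(String schema) { this.schema = schema; }
  public String describe() { return "custom:" + schema; }
}

class WriterFactory {
  // Key name proposed in the comment; not an existing Avro property.
  static final String OUTPUT_WRITER = "avro.output.datumWriter";

  // Instantiates the configured class through its (schema) constructor,
  // falling back to DefaultWriter when the property is unset.
  static Writer create(Map<String, String> conf, String schema)
      throws ReflectiveOperationException {
    String className = conf.getOrDefault(OUTPUT_WRITER, DefaultWriter.class.getName());
    Class<? extends Writer> cls = Class.forName(className).asSubclass(Writer.class);
    Constructor<? extends Writer> ctor = cls.getDeclaredConstructor(String.class);
    return ctor.newInstance(schema);
  }

  public static void main(String[] args) throws ReflectiveOperationException {
    Map<String, String> conf = new HashMap<>();
    System.out.println(create(conf, "s").describe());   // default:s
    conf.put(OUTPUT_WRITER, CustomWriter.class.getName());
    System.out.println(create(conf, "s").describe());   // custom:s
  }
}
```

        Hadoop's Configuration.getClass can read such a class-valued property, but ReflectionUtils.newInstance invokes a no-argument constructor, so a schema-taking constructor needs an explicit lookup like getDeclaredConstructor(String.class) here.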

        Vyacheslav Zholudev added a comment -

        Yes, it should, but it does not for me (see AVRO-966). Can somebody confirm whether this is a bug or my misuse of Avro specific objects?
        Also, I thought that using SpecificDatumWriter rather than ReflectDatumWriter might be more efficient for specific objects, since generated methods can be used instead of reflection.

        Doug Cutting added a comment -

        AVRO-966 is a bug. I supplied a patch there.

        SpecificDatumWriter might be a bit more efficient. Reflection is not generally used for specific objects, though, even when ReflectDatumReader is used. Rather, ReflectDatumReader detects the specific object and uses SpecificDatumReader, but that detection adds a small cost.
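        The detection described above can be pictured as a type check before each field access: generated records take a fast positional path, and only other objects fall back to java.lang.reflect. The JDK-only sketch below models that shape; it is an illustration, not Avro's actual code. IndexedRecordLike is a hypothetical stand-in for the interface generated classes implement, and GeneratedPoint, PlainPoint, and FieldAccess are likewise invented for the example.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for a generated record exposing fields by position.
interface IndexedRecordLike {
  Object get(int pos);
}

// Plays the role of a generated (specific) class: positional access, no reflection.
class GeneratedPoint implements IndexedRecordLike {
  final int x = 1, y = 2;
  public Object get(int pos) {
    switch (pos) {
      case 0: return x;
      case 1: return y;
      default: throw new IndexOutOfBoundsException("Invalid index: " + pos);
    }
  }
}

// Plays the role of an arbitrary POJO that only reflection can read.
class PlainPoint {
  int x = 3, y = 4;
}

class FieldAccess {
  // The instanceof check is the per-access detection cost; generated records
  // then use get(pos), anything else falls back to reflective field lookup.
  static Object getField(Object record, String name, int pos)
      throws ReflectiveOperationException {
    if (record instanceof IndexedRecordLike) {
      return ((IndexedRecordLike) record).get(pos);
    }
    Field f = record.getClass().getDeclaredField(name);
    f.setAccessible(true);
    return f.get(record);
  }
}
```

        In this model, the remaining gap between the reflect and specific paths for generated classes is only the extra type check, which matches the "small cost" mentioned in the comment.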


          People

          • Assignee:
            Unassigned
            Reporter:
            Vyacheslav Zholudev
          • Votes:
            0
            Watchers:
            0

            Dates

            • Created:
              Updated:
