Hadoop Map/Reduce / MAPREDUCE-1462

Enable context-specific and stateful serializers in MapReduce

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: task
    • Labels: None

      Description

      Although the current serializer framework is powerful, within the context of a job it is limited to picking a single serializer for a given class. Additionally, Avro generic serialization can make use of additional configuration/state such as the schema. (Most other serialization frameworks including Writable, Jute/Record IO, Thrift, Avro Specific, and Protocol Buffers only need the object's class name to deserialize the object.)

      With the goal of keeping the easy things easy and maintaining backwards compatibility, we should be able to allow applications to use context-specific (e.g. map output key) serializers in addition to the current type-based ones that handle the majority of the cases. Furthermore, we should be able to support serializer-specific configuration/metadata in a type-safe manner without cluttering up the base API with a lot of new methods that will confuse new users.
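
      To make the schema requirement concrete, here is an illustrative comparison (assuming Hadoop's IntWritable and a recent Avro release where DecoderFactory.get() is available; this is not part of any patch on this issue): a Writable can be rebuilt from its class name alone, while Avro generic deserialization cannot start without the writer's schema.

      import java.io.ByteArrayInputStream;
      import java.io.DataInput;
      import java.io.IOException;

      import org.apache.avro.Schema;
      import org.apache.avro.generic.GenericDatumReader;
      import org.apache.avro.generic.GenericRecord;
      import org.apache.avro.io.DecoderFactory;
      import org.apache.hadoop.io.IntWritable;

      public class ClassNameVsSchema {
        // A Writable needs only its class name: instantiate it and call readFields().
        static IntWritable readWritable(DataInput in) throws IOException {
          IntWritable value = new IntWritable();
          value.readFields(in);
          return value;
        }

        // Avro generic data has no generated class; the reader must be handed the
        // writer's schema, which is per-job state the current framework has nowhere to keep.
        static GenericRecord readAvroGeneric(byte[] bytes, Schema writerSchema) throws IOException {
          GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(writerSchema);
          return reader.read(null,
              DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(bytes), null));
        }
      }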

      1. h-1462.patch
        14 kB
        Owen O'Malley
      2. MAPREDUCE-1462-common.patch
        5 kB
        Tom White
      3. MAPREDUCE-1462-mr.patch
        77 kB
        Tom White


          Activity

          Owen O'Malley added a comment -

           Ok, this patch is a rough sketch of the way we could refactor the serialization interface.

           The RootSerializationFactory is a factory that looks up a serialization given an object's class.

           Serializations contain a serialize and deserialize method along with the Input/OutputStream to write the object to. They also contain a serializeSelf/deserializeSelf method that serializes the metadata for that serializer. By having the serializer handle and parse the metadata itself, the framework doesn't need to support each individual serializer's information and it doesn't lose all type safety of the string to string maps where a single misspelling can cause an attribute to not be found or an extra space can cause parse errors.
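
           A rough, hypothetical sketch of the shape being described here (the names follow this comment and are not a committed Hadoop API):

           public abstract class Serialization<T> {
             // write and read the objects themselves
             public abstract void serialize(T object, java.io.OutputStream out) throws java.io.IOException;
             public abstract T deserialize(java.io.InputStream in, T reuse) throws java.io.IOException;

             // write and read the serialization's own metadata (a class name, an Avro
             // schema, ...) so the framework can treat it as an opaque blob
             public abstract void serializeSelf(java.io.OutputStream out) throws java.io.IOException;
             public abstract void deserializeSelf(java.io.InputStream in) throws java.io.IOException;
           }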

          There is a subtype of Serialization named TypedSerialization that is the base class for all of the serializations that use the object's class as their metadata. This would include the Writable, Thrift, ProtocolBuffers, and Avro Specific serializers.
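
           A correspondingly hypothetical TypedSerialization, whose only metadata is the class name:

           public abstract class TypedSerialization<T> extends Serialization<T> {
             protected Class<? extends T> specificType;

             public void setSpecificType(Class<? extends T> type) { this.specificType = type; }

             @Override
             public void serializeSelf(java.io.OutputStream out) throws java.io.IOException {
               // the class name is the entire metadata
               new java.io.DataOutputStream(out).writeUTF(specificType.getName());
             }

             @Override
             @SuppressWarnings("unchecked")
             public void deserializeSelf(java.io.InputStream in) throws java.io.IOException {
               try {
                 specificType = (Class<? extends T>)
                     Class.forName(new java.io.DataInputStream(in).readUTF());
               } catch (ClassNotFoundException e) {
                 throw new java.io.IOException(e);
               }
             }
           }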

           For containers such as SequenceFile and T-File, the file would contain the name of the serializer class and the serializer class's metadata. That is enough to reconstruct the serialization and deserialize the objects. When writing a SequenceFile or T-File, you can specify the types as you do currently and let the root serialization factory pick the serializers, or you can provide them explicitly.
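
           Purely as an illustration of what such a container header could carry (a hypothetical helper, building on the Serialization sketch above):

           public class SerializationHeader {
             public static void write(java.io.DataOutputStream out, Serialization<?> s)
                 throws java.io.IOException {
               out.writeUTF(s.getClass().getName());  // which Serialization to instantiate on read
               s.serializeSelf(out);                  // its opaque metadata (class name, schema, ...)
             }

             public static Serialization<?> read(java.io.DataInputStream in)
                 throws java.io.IOException {
               String className = in.readUTF();
               Serialization<?> s;
               try {
                 s = (Serialization<?>) Class.forName(className).newInstance();
               } catch (Exception e) {
                 throw new java.io.IOException("cannot recreate serialization " + className, e);
               }
               s.deserializeSelf(in);                 // the serialization restores its own state
               return s;
             }
           }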

           In terms of how this would hook into MapReduce, the job would have the map outputs' class name configured as currently, but it would just require that the actual type be assignable to the declared type. (i.e. you can set the map output type to Object and pass anything.)

          There would be an enumeration of the contexts and a method to set a serialization for a specific context:

          public enum SerializationContext {
             DEFAULT, 
             MAP_OUTPUT_KEY, 
             MAP_OUTPUT_VALUE, 
             REDUCE_OUTPUT_KEY, 
             REDUCE_OUTPUT_VALUE, 
             INPUT_SPLIT
          };
          

          and the Job/JobContext would get a new setter/getter for getting the Serialization for each context. If the user doesn't specify a given context, it will use the default. If the default isn't specified, it will use the root serialization factory for the assignable type.
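
           A hypothetical sketch of that lookup order (context first, then the default context, then the root factory for the declared type; RootSerializationFactory.get() and getConfiguration() are assumed names, not existing API):

           // in Job/JobContext (illustrative only)
           private final java.util.EnumMap<SerializationContext, Serialization<?>> serializations =
               new java.util.EnumMap<SerializationContext, Serialization<?>>(SerializationContext.class);

           public void setSerialization(SerializationContext context, Serialization<?> s) {
             serializations.put(context, s);
           }

           public Serialization<?> getSerialization(SerializationContext context,
                                                    Class<?> declaredType) {
             Serialization<?> s = serializations.get(context);
             if (s == null) {
               s = serializations.get(SerializationContext.DEFAULT);   // fall back to the default context
             }
             if (s == null) {
               s = RootSerializationFactory.get(getConfiguration())
                       .getSerialization(declaredType);                // finally, by assignable type
             }
             return s;
           }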

          Tom White added a comment -

          Owen, thanks for posting your design. I've reproduced my comments on the design which I made on MAPREDUCE-1126 here for convenience:

          • The changes to the serialization API are not backwards compatible, so a new package of serializer types would need creating. Is this really necessary to achieve Avro integration?
          • I'm not sure why we need to serialize serializations. The patch in MAPREDUCE-1126 avoids the need for this by using a simple string mechanism for configuration. Having an opaque binary format also makes it difficult to retrieve and use the serialization from other languages (e.g. C++ or other Pipes languages). My latest patch on MAPREDUCE-1126 is language-neutral in this regard.
          • Adding a side file for the context-serializer mapping complicates the implementation. It's not clear what container file would be used for the side file (Avro container, custom?). I understand that putting framework configuration in the job configuration may not be desirable, but it has been done in the past so I don't know why it is being ruled out here. I would rather have a separate effort (and discussion) to create a "private" job configuration (not accessible by user code) for such configuration (above and beyond the configuration needed for serialization).
          • The user API is no shorter than the one proposed in MAPREDUCE-1126. Compare:
            Schema keySchema = ...
            AvroGenericSerialization serialization = new AvroGenericSerialization();
            serialization.setSchema(keySchema);
            job.set(SerializationContext.MAP_OUTPUT_KEY, serialization);
            

            with

            Schema keySchema = ...
            AvroGenericData.setMapOutputKeySchema(job, keySchema);
            
          Doug Cutting added a comment -

          > Serializations contain a serialize and deserialize method along with the Input/OutputStream to write the object to.

          Where would these be stored? Are you proposing we add another file to each job? Currently we have conf+splits+jar. Do we really want tasks to have to open more files?

          This proposes fundamental changes to job submission that should be addressed elsewhere. We can achieve the goals of this issue using the existing mechanisms, as in Tom & Aaron's patch to MAPREDUCE-1126. Changing job submission in the way you suggest should be discussed separately, in MAPREDUCE-1183.

          > it doesn't lose all type safety of the string to string maps where a single misspelling can cause an attribute to not be found or an extra space can cause parse errors

           We have long used string maps for Hadoop configuration. These strings are generally written by programs that use constants to avoid misspellings. This approach has long served HTTP, SMTP and Unix environment variables as a language-independent means of specifying parameters. If you want to change this, it should be addressed systematically in MAPREDUCE-1183.
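
           For example, the static helpers in the MAPREDUCE-1126 approach funnel every write through a constant, so a misspelled key is a compile-time concern rather than a silent runtime one (the property name below is invented for illustration, and imports of org.apache.avro.Schema and org.apache.hadoop.mapreduce.Job are omitted):

           public class AvroGenericData {
             // single definition of the key; callers never type the raw string
             public static final String MAP_OUTPUT_KEY_SCHEMA =
                 "mapreduce.map.output.key.avro.schema";

             public static void setMapOutputKeySchema(Job job, Schema schema) {
               job.getConfiguration().set(MAP_OUTPUT_KEY_SCHEMA, schema.toString());
             }
           }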

          > the job would have the map outputs' class name configured as currently,

          So we'd have a class name in the job configuration too? That seems redundant and inconsistent with your misspelling concerns.

          > the Job/JobContext would get a new setter/getter for getting the Serialization for each context.

           HADOOP-6420 already provided a mechanism for this purpose. Tom & Aaron's patch for MAPREDUCE-1126 uses this mechanism with a constant per context rather than an enum. Would you prefer that an enum be used? Perhaps you could suggest that there?

          Arun C Murthy added a comment -

           > Where would these be stored? Are you proposing we add another file to each job? Currently we have conf+splits+jar. Do we really want tasks to have to open more files?

           > This proposes fundamental changes to job submission that should be addressed elsewhere. We can achieve the goals of this issue using the existing mechanisms, as in Tom & Aaron's patch to MAPREDUCE-1126. Changing job submission in the way you suggest should be discussed separately, in MAPREDUCE-1183.

           Iff there is consensus that this is the model being proposed in MAPREDUCE-1183, we could start that journey in this patch, no? Why do we need to do the work twice, i.e. first put it in the conf, then move it to the (serialized) job-description file (say, job.data) via MAPREDUCE-1183?

           Owen O'Malley added a comment - edited

           > The changes to the serialization API are not backwards compatible, so a new package of serializer types would need creating. Is this really necessary to achieve Avro integration?

           No, it is not necessary. I believe it to be a much cleaner interface. The current interface defines the metadata for serializers as a Map<String,String>. The metadata is not supposed to be user-facing but is defined by each particular serialization to control its own serialization and deserialization. For opaque data that is not intended to be interpreted by the user, isn't a binary blob a better way to communicate the intent than a Map<String,String>?

           > I'm not sure why we need to serialize serializations.

           The goal is to reduce the chance that users will make a mistake in calling the API. With your patch on 1126, all of the application's control over serialization is done indirectly through static methods that reach into the Job's configuration and set the map. So roughly,

           1. the user calls serializer-specific configuration code, which manually sets the metadata in the configuration.
           2. to get the serializer, the framework reads the metadata back from the configuration and looks through the list of serializations for the first one that will accept that metadata (see the sketch after this list). The selected serializer gets the metadata and hopefully does the right thing.
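
           Roughly, step 2 amounts to the following (illustrative names only, following the SerializerBase shape in the code below):

           SerializerBase findSerializer(java.util.List<SerializerBase> registered,
                                         java.util.Map<String, String> metadata) {
             for (SerializerBase candidate : registered) {
               if (candidate.accept(metadata)) {
                 return candidate;      // first serializer to accept the metadata wins
               }
             }
             throw new IllegalArgumentException("no serializer accepts " + metadata);
           }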

           I think it is much clearer and less error-prone if the framework has a method that takes a serializer and uses that to get the metadata. Serializing the serializer really just means a pair of methods to read and write serialization-specific metadata.

          Under your 1126 patch, my hypothetical magic serialization looks like:

           public class MyMagicSerialization extends SerializerBase {
             public static void setMapOutputKeyMagicMetadata(Job job, Other metadata) { ... }
             public static void setMapOutputValueMagicMetadata(Job job, Other metadata) { ... }
             public static void setReduceOutputKeyMagicMetadata(Job job, Other metadata) { ... }
             public static void setReduceOutputValueMagicMetadata(Job job, Other metadata) { ... }
             public boolean accept(Map<String,String> metadata) { ... }
             ...
           }
          

           The first thing to notice is that my serialization, which does not depend on MapReduce, needs a bunch of methods that are very concretely tied to MapReduce. Furthermore, it is difficult to extend by adding new contexts. If HBase needs to use this serializer, it has to add a new method to the serializer for its context.

          Now look at the equivalent in my scheme:

           public class MyMagicSerialization extends Serialization {
             public void setMagicMetadata(Other metadata) { ... }
             ...
           }
          

          It does not depend on MapReduce and the way that MapReduce may use it. On the other hand, I do want a method to set the serializer for each context:

           public class Job extends JobContext {
             enum Context {MAP_OUT_KEY, MAP_OUT_VALUE, REDUCE_OUT_KEY, REDUCE_OUT_VALUE};
             public void setSerialization(Context context, Serialization serialization);
             ...
           }
          

           I've pulled all of the MapReduce-specific code into MapReduce's Job class. That is a much better place for it to be. Furthermore, if MapReduce adds a new context, it only means adding a new value to an enum in Job, not adding new methods to all of the serializers. That is a big win.

          Other nice changes are:

           • having serialize/deserialize methods rather than objects represents the real semantics, in that they should not be storing state between calls (i.e. the bug that hit the original Java serialization).
           • it also means that the merge code can be given a single object that it reuses for both serialization and deserialization, rather than one of each (as sketched below).
           • the new API also means that you can serialize/deserialize to a new stream without recreating the object.
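
           A minimal illustration of that reuse (hypothetical, building on the Serialization shape sketched earlier in this thread): one instance serves the merge path in both directions and against any stream.

           public class MergePass<T> {
             private final Serialization<T> serialization;  // one object for reads and writes

             public MergePass(Serialization<T> serialization) {
               this.serialization = serialization;
             }

             public void copyOne(java.io.InputStream spill, java.io.OutputStream merged)
                 throws java.io.IOException {
               T record = serialization.deserialize(spill, null);  // read back from the spill
               serialization.serialize(record, merged);            // re-write to a new stream
             }
           }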

          As to where the serialized metadata is stored, I don't care nearly as much. It might make sense to stick the encoded bytes into the configuration, write it into a new file, or add it to the input split file. That matters much less to me than getting APIs that are clean, understandable, and extensible.

          Tom White added a comment -

          In order to help understand the problem better I've created a demonstration patch that uses the SerializationContext-based user API, while retaining the Serialization code that exists in common. (In fact, I had to make some changes to the Serialization code so that it can retain its metadata in an instance variable.)

          Here's what the configuration looks like for the user:

          Schema keySchema = Schema.create(Schema.Type.STRING);
          Schema valSchema = Schema.create(Schema.Type.LONG);
          job.setSerialization(Job.SerializationContext.MAP_OUTPUT_KEY,
                     new AvroGenericSerialization(keySchema));
          job.setSerialization(Job.SerializationContext.MAP_OUTPUT_VALUE,
                     new AvroGenericSerialization(valSchema));
          
          Nigel Daley added a comment -

          Too late for 0.22. Moving Fix Version from 0.22 to 0.23.

          Harsh J added a comment -

          (Not a blocker for any release, so unmarking as one for the moment)


            People

             • Assignee: Owen O'Malley
             • Reporter: Owen O'Malley
             • Votes: 2
             • Watchers: 23
