Details

    • Type: Wish
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      Protocol Buffers (http://code.google.com/p/protobuf/) are a way of encoding data in a compact binary format. This issue is to write a ProtocolBuffersSerialization to support using Protocol Buffers types in MapReduce programs, including an example program. This should probably go into contrib.

      1. hadoop-3788-v1.patch
        60 kB
        Alex Loddengaard
      2. protobuf-java-2.0.1.jar
        236 kB
        Alex Loddengaard
      3. hadoop-3788-v2.patch
        46 kB
        Alex Loddengaard
      4. hadoop-3788-v3.patch
        48 kB
        Tom White
      5. protobuf-java-2.0.2.jar
        267 kB
        Tom White

        Issue Links

          Activity

          Alex Loddengaard added a comment -

          After experimenting with Protocol Buffers (PBs) and reading their documentation, I believe that using PBs may not require the introduction of a new Serialization class.

          PBs work in the following way:

          First, the developer defines a .proto file, which is essentially a schema that describes the type of data the user wishes to deal with. Below is an example of an addressbook.proto file taken from the PB documentation, located here: http://code.google.com/apis/protocolbuffers/docs/javatutorial.html

          addressbook.proto
          package tutorial;
          
          option java_package = "com.example.tutorial";
          option java_outer_classname = "AddressBookProtos";
          
          message Person {
            required string name = 1;
            required int32 id = 2;
            optional string email = 3;
          
            enum PhoneType {
              MOBILE = 0;
              HOME = 1;
              WORK = 2;
            }
          
            message PhoneNumber {
              required string number = 1;
              optional PhoneType type = 2 [default = HOME];
            }
          
            repeated PhoneNumber phone = 4;
          }
          
          message AddressBook {
            repeated Person person = 1;
          }
          

          Once the user has defined their .proto file, they use PB's compiler, protoc, to generate an outer Java class, accompanied by a few subclasses. Amongst the generated code are methods to serialize and deserialize given an OutputStream and InputStream, respectively.

          Refactoring .proto files is somewhat tricky given the way PBs work (read their documentation for more info), so Google recommends that PBs be wrapped inside other classes and only used when serializing and deserializing. This approach fits perfectly with Hadoop's Writable model. That is, if a user wants to use PBs, they run protoc to create a Java class, which is essentially a bean. They can then define a new Writable implementation that uses the protoc-generated class to serialize and deserialize, as sketched below. This is all possible without creating a ProtocolBuffersSerialization class, because the default Serialization, org.apache.hadoop.io.serializer.WritableSerialization, delegates its read and write methods to the Writable being serialized or deserialized.
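
          For illustration only, a minimal sketch of such a Writable wrapper around the tutorial's Person message (the class name PersonWritable and the length-prefix framing are assumptions, not part of any attached patch):

          import java.io.DataInput;
          import java.io.DataOutput;
          import java.io.IOException;

          import org.apache.hadoop.io.Writable;

          import com.example.tutorial.AddressBookProtos.Person;

          public class PersonWritable implements Writable {
            private Person person = Person.getDefaultInstance();

            public void set(Person person) { this.person = person; }
            public Person get() { return person; }

            public void write(DataOutput out) throws IOException {
              byte[] bytes = person.toByteArray(); // PB handles the field encoding
              out.writeInt(bytes.length);          // the Writable supplies the delimiting
              out.write(bytes);
            }

            public void readFields(DataInput in) throws IOException {
              byte[] bytes = new byte[in.readInt()];
              in.readFully(bytes);
              person = Person.parseFrom(bytes);    // rebuild the message from the bytes
            }
          }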

          A general ProtocolBuffersSerialization class would not use PBs to their fullest, because it would have to rely on a very primitive, generalized .proto file (for example, a file with just one field: a large string).

          With that said, a few things can be done with regard to this feature:

          1. I can create an example that extends Text and overrides its serialization methods to use a protoc-generated class
          2. I can begin extending Hadoop's Writable implementations to use PBs instead
          3. I can begin replacing Hadoop's Writable implementations to use PBs instead
          4. I can try to create a general ProtocolBuffersSerialization and see how it performs, though this solution seems to go against the premise of using PBs
          5. You can tell me that my understanding of PBs is completely wrong (please follow up with a more accurate description if this is the case)

          Before option 2 or 3 is decided on, profiling should be done to ensure that PBs are in fact faster than Java's built-in mechanism. If profiling proves PBs are faster in all cases, then option 3 seems the most desirable. However, perhaps more discussion is needed to determine whether 2, 3, or some other solution altogether is better.

          Again, I'm totally new here, so please argue with me if I have misunderstood Hadoop's workings, PBs, or anything else. While I'm waiting for responses, I can begin working on option 1 to prove my understanding of PBs is correct.

          Tom White added a comment -

          Alex, Thanks for looking at this.

          It shouldn't be necessary to create a new Writable implementation for each protoc-generated class (if that's what you are suggesting). By writing a ProtocolBuffersSerialization it should be possible to avoid having to use Writables at all.

          I imagined that the implementation of ProtocolBuffersSerializer would create a CodedOutputStream in the open method, then call Message#writeTo with the CodedOutputStream in the serialize method. ProtocolBuffersDeserializer is a bit more tricky. It would find the com.google.protobuf.Descriptors.Descriptor for the message class being deserialized, then use DynamicMessage#parseFrom to construct a message from the descriptor and the input stream.
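
          A rough sketch of the Serializer half of that idea, just to make the shape concrete (the Deserializer, which would use DynamicMessage, is omitted; the class name and the generic bound are assumptions):

          import java.io.IOException;
          import java.io.OutputStream;

          import org.apache.hadoop.io.serializer.Serializer;

          import com.google.protobuf.CodedOutputStream;
          import com.google.protobuf.Message;

          public class ProtocolBuffersSerializer<T extends Message> implements Serializer<T> {
            private CodedOutputStream out;

            public void open(OutputStream out) throws IOException {
              this.out = CodedOutputStream.newInstance(out); // wrap the raw stream once
            }

            public void serialize(T message) throws IOException {
              message.writeTo(out); // Message#writeTo(CodedOutputStream)
              out.flush();          // push buffered bytes to the underlying stream
            }

            public void close() throws IOException {
              out = null;
            }
          }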

          To test this you could write some PB types to a Hadoop sequence file, then write a MapReduce program to process it and write it out to another sequence file containing PB types. See HADOOP-3787.

          Alex Loddengaard added a comment -

          Tom, thanks for your feedback. I've created a thread on the Protocol Buffer discussion group; I'm having trouble deserializing. I wanted to link to the thread here for reference and in the hope that you can give your feedback.

          http://groups.google.com/group/protobuf/browse_thread/thread/19ab6bbb364fef35#

          Alex Loddengaard added a comment -

          Per Tom's advice, I created a Protocol Buffer (PB) serialization framework and tests to show its usage. I used HADOOP-3787 as a guide while doing so.

          I ran into a problem, though. My test, TestPBSerialization, is precisely the same as the test in HADOOP-3787 with the exception of using PBs instead of Thrift. My test threw PB exceptions due to issues with deserializing. I engaged in a dialog with a Google employee via the PB Google Group in hopes of diagnosing my problem. Our thread can be found here. The key point of the thread is that the InputStream passed to a PB Message instance during deserialization cannot have trailing binary data. For example, if a Message instance is serialized to "<binary>asdf", then giving "<binary>asdf<arbitrary binary>" to a PB deserializer will fail with a PB exception. This is because serialized PB Message instances are not self-delimiting, a design decision made by Google to keep serialized data small and fast to parse.

          I created a second test, TestPBSerializationIsolated, that demonstrates the correctness of PBSerializer and PBDeserializer, the two classes that actually do the work to serialize and deserialize. This test passed, hinting that perhaps there is an incompatibility between Hadoop's current workings and PBs.

          I then created a third test, TestPBHadoopStreams, which tries to understand the Hadoop stream that is given to PBDeserializer. Though this test is somewhat silly (it always passes with an assertTrue(true) – read the class comment for an explanation), its System.err output shows the makeup of the serialized StringMessage and the InputStream given for deserialization. I discovered that when Hadoop gives PBDeserializer an InputStream, the stream contains arbitrary binary data after the serialized PB Message instance. This is problematic for the reasons discussed above. I am confident that this extra data is not a result of using PBs but rather of a Hadoop implementation decision.

          To be very precise, below is a serialized StringMessage with the value "testKey". Note that the output was copy-pasted from less, which is why non-printable characters are displayed:

          ^GtestKey
          

          Below is the stream given to PBDeserializer to deserialize:

          ^GtestKeyx���,I-.K�)M^E^@^S�^C�
          

          Again, note the trailing bytes, starting with 'x'.

          Can someone comment on this issue? Was having trailing binary data a design decision? Can it easily be avoided? Is there a way around this issue?

          In the meantime, I plan to dig deeper into Hadoop's inner workings to understand why the InputStream might have extra binary data. Similarly, I plan to use Hadoop's default Serialization to see if the InputStream also has arbitrary trailing bytes.

          Alex Loddengaard added a comment -

          Attaching the patch and PB library. The patch is discussed in great detail in my previous comment.

          Alex Loddengaard added a comment -

          A quick update: I applied Tom's patch from HADOOP-3787 to a fresh trunk build and looked at the InputStream given to ThriftDeserialization. This stream has trailing binary data as well, which very closely resembles the trailing data I saw in my PB example. Here is the output from ThriftSerializer:

          Again, the strange characters below are a result of this being copy-pasted from less.

          ^@^A^@^@^@^@^@^@^@^A^@
          

          Here's the InputStream given to ThriftDeserializer:

          ^@^A^@^@^@^@^@^@^@^A^@x��f`d```NN,a^@^@^D3^AH
          

          As far as I can tell, this finding shows that PBs are not compatible with Hadoop's current implementation. Can someone verify this, please, and also recommend possible next steps towards compatibility? In the meantime I'll dig into Hadoop more. Thanks!

          Tom White added a comment -

          Looks like you're on the right track with PBDeserializer and PBSerializer. I think one of the problems is with the sequence file format and how it interacts with Protocol Buffers.

          SequenceFile.Reader#next(Object) reads the next key and value into a single DataInputBuffer, which is given to PBDeserializer to deserialize from in the mergeFrom call. As you point out, PB reads to the end of the stream, so when it tries to read the key it consumes the whole buffer, including the value, which causes the exception. This is not a problem with the other serialization frameworks we have seen so far (Writable, Java Serialization, Thrift), which know how much of the stream to consume.

          We could fix this by having separate buffers for key and value, much like org.apache.hadoop.mapred.IFile does. Or perhaps we could change the deserializer to take a length. The latter would only work if it is possible to restrict the number of bytes read from the stream in PB. Is this the case?

          Does it work if you don't try to read from a sequence file? Is your use-case based on being able to read from sequence files?

          A couple of other points:

          • PBDeserializerTracker reads from the stream twice, which isn't going to work. You need to tee the stream to do debugging.
          • Can we keep a Builder instance per deserializer rather than create a new one for each call to deserialize? We should call clear on it each time to reset its state.
          Tom White added a comment -

          Cleared the incompatible change flag, since as it stands this patch does not introduce any incompatible changes.

          Also cleared the release note, which is intended as documentation that goes into the release only after the change has been committed.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12389811/protobuf-java-2.0.1.jar
          against trunk revision 693705.

          +1 @author. The patch does not contain any @author tags.

          -1 tests included. The patch doesn't appear to include any new or modified tests.
          Please justify why no tests are needed for this patch.

          -1 patch. The patch command could not apply the patch.

          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3231/console

          This message is automatically generated.

          Alex Loddengaard added a comment -

          Tom, thanks for correcting my JIRA misuse.

          PBs do not provide a mechanism to limit the amount of data read from a stream, so your solution of breaking key, value pairs into two streams is the approach we should take. I'll make this change.

          As for your other comments, I included the *Tracker classes and TestPBHadoopStreams to prove that streams have extra binary data. I was planning on removing these classes once it was widely understood that the streams had extra data, but I'll keep them around and modify them appropriately.

          Good idea on keeping a single Builder instance per deserializer. That's a much better solution.

          I'll get working on this and submit a patch. Thanks again, Tom!

          Tom White added a comment -

          PBs do not provide a mechanism to limit the amount of data read from a stream, so your solution of breaking key, value pairs into two streams is the approach we should take.

          Of course the other option is to propose changes to PB (which is open source) to limit the amount of data read. I think a change to CodedInputStream would be relatively simple.

          As a quick experiment I modified a working MapReduce program so that the deserializer read to the end of the stream. It failed in ReduceValuesIterator. So making this work would require changing more than just SequenceFile. Perhaps this reveals a bug in the MR system - one that has been masked because existing deserializers only consume as much as they need. (So if they are given more than they need it's not a problem.) Either way, I worry about defining the contract for deserializers so that the end of the stream marks the end of the object being read, as it might limit optimizations we may make in the future. What do others think?

          Alex Loddengaard added a comment (edited) -

          Attaching a new patch. Changes:

          • Removed *Tracker and TestPBHadoopStreams because they aren't very useful now that we've established streams have trailing data
          • Did not keep a single Builder instance in PBDeserializer, because Builders cannot be reused once build() has been called. From the PB API: "[build()] Construct the final message. Once [build()] is called, the Builder is no longer valid, and calling any other method may throw a NullPointerException. If you need to continue working with the builder after calling build(), clone() it first." I made the decision to just re-instantiate instead of clone, because I thought the performance differences were negligible. Please argue with me if I'm wrong.
          • Changed SequenceFile.Reader#next(Object)
          • Changed TestPBSerialization to just write and then read a SequenceFile
          • Created a new test, TestPBSerializationMapReduce, that uses PBs in a MapReduce program

          TestPBSerialization passes, but TestPBSerializationMapReduce does not, which means you're right, Tom, that other code will need to change, though I'm not familiar enough with Hadoop to say more than that. If we decide to move further by changing Hadoop so that deserializers are never given trailing data, then more guidance would be greatly appreciated.

          This patch breaks a few existing tests such as org.apache.hadoop.fs.TestCopyFiles and org.apache.hadoop.fs.TestFileSystem. It's unclear whether my change causes these failures or whether my lack of changes to other areas does. Regardless, I think this shows that establishing the contract of not having extra data in the Deserializer's InputStream would probably be a large change.

          There is a discussion going on in the PB Google Group about possibly making PBs self-delimiting. Take a look here. In summary, a few different people are trying to determine the best way to allow self-delimiting, though there hasn't been any talk about a schedule.

          Doug Cutting added a comment -

          > I worry about defining the contract for deserializers so that the end of the stream marks the end of the object being read

          That's certainly not the contract in effect today. One possible advantage of such a contract is that, if the framework knows the length, it can communicate it this way to the object, so that lengths need not be stored twice. For example, SequenceFile stores the lengths of keys and values, and, if keys and values are Text, we store their length again in the key and value. But reading until EOF seems a poor way of communicating this. Perhaps we could change the deserialize API to be optionally passed a length or some such.

          Tom White added a comment -

          But reading until EOF seems a poor way of communicating this. Perhaps we could change the deserialize API to be optionally passed a length or somesuch.

          +1

          How about adding a method to Deserializer (an interface, sigh), which is called when the length of the serialized object is known:

          T deserialize(T t, int length) throws IOException;
          

          This still needs help from PB to work. I wonder if CodedInputStream#pushLimit() would do the trick?

          Doug Cutting added a comment -

          > T deserialize(T t, int length) throws IOException;

          The serializer would have to know that it will be deserialized this way, so that it knows not to include the length. So we'd need a different serialize() method too.

          steve_l added a comment -

          1. I could imagine the deserializers being handed a special kind of stream that counts down the remaining bytes and then signals EOF.

          2. On a personal note, I think excluding length from packets is risky. It implies a protocol that assumes TCP connections never stop halfway. If you've ever seen a web browser render only half a photo, you know that assumption isn't always valid.

          Pete Wyckoff added a comment -

          I could imagine the deserializers being handed a special kind of stream that counts down the remaining bytes and then signals EOF.

          One problem I ran into related to this is: what if your deserializer does buffering? For EOF, one can return null from deserialize or in theory throw EOFException, but for getProgress, RecordReaders use getPos, which is off if the deserializer has its own buffers - e.g., if you were to implement LineReaderDeserializer using LineRecordReader.LineReader.

          – pete

          Alex Loddengaard added a comment -

          FYI: from the PB discussion group here:

          At Google, we have lots of various container formats, for streaming, record-based files, database tables,
          etc., where each record is a protocol buffer.  All of these formats store the size of the message before the
          message itself.  Our philosophy is that because we have protocol buffers, all of these *other* formats and
          protocols can be designed to pass around arbitrary byte blobs, which greatly simplifies them.  An arbitrary
          byte blob is not necessarily self-delimiting, so it's up to these container formats to keep track of the
          size separately.
          

          A possible solution would be to change the interface between Message instances and the PBSerialization framework so that a wrapping class, call it PBMessageWrapper, contains the length and the logic to delimit the stream. Instances of this wrapper could create a new stream for deserializing, though serializing would now become trickier – the OutputStream used when serializing would need metadata included in it. It might also be possible to create a general PBMessageWrapper, instead of creating wrappers for each Message.
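
          A rough sketch of the length-prefix framing described in Google's quote above (only the name PBMessageWrapper comes from this comment; the static helpers and the int length prefix are assumptions):

          import java.io.DataInputStream;
          import java.io.DataOutputStream;
          import java.io.IOException;

          import com.google.protobuf.Message;

          public class PBMessageWrapper {
            /** Write the message size, then the message bytes. */
            public static void write(DataOutputStream out, Message m) throws IOException {
              byte[] bytes = m.toByteArray();
              out.writeInt(bytes.length);
              out.write(bytes);
            }

            /** Read exactly one framed message, leaving the rest of the stream untouched. */
            public static Message read(DataInputStream in, Message prototype) throws IOException {
              byte[] bytes = new byte[in.readInt()];
              in.readFully(bytes);
              return prototype.newBuilderForType().mergeFrom(bytes).build();
            }
          }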

          Thoughts?

          Tom White added a comment -

          the OutputStream when serializing would need meta data included in it

          I don't think we want to invent a new format here - this issue is to make serialization work with existing formats, such as SequenceFile (or the new TFile, or HADOOP-4065).

          As an experiment, I modified PBDeserializer to have a deserialize method that takes a length (in is now a CodedInputStream):

            public T deserialize(T t, int length) throws IOException {
              t = (t == null) ? (T) newInstance() : t;
              
              int limit = in.pushLimit(length);
              Message result =
                t.newBuilderForType().mergeFrom(in).build();
              in.popLimit(limit);
              
              return (T) result;
            }
          

          I then modified TestPBSerializationIsolated to serialize two strings to the stream. When using the deserialize method that doesn't take a length the test failed, but when I passed the length the test succeeded.

          So, I think we can do this without modifying Protocol Buffers. The change needed is the new method on Deserializer (and Serializer?) that takes a length, and then changes in the framework to call the new method when appropriate.

          Tom White added a comment -

          Cancelling patch while discussion continues.

          Owen O'Malley added a comment -

          What went wrong with a FilterInputStream that has an artificial limit on the data read? Even better would be one that was resettable to a new length for the next object. Don't we already have one of those floating around Hadoop somewhere?
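
          A minimal sketch of the kind of stream being described, in case nothing suitable already exists (the class name LimitedInputStream and the resettable limit are assumptions; TFile's bounded stream from HADOOP-3315, mentioned below, may already cover this):

          import java.io.FilterInputStream;
          import java.io.IOException;
          import java.io.InputStream;

          public class LimitedInputStream extends FilterInputStream {
            private long remaining;

            public LimitedInputStream(InputStream in, long limit) {
              super(in);
              this.remaining = limit;
            }

            /** Reset the limit for the next object without reopening the stream. */
            public void setLimit(long limit) {
              this.remaining = limit;
            }

            public int read() throws IOException {
              if (remaining <= 0) {
                return -1;                 // signal EOF at the artificial limit
              }
              int b = in.read();
              if (b >= 0) {
                remaining--;
              }
              return b;
            }

            public int read(byte[] buf, int off, int len) throws IOException {
              if (remaining <= 0) {
                return -1;
              }
              int n = in.read(buf, off, (int) Math.min(len, remaining));
              if (n > 0) {
                remaining -= n;
              }
              return n;
            }
          }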

          Doug Cutting added a comment -

          > Don't we already have one of those floating around Hadoop somewhere?

          If not, there's one in TFile (HADOOP-3315) that should be shared.

          Doug Cutting added a comment -

          Should this instead go in contrib/protocol-buffer-serialization? It seems to me that this won't share a lot with other serialization implementations (e.g., HADOOP-3787) and that each should be packaged in its own jar. Separate contrib modules would achieve this.

          Chris Dyer added a comment -

          Apologies for just jumping in and commenting without much context, but one of my great hopes for Protocol Buffer integration in Hadoop is that it will make it easier to integrate rich, structured data types with non-Java Hadoop clients (specifically C++, but Python is also quite well supported in Hadoop). Specifically, it would be nice if there were some kind of interface definition prescribing how non-Java clients could implement mappers and reducers that return protocol buffer serializations. In most cases, I don't think the framework would even need to know anything about the data types (unless you were implementing some kind of custom comparator), but if I could easily mix and match languages I would be a much, much happier camper. Authors of this patch: can you estimate how difficult incorporating C++ support would be?

          Alex Loddengaard added a comment -

          Chris, while I struggle with Protocol Buffers, you should look into Thrift, which is described in HADOOP-3787.

          Similarly, you should learn about Hadoop Streaming (http://wiki.apache.org/hadoop/HadoopStreaming), which will let you run any executable (C++, Python, etc.).

          Tom White added a comment -

          A patch that correctly reads Protocol Buffers from a SequenceFile. MapReduce doesn't work yet since Protocol Buffers always create a new object on deserialization, so we need the new MapReduce interfaces from HADOOP-1230.

          I also moved the code to protocol-buffers-serialization under contrib, and updated to the latest PB release.

          Chris Dyer added a comment -

          Other pieces of my system are using protocol buffers already, so I'm stuck with them for the pieces that have to interact with Hadoop. I am currently using Hadoop Streaming, but the hoops I jump through are quite extensive: I serialize the protocol buffers to byte arrays and then encode them using base64 so that they can be put into Streaming's text key-value encoding, where the key is separated from the value by a tab and the record is terminated with a newline. These extra layers aren't really a problem since what I'm computing is computationally quite expensive (I could serialize to XML and it would be just a drop in the bucket in terms of running time). But it does complicate my code in ways I think should be unnecessary. There is a paucity of information on how to use Streaming with non-text data, so I haven't really been able to figure out whether there's an easier way to do all of this.
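
          For concreteness, a tiny sketch of the kind of record emission being described (the Person message, the key, and the use of java.util.Base64 are placeholders, not the actual code in question):

          import java.util.Base64;

          import com.example.tutorial.AddressBookProtos.Person;

          public class StreamingRecordEmitter {
            public static void main(String[] args) {
              Person p = Person.newBuilder().setName("Alice").setId(1).build();

              // base64 keeps the binary protobuf free of tabs and newlines, so it fits
              // Streaming's key<TAB>value<NEWLINE> record format
              String value = Base64.getEncoder().encodeToString(p.toByteArray());
              System.out.println("someKey" + "\t" + value);
            }
          }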

          Greg Roelofs added a comment -

          Is this issue still relevant, given its two-year slumber and the existence of Avro? (I've merely skimmed the discussion, so I don't know whether Avro is actually relevant here; the lack of progress and/or continued interest suggests it is, though.)

          Tom White added a comment -

          Greg, I think the high-level issue is still relevant, as I can imagine there are folks who want to process PB-encoded data with MapReduce. I understand Owen is going to tackle this in HADOOP-6684.

          Owen O'Malley added a comment -

          This is certainly still relevant.

          Pere Ferrera Bertran added a comment -

          PB integration with Hadoop is now possible by using Protostuff (http://code.google.com/p/protostuff/) and calling ProtobufIOUtil.writeDelimitedTo() and ProtobufIOUtil.mergeDelimitedFrom(). These methods avoid the problem of consuming too many bytes from the stream.

          Josh Hansen added a comment -

          writeDelimitedTo(OutputStream), mergeDelimitedFrom(InputStream), and parseDelimitedFrom(InputStream) have all made it into the standard Protocol Buffers library now. See https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/MessageLite#writeDelimitedTo(java.io.OutputStream). That should resolve one obvious obstacle to addressing this issue, as illustrated below.
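
          A quick sketch of the delimited round trip using those standard-library methods (Person is the tutorial message from earlier in this issue; the in-memory streams are just for illustration):

          import java.io.ByteArrayInputStream;
          import java.io.ByteArrayOutputStream;
          import java.io.IOException;

          import com.example.tutorial.AddressBookProtos.Person;

          public class DelimitedRoundTrip {
            public static void main(String[] args) throws IOException {
              ByteArrayOutputStream out = new ByteArrayOutputStream();

              // each call prefixes the message with its varint-encoded length...
              Person.newBuilder().setName("Alice").setId(1).build().writeDelimitedTo(out);
              Person.newBuilder().setName("Bob").setId(2).build().writeDelimitedTo(out);

              // ...so the reader consumes exactly one message per call, with no
              // trailing-bytes problem
              ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
              System.out.println(Person.parseDelimitedFrom(in).getName()); // Alice
              System.out.println(Person.parseDelimitedFrom(in).getName()); // Bob
            }
          }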

          There were questions a few years ago about whether this issue is still relevant; I'm with Tom White that it's very relevant for people who want to use their protobuf data in Hadoop MapReduce. Avro in particular doesn't meet the needs of my organization due to its lack of a sparse representation.

          Twitter's elephant-bird library (https://github.com/kevinweil/elephant-bird) provides some protobuf-in-Hadoop support, but it's less than obvious how to use it with protobufs that are not LZO-compressed.


            People

            • Assignee: Alex Loddengaard
            • Reporter: Tom White
            • Votes: 5
            • Watchers: 22
