Pig / PIG-3255

Avoid extra byte array copies in streaming

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.11
    • Fix Version/s: 0.12.0
    • Component/s: None
    • Labels:
      None

      Description

      PigStreaming.java:

      public Tuple deserialize(byte[] bytes) throws IOException {
          Text val = new Text(bytes);
          return StorageUtil.textToTuple(val, fieldDel);
      }

      We should remove the new Text(bytes) copy and construct the tuple directly from the bytes.
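A minimal sketch of the idea, assuming a single-byte field delimiter: the fields are read straight from a range of the caller's array, so no intermediate whole-line copy is made. `ByteFieldSplitter` is a hypothetical helper for illustration, not Pig's `StorageUtil`, and it returns strings rather than a Pig `Tuple` so that it stays self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Pig.
final class ByteFieldSplitter {
    private ByteFieldSplitter() {}

    // Splits bytes[offset, offset + length) on a single-byte delimiter,
    // reading directly from the caller's array instead of copying the line first.
    static List<String> split(byte[] bytes, int offset, int length, byte delimiter) {
        List<String> fields = new ArrayList<>();
        int end = offset + length;
        int start = offset;
        for (int i = offset; i < end; i++) {
            if (bytes[i] == delimiter) {
                fields.add(new String(bytes, start, i - start, StandardCharsets.UTF_8));
                start = i + 1;
            }
        }
        // Trailing field; empty when the range ends with the delimiter.
        fields.add(new String(bytes, start, end - start, StandardCharsets.UTF_8));
        return fields;
    }

    public static void main(String[] args) {
        byte[] line = "a\tbc\t".getBytes(StandardCharsets.UTF_8);
        System.out.println(split(line, 0, line.length, (byte) '\t'));
    }
}
```

Passing an offset and length, rather than a right-sized array, is what lets the caller hand over a reused buffer whose backing array is longer than the record.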

      1. PIG-3255-5.patch
        21 kB
        Rohini Palaniswamy
      2. PIG-3255-4.patch
        21 kB
        Rohini Palaniswamy
      3. PIG-3255-3.patch
        55 kB
        Rohini Palaniswamy
      4. PIG-3255-2.patch
        12 kB
        Rohini Palaniswamy
      5. PIG-3255-1.patch
        3 kB
        Rohini Palaniswamy

          Activity

          Rohini Palaniswamy created issue -
          Rohini Palaniswamy added a comment -

          OutputHandler.java:

          Text value = new Text();
          int num = in.readLine(value);
          if (num <= 0) {
              return null;
          }

          byte[] newBytes = new byte[value.getLength()];
          System.arraycopy(value.getBytes(), 0, newBytes, 0, value.getLength());
          return deserializer.deserialize(newBytes);
          

          We can cut out another copy here when value.getLength() == value.getBytes().length, which should be the case most of the time.

          Rohini Palaniswamy made changes -
          Assignee Rohini Palaniswamy [ rohini ]
          Rohini Palaniswamy made changes -
          Attachment PIG-3255-1.patch [ 12574632 ]
          Rohini Palaniswamy made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Koji Noguchi added a comment -

          +1 Looks good to me.
          Probably another Jira, but I wonder if we really need to create a new Text for every streaming output. Can we reuse it with value.clear()?
          (But if we do this, then in most cases value.getBytes().length != value.getLength().)

          Rohini Palaniswamy added a comment -

          Had a chat with Koji. He pointed out HADOOP-6109, which doubles the size of the byte[] in Text whenever an append has to grow it.

          Text.java

          Hadoop 1.x

          private void setCapacity(int len, boolean keepData) {
              if (bytes == null || bytes.length < len) {
                  byte[] newBytes = new byte[len];
                  if (bytes != null && keepData) {
                      System.arraycopy(bytes, 0, newBytes, 0, length);
                  }
                  bytes = newBytes;
              }
          }
          

          Hadoop 0.23/2.x:

          private void setCapacity(int len, boolean keepData) {
              if (bytes == null || bytes.length < len) {
                  if (bytes != null && keepData) {
                      bytes = Arrays.copyOf(bytes, Math.max(len, length << 1));
                  } else {
                      bytes = new byte[len];
                  }
              }
          }
          

          So value.getBytes().length == value.getLength() will be true only when the line is smaller than io.file.buffer.size. Since a copy of the byte[] with the right size needs to be created in any case, we can go with reusing the Text object for every getNext() in OutputHandler. Reuse will be more beneficial when record sizes are greater than io.file.buffer.size, where value.getBytes().length is almost never equal to value.getLength() because of the doubling.

          I will modify the patch to reuse the Text object.
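To make the two growth policies quoted above concrete, here is a small simulation. `GrowableBuffer` is a hypothetical, simplified stand-in for Hadoop's `Text` that models only the backing-array growth; the chunk sizes are arbitrary and chosen just to show the effect.

```java
import java.util.Arrays;

// Hypothetical stand-in for org.apache.hadoop.io.Text; models only buffer growth.
final class GrowableBuffer {
    private final boolean doubling; // true: Hadoop 0.23/2.x policy, false: Hadoop 1.x policy
    private byte[] bytes = new byte[0];
    private int length;

    GrowableBuffer(boolean doubling) {
        this.doubling = doubling;
    }

    void append(byte[] src, int start, int len) {
        int needed = length + len;
        if (bytes.length < needed) {
            // 1.x grows to exactly the needed size; 2.x grows to at least twice the current length.
            int newCapacity = doubling ? Math.max(needed, length << 1) : needed;
            bytes = Arrays.copyOf(bytes, newCapacity);
        }
        System.arraycopy(src, start, bytes, length, len);
        length += len;
    }

    void clear() {
        length = 0; // keeps the backing array so it can be reused
    }

    int getLength() {
        return length;
    }

    int capacity() {
        return bytes.length;
    }

    public static void main(String[] args) {
        byte[] chunk = new byte[4];
        GrowableBuffer exact = new GrowableBuffer(false);
        GrowableBuffer doubled = new GrowableBuffer(true);
        for (int i = 0; i < 3; i++) {
            exact.append(chunk, 0, chunk.length);
            doubled.append(chunk, 0, chunk.length);
        }
        System.out.println("exact:   length=" + exact.getLength() + " capacity=" + exact.capacity());
        System.out.println("doubled: length=" + doubled.getLength() + " capacity=" + doubled.capacity());
    }
}
```

In this model, once the buffer is reused via `clear()`, a shorter record leaves the capacity at its earlier high-water mark, so length and capacity differ regardless of the growth policy.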

          Rohini Palaniswamy made changes -
          Status Patch Available [ 10002 ] Open [ 1 ]
          Rohini Palaniswamy added a comment -

          Attaching a patch that gets rid of the byte array copy entirely. But this requires an interface change and will cause backward incompatibility. If it were an abstract class, I could have added a default implementation and retained the old method. But since it is an interface, adding a new method will give a runtime error anyway. So I removed the deserialize(byte[]) method.

          I don't see any documentation on implementing StreamToPig. Not sure how many would actually be implementing this interface. Is this change acceptable for the sake of performance? Thoughts?

          Rohini Palaniswamy made changes -
          Attachment PIG-3255-2.patch [ 12600406 ]
          Rohini Palaniswamy made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Daniel Dai added a comment -

          I am personally not aware of anyone using StreamToPig, but we need to check with Alan Gates, since he marked it as public stable. The other parts of the patch look good. Avoiding 2 byte array copies and reusing the Text object would save memory and improve performance.

          Rohini Palaniswamy added a comment -

          Alan Gates,
          Comments ?

          Daniel Dai added a comment -

          The stream UDF in PIG-2417 uses this interface. We need to get this resolved before check-in.

          Jeremy Karn added a comment -

          I think this change makes sense and it should be easy for me to update the patch in PIG-2417 once this is merged in.

          Rohini Palaniswamy added a comment -

          If the interface change is OK, I am thinking of also changing the PigToStream.java interface from

          public byte[] serialize(Tuple t) throws IOException;

          to

          public DataBuffer serialize(Tuple t) throws IOException;

          where DataBuffer will be same as
          http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/DataOutputBuffer.java?revision=1306187&view=markup

          Don't want to use DataOutputBuffer itself as it is marked
          @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
          @InterfaceStability.Unstable

          This will get rid of one more byte array copy. Thoughts ?
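A rough sketch of how such a buffer can avoid the copy, assuming it is built on `java.io.ByteArrayOutputStream` (whose `buf` and `count` fields are protected). `ExposedBuffer` is a hypothetical name for illustration; it is not the `DataBuffer` class from the patch.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration: exposes the backing array instead of copying it.
final class ExposedBuffer extends ByteArrayOutputStream {
    // Returns the internal array; only the first getLength() bytes are valid.
    byte[] getData() {
        return buf;
    }

    // Number of valid bytes written so far.
    int getLength() {
        return count;
    }

    public static void main(String[] args) {
        ExposedBuffer out = new ExposedBuffer();
        byte[] payload = "1\t2\n".getBytes(StandardCharsets.UTF_8);
        out.write(payload, 0, payload.length);
        // A consumer can write getData()[0 .. getLength()) to the stream directly,
        // whereas toByteArray() would allocate and fill a new right-sized array.
        System.out.println("valid bytes: " + out.getLength() + ", backing array: " + out.getData().length);
    }
}
```

The trade-off is that callers must always pair `getData()` with `getLength()`, since the backing array is usually longer than the valid data.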

          Rohini Palaniswamy added a comment -

          https://reviews.apache.org/r/14030/

          Rohini Palaniswamy made changes -
          Attachment PIG-3255-3.patch [ 12602079 ]
          Alan Gates added a comment -

          I don't know if anyone is using StreamToPig either, but marking an interface as stable and then changing it without deprecation or anything isn't cool. So no, I don't think this change is ok.

          We could add the proposed function "public Tuple deserialize(byte[] bytes, int offset, int length) throws IOException;" to the interface and change Pig to call it if it's present or use the old one if not.

          Rohini Palaniswamy added a comment -

          Alan,
          If you add a method to the interface, won't it break existing compiled code at runtime anyways ?

          Jeremy Karn added a comment -

          If the performance gain is significant enough you could deprecate the existing interface and create a new interface that extends it adding just the new deserialize method. It's ugly though.

          Alan Gates added a comment -

          At compile time, but not at runtime. At runtime Pig would need to reflect on the class implementing StreamToPig and see if it contains a deserialize method that matches your new signature. You could then pick which method to call based on that. As Jeremy suggests, you could instead do that with a new interface (PigToStreamV2) and then at compile time determine which interface is being implemented and act accordingly. This is actually better than what I initially suggested as the determination can be made at compile time. If you choose this route you should also change PigToStreamV2 to an abstract class so that in the future we can add methods without going through this dance.
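The reflection check described above could look roughly like the following. This is a sketch only: `DeserializeProbe`, `OldStyleDeserializer`, and `NewStyleDeserializer` are hypothetical names, and the stand-in classes return a `String` instead of a Pig `Tuple` so that the example is self-contained.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

// Hypothetical probe: does the implementation offer deserialize(byte[], int, int)?
final class DeserializeProbe {
    private DeserializeProbe() {}

    static boolean hasRangeDeserialize(Class<?> implClass) {
        try {
            // getMethod looks up public methods, including inherited ones.
            Method m = implClass.getMethod("deserialize", byte[].class, int.class, int.class);
            return m != null;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasRangeDeserialize(OldStyleDeserializer.class));
        System.out.println(hasRangeDeserialize(NewStyleDeserializer.class));
    }
}

// Stand-in for an existing implementation that only has the old method.
class OldStyleDeserializer {
    public String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

// Stand-in for an implementation that also offers the range-based method.
class NewStyleDeserializer extends OldStyleDeserializer {
    public String deserialize(byte[] bytes, int offset, int length) {
        return new String(bytes, offset, length, StandardCharsets.UTF_8);
    }
}
```

The lookup would normally be done once per implementation class and cached, since reflective lookups on every record would undo the performance gain.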

          Rohini Palaniswamy added a comment -

          Thanks Jeremy and Alan. I will go with a new v2 abstract class approach and deprecate the old one.

          Rohini Palaniswamy added a comment -

          Came across a good article - http://haacked.com/archive/2008/02/20/versioning-issues-with-abstract-base-classes-and-interfaces.aspx . It describes a similar approach that is cleaner to code. The idea is to create StreamToPigBase and PigToStreamBase abstract classes that implement the StreamToPig and PigToStream interfaces respectively, and add the new method there. In the InputHandler and OutputHandler, check if it is an instanceof StreamToPigBase, then call the new method, else call the old interface one. With this, we don't have to check during reflection whether the v1 or v2 interface is implemented, or change Input/OutputHandler and its implementations to set two different serializers/deserializers. Will still go ahead and deprecate the interface so that it can be removed in the next release.
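The base-class approach can be sketched as below. All names here (`LegacyDeserializer`, `RangeDeserializerBase`, `StreamDispatcher`, and the two implementations) are hypothetical stand-ins rather than the classes in the patch, and the methods return `String` instead of a Pig `Tuple` to keep the example self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Stand-in for the existing stable interface, which must not change.
interface LegacyDeserializer {
    String deserialize(byte[] bytes);
}

// Hypothetical abstract base class that adds the range-based method.
abstract class RangeDeserializerBase implements LegacyDeserializer {
    public abstract String deserialize(byte[] bytes, int offset, int length);

    // The old method delegates to the new one, so subclasses implement only one.
    @Override
    public String deserialize(byte[] bytes) {
        return deserialize(bytes, 0, bytes.length);
    }
}

final class StreamDispatcher {
    private StreamDispatcher() {}

    // buffer may be longer than the record; only the first `length` bytes are valid.
    static String handle(LegacyDeserializer d, byte[] buffer, int length) {
        if (d instanceof RangeDeserializerBase) {
            // New path: no copy, the valid range is passed along.
            return ((RangeDeserializerBase) d).deserialize(buffer, 0, length);
        }
        // Old path: existing implementations still need a right-sized array.
        return d.deserialize(Arrays.copyOf(buffer, length));
    }
}

// An implementation compiled against the old interface only.
class LegacyImpl implements LegacyDeserializer {
    @Override
    public String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

// An implementation that extends the new base class.
class RangeImpl extends RangeDeserializerBase {
    @Override
    public String deserialize(byte[] bytes, int offset, int length) {
        return new String(bytes, offset, length, StandardCharsets.UTF_8);
    }
}
```

Because the base class is abstract rather than an interface, further methods with default bodies can be added later without breaking subclasses that were compiled earlier.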

          Alan Gates added a comment -

          +1

          Rohini Palaniswamy added a comment -

          Updated review - https://reviews.apache.org/r/14030/

          Wish we had this in JDK 6 instead of 8 - http://blog.hartveld.com/2013/03/jdk-8-13-interface-default-method.html

          One thing I did was create only one abstract class, PigStreamingBase, instead of one each for serializing and deserializing. Hope that is OK.

          Rohini Palaniswamy made changes -
          Attachment PIG-3255-4.patch [ 12603090 ]
          Rohini Palaniswamy made changes -
          Link This issue is required by PIG-2417 [ PIG-2417 ]
          Rohini Palaniswamy added a comment -

          Patch with the missing @Override for Daniel's comment in https://reviews.apache.org/r/14030/.

          Alan Gates,
          Do you want me to wait for your code review on this patch or can I go ahead and commit with Daniel's +1?

          Rohini Palaniswamy made changes -
          Attachment PIG-3255-5.patch [ 12603391 ]
          Alan Gates added a comment -

          I gave my +1 above, so we're good from my viewpoint.

          Rohini Palaniswamy added a comment -

          Committed to trunk. Thanks Alan, Daniel and Jeremy

          Rohini Palaniswamy made changes -
          Summary Avoid extra byte array copy in streaming deserialize Avoid extra byte array copies in streaming
          Rohini Palaniswamy made changes -
          Status Patch Available [ 10002 ] Resolved [ 5 ]
          Resolution Fixed [ 1 ]
          Daniel Dai made changes -
          Status Resolved [ 5 ] Closed [ 6 ]

            People

            • Assignee:
              Rohini Palaniswamy
              Reporter:
              Rohini Palaniswamy
            • Votes:
              0
              Watchers:
              7
