
PIG-2550: Custom tuple results in "Unexpected datatype 110 while reading tuple from binary file" while spilling

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.1, 0.9.1, 0.10.0
    • Fix Version/s: 0.10.0, 0.9.3, 0.11
    • Component/s: None
    • Labels: None
    • Hadoop Flags: Reviewed

      Description

      In the script below:

      a = load 'gen_data/' AS (f1,f2);
      b = load 'gen_data_02/' AS (f1,f2);
      c = cogroup a by f1,b by f1;
      d = foreach c generate group,flatten(a),COUNT(b),flatten(UDFReturningMyCustomTuple(b,a));
      store d into 'test006';
      

      The UDF (UDFReturningMyCustomTuple) returns a bag containing custom tuples.
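
      The UDF itself is not attached to this issue, so the following is only a rough, hypothetical sketch of the shape that triggers the failure (the class name is taken from the script above, everything else is assumed; MyCustomTuple is the DefaultTuple subclass shown in the comments below):

      import java.io.IOException;
      import org.apache.pig.EvalFunc;
      import org.apache.pig.data.BagFactory;
      import org.apache.pig.data.DataBag;
      import org.apache.pig.data.Tuple;

      // Hypothetical sketch: an EvalFunc that returns a bag of custom tuples.
      public class UDFReturningMyCustomTuple extends EvalFunc<DataBag> {
          @Override
          public DataBag exec(Tuple input) throws IOException {
              DataBag bag = BagFactory.getInstance().newDefaultBag();
              for (Object field : input.getAll()) {
                  // Wrap each input field in a custom tuple. MyCustomTuple
                  // extends DefaultTuple, so it inherits DefaultTuple.write().
                  bag.add(new MyCustomTuple(field));
              }
              return bag;
          }
      }
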
      The script execution fails on the reducer side with the exception below while reading back the spilled data:

      2012-02-23 10:37:16,840 FATAL org.apache.pig.data.DefaultDataBag: Unable to read our spill file.
      org.apache.pig.backend.executionengine.ExecException: ERROR 2112: Unexpected datatype 110 while reading tuple from binary file.
      at org.apache.pig.data.BinInterSedes.getTupleSize(BinInterSedes.java:133)
      at org.apache.pig.data.BinInterSedes.addColsToTuple(BinInterSedes.java:556)
      at org.apache.pig.data.BinSedesTuple.readFields(BinSedesTuple.java:66)
      at org.apache.pig.data.DefaultDataBag$DefaultDataBagIterator.next(DefaultDataBag.java:215)
      at org.apache.pig.data.DefaultDataBag$DefaultDataBagIterator.hasNext(DefaultDataBag.java:158)
      at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:301)
      at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:208)
      at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:459)
      at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:427)
      at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:407)
      at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:261)
      at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
      at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
      at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
      at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:396)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1082)
      at org.apache.hadoop.mapred.Child.main(Child.java:249)

      It looks like while spilling we call MyCustomTuple.write(DataOutput out), which writes the type byte as DataType.TUPLE (110),
      but while reading back we always use BinSedesTuple, which expects the BinInterSedes format.
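
      To make the mismatch concrete, here is a minimal, self-contained sketch of the failure mode (the BinInterSedes marker value below is a placeholder, not the real constant; only DataType.TUPLE = 110 comes from this report):

      import java.io.ByteArrayInputStream;
      import java.io.ByteArrayOutputStream;
      import java.io.DataInputStream;
      import java.io.DataOutputStream;
      import java.io.IOException;

      public class SpillMismatchSketch {
          static final byte DATATYPE_TUPLE = 110;        // written by DefaultTuple.write()
          static final byte BINSEDES_TUPLE_MARKER = 29;  // placeholder for a BinInterSedes marker

          public static void main(String[] args) throws IOException {
              // Spill path: the bag spills via tuple.write(out); a custom tuple
              // inheriting DefaultTuple.write() emits DataType.TUPLE (110) first.
              ByteArrayOutputStream spill = new ByteArrayOutputStream();
              DataOutputStream out = new DataOutputStream(spill);
              out.writeByte(DATATYPE_TUPLE);
              out.flush();

              // Read path: DefaultDataBag reads back through BinSedesTuple, whose
              // readFields() expects a BinInterSedes type byte, not 110.
              DataInputStream in = new DataInputStream(
                      new ByteArrayInputStream(spill.toByteArray()));
              byte type = in.readByte();
              if (type != BINSEDES_TUPLE_MARKER) {
                  // This is the ERROR 2112 path in the stack trace above.
                  throw new IOException("Unexpected datatype " + type
                          + " while reading tuple from binary file.");
              }
          }
      }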

      Attachments

      1. REPRODUCING_SPILL_ERROR.txt (3 kB, Vivek Padmanabhan)
      2. PIG-2550-1.patch (2 kB, Daniel Dai)

          Activity

          Daniel Dai added a comment -

          Committed to 0.9/0.10/trunk. For trunk, it was already fixed by PIG-2359, so only the test case was committed there.

          Amol Kekre added a comment -

          Daniel, Thejas,
          Can this patch be committed? We are looking to launch it on our grids ASAP.

          Thejas M Nair added a comment -

          "That only fixes the spilling behavior. It does not fix the fact that custom tuples whose serialization does not match BinInterSedes can't be used across the Map-Reduce boundary."

          Yes, the tuples will be serialized using the BinInterSedes format at the map-reduce boundary, and BinSedesTuples will be created after the boundary. I don't see a bug there.

          +1 for the patch.

          Daniel Dai added a comment -

          That's true. After serialization/deserialization, you will get a BinSedesTuple instead of the custom tuple.

          Dmitriy V. Ryaboy added a comment -

          That only fixes the spilling behavior. It does not fix the fact that custom tuples whose serialization does not match BinInterSedes can't be used across the Map-Reduce boundary.

          Vivek Padmanabhan added a comment -

          Thanks Daniel, the script goes through fine with the patch.

          Daniel Dai added a comment -

          Vivek, can you try the patch?

          Daniel Dai added a comment -

          I was able to reproduce it now. The problem is that we spill data using the custom tuple's serializer, but when we read it back, we use BinInterSedes. I will try to make a fix tomorrow.

          Amol Kekre added a comment -

          Any updates on when we can get a fix?

          Vivek Padmanabhan added a comment -

          For the error to happen, DefaultDataBag should be spilling data on the reduce side. This test was run in MR mode.

          gen_data is the same as gen_data_02 but with a smaller value.

          import java.io.BufferedWriter;
          import java.io.File;
          import java.io.FileWriter;
          import java.io.IOException;

          // Generates a single record: a 490-character first field, a tab,
          // then a 5,000-character second field.
          public class Gen {

              public static void main(String[] args) throws IOException {
                  BufferedWriter bw = new BufferedWriter(new FileWriter(new File("tmp_data")));

                  StringBuffer sb = new StringBuffer();
                  for (int i = 0; i < 200; i++)
                      sb.append(i);
                  bw.write(sb.toString());
                  bw.write("\t");

                  for (int i = 0; i < 5 * 1000; i++)
                      bw.write("" + i % 10);
                  bw.close();
              }
          }

          BTW, I tried setting pig.data.tuple.factory.name to org.apache.pig.data.DefaultTupleFactory, but the property is not picked up in the map/reduce tasks.
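
          That would be consistent with the tuple factory being resolved from a JVM system property: a -D set on the client JVM is not forwarded to Hadoop task JVMs, so the tasks fall back to the default factory. A rough sketch of that lookup pattern (illustrative only, not Pig's exact code):

          import org.apache.pig.data.BinSedesTupleFactory;
          import org.apache.pig.data.TupleFactory;

          public class FactoryLookupSketch {
              // Illustrative: resolve the tuple factory from a system property.
              // Task JVMs never see a -D passed only to the client, so they
              // take the default branch.
              static TupleFactory resolve() throws Exception {
                  String name = System.getProperty("pig.data.tuple.factory.name");
                  if (name == null) {
                      return new BinSedesTupleFactory(); // Pig's default
                  }
                  return (TupleFactory) Class.forName(name)
                          .getDeclaredConstructor().newInstance();
              }
          }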

          Daniel Dai added a comment -

          I followed your instructions but was not able to reproduce it. In step 2 you mention how gen_data_02 is generated, but how is gen_data generated? Is this reproducible in local mode?

          Vivek Padmanabhan added a comment -

          I got the script running after overriding the read and write methods in the custom tuple:

          import java.io.DataInput;
          import java.io.DataOutput;
          import java.io.IOException;

          import org.apache.pig.data.DefaultTuple;
          import org.apache.pig.data.InterSedes;
          import org.apache.pig.data.InterSedesFactory;

          public class MyCustomTuple extends DefaultTuple {
              private static final long serialVersionUID = 8156382697467819543L;
              private static final InterSedes sedes = InterSedesFactory.getInterSedesInstance();

              public MyCustomTuple() {
                  super();
              }

              public MyCustomTuple(Object t) {
                  super();
                  append(t);
              }

              @Override
              public void write(DataOutput out) throws IOException {
                  // Write in BinInterSedes format so the spill file can be read back.
                  sedes.writeDatum(out, this);
              }

              @Override
              public void readFields(DataInput in) throws IOException {
                  // Clear our fields, in case we're being reused.
                  mFields.clear();
                  sedes.addColsToTuple(in, this);
              }
          }

          I am not sure whether overriding write() will have any other impacts. Could this be considered a workaround?

          Vivek Padmanabhan added a comment -

          Attaching the artifacts used to reproduce this issue. The job was run with:

          -Dmapred.reduce.child.java.opts="-Xmx512M"

          It looks like this job runs fine with Pig 0.7.


            People

            • Assignee: Daniel Dai
            • Reporter: Vivek Padmanabhan
            • Votes: 0
            • Watchers: 4
