Avro / AVRO-1786

Strange IndexOutOfBoundsException in GenericDatumReader.readString

    Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.7.4, 1.7.7
    • Fix Version/s: None
    • Component/s: java
    • Labels: None
    • Environment:

      CentOS 6.5 Linux x64, 2.6.32-358.14.1.el6.x86_64
      Use IBM JVM:
      IBM J9 VM (build 2.7, JRE 1.7.0 Linux amd64-64 Compressed References 20140515_199835, JIT enabled, AOT enabled)

      Description

      Our production cluster is CentOS 6.5 (2.6.32-358.14.1.el6.x86_64), running IBM BigInsights V3.0.0.2. In Apache terms, that is Hadoop 2.2.0 with MRv1 (no YARN), and it ships with Avro 1.7.4, running on the IBM J9 VM (build 2.7, JRE 1.7.0 Linux amd64-64 Compressed References 20140515_199835, JIT enabled, AOT enabled). Not sure if the JDK matters, but it is NOT the Oracle JVM.

      We have an ETL pipeline implemented as a chain of MR jobs. One MR job merges two sets of Avro data: Dataset1 is in HDFS location A, Dataset2 is in HDFS location B, and both contain Avro records bound to the same Avro schema. Each record contains a unique id field and a timestamp field. The MR job merges the records by ID, replaces the earlier-timestamp record with the later-timestamp one, and emits the final Avro record. Very straightforward.
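      As a rough illustration of that merge rule (not the actual production code; the real job, shown later in this thread, keys on a custom partition class with a secondary sort), a reducer that keeps, per list id, the record with the later updated_at might look like the sketch below. It assumes a class named Lists generated from the schema shown further down and a plain Text key for illustration:

      import java.io.IOException;
      import org.apache.avro.mapred.AvroKey;
      import org.apache.avro.mapred.AvroValue;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;

      // Illustrative only: keep, per list id, the record with the later updated_at.
      public class MergeByTimestampReducer
              extends Reducer<Text, AvroValue<Lists>, AvroKey<Lists>, NullWritable> {

          @Override
          protected void reduce(Text listId, Iterable<AvroValue<Lists>> values, Context context)
                  throws IOException, InterruptedException {
              Lists latest = null;
              for (AvroValue<Lists> v : values) {
                  Lists candidate = v.datum();
                  if (latest == null || ts(candidate) >= ts(latest)) {
                      // Copy the datum: the framework may reuse the backing object between iterations.
                      latest = Lists.newBuilder(candidate).build();
                  }
              }
              context.write(new AvroKey<>(latest), NullWritable.get());
          }

          // updated_at is a ["long","null"] union, so the generated getter returns a Long that may be null.
          private static long ts(Lists record) {
              return record.getUpdatedAt() == null ? Long.MIN_VALUE : record.getUpdatedAt();
          }
      }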

      Now we face a problem where one reducer keeps failing with the following stack trace on the JobTracker:

      java.lang.IndexOutOfBoundsException
      	at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:191)
      	at java.io.DataInputStream.read(DataInputStream.java:160)
      	at org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
      	at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
      	at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:107)
      	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:348)
      	at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:143)
      	at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:125)
      	at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:121)
      	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
      	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
      	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
      	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
      	at org.apache.avro.hadoop.io.AvroDeserializer.deserialize(AvroDeserializer.java:108)
      	at org.apache.avro.hadoop.io.AvroDeserializer.deserialize(AvroDeserializer.java:48)
      	at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:142)
      	at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:117)
      	at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:297)
      	at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:165)
      	at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:652)
      	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
      	at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
      	at java.security.AccessController.doPrivileged(AccessController.java:366)
      	at javax.security.auth.Subject.doAs(Subject.java:572)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1502)
      	at org.apache.hadoop.mapred.Child.main(Child.java:249)
      

      Here are my Mapper and Reducer method signatures:
      Mapper:
      public void map(AvroKey<SpecificRecord> key, NullWritable value, Context context) throws IOException, InterruptedException
      Reducer:
      protected void reduce(CustomPartitionKeyClass key, Iterable<AvroValue<SpecificRecord>> values, Context context) throws IOException, InterruptedException

      What bothers me are the following facts:
      1) All the mappers finish without error.
      2) Most of the reducers finish without error, but one reducer keeps failing with the above error.
      3) It looks like it is caused by the data? But keep in mind that all the Avro records passed through the mapper side, yet failed in one reducer.
      4) From the stack trace, it looks like our reducer code was NOT invoked yet; the failure happens before that. So my guess is that all the Avro records pass through the mapper side, but Avro complains about the intermediate result generated by one mapper? In my understanding, that would be a sequence file generated by Hadoop, with the value part being the Avro bytes. Does the above error mean that Avro cannot deserialize the value part from the sequence file?
      5) Our ETL ran fine for more than a year, but suddenly hit this error starting one day, and has kept hitting it since.
      6) If it helps, here is the schema for the Avro record:

      {
          "namespace" : "company name",
          "type" : "record",
          "name" : "Lists",
          "fields" : [
              {"name" : "account_id", "type" : "long"},
              {"name" : "list_id", "type" : "string"},
              {"name" : "sequence_id", "type" : ["int", "null"]} ,
              {"name" : "name", "type" : ["string", "null"]},
              {"name" : "state", "type" : ["string", "null"]},
              {"name" : "description", "type" : ["string", "null"]},
              {"name" : "dynamic_filtered_list", "type" : ["int", "null"]},
              {"name" : "filter_criteria", "type" : ["string", "null"]},
              {"name" : "created_at", "type" : ["long", "null"]},
              {"name" : "updated_at", "type" : ["long", "null"]},
              {"name" : "deleted_at", "type" : ["long", "null"]},
              {"name" : "favorite", "type" : ["int", "null"]},
              {"name" : "delta", "type" : ["boolean", "null"]},
              {
                  "name" : "list_memberships", "type" : {
                      "type" : "array", "items" : {
                          "name" : "ListMembership", "type" : "record",
                          "fields" : [
                              {"name" : "channel_id", "type" : "string"},
                              {"name" : "created_at", "type" : ["long", "null"]},
                              {"name" : "created_source", "type" : ["string", "null"]},
                              {"name" : "deleted_at", "type" : ["long", "null"]},
                              {"name" : "sequence_id", "type" : ["long", "null"]}
                          ]
                      }
                  }
              }
          ]
      }
      

        Activity

        java8964 Yong Zhang added a comment -

        After spending a lot of time debugging this, I found some interesting facts, and I want to know if anyone can provide more information related to them.

        In our Avro data, if the "list_id" field, which is a UUID, contains the value "3a3ffb10be8b11e3977ad4ae5284344f", that record will cause this exception.

        In this case, if you check the following Avro class: BinaryDecoder.java, from here:

        https://github.com/apache/avro/blob/release-1.7.4/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java

        The above link points to release 1.7.4; the relevant code starts at line 259:

        int length = readInt();
        Utf8 result = (old != null ? old : new Utf8());
        result.setByteLength(length);
        if (0 != length) {
          doReadBytes(result.getBytes(), 0, length);
        }

        When the exception happens, the length is in fact -51, as calculated by readInt(), which causes the IndexOutOfBoundsException.

        Based on AVRO-1198, a negative value returned here means the data is malformed, and that JIRA even patched in a message to show this in the exception.

        But in my case:

        The Avro data is NOT malformed. Why? I can query the whole ready-to-merge dataset in Hive without any issue, even the records being complained about.
        In my case, the actual runtime object is DirectBinaryDecoder; as you can see in the stack trace, the doReadBytes call indeed points to that class.
        So the readInt() method that returns -51 in this case must be the one in DirectBinaryDecoder, which I list here:
        https://github.com/apache/avro/blob/release-1.7.4/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryDecoder.java
        starting from line 97, but I really don't know why, in my case, it returns -51.
        I tested with Avro 1.7.7 (the latest release of the 1.7.x line), and the same problem still happens.

        Thanks

        java8964 Yong Zhang added a comment -

        I modified the DirectBinaryDecoder class, adding the following lines after line 116, to dump more information at runtime:

        if (((n >>> 1) ^ -(n & 1)) < 0) {
          System.out.println("Got ((n >>> 1) ^ -(n & 1)) = " + ((n >>> 1) ^ -(n & 1)));
          System.out.println("And b = " + b);
          System.out.println("And shift = " + shift);
          System.out.println("And n = " + n);
        }

        Here is the output:
        Got ((n >>> 1) ^ -(n & 1)) = -51
        And b = 101
        And shift = 0
        And n = 101

        I am not sure what these values mean, but does this help prove that it is a bug?

        Thanks
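        For reference, those debug values are consistent with Avro's varint plus zig-zag integer encoding: a byte below 0x80 is a complete varint by itself, so n = b = 101, and zig-zag decoding gives (101 >>> 1) ^ -(101 & 1) = 50 ^ -1 = -51. A minimal standalone sketch (not Avro's actual decoder class) that reproduces the arithmetic:

        public class ZigZagDemo {
            // Decode one zig-zag varint, the way DirectBinaryDecoder.readInt() does.
            static int decodeZigZagVarint(byte[] bytes) {
                int n = 0, shift = 0, i = 0, b;
                do {
                    b = bytes[i++] & 0xff;        // next raw byte
                    n |= (b & 0x7f) << shift;     // accumulate 7 payload bits
                    shift += 7;
                } while ((b & 0x80) != 0);        // high bit set means more bytes follow
                return (n >>> 1) ^ -(n & 1);      // zig-zag decode
            }

            public static void main(String[] args) {
                // 101 == 0x65 == ASCII 'e': the byte observed in the debug output above.
                System.out.println(decodeZigZagVarint(new byte[] { 0x65 }));  // prints -51
            }
        }

        Note that 0x65 is also the ASCII code for 'e', so one plausible reading is that the decoder has lost alignment in the byte stream and is consuming a character byte from a string as the next length prefix, rather than the record itself being malformed.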

        java8964 Yong Zhang added a comment -

        It looks like Avro assumes the length byte for the UTF-8 string is an even number, but in this case the byte coming from the data is an odd number (101); yet the Avro data is read in Hive without any issue. Under what circumstances can this happen?

        rdblue Ryan Blue added a comment -

        Hi Yong Zhang. From that stack trace, it looks like the problem happens when the reducer is reading data written out by the mapper. That's why you can read the source data just fine; the problem is in the job's intermediate data. As for why this happens on just one reducer, can you post the job counters? This could be explained by there being only one output key or just one reduce task.

        This sort of error usually happens when an Avro file is corrupt, or when another binary format is read as Avro. Since this fails before calling your reducer, it looks like none of the data values are readable, so I think it may be a configuration problem between your mapper and reducer. Could you post the code where you set up this job?

        java8964 Yong Zhang added a comment -

        Hi, Ryan:

        I listed the Mapper and Reducer method signatures in the ticket; here they are again briefly, with more explanation:

        Mapper side:

        public void map(AvroKey<SpecificRecord> key, NullWritable value, Context context) throws IOException, InterruptedException {
            SpecificRecord specificRecord = key.datum();
            context.write(outputKey, new AvroValue(lists));
        }
        // So we know that the mapper doesn't complain about anything in the Avro record.

        Reducer side:

        @Override
        protected void reduce(PartitionKey key, Iterable<AvroValue<SpecificRecord>> values, Context context) throws IOException, InterruptedException {
        }
        // The PartitionKey object is a custom class that partitions the values based on the ID and also does the secondary sort, ordering the data by timestamp.
        // I know the key part is fine, as the KeyDeserializer in ReduceContextImpl works fine.

        The problem happens in ValueDeserializer.deserialize(value), which indeed throws the exception from the Avro codebase. I even hacked the Hadoop code to dump the key when the exception happens.
        Yes, I understand the exception is about the intermediate data, but I don't know why Avro treats this intermediate data as invalid Avro data. Most of the data is fine, as most reducers finished successfully, but from the dump I can see at least 3 records throwing this exception; at that point I stopped dumping further exception records.
        Our schema may have changed in the past (I will check that), but the new schema should always be compatible with the old data. For example, even if the troublesome data was in fact generated with an old schema, these 3 records are still read fine in Hive (which is defined with the latest schema).

        What I am going to do next is:

        1) I will dump one good Avro record plus one bad Avro record (I have the 3 ids), store them in a separate Avro file, see whether I can still reproduce the problem, and then compare the difference between the two records.
        2) I will check the MR counters during this test.

        rdblue Ryan Blue added a comment -

        The mapper and reducer code is less helpful than your job setup code. There are a few things that could be happening here, and the job setup code is the most likely place for the problem to be. Is this a project on GitHub you could point us to?

        java8964 Yong Zhang added a comment -

        Sorry, the code base is in my company's github, which is NOT accessible from outside.

        I understand your concern; here is the relevant part of the Driver class that starts this MR job:

        Job mergeJob = new Job(getConf());
        mergeJob.setJarByClass(MergeDriver.class);
        Path internalOutputPath = new Path(new Path(new Path(baseDir, ETLConstants.WORKING_DIR), epoch), ETLConstants.MERGE_DIR);
        FileOutputFormat.setOutputPath(mergeJob, internalOutputPath);
        mergeJob.setJobName("Delta Lists merge Job");
        AvroJob.setInputKeySchema(mergeJob, Lists.SCHEMA$);
        AvroJob.setOutputKeySchema(mergeJob, Lists.SCHEMA$);
        AvroJob.setMapOutputValueSchema(mergeJob, Lists.SCHEMA$);
        AvroKeyInputFormat.addInputPath(mergeJob, new Path(new Path(baseDir, ETLConstants.DATA_DIR), ETLConstants.MULTI_OUTPUT_NAME_LISTS));
        mergeJob.setOutputFormatClass(AvroKeyOutputFormat.class);
        // Add the input data
        AvroKeyInputFormat.addInputPath(mergeJob, new Path(new Path(new Path(baseDir, ETLConstants.WORKING_DIR), epoch), ETLConstants.DETAL_DIR));
        mergeJob.setInputFormatClass(AvroKeyInputFormat.class);
        mergeJob.getConfiguration().set(ETLConstants.BASE_DIR, baseDir.toString());
        mergeJob.getConfiguration().set(ETLConstants.ETL_RUNTIME_EPOCH, epoch);
        mergeJob.setMapperClass(Contact2ETLMergeMapper.class);
        mergeJob.setReducerClass(Contact2ETLMergeReducer.class);
        mergeJob.setMapOutputKeyClass(Contact2MergePartitionKey.class);
        mergeJob.setMapOutputValueClass(AvroValue.class);
        mergeJob.setPartitionerClass(Contact2MergePartitioner.class);
        mergeJob.setSortComparatorClass(Contact2MergePartitionKey.Contact2MergePartitionKeyComparator.class);
        mergeJob.setGroupingComparatorClass(Contact2MergePartitionKey.Contact2MergePartitionGroupComparator.class);

        logger.info("Start running merge job...");
        if (mergeJob.waitForCompletion(true)) {
        }

        java8964 Yong Zhang added a comment -

        One more thing: when I say the other reducers finished successfully, I don't mean that those reducers didn't process any data.

        This ETL job processes a very large number of records, and I can see the output of all the other reducers in HDFS. To summarize: out of millions of Avro records, most pass through this MR job without any issue, but some records fail (so far I have found at least 3, and they just happen to be sent to the same reducer, where the first one fails the reducer).

        java8964 Yong Zhang added a comment -

        So right now I have found one Avro file (about 900 MB, compressed with Snappy, so still too big) that contains at least one Avro record that causes this issue.

        I will try to dump this bad record, plus one good record, into a new Avro file. If I can reproduce the problem with this small file, I will attach it to this ticket so that any Avro expert can verify whether the problem happens in your environment or not.

        java8964 Yong Zhang added a comment -

        Here is what I did, and here is the result. I really don't know whether this is an Avro issue or a Hadoop MR issue.

        I have this 900 MB Avro file, and I know that when this exception happens in my MR job, the reducer is trying to deserialize a value whose key is '3a410d00656911e3ac0bd4ae52986b44', which is the list_id in my schema.

        Now if I change my mapper to emit only three key/value pairs (one with the above key, plus 2 more randomly picked keys), then to my surprise I don't get this exception in the reducer.

        If I remove this filter, I get the exception again.

        So with more key/value pairs in the intermediate result, I get this exception consistently for this key. But if I explicitly pick out this Avro record and 2 other records, I cannot reproduce the problem.

        I can confirm the list_id is unique in this Avro file. I also changed ReduceContextImpl in the Hadoop 2.2.0 codebase to dump the key when deserializing the value fails, as follows:

        https://github.com/apache/hadoop/blob/release-2.2.0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java

        changing line 145, which was

        value = valueDeserializer.deserialize(value);

        to

        try {
            value = valueDeserializer.deserialize(value);
        } catch (RuntimeException re) {
            System.out.println("key = " + key);
            throw re;
        }

        So right now I am stuck trying to simplify the data. Any idea how I should debug this issue? Thanks
        java8964 Yong Zhang added a comment -

        I copied this 900 MB Avro file to our DEV cluster, which has an identical OS and software stack to our production cluster, just fewer nodes.

        I can reproduce the problem on the other cluster, so this is a software issue, not something related to the environment.

        If, in my mapper, I filter out the other Avro records and emit only the 3 Avro records as in my previous test, I cannot trigger this exception.

        In this case, I see various values of the local variable "b" in the readInt() method of DirectBinaryDecoder, but none of them is 101, and of course no negative value is ever produced by this method.

        If I don't filter out any Avro records, I get this exception and hit the b = 101 case. But there are too many records to track.

        Thanks

        rdblue Ryan Blue added a comment -

        Yong Zhang, when you get a chance, can you post the counters from your test jobs? Results from the successful run (with the filter) would be helpful as well.

        Any information you have on the values that cause this problem would be great. This looks suspiciously like a schema evolution change that was inconsistently applied. Did you change your schema lately?

        java8964 Yong Zhang added a comment -

        Hi, Ryan:

        First, I think this bug's priority should be Minor, as it is a very rare case, so I changed it.

        In this test, I changed the class https://github.com/apache/hadoop/blob/release-2.2.0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java and added the following lines right after line 144:
        if (key.toString().contains("xxxxx")) {
            System.out.println("current key length = " + (nextKey.getLength() - nextKey.getPosition()));
            System.out.println("current value length = " + (nextVal.getLength() - nextVal.getPosition()));
        }

        // In the above code, "xxxxx" is the key I know is involved in the bad case; the next line, "value = valueDeserializer.deserialize(value)", will throw the exception. I want to check whether the value's length in bytes changes between the two cases.

        Anyway, here are the counters for the good case, in which the mapper emits only 3 records, including the "xxxxx" record:

        16/01/28 14:03:58 INFO mapred.JobClient: Job complete: job_201512111403_1109
        16/01/28 14:03:58 INFO mapred.JobClient: Counters: 31
        16/01/28 14:03:58 INFO mapred.JobClient: File System Counters
        16/01/28 14:03:58 INFO mapred.JobClient: FILE: BYTES_READ=524
        16/01/28 14:03:58 INFO mapred.JobClient: FILE: BYTES_WRITTEN=1888562
        16/01/28 14:03:58 INFO mapred.JobClient: HDFS: BYTES_READ=965292334
        16/01/28 14:03:58 INFO mapred.JobClient: HDFS: BYTES_WRITTEN=1508
        16/01/28 14:03:58 INFO mapred.JobClient: org.apache.hadoop.mapreduce.JobCounter
        16/01/28 14:03:58 INFO mapred.JobClient: TOTAL_LAUNCHED_MAPS=9
        16/01/28 14:03:58 INFO mapred.JobClient: TOTAL_LAUNCHED_REDUCES=1
        16/01/28 14:03:58 INFO mapred.JobClient: DATA_LOCAL_MAPS=9
        16/01/28 14:03:58 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=415253
        16/01/28 14:03:58 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=24692
        16/01/28 14:03:58 INFO mapred.JobClient: FALLOW_SLOTS_MILLIS_MAPS=0
        16/01/28 14:03:58 INFO mapred.JobClient: FALLOW_SLOTS_MILLIS_REDUCES=0
        16/01/28 14:03:58 INFO mapred.JobClient: org.apache.hadoop.mapreduce.TaskCounter
        16/01/28 14:03:58 INFO mapred.JobClient: MAP_INPUT_RECORDS=234962
        16/01/28 14:03:58 INFO mapred.JobClient: MAP_OUTPUT_RECORDS=3
        16/01/28 14:03:58 INFO mapred.JobClient: MAP_OUTPUT_BYTES=822
        16/01/28 14:03:58 INFO mapred.JobClient: MAP_OUTPUT_MATERIALIZED_BYTES=717
        16/01/28 14:03:58 INFO mapred.JobClient: SPLIT_RAW_BYTES=1461
        16/01/28 14:03:58 INFO mapred.JobClient: COMBINE_INPUT_RECORDS=0
        16/01/28 14:03:58 INFO mapred.JobClient: COMBINE_OUTPUT_RECORDS=0
        16/01/28 14:03:58 INFO mapred.JobClient: REDUCE_INPUT_GROUPS=3
        16/01/28 14:03:58 INFO mapred.JobClient: REDUCE_SHUFFLE_BYTES=717
        16/01/28 14:03:58 INFO mapred.JobClient: REDUCE_INPUT_RECORDS=3
        16/01/28 14:03:58 INFO mapred.JobClient: REDUCE_OUTPUT_RECORDS=3
        16/01/28 14:03:58 INFO mapred.JobClient: SPILLED_RECORDS=6
        16/01/28 14:03:58 INFO mapred.JobClient: CPU_MILLISECONDS=346620
        16/01/28 14:03:58 INFO mapred.JobClient: PHYSICAL_MEMORY_BYTES=10232893440
        16/01/28 14:03:58 INFO mapred.JobClient: VIRTUAL_MEMORY_BYTES=43124248576
        16/01/28 14:03:58 INFO mapred.JobClient: COMMITTED_HEAP_BYTES=13987872768
        16/01/28 14:03:58 INFO mapred.JobClient: ETLCounter$CounterType
        16/01/28 14:03:58 INFO mapred.JobClient: REDUCER_OUTPUT_RECORD=3
        16/01/28 14:03:58 INFO mapred.JobClient: VALID_RECORD=3
        16/01/28 14:03:58 INFO mapred.JobClient: File Input Format Counters
        16/01/28 14:03:58 INFO mapred.JobClient: Bytes Read=965224378
        16/01/28 14:03:58 INFO mapred.JobClient: org.apache.hadoop.mapreduce.lib.output.FileOutputFormat$Counter
        16/01/28 14:03:58 INFO mapred.JobClient: BYTES_WRITTEN=1508
        16/01/28 14:03:58 INFO mapreduce.Contact2ETLDeltaDriver: Merge job done!

        In this case, I got the following in the log:

        current key length = 34
        current value length = 402681

        It looks like the value's length is 402681.

        Then I ran the same MR job again, but this time emitting all the records instead of just 3, and got the following in the log:

        current key length = 34
        current value length = 403167
        Got ((n >>> 1) ^ -(n & 1)) = -51
        And b = 101
        And shift = 0
        And n = 101

        So in this case, for the same Avro record shipped from the mapper to the reducer, the value length changes to 403167, which again causes the readInt method to return -51.

        Even though I am not 100% sure how the intermediate data of an MR job is generated, for the same Avro record, shouldn't the value's length be the same at the ReduceContextImpl stage? Why does the length change? Is this correct?
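        A possible next step (a debugging sketch only, assuming the same patched ReduceContextImpl and the nextVal DataInputBuffer used in the snippet above): hex-dump the raw serialized value bytes in both runs, so the two byte sequences can be diffed directly instead of comparing only their lengths.

        import org.apache.hadoop.io.DataInputBuffer;

        // Diagnostic helper: render the unread portion of a DataInputBuffer as hex.
        public class ValueDumpUtil {
            public static String hexDump(DataInputBuffer buf) {
                StringBuilder sb = new StringBuilder();
                byte[] data = buf.getData();
                for (int i = buf.getPosition(); i < buf.getLength(); i++) {
                    sb.append(String.format("%02x", data[i]));
                }
                return sb.toString();
            }
        }

        // usage, placed next to the existing length printout in the patched ReduceContextImpl:
        //   System.out.println("value bytes = " + ValueDumpUtil.hexDump(nextVal));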

        java8964 Yong Zhang added a comment -

        And for this dataset, I don't think we have ever changed the schema.

        belugabehr BELUGA BEHR added a comment -

        May be experiencing this issue as well.... trying to collect more information...


          People

          • Assignee: Unassigned
          • Reporter: java8964 Yong Zhang
          • Votes: 0
          • Watchers: 5

            Dates

            • Created:
              Updated:

              Development