MRUnit
  MRUNIT-165

MapReduceDriver calls Mapper#cleanup for each input instead of once

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9.0
    • Fix Version/s: 1.0.0
    • Labels: None

      Description

      MapReduceDriver calls the Mapper#run method for each input, causing the Mapper#cleanup method to be called multiple times.

      I believe this is a bug, since the contract in MapReduce is that, for a single Mapper instance, the Mapper#cleanup method is only called once after all inputs to that mapper have been processed. I might be mistaken in my assumption here.

      This would not be an issue, were it not for the fact that MapReduceDriver has only a single instance of Mapper.

      One solution might be to pass the Mapper class into the MapReduceDriver and create a new instance for each input. Another solution might be to call the MapDriver with multiple inputs (which AFAIK is not possible).

      See attached patch for an example of a stateful mapper and a test which fails due to the bug.
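      The lifecycle mismatch can be seen without Hadoop at all. The sketch below is dependency-free and illustrative only (`CleanupCountDemo` and `CountingMapper` are hypothetical names, not MRUnit classes): it contrasts a driver that invokes `run()` once per input, as MapReduceDriver does here, with one that invokes `run()` once over the whole batch, as the MapReduce contract expects.

```java
import java.util.List;

// Hypothetical stand-in for the Mapper lifecycle; not MRUnit's actual code.
public class CleanupCountDemo {

    static class CountingMapper {
        int state = 0;        // accumulated across map() calls
        int cleanupCalls = 0; // how many times cleanup() ran

        void map(String input) { state += 1; }

        void cleanup() { cleanupCalls += 1; }

        // Mirrors Mapper#run: map every input, then clean up exactly once.
        void run(List<String> inputs) {
            for (String input : inputs) {
                map(input);
            }
            cleanup();
        }
    }

    public static void main(String[] args) {
        List<String> inputs = List.of("a", "b", "c");

        // MRUnit 0.9.0 behaviour: run() is invoked once per input,
        // so cleanup() fires once per input (3 times here).
        CountingMapper perInput = new CountingMapper();
        for (String input : inputs) {
            perInput.run(List.of(input));
        }

        // MapReduce contract: one run() over all inputs, one cleanup().
        CountingMapper once = new CountingMapper();
        once.run(inputs);

        System.out.println("per-input driver cleanup calls: " + perInput.cleanupCalls); // 3
        System.out.println("single-run driver cleanup calls: " + once.cleanupCalls);   // 1
    }
}
```

      A stateful mapper that writes its accumulated state from `cleanup`, like the one in the attached patch, therefore emits one output record per input under the buggy driver instead of a single record at the end.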

        Issue Links

          Activity

          Dave Beech made changes -
          Status: Open → Resolved
          Fix Version/s: 1.0.0
          Resolution: Fixed
          Dave Beech made changes -
          Link: This issue is related to MRUNIT-64
          Yoni Ben-Meshulam made changes -
          Description edited - the inline reproduction example was replaced by a reference to the attached patch. The earlier revision reproduced the bug with this stateful mapper:

          {code}
          import java.io.IOException;

          import org.apache.hadoop.io.IntWritable;
          import org.apache.hadoop.io.LongWritable;
          import org.apache.hadoop.io.Text;
          import org.apache.hadoop.mapreduce.Mapper;

          public class StatefulMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

              public static final Text KEY = new Text("SomeKey");
              private int someState = 0;

              /**
               * Increment someState for each input.
               *
               * @param key the input key
               * @param value the input value
               * @param context the Hadoop job Map context
               * @throws java.io.IOException
               */
              @Override
              public void map(LongWritable key, Text value, Context context)
                      throws IOException, InterruptedException {
                  this.someState += 1;
              }

              /**
               * Runs once after all maps have occurred. Dumps the accumulated state to the output.
               *
               * @param context the Hadoop job Map context
               */
              @Override
              protected void cleanup(Context context) throws IOException, InterruptedException {
                  context.write(KEY, new IntWritable(this.someState));
              }

          }
          {code}
          Yoni Ben-Meshulam made changes -
          Attachment reproduce_MRUNIT-165.patch [ 12555786 ]
          Yoni Ben-Meshulam made changes -
          Attachment reproduce_MRUNIT-165.patch [ 12555785 ]
          Yoni Ben-Meshulam made changes -
          Attachment reproduce_MRUNIT-165.patch [ 12555785 ]
          Yoni Ben-Meshulam made changes -
          Attachment reproduce_MRUNIT-165.patch [ 12555784 ]
          Yoni Ben-Meshulam made changes -
          Attachment reproduce_MRUNIT-165.patch [ 12555784 ]
          Dave Beech made changes -
          Assignee Dave Beech [ dbeech ]
          Yoni Ben-Meshulam made changes -
          Description edited
          Yoni Ben-Meshulam made changes -
          Description edited
          Yoni Ben-Meshulam made changes -
          Description edited
          Yoni Ben-Meshulam created issue -

            People

            • Assignee: Dave Beech
            • Reporter: Yoni Ben-Meshulam
            • Votes: 0
            • Watchers: 3

              Dates

              • Created:
              • Updated:
              • Resolved:

                Development