Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.22.0
    • Component/s: contrib/gridmix
    • Labels:
      None
    • Hadoop Flags:
      Reviewed

      Description

      Support for Sleep jobs in gridmix

      1. 1376-5-yhadoop20-100-3.patch
        124 kB
        rahul k singh
      2. 1594-diff-4-5.patch
        18 kB
        Hong Tang
      3. 1594-yhadoop-20-1xx.patch
        6 kB
        rahul k singh
      4. 1594-yhadoop-20-1xx-1.patch
        21 kB
        rahul k singh
      5. 1594-yhadoop-20-1xx-1-2.patch
        104 kB
        rahul k singh
      6. 1594-yhadoop-20-1xx-1-3.patch
        51 kB
        rahul k singh
      7. 1594-yhadoop-20-1xx-1-4.patch
        70 kB
        rahul k singh
      8. 1594-yhadoop-20-1xx-1-5.patch
        79 kB
        Hong Tang

          Activity

          rahul k singh added a comment -

          Gridmix V3 needs to support sleep jobs. This would let Gridmix load the JT with respect to heartbeats, since we can run multiple TTs on a single node. We can then try to match the load we have on production clusters using fewer machines.

          Hong Tang added a comment -

          The main motivation of this extension is to allow us to stress the JT heartbeat handling logic by emulating a production-size cluster (at least 4K nodes) on a test cluster that is an order of magnitude smaller (several hundred nodes), by running multiple tasktrackers per node. To avoid overwhelming individual nodes, each task would simply sleep for the duration recorded in the job trace.

          rahul k singh added a comment -

          Attaching the first-cut patch for 1594 for the yhadoop 20.1xx branch.
          This patch depends on the 1376 yhadoop 20.1xx patch.

          To make it work, apply "https://issues.apache.org/jira/secure/attachment/12438576/1376-2-yhadoop-security.patch" first and then apply this patch on top of it.

          rahul k singh added a comment -

          The earlier patch was incomplete; attaching the correct patch.

          Hong Tang added a comment -
          • Overall, the approach looks fine although the implementation is still a bit hacky in the sense that users still need to specify input/output directories for running sleep jobs (which should be ignored). But I am fine with it for now as the structure is likely evolving with more extensions to be added to Gridmix.
          • Style comment: Consider changing JobType to JobCreator - it sounds more natural to call JobCreator.createGridmixJob(...).
          • Structure wise, it would be better to rename GridmixJob to LoadJob, and create a common base (probably should be abstract) class for LoadJob and SleepJob and call it GridmixJob that only contains the shared parts of LoadJob and SleepJob. E.g. outdir may only belong to LoadJob but not SleepJob. (BTW, are File{Input,Output}Format.set{Input,Output}Path needed for SleepJob.call()?)

          • SleepInputFormat.createRecordReader - should return a record reader that produces consecutive keys that match the expected wakeup time for the mapper process. Something like the following:
                  return new RecordReader<LongWritable, LongWritable>() {
                    long start = -1;
                    long slept = 0L;
                    long sleep = 0L;
                    final LongWritable key = new LongWritable();
                    final LongWritable val = new LongWritable();
            
                    @Override
                    public boolean nextKeyValue() throws IOException {
                      if (start == -1) {
                        start = System.nanoTime()/1000000;
                      }
                      slept += sleep;
                      sleep = Math.min(duration - slept, RINTERVAL);
                      key.set(slept + sleep + start);
                      val.set(duration - slept);
                      return slept < duration;
                    }
            
                    @Override
                    public float getProgress() throws IOException {
                      return slept / ((float) duration);
                    }
            
                    @Override
                    public LongWritable getCurrentKey() {
                      return key;
                    }
            
                    @Override
                    public LongWritable getCurrentValue() {
                      return val;
                    }
            
                    @Override
                    public void close() throws IOException {
                      final String msg = "Slept for " + duration;
                      LOG.info(msg);
                    }
            
                    public void initialize(InputSplit split, TaskAttemptContext ctxt) {
                    }
                  };
            

          Accordingly, SleepMapper.map(...) should be modified as follows:

              public void map(LongWritable key, LongWritable value, Context context)
                throws IOException, InterruptedException {
                context.setStatus("Sleeping... " + value.get() + " ms left");
                long now = System.nanoTime()/1000000;
                if (now < key.get()) {
                  TimeUnit.MILLISECONDS.sleep(key.get()-now);
                }
              }
          

          This prevents the actual sleep time from deviating from the expected sleep time, with the error accumulating over many map() calls.

          • Similar idea should be applied to SleepReducer too.
          • Do I read it right that by default the mapper() updates its progress once every 10 seconds? It is more interesting to make RINTERVAL == Math.min(1sec, totalDuration/20) so that the reported map task progress could be smoother. (Unfortunately the reduce progress may not be very useful).
          • Should issue a warning in SleepJob.getSuccessfulAttemptInfo() if no successful attempt is found.
          • SleepJob.buildSplits(): should not use InputStriper at all. At the end, you should set locations of SleepSplit to "new String[0]" instead of "striper.splitFor(inputDir, 512, 3).getLocations()"
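
          The key arithmetic of the record reader suggested above can be sketched in plain Java, without any Hadoop classes. This is only an illustration: the class WakeupScheduleSketch and the method wakeupTimes are hypothetical names that do not appear in the patch, and the sketch assumes a positive reporting interval. Each returned value is an absolute deadline (start plus the total sleep expected so far), which is why a late wakeup in one map() call does not push back the later ones.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Gridmix patch: it reproduces only the
// key computation of the suggested record reader.
class WakeupScheduleSketch {

  /** Returns the absolute wakeup deadlines (ms) for a task sleeping durationMs. */
  static List<Long> wakeupTimes(long startMs, long durationMs, long intervalMs) {
    if (intervalMs <= 0) {
      // Assumption of this sketch: a non-positive interval would never advance.
      throw new IllegalArgumentException("intervalMs must be positive");
    }
    List<Long> deadlines = new ArrayList<>();
    long slept = 0L;
    while (slept < durationMs) {
      // Sleep one interval, or whatever remains if that is shorter.
      long sleep = Math.min(durationMs - slept, intervalMs);
      // Deadline is relative to the fixed start, not to the previous wakeup.
      deadlines.add(startMs + slept + sleep);
      slept += sleep;
    }
    return deadlines;
  }

  public static void main(String[] args) {
    // Prints [2000, 3000, 3500]
    System.out.println(wakeupTimes(1000L, 2500L, 1000L));
  }
}
```

          A mapper that sleeps until each deadline, as in the map() shown above, ends at start + duration regardless of how late the individual wakeups are.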
          rahul k singh added a comment -

          Attaching the new patch addressing Hong's comments.

          rahul k singh added a comment -

          I have implemented all the comments except:

          - Structure wise, it would be better to rename GridmixJob to LoadJob, and create a common base (probably should be abstract) class for LoadJob and SleepJob and call it GridmixJob that only contains the shared parts of LoadJob and SleepJob. E.g. outdir may only belong to LoadJob but not SleepJob. (BTW, are File{Input,Output}Format.set{Input,Output}Path needed for SleepJob.call()?)

          GridmixJob is created as an abstract class; outdir has been pushed to GridmixJob as SleepJob is also using it. We need File{Input,Output}Format.set{Input,Output}Path for mapreduce. I am not sure if this is a bug, but it is required.

          rahul k singh added a comment -

          Uploading a new patch to accommodate the new changes; the old patch wasn't applying.

          The correct order to make it work is
          20.100 + the 1376 security patch, and this patch on top of it.

          Hong Tang added a comment -
          • GenerateData should extend from GridmixJob instead of LoadJob. I think we can have a default implementation of buildSplits (as an empty function) in GridmixJob and remove the "abstract" keyword.
          • The indentation of JobCreator looks weird.
          • Replace the following with Configuration.getEnum:
            +    Configuration conf, JobCreator defaultPolicy) {
            +    String policy = conf.get(GRIDMIX_JOB_TYPE, defaultPolicy.name());
            +    return valueOf(policy.toUpperCase());
            
          • Unused and extra imports (CommonConfigurationKeys, FsPermission) in TestGridmixSubmission.java.
          • There do not seem to be any unit tests covering the added feature.
          • Avoid directly setting "gridmix.job.seq" in both LoadJob and SleepJob. Instead, refactor the statement to a common method in GridmixJob called setSeqId(Job job). Similarly, add a method getSeqId(Job job) in GridmixJob and avoid directly calling conf.get("gridmix.job.seq", -1) in {GridmixInputFormat, SleepInputFormat}.getSplits(...).
          • Should we rename Gridmix{Mapper, Reducer, InputFormat, RecordReader, Split} to Load{Mapper, Reducer, InputFormat, RecordReader, Split}?
          • Also suggest refactoring the statement that sets the original job id into a method in GridmixJob.
          • There are some comments in LoadJob that are hard to understand.
              // TODO replace with ThreadLocal submitter?
            
              // not nesc when TL
            
          • reduces in SleepMapper is not used.
          • SleepJob hacks GridmixKey to pass along the sleep duration from map tasks to reduce tasks. We should document that in the code and file a jira to fix it.
          • The following code does not seem to do what the comments claim to:
                  //This is to stop accumulating deviation from expected sleep time
                  //over a period of time.
                  long now = System.nanoTime() / 1000000;
                  if (now < duration) {
                    duration = duration - now;
                  }
                  long slept = 0L;
                  long sleep = 0L;
                  while (slept < duration) {
                    final long rem = duration - slept;
                    sleep = Math.min(rem, RINTERVAL);
                    context.setStatus("Sleeping... " + rem + " ms left");
                    slept += sleep;
                    TimeUnit.MILLISECONDS.sleep(sleep);
                  }
            

          Should it be more like the following?

                //This is to stop accumulating deviation from expected sleep time
                //over a period of time.
                long start = System.nanoTime() / 1000000;
                long slept = 0L;
                long sleep = 0L;
                while (slept < duration) {
                  final long rem = duration - slept;
                  sleep = Math.min(rem, RINTERVAL);
                  context.setStatus("Sleeping... " + rem + " ms left");
                  TimeUnit.MILLISECONDS.sleep(sleep);
                  slept = System.nanoTime() / 1000000 - start;
                }
          
          • The following seems incorrect:
                  final long RINTERVAL = TimeUnit.MILLISECONDS.convert(
                    context.getConfiguration().getLong(
                      GRIDMIX_SLEEP_INTERVAL, Math.min(
                        1, duration / 20)), TimeUnit.SECONDS);
            

            "duration" is in ms not in seconds. It should be changed to

                  final long RINTERVAL = TimeUnit.MILLISECONDS.convert(
                    context.getConfiguration().getLong(
                      GRIDMIX_SLEEP_INTERVAL, Math.min(
                        1, Math.max(1, duration/1000/20))), TimeUnit.SECONDS);
            
          • I cannot find anywhere that outdir is used by SleepJob. Did you encounter an error if FOF.setOutputPath is commented out in SleepJob.call()?
          • Both SleepJob and GridmixJob call FileInputFormat.addInputPath(job, new Path("ignored")), but one is surrounded with a try-catch block and the other is not. Not sure why. I am also curious to know what the error would be if FIF.addInputPath is not called in both classes.
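
          The difference between the two sleep loops discussed above can be seen with a small simulation. This is a sketch under stated assumptions, not code from the patch: SleepDriftSketch and its methods are hypothetical names, the clock is simulated, and every sleep call is assumed to overshoot by a fixed overshootMs. The first method counts the time it asked to sleep, as the loop under review does; the second measures elapsed time from a fixed start, as the proposed replacement does.

```java
// Hypothetical simulation, not part of the Gridmix patch. A simulated clock
// stands in for the system clock so that the result is deterministic.
class SleepDriftSketch {

  /** Total elapsed ms when 'slept' accumulates the requested sleep time. */
  static long elapsedWithRequestedSleep(long durationMs, long intervalMs, long overshootMs) {
    long now = 0L;
    long slept = 0L;
    while (slept < durationMs) {
      long sleep = Math.min(durationMs - slept, intervalMs);
      now += sleep + overshootMs; // stands in for TimeUnit.MILLISECONDS.sleep(sleep)
      slept += sleep;             // counts what was requested, so overshoot is lost
    }
    return now;
  }

  /** Total elapsed ms when 'slept' is measured against a fixed start time. */
  static long elapsedWithMeasuredSleep(long durationMs, long intervalMs, long overshootMs) {
    final long start = 0L;
    long now = start;
    long slept = 0L;
    while (slept < durationMs) {
      long sleep = Math.min(durationMs - slept, intervalMs);
      now += sleep + overshootMs; // stands in for TimeUnit.MILLISECONDS.sleep(sleep)
      slept = now - start;        // counts what actually elapsed
    }
    return now - start;
  }

  public static void main(String[] args) {
    // Three sleeps, each 10 ms late: prints 2530, then 2510.
    System.out.println(elapsedWithRequestedSleep(2500L, 1000L, 10L));
    System.out.println(elapsedWithMeasuredSleep(2500L, 1000L, 10L));
  }
}
```

          With the measured variant, only the overshoot of the final sleep remains in the total; with the requested variant, every overshoot adds up.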
          rahul k singh added a comment -

          I have implemented all the comments except a few:

          • GenerateData should extend from GridmixJob instead of LoadJob. I think we can have a default implementation of buildSplits (as an empty function) in GridmixJob and remove the "abstract" keyword.

          GenerateData now extends GridmixJob. But GridmixJob is still abstract because the call() method is abstract, and it is implemented by all the derived classes.

          • Avoiding directly setting "gridmix.job.seq" in both LoadJob and SleepJob. Instead, refactor the statement to a common method in GridmixJob called setSeqId(Job job). Similarly, adding a method getSeqId(Job job) in GridmixJob and avoid directly calling conf.get("gridmix.job.seq", -1) in {GridmixInputFormat, SleepInputFormat}.getSplits(...).

            getSeqId is not there, as {GridmixInputFormat, SleepInputFormat}.getSplits(...) is part of static inner classes and can only access static methods.

          • I cannot find anywhere outdir is used by SleepJob. Did you encounter an error if FOF.setOutputPath is commented out in SleepJob.call()?
            Removed this code and tested; things work fine.
          • Both SleepJob and GridmixJob calls FileInputFormat.addInputPath(job, new path("ignored")), but one is surrounded with a try-catch block and the other is not. Not sure why. I am also curious to know what would be the error if FIF.addInputPath is not called in both classes

          I have removed FIF.addInputPath. Things are working fine on the cluster. I have removed the try-catch block and added the exception to the signature of call().

          rahul k singh added a comment -
          • SleepJob hacks GridmixKey to pass along the sleep duration from map tasks to reduce tasks. We should document that in the code and file a jira to fix it.

          Documented in the code and opened the jira https://issues.apache.org/jira/browse/MAPREDUCE-1675

          Hong Tang added a comment -

          The latest patch ("1594-yhadoop-20-1xx-1-4.patch") still has a few minor issues. I fixed them and attached a new patch (1594-yhadoop-20-1xx-1-5.patch). I also attached a separate diff against 1594-yhadoop-20-1xx-1-4.patch (1594-diff-4-5.patch), so that it is easier to see what I have changed:

          • Replaced the pattern "enum.name().equals()" with direct enum comparison in various places.
          • In GridmixJob, made the job field final. Cleaned up the exception logic of PrivilegedExceptionAction.run().
          • Moved the functionality of setting SeqId and original name into the GridmixJob ctor, eliminating the need for setSeqId and setJobId.
          • Added another pullDescription method in GridmixJob that takes a JobContext object (to avoid exposing the key "gridmix.job.seq").
          • The way RINTERVAL is calculated is wrong - when duration is less than 20000, it would lead to RINTERVAL==0. Per offline conversation with Chris, it seems that having a fine granularity for RINTERVAL is pointless since progress is only updated when the TT sends a heartbeat to the JT. So I reverted the way RINTERVAL is calculated.
          • Changed to use System.currentTimeMillis() in SleepJob instead of System.nanoTime() since we are dealing with millisecond granularity.
          • In both TestGridmixSubmission and TestSleepJob, the statements for setting the configuration seem to be useless.
          • Some indentation issues in GenerateData.
          • Removed unused imports in GridmixJob
          • Removed the unused LoadJob ctor.
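
          The RINTERVAL==0 case mentioned above comes from integer division. The exact expression in 1594-yhadoop-20-1xx-1-4.patch is not quoted in this thread, so the sketch below assumes a default of the form Math.min(1, duration / 1000 / 20) in whole seconds, modelled on the expressions quoted earlier in the review. ReportIntervalSketch and defaultIntervalMs are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not code from the patch. It assumes the default
// interval is derived from a duration in ms, truncated to whole seconds.
class ReportIntervalSketch {

  /** Default reporting interval in ms for a sleep of durationMs. */
  static long defaultIntervalMs(long durationMs) {
    // Integer division: any duration below 20000 ms truncates to 0 seconds.
    long seconds = Math.min(1L, durationMs / 1000 / 20);
    return TimeUnit.MILLISECONDS.convert(seconds, TimeUnit.SECONDS);
  }

  public static void main(String[] args) {
    System.out.println(defaultIntervalMs(19999L));  // prints 0
    System.out.println(defaultIntervalMs(20000L));  // prints 1000
    System.out.println(defaultIntervalMs(600000L)); // prints 1000
  }
}
```

          An interval of 0 would make each Math.min(rem, RINTERVAL) step sleep for 0 ms, which is consistent with the decision above to revert the calculation.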
          Chris Douglas added a comment -

          Fixed in MAPREDUCE-1840


            People

            • Assignee:
              rahul k singh
              Reporter:
              rahul k singh
            • Votes:
              0
              Watchers:
              2
