commit 85d9d3b1346a580e1d10f0bfa99ccf1d14d39e29
Author: Todd Lipcon <todd@cloudera.com>
Date:   Wed May 27 13:54:30 2009 -0700

    Backport HADOOP-5170: tasklimits-v3.patch
    
    Conflicts:
    
    	src/mapred/mapred-default.xml
    	src/mapred/org/apache/hadoop/mapred/JobInProgress.java

diff --git conf/hadoop-default.xml conf/hadoop-default.xml
index 2d5fe68..e14fc15 100644
--- conf/hadoop-default.xml
+++ conf/hadoop-default.xml
@@ -1278,4 +1278,32 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>mapred.max.maps.per.node</name>
+  <value>-1</value>
+  <description>Per-node limit on running map tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
+<property>
+  <name>mapred.max.reduces.per.node</name>
+  <value>-1</value>
+  <description>Per-node limit on running reduce tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
+<property>
+  <name>mapred.running.map.limit</name>
+  <value>-1</value>
+  <description>Cluster-wide limit on running map tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
+<property>
+  <name>mapred.running.reduce.limit</name>
+  <value>-1</value>
+  <description>Cluster-wide limit on running reduce tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
 </configuration>
diff --git src/mapred/org/apache/hadoop/mapred/JobConf.java src/mapred/org/apache/hadoop/mapred/JobConf.java
index 2ab8221..11be95a 100644
--- src/mapred/org/apache/hadoop/mapred/JobConf.java
+++ src/mapred/org/apache/hadoop/mapred/JobConf.java
@@ -984,6 +984,78 @@ public class JobConf extends Configuration {
     return getInt("mapred.map.max.attempts", 4);
   }
   
+  /**
+   * Get the per-node limit on running maps for the job
+   * 
+   * @return per-node running map limit
+   */
+  public int getMaxMapsPerNode() {
+    return getInt("mapred.max.maps.per.node", -1);
+  }
+  
+  /**
+   * Set the per-node limit on running maps for the job
+   * 
+   * @param limit per-node running map limit
+   */
+  public void setMaxMapsPerNode(int limit) {
+    setInt("mapred.max.maps.per.node", limit);
+  }
+  
+  /**
+   * Get the per-node limit on running reduces for the job
+   * 
+   * @return per-node running reduce limit
+   */
+  public int getMaxReducesPerNode() {
+    return getInt("mapred.max.reduces.per.node", -1);
+  }
+  
+  /**
+   * Set the per-node limit on running reduces for the job
+   * 
+   * @param limit per-node running reduce limit
+   */
+  public void setMaxReducesPerNode(int limit) {
+    setInt("mapred.max.reduces.per.node", limit);
+  }
+  
+  /**
+   * Get the cluster-wide limit on running maps for the job
+   * 
+   * @return cluster-wide running map limit
+   */
+  public int getRunningMapLimit() {
+    return getInt("mapred.running.map.limit", -1);
+  }
+  
+  /**
+   * Set the cluster-wide limit on running maps for the job
+   * 
+   * @param limit cluster-wide running map limit
+   */
+  public void setRunningMapLimit(int limit) {
+    setInt("mapred.running.map.limit", limit);
+  }
+  
+  /**
+   * Get the cluster-wide limit on running reduces for the job
+   * 
+   * @return cluster-wide running reduce limit
+   */
+  public int getRunningReduceLimit() {
+    return getInt("mapred.running.reduce.limit", -1);
+  }
+  
+  /**
+   * Set the cluster-wide limit on running reduces for the job
+   * 
+   * @param limit cluster-wide running reduce limit
+   */
+  public void setRunningReduceLimit(int limit) {
+    setInt("mapred.running.reduce.limit", limit);
+  }
+  
   /** 
    * Expert: Set the number of maximum attempts that will be made to run a
    * map task.
diff --git src/mapred/org/apache/hadoop/mapred/JobInProgress.java src/mapred/org/apache/hadoop/mapred/JobInProgress.java
index 48ee159..ab09b13 100644
--- src/mapred/org/apache/hadoop/mapred/JobInProgress.java
+++ src/mapred/org/apache/hadoop/mapred/JobInProgress.java
@@ -72,6 +72,20 @@ class JobInProgress {
   int failedMapTasks = 0; 
   int failedReduceTasks = 0;
   
+  private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
+  int completedMapsForReduceSlowstart = 0;
+  
+  // runningMapTasks include speculative tasks, so we need to capture 
+  // speculative tasks separately 
+  int speculativeMapTasks = 0;
+  int speculativeReduceTasks = 0;
+  
+  // Limits on concurrent running tasks per-node and cluster-wide
+  private int maxMapsPerNode;
+  private int maxReducesPerNode;
+  private int runningMapLimit;
+  private int runningReduceLimit;
+  
   int mapFailuresPercent = 0;
   int reduceFailuresPercent = 0;
   int failedMapTIPs = 0;
@@ -202,6 +216,11 @@ class JobInProgress {
 
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
+
+    this.maxMapsPerNode = conf.getMaxMapsPerNode();
+    this.maxReducesPerNode = conf.getMaxReducesPerNode();
+    this.runningMapLimit = conf.getRunningMapLimit();
+    this.runningReduceLimit = conf.getRunningReduceLimit();
         
     JobHistory.JobInfo.logSubmitted(jobid, conf, jobFile.toString(), 
                                     System.currentTimeMillis()); 
@@ -1058,6 +1077,25 @@ class JobInProgress {
     String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
     
+    // Check cluster-wide limit on running maps
+    if (runningMapLimit != -1 && runningMapTasks >= runningMapLimit) {
+      return -1;
+    }
+    
+    // Check per-node limit on running maps
+    if (maxMapsPerNode != -1) {
+      int runningMapsOnNode = 0;
+      for (TaskStatus ts: tts.getTaskReports()) {
+        if (ts.getTaskID().getJobID().equals(jobId) && ts.getIsMap() &&
+            ts.getRunState().equals(TaskStatus.State.RUNNING)) {
+          runningMapsOnNode++;
+        }
+      }
+      if (runningMapsOnNode >= maxMapsPerNode) {
+        return -1;
+      }
+    }
+    
     //
     // Update the last-known clusterSize
     //
@@ -1258,6 +1296,25 @@ class JobInProgress {
                                              double avgProgress) {
     String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
+
+    // Check cluster-wide limit on running reduces
+    if (runningReduceLimit != -1 && runningReduceTasks >= runningReduceLimit) {
+      return -1;
+    }
+    
+    // Check per-node limit on running reduces
+    if (maxReducesPerNode != -1) {
+      int runningReducesOnNode = 0;
+      for (TaskStatus ts: tts.getTaskReports()) {
+        if (ts.getTaskID().getJobID().equals(jobId) && !ts.getIsMap() &&
+            ts.getRunState().equals(TaskStatus.State.RUNNING)) {
+          runningReducesOnNode++;
+        }
+      }
+      if (runningReducesOnNode >= maxReducesPerNode) {
+        return -1;
+      }
+    }
     
     // Update the last-known clusterSize
     this.clusterSize = clusterSize;
diff --git src/test/org/apache/hadoop/mapred/MiniMRCluster.java src/test/org/apache/hadoop/mapred/MiniMRCluster.java
index 5541b62..99ba9a5 100644
--- src/test/org/apache/hadoop/mapred/MiniMRCluster.java
+++ src/test/org/apache/hadoop/mapred/MiniMRCluster.java
@@ -107,6 +107,10 @@ public class MiniMRCluster {
         LOG.error("Problem shutting down job tracker", e);
       }
     }
+
+    public JobTracker getJobTracker() {
+      return tracker;
+    }
   }
     
   /**
@@ -121,12 +125,17 @@ public class MiniMRCluster {
     volatile boolean isDead = false;
     int numDir;
 
-    TaskTrackerRunner(int trackerId, int numDir, String hostname) 
+    TaskTrackerRunner(int trackerId, int numDir, String hostname, JobConf cfg) 
     throws IOException {
       this.trackerId = trackerId;
       this.numDir = numDir;
       localDirs = new String[numDir];
-      JobConf conf = createJobConf();
+      JobConf conf;
+      if (cfg == null) {
+        conf = createJobConf();
+      } else {
+        conf = createJobConf(cfg);
+      }
       if (hostname != null) {
         conf.set("slave.host.name", hostname);
       }
@@ -220,6 +229,10 @@ public class MiniMRCluster {
     return (taskTrackerList.get(taskTracker)).getLocalDir();
   }
 
+  public JobTrackerRunner getJobTrackerRunner() {
+    return jobTracker;
+  }
+
   /**
    * Get the number of task trackers in the cluster
    */
@@ -444,7 +457,7 @@ public class MiniMRCluster {
       }
       TaskTrackerRunner taskTracker;
       taskTracker = new TaskTrackerRunner(idx, numDir, 
-          hosts == null ? null : hosts[idx]);
+                                          hosts == null ? null : hosts[idx], conf);
       
       Thread taskTrackerThread = new Thread(taskTracker);
       taskTrackerList.add(taskTracker);
diff --git src/test/org/apache/hadoop/mapred/TestRunningTaskLimits.java src/test/org/apache/hadoop/mapred/TestRunningTaskLimits.java
new file mode 100644
index 0000000..632aaf3
--- /dev/null
+++ src/test/org/apache/hadoop/mapred/TestRunningTaskLimits.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
+import org.apache.hadoop.mapred.GenericMRLoadGenerator.RandomInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+/**
+ * Test the running task limits - mapred.max.maps.per.node,
+ * mapred.max.reduces.per.node, mapred.max.running.maps and
+ * mapred.max.running.reduces.
+ */
+public class TestRunningTaskLimits extends TestCase {
+  private static final Log LOG = 
+    LogFactory.getLog(TestRunningTaskLimits.class);
+  private static final Path TEST_DIR = 
+    new Path(System.getProperty("test.build.data", "/tmp"), 
+             "test-running-task-limits");
+  
+  /**
+   * This test creates a cluster with 1 tasktracker with 3 map and 3 reduce 
+   * slots. We then submit a job with a limit of 2 maps and 1 reduce per
+   * node, and check that these limits are obeyed in launching tasks.
+   */
+  public void testPerNodeLimits() throws Exception {
+    LOG.info("Running testPerNodeLimits");
+    FileSystem fs = FileSystem.get(new Configuration());
+    fs.delete(TEST_DIR, true); // cleanup test dir
+    
+    // Create a cluster with 1 tasktracker with 3 map slots and 3 reduce slots
+    JobConf conf = new JobConf();
+    conf.setInt("mapred.tasktracker.map.tasks.maximum", 3);
+    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 3);
+    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+    
+    // Create a job with limits of 3 maps/node and 2 reduces/node 
+    JobConf jobConf = createWaitJobConf(mr, "job1", 20, 20);
+    jobConf.setMaxMapsPerNode(2);
+    jobConf.setMaxReducesPerNode(1);
+    
+    // Submit the job
+    RunningJob rJob = (new JobClient(jobConf)).submitJob(jobConf);
+    
+    // Wait 20 seconds for it to start up
+    UtilsForTests.waitFor(20000);
+    
+    // Check the number of running tasks
+    JobTracker jobTracker = mr.getJobTrackerRunner().getJobTracker();
+    JobInProgress jip = jobTracker.getJob(rJob.getID());
+    assertEquals(2, jip.runningMaps());
+    assertEquals(1, jip.runningReduces());
+    
+    rJob.killJob();
+    mr.shutdown();
+  }
+  
+  /**
+   * This test creates a cluster with 2 tasktrackers with 3 map and 3 reduce 
+   * slots each. We then submit a job with a limit of 5 maps and 3 reduces
+   * cluster-wide, and check that these limits are obeyed in launching tasks.
+   */
+  public void testClusterWideLimits() throws Exception {
+    LOG.info("Running testClusterWideLimits");
+    FileSystem fs = FileSystem.get(new Configuration());
+    fs.delete(TEST_DIR, true); // cleanup test dir
+    
+    // Create a cluster with 2 tasktrackers with 3 map and reduce slots each
+    JobConf conf = new JobConf();
+    conf.setInt("mapred.tasktracker.map.tasks.maximum", 3);
+    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 3);
+    MiniMRCluster mr = new MiniMRCluster(2, "file:///", 1, null, null, conf);
+    
+    // Create a job with limits of 10 maps and 5 reduces on the entire cluster 
+    JobConf jobConf = createWaitJobConf(mr, "job1", 20, 20);
+    jobConf.setRunningMapLimit(5);
+    jobConf.setRunningReduceLimit(3);
+    
+    // Submit the job
+    RunningJob rJob = (new JobClient(jobConf)).submitJob(jobConf);
+    
+    // Wait 20 seconds for it to start up
+    UtilsForTests.waitFor(20000);
+    
+    // Check the number of running tasks
+    JobTracker jobTracker = mr.getJobTrackerRunner().getJobTracker();
+    JobInProgress jip = jobTracker.getJob(rJob.getID());
+    assertEquals(5, jip.runningMaps());
+    assertEquals(3, jip.runningReduces());
+    
+    rJob.killJob();
+    mr.shutdown();
+  }
+  
+  /**
+   * This test creates a cluster with 2 tasktrackers with 3 map and 3 reduce 
+   * slots each. We then submit a job with a limit of 5 maps and 3 reduces
+   * cluster-wide, and 2 maps and 2 reduces per node. We should end up with
+   * 4 maps and 3 reduces running: the maps hit the per-node limit first,
+   * while the reduces hit the cluster-wide limit.
+   */
+  public void testClusterWideAndPerNodeLimits() throws Exception {
+    LOG.info("Running testClusterWideAndPerNodeLimits");
+    FileSystem fs = FileSystem.get(new Configuration());
+    fs.delete(TEST_DIR, true); // cleanup test dir
+    
+    // Create a cluster with 2 tasktrackers with 3 map and reduce slots each
+    JobConf conf = new JobConf();
+    conf.setInt("mapred.tasktracker.map.tasks.maximum", 3);
+    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 3);
+    MiniMRCluster mr = new MiniMRCluster(2, "file:///", 1, null, null, conf);
+    
+    // Create a job with limits of 10 maps and 5 reduces on the entire cluster 
+    JobConf jobConf = createWaitJobConf(mr, "job1", 20, 20);
+    jobConf.setRunningMapLimit(5);
+    jobConf.setRunningReduceLimit(3);
+    jobConf.setMaxMapsPerNode(2);
+    jobConf.setMaxReducesPerNode(2);
+    
+    // Submit the job
+    RunningJob rJob = (new JobClient(jobConf)).submitJob(jobConf);
+    
+    // Wait 20 seconds for it to start up
+    UtilsForTests.waitFor(20000);
+    
+    // Check the number of running tasks
+    JobTracker jobTracker = mr.getJobTrackerRunner().getJobTracker();
+    JobInProgress jip = jobTracker.getJob(rJob.getID());
+    assertEquals(4, jip.runningMaps());
+    assertEquals(3, jip.runningReduces());
+    
+    rJob.killJob();
+    mr.shutdown();
+  }
+  
+  /**
+   * This test creates a cluster with 2 tasktrackers with 3 map and 3 reduce 
+   * slots. We then submit a job with limits of 5 maps/node and 5 reduces/node,
+   * as well as 10 maps and 10 reduces cluster-wide, which are higher than the
+   * number of slots. We make sure that only 6 slots of each type are used.
+   */
+  public void testLimitsIgnoredBeyondSlotCount() throws Exception {
+    LOG.info("Running testLimitsIgnoredBeyondSlotCount");
+    FileSystem fs = FileSystem.get(new Configuration());
+    fs.delete(TEST_DIR, true); // cleanup test dir
+    
+    // Create a cluster with 2 tasktrackers with 3 map slots and 3 reduce slots
+    JobConf conf = new JobConf();
+    conf.setInt("mapred.tasktracker.map.tasks.maximum", 3);
+    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 3);
+    MiniMRCluster mr = new MiniMRCluster(2, "file:///", 1, null, null, conf);
+    
+    // Create a job with limits of 6 maps/node and 6 reduces/node 
+    JobConf jobConf = createWaitJobConf(mr, "job1", 20, 20);
+    jobConf.setRunningMapLimit(10);
+    jobConf.setRunningReduceLimit(10);
+    jobConf.setMaxMapsPerNode(5);
+    jobConf.setMaxReducesPerNode(5);
+    
+    // Submit the job
+    RunningJob rJob = (new JobClient(jobConf)).submitJob(jobConf);
+    
+    // Wait 20 seconds for it to start up
+    UtilsForTests.waitFor(20000);
+    
+    // Check the number of running tasks
+    JobTracker jobTracker = mr.getJobTrackerRunner().getJobTracker();
+    JobInProgress jip = jobTracker.getJob(rJob.getID());
+    assertEquals(6, jip.runningMaps());
+    assertEquals(6, jip.runningReduces());
+    
+    rJob.killJob();
+    mr.shutdown();
+  }
+  
+  /**
+   * Create a JobConf for a job using the WaitingMapper and IdentityReducer,
+   * which will sleep until a signal file is created. In this test we never
+   * create the signal file so the job just occupies slots for the duration
+   * of the test as they are assigned to it. 
+   */
+  JobConf createWaitJobConf(MiniMRCluster mr, String jobName,
+      int numMaps, int numRed)
+  throws IOException {
+    JobConf jobConf = mr.createJobConf();
+    Path inDir = new Path(TEST_DIR, "input");
+    Path outDir = new Path(TEST_DIR, "output-" + jobName);
+    String signalFile = new Path(TEST_DIR, "signal").toString();
+    jobConf.setJobName(jobName);
+    jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    FileOutputFormat.setOutputPath(jobConf, outDir);
+    jobConf.setMapperClass(UtilsForTests.WaitingMapper.class);
+    jobConf.setReducerClass(IdentityReducer.class);
+    jobConf.setOutputKeyClass(BytesWritable.class);
+    jobConf.setOutputValueClass(BytesWritable.class);
+    jobConf.setInputFormat(RandomInputFormat.class);
+    jobConf.setNumMapTasks(numMaps);
+    jobConf.setNumReduceTasks(numRed);
+    jobConf.setJar("build/test/testjar/testjob.jar");
+    jobConf.set(UtilsForTests.getTaskSignalParameter(true), signalFile);
+    jobConf.set(UtilsForTests.getTaskSignalParameter(false), signalFile);
+    return jobConf;
+  }
+
+}
diff --git src/test/org/apache/hadoop/mapred/UtilsForTests.java src/test/org/apache/hadoop/mapred/UtilsForTests.java
index 85e7bd9..076ef4b 100644
--- src/test/org/apache/hadoop/mapred/UtilsForTests.java
+++ src/test/org/apache/hadoop/mapred/UtilsForTests.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.io.*;
 
 /** 
  * Utilities used in unit test.
@@ -44,6 +45,7 @@ public class UtilsForTests {
   final static long GB = 1024L * MB;
   final static long TB = 1024L * GB;
   final static long PB = 1024L * TB;
+  final static Object waitLock = new Object();
 
   static DecimalFormat dfm = new DecimalFormat("####.000");
   static DecimalFormat ifm = new DecimalFormat("###,###,###,###,###");
@@ -189,4 +191,91 @@ public class UtilsForTests {
     }
     return new String(space, 0, len);
   }
+
+  /**
+   * A utility that waits for specified amount of time
+   */
+  static void waitFor(long duration) {
+    try {
+      synchronized (waitLock) {
+        waitLock.wait(duration);
+      }
+    } catch (InterruptedException ie) {}
+  }
+
+  static String getTaskSignalParameter(boolean isMap) {
+    return isMap 
+           ? "test.mapred.map.waiting.target" 
+           : "test.mapred.reduce.waiting.target";
+  }
+
+
+  static class WaitingMapper 
+  extends MapReduceBase 
+  implements Mapper<WritableComparable, Writable, 
+                    WritableComparable, Writable> {
+
+    FileSystem fs = null;
+    Path signal;
+    int id = 0;
+    int totalMaps = 0;
+
+    /**
+     * Checks if the map task needs to wait. By default all the maps will wait.
+     * This method needs to be overridden to make a custom waiting mapper. 
+     */
+    public boolean shouldWait(int id) {
+      return true;
+    }
+    
+    /**
+     * Returns a signal file on which the map task should wait. By default all 
+     * the maps wait on a single file passed as test.mapred.map.waiting.target.
+     * This method needs to be overridden to make a custom waiting mapper
+     */
+    public Path getSignalFile(int id) {
+      return signal;
+    }
+    
+    /** The waiting function.  The map exits once it gets a signal. Here the 
+     * signal is the file existence. 
+     */
+    public void map(WritableComparable key, Writable val, 
+                    OutputCollector<WritableComparable, Writable> output,
+                    Reporter reporter)
+    throws IOException {
+      if (shouldWait(id)) {
+        if (fs != null) {
+          while (!fs.exists(getSignalFile(id))) {
+            try {
+              reporter.progress();
+              synchronized (this) {
+                this.wait(1000); // wait for 1 sec
+              }
+            } catch (InterruptedException ie) {
+              System.out.println("Interrupted while the map was waiting for "
+                                 + " the signal.");
+              break;
+            }
+          }
+        } else {
+          throw new IOException("Could not get the DFS!!");
+        }
+      }
+    }
+
+    public void configure(JobConf conf) {
+      try {
+        String taskId = conf.get("mapred.task.id");
+        id = Integer.parseInt(taskId.split("_")[4]);
+        totalMaps = Integer.parseInt(conf.get("mapred.map.tasks"));
+        fs = FileSystem.get(conf);
+        signal = new Path(conf.get(getTaskSignalParameter(true)));
+      } catch (IOException ioe) {
+        System.out.println("Got an exception while obtaining the filesystem");
+      }
+    }
+  }
+
+
 }
