Index: conf/hadoop-default.xml
===================================================================
--- conf/hadoop-default.xml	(revision 778915)
+++ conf/hadoop-default.xml	(working copy)
@@ -1548,4 +1548,32 @@
   </description>
 </property>
 
+<property>
+  <name>mapred.max.maps.per.node</name>
+  <value>-1</value>
+  <description>Per-node limit on running map tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
+<property>
+  <name>mapred.max.reduces.per.node</name>
+  <value>-1</value>
+  <description>Per-node limit on running reduce tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
+<property>
+  <name>mapred.running.map.limit</name>
+  <value>-1</value>
+  <description>Cluster-wide limit on running map tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
+<property>
+  <name>mapred.running.reduce.limit</name>
+  <value>-1</value>
+  <description>Cluster-wide limit on running reduce tasks for the job. A value
+    of -1 signifies no limit.</description>
+</property>
+
 </configuration>
Index: src/mapred/org/apache/hadoop/mapred/JobInProgress.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobInProgress.java	(revision 778915)
+++ src/mapred/org/apache/hadoop/mapred/JobInProgress.java	(working copy)
@@ -80,6 +80,12 @@
   int speculativeMapTasks = 0;
   int speculativeReduceTasks = 0;
   
+  // Limits on concurrent running tasks per-node and cluster-wide
+  private int maxMapsPerNode;
+  private int maxReducesPerNode;
+  private int runningMapLimit;
+  private int runningReduceLimit;
+  
   int mapFailuresPercent = 0;
   int reduceFailuresPercent = 0;
   int failedMapTIPs = 0;
@@ -236,6 +242,11 @@
 
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
+
+    this.maxMapsPerNode = conf.getMaxMapsPerNode();
+    this.maxReducesPerNode = conf.getMaxReducesPerNode();
+    this.runningMapLimit = conf.getRunningMapLimit();
+    this.runningReduceLimit = conf.getRunningReduceLimit();
         
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
     this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
@@ -1579,6 +1590,25 @@
     String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
     
+    // Check cluster-wide limit on running maps
+    if (runningMapLimit != -1 && runningMapTasks >= runningMapLimit) {
+      return -1;
+    }
+    
+    // Check per-node limit on running maps
+    if (maxMapsPerNode != -1) {
+      int runningMapsOnNode = 0;
+      for (TaskStatus ts: tts.getTaskReports()) {
+        if (ts.getTaskID().getJobID().equals(jobId) && ts.getIsMap() &&
+            ts.getRunState().equals(TaskStatus.State.RUNNING)) {
+          runningMapsOnNode++;
+        }
+      }
+      if (runningMapsOnNode >= maxMapsPerNode) {
+        return -1;
+      }
+    }
+    
     //
     // Update the last-known clusterSize
     //
@@ -1769,7 +1799,26 @@
                                              double avgProgress) {
     String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
+
+    // Check cluster-wide limit on running reduces
+    if (runningReduceLimit != -1 && runningReduceTasks >= runningReduceLimit) {
+      return -1;
+    }
     
+    // Check per-node limit on running reduces
+    if (maxReducesPerNode != -1) {
+      int runningReducesOnNode = 0;
+      for (TaskStatus ts: tts.getTaskReports()) {
+        if (ts.getTaskID().getJobID().equals(jobId) && !ts.getIsMap() &&
+            ts.getRunState().equals(TaskStatus.State.RUNNING)) {
+          runningReducesOnNode++;
+        }
+      }
+      if (runningReducesOnNode >= maxReducesPerNode) {
+        return -1;
+      }
+    }
+    
     // Update the last-known clusterSize
     this.clusterSize = clusterSize;
 
Index: src/mapred/org/apache/hadoop/mapred/JobConf.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobConf.java	(revision 778915)
+++ src/mapred/org/apache/hadoop/mapred/JobConf.java	(working copy)
@@ -1385,6 +1385,78 @@
     set("mapred.job.queue.name", queueName);
   }
   
+  /**
+   * Get the per-node limit on running maps for the job
+   * 
+   * @return per-node running map limit
+   */
+  public int getMaxMapsPerNode() {
+    return getInt("mapred.max.maps.per.node", -1);
+  }
+  
+  /**
+   * Set the per-node limit on running maps for the job
+   * 
+   * @param limit per-node running map limit
+   */
+  public void setMaxMapsPerNode(int limit) {
+    setInt("mapred.max.maps.per.node", limit);
+  }
+  
+  /**
+   * Get the per-node limit on running reduces for the job
+   * 
+   * @return per-node running reduce limit
+   */
+  public int getMaxReducesPerNode() {
+    return getInt("mapred.max.reduces.per.node", -1);
+  }
+  
+  /**
+   * Set the per-node limit on running reduces for the job
+   * 
+   * @param limit per-node running reduce limit
+   */
+  public void setMaxReducesPerNode(int limit) {
+    setInt("mapred.max.reduces.per.node", limit);
+  }
+  
+  /**
+   * Get the cluster-wide limit on running maps for the job
+   * 
+   * @return cluster-wide running map limit
+   */
+  public int getRunningMapLimit() {
+    return getInt("mapred.running.map.limit", -1);
+  }
+  
+  /**
+   * Set the cluster-wide limit on running maps for the job
+   * 
+   * @param limit cluster-wide running map limit
+   */
+  public void setRunningMapLimit(int limit) {
+    setInt("mapred.running.map.limit", limit);
+  }
+  
+  /**
+   * Get the cluster-wide limit on running reduces for the job
+   * 
+   * @return cluster-wide running reduce limit
+   */
+  public int getRunningReduceLimit() {
+    return getInt("mapred.running.reduce.limit", -1);
+  }
+  
+  /**
+   * Set the cluster-wide limit on running reduces for the job
+   * 
+   * @param limit cluster-wide running reduce limit
+   */
+  public void setRunningReduceLimit(int limit) {
+    setInt("mapred.running.reduce.limit", limit);
+  }
+  
   /** 
    * Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing
