Index: src/mapred/org/apache/hadoop/mapred/JobInProgress.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobInProgress.java	(revision 770857)
+++ src/mapred/org/apache/hadoop/mapred/JobInProgress.java	(working copy)
@@ -86,6 +86,12 @@
   int speculativeMapTasks = 0;
   int speculativeReduceTasks = 0;
   
+  // Limits on concurrent running tasks cluster-wide and per-node
+  private int maxMapsPerCluster;
+  private int maxReducesPerCluster;
+  private int maxMapsPerNode;
+  private int maxReducesPerNode;
+  
   int mapFailuresPercent = 0;
   int reduceFailuresPercent = 0;
   int failedMapTIPs = 0;
@@ -267,6 +273,15 @@
 
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
+
+    this.maxMapsPerCluster = conf.getInt(
+        "mapred.max.maps.per.cluster", Integer.MAX_VALUE);
+    this.maxReducesPerCluster = conf.getInt(
+        "mapred.max.reduces.per.cluster", Integer.MAX_VALUE);
+    this.maxMapsPerNode = conf.getInt(
+        "mapred.max.maps.per.node", Integer.MAX_VALUE);
+    this.maxReducesPerNode = conf.getInt(
+        "mapred.max.reduces.per.node", Integer.MAX_VALUE);
         
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
     this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
@@ -1715,6 +1730,23 @@
     String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
     
+    // Check cluster-wide limit on running maps
+    if (runningMapTasks >= maxMapsPerCluster) {
+      return -1;
+    }
+    
+    // Check per-node limit on running maps
+    int runningMapsOnNode = 0;
+    for (TaskStatus ts: tts.getTaskReports()) {
+      if (ts.getTaskID().getJobID().equals(jobId) && ts.getIsMap() &&
+          ts.getRunState().equals(TaskStatus.State.RUNNING)) {
+        runningMapsOnNode++;
+      }
+    }
+    if (runningMapsOnNode >= maxMapsPerNode) {
+      return -1;
+    }
+    
     //
     // Update the last-known clusterSize
     //
@@ -1919,7 +1951,24 @@
                                              double avgProgress) {
     String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
+
+    // Check cluster-wide limit on running reduces
+    if (runningReduceTasks >= maxReducesPerCluster) {
+      return -1;
+    }
     
+    // Check per-node limit on running reduces
+    int runningReducesOnNode = 0;
+    for (TaskStatus ts: tts.getTaskReports()) {
+      if (ts.getTaskID().getJobID().equals(jobId) && !ts.getIsMap() &&
+          ts.getRunState().equals(TaskStatus.State.RUNNING)) {
+        runningReducesOnNode++;
+      }
+    }
+    if (runningReducesOnNode >= maxReducesPerNode) {
+      return -1;
+    }
+    
     // Update the last-known clusterSize
     this.clusterSize = clusterSize;
 
