Index: src/mapred/org/apache/hadoop/mapred/TaskTracker.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskTracker.java	(revision 694475)
+++ src/mapred/org/apache/hadoop/mapred/TaskTracker.java	(working copy)
@@ -177,7 +177,6 @@
   private int maxCurrentMapTasks;
   private int maxCurrentReduceTasks;
   private int failures;
-  private int finishedCount[] = new int[1];
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
   private CleanupQueue directoryCleanupThread;
@@ -923,13 +922,8 @@
 
         long waitTime = heartbeatInterval - (now - lastHeartbeat);
         if (waitTime > 0) {
-          // sleeps for the wait time, wakes up if a task is finished.
-          synchronized(finishedCount) {
-            if (finishedCount[0] == 0) {
-              finishedCount.wait(waitTime);
-            }
-            finishedCount[0] = 0;
-          }
+          // sleeps for the wait time
+          Thread.sleep(waitTime);
         }
 
         // If the TaskTracker is just starting up:
@@ -2261,10 +2255,6 @@
       if (!commitPending) {
         tip.taskFinished();
       }
-      synchronized(finishedCount) {
-        finishedCount[0]++;
-        finishedCount.notify();
-      }
     } else {
       LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
     }
Index: src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java	(revision 694475)
+++ src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java	(working copy)
@@ -18,8 +18,8 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -78,7 +78,9 @@
       throws IOException {
 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
-    int numTaskTrackers = clusterStatus.getTaskTrackers();
+    final int numTaskTrackers = clusterStatus.getTaskTrackers();
+    final int clusterMapSlots = clusterStatus.getMaxMapTasks();
+    final int clusterReduceSlots = clusterStatus.getMaxReduceTasks();
 
     Collection<JobInProgress> jobQueue =
       jobQueueJobInProgressListener.getJobQueue();
@@ -86,85 +88,74 @@
     //
     // Get map + reduce counts for the current tracker.
     //
-    int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
-    int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
-    int numMaps = taskTracker.countMapTasks();
-    int numReduces = taskTracker.countReduceTasks();
+    final int trackerMapSlots = taskTracker.getMaxMapTasks();
+    final int trackerReduceSlots = taskTracker.getMaxReduceTasks();
+    final int numMaps = taskTracker.countMapTasks();
+    final int numReduces = taskTracker.countReduceTasks();
 
+    // Assigned tasks
+    List<Task> assignedTasks = new ArrayList<Task>();
+
     //
-    // Compute average map and reduce task numbers across pool
+    // Compute (running + pending) map and reduce task numbers across pool
     //
     int remainingReduceLoad = 0;
     int remainingMapLoad = 0;
     synchronized (jobQueue) {
       for (JobInProgress job : jobQueue) {
         if (job.getStatus().getRunState() == JobStatus.RUNNING) {
-          int totalMapTasks = job.desiredMaps();
-          int totalReduceTasks = job.desiredReduces();
-          remainingMapLoad += (totalMapTasks - job.finishedMaps());
-          remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
+          remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
+          remainingReduceLoad += (job.desiredReduces() - job.finishedReduces());
         }
       }
     }
 
-    // find out the maximum number of maps or reduces that we are willing
-    // to run on any node.
-    int maxMapLoad = 0;
-    int maxReduceLoad = 0;
-    if (numTaskTrackers > 0) {
-      maxMapLoad = Math.min(maxCurrentMapTasks,
-                            (int) Math.ceil((double) remainingMapLoad / 
-                                            numTaskTrackers));
-      maxReduceLoad = Math.min(maxCurrentReduceTasks,
-                               (int) Math.ceil((double) remainingReduceLoad
-                                               / numTaskTrackers));
+    // Compute the 'load factor' for maps and reduces
+    double mapLoadFactor = 0.0;
+    if (clusterMapSlots > 0) {
+      mapLoadFactor = (double)remainingMapLoad / clusterMapSlots;
     }
+    double reduceLoadFactor = 0;
+    if (clusterReduceSlots > 0) {
+      reduceLoadFactor = (double)remainingReduceLoad / clusterReduceSlots;
+    }
         
-    int totalMaps = clusterStatus.getMapTasks();
-    int totalMapTaskCapacity = clusterStatus.getMaxMapTasks();
-    int totalReduces = clusterStatus.getReduceTasks();
-    int totalReduceTaskCapacity = clusterStatus.getMaxReduceTasks();
-
     //
-    // In the below steps, we allocate first a map task (if appropriate),
-    // and then a reduce task if appropriate.  We go through all jobs
+    // In the below steps, we allocate first map tasks (if appropriate),
+    // and then reduce tasks if appropriate.  We go through all jobs
     // in order of job arrival; jobs only get serviced if their 
     // predecessors are serviced, too.
     //
 
     //
-    // We hand a task to the current taskTracker if the given machine 
+    // We assign tasks to the current taskTracker if the given machine 
     // has a workload that's less than the maximum load of that kind of
     // task.
+    // However, if the cluster is close to getting loaded i.e. we don't
+    // have enough _padding_ for speculative executions etc., we only 
+    // schedule the "highest priority" task i.e. the task from the job 
+    // with the highest priority.
     //
-       
-    if (numMaps < maxMapLoad) {
-
-      int totalNeededMaps = 0;
+    
+    int trackerMaxMapSlots = 
+      Math.min((int)Math.ceil(mapLoadFactor*trackerMapSlots), trackerMapSlots);
+    int availableMapSlots = trackerMaxMapSlots - numMaps;
+    if (availableMapSlots > 1 && 
+        exceededPadding(true, clusterStatus, trackerMapSlots)) {
+      availableMapSlots = 1;
+    }
+    for (int i=0; i < availableMapSlots; ++i) {
       synchronized (jobQueue) {
         for (JobInProgress job : jobQueue) {
           if (job.getStatus().getRunState() != JobStatus.RUNNING) {
             continue;
           }
 
-          Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
-              taskTrackerManager.getNumberOfUniqueHosts());
+          Task t = 
+            job.obtainNewMapTask(taskTracker, numTaskTrackers,
+                                 taskTrackerManager.getNumberOfUniqueHosts());
           if (t != null) {
-            return Collections.singletonList(t);
-          }
-
-          //
-          // Beyond the highest-priority task, reserve a little 
-          // room for failures and speculative executions; don't 
-          // schedule tasks to the hilt.
-          //
-          totalNeededMaps += job.desiredMaps();
-          int padding = 0;
-          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
-            padding = Math.min(maxCurrentMapTasks,
-                               (int)(totalNeededMaps * padFraction));
-          }
-          if (totalMaps + padding >= totalMapTaskCapacity) {
+            assignedTasks.add(t);
             break;
           }
         }
@@ -174,9 +165,15 @@
     //
     // Same thing, but for reduce tasks
     //
-    if (numReduces < maxReduceLoad) {
-
-      int totalNeededReduces = 0;
+    final int trackerMaxReduceSlots = 
+      Math.min((int)Math.ceil(reduceLoadFactor*trackerReduceSlots), 
+               trackerReduceSlots);
+    int availableReduceSlots = trackerMaxReduceSlots - numReduces;
+    if (availableReduceSlots > 1 && 
+        exceededPadding(false, clusterStatus, trackerReduceSlots)) {
+      availableReduceSlots = 1;
+    }
+    for (int i=0; i < availableReduceSlots; ++i) {
       synchronized (jobQueue) {
         for (JobInProgress job : jobQueue) {
           if (job.getStatus().getRunState() != JobStatus.RUNNING ||
@@ -184,31 +181,64 @@
             continue;
           }
 
-          Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
-              taskTrackerManager.getNumberOfUniqueHosts());
+          Task t = 
+            job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
+                                    taskTrackerManager.getNumberOfUniqueHosts()
+                                   );
           if (t != null) {
-            return Collections.singletonList(t);
-          }
-
-          //
-          // Beyond the highest-priority task, reserve a little 
-          // room for failures and speculative executions; don't 
-          // schedule tasks to the hilt.
-          //
-          totalNeededReduces += job.desiredReduces();
-          int padding = 0;
-          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
-            padding = 
-              Math.min(maxCurrentReduceTasks,
-                       (int) (totalNeededReduces * padFraction));
-          }
-          if (totalReduces + padding >= totalReduceTaskCapacity) {
+            assignedTasks.add(t);
             break;
           }
         }
       }
     }
-    return null;
+
+    return assignedTasks;
   }
 
+  /**
+   * Tells whether the cluster-wide task count plus a reserve for failures
+   * and speculative executions reaches the capacity for the given task kind.
+   * @param isMapTask true to check map slots, false to check reduce slots
+   * @param clusterStatus source of cluster-wide task counts and capacities
+   * @param maxTaskTrackerSlots upper bound on the reserved padding
+   * @return true if task count plus padding reaches the cluster capacity
+   */
+  private boolean exceededPadding(boolean isMapTask,
+                                  ClusterStatus clusterStatus,
+                                  int maxTaskTrackerSlots) {
+    int numTaskTrackers = clusterStatus.getTaskTrackers();
+    int totalTasks = isMapTask ? clusterStatus.getMapTasks()
+                               : clusterStatus.getReduceTasks();
+    int totalTaskCapacity = isMapTask ? clusterStatus.getMaxMapTasks()
+                                      : clusterStatus.getMaxReduceTasks();
+
+    Collection<JobInProgress> jobQueue =
+      jobQueueJobInProgressListener.getJobQueue();
+    synchronized (jobQueue) {
+      int totalNeededTasks = 0;
+      for (JobInProgress job : jobQueue) {
+        // A job with no reduces is irrelevant only to the reduce-slot check.
+        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
+            (!isMapTask && job.numReduceTasks == 0)) {
+          continue;
+        }
+        // Beyond the highest-priority task, reserve a little room for
+        // failures and speculative executions; don't schedule tasks to
+        // the hilt.  Demand is counted for the task kind being checked.
+        totalNeededTasks +=
+          isMapTask ? job.desiredMaps() : job.desiredReduces();
+        int padding = 0;
+        if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
+          padding = Math.min(maxTaskTrackerSlots,
+                             (int) (totalNeededTasks * padFraction));
+        }
+        if (totalTasks + padding >= totalTaskCapacity) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+  
 }
