Index: src/mapred/org/apache/hadoop/mapred/TaskTracker.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskTracker.java	(revision 681764)
+++ src/mapred/org/apache/hadoop/mapred/TaskTracker.java	(working copy)
@@ -163,7 +163,6 @@
   private int maxCurrentMapTasks;
   private int maxCurrentReduceTasks;
   private int failures;
-  private int finishedCount[] = new int[1];
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
   private CleanupQueue directoryCleanupThread;
@@ -901,13 +900,8 @@
 
         long waitTime = heartbeatInterval - (now - lastHeartbeat);
         if (waitTime > 0) {
-          // sleeps for the wait time, wakes up if a task is finished.
-          synchronized(finishedCount) {
-            if (finishedCount[0] == 0) {
-              finishedCount.wait(waitTime);
-            }
-            finishedCount[0] = 0;
-          }
+          // sleeps for the wait time
+          Thread.sleep(waitTime);
         }
 
         // If the TaskTracker is just starting up:
@@ -2075,10 +2069,6 @@
     }
     if (tip != null) {
       tip.taskFinished();
-      synchronized(finishedCount) {
-        finishedCount[0]++;
-        finishedCount.notify();
-      }
     } else {
       LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
     }
Index: src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java	(revision 681764)
+++ src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java	(working copy)
@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -84,6 +85,10 @@
 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     int numTaskTrackers = clusterStatus.getTaskTrackers();
+    int totalFreeMapSlots = 
+      clusterStatus.getMaxMapTasks() -  clusterStatus.getMapTasks();
+    int totalFreeReduceSlots = 
+      clusterStatus.getMaxReduceTasks() - clusterStatus.getReduceTasks();
 
     Collection<JobInProgress> jobQueue =
       jobQueueJobInProgressListener.getJobQueue();
@@ -103,14 +108,16 @@
     // find out the maximum number of maps or reduces that we are willing
     // to run on any node.
     int maxMapLoad = 0;
-    int maxReduceLoad = 0;
-    if (numTaskTrackers > 0) {
+    if (totalFreeMapSlots > 0) {
       maxMapLoad = Math.min(maxCurrentMapTasks,
                             (int) Math.ceil((double) remainingMapLoad / 
-                                            numTaskTrackers));
+                                            totalFreeMapSlots));
+    }
+    int maxReduceLoad = 0;
+    if (totalFreeReduceSlots > 0) {
       maxReduceLoad = Math.min(maxCurrentReduceTasks,
                                (int) Math.ceil((double) remainingReduceLoad
-                                               / numTaskTrackers));
+                                               / totalFreeReduceSlots));
     }
         
     //
@@ -137,36 +144,41 @@
     // has a workload that's less than the maximum load of that kind of
     // task.
     //
-       
-    if (numMaps < maxMapLoad) {
+    List<Task> assignedTasks = new ArrayList<Task>();
+    
+    int mapSlots = totalMapTaskCapacity - numMaps;
+    for (int slot=0; slot < mapSlots; ++slot) {
+      if (numMaps < maxMapLoad) {
+        int totalNeededMaps = 0;
+        synchronized (jobQueue) {
+          for (JobInProgress job : jobQueue) {
+            if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+              continue;
+            }
 
-      int totalNeededMaps = 0;
-      synchronized (jobQueue) {
-        for (JobInProgress job : jobQueue) {
-          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
-            continue;
-          }
+            Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
+                taskTrackerManager.getNumberOfUniqueHosts());
+            if (t != null) {
+              assignedTasks.add(t);
+              ++numMaps;
+            }
 
-          Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
-              taskTrackerManager.getNumberOfUniqueHosts());
-          if (t != null) {
-            return Collections.singletonList(t);
+            //
+            // Beyond the highest-priority task, reserve a little 
+            // room for failures and speculative executions; don't 
+            // schedule tasks to the hilt.
+            //
+            totalNeededMaps += job.desiredMaps();
+            int padding = 0;
+            if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
+              padding = Math.min(maxCurrentMapTasks,
+                  (int)(totalNeededMaps * padFraction));
+            }
+            if (totalMaps + padding >= totalMapTaskCapacity) {
+              maxMapLoad = 0;               // break out of the outer loop too!
+              break;
+            }
           }
-
-          //
-          // Beyond the highest-priority task, reserve a little 
-          // room for failures and speculative executions; don't 
-          // schedule tasks to the hilt.
-          //
-          totalNeededMaps += job.desiredMaps();
-          int padding = 0;
-          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
-            padding = Math.min(maxCurrentMapTasks,
-                               (int)(totalNeededMaps * padFraction));
-          }
-          if (totalMaps + padding >= totalMapTaskCapacity) {
-            break;
-          }
         }
       }
     }
@@ -174,41 +186,46 @@
     //
     // Same thing, but for reduce tasks
     //
-    if (numReduces < maxReduceLoad) {
+    int reduceSlots = totalReduceTaskCapacity - numReduces;
+    for (int slot=0; slot < reduceSlots; ++slot) {
+      if (numReduces < maxReduceLoad) {
 
-      int totalNeededReduces = 0;
-      synchronized (jobQueue) {
-        for (JobInProgress job : jobQueue) {
-          if (job.getStatus().getRunState() != JobStatus.RUNNING ||
-              job.numReduceTasks == 0) {
-            continue;
-          }
+        int totalNeededReduces = 0;
+        synchronized (jobQueue) {
+          for (JobInProgress job : jobQueue) {
+            if (job.getStatus().getRunState() != JobStatus.RUNNING ||
+                job.numReduceTasks == 0) {
+              continue;
+            }
 
-          Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
-              taskTrackerManager.getNumberOfUniqueHosts());
-          if (t != null) {
-            return Collections.singletonList(t);
-          }
+            Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
+                taskTrackerManager.getNumberOfUniqueHosts());
+            if (t != null) {
+              assignedTasks.add(t);
+              ++numReduces;
+            }
 
-          //
-          // Beyond the highest-priority task, reserve a little 
-          // room for failures and speculative executions; don't 
-          // schedule tasks to the hilt.
-          //
-          totalNeededReduces += job.desiredReduces();
-          int padding = 0;
-          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
-            padding = 
-              Math.min(maxCurrentReduceTasks,
-                       (int) (totalNeededReduces * padFraction));
+            //
+            // Beyond the highest-priority task, reserve a little 
+            // room for failures and speculative executions; don't 
+            // schedule tasks to the hilt.
+            //
+            totalNeededReduces += job.desiredReduces();
+            int padding = 0;
+            if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
+              padding = 
+                Math.min(maxCurrentReduceTasks,
+                    (int) (totalNeededReduces * padFraction));
+            }
+            if (totalReduces + padding >= totalReduceTaskCapacity) {
+              maxReduceLoad = 0;             // break out of the outer loop too!
+              break;
+            }
           }
-          if (totalReduces + padding >= totalReduceTaskCapacity) {
-            break;
-          }
         }
       }
     }
-    return null;
+    return assignedTasks;
   }
 
 }
