Index: src/mapred/org/apache/hadoop/mapred/TaskTracker.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskTracker.java	(revision 684317)
+++ src/mapred/org/apache/hadoop/mapred/TaskTracker.java	(working copy)
@@ -165,7 +165,6 @@
   private int maxCurrentMapTasks;
   private int maxCurrentReduceTasks;
   private int failures;
-  private int finishedCount[] = new int[1];
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
   private CleanupQueue directoryCleanupThread;
@@ -873,13 +872,8 @@
 
         long waitTime = heartbeatInterval - (now - lastHeartbeat);
         if (waitTime > 0) {
-          // sleeps for the wait time, wakes up if a task is finished.
-          synchronized(finishedCount) {
-            if (finishedCount[0] == 0) {
-              finishedCount.wait(waitTime);
-            }
-            finishedCount[0] = 0;
-          }
+          // sleeps for the wait time
+          Thread.sleep(waitTime);
         }
 
         // If the TaskTracker is just starting up:
@@ -2051,10 +2045,6 @@
     }
     if (tip != null) {
       tip.taskFinished();
-      synchronized(finishedCount) {
-        finishedCount[0]++;
-        finishedCount.notify();
-      }
     } else {
       LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
     }
Index: src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java	(revision 684317)
+++ src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java	(working copy)
@@ -18,8 +18,8 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -84,50 +84,48 @@
 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     int numTaskTrackers = clusterStatus.getTaskTrackers();
+    int totalMaps = clusterStatus.getMapTasks();
+    int totalMapTaskCapacity = clusterStatus.getMaxMapTasks();
+    int totalReduces = clusterStatus.getReduceTasks();
+    int totalReduceTaskCapacity = clusterStatus.getMaxReduceTasks();
 
     Collection<JobInProgress> jobQueue =
       jobQueueJobInProgressListener.getJobQueue();
     synchronized (jobQueue) {
       for (JobInProgress job : jobQueue) {
         if (job.getStatus().getRunState() == JobStatus.RUNNING) {
-          int totalMapTasks = job.desiredMaps();
-          int totalReduceTasks = job.desiredReduces();
-          remainingMapLoad += (totalMapTasks - job.finishedMaps());
-          remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
+          remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
+          remainingReduceLoad += (job.desiredReduces() - job.finishedReduces());
         }
       }
     }
 
-    int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
-    int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
     // find out the maximum number of maps or reduces that we are willing
     // to run on any node.
     int maxMapLoad = 0;
+    if (totalMapTaskCapacity > 0) {
+      maxMapLoad = Math.max(1,
+                            (int) Math.ceil((double) remainingMapLoad / 
+                                            totalMapTaskCapacity));
+    }
     int maxReduceLoad = 0;
-    if (numTaskTrackers > 0) {
-      maxMapLoad = Math.min(maxCurrentMapTasks,
-                            (int) Math.ceil((double) remainingMapLoad / 
-                                            numTaskTrackers));
-      maxReduceLoad = Math.min(maxCurrentReduceTasks,
+    if (totalReduceTaskCapacity > 0) {
+      maxReduceLoad = Math.max(1,
                                (int) Math.ceil((double) remainingReduceLoad
-                                               / numTaskTrackers));
+                                               / totalReduceTaskCapacity));
     }
         
     //
     // Get map + reduce counts for the current tracker.
     //
-
     int numMaps = taskTracker.countMapTasks();
     int numReduces = taskTracker.countReduceTasks();
+    int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
+    int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
     
-    int totalMaps = clusterStatus.getMapTasks();
-    int totalMapTaskCapacity = clusterStatus.getMaxMapTasks();
-    int totalReduces = clusterStatus.getReduceTasks();
-    int totalReduceTaskCapacity = clusterStatus.getMaxReduceTasks();
-
     //
-    // In the below steps, we allocate first a map task (if appropriate),
-    // and then a reduce task if appropriate.  We go through all jobs
+    // In the below steps, we allocate first map tasks (if appropriate),
+    // and then reduce tasks if appropriate.  We go through all jobs
     // in order of job arrival; jobs only get serviced if their 
     // predecessors are serviced, too.
     //
@@ -137,9 +135,13 @@
     // has a workload that's less than the maximum load of that kind of
     // task.
     //
-       
-    if (numMaps < maxMapLoad) {
-
+    List<Task> assignedTasks = new ArrayList<Task>();
+    
+    int trackerMaxMapLoad = 
+      Math.min(maxMapLoad * maxCurrentMapTasks, maxCurrentMapTasks);
+    int availableMapSlots = trackerMaxMapLoad - numMaps;
+    boolean exceededMapPadding = false;
+    for (int i=0; i < availableMapSlots; ++i) {
       int totalNeededMaps = 0;
       synchronized (jobQueue) {
         for (JobInProgress job : jobQueue) {
@@ -150,7 +152,7 @@
           Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
               taskTrackerManager.getNumberOfUniqueHosts());
           if (t != null) {
-            return Collections.singletonList(t);
+            assignedTasks.add(t);
           }
 
           //
@@ -165,17 +167,25 @@
                                (int)(totalNeededMaps * padFraction));
           }
           if (totalMaps + padding >= totalMapTaskCapacity) {
+            exceededMapPadding = true;
             break;
           }
         }
+        
+        if (exceededMapPadding) {
+          break;
+        }
       }
     }
 
     //
     // Same thing, but for reduce tasks
     //
-    if (numReduces < maxReduceLoad) {
-
+    int trackerMaxReduceLoad = 
+      Math.min(maxReduceLoad * maxCurrentReduceTasks, maxCurrentReduceTasks);
+    int availableReduceSlots = trackerMaxReduceLoad - numReduces;
+    boolean exceededReducePadding = false;
+    for (int i=0; i < availableReduceSlots; ++i) {
       int totalNeededReduces = 0;
       synchronized (jobQueue) {
         for (JobInProgress job : jobQueue) {
@@ -187,7 +197,7 @@
           Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
               taskTrackerManager.getNumberOfUniqueHosts());
           if (t != null) {
-            return Collections.singletonList(t);
+            assignedTasks.add(t);
           }
 
           //
@@ -203,12 +213,18 @@
                        (int) (totalNeededReduces * padFraction));
           }
           if (totalReduces + padding >= totalReduceTaskCapacity) {
+            exceededReducePadding = true;
             break;
           }
         }
+        
+        if (exceededReducePadding) {
+          break;
+        }
       }
     }
-    return null;
+    
+    return assignedTasks;
   }
 
 }
