Index: src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java	(revision 708984)
+++ src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java	(working copy)
@@ -53,6 +53,18 @@
     }
 
     @Override
+    public List<Task> obtainNewMapTasks(final TaskTrackerStatus tts, int clusterSize,
+        int ignored) throws IOException {
+      Task task = obtainNewMapTask(tts, clusterSize, ignored);
+      if (task == null) {
+        return null;
+      }
+      List<Task> tlist =  new ArrayList<Task>(1);
+      tlist.add(task);
+      return tlist;
+    }
+
+    @Override
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
         int ignored) throws IOException {
       TaskAttemptID attemptId = getTaskAttemptID(true);
Index: src/mapred/org/apache/hadoop/mapred/JobInProgress.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobInProgress.java	(revision 708984)
+++ src/mapred/org/apache/hadoop/mapred/JobInProgress.java	(working copy)
@@ -150,6 +150,8 @@
   private boolean hasSpeculativeReduces;
   private long inputLength = 0;
   private long maxVirtualMemoryForTask;
+  private int maxSplitsPerMapper = 1;
+  private int rackLevel = 1;                    // XXX Rack local
   
   // Per-job counters
   public static enum Counter { 
@@ -246,6 +248,7 @@
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
     this.maxVirtualMemoryForTask = conf.getMaxVirtualMemoryForTask();
+    this.maxSplitsPerMapper = conf.getMaxSplitsPerMapper();
   }
 
   /**
@@ -871,27 +874,53 @@
   /**
    * Return a MapTask, if appropriate, to run on the given tasktracker
    */
-  public synchronized Task obtainNewMapTask(TaskTrackerStatus tts, 
+  public synchronized Task obtainNewMapTask(TaskTrackerStatus tts,
                                             int clusterSize, 
                                             int numUniqueHosts
                                            ) throws IOException {
+    List<Task> task = obtainNewMapTasks(tts, clusterSize, numUniqueHosts, 1);
+    assert task == null || task.size() == 1;
+    return task == null ? null : task.get(0);
+  }
+  /**
+   * Return a list of MapTask, if appropriate, to run on the given tasktracker
+   */
+  public synchronized List<Task> obtainNewMapTasks(TaskTrackerStatus tts,
+                                            int clusterSize, 
+                                            int numUniqueHosts) throws IOException {
+    return obtainNewMapTasks(tts, clusterSize, numUniqueHosts, maxSplitsPerMapper);
+  }
+
+  private synchronized List<Task> obtainNewMapTasks(TaskTrackerStatus tts,  // XXX
+                                            int clusterSize, 
+                                            int numUniqueHosts,
+                                            int splitsPerMap
+                                           ) throws IOException {
     if (status.getRunState() != JobStatus.RUNNING) {
       LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }
         
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, 
-                                status.mapProgress());
-    if (target == -1) {
+    int[] targets = findNewMapTask(tts, clusterSize, numUniqueHosts, 
+                                   status.mapProgress(), splitsPerMap);
+    if (targets == null) {
       return null;
     }
     
-    Task result = maps[target].getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
+    List<Task> retval = new ArrayList<Task>();
+    for (int i = 0; i < targets.length; i++) {
+      int target = targets[i];
+      if (target == -1) {
+        break;
+      }
+      Task result = maps[target].getTaskToRun(tts.getTrackerName());
+      if (result != null) {
+        addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
+        retval.add(result);
+      }
     }
 
-    return result;
+    return retval;
   }    
 
   /**
@@ -1494,6 +1523,47 @@
     }
     return null;
   }
+
+  /**
+   * Add more splits to current map task. These splits have to be local to the
+   * specified node. Select a maximum of 
+   * @param tts The task tracker that is asking for a task
+   * @param node The machine on which the specified task tracker is running
+   * @param level The level of the current node in the cluster topology
+   * @param numResults items already filled up in results
+   * @param results Filled up on return with indices to map objects
+   */
+  void addMoreLocalSplits(TaskTrackerStatus tts, Node node, int level,        // XXX
+                          int numUniqueHosts, int numResults, int[] results) {
+    //
+    // if there are more tasks that are local to the same rack as the 
+    // specified node, then pick them. This is an optimization that helps
+    // when map-outputs are very small. We choose only from nonRunningMaps
+    // because it is ok top not trigger this optimization for already running
+    // tasks.
+    //
+    for (; level < rackLevel && numResults < results.length; ++level) {
+      List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(node);
+      if (cacheForLevel != null) {
+        TaskInProgress tip = findTaskFromList(cacheForLevel, tts, 
+                                 numUniqueHosts,level == 0);
+        if (tip != null) {
+          // Add to running cache
+          scheduleMap(tip);
+
+          // remove the cache if its empty
+          if (cacheForLevel.size() == 0) {
+            nonRunningMapCache.remove(node);
+          }
+
+          results[numResults++] = tip.getIdWithinJob();
+        }
+      }
+    }
+    if (numResults < results.length) {
+      results[numResults++] = -1;     // indicate end of list
+    }
+  }
   
   /**
    * Find new map task
@@ -1503,10 +1573,11 @@
    * @param avgProgress The average progress of this kind of task in this job
    * @return the index in tasks of the selected task (or -1 for no task)
    */
-  private synchronized int findNewMapTask(TaskTrackerStatus tts, 
+  private synchronized int[] findNewMapTask(TaskTrackerStatus tts,  // XXX
                                           int clusterSize,
                                           int numUniqueHosts,
-                                          double avgProgress) {
+                                          double avgProgress,
+                                          int splitsPerMapper) {
     String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
     
@@ -1516,24 +1587,35 @@
     this.clusterSize = clusterSize;
 
     if (!shouldRunOnTaskTracker(taskTracker)) {
-      return -1;
+      return null;
     }
 
+    if (jobtracker == null) {
+      System.out.println("XXX JT");
+    }
+
     Node node = jobtracker.getNode(tts.getHost());
     Node nodeParentAtMaxLevel = null;
     
-
     long outSize = resourceEstimator.getEstimatedMapOutputSize();
     long availSpace = tts.getResourceStatus().getAvailableSpace();
-    if(availSpace < outSize) {
+
+    // trim the number of splits depending on the amount of free
+    // space on task tracker
+    if (availSpace < splitsPerMapper * outSize) {
+      splitsPerMapper = (int)Math.min(1L, availSpace/outSize);
+    }
+
+    if(availSpace < splitsPerMapper * outSize) {
       LOG.warn("No room for map task. Node " + node + 
                " has " + availSpace + 
                " bytes free; but we expect map to take " + outSize);
 
-      return -1; //see if a different TIP might work better. 
+      return null; //see if a different TIP might work better. 
     }
+    int[] results = new int[splitsPerMapper];
+    int numResults= 0;
     
-    
     // For scheduling a map task, we have two caches and a list (optional)
     //  I)   one for non-running task
     //  II)  one for running task (this is for handling speculation)
@@ -1572,7 +1654,10 @@
               nonRunningMapCache.remove(key);
             }
 
-            return tip.getIdWithinJob();
+            results[numResults++] = tip.getIdWithinJob();
+            addMoreLocalSplits(tts, key, level, numUniqueHosts, 
+                               numResults, results);
+            return results;
           }
         }
         key = key.getParent();
@@ -1609,7 +1694,8 @@
             nonRunningMapCache.remove(parent);
           }
           LOG.info("Choosing a non-local task " + tip.getTIPId());
-          return tip.getIdWithinJob();
+          results[numResults++] = tip.getIdWithinJob();
+          return results;
         }
       }
     }
@@ -1621,7 +1707,8 @@
       scheduleMap(tip);
 
       LOG.info("Choosing a non-local task " + tip.getTIPId());
-      return tip.getIdWithinJob();
+      results[numResults++] = tip.getIdWithinJob();
+      return results;
     }
 
     //
@@ -1643,7 +1730,8 @@
               if (cacheForLevel.size() == 0) {
                 runningMapCache.remove(key);
               }
-              return tip.getIdWithinJob();
+              results[numResults++] = tip.getIdWithinJob();
+              return results;
             }
           }
           key = key.getParent();
@@ -1669,7 +1757,8 @@
             }
             LOG.info("Choosing a non-local task " + tip.getTIPId() 
                      + " for speculation");
-            return tip.getIdWithinJob();
+            results[numResults++] = tip.getIdWithinJob();
+            return results;
           }
         }
       }
@@ -1680,10 +1769,11 @@
       if (tip != null) {
         LOG.info("Choosing a non-local task " + tip.getTIPId() 
                  + " for speculation");
-        return tip.getIdWithinJob();
+        results[numResults++] = tip.getIdWithinJob();
+        return results;
       }
     }
-    return -1;
+    return null;
   }
 
   /**
Index: src/mapred/org/apache/hadoop/mapred/JobConf.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobConf.java	(revision 708984)
+++ src/mapred/org/apache/hadoop/mapred/JobConf.java	(working copy)
@@ -1384,6 +1384,23 @@
   public void setQueueName(String queueName) {
     set("mapred.job.queue.name", queueName);
   }
+
+  /**
+   * The maximum number of splits than can be processed by a single mapper instance.
+   * 
+   * If this parameter is not set, then each mapper will process one
+   * input split.
+   * 
+   * If a single instance of a mapper is assigned multiple input splits to
+   * process, the guarantee is that each of those splits are local to the
+   * same rack on which the mapper is running.
+   * 
+   * @return The maximum number of splits than can be executed by a single 
+   *         mapper instance
+   */
+  int getMaxSplitsPerMapper() {
+    return getInt("mapred.job.maxsplits.per.mapper", 1);
+  }
   
   /** 
    * Find a jar that contains a class of the same name, if any.
Index: src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java	(revision 708984)
+++ src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java	(working copy)
@@ -147,10 +147,10 @@
             continue;
           }
 
-          Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
+          List<Task> t = job.obtainNewMapTasks(taskTracker, numTaskTrackers,
               taskTrackerManager.getNumberOfUniqueHosts());
           if (t != null) {
-            return Collections.singletonList(t);
+            return t;
           }
 
           //
Index: src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java	(revision 708984)
+++ src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java	(working copy)
@@ -138,8 +138,9 @@
             continue;
           }
           if (step == 0 || step == 2) {
-            task = job.obtainNewMapTask(taskTracker, numTaskTrackers,
-                taskTrackerManager.getNumberOfUniqueHosts());
+            List<Task> tasklist = job.obtainNewMapTasks(taskTracker, numTaskTrackers,
+                                        taskTrackerManager.getNumberOfUniqueHosts());
+            return tasklist;
           }
           else {
             task = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
