Index: src/test/org/apache/hadoop/mapred/TestCounters.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestCounters.java	(revision 722878)
+++ src/test/org/apache/hadoop/mapred/TestCounters.java	(working copy)
@@ -67,8 +67,8 @@
   }
   
   public void testCounters() throws IOException {
-    Enum[] keysWithResource = {Task.Counter.MAP_INPUT_BYTES, 
-                               Task.Counter.MAP_OUTPUT_BYTES};
+    Enum[] keysWithResource = {TaskCounter.MAP_INPUT_BYTES, 
+                               TaskCounter.MAP_OUTPUT_BYTES};
     
     Enum[] keysWithoutResource = {myCounters.TEST1, myCounters.TEST2};
     
Index: src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java	(revision 722878)
+++ src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java	(working copy)
@@ -76,9 +76,9 @@
       assertEquals("number of cleanups", 2, reports.length);
       Counters counters = ret.job.getCounters();
       assertEquals("number of map inputs", 3, 
-                   counters.getCounter(Task.Counter.MAP_INPUT_RECORDS));
+                   counters.getCounter(TaskCounter.MAP_INPUT_RECORDS));
       assertEquals("number of reduce outputs", 9, 
-                   counters.getCounter(Task.Counter.REDUCE_OUTPUT_RECORDS));
+                   counters.getCounter(TaskCounter.REDUCE_OUTPUT_RECORDS));
       runCustomFormats(mr);
     } finally {
       if (mr != null) { mr.shutdown(); }
Index: src/test/org/apache/hadoop/mapred/TestBadRecords.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestBadRecords.java	(revision 722878)
+++ src/test/org/apache/hadoop/mapred/TestBadRecords.java	(working copy)
@@ -113,25 +113,25 @@
     
     //validate counters
     Counters counters = runningJob.getCounters();
-    assertEquals(counters.findCounter(Task.Counter.MAP_SKIPPED_RECORDS).
+    assertEquals(counters.findCounter(TaskCounter.MAP_SKIPPED_RECORDS).
         getCounter(),mapperBadRecords.size());
     
     int mapRecs = input.size() - mapperBadRecords.size();
-    assertEquals(counters.findCounter(Task.Counter.MAP_INPUT_RECORDS).
+    assertEquals(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).
         getCounter(),mapRecs);
-    assertEquals(counters.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).
+    assertEquals(counters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).
         getCounter(),mapRecs);
     
     int redRecs = mapRecs - redBadRecords.size();
-    assertEquals(counters.findCounter(Task.Counter.REDUCE_SKIPPED_RECORDS).
+    assertEquals(counters.findCounter(TaskCounter.REDUCE_SKIPPED_RECORDS).
         getCounter(),redBadRecords.size());
-    assertEquals(counters.findCounter(Task.Counter.REDUCE_SKIPPED_GROUPS).
+    assertEquals(counters.findCounter(TaskCounter.REDUCE_SKIPPED_GROUPS).
         getCounter(),redBadRecords.size());
-    assertEquals(counters.findCounter(Task.Counter.REDUCE_INPUT_GROUPS).
+    assertEquals(counters.findCounter(TaskCounter.REDUCE_INPUT_GROUPS).
         getCounter(),redRecs);
-    assertEquals(counters.findCounter(Task.Counter.REDUCE_INPUT_RECORDS).
+    assertEquals(counters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).
         getCounter(),redRecs);
-    assertEquals(counters.findCounter(Task.Counter.REDUCE_OUTPUT_RECORDS).
+    assertEquals(counters.findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS).
         getCounter(),redRecs);
     
     //validate skipped records
Index: src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java	(revision 722878)
+++ src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java	(working copy)
@@ -81,12 +81,12 @@
     RunningJob job = launchJob(jobConf, in, out, numMaps, jobName);
     Counters counters = job.getCounters();
     assertEquals("Number of local maps", 
-            counters.getCounter(JobInProgress.Counter.OTHER_LOCAL_MAPS), otherLocalMaps);
+            counters.getCounter(JobCounter.OTHER_LOCAL_MAPS), otherLocalMaps);
     assertEquals("Number of Data-local maps", 
-            counters.getCounter(JobInProgress.Counter.DATA_LOCAL_MAPS), 
+            counters.getCounter(JobCounter.DATA_LOCAL_MAPS), 
                                 dataLocalMaps);
     assertEquals("Number of Rack-local maps", 
-            counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS), 
+            counters.getCounter(JobCounter.RACK_LOCAL_MAPS), 
                                 rackLocalMaps);
     mr.waitUntilIdle();
     mr.shutdown();
Index: src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java	(revision 722878)
+++ src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java	(working copy)
@@ -47,7 +47,7 @@
 
   private void validateCounters(Counters counter, long spillRecCnt) {
       // Check if the numer of Spilled Records is same as expected
-      assertEquals(counter.findCounter(Task.Counter.SPILLED_RECORDS).
+      assertEquals(counter.findCounter(TaskCounter.SPILLED_RECORDS).
                      getCounter(), spillRecCnt);
   }
 
Index: src/test/org/apache/hadoop/mapred/TestJobInProgress.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestJobInProgress.java	(revision 722878)
+++ src/test/org/apache/hadoop/mapred/TestJobInProgress.java	(working copy)
@@ -133,8 +133,8 @@
       JobInProgress jip = jt.getJob(js.getJobID());
       Counters counter = jip.getJobCounters();
       long totalTaskCount = counter
-          .getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_MAPS)
-          + counter.getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_REDUCES);
+          .getCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+          + counter.getCounter(JobCounter.TOTAL_LAUNCHED_REDUCES);
       while (jip.getNumTaskCompletionEvents() < totalTaskCount) {
         assertEquals(true, (jip.runningMaps() >= 0));
         assertEquals(true, (jip.pendingMaps() >= 0));
Index: src/mapred/org/apache/hadoop/mapred/JobInProgress.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobInProgress.java	(revision 722878)
+++ src/mapred/org/apache/hadoop/mapred/JobInProgress.java	(working copy)
@@ -152,16 +152,6 @@
   private long maxVirtualMemoryForTask;
   private long maxPhysicalMemoryForTask;
   
-  // Per-job counters
-  public static enum Counter { 
-    NUM_FAILED_MAPS, 
-    NUM_FAILED_REDUCES,
-    TOTAL_LAUNCHED_MAPS,
-    TOTAL_LAUNCHED_REDUCES,
-    OTHER_LOCAL_MAPS,
-    DATA_LOCAL_MAPS,
-    RACK_LOCAL_MAPS
-  }
   private Counters jobCounters = new Counters();
   
   private MetricsRecord jobMetrics;
@@ -1119,14 +1109,14 @@
     } else if (tip.isMapTask()) {
       ++runningMapTasks;
       name = Values.MAP.name();
-      counter = Counter.TOTAL_LAUNCHED_MAPS;
+      counter = JobCounter.TOTAL_LAUNCHED_MAPS;
       splits = tip.getSplitNodes();
       if (tip.getActiveTasks().size() > 1)
         speculativeMapTasks++;
     } else {
       ++runningReduceTasks;
       name = Values.REDUCE.name();
-      counter = Counter.TOTAL_LAUNCHED_REDUCES;
+      counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
       if (tip.getActiveTasks().size() > 1)
         speculativeReduceTasks++;
     }
@@ -1179,17 +1169,17 @@
       switch (level) {
       case 0 :
         LOG.info("Choosing data-local task " + tip.getTIPId());
-        jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
+        jobCounters.incrCounter(JobCounter.DATA_LOCAL_MAPS, 1);
         break;
       case 1:
         LOG.info("Choosing rack-local task " + tip.getTIPId());
-        jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
+        jobCounters.incrCounter(JobCounter.RACK_LOCAL_MAPS, 1);
         break;
       default :
         // check if there is any locality
         if (level != this.maxLevel) {
           LOG.info("Choosing cached task at level " + level + tip.getTIPId());
-          jobCounters.incrCounter(Counter.OTHER_LOCAL_MAPS, 1);
+          jobCounters.incrCounter(JobCounter.OTHER_LOCAL_MAPS, 1);
         }
         break;
       }
@@ -2194,9 +2184,9 @@
       //
       if (!tip.isCleanupTask() && !tip.isSetupTask()) {
         if (tip.isMapTask()) {
-          jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
+          jobCounters.incrCounter(JobCounter.NUM_FAILED_MAPS, 1);
         } else {
-          jobCounters.incrCounter(Counter.NUM_FAILED_REDUCES, 1);
+          jobCounters.incrCounter(JobCounter.NUM_FAILED_REDUCES, 1);
         }
       }
     }
Index: src/mapred/org/apache/hadoop/mapred/Task.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/Task.java	(revision 722878)
+++ src/mapred/org/apache/hadoop/mapred/Task.java	(working copy)
@@ -55,23 +55,6 @@
   private static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner");
 
-  // Counters used by Task subclasses
-  protected static enum Counter { 
-    MAP_INPUT_RECORDS, 
-    MAP_OUTPUT_RECORDS,
-    MAP_SKIPPED_RECORDS,
-    MAP_INPUT_BYTES, 
-    MAP_OUTPUT_BYTES,
-    COMBINE_INPUT_RECORDS,
-    COMBINE_OUTPUT_RECORDS,
-    REDUCE_INPUT_GROUPS,
-    REDUCE_INPUT_RECORDS,
-    REDUCE_OUTPUT_RECORDS,
-    REDUCE_SKIPPED_GROUPS,
-    REDUCE_SKIPPED_RECORDS,
-    SPILLED_RECORDS
-  }
-  
   /**
    * Counters to measure the usage of the different file systems.
    * Always return the String array with two elements. First one is the name of  
@@ -141,7 +124,7 @@
   public Task() {
     taskStatus = TaskStatus.createTaskStatus(isMapTask());
     taskId = new TaskAttemptID();
-    spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
+    spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
   }
 
   public Task(String jobFile, TaskAttemptID taskId, int partition) {
@@ -158,7 +141,7 @@
                                                     TaskStatus.Phase.SHUFFLE, 
                                                   counters);
     this.mapOutputFile.setJobId(taskId.getJobID());
-    spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
+    spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
   }
 
   ////////////////////////////////////////////
Index: src/mapred/org/apache/hadoop/mapred/ReduceTask.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/ReduceTask.java	(revision 722878)
+++ src/mapred/org/apache/hadoop/mapred/ReduceTask.java	(working copy)
@@ -108,15 +108,15 @@
   private Progress sortPhase;
   private Progress reducePhase;
   private Counters.Counter reduceInputKeyCounter = 
-    getCounters().findCounter(Counter.REDUCE_INPUT_GROUPS);
+    getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
   private Counters.Counter reduceInputValueCounter = 
-    getCounters().findCounter(Counter.REDUCE_INPUT_RECORDS);
+    getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
   private Counters.Counter reduceOutputCounter = 
-    getCounters().findCounter(Counter.REDUCE_OUTPUT_RECORDS);
+    getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
   private Counters.Counter reduceCombineInputCounter =
-    getCounters().findCounter(Counter.COMBINE_INPUT_RECORDS);
+    getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
   private Counters.Counter reduceCombineOutputCounter =
-    getCounters().findCounter(Counter.COMBINE_OUTPUT_RECORDS);
+    getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
 
   // A custom comparator for map output files. Here the ordering is determined
   // by the file's size and path. In case of files with same size and different
@@ -263,9 +263,9 @@
        super(in, comparator, keyClass, valClass, conf, reporter);
        this.umbilical = umbilical;
        this.skipGroupCounter = 
-         getCounters().findCounter(Counter.REDUCE_SKIPPED_GROUPS);
+         getCounters().findCounter(TaskCounter.REDUCE_SKIPPED_GROUPS);
        this.skipRecCounter = 
-         getCounters().findCounter(Counter.REDUCE_SKIPPED_RECORDS);
+         getCounters().findCounter(TaskCounter.REDUCE_SKIPPED_RECORDS);
        this.toWriteSkipRecs = toWriteSkipRecs() &&  
          SkipBadRecords.getSkipOutputPath(conf)!=null;
        this.keyClass = keyClass;
Index: src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties	(revision 722878)
+++ src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties	(working copy)
@@ -1,12 +0,0 @@
-# ResourceBundle properties file for job-level counters
-
-CounterGroupName=              Job Counters 
-
-NUM_FAILED_MAPS.name=          Failed map tasks
-NUM_FAILED_REDUCES.name=       Failed reduce tasks
-TOTAL_LAUNCHED_MAPS.name=      Launched map tasks
-TOTAL_LAUNCHED_REDUCES.name=   Launched reduce tasks
-OTHER_LOCAL_MAPS.name=         Other local map tasks
-DATA_LOCAL_MAPS.name=          Data-local map tasks
-RACK_LOCAL_MAPS.name=          Rack-local map tasks
-
Index: src/mapred/org/apache/hadoop/mapred/MapTask.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/MapTask.java	(revision 722878)
+++ src/mapred/org/apache/hadoop/mapred/MapTask.java	(working copy)
@@ -18,12 +18,12 @@
 
 package org.apache.hadoop.mapred;
 
-import static org.apache.hadoop.mapred.Task.Counter.COMBINE_INPUT_RECORDS;
-import static org.apache.hadoop.mapred.Task.Counter.COMBINE_OUTPUT_RECORDS;
-import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_BYTES;
-import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
-import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
-import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapred.TaskCounter.COMBINE_INPUT_RECORDS;
+import static org.apache.hadoop.mapred.TaskCounter.COMBINE_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapred.TaskCounter.MAP_INPUT_BYTES;
+import static org.apache.hadoop.mapred.TaskCounter.MAP_INPUT_RECORDS;
+import static org.apache.hadoop.mapred.TaskCounter.MAP_OUTPUT_BYTES;
+import static org.apache.hadoop.mapred.TaskCounter.MAP_OUTPUT_RECORDS;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -211,7 +211,7 @@
         TaskUmbilicalProtocol umbilical) throws IOException{
       super(raw,counters);
       this.umbilical = umbilical;
-      this.skipRecCounter = counters.findCounter(Counter.MAP_SKIPPED_RECORDS);
+      this.skipRecCounter = counters.findCounter(TaskCounter.MAP_SKIPPED_RECORDS);
       this.toWriteSkipRecs = toWriteSkipRecs() &&  
         SkipBadRecords.getSkipOutputPath(conf)!=null;
       skipIt = getSkipRanges().skipRangeIterator();
Index: src/mapred/org/apache/hadoop/mapred/JobCounter.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobCounter.java	(revision 0)
+++ src/mapred/org/apache/hadoop/mapred/JobCounter.java	(revision 0)
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ * Per-job {@link Counter}s.
+ */
+public enum JobCounter { 
+  NUM_FAILED_MAPS, 
+  NUM_FAILED_REDUCES,
+  TOTAL_LAUNCHED_MAPS,
+  TOTAL_LAUNCHED_REDUCES,
+  OTHER_LOCAL_MAPS,
+  DATA_LOCAL_MAPS,
+  RACK_LOCAL_MAPS
+}
Index: src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
===================================================================
--- src/mapred/org/apache/hadoop/mapred/Task_Counter.properties	(revision 722878)
+++ src/mapred/org/apache/hadoop/mapred/Task_Counter.properties	(working copy)
@@ -1,18 +0,0 @@
-# ResourceBundle properties file for Map-Reduce counters
-
-CounterGroupName=              Map-Reduce Framework
-
-MAP_INPUT_RECORDS.name=        Map input records
-MAP_INPUT_BYTES.name=          Map input bytes
-MAP_OUTPUT_RECORDS.name=       Map output records
-MAP_OUTPUT_BYTES.name=         Map output bytes
-MAP_SKIPPED_RECORDS.name=      Map skipped records
-COMBINE_INPUT_RECORDS.name=    Combine input records
-COMBINE_OUTPUT_RECORDS.name=   Combine output records
-REDUCE_INPUT_GROUPS.name=      Reduce input groups
-REDUCE_INPUT_RECORDS.name=     Reduce input records
-REDUCE_OUTPUT_RECORDS.name=    Reduce output records
-REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records
-REDUCE_SKIPPED_GROUPS.name=    Reduce skipped groups
-SPILLED_RECORDS.name=          Spilled Records
-
Index: src/mapred/org/apache/hadoop/mapred/TaskCounter.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskCounter.java	(revision 0)
+++ src/mapred/org/apache/hadoop/mapred/TaskCounter.java	(revision 0)
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ * {@link Counter}s for data processing metrics.
+ */
+public enum TaskCounter { 
+  MAP_INPUT_RECORDS, 
+  MAP_OUTPUT_RECORDS,
+  MAP_SKIPPED_RECORDS,
+  MAP_INPUT_BYTES, 
+  MAP_OUTPUT_BYTES,
+  COMBINE_INPUT_RECORDS,
+  COMBINE_OUTPUT_RECORDS,
+  REDUCE_INPUT_GROUPS,
+  REDUCE_INPUT_RECORDS,
+  REDUCE_OUTPUT_RECORDS,
+  REDUCE_SKIPPED_GROUPS,
+  REDUCE_SKIPPED_RECORDS,
+  SPILLED_RECORDS
+}
