Index: src/test/org/apache/hadoop/mapred/TestCounters.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestCounters.java	(revision 695860)
+++ src/test/org/apache/hadoop/mapred/TestCounters.java	(working copy)
@@ -67,9 +67,9 @@
   }
   
   public void testCounters() throws IOException {
-    Enum[] keysWithResource = {Task.FileSystemCounter.HDFS_READ, 
-                               Task.Counter.MAP_INPUT_BYTES, 
-                               Task.Counter.MAP_OUTPUT_BYTES};
+    Enum[] keysWithResource = {TaskFileSystemCounter.HDFS_READ, 
+                               TaskCounter.MAP_INPUT_BYTES, 
+                               TaskCounter.MAP_OUTPUT_BYTES};
     
     Enum[] keysWithoutResource = {myCounters.TEST1, myCounters.TEST2};
     
Index: src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java	(revision 695860)
+++ src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java	(working copy)
@@ -78,9 +78,9 @@
       assertEquals("number of cleanups", 2, reports.length);
       Counters counters = ret.job.getCounters();
       assertEquals("number of map inputs", 3, 
-                   counters.getCounter(Task.Counter.MAP_INPUT_RECORDS));
+                   counters.getCounter(TaskCounter.MAP_INPUT_RECORDS));
       assertEquals("number of reduce outputs", 9, 
-                   counters.getCounter(Task.Counter.REDUCE_OUTPUT_RECORDS));
+                   counters.getCounter(TaskCounter.REDUCE_OUTPUT_RECORDS));
       runCustomFormats(mr);
     } finally {
       if (mr != null) { mr.shutdown(); }
Index: src/test/org/apache/hadoop/mapred/TestBadRecords.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestBadRecords.java	(revision 695860)
+++ src/test/org/apache/hadoop/mapred/TestBadRecords.java	(working copy)
@@ -112,25 +112,25 @@
     
     //validate counters
     Counters counters = runningJob.getCounters();
-    assertEquals(counters.findCounter(Task.Counter.MAP_SKIPPED_RECORDS).
+    assertEquals(counters.findCounter(TaskCounter.MAP_SKIPPED_RECORDS).
         getCounter(),mapperBadRecords.size());
     
     int mapRecs = input.size() - mapperBadRecords.size();
-    assertEquals(counters.findCounter(Task.Counter.MAP_INPUT_RECORDS).
+    assertEquals(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).
         getCounter(),mapRecs);
-    assertEquals(counters.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).
+    assertEquals(counters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).
         getCounter(),mapRecs);
     
     int redRecs = mapRecs - redBadRecords.size();
-    assertEquals(counters.findCounter(Task.Counter.REDUCE_SKIPPED_RECORDS).
+    assertEquals(counters.findCounter(TaskCounter.REDUCE_SKIPPED_RECORDS).
         getCounter(),redBadRecords.size());
-    assertEquals(counters.findCounter(Task.Counter.REDUCE_SKIPPED_GROUPS).
+    assertEquals(counters.findCounter(TaskCounter.REDUCE_SKIPPED_GROUPS).
         getCounter(),redBadRecords.size());
-    assertEquals(counters.findCounter(Task.Counter.REDUCE_INPUT_GROUPS).
+    assertEquals(counters.findCounter(TaskCounter.REDUCE_INPUT_GROUPS).
         getCounter(),redRecs);
-    assertEquals(counters.findCounter(Task.Counter.REDUCE_INPUT_RECORDS).
+    assertEquals(counters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).
         getCounter(),redRecs);
-    assertEquals(counters.findCounter(Task.Counter.REDUCE_OUTPUT_RECORDS).
+    assertEquals(counters.findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS).
         getCounter(),redRecs);
     
     //validate skipped records
Index: src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java	(revision 695860)
+++ src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java	(working copy)
@@ -81,12 +81,12 @@
     RunningJob job = launchJob(jobConf, in, out, numMaps, jobName);
     Counters counters = job.getCounters();
     assertEquals("Number of local maps", 
-            counters.getCounter(JobInProgress.Counter.OTHER_LOCAL_MAPS), otherLocalMaps);
+            counters.getCounter(JobCounter.OTHER_LOCAL_MAPS), otherLocalMaps);
     assertEquals("Number of Data-local maps", 
-            counters.getCounter(JobInProgress.Counter.DATA_LOCAL_MAPS), 
+            counters.getCounter(JobCounter.DATA_LOCAL_MAPS), 
                                 dataLocalMaps);
     assertEquals("Number of Rack-local maps", 
-            counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS), 
+            counters.getCounter(JobCounter.RACK_LOCAL_MAPS), 
                                 rackLocalMaps);
     mr.waitUntilIdle();
     mr.shutdown();
Index: src/test/org/apache/hadoop/mapred/TestReduceFetch.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestReduceFetch.java	(revision 695860)
+++ src/test/org/apache/hadoop/mapred/TestReduceFetch.java	(working copy)
@@ -35,8 +35,8 @@
 import org.apache.hadoop.mapred.TestMapCollection.FakeIF;
 import org.apache.hadoop.mapred.TestMapCollection.FakeSplit;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
-import static org.apache.hadoop.mapred.Task.FileSystemCounter.HDFS_WRITE;
-import static org.apache.hadoop.mapred.Task.FileSystemCounter.LOCAL_READ;
+import static org.apache.hadoop.mapred.TaskFileSystemCounter.HDFS_WRITE;
+import static org.apache.hadoop.mapred.TaskFileSystemCounter.LOCAL_READ;
 
 public class TestReduceFetch extends TestCase {
 
Index: src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
===================================================================
--- src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java	(revision 695860)
+++ src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java	(working copy)
@@ -202,9 +202,9 @@
     assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
     Counters counters = result.job.getCounters();
     long hdfsRead = 
-      counters.findCounter(Task.FileSystemCounter.HDFS_READ).getCounter();
+      counters.findCounter(TaskFileSystemCounter.HDFS_READ).getCounter();
     long hdfsWrite = 
-      counters.findCounter(Task.FileSystemCounter.HDFS_WRITE).getCounter();
+      counters.findCounter(TaskFileSystemCounter.HDFS_WRITE).getCounter();
     assertEquals(result.output.length(), hdfsWrite);
     assertEquals(input.length(), hdfsRead);
 
Index: src/mapred/org/apache/hadoop/mapred/TaskFileSystemCounter.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskFileSystemCounter.java	(revision 0)
+++ src/mapred/org/apache/hadoop/mapred/TaskFileSystemCounter.java	(revision 0)
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.mapred.Counters.Counter;
+
+/**
+ * {@link Counter}s to measure the usage of the different file systems.
+ */
+public enum TaskFileSystemCounter {
+  LOCAL_READ, LOCAL_WRITE, 
+  HDFS_READ, HDFS_WRITE, 
+  S3_READ, S3_WRITE,
+  KFS_READ, KFSWRITE
+}
\ No newline at end of file
Index: src/mapred/org/apache/hadoop/mapred/JobInProgress.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobInProgress.java	(revision 695860)
+++ src/mapred/org/apache/hadoop/mapred/JobInProgress.java	(working copy)
@@ -144,16 +144,6 @@
   private long inputLength = 0;
   private long maxVirtualMemoryForTask;
   
-  // Per-job counters
-  public static enum Counter { 
-    NUM_FAILED_MAPS, 
-    NUM_FAILED_REDUCES,
-    TOTAL_LAUNCHED_MAPS,
-    TOTAL_LAUNCHED_REDUCES,
-    OTHER_LOCAL_MAPS,
-    DATA_LOCAL_MAPS,
-    RACK_LOCAL_MAPS
-  }
   private Counters jobCounters = new Counters();
   
   private MetricsRecord jobMetrics;
@@ -938,13 +928,13 @@
     if (tip.isMapTask()) {
       ++runningMapTasks;
       name = Values.MAP.name();
-      counter = Counter.TOTAL_LAUNCHED_MAPS;
+      counter = JobCounter.TOTAL_LAUNCHED_MAPS;
       if (tip.getActiveTasks().size() > 1)
         speculativeMapTasks++;
     } else {
       ++runningReduceTasks;
       name = Values.REDUCE.name();
-      counter = Counter.TOTAL_LAUNCHED_REDUCES;
+      counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
       if (tip.getActiveTasks().size() > 1)
         speculativeReduceTasks++;
     }
@@ -995,17 +985,17 @@
       switch (level) {
       case 0 :
         LOG.info("Choosing data-local task " + tip.getTIPId());
-        jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
+        jobCounters.incrCounter(JobCounter.DATA_LOCAL_MAPS, 1);
         break;
       case 1:
         LOG.info("Choosing rack-local task " + tip.getTIPId());
-        jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
+        jobCounters.incrCounter(JobCounter.RACK_LOCAL_MAPS, 1);
         break;
       default :
         // check if there is any locality
         if (level != this.maxLevel) {
           LOG.info("Choosing cached task at level " + level + tip.getTIPId());
-          jobCounters.incrCounter(Counter.OTHER_LOCAL_MAPS, 1);
+          jobCounters.incrCounter(JobCounter.OTHER_LOCAL_MAPS, 1);
         }
         break;
       }
@@ -1933,9 +1923,9 @@
       //
       if (!tip.isCleanupTask()) {
         if (tip.isMapTask()) {
-          jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
+          jobCounters.incrCounter(JobCounter.NUM_FAILED_MAPS, 1);
         } else {
-          jobCounters.incrCounter(Counter.NUM_FAILED_REDUCES, 1);
+          jobCounters.incrCounter(JobCounter.NUM_FAILED_REDUCES, 1);
         }
       }
     }
Index: src/mapred/org/apache/hadoop/mapred/Task.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/Task.java	(revision 695860)
+++ src/mapred/org/apache/hadoop/mapred/Task.java	(working copy)
@@ -57,36 +57,6 @@
   private static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner");
 
-  // Counters used by Task subclasses
-  protected static enum Counter { 
-    MAP_INPUT_RECORDS, 
-    MAP_OUTPUT_RECORDS,
-    MAP_SKIPPED_RECORDS,
-    MAP_INPUT_BYTES, 
-    MAP_OUTPUT_BYTES,
-    COMBINE_INPUT_RECORDS,
-    COMBINE_OUTPUT_RECORDS,
-    REDUCE_INPUT_GROUPS,
-    REDUCE_INPUT_RECORDS,
-    REDUCE_OUTPUT_RECORDS,
-    REDUCE_SKIPPED_GROUPS,
-    REDUCE_SKIPPED_RECORDS
-  }
-  
-  /**
-   * Counters to measure the usage of the different file systems.
-   */
-  protected static enum FileSystemCounter {
-    LOCAL_READ, LOCAL_WRITE, 
-    HDFS_READ, HDFS_WRITE, 
-    S3_READ, S3_WRITE,
-    KFS_READ, KFSWRITE
-  }
-
-  ///////////////////////////////////////////////////////////
-  // Helper methods to construct task-output paths
-  ///////////////////////////////////////////////////////////
-  
   /** Construct output file names so that, when an output directory listing is
    * sorted lexicographically, positions correspond to output partitions.*/
   private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
@@ -489,11 +459,11 @@
     private FileSystem.Statistics stats;
     private Counters.Counter readCounter = null;
     private Counters.Counter writeCounter = null;
-    private FileSystemCounter read;
-    private FileSystemCounter write;
+    private TaskFileSystemCounter read;
+    private TaskFileSystemCounter write;
 
-    FileSystemStatisticUpdater(FileSystemCounter read,
-                               FileSystemCounter write,
+    FileSystemStatisticUpdater(TaskFileSystemCounter read,
+                               TaskFileSystemCounter write,
                                Class<? extends FileSystem> cls) {
       stats = FileSystem.getStatistics(cls);
       this.read = read;
@@ -527,20 +497,20 @@
      new ArrayList<FileSystemStatisticUpdater>();
   {
     statisticUpdaters.add
-      (new FileSystemStatisticUpdater(FileSystemCounter.LOCAL_READ,
-                                      FileSystemCounter.LOCAL_WRITE,
+      (new FileSystemStatisticUpdater(TaskFileSystemCounter.LOCAL_READ,
+                                      TaskFileSystemCounter.LOCAL_WRITE,
                                       RawLocalFileSystem.class));
     statisticUpdaters.add
-      (new FileSystemStatisticUpdater(FileSystemCounter.HDFS_READ,
-                                      FileSystemCounter.HDFS_WRITE,
+      (new FileSystemStatisticUpdater(TaskFileSystemCounter.HDFS_READ,
+                                      TaskFileSystemCounter.HDFS_WRITE,
                                       DistributedFileSystem.class));
     statisticUpdaters.add
-    (new FileSystemStatisticUpdater(FileSystemCounter.KFS_READ,
-                                    FileSystemCounter.KFSWRITE,
+    (new FileSystemStatisticUpdater(TaskFileSystemCounter.KFS_READ,
+                                    TaskFileSystemCounter.KFSWRITE,
                                     KosmosFileSystem.class));
     statisticUpdaters.add
-    (new FileSystemStatisticUpdater(FileSystemCounter.S3_READ,
-                                    FileSystemCounter.S3_WRITE,
+    (new FileSystemStatisticUpdater(TaskFileSystemCounter.S3_READ,
+                                    TaskFileSystemCounter.S3_WRITE,
                                     S3FileSystem.class));
   }
 
Index: src/mapred/org/apache/hadoop/mapred/Task_FileSystemCounter.properties (deleted)
===================================================================
Index: src/mapred/org/apache/hadoop/mapred/ReduceTask.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/ReduceTask.java	(revision 695860)
+++ src/mapred/org/apache/hadoop/mapred/ReduceTask.java	(working copy)
@@ -115,15 +115,15 @@
   private Progress sortPhase  = getProgress().addPhase("sort");
   private Progress reducePhase = getProgress().addPhase("reduce");
   private Counters.Counter reduceInputKeyCounter = 
-    getCounters().findCounter(Counter.REDUCE_INPUT_GROUPS);
+    getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
   private Counters.Counter reduceInputValueCounter = 
-    getCounters().findCounter(Counter.REDUCE_INPUT_RECORDS);
+    getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
   private Counters.Counter reduceOutputCounter = 
-    getCounters().findCounter(Counter.REDUCE_OUTPUT_RECORDS);
+    getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
   private Counters.Counter reduceCombineInputCounter =
-    getCounters().findCounter(Counter.COMBINE_INPUT_RECORDS);
+    getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
   private Counters.Counter reduceCombineOutputCounter =
-    getCounters().findCounter(Counter.COMBINE_OUTPUT_RECORDS);
+    getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
 
   // A custom comparator for map output files. Here the ordering is determined
   // by the file's size and path. In case of files with same size and different
@@ -267,9 +267,9 @@
        super(in, comparator, keyClass, valClass, conf, reporter);
        this.umbilical = umbilical;
        this.skipGroupCounter = 
-         getCounters().findCounter(Counter.REDUCE_SKIPPED_GROUPS);
+         getCounters().findCounter(TaskCounter.REDUCE_SKIPPED_GROUPS);
        this.skipRecCounter = 
-         getCounters().findCounter(Counter.REDUCE_SKIPPED_RECORDS);
+         getCounters().findCounter(TaskCounter.REDUCE_SKIPPED_RECORDS);
        this.keyClass = keyClass;
        this.valClass = valClass;
        skipIt = getFailedRanges().skipRangeIterator();
Index: src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties (deleted)
===================================================================
Index: src/mapred/org/apache/hadoop/mapred/MapTask.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/MapTask.java	(revision 695860)
+++ src/mapred/org/apache/hadoop/mapred/MapTask.java	(working copy)
@@ -18,12 +18,12 @@
 
 package org.apache.hadoop.mapred;
 
-import static org.apache.hadoop.mapred.Task.Counter.COMBINE_INPUT_RECORDS;
-import static org.apache.hadoop.mapred.Task.Counter.COMBINE_OUTPUT_RECORDS;
-import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_BYTES;
-import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
-import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
-import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapred.TaskCounter.COMBINE_INPUT_RECORDS;
+import static org.apache.hadoop.mapred.TaskCounter.COMBINE_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapred.TaskCounter.MAP_INPUT_BYTES;
+import static org.apache.hadoop.mapred.TaskCounter.MAP_INPUT_RECORDS;
+import static org.apache.hadoop.mapred.TaskCounter.MAP_OUTPUT_BYTES;
+import static org.apache.hadoop.mapred.TaskCounter.MAP_OUTPUT_RECORDS;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -209,7 +209,7 @@
         TaskUmbilicalProtocol umbilical) throws IOException{
       super(raw,counters);
       this.umbilical = umbilical;
-      this.skipRecCounter = counters.findCounter(Counter.MAP_SKIPPED_RECORDS);
+      this.skipRecCounter = counters.findCounter(TaskCounter.MAP_SKIPPED_RECORDS);
       skipIt = getFailedRanges().skipRangeIterator();
     }
     
Index: src/mapred/org/apache/hadoop/mapred/JobCounter.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobCounter.java	(revision 0)
+++ src/mapred/org/apache/hadoop/mapred/JobCounter.java	(revision 0)
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.mapred.Counters.Counter;
+
+/**
+ * Per-job {@link Counter}s.
+ */
+public enum JobCounter { 
+  NUM_FAILED_MAPS, 
+  NUM_FAILED_REDUCES,
+  TOTAL_LAUNCHED_MAPS,
+  TOTAL_LAUNCHED_REDUCES,
+  OTHER_LOCAL_MAPS,
+  DATA_LOCAL_MAPS,
+  RACK_LOCAL_MAPS
+}
\ No newline at end of file
Index: src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (deleted)
===================================================================
Index: src/mapred/org/apache/hadoop/mapred/TaskCounter.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskCounter.java	(revision 0)
+++ src/mapred/org/apache/hadoop/mapred/TaskCounter.java	(revision 0)
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.mapred.Counters.Counter;
+
+/**
+ * {@link Counter}s for data processing metrics.
+ */
+public enum TaskCounter { 
+  MAP_INPUT_RECORDS, 
+  MAP_OUTPUT_RECORDS,
+  MAP_SKIPPED_RECORDS,
+  MAP_INPUT_BYTES, 
+  MAP_OUTPUT_BYTES,
+  COMBINE_INPUT_RECORDS,
+  COMBINE_OUTPUT_RECORDS,
+  REDUCE_INPUT_GROUPS,
+  REDUCE_INPUT_RECORDS,
+  REDUCE_OUTPUT_RECORDS,
+  REDUCE_SKIPPED_GROUPS,
+  REDUCE_SKIPPED_RECORDS
+}
\ No newline at end of file
