Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.12.0
    • Component/s: None
    • Labels:
      None

      Description

      It would be nice to have a map / reduce job keep aggregated counts for arbitrary events occurring in its tasks – the number of records processed, the number of exceptions of a specific type, the number of sentences in passive voice, whatever the job finds useful.

      This can be implemented by tasks periodically sending <name, value> pairs to the jobtracker (in some implementations such messages are piggy-backed on the heartbeats), so that the job tracker stores the latest values from each task and aggregates them on request. It should also make the aggregated values available at the job's end. The values for a task would be flushed when the task fails.
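      A minimal sketch of the scheme just described, with entirely hypothetical class names (TaskCounters, JobTrackerSide): each task keeps local <name, value> totals and ships a snapshot with its heartbeat; the job tracker keeps only the latest snapshot per task, drops a failed task's entry, and sums across tasks on request.

```java
import java.util.HashMap;
import java.util.Map;

// Task side: accumulate <name, value> pairs locally between heartbeats.
class TaskCounters {
    private final Map<String, Long> totals = new HashMap<>();

    void incr(String name, long amount) {
        totals.merge(name, amount, Long::sum);
    }

    // Snapshot of the running totals, sent with each heartbeat.
    Map<String, Long> snapshot() {
        return new HashMap<>(totals);
    }
}

// Job tracker side: keep the latest snapshot per task, aggregate on request.
class JobTrackerSide {
    private final Map<String, Map<String, Long>> latest = new HashMap<>();

    void heartbeat(String taskId, Map<String, Long> snapshot) {
        latest.put(taskId, snapshot);
    }

    // A failed task's values are flushed, as described above.
    void taskFailed(String taskId) {
        latest.remove(taskId);
    }

    long aggregate(String counterName) {
        long sum = 0;
        for (Map<String, Long> snap : latest.values()) {
            sum += snap.getOrDefault(counterName, 0L);
        }
        return sum;
    }
}
```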

      #491 and #490 may be related to this one.

      1. counters3.patch
        62 kB
        David Bowen
      2. counters2.patch
        62 kB
        David Bowen
      3. counters1.patch
        58 kB
        David Bowen

        Issue Links

          Activity

          Doug Cutting added a comment -

          The metrics API is designed for this:

          http://lucene.apache.org/hadoop/docs/api/org/apache/hadoop/metrics/package-summary.html

          Also, Reporter.setStatus() permits tasks to dynamically alter the string shown in the web ui (and available programmatically).

          Owen O'Malley added a comment -

          I would propose that the JobConf defines a list of counters that is augmented with the "generic" ones like records, bytes, etc. The TaskTracker then pushes a list of longs with the status of each task as part of the heartbeats. These counters are visible via the web UI as the job runs.

          So it would look like:
          job.setCounterList("foo,bar,baz");

          the Reporters pick up a new field:
          void addCounter(String counterName, long increment);

          Doug Cutting added a comment -

          Isn't this redundant with the metrics API? Why do we need both?

          Milind Bhandarkar added a comment -

          We need separate global counters because there is no way to programmatically add new metrics to the TaskMetrics class, which will (after I resubmit my TaskMetrics patch) be accumulated in JobTracker via heartbeats. OTOH, TaskMetrics could be implemented in terms of global counters.

          Owen O'Malley added a comment -

          In a lot of ways, this is a weaker form of HADOOP-48. The advantage of counters is that it is clear how to aggregate them across tasks to form a count for the entire job. We could do:

          public class JobCounters implements Writable {
              // <methods to get/set generic counters for records/bytes, whatever>
              public void add(JobCounters other);
          }

          and in JobConf:
          public set/getJobCounterClass(...);

          and in Reporter add:
          public JobCounters getJobCounters();

          and the JobCounter is sent up as part of the heartbeat.

          Doug Cutting added a comment -

          For the third time: why can't we use the Metrics API here? This is precisely the sort of thing it was designed for, no?

          Milind says, "there is no way to programmatically add new metrics to the TaskMetrics class". Okay, so we shouldn't use TaskMetrics for this. But shouldn't we use a MetricsRecord?

          If we use MetricsRecord to collect metrics, then we need to decide how to aggregate these. We could use Ganglia, or we could aggregate over heartbeats, having the JobTracker and TaskTracker implement a MetricsContext.

          Doug Cutting added a comment -

          Both of these issues concern the propagation of metrics from the tasktracker, aggregated at the jobtracker. And both should probably be implemented using the existing metrics API.

          arkady borkovsky added a comment -

          [[ Old comment, sent by email on Wed, 30 Aug 2006 16:40:37 -0700 ]]

          One of the intentions of Global Counters is for use in application code.
          E.g. if I count words in the input, I'd like to know the total number
          of words, not just the count for each word.
          With vanilla MapReduce, I need a separate job to do the totals. Global
          Counters would let me do this during the first job.

          arkady borkovsky added a comment -

          [[ Old comment, sent by email on Thu, 31 Aug 2006 10:40:32 -0700 ]]

          It may be that Metrics API can be used for this purpose.

          However Metrics API is an "API for reporting performance metric
          information", while this proposal is more application oriented.
          I'd like to be able to do something like this:

          – in main() of the MapReduce job:

          JobConf conf;
          GlobalCounters ggg = conf.addGlobalCounter("TotalWords");

          – in map():

          GlobalCounter totalWords = reporter.getGlobalCounter("TotalWords");

          and whenever it processes a word:

          totalWords.inc(1);

          – at the end of main(), after the job has completed:

          int totalWords = 0;
          for (int i = 0; i < ggg.size(); i++) {
              totalWords += ggg.get(i);
          }

          (I've pretended the GlobalCounters are always int.)

          Currently, using vanilla MapReduce would require running two jobs –
          one to count the individual words, another to aggregate the counts (or
          to extract the aggregated counts from the output.)

          Although not shown in this example, I assume that ggg is updated in
          real time, and main() can run a thread to monitor it while the tasks
          are running.
          Also, it assumes that task failures and speculative execution are
          handled correctly.

          Doug Cutting added a comment -

          I talked to Owen about this last week. My concerns are:

          1. We should only instrument code once, for counters and for monitoring metrics.

          2. Users should be able to easily add new counters & metrics to their code that are visible in the JobTracker web ui and/or a separate metrics monitoring system.

          3. Counters should be accessible programmatically through JobClient.

          One way to implement this would be to implement counters through the metrics API, as I've promoted above. Another approach would be to add a new counter-only API (a subset of metrics features) that routes values to the jobtracker, and can also be configured to talk to the metrics system. Then user code can decide whether to use the metrics API directly (for non-counter metrics) or use the counter-only API, and get the benefit of the JobTracker-based aggregation, built into the MapReduce runtime. I don't have a strong preference about which implementation strategy is pursued.

          David Bowen added a comment -

          This requirement is not an exact match with the Metrics API. A MetricsRecord has a number of capabilities that aren't relevant here:

          • gauges as well as counters
          • adding any number of tags to the data to support various ways of aggregating it
          • atomic update of multiple metrics
          • removing metrics

          So I don't think it makes sense to expose any aspect of the Metrics API here. We can simply add one method to Reporter:

          void incrCounter(String name, long amount);

          Behind the scenes, we can automatically send this data to the Metrics API with appropriate tags, as well as aggregating it into the TaskStatus and JobStatus objects so that it is accessible via JobClient.

          We would have some counters that are maintained by the framework. Currently, these would be:

          shuffle_input_bytes
          map_input_records
          map_input_bytes
          map_output_records
          map_output_bytes
          reduce_input_records
          reduce_output_records

          Do we need some sort of counter naming convention to prevent future conflicts between framework-maintained counters and user-defined counters?
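          A usage sketch of the single proposed method, with a stand-in Reporter interface and a made-up counter name (my_app.total_words); in reality the Reporter would be supplied by the framework:

```java
// Stand-in for the framework's Reporter with the one proposed method.
interface Reporter {
    void incrCounter(String name, long amount);
}

// A mapper counting an application-level event alongside the
// framework-maintained counters such as map_input_records.
class WordCountMapper {
    void map(String line, Reporter reporter) {
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                reporter.incrCounter("my_app.total_words", 1);
            }
        }
    }
}
```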

          Doug Cutting added a comment -

          That sounds like a great plan.

          > Do we need some sort of counter naming convention to prevent future conflicts between framework-maintained counters and user-defined counters?

          We could perhaps piggyback on Java's naming system by changing the Reporter method to be:

          void incrCounter(Enum key, long amount);

          Then, internally, we can convert the key to a String with something like:

          String name = key.getDeclaringClass().getName()+"#"+key.toString();

          This serves two purposes: keys are checked at compile time (since they have to be defined with enums) and they're also package-qualified.
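          To illustrate the conversion (the WordCounters enum is invented for this example):

```java
// Hypothetical user-defined counter keys.
enum WordCounters { TOTAL_WORDS, EMPTY_LINES }

class CounterNames {
    // Package-qualified counter name, exactly as sketched above.
    static String nameOf(Enum<?> key) {
        return key.getDeclaringClass().getName() + "#" + key.toString();
    }
}
```

          A key from a different enum can never collide, since the declaring class name is part of the counter name.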

          In the web ui, it would be great if all counters, both user and system defined, were displayed in various forms: raw totals, total rates (counts/second), and per-task averages (average count/task, average rate/task).

          Runping Qi added a comment -

          If we change Reporter method to:
          void incrCounter(Enum key, long amount);

          How does a user accumulate his/her specific counters?
          I think it is better to also have:

          void incrCounter(String name, long amount);

          Doug Cutting added a comment -

          > How does a user accumulate his/her specific counters?

          public enum MyCounters { FROBS, WIDGETS };

          reporter.incrCounter(MyCounters.FROBS, 17);

          David Bowen added a comment -

          I like the enum approach. It solves the namespace problem, and provides compile-time checking of the counter names.

          The only possible drawback I can see is the need to send longer strings between processes. I have no idea if this would be a significant performance issue. If it is, we could potentially optimize the wire format by having a way to specify what the counters are in the job configuration, so that the counter names never have to be sent.

          Doug Cutting added a comment -

          > The only possible drawback I can see is the need to send longer strings between processes.

          Another approach might be to make the protocol stateful, where the first time a counter name is sent in a session, a String is sent, and, thereafter it is only referred to by numeric ID. But I wouldn't worry about this right off: first let's get it working, then optimize it. We can also increase the update interval to decrease traffic.
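          A sketch of the sender side of such a stateful protocol (the class name is invented): the first time a name is transmitted it is registered and assigned the next numeric ID, and later updates carry only the ID.

```java
import java.util.HashMap;
import java.util.Map;

// Sender-side table mapping counter names to compact wire IDs.
class CounterNameInterner {
    private final Map<String, Integer> ids = new HashMap<>();
    private int nextId = 0;

    // True if the name has already been sent (so only the ID is needed).
    boolean isKnown(String name) {
        return ids.containsKey(name);
    }

    // Returns the wire ID, assigning one on first use.
    int intern(String name) {
        Integer id = ids.get(name);
        if (id == null) {
            id = nextId++;
            ids.put(name, id);
        }
        return id;
    }
}
```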

          Runping Qi added a comment -

          As a user, I am normally interested only in the final accumulated values of my counters, and don't need/want to know them as the job runs.
          I think each task can do local aggregation of the counters. If I need to display values while the job runs, I can explicitly get the values and call the Reporter.setStatus() method (or LOG) for that purpose. That way, I can control the frequency of the refresh.

          Doug Cutting added a comment -

          > I normally am interested only in the final accumulated values of my counters

          Yes, but others may be interested in counting, e.g., network errors, to judge the health of long-running jobs. Still, I don't think we need to provide updates more often than every minute or so, which, for many tasks, means only the final values will be transmitted.

          David Bowen added a comment -

          Here's a patch for review. Some issues and notes:

          • I haven't managed to test this properly with LocalJobRunner because (I think) the namenode keeps throwing SafeModeException. Tips on how to resolve this would be appreciated.
          • I called the main counters class Statistics. Maybe it should be called Counters?
          • I added a couple of counters to the WordCount example. If it is preferred to keep that example minimalist, these don't need to go there.
          • From the job info page you can navigate to per-tip and per-task counters if you are interested.
          • JobInProgress sends the per-job counters to the metrics package whenever it updates them.
          Andrzej Bialecki added a comment -

          I personally prefer the name Counters. It's also conceptually easier to match this Hadoop implementation with the Google papers.

          David Bowen added a comment -


          Oops, forgot to include 2 new files in the patch.

          Doug Cutting added a comment -

          Overall this looks good to me. A couple of minor comments:

          1. You've changed Reporter from an interface to an abstract class. That's a significant enough change that I'd like to understand its motivation. I'd like to see an analysis of the tradeoffs of that before we make such a change to a core public API.

          2. You've commented out some code rather than deleted it. We generally try to avoid that.

          David Bowen added a comment -

          Thanks Andrzej and Doug for your comments. There is one vote for changing the class name to Counters, and none against, so unless anyone else wants to argue about it, I will switch to Counters.

          Re Doug's comments:

          1. Reporter: I was thinking only about source compatibility, which is actually improved by making this an abstract class so that code like this:

          Reporter reporter = new Reporter() {
              // definitions of abstract methods
          };

          will still work because the new method (incrCounter(String name)) is not abstract. However, that is pretty unimportant, because it doesn't affect users, since they don't have any reason to implement Reporter. The downside of the change is that it breaks binary compatibility, so users would need to recompile their applications, and there isn't a good enough reason for doing that. So I will change it back to an interface.

          2. I will remove the method that I commented out.

          David Bowen added a comment -

          Here is the updated patch with the fixes: (1) Reporter is an interface again, (2) Statistics is renamed Counters, and (3) a commented-out method has been removed.

          David Bowen added a comment -


          Merged with recent changes.

          Doug Cutting added a comment -

          I just committed this. Thanks, David!

          Albert Chern added a comment -

          I played around with this a bit today and I have a few questions:

          1) Why does the method to increment a counter take an enum whereas the method to read the value takes a String? Wouldn't it be more convenient if Counters.getCounter() also took an enum?

          2) As a test, I created an enum with the value MY_COUNTER and placed a call to reporter.incrCounter(MY_COUNTER, 1) at the very beginning of a map(). Surprisingly, the final value was slightly less than MapTask's INPUT_RECORDS (120925196 vs. 120926095). Am I missing something here, or is this potentially a bug?

          David Bowen added a comment -

          > 1) Why does the method to increment a counter take an enum whereas the method to read the value takes a String?
          > Wouldn't it be more convenient if Counters.getCounter() also took an enum?

          Yes it would. The issue is that Counters objects move between processes, including back to the client. I don't think we can safely assume that the right Enum type will be available everywhere.

          FYI I've changed the Counters API in a patch attached to HADOOP-1041, but it isn't any simpler. Counters are now grouped by the enum type that they came from.

          With regard to your test, it could be a bug. It would be interesting to see if you get a similar discrepancy after applying the 1041 patch.

          Doug Cutting added a comment -

          > I don't think we can safely assume that the right Enum type will be available everywhere.

          No, we can't, so String-based access is required. But we might also include Enum-based access as an option, so that if folks do have the Enum in hand they can use it to get counter values, no?
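          A sketch of how the Enum-based convenience could layer over the required String-based access (the Counters shape here is simplified and hypothetical; JobPhase is a made-up enum):

```java
import java.util.HashMap;
import java.util.Map;

// Made-up enum standing in for a user's counter keys.
enum JobPhase { MAP, REDUCE }

// Simplified, hypothetical Counters: String access is the primitive,
// and the Enum overloads just derive the package-qualified name.
class Counters {
    private final Map<String, Long> values = new HashMap<>();

    void incrCounter(String name, long amount) {
        values.merge(name, amount, Long::sum);
    }

    void incrCounter(Enum<?> key, long amount) {
        incrCounter(nameOf(key), amount);
    }

    long getCounter(String name) {
        return values.getOrDefault(name, 0L);
    }

    long getCounter(Enum<?> key) {
        return getCounter(nameOf(key));
    }

    private static String nameOf(Enum<?> key) {
        return key.getDeclaringClass().getName() + "#" + key.toString();
    }
}
```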

          David Bowen added a comment -

          True. Since this issue is closed, I could add this feature to the HADOOP-1041 patch. OK?

          Doug Cutting added a comment -

          > Since this issue is closed, I could add this feature to the HADOOP-1041 patch. OK?

          +1 Thanks!


            People

            • Assignee:
              David Bowen
              Reporter:
              arkady borkovsky
            • Votes:
              1 Vote for this issue
              Watchers:
               0

              Dates

              • Created:
                Updated:
                Resolved:
