Hadoop Map/Reduce
MAPREDUCE-2125

Put map-reduce framework counters to JobTrackerMetricsInst

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.22.0
    • Fix Version/s: None
    • Component/s: jobtracker
    • Labels: None

      Description

      We have lots of useful information in the framework counters, including the number of spills and filesystem reads and writes.
      It would be nice to put them all in the JobTracker metrics to get a global view of all these numbers.
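
      A minimal sketch of the idea, using made-up names (FrameworkCounterAggregator, addJobCounters, snapshot) rather than code from the attached patch: the JobTracker-side instrumentation folds each completed job's framework counters into running totals, and the periodic metrics update publishes those totals.

          import java.util.HashMap;
          import java.util.Map;

          // Illustrative only; none of these names come from the actual patch.
          public class FrameworkCounterAggregator {

            // counter name -> running total across all completed jobs
            private final Map<String, Long> totals = new HashMap<String, Long>();

            // Called once per completed job.
            public synchronized void addJobCounters(Map<String, Long> jobCounters) {
              for (Map.Entry<String, Long> e : jobCounters.entrySet()) {
                Long current = totals.get(e.getKey());
                totals.put(e.getKey(), (current == null ? 0L : current) + e.getValue());
              }
            }

            // Called from the periodic metrics update (every few seconds), not per job.
            public synchronized Map<String, Long> snapshot() {
              return new HashMap<String, Long>(totals);
            }
          }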

      1. MAPREDUCE-2125-v2.txt
        3 kB
        Scott Chen
      2. MAPREDUCE-2125.txt
        3 kB
        Scott Chen

        Activity

        Scott Chen added a comment -

        Hey Luke, Thanks for the reply.

        I also meant the metrics names being regenerated on every doUpdate with a regex replace and lowercasing.

        I think they are OK because it happens only once every 5 seconds on the entire JT. There is no need to optimize them.

        The problem is that many of the intermediate jobs don't need the additional getJobCounter call. If you put it in the default instrumentation class, it becomes a mandatory additional expensive call for every job. I'd be more comfortable if you provide a way to turn it off, preferably off by default.

        I see. I will think of an elegant way to allow turning it off. Let me know if you have any good suggestions.

        Luke Lu added a comment -

        I think adding one getJobCounter() for the entire lifecycle of a job should be allowed. If this is not doable, why would we need the getJobCounter() method at all?

        The problem is that many of the intermediate jobs don't need the additional getJobCounter call. If you put it in the default instrumentation class, it becomes a mandatory additional expensive call for every job. I'd be more comfortable if you provide a way to turn it off, preferably off by default.

        Luke Lu added a comment -

        I added intern() for the filesystem counter names. JobCounter and TaskCounter use enums, so they are OK.

        I also meant the metrics names being regenerated on every doUpdate with a regex replace and lowercasing.

        Scott Chen added a comment -

        Luke: I added intern() for the filesystem counter names. JobCounter and TaskCounter use enums, so they are OK.
        Thanks for the suggestion.

              countersToMetrics.incrCounter(Task.FILESYSTEM_COUNTER_GROUP,
                  counter.getName().intern(), counter.getValue());
        

        I think the case you are talking about is bad for everything. Both obtainNew*Task() and initTasks() will be very expensive because they can be O(n) in the worst case with respect to the number of tasks. In fact, these operations will be much more expensive than doing one getJobCounter(). Like you said, this should be fixed by using a combined input format.

        The problem is not small jobs but short tasks in jobs with a large number of tasks. We happen to have a certain system that generates jobs with 50k to 100k tasks per job, with only a few MB per split. If you have multiple such jobs in different queues (or any shared scheduler that's not strictly FIFO), you can have a high job completion rate for these large jobs after a while. Arguably, these jobs can be optimized to use a proper input format with fewer splits (hence fewer tasks), but I'd like to point out that such a workload exists.

        I think adding one getJobCounter() for the entire lifecycle of a job should be allowed. If this is not doable, why would we need the getJobCounter() method at all?

        Scott Chen added a comment -

        The problem is not small jobs but short tasks in jobs with a large number of tasks. We happen to have a certain system that generates jobs with 50k to 100k tasks per job, with only a few MB per split. If you have multiple such jobs in different queues (or any shared scheduler that's not strictly FIFO), you can have a high job completion rate for these large jobs after a while. Arguably, these jobs can be optimized to use a proper input format with fewer splits (hence fewer tasks), but I'd like to point out that such a workload exists.

        I see. Now I understand why you are worried.

        Another issue with the patch: the metrics names are regenerated on every update, which is wasteful. For these system counters you can use a simple cache to generate these metrics names only once and produce no additional garbage in updates.

        That's a good observation. We can intern those strings to avoid additional garbage.
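
        A minimal sketch of the simple cache Luke suggests, with made-up names (MetricNameCache, metricNameFor) and an assumed name transformation, not code from the patch: the regex replace and lowercasing run only on a cache miss, so steady-state doUpdates calls create no new strings.

            import java.util.concurrent.ConcurrentHashMap;

            // Illustrative only: cache the derived metric names so the regex replace and
            // lowercasing run once per distinct counter name instead of on every update.
            public class MetricNameCache {

              private final ConcurrentHashMap<String, String> cache =
                  new ConcurrentHashMap<String, String>();

              public String metricNameFor(String counterName) {
                String name = cache.get(counterName);
                if (name == null) {
                  // Assumed transformation: collapse non-alphanumerics and lowercase.
                  name = counterName.replaceAll("[^A-Za-z0-9]+", "_").toLowerCase();
                  cache.putIfAbsent(counterName, name);
                }
                return name;
              }
            }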

        Luke Lu added a comment -

        If the use case is for many small jobs, each getCounter() call will be cheap. So in this case it will still be OK. I think the key here is that this change only adds one more pass over all the task counters. So from the throughput point of view, the cost is not that large.

        The problem is not small jobs but short tasks in jobs with a large number of tasks. We happen to have a certain system that generates jobs with 50k to 100k tasks per job, with only a few MB per split. If you have multiple such jobs in different queues (or any shared scheduler that's not strictly FIFO), you can have a high job completion rate for these large jobs after a while. Arguably, these jobs can be optimized to use a proper input format with fewer splits (hence fewer tasks), but I'd like to point out that such a workload exists.

        Our job completion rate is about 20 jobs/minute on average.

        OK, you guys have well-behaved jobs.

        Another issue with the patch: the metrics names are regenerated on every update, which is wasteful. For these system counters you can use a simple cache to generate these metrics names only once and produce no additional garbage in updates.

        Scott Chen added a comment -

        Hey Luke,

        What's the job completion rate on your cluster? If you just run a few big jobs that take a while, it probably doesn't matter. But if you have jobs that take seconds to minutes rather than hours (typically submitted via automated systems, and getCounters is not even called because they're intermediate steps), the job completion rate can be very high; I've seen over 100 per second on our clusters.

        If the use case is for many small jobs, each getCounter() call will be cheap. So in this case it will still be OK.
        I think the key here is that this change only adds one more pass over all the task counters. So from the throughput point of view, the cost is not that large.

        Our job completion rate is about 20 jobs/minute on average.
        Compared with the common case on our cluster where Hive pulls the job counters periodically, this one is very lightweight because it does getCounters only once.

        Luke Lu added a comment -

        You are right. The getCounter() call will happen in completeJob(). But it is called once per job, for the entire life-cycle of one job. So it has a very minor impact on JT performance.

        What's the job completion rate on your cluster? If you just run a few big jobs that take a while, it probably doesn't matter. But if you have jobs that take seconds to minutes rather than hours (typically submitted via automated systems, and getCounters is not even called because they're intermediate steps), the job completion rate can be very high; I've seen over 100 per second on our clusters.

        I'd be more comfortable with the patch if we had an easy way to disable the expensive metrics. One way to do it without a lot of ifs is to create an alternative instrumentation class that subclasses the default class.
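
        A rough sketch of that arrangement, using placeholder classes (DefaultMetricsInst, CounterAggregatingMetricsInst, onJobCompleted) rather than the real JobTrackerInstrumentation API: the default class stays as cheap as it is today, and only clusters that configure the subclass as their instrumentation pay for the extra getCounters() work at job completion.

            // Placeholder standing in for the default instrumentation class
            // (JobTrackerMetricsInst in real Hadoop); the real API differs.
            class DefaultMetricsInst {
              public void onJobCompleted(String jobId) {
                // default behaviour: no per-job counter aggregation
              }
            }

            // Opt-in subclass: only clusters that configure this class as their
            // JobTracker instrumentation pay for the extra getCounters() work.
            class CounterAggregatingMetricsInst extends DefaultMetricsInst {
              @Override
              public void onJobCompleted(String jobId) {
                super.onJobCompleted(jobId);
                // fetch the job's counters once here and fold them into the running
                // totals that the periodic metrics update publishes
              }
            }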

        Scott Chen added a comment -

        Sorry. I made a mistake in the previous comment. Please ignore the following comment. It is not correct.

        But here we do this in JobTrackerMetricsInst.doUpdates(), which is called only every 5 seconds. So it has a very minor impact on JT performance.

        You are right. The getCounter() call will happen in completeJob(). But it is called once per job, for the entire life-cycle of one job.
        So it has a very minor impact on JT performance.

        Scott Chen added a comment -

        Hey Luke, Thanks for the comments.

        Have you tried to benchmark the patch at scale? Calling job.getCounters in completeJob would bring down a busy JT on a large cluster to its knees. Think about calling getCounters (which is essentially an O(n) operation) a few hundred times per second!

        You are right about getCounters(). The method is really expensive.
        But here we do this in JobTrackerMetricsInst.doUpdates(), which is called only every 5 seconds. So it has a very minor impact on JT performance.
        We have run this on our 3000-node cluster, which has many big jobs, for months and it has been running fine.

        The necessity of having these total aggregate counts in real time. Rumen or other MR log processing tools can get these aggregates for performance analysis without impacting JT performance.

        We have an internal tool that graphs these metrics on a dashboard. It is really useful for real-time debugging of cluster issues. I believe Y! and other people also have a similar use case.

        If you really want these counters in real time, you should implement it in TT where it can send the metrics to distributed metrics aggregators with UDP etc. and can be easily disabled/enabled via the metrics system.

        That sounds like a good solution too. But I like the current way better because it is very simple.
        Anything we add to JobCounter and TaskCounter will automatically go to the metrics. We don't need to add more code to make that happen.
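
        To illustrate that point, a rough sketch that walks the counters generically, using a plain nested map instead of the real Counters API (CounterForwarder and emit are made-up names) and reusing the MetricNameCache sketched earlier in this thread: any new JobCounter or TaskCounter value flows through with no additional wiring.

            import java.util.Map;

            // Illustrative only: counters arrive as group name -> (counter name -> value).
            // A generic walk forwards every counter, so a new JobCounter or TaskCounter
            // enum value is picked up automatically.
            public class CounterForwarder {

              private final MetricNameCache names = new MetricNameCache();

              public void forwardAll(Map<String, Map<String, Long>> counterGroups) {
                for (Map.Entry<String, Map<String, Long>> group : counterGroups.entrySet()) {
                  for (Map.Entry<String, Long> counter : group.getValue().entrySet()) {
                    String metricName =
                        names.metricNameFor(group.getKey() + "." + counter.getKey());
                    emit(metricName, counter.getValue()); // stand-in for a JT metrics update
                  }
                }
              }

              private void emit(String metricName, long value) {
                // placeholder for incrementing the corresponding JobTracker metric
              }
            }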

        Luke Lu added a comment -

        I meant O(n).

        Luke Lu added a comment -

        There are two major issues with the proposal/patch.

        1. The necessity of having these total aggregate counts in real time. Rumen or other MR log processing tools can get these aggregates for performance analysis without impacting JT performance.
        2. Have you tried to benchmark the patch at scale? Calling job.getCounters in completeJob would bring down a busy JT on a large cluster to its knees. Think about calling getCounters (which is essentially an O(n) operation) a few hundred times per second!

        If you really want these counters in real time, you should implement it in TT where it can send the metrics to distributed metrics aggregators with UDP etc. and can be easily disabled/enabled via the metrics system.

        Scott Chen added a comment -

        The patch collects the TaskCounters, JobCounters and FileSystemCounters and submits them to JobTrackerMetricsInst.


          People

          • Assignee: Scott Chen
          • Reporter: Scott Chen
          • Votes: 0
          • Watchers: 8
