Hadoop Common / HADOOP-1900

The heartbeat and task event query intervals should be set dynamically by the JobTracker

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.16.0
    • Component/s: None
    • Labels:
      None

      Description

      The JobTracker should scale the intervals that the TaskTrackers use to contact it dynamically, based on how busy it is and the size of the cluster.

      Attachments

      1. patch-1900.txt
        8 kB
        Amareshwari Sriramadasu
      2. patch-1900.txt
        8 kB
        Amareshwari Sriramadasu
      3. patch-1900.txt
        8 kB
        Amareshwari Sriramadasu
      4. patch-1900.txt
        8 kB
        Amareshwari Sriramadasu
      5. patch-1900.txt
        16 kB
        Amareshwari Sriramadasu
      6. patch-1900.txt
        20 kB
        Amareshwari Sriramadasu
      7. patch-1900.txt
        19 kB
        Amareshwari Sriramadasu

        Activity

        Hudson added a comment -

        Integrated in Hadoop-Nightly #322 (See http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Nightly/322/ )

        Devaraj Das added a comment -

        I just committed this. Thanks, Amareshwari!

        Amareshwari Sriramadasu added a comment -

        "if (diff > minWait) " condition should check for greater-than-or-equal-to

        Changed.

        Devaraj Das added a comment -

        Sorry one more comment - "if (diff > minWait) " condition should check for greater-than-or-equal-to.

        Hadoop QA added a comment -

        +1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12370812/patch-1900.txt
        against trunk revision r600244.

        @author +1. The patch does not contain any @author tags.

        javadoc +1. The javadoc tool did not generate any warning messages.

        javac +1. The applied patch does not generate any new compiler warnings.

        findbugs +1. The patch does not introduce any new Findbugs warnings.

        core tests +1. The patch passed core unit tests.

        contrib tests +1. The patch passed contrib unit tests.

        Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1242/testReport/
        Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1242/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
        Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1242/artifact/trunk/build/test/checkstyle-errors.html
        Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1242/console

        This message is automatically generated.

        Amareshwari Sriramadasu added a comment -

        Patch with comments incorporated and tested.

        Amareshwari Sriramadasu added a comment -

        Some more comments from Devaraj.

        Devaraj Das added a comment -

        In the patch, I see that TaskTracker.initialize is no longer synchronized. Any reason why we should remove the synchronization for the method as part of this patch? Also, should we use int for doing the serialization/deserialization of the heartbeatInterval as opposed to long. Also, the sleep in getMapEvents should be elsewhere since we don't want to put a RPC handler to sleep (the handler will invoke this method eventually). (Sorry for the late comment)

        Hadoop QA added a comment -

        +1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12370618/patch-1900.txt
        against trunk revision r599703.

        @author +1. The patch does not contain any @author tags.

        javadoc +1. The javadoc tool did not generate any warning messages.

        javac +1. The applied patch does not generate any new compiler warnings.

        findbugs +1. The patch does not introduce any new Findbugs warnings.

        core tests +1. The patch passed core unit tests.

        contrib tests +1. The patch passed contrib unit tests.

        Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1214/testReport/
        Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1214/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
        Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1214/artifact/trunk/build/test/checkstyle-errors.html
        Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1214/console

        This message is automatically generated.

        Amareshwari Sriramadasu added a comment -

        Tested on a 500 node cluster.

        Amareshwari Sriramadasu added a comment -

        Some more comments from Arun and Devaraj -
        1. Replace the 'if' check with a max method.
        2. Declare the heartbeatInterval variable as volatile.

        Amareshwari Sriramadasu added a comment -

        The javadoc warning was on hbase -
        [javadoc] /export/home/hudson/hudson/jobs/Hadoop-Patch/workspace/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/shell/CreateCommand.java:77: warning - @param argument "table" is not a parameter name.

        The contrib test failures are also in hbase.
        I'm submitting the patch again for Hudson.

        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12370276/patch-1900.txt
        against trunk revision r598555.

        @author +1. The patch does not contain any @author tags.

        javadoc -1. The javadoc tool appears to have generated messages.

        javac +1. The applied patch does not generate any new compiler warnings.

        findbugs +1. The patch does not introduce any new Findbugs warnings.

        core tests +1. The patch passed core unit tests.

        contrib tests -1. The patch failed contrib unit tests.

        Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1175/testReport/
        Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1175/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
        Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1175/artifact/trunk/build/test/checkstyle-errors.html
        Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1175/console

        This message is automatically generated.

        Amareshwari Sriramadasu added a comment -

        > Amareshwari, could you please move TaskCompletionEventResponse to a separate TaskCompletionEventResponse.java? It is currently in JobTracker.java, and it shouldn't be a 'public' class either.

        I removed the class TaskCompletionEventResponse. Since we want to use the heartbeat interval as the polling interval, we don't need this class any more.

        Arun C Murthy added a comment -

        Amareshwari, could you please move TaskCompletionEventResponse to a separate TaskCompletionEventResponse.java? It is currently in JobTracker.java, and it shouldn't be a 'public' class either.

        Hadoop QA added a comment -

        +1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12369658/patch-1900.txt
        against trunk revision r595563.

        @author +1. The patch does not contain any @author tags.

        javadoc +1. The javadoc tool did not generate any warning messages.

        javac +1. The applied patch does not generate any new compiler warnings.

        findbugs +1. The patch does not introduce any new Findbugs warnings.

        core tests +1. The patch passed core unit tests.

        contrib tests +1. The patch passed contrib unit tests.

        Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1111/testReport/
        Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1111/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
        Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1111/artifact/trunk/build/test/checkstyle-errors.html
        Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1111/console

        This message is automatically generated.

        Amareshwari Sriramadasu added a comment -

        Here is a patch with proposed design.

        Owen O'Malley added a comment -

        Devaraj, it does address jobtracker busyness because it assumes that the jobtracker should spend N% of its time processing heartbeats. Since the total heartbeat load is proportional to the number of nodes, scaling this way accomplishes it.

        Amareshwari +1

        Amareshwari Sriramadasu added a comment (edited) -

        Now considering only cluster size for varying the heartbeat interval, the proposal is as follows:

        1. Heartbeat interval = max(2, clusterSize/50+1)
        i.e. for every 50 nodes, we increase the heartbeat interval by 1 second.

        2. Map completion events polling interval can take the same value as heartbeat interval.
        Apart from polling, the tasktracker will also fetch map events from the JobTracker when a reduce task asks for events and it has nothing to give (this is similar to the way tasktrackers ask for a new task whenever they finish executing a task).

        Thoughts?
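
        A minimal Java sketch of the proposed calculation (illustrative only; the class and constant names below are assumptions, not code from patch-1900.txt):

          // One extra second of heartbeat interval for every 50 tasktrackers, with a 2 second floor.
          public class ClusterSizeIntervalSketch {
            static final int NODES_PER_EXTRA_SECOND = 50;  // assumed name for the 50-node step
            static final int MIN_INTERVAL_SECS = 2;        // floor from the proposal

            static int heartbeatIntervalSecs(int clusterSize) {
              return Math.max(MIN_INTERVAL_SECS, clusterSize / NODES_PER_EXTRA_SECOND + 1);
            }

            public static void main(String[] args) {
              System.out.println(heartbeatIntervalSecs(100));   // 3
              System.out.println(heartbeatIntervalSecs(500));   // 11
              System.out.println(heartbeatIntervalSecs(2000));  // 41
            }
          }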

        Devaraj Das added a comment -

        One of the requirements that this issue is supposed to address is the JobTracker busyness (issue description). So, by basing the heartbeat on purely the cluster size, we are not addressing that requirement (unless we say that the clustersize based frequency would handle it). Just wanted to bring it to everyone's notice.

        Sameer Paranjpye added a comment -

        > But I don't see the importance of faster heartbeats on faster hardware, especially if it adds complexity to the code.

        +1

        Doug Cutting added a comment -

        I don't see why finding an optimal heartbeat time is critical. Scaling as the cluster grows seems reasonable, and using a safe scaling factor so that the jobtracker is not overwhelmed even on slow hardware seems prudent. But I don't see the importance of faster heartbeats on faster hardware, especially if it adds complexity to the code. Unless someone can provide a reason why this is important, I also feel an adaptive mechanism is overkill.

        Devaraj Das added a comment -

        I think in my last proposal, the adaptiveness is not going to be that much of an overkill since I am only suggesting that we just monitor the average time to process an RPC. That will take care of things like beefy hardware vs rudimentary ones...

        Owen O'Malley added a comment -

        Oops, clearly I meant max instead of min. I still think an adaptive response is overkill for this.

        Devaraj Das added a comment -

        Owen, by what you suggested, it appears that the heartbeat interval would be 2 seconds for all cluster configurations with more than 20 nodes. This seems way too much.

        After some thought, I am tending to agree with Owen that backoff may be difficult to control. So here is a simplified proposal:

        1) Monitor the average time we take to process an RPC

        2) Assuming that every RPC can be processed within milliseconds, the average #RPCs that the server can process per minute (RPC-processed-per-minute) is: (60000 / time-per-rpc). Assuming time-per-rpc is ~10 msec, ~6000 RPCs can be processed in a minute. Since the heartbeat RPC invocation locks the JobTracker, the number of handlers actually doesn't matter much.

        3) The frequency of heartbeat should be (clustersize/RPC-processed-per-minute) minutes.
        For example, if ClusterSize = 1000, the heartbeat interval is set to 1000/6000 min = 10 sec.

        4) taskCompletionEvents : this RPC is treated no differently than the heartbeat RPC. In addition to regular polling, this RPC also happens on demand, i.e., a TaskTracker invokes this RPC whenever a ReduceTask asks for MapcompletionEvents and the TaskTracker has nothing to give back (a lower cap of 5 seconds is set between two on-demand rpcs). This is similar to the way heartbeat RPCs work - whenever tasks finish, the TaskTracker sends a heartbeat.

        What do others think?
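
        A minimal sketch of the throughput-based calculation above (illustrative only; the class and method names are assumptions, not code from the patch):

          // interval = (clusterSize / RPCs-processed-per-minute) minutes, as in point 3 above
          public class RpcThroughputIntervalSketch {
            static long heartbeatIntervalMillis(int clusterSize, double avgRpcTimeMillis) {
              double rpcsPerMinute = 60000.0 / avgRpcTimeMillis;     // ~6000 at 10 ms per RPC
              double intervalMinutes = clusterSize / rpcsPerMinute;
              return Math.round(intervalMinutes * 60000.0);
            }

            public static void main(String[] args) {
              // 1000 nodes at ~10 ms per RPC -> 10000 ms (10 seconds), matching the example above
              System.out.println(heartbeatIntervalMillis(1000, 10.0));
            }
          }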

        Owen O'Malley added a comment -

        I really really don't think the complexity is justified. If we have too many levels of back offs and retries it will be very hard to control the performance. I'd propose

           heartbeatPeriodSeconds = min(2, slaveNodes / 10);
        
        Amareshwari Sriramadasu added a comment (edited) -

        With the patch attached, I ran sort benchmarks on a 390 node cluster and a 120 node cluster. The performance is almost the same as with trunk.

        To simulate busyness at the jobtracker, I ran the sort benchmarks on the 120 node cluster with the number of handlers set to 4 and a max queue size per handler of 10; there are drops and lost tasktrackers both with and without the patch.

        Thus the cluster-size factor of (clusterSize/50+1) should be fine, but the busyFactor has to be better tuned.

        I propose the following to tune the busyFactor:

        We have thresholdDropCount = clusterSize/10;

        We increment busyFactor by HEARTBEAT_BUSY_FACTOR (say 2 secs) for every 10% of the cluster size dropped:

        if (dropCount > thresholdDropCount) {
          busyFactor += (dropCount/thresholdDropCount)*HEARTBEAT_BUSY_FACTOR;
        }

        For example, on a 100 node cluster, if we see 40 drops, busyFactor is incremented by 8 seconds (40/10*2).

        If the jobtracker is not busy for an 'observationInterval', then we decrement busyFactor by HEARTBEAT_BUSY_FACTOR.

        To calculate observationInterval: there are 2 RPCs to be processed at the jobtracker, i.e. heartbeat and task completion events; let the processing time per RPC be 2 seconds. Then observationInterval is calculated as:
        observationInterval = (clusterSize/#handlers)*processingTime*2;

        Assuming that we don't see drops at a certain observationInterval (and the corresponding busyFactor), we decrement the busyFactor by HEARTBEAT_BUSY_FACTOR. This can be done in a loop until we see drops. When we see drops, we increment it by the constant HEARTBEAT_BUSY_FACTOR, and stabilize there until we see drops again.

        For example, on a 100 node cluster, we start with a 2 second heartbeat interval.
        We see 40 drops, then busyFactor = 8; new interval = (2+8) = 10;
        We don't see drops for 40 seconds; new interval = 10-2 = 8;
        We don't see drops for 40 seconds; new interval = 8-2 = 6;
        We don't see drops for 40 seconds; new interval = 6-2 = 4;
        We see drops; then new interval = 6;
        We don't see drops for a long time; we stabilize here.
        Say we see 30 drops after some time, busyFactor = 6; new interval = 6+6 = 12;
        And the loop repeats.

        Thoughts?
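
        A minimal sketch of the busyFactor tuning described above (illustrative only; all names are assumptions and the bookkeeping is simplified):

          public class BusyFactorSketch {
            static final int HEARTBEAT_BUSY_FACTOR = 2;  // seconds added per 10% of the cluster dropped

            int busyFactor = 0;  // extra seconds on top of the cluster-size based interval

            // Called once per observation interval with the heartbeat drops seen in that window.
            void observe(int dropCount, int clusterSize) {
              int thresholdDropCount = clusterSize / 10;
              if (thresholdDropCount > 0 && dropCount > thresholdDropCount) {
                // e.g. 40 drops on a 100 node cluster -> busyFactor += (40/10)*2 = 8 seconds
                busyFactor += (dropCount / thresholdDropCount) * HEARTBEAT_BUSY_FACTOR;
              } else {
                // quiet window: back off slowly, never below zero
                busyFactor = Math.max(0, busyFactor - HEARTBEAT_BUSY_FACTOR);
              }
            }

            // observationInterval = (clusterSize/#handlers)*processingTime*2, from the formula above
            static long observationIntervalSecs(int clusterSize, int handlers, int rpcProcessingSecs) {
              return ((long) clusterSize / handlers) * rpcProcessingSecs * 2;
            }
          }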

        Doug Cutting added a comment -

        This sounds complicated. Is it really required? On a small cluster, is there any harm in reporting every second? I'd rather try the simple proportional approach first and see if it is not sufficient.

        Arun C Murthy added a comment -

        After more thought, some observations:

        1. Heartbeat Interval Ranges

        I believe an initial heartbeat interval of (clustersize/50) is too aggressive for small clusters, e.g. it leads to 1s for a 50-node cluster, 2s for 100 nodes, etc. I state this with care since there isn't much tasks can accomplish in a 2-3 second interval. Instead, speaking from experience, I'd like to see the chosen algorithm achieve the following intervals for the given cluster sizes:

        Cluster Size    Heartbeat Interval (in secs)
        < 100           5s
        100-500         5s-10s
        500-1000        10s-15s
        1000-1500       15s-20s
        1500-2000       20+ s

        These numbers are in-line with observed performance on real-world clusters, and also keeping in mind that any interval <5s is probably not going to be able to update much.

        2. Dynamic Scaling of HeartBeat Intervals

        I propose we model the back-off strategy loosely on TCP's slow start, i.e. put reliability above performance. When we notice a significant number of dropped RPCs, the first thing is to ensure that it doesn't happen again. Keeping that in mind, I propose we double the current heartbeat interval (up to the above limits, section 1), and keep doubling till we see no more dropped calls. Once we achieve that reliability goal, I propose we decrease the heartbeat interval slowly (say by 1s at a time) till we achieve stability, i.e. no more dropped calls.

        E.g.

        Cluster size of 100 nodes.

        Time   Noticed Behaviour                                                Reaction on Heartbeat Interval
        t0                                                                      5s
        t1     dropped calls (say 10% of cluster-size, i.e. 10 dropped calls)   increase to 10s
        t2     no more dropped calls                                            decrease to 9s
        t3     no more dropped calls                                            decrease to 8s
        t4     no more dropped calls                                            decrease to 7s
        t4     dropped calls                                                    increase to 8s
        t5     no more dropped calls                                            stabilize at 8s

        Thoughts?
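
        A minimal sketch of the slow-start style back-off described above (illustrative only; the names are assumptions and the final "stabilize" bookkeeping from the example is omitted):

          public class SlowStartBackoffSketch {
            private int intervalSecs;
            private final int minSecs, maxSecs;   // limits from the table in section 1
            private boolean ramping = true;       // still in the aggressive doubling phase

            SlowStartBackoffSketch(int initialSecs, int minSecs, int maxSecs) {
              this.intervalSecs = initialSecs;
              this.minSecs = minSecs;
              this.maxSecs = maxSecs;
            }

            // Called once per observation window.
            void observe(boolean significantDrops) {
              if (significantDrops) {
                // reliability first: double while ramping, then only nudge up by a second
                intervalSecs = Math.min(maxSecs, ramping ? intervalSecs * 2 : intervalSecs + 1);
              } else {
                ramping = false;
                intervalSecs = Math.max(minSecs, intervalSecs - 1);  // creep back toward performance
              }
            }

            int currentIntervalSecs() { return intervalSecs; }
          }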

        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12368873/patch-1900.txt
        against trunk revision r591389.

        @author +1. The patch does not contain any @author tags.

        javadoc +1. The javadoc tool did not generate any warning messages.

        javac +1. The applied patch does not generate any new compiler warnings.

        findbugs -1. The patch appears to introduce 1 new Findbugs warnings.

        core tests +1. The patch passed core unit tests.

        contrib tests +1. The patch passed contrib unit tests.

        Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1055/testReport/
        Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1055/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
        Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1055/artifact/trunk/build/test/checkstyle-errors.html
        Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1055/console

        This message is automatically generated.

        Sameer Paranjpye added a comment -

        Do we have any benchmarks to show that this helps?

        Amareshwari Sriramadasu added a comment -

        Submitting the patch again after testing on a 390 node cluster.
        Fixed a deadlock issue in the previous patch.

        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12368760/patch-1900.txt
        against trunk revision r590273.

        @author +1. The patch does not contain any @author tags.

        javadoc +1. The javadoc tool did not generate any warning messages.

        javac +1. The applied patch does not generate any new compiler warnings.

        findbugs -1. The patch appears to introduce 1 new Findbugs warnings.

        core tests -1. The patch failed core unit tests.

        contrib tests +1. The patch passed contrib unit tests.

        Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1039/testReport/
        Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1039/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
        Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1039/artifact/trunk/build/test/checkstyle-errors.html
        Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1039/console

        This message is automatically generated.

        Amareshwari Sriramadasu added a comment -

        In the patch attached,
        The jobtracker periodically calculates the heartbeat interval. It looks at both the cluster size and the busyness of the jobtracker. If the jobtracker is busy, the interval is incremented by a busyFactor. If it is not busy for two consecutive periods, the interval is decremented by the busyFactor.

        The map events polling interval is calculated as a function of the heartbeat interval to skip the recalculation. It is calculated as follows:
        polling_interval = heartbeat_interval/3;
        if polling_interval < MIN_POLLING_INTERVAL, then polling_interval = MIN_POLLING_INTERVAL;
        if polling_interval > MAX_POLLING_INTERVAL, then polling_interval = MAX_POLLING_INTERVAL;
        MapEventsFetcherThread is notified if a reduce task doesn't find map events at the tasktracker.

        > I propose a change to the status message in the heartbeat - the tasktracker can compare the current task status with the previous one and if it finds the status to be the same, it doesn't send the complete status object to the JobTracker, but just a flag saying it is a duplicate or something to that effect. That will reduce the data per RPC considerably for long running tasks whose statuses don't change frequently and also reduce the processing load on the JobTracker.

        This will be addressed in another JIRA.
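
        A minimal sketch of the polling-interval clamping above (illustrative only; the constant values are taken from the 5s/30s figures discussed earlier in the thread, and the names are assumptions):

          public class PollingIntervalSketch {
            static final long MIN_POLLING_INTERVAL = 5000;   // ms
            static final long MAX_POLLING_INTERVAL = 30000;  // ms

            static long pollingInterval(long heartbeatIntervalMillis) {
              long polling = heartbeatIntervalMillis / 3;
              return Math.min(MAX_POLLING_INTERVAL, Math.max(MIN_POLLING_INTERVAL, polling));
            }

            public static void main(String[] args) {
              System.out.println(pollingInterval(9000));     // 5000: clamped up to the minimum
              System.out.println(pollingInterval(30000));    // 10000: heartbeat/3
              System.out.println(pollingInterval(120000));   // 30000: clamped down to the maximum
            }
          }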

        Devaraj Das added a comment -

        So, one way to take this into account might be to maintain an average time-to-complete for all tasks in the system (of current jobs) and factor that into the scaling of the intervals.

        The TaskTracker currently pings the JobTracker asking for a task as soon as it finishes executing a task. I think that should be the behavior to keep the utilization of the tasktrackers optimal (of course, in general we could do better by sending it a bunch of tasks every time it asks for a new task, but that's the subject of another jira).

        Also, while we are at this, I say we should start to consider busy-ness of JobTracker too, along with the cluster-size. So, for e.g., if the individual tasks are taking in the order of minutes, then it might not matter much if we send one every 20s or so, in some cases it might. I know that the sort's map tasks take around 40s each...

        I propose a change to the status message in the heartbeat - the tasktracker can compare the current task status with the previous one and if it finds the status to be the same, it doesn't send the complete status object to the JobTracker, but just a flag saying it is a duplicate or something to that effect. That will reduce the data per RPC considerably for long running tasks whose statuses don't change frequently and also reduce the processing load on the JobTracker.

        Thoughts?

        Arun C Murthy added a comment -

        > I wonder if instead we should just make it clusterSize/50+1? That way, small clusters will get a heartbeat of just one second, which should make them more responsive.

        +1

        I'd like to see some numbers about how long it takes to process a heartbeat etc. before we decide on the actual scaling factors (both up and down). Given that we've run so far on clusters of 2000 nodes with heartbeat-interval of 10s, I'd suspect scaling it up by 10s for every 500 nodes is too conservative... anyway I'll believe the numbers when we have them.

        Also, while we are at this, I say we should start to consider busy-ness of JobTracker too, along with the cluster-size. So, for e.g., if the individual tasks are taking in the order of minutes, then it might not matter much if we send one every 20s or so, in some cases it might. I know that the sort's map tasks take around 40s each...

        So, one way to take this into account might be to maintain an average time-to-complete for all tasks in the system (of current jobs) and factor that into the scaling of the intervals.

        Amareshwari Sriramadasu added a comment (edited) -

        > I wonder if instead we should just make it clusterSize/50+1? That way, small clusters will get a heartbeat of just one second, which should make them more responsive.

        Agreed.

        > Now, MapEventsFetcherThread polls jobtracker for completed map tasks for every 5 secs (MIN_POLL_INTERVAL). Shall we change polling interval also in the similar fashion as heartbeat interval? But, here some reduce tasks could be idle for longer time.

        We can have MapEventsFetcherThread polling the jobtracker for completed map tasks in a similar fashion to the heartbeat interval, with MIN_POLL_INTERVAL=5secs and MAX_POLL_INTERVAL=30secs. And whenever the tasktracker finds there are no map events and a reduce task is waiting for them, it will wake up the thread to fetch map events from the jobtracker.
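
        A minimal sketch of the wake-up behaviour described above (illustrative only; the class and method names are assumptions, not the TaskTracker's actual internals):

          public class MapEventsFetcherSketch extends Thread {
            private final Object lock = new Object();
            private volatile long pollIntervalMillis = 5000;  // MIN_POLL_INTERVAL to start

            public void run() {
              while (!isInterrupted()) {
                synchronized (lock) {
                  try {
                    lock.wait(pollIntervalMillis);  // regular poll, or earlier if nudged
                  } catch (InterruptedException e) {
                    return;
                  }
                }
                fetchMapCompletionEvents();  // placeholder for the real RPC to the jobtracker
              }
            }

            // Called when a reduce task asks for map events and the tasktracker has none cached.
            public void nudge() {
              synchronized (lock) {
                lock.notify();
              }
            }

            private void fetchMapCompletionEvents() { /* assumed RPC call, omitted */ }
          }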

        Doug Cutting added a comment -

        > For every additional 500 nodes we increase heartbeat interval by 10 secs.

        I wonder if instead we should just make it clusterSize/50+1? That way, small clusters will get a heartbeat of just one second, which should make them more responsive.

        Amareshwari Sriramadasu added a comment -

        Here is a proposal for changing the heartbeat interval dynamically.

        We will initialize the heartbeat interval as HEARTBEAT_INTERVAL (10 secs) in the tasktracker.
        Once the tasktracker transmits a heartbeat, the jobtracker's response will have the next heartbeat interval.

        The JobTracker calculates the next heartbeat interval as follows:

        1. Using cluster size (number of task trackers):
        nextInterval = 10secs * (cluster_size/500 + 1)
        i.e. for every additional 500 nodes we increase the heartbeat interval by 10 secs.

        2. if (nextInterval > HEARTBEAT_INTERVAL_MAX) nextInterval = HEARTBEAT_INTERVAL_MAX;
        i.e. if this exceeds HEARTBEAT_INTERVAL_MAX (60 seconds), the next heartbeat interval is 60 seconds.

        3. If the drop count of heartbeats is greater than a threshold drop count, increase the interval by 10 more seconds.
        The threshold drop count can be 'clustersize/10', i.e. if there are 500 nodes in the cluster and more than 50 heartbeats are dropped, then we increase the next heartbeat interval by 10 more seconds.

        Thus, next Interval calculation can be the following

        threshold_dropcount = clustersize/10;
        isBusy = dropCount > threshold_dropcount ? 1 : 0;
        nextInterval = HEARTBEAT_INTERVAL * (cluster_size/500 + 1)
                         + HEARTBEAT_INTERVAL * isBusy;
        if (nextInterval > HEARTBEAT_INTERVAL_MAX) nextInterval = HEARTBEAT_INTERVAL_MAX;
        

        Now, MapEventsFetcherThread polls jobtracker for completed map tasks for every 5 secs (MIN_POLL_INTERVAL). Shall we change polling interval also in the similar fashion as heartbeat interval? But, here some reduce tasks could be idle for longer time.

        Any thoughts?
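
        The pseudocode above, written out as a small runnable Java sketch (the constant values come from the proposal; the class and method names are assumptions):

          public class NextIntervalSketch {
            static final int HEARTBEAT_INTERVAL = 10;       // seconds
            static final int HEARTBEAT_INTERVAL_MAX = 60;   // seconds

            static int nextInterval(int clusterSize, int dropCount) {
              int thresholdDropCount = clusterSize / 10;
              int isBusy = dropCount > thresholdDropCount ? 1 : 0;
              int nextInterval = HEARTBEAT_INTERVAL * (clusterSize / 500 + 1)
                                 + HEARTBEAT_INTERVAL * isBusy;
              return Math.min(nextInterval, HEARTBEAT_INTERVAL_MAX);
            }

            public static void main(String[] args) {
              System.out.println(nextInterval(500, 60));   // busy 500 node cluster -> 30
              System.out.println(nextInterval(2600, 0));   // large cluster, capped at 60
            }
          }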


          People

          • Assignee: Amareshwari Sriramadasu
          • Reporter: Owen O'Malley
          • Votes: 0
          • Watchers: 1

            Dates

            • Created:
            • Updated:
            • Resolved:
