Hadoop Common / HADOOP-3245

Provide ability to persist running jobs (extend HADOOP-1876)

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.19.0
    • Component/s: None
    • Labels:
      None
    • Hadoop Flags:
      Incompatible change, Reviewed
    • Release Note:
      Introduced recovery of jobs when JobTracker restarts. This facility is off by default. Introduced config parameters mapred.jobtracker.restart.recover, mapred.jobtracker.job.history.block.size, and mapred.jobtracker.job.history.buffer.size.

      Description

      This could probably extend the work done in HADOOP-1876. This feature can be used for things like allowing jobs to survive jobtracker restarts.
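
      A minimal sketch of how the recovery switch and history sizing parameters named in the release note might be set. The use of JobConf here and the example values are assumptions for illustration; in a real deployment these would normally go into the JobTracker's site configuration.

          // Hedged sketch: enabling JobTracker job recovery via the parameters
          // listed in the release note. Values below are illustrative only.
          import org.apache.hadoop.mapred.JobConf;

          public class RecoveryConfigSketch {
            public static void main(String[] args) {
              JobConf conf = new JobConf();
              // Recovery of running jobs across JobTracker restarts is off by default.
              conf.setBoolean("mapred.jobtracker.restart.recover", true);
              // History file block size in bytes (example value, not a recommended default).
              conf.setInt("mapred.jobtracker.job.history.block.size", 3 * 1024 * 1024);
              // History write buffer size in bytes (example value).
              conf.setInt("mapred.jobtracker.job.history.buffer.size", 4 * 1024);
              System.out.println(conf.get("mapred.jobtracker.restart.recover"));
            }
          }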

      Attachments

      1. HADOOP-3245-v5.36-no-log.patch
        169 kB
        Amar Kamat
      2. HADOOP-3245-v5.36.3-no-log.patch
        176 kB
        Amar Kamat
      3. HADOOP-3245-v5.36.3-no-log.patch
        177 kB
        Amar Kamat
      4. HADOOP-3245-v5.36.2-no-log.patch
        176 kB
        Amar Kamat
      5. HADOOP-3245-v5.36.1-no-log.patch
        176 kB
        Amar Kamat
      6. HADOOP-3245-v5.35.3-no-log.patch
        170 kB
        Amar Kamat
      7. HADOOP-3245-v5.33.1.patch
        175 kB
        Amar Kamat
      8. HADOOP-3245-v5.31.3-nolog.patch
        149 kB
        Amar Kamat
      9. HADOOP-3245-v5.30-nolog.patch
        160 kB
        Amar Kamat
      10. HADOOP-3245-v5.26.patch
        188 kB
        Amar Kamat
      11. HADOOP-3245-v5.14.patch
        170 kB
        Amar Kamat
      12. HADOOP-3245-v5.13.patch
        187 kB
        Amar Kamat
      13. HADOOP-3245-v4.1.patch
        115 kB
        Amar Kamat
      14. HADOOP-3245-v2.6.9.patch
        84 kB
        Amar Kamat
      15. HADOOP-3245-v2.6.5.patch
        84 kB
        Amar Kamat
      16. HADOOP-3245-v2.5.patch
        53 kB
        Amar Kamat


          Activity

          Hudson added a comment - Integrated in Hadoop-trunk #640 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/640/)

          Hudson added a comment - Integrated in Hadoop-trunk #618 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/618/)

          Hudson added a comment - Integrated in Hadoop-trunk #611 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/611/)
          Amar Kamat added a comment -

          Steve,
          The runtime of these tests is mainly due to the number of maps they run, which is 50. The reason for running 50 maps is to generate enough history that recovery can be tested. We can probably reduce the wait time to 1 sec or so. I will check whether I can reduce the test time and keep you posted.

          steve_l added a comment -

          I am finding the new TestJobTrackerRestart* tests take a very long time to run on my machine, upwards of 10 minutes... if I'm doing anything else on the system at the same time, the tests time out and get killed.

          - How long does the test run take for other people on their desktops?
          - There are a lot of waits of a minute or more; are these really needed? Could polling for the job tracker to become unreachable be used instead, or asking the task trackers if they are still live?

          If these tests do take a long time, is it possible to group them into the 'slow test' category? Otherwise my test cycle has just been extended by 50%.

          dhruba borthakur added a comment -

          Nice work folks! I am eagerly waiting to deploy 0.19 so that I can get this feature. Thanks guys.

          Hemanth Yamijala added a comment -

          (this was a long journey indeed)!

          With a happy ending, I hope! Thanks, Devaraj, for the multiple reviews and improvements.

          Devaraj Das added a comment -

          I just committed this. Thanks, Amar (this was a long journey indeed)!

          Hadoop QA added a comment -

          +1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12390114/HADOOP-3245-v5.36.3-no-log.patch
          against trunk revision 695604.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 22 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 core tests. The patch passed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3264/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3264/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3264/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3264/console

          This message is automatically generated.

          Amar Kamat added a comment -

          Attaching a new patch that fixes a small bug in the previous patch. With HADOOP-3150, it was assumed that by the time TaskInProgress.shouldCommit() is invoked, the variable taskCommit has been set. Upon restart the variable remained null, as there is no COMMIT_PENDING attempt during recovery, so there used to be an NPE when a tracker with COMMIT_PENDING attempts (for completed TIPs) tried to rejoin the jobtracker. Now TaskInProgress.shouldCommit() also checks whether the TIP is already complete (see the sketch below).

          Ant test failed on the following tests:

          • TestHighRAMJobs
          • TestMapRed
          • TestLocalJobControl
            These tests failed on trunk too.

          Tested this patch on 100 nodes and restart works fine.
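
          A minimal sketch of the described check, with hypothetical field names (taskToCommit, completed); it is not the actual TaskInProgress code, only an illustration of why the completion test avoids the NPE.

              // Hedged sketch: shouldCommit() must not assume a COMMIT_PENDING attempt
              // was recorded, because none exists for TIPs recovered from history.
              class TaskInProgressSketch {
                private volatile String taskToCommit; // attempt chosen for commit; may stay null after a restart
                private volatile boolean completed;   // true once the TIP has a successful attempt

                boolean shouldCommit(String attemptId) {
                  // A completed TIP never needs another commit; checking it first also
                  // avoids dereferencing taskToCommit when it was never set during recovery.
                  return !completed
                      && taskToCommit != null
                      && taskToCommit.equals(attemptId);
                }
              }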

          Amar Kamat added a comment -

          test-patch passes the core tests on the local machine with a few warnings. The attached patch fixes those warnings. Thanks, Amareshwari, for testing it for me.

          Amar Kamat added a comment -

          Attaching a patch that fixes some findbugs/javadoc warnings. The changes are:
          1) [Counter|Group|Counters].contentEquals() are now synchronized.
          2) Access to JobInProgress.launchTime is now synchronized.
          3) Fixed a javadoc warning in JobHistory.java.

          Amar Kamat added a comment -

          Attaching a patch updated to trunk.

          Amar Kamat added a comment -

          Attaching a patch that is updated to trunk. The changes are:
          1) There seems to be a bug in RawLocalFileSystem, see here. A simple fix is to synchronize all such accesses to the history directory in JobHistory. Clearly this solution won't work if some external process changes/deletes files in the job history directory. The attached patch synchronizes all the APIs that might get invoked simultaneously.
          2) JobHistory.parseHistoryFromFS() allows incomplete log lines to be passed to the listener. This is an issue for RecoveryManager, as partial updates might lead to inconsistent state. One way to overcome this is to have a LINE_DELIMITER that simply indicates the line is complete (see the sketch below).
          The only drawback of this approach is that it is a backward-incompatible change: the new parser (with line delimiters) won't be able to parse old job history files. Will open a JIRA to address this. For now I have implemented the LINE_DELIMITER approach.
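
          A sketch of the LINE_DELIMITER idea, assuming a hypothetical delimiter character and method names; it only illustrates writing an explicit end-of-record marker and skipping a trailing partial line during parsing.

              import java.io.BufferedReader;
              import java.io.IOException;
              import java.io.Reader;
              import java.util.List;

              class DelimitedHistorySketch {
                static final char LINE_DELIMITER = '.'; // hypothetical end-of-record marker

                // Writer side: terminate every record with the marker.
                static String formatRecord(String record) {
                  return record + LINE_DELIMITER + "\n";
                }

                // Parser side: hand only complete records to the listener.
                static void parse(Reader in, List<String> listener) throws IOException {
                  BufferedReader reader = new BufferedReader(in);
                  String line;
                  while ((line = reader.readLine()) != null) {
                    if (line.isEmpty() || line.charAt(line.length() - 1) != LINE_DELIMITER) {
                      continue; // incomplete record (e.g. crash mid-write): skip it
                    }
                    listener.add(line.substring(0, line.length() - 1));
                  }
                }
              }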

          Amar Kamat added a comment -

          I have manually tested the patch w.r.t. counter recovery and update. Some counters are recovered using HADOOP-3970, while some are dynamically updated based on a replay of events (job-level counters like DATA_LOCALITY etc.). I have manually checked and found that the counters upon restart are very similar to the ones before the restart.

          Amar Kamat added a comment -

          Attaching a patch that incorporates Devaraj's comments.

          The changes are as follows:
          1) Fixed a bug w.r.t. decoding/encoding of history filenames. Earlier the filenames were pattern-matched without decoding.
          2) Removed line delimiters. Earlier the parser used to prematurely terminate the history line, breaking the test cases. HADOOP-2304 will fix that. That was just a small hack to get this patch working.
          3) Removed the comparator used in RecoveryManager for comparing job-ids. JobID.compareTo() will suffice (see the sketch after this list).
          4) Fixed a bug w.r.t. trimming of job names.
          5) Fixed the bug in MiniMRCluster.setJobPriority() to use JobTracker.setJobPriority() instead of JobInProgress.setPriority().
          6) Fixed a bug in RecoveryManager to do with a 0 start-time. Now RecoveryManager applies job updates only if the updates are available. Earlier the job start-time was set to 0 if the fields were missing.
          7) The test case now has 3 separate checks:

          • job-submission-order
          • job-start-time (NEW)
          • job-finish-time

            Note that I have used some code from HADOOP-4112 to do with TASK_TYPE=CLEANUP. Will fix that once HADOOP-4112 is committed.
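
          For point 3, a small sketch of relying on JobID's natural ordering instead of a custom comparator; the class is the 0.19-era org.apache.hadoop.mapred.JobID, but the surrounding RecoveryManager usage is assumed.

              import java.util.SortedSet;
              import java.util.TreeSet;
              import org.apache.hadoop.mapred.JobID;

              class RecoveryOrderSketch {
                // Natural ordering via JobID.compareTo(), as the comment above suggests.
                private final SortedSet<JobID> jobsToRecover = new TreeSet<JobID>();

                void addJob(JobID id) {
                  jobsToRecover.add(id);
                }

                Iterable<JobID> recoveryOrder() {
                  return jobsToRecover; // iterated in job-id order during recovery
                }
              }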

          Amar Kamat added a comment -

          1) Make the check for status as RUNNING explicitly in JobTracker.RecoveryManager.JobRecoveryListener.checkAndInit

          Done

          Rename the variable 'cause' in JobHistory.Task.LogFailed to failedDueToAttempt

          Done

          Call JobInProgressListener.jobUpdated after the job recovery

          Done

          ReduceTask need not check for copied maps upon restart as copyOutput already does it.

          Done

          Overall, this patch should be tested thoroughly under various conditions like map partially complete, reduces partially complete

          Updated the test case to make sure that the reducer sees map events (~50%) before killing the jobtracker. This will now test the rollback logic, as the test case checks whether the order in which the events are available at the tasktracker is the same as the order available at the (restarted) jobtracker.

          I will manually test the patch to verify that the job-level counters, which are reconstructed from the history, match across restarts. I am waiting for HADOOP-4112 as it is a blocker for this issue.

          Devaraj Das added a comment -

          Overall it looks good to me.
          A few comments:
          1) Make the check for status as RUNNING explicitly in JobTracker.RecoveryManager.JobRecoveryListener.checkAndInit
          2) Rename the variable 'cause' in JobHistory.Task.LogFailed to failedDueToAttempt
          3) Call JobInProgressListener.jobUpdated after the job recovery
          4) ReduceTask need not check for copied maps upon restart as copyOutput already does it.

          Overall, this patch should be tested thoroughly under various conditions like maps partially complete, reduces partially complete, the values of the counters being consistent across restarts, etc.

          Amar Kamat added a comment -

          One comment on the patch.

          Approach :

          The way history renaming is done in this patch is as follows:

          • given the job-id, job-name and user-name, try to find a file in the history folder that matches the pattern jt-hostname_[0-9]*_jobid_jobname_username
          • if any file matches the pattern, say file f, then use f.recover as the new file for history. If the file f.recover is recovered, then rename f.recover to f and use f.recover as the new file for history.
          • On successful recovery, delete f
          • On job completion, rename f.recover to f.
          • If the jt restarts in between, use the older file as the file for recovery.

          Problem :

          With trunk, only 1 DFS access is made when starting the log process for a job. With this patch there will be 4 DFS accesses:

          • Check if the job has a history file [ false for new jobs]
          • Check if file exists [false for new jobs]
          • Check if file.recovery exists [false for new jobs]
          • Open file for logging

          I think it makes more sense to create a new job history file upon every restart. Before starting the recovery process, delete all the history files related to the job except the oldest one. Note that the history filename has a timestamp in it, so detecting the oldest file is now easy.

          Example :
          Say that the job started with the timestamp t1. The job history filename would be hostname_t1_jobid_jobname_username. Upon restart, delete all the files related to the job except the oldest one. The new filename would be hostname_t2_jobid_jobname_username. Use hostname_t1_jobid_jobname_username as the source for recovery. If the jobtracker dies while recovering, there will be 2 history files for the job; delete hostname_t2_jobid_jobname_username upon recovery and use hostname_t1_jobid_jobname_username for recovery. If the recovery is successful, delete hostname_t1_jobid_jobname_username just to make sure that the latest history file will be used upon the next restart. There is no renaming and no temp file involved in this approach.

          Note that at any given time there will be at most 2 history files per job (see the sketch below).
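
          A sketch of the proposed oldest-file selection, using plain java.io.File for brevity (the real JobTracker would go through Hadoop's FileSystem API); the filename layout hostname_timestamp_jobid_jobname_username is taken from the example above.

              import java.io.File;
              import java.io.FilenameFilter;
              import java.util.Arrays;
              import java.util.Comparator;

              class HistoryFileSelectionSketch {
                /** Picks the oldest history file for a job and deletes any newer copies. */
                static File selectRecoveryFile(File historyDir, final String jobId) {
                  File[] candidates = historyDir.listFiles(new FilenameFilter() {
                    public boolean accept(File dir, String name) {
                      return name.contains("_" + jobId + "_");
                    }
                  });
                  if (candidates == null || candidates.length == 0) {
                    return null; // nothing to recover for this job
                  }
                  // The timestamp is the second '_'-separated field of the filename.
                  Arrays.sort(candidates, new Comparator<File>() {
                    public int compare(File a, File b) {
                      long ta = Long.parseLong(a.getName().split("_")[1]);
                      long tb = Long.parseLong(b.getName().split("_")[1]);
                      return ta < tb ? -1 : (ta == tb ? 0 : 1);
                    }
                  });
                  for (int i = 1; i < candidates.length; i++) {
                    candidates[i].delete(); // keep only the oldest file for recovery
                  }
                  return candidates[0];
                }
              }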

          Amar Kamat added a comment -

          HADOOP-3150 made changes to job history because of which this patch needs to be reworked. Opened HADOOP-4112 to address the change.

          Amar Kamat added a comment -

          Attaching a patch that incorporates Devaraj's comments. The changes are:
          1) The listener is now stateless and inline. Tasks that are created but not finished are killed using the ExpiryLaunchingTasks thread.
          2) Counters are also logged as part of a successful attempt. This helps in making the listener stateless.
          3) A dumb implementation of counter recovery is implemented in this patch, and the tests ignore the counters while testing the equality of TaskReports. HADOOP-3970 should address this.
          4) The test case now tests the effect of a lost tracker while the jobtracker is down.
          5) A job's state changes to SUCCEEDED/FAILED/KILLED only after garbage collect, since garbage collect finalizes the recovery process.

          Devaraj Das added a comment -

          I'd like to see the history parsing and JobInProgress object creation/update happen inline. In the current patch, the JT memory requirement is doubled during recovery, and the recovery time is doubled because you do two passes over the history data.
          Remove the check for hasRestarted() in the recover method. That will always be true.
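
          A sketch of the single-pass idea, with an assumed listener interface: the parser invokes a callback per history record and the callback applies the update to the in-memory job state immediately, instead of buffering all events for a second pass.

              import java.io.BufferedReader;
              import java.io.IOException;
              import java.io.Reader;

              class InlineRecoverySketch {
                interface HistoryListener {
                  // In the real code this would update the JobInProgress directly.
                  void handle(String record) throws IOException;
                }

                static void replay(Reader history, HistoryListener listener) throws IOException {
                  BufferedReader reader = new BufferedReader(history);
                  String record;
                  while ((record = reader.readLine()) != null) {
                    listener.handle(record); // single pass: no buffered copy of the whole history
                  }
                }
              }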

          Amar Kamat added a comment -

          Attaching a new patch.
          Additions:
          1) Adds the switch (mapred.jobtracker.restart.recover) in hadoop-default.xml to turn recovery upon restart on/off. The switch is off by default.
          2) Some bug fixes.
          3) The test case is now improved to include:

          • recovery switch test
          • matching the task completion events with the jobtracker and the task tracker upon restart

          4) This patch now assumes HADOOP-3970 and hence will break if it is missing.

          Test:

          • Ran the sort benchmark on 500 nodes with frequent restarts. Jobs resume smoothly.
          • Killed a task-tracker while the jobtracker was down. The tracker was declared lost and all the tasks that ran on the tracker got re-executed.

          Known issues:
          1) jobhistory.jsp uses the history filename to infer the job's name. Upon restart, a '.recover' extension is added to the history filename, hence running jobs on the history page will show '.recover' along with the actual job name. Note that the history file is renamed upon completion, so completed jobs display the job name correctly.
          2) Every restart will create a new conf file in the history folder.
          3) Some reduce tasks show progress of more than 100%. It looks like the reduce phase shows more progress than expected. Since this patch doesn't touch the REDUCE phase, I suspect the reduce-phase progress-reporting code is buggy.
          4) This patch still implements the greedy reconnect, and hence a few tasks get scheduled unnecessarily. For now I don't think this is a concern. Something like a safe mode seems to be a solution but needs some more thought.
          5) Devaraj's comment #9 here about reducers committing suicide is still not addressed.
          6) In the case where a map attempt is killed (externally / lost tracker) and is not logged to history, it will take some time for the jobtracker to detect that the map is missing. Asking the task-trackers what maps they currently host seems to be a solution; this issue needs some more thought.

          Todo:
          1) There are some extra logs for debugging. Remove them and add appropriate debug logs.

          Amar Kamat added a comment -

          There is a small bug in the patch. In JobHistory.java,

                final Pattern historyFilePattern = 
                  Pattern.compile(JOBTRACKER_UNIQUE_STRING + "[0-9]+" + "_" 
                                  + id.toString() + "_" + user + "_" + jobName + "+");

          should be

                final Pattern historyFilePattern = 
                  Pattern.compile(jobtrackerHostname + "_" + "[0-9]+" + "_" 
                                  + id.toString() + "_" + user + "_" + jobName + "+");

          as JOBTRACKER_UNIQUE_STRING itself contains a (new) timestamp and hence old history files won't match the pattern. Will upload a modified patch soon.
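
          A small, purely illustrative demonstration of the problem; the filenames and the exact contents of JOBTRACKER_UNIQUE_STRING are assumptions based on the comment above (it embeds the JobTracker's current start timestamp, which changes on restart).

              import java.util.regex.Pattern;

              public class HistoryPatternSketch {
                public static void main(String[] args) {
                  String host = "jt-host";
                  String oldFile = "jt-host_1221200000000_job_200809120001_0001_amar_sort"; // written before restart
                  String newUniqueString = host + "_" + 1221300000000L + "_";               // rebuilt after restart

                  Pattern broken = Pattern.compile(newUniqueString + "[0-9]+" + "_.*");
                  Pattern fixed  = Pattern.compile(host + "_" + "[0-9]+" + "_.*");

                  System.out.println(broken.matcher(oldFile).matches()); // false: old timestamp no longer matches
                  System.out.println(fixed.matcher(oldFile).matches());  // true: the hostname prefix still matches
                }
              }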

          Hemanth Yamijala added a comment -

          Note that a working solution for HADOOP-3970 is present in this patch too.

          I think it is better to submit the solution of HADOOP-3970 as part of that JIRA, rather than including it in this patch. If I understand correctly, without this fix, everything except counters will be recovered for running jobs on a restart. Probably HADOOP-3970 should block this and get committed first.

          Also, I think it will be good to have a configuration variable to disable job recovery. This is to provide backwards compatibility, and also gives a way to disable this feature if required.

          Amar Kamat added a comment -

          Attaching a patch that incorporates Devaraj's comments. Testing is in progress. Other changes are as follows:
          1) JobInProgress now has no notion of restarts, and hence the web-ui remains unchanged. It's up to JobHistory to take care of restarts and manage existing files.
          2) RecoveryManager and Listener are cleaned up and are now simple.

          Note that a working solution for HADOOP-3970 is present in this patch too.

          Amar Kamat added a comment -

          Got an offline review from Devaraj. Comments are as follows:
          1) isJobName could be named isJobNameValid
          2) isJobDirClean could be named isJobDirValid
          3) Revert the changes w.r.t Jobtracker state
          4) The check for job.hasRestarted() is redundant in RecoveryManager.recover()
          5) JIP constructor should not do recovery. Refactor it and call from Jobtracker
          6) Revert changes to job id
          7) Revert changes to history filename and use filters for detecting the filename
          8) Upon recovery, Jobtracker should not wait for all the jobs to be initialized. It should check if there is any history (w.r.t attempts) available and only then should wait for the initialization
          9) Use job-priority for detecting if there are job level (meta) information available instead of num-attempts found/recovered
          10) Counter should be logged at TIP level rather than attempt level
          11) Remove tasktracker-hostname logging and use the tracker name to find out the hostname.
          12) Existence of an attempt should not be known to the Jobtracker. Move the logic back to job and tip.

          Vivek suggested doing Job.init() inside the jobtracker's recovery process and updating the scheduler.

          Hemanth Yamijala added a comment -

          Can you please open a new JIRA so they wouldn't be lost and add a comment with the number here?

          Dhruba opened HADOOP-4038 for handling this.

          Hemanth Yamijala added a comment -

          Dhruba, Pete, your use-cases seem reasonable to me. Can you please open a new JIRA so they wouldn't be lost and add a comment with the number here?

          Amar Kamat added a comment -

          Attaching a patch that has the following modifications:
          1) Renamed some of the parameters in JIP to maintain consistency.
          2) Whether or not the job recovery was successful is now displayed on the web-ui, and JIP is changed accordingly.
          3) There was a bug in the earlier patch. In the case of an incomplete TIP, the earlier patch ignored the attempts under it. This can be problematic if the attempt under the TIP has failed and the TIP remains in the RUNNING state. This can lead to changed task-completion-events.
          4) JIP has two failedTask() APIs, one of which fails an already completed attempt and logs to the job history. The earlier patch skipped this log. The new patch calls both failedTask() APIs in the way they should be called.
          5) The test case now includes a check for job execution ordering. It checks whether the order in which the jobs were executed before the JT got killed is the same as that upon restart.
          6) Currently, the submit time of a JIP is the time when the constructor is invoked, which does not hold across a restart. Hence the earlier patch changed the submit time upon restart but did not log it to the history, so upon a second restart the previous submit time was lost. Added an API to log the job submit time.
          7) Added hashOf() wherever equals() was overridden to take care of the findbugs warning.

          Tested on 500 nodes and it works fine. Will continue testing. Ant test failed on 2 tests, namely TestMapRed and TestLocalJobControl, while the findbugs warnings were related to empty-catch-block and make-static-inner-class types. Investigating it.

          Pete Wyckoff added a comment -

          "safe-mode" that is automatically triggered when the namenode restarts. A similar thing wold be very helpful for the JobTracker.

          Also, it would be nice to stop the JT from taking new submissions so the administrator can allow the running jobs to complete before upgrading the cluster. Even though we can persist these jobs, if there's a big upgrade or something else, the admin may want to drain all running jobs before stopping the cluster.

          dhruba borthakur added a comment -

          I like Amar's proposal. It helps administration as well. Many times when we restart the jobtracker, we would like all task-trackers to check in before the job-tracker starts running jobs. HDFS has something called "safe-mode" that is automatically triggered when the namenode restarts. A similar thing would be very helpful for the JobTracker. It allows an administrator to restart a cluster, and then restart other pieces of software (e.g. ETL pipelines) before opening the cluster to run jobs. I would like to open a separate JIRA for doing this one.

          Amar Kamat added a comment -

          One more issue that needs to be addressed is thrashing. When the jobtracker restarts, it will recover logged tasks and schedule the rest. Trackers that join early might get a task that is already running on a tracker that has not yet joined. In such a case both attempts will run in parallel, and the one that finishes first will kill the other. The problem is that slots are wasted, and this adds to the job runtime if the tasks are long-running. Some delay in opening the scheduling window might help. It looks like a minor issue for now and can be handled in a separate issue.

          Amar Kamat added a comment -

          Attaching a tested patch with the test case. I need to test it more. There are still some issues with the patch.
          1) Reduce progress seems to be 99.* even though the job is complete. Not sure if it's related to the patch.
          2) The test case shows an exception on success.
          3) Clean up the patch w.r.t. comments, refactoring, and optimizations.


          Note that a temporary fix for HADOOP-3970 is present in this patch too.

          Owen O'Malley added a comment -

          I think that the running time should continue to be just finish - start time. The JT bounce is still wall clock time and it will be less confusing to include it than subtracting it out.

          dhruba borthakur added a comment -

          I have seen that having a "safe-mode" kind of thing for the JobTracker is really nice (maybe not as part of this patch). When safe mode is enabled, only commands from the administrator are accepted. Many times, when I upgrade a cluster and restart the job-tracker, users' cron scripts immediately start submitting jobs to the tracker. Ideally, I would like the administrator to submit a few validate-the-cluster kind of jobs before I open the cluster to general users.

          Amar Kamat added a comment -

          Had an offline discussion with Devaraj and Hemanth. Attaching a patch that incorporates their comments, which are as follows:
          1) Avoid safe mode by delaying the start of the IPC server. The IPC server starts only after the JobTracker has recovered. This avoids any extra coding on the tasktracker side.
          2) Avoid having any registration window, as TrackerExpiryThread will take care of the trackers that were lost while the jobtracker was down.
          3) Remove unnecessary changes done to the Task/TaskAttempt logging with respect to the passing of counters.


          Things taken care of from the todo list:
          1) Refactored the code related to recovery out into RecoveryManager.


          Things that need more work/discussion:
          1) Is safe mode required? Whether we want to start the IPC server early is the question we need to answer. Starting it early will allow JobClient and TaskTracker to connect to the JobTracker; it is the JobTracker's responsibility to handle the connection, and it could either throw an exception or reply with a dummy response. Apart from the fact that the JobClient could then detect that the JT is up but under maintenance and take some specific actions, there seems to be no reason to have the IPC services running before recovery (i.e., to have the safe mode).
          2) W.r.t. point #7 in my earlier comment (here), it seems that the time to detect previously killed tasks will depend on:
          2.1) the number of reducers
          2.2) the reducers' ability to report back the fetch failures
          It seems we can do better by asking each tracker for the list of maps it currently hosts, i.e., the successful map tasks still available on that tracker. The JobTracker can then kill all the tasks that were not claimed (see the sketch below). We feel this can be dealt with in a separate issue.
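
          A sketch of that idea using plain java.util sets (names are assumptions): recovered successful maps that no reconnecting tracker claims to still host are the ones the JobTracker would have to fail and reschedule.

              import java.util.HashSet;
              import java.util.Set;

              class UnclaimedMapsSketch {
                static Set<String> findUnclaimed(Set<String> recoveredSuccessfulMaps,
                                                 Set<String> mapsReportedByTrackers) {
                  Set<String> unclaimed = new HashSet<String>(recoveredSuccessfulMaps);
                  unclaimed.removeAll(mapsReportedByTrackers); // whatever no tracker hosts any more
                  return unclaimed; // these would be marked failed and rescheduled
                }
              }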


          I am currently testing the patch on a larger cluster.

          dhruba borthakur added a comment -

          The time of 1 hour is not configurable. It is the hard limit of lease recovery and is internal to the NameNode.

          Amar Kamat added a comment -

          If you have done a sync after a write to a HDFS file and then do not do anything else to that file for 1 hour the namenode will recover that file and will make all the data that was synced earlier to appear in the file.

          Is this time (1 hr) configurable? Can it be configured on a per-file basis?

          dhruba borthakur added a comment -

          I think you would not lose any data (even if small). If you have done a sync after a write to an HDFS file and then do not do anything else to that file for 1 hour, the namenode will recover that file and make all the data that was synced earlier appear in the file.

          Amar Kamat added a comment -

          1) This approach/patch works fine with history on the local fs. With history on HDFS, the history file becomes visible but not available (i.e., file size = 0). The file becomes available only on close(). The sync() documentation indicates that file-data availability is not guaranteed.

          The recently uploaded patch makes the block size of the history file configurable. The parameter to change it is mapred.jobtracker.job.history.block.size (in bytes). To test it, I started a small job on a single-node cluster with a history block size of 1k. It seems to work fine, and the history file is now visible and available. With respect to the zero-size file, I think we are OK losing some small amount of information.

          dhruba borthakur added a comment -

          Regarding the file being zero size, you can work around it like this (on trunk):

          1. Open the file with the FileSystem.append() API, in a loop, until it stops generating AlreadyBeingCreatedException.
          2. Then close the file and reopen it using FileSystem.open(). You should see the new contents of the file that were present when you did the sync().

          Opening a file for "append" tells the namenode to recover a lease, but only if the soft-limit time has expired. The soft-limit timer is 1 minute. The namenode then starts lease recovery and finds out the size of the last block of the file. Now, if the client re-opens the file, it gets a new block list with the updated block size.
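
          A sketch of this workaround; FileSystem.append(Path) and open(Path) are real APIs, but the bounded retry and catching a generic IOException (rather than AlreadyBeingCreatedException specifically) are simplifying assumptions.

              import java.io.IOException;
              import org.apache.hadoop.fs.FSDataInputStream;
              import org.apache.hadoop.fs.FileSystem;
              import org.apache.hadoop.fs.Path;

              class SyncedReadSketch {
                static FSDataInputStream openSyncedFile(FileSystem fs, Path file)
                    throws IOException, InterruptedException {
                  IOException last = null;
                  for (int attempt = 0; attempt < 60; attempt++) { // bounded retry, roughly a minute
                    try {
                      fs.append(file).close(); // triggers lease recovery so synced data becomes visible
                      return fs.open(file);    // reopen to see the recovered length
                    } catch (IOException e) {
                      last = e;                // typically AlreadyBeingCreatedException until recovery finishes
                      Thread.sleep(1000);
                    }
                  }
                  throw last;
                }
              }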

          Amar Kamat added a comment -

          Attaching a patch that implements JT restart using JobHistory.

          Changes :
          Currently the job history filename has the format history-timestamp_jt-hostname_jobid_username_jobname. It was introduced in HADOOP-239, and the timestamp was added at the beginning because job names were not unique. The history-timestamp makes it difficult to guess the job history filename, so it is removed, as job-ids are now unique across restarts.
          So for now we define
          master-file = jt-hostname_jobid_username_jobname.
          tmp-file = master-file.tmp

          Working :
          0) Upon restart the JT goes into safe mode. In safe mode all the trackers are asked to resend/replay their heartbeats.

          1) For a new job, the history file is the master-file. For a restarted job, the history is written to the tmp file.

          2) The following checks are made for a recovered job (a sketch follows this list):
          2.1) If the master file exists, delete the tmp file.
          2.2) If the master file is missing, make the tmp file the master.

          3) Upon restart the master-file is read and the default history parser is used to parse and recover the history records. These records are used to create task statuses, which are replayed in order. Before replaying, the JT waits for the jobs to be inited.

          4) Once the replay is over, delete the master-file to indicate that the tmp file is more recent. Note that on the next restart the tmp file will be used for recovery.

          5) Once all the jobs are recovered, safe mode is turned off. The JT will now process heartbeats (this is treated as a successful re-connect). The registration window timer also starts: the JT waits for tracker-expiry-interval after the last tracker re-connect before closing the window. Once the window closes, the JT is considered recovered. This plays an important role in detecting the trackers that went down while the JT was down; upon recovery, the JT re-executes all the tasks that were on the lost trackers.

          6) Since the history can have some data missing, the map-completion-event list at the JT can be smaller than the one at the tracker. Hence a rollback is required upon restart. Once the JT is out of safe mode, it passes this information (map-events-list-size) to the tracker on the successful reconnect.

          7) The tasktracker rolls back a few events and asks the child tasks to reset their index to 0. The child tasks fetch all the events back and filter out the events needed for further processing. This is similar to what was discussed in approach #1.

          8) Errors in history can cause the parser to fail. We have HADOOP-2403 to address this. For now this patch encodes errors. This will be replaced by the fix in HADOOP-2403.

          9) Currently counters are stringified and written to history. It is not possible to recover the counters from that string, hence this patch encodes the counter names so that they can be easily recovered. Note that there is no encoding in the user space; only the framework's history file contains the codes.

          10) Once the job finishes, the tmp file is renamed to the master-file. The history files in the user directory follow the same renaming cycle.

          11) Job priority is logged on every change and hence it is recovered.
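
          A rough sketch of the checks in 2.1/2.2 (the variable names are assumptions, not the patch's actual code):

            // fs, historyDir, jtHostname, jobId, user and jobName are assumed to be in scope
            Path master = new Path(historyDir, jtHostname + "_" + jobId + "_" + user + "_" + jobName);
            Path tmp = master.suffix(".tmp");
            if (fs.exists(master)) {
              fs.delete(tmp, false);     // 2.1: the master exists, so the tmp copy is stale
            } else if (fs.exists(tmp)) {
              fs.rename(tmp, master);    // 2.2: the master is missing, so promote the tmp file
            }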

          Issues :
          1) This approach/patch works fine with the history on the local fs. With the history on HDFS, the history file becomes visible but its contents are not available (i.e. file size = 0). The contents become available only on close(). The sync() documentation indicates that file-data availability is not guaranteed.

          2) Detecting job runtime is still an issue.

          We are working on it.

          Todo :
          1) Refactor common code.
          2) Remove extra logs
          3) For ease of testing, a JT-killing facility was added to the web UI. There is some extra code to support this; clear it out.
          4) To test the usage of sync(), periodic syncs are done to the history files; this is only for testing.
          5) Optimize encoding/decoding.
          6) Group together all the recovery code under something like JobTrackerRecoveryManager.


          Note that the logs/debugging-code/testing-code are still part of this patch, as I am still testing it.

          Hemanth Yamijala added a comment -

          Using job history seems a reasonable approach. Some concerns though:

          • We need to find out a good buffer size to use for writing to the history file. A small value could hurt performance due to more frequent flushes. A large value could result in a lot of task events not being flushed and hence unavailable to the JobTracker on restart. We are exploring what an ideal value for this is.
          • For a large job with typical job history outputs, we need to make sure the time to parse and reconstruct state is not too bad.
          • We still need something like the SYNC operation described above, because in the window where something is written to job history but not flushed, these events would be lost for the JT upon restart. So, there will need to be a way to tell the TTs to reset these events. However, this count is going to be much smaller than what can happen in the approach currently implemented.

          We're doing some tests related to the first two points and then can discuss the results.

          The completed task state in RAM is not introduced in this patch. I would recommend it be addressed in another JIRA, if it is an issue.

          dhruba borthakur added a comment - - edited

          +1 to Owen's comment.

          A pressing need of our cluster is to not interrupt running jobs if the entire cluster has to be restarted. This means that job states have to be persisted in the form of a transaction log. This requirement is all the more beneficial to sites that have long-running job trackers (instead of HOD).

          However, isn't it better to be able to store state in HDFS? It is true that HDFS stores its transaction log in local files, but with the current focus on improving HDFS read/write latencies, HDFS itself is considering whether to store one copy of the transaction log in HDFS blocks (instead of NFS). In fact, if the JobTracker stores information in an org.apache.hadoop.fs.FileSystem, then a typical customer install could plug in various forms of storage to support the JobTracker transaction log.
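
          Purely as an illustration of that last point (the config key below is made up), resolving the log location through the FileSystem API keeps the storage choice a matter of configuration:

            // "mapred.jobtracker.transaction.log.dir" is a made-up key; whichever URI scheme it
            // names (file:, hdfs:, an NFS mount) decides where the transaction log lives.
            Path logDir = new Path(conf.get("mapred.jobtracker.transaction.log.dir",
                                            "file:///var/hadoop/jobtracker"));
            FileSystem logFs = logDir.getFileSystem(conf);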

          Owen O'Malley added a comment -

          If we are counting on the TaskTracker's reports to rebuild the state, we should have a safe-mode equivalent where we wait for 2-3 minutes before launching new tasks, otherwise we will trash the cluster as each new TaskTracker reports back. Please also make sure that the TaskTracker does not reset and lose state if it gets an IOException when talking to the JobTracker.

          However, rather than have TaskTrackers store additional information about the final task status of each completed task in RAM, I think we should reconsider the option of using the JobHistory as a transaction log for each job. For storage on local disk, we probably should support writing a second copy to NFS so that a different node could bring up the JobTracker.

          In any case, the extra completed task state should be on disk rather than ram. We also need to make sure that the JobHistory is complete and consistent even after the restoration.

          Amar Kamat added a comment -

          The only problem with this patch is that partial updates can cause the job to get stuck. I have opened HADOOP-3780 to address this.

          Amar Kamat added a comment -

          Attaching a patch that has the following changes:
          1) Devaraj's comments are incorporated. Comment #9 seems complicated and should be dealt with in a separate JIRA.

          2) Test case incorporated. It does the following

            2.1) Start DFS, MR
            2.2) Submit a job
            2.3) Kill/Close the JT 
            2.4) Restart the JT
            2.5) Check if the job got detected and was successful.
          

          3) What is the effect of the JT getting killed on the DFS, i.e. what happens to incomplete DFS operations started by the JT before it was killed, and what happens after the restart when the JT tries to access the same set of files? This requires investigation, but as of now I don't see any noticeable effects.

          4) Since the child task needs to reset its offset into the map-task-completion-events (the TT's local copy), I have introduced a new class that encapsulates the following (a sketch follows this list):

          4.1) An array of map-task-completion-events (if any) as requested by the child task
          4.2) A boolean which decides whether the child should reset its offset. 
          

          Hemanth, Devaraj and I had a discussion on this, and we feel it is better and cleaner to do it this way.

          5) Some bug fixes.
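
          A hedged sketch of the wrapper class described in point 4; the class and field names are guesses and the actual patch may differ:

            import java.io.DataInput;
            import java.io.DataOutput;
            import java.io.IOException;
            import org.apache.hadoop.io.Writable;
            import org.apache.hadoop.mapred.TaskCompletionEvent;

            // Bundles the map completion events with a flag telling the child to reset its offset.
            class MapTaskCompletionEventsUpdate implements Writable {
              TaskCompletionEvent[] events;   // 4.1: events requested by the child task
              boolean shouldReset;            // 4.2: whether the child should reset its offset to 0

              MapTaskCompletionEventsUpdate() {}

              MapTaskCompletionEventsUpdate(TaskCompletionEvent[] events, boolean shouldReset) {
                this.events = events;
                this.shouldReset = shouldReset;
              }

              public void write(DataOutput out) throws IOException {
                out.writeBoolean(shouldReset);
                out.writeInt(events.length);
                for (TaskCompletionEvent event : events) {
                  event.write(out);
                }
              }

              public void readFields(DataInput in) throws IOException {
                shouldReset = in.readBoolean();
                events = new TaskCompletionEvent[in.readInt()];
                for (int i = 0; i < events.length; i++) {
                  events[i] = new TaskCompletionEvent();
                  events[i].readFields(in);
                }
              }
            }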


          Known issues (summary):
          1) Job level updates while the JT is running (like killJob(), priority updates etc) will be lost on restart.
          2) The job runtime cannot be determined
          3) Point #9 of Devaraj's comment

          Devaraj Das added a comment -

          This requires a test case as well.

          Devaraj Das added a comment -

          Some initial comments:
          1) Remove the unnecessary comments from JobTracker.java
          2) Rename the "restarted" field as "recovering"
          3) hasJobTrackerRestarted/Recovered API
          4) Remove the comment: "//TODO wait for all the incomplete(previously running) jobs to be ready" from offerService
          5) Put back the call to completedJobStatusStore.store in finalizeJob
          6) The method cleanupJob seems unnecessary. What is already done w.r.t cleanup will continue to work.
          7) The implementation of wasRecovered and hasRecovered should not make a back call to the JobTracker
          8) Synchronization for tasksInited in initTasks is redundant. Do a notify instead of notifyAll in the following line.
          9) In the interval between the JT death and restart, the reducers might fail to fetch map outputs from some tasktrackers (due to faulty map nodes, etc.), but they have no one to send the notifications to. The reducers might end up killing themselves after a couple of retries.
          10) The construction of TaskTrackerStatus should be reverted to how it was done earlier (cloneAndResetRunningTaskStatuses called inline with the constructor invocation)
          11) In TaskTracker.transmitHeartBeat you should call cloneAndResetRunningJobTaskStatuses rather than cloneAndResetRunningTaskStatuses
          12) Please move the SYNC action handling to the offerService method
          13) shouldResetEventsIndex could be cleared upon the first access as opposed to doing it in the heartbeat processing
          14) Instead of the additional RPC in Umbilical, you can add an arg in the getMapCompletionEvents to know whether to reset or not
          15) Factor out common code from cloneAndResetRunningJobTaskStatuses/cloneAndResetRunningTaskStatuses

          Amar Kamat added a comment -

          Had an offline discussion with Devaraj. Attaching a patch that includes his comments.
          1) Remove some security checks since HADOOP-3578 should take care of that.
          2) Remove the queued concept since HADOOP-3444 should take care of that.

          As mentioned earlier, the job runtime will be useless once the JT restarts, as the job could have been running for longer than the time displayed. There are some ways in which this problem can be solved, but it looks like it might require some more work. We think that the problems of job runtime, job priority and job-kill/task-kill can be handled in a separate issue. With this patch the new job runtime will be shown as 0, indicating that the job runtime is unknown.

          Amar Kamat added a comment -

          Known issues :

          4) Some job-level parameters can be changed dynamically after the job is submitted, e.g. job priority. Since these changes are not persisted, there is no way for the JT to load them upon restart. Also consider the case where the user asks the JT to kill a job and the JT crashes before deleting the persisted structures: upon restart the JT will continue to run the supposed-to-be-dead job. Hence all such external modifications (events) to a submitted job should be logged/persisted.

          Amar Kamat added a comment -

          Attaching a patch for review. Following are the changes:
          1) The bug discussed here is taken care of. The reducer, on resetting, won't skip any task completion events. Duplicate events for a TIP, but from different attempts, will also be added; the reduce task seems to handle them.

          2) The job directories for restored jobs are checked for completeness before adding them to the queue (see the sketch after this list). A job directory is considered complete when it has job.xml, job.jar and job.split.

          3) There was one corner case where the jobtracker dies with a job marked as completed (job dir missing) before communicating this to the tasktrackers, i.e. the tasktrackers still have the task statuses for the completed job. This is handled by having the jobtracker, on receiving an update request for a missing job, ask all the TTs to clear that job's details.

          4) Restart mode turned off: the restart mode is turned off after some time, as we don't want the JT to entertain latecomers. The JT comes out of restart mode using the following condition
          current-time > last-time-when-a-tt-synced + lost-task-tracker-interval
          This should make sure that we don't close the registration window too early.

          5) The web ui now shows the restart information. It shows whether the JT is still recovering and the time it has taken to recover.
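
          An illustrative helper for the completeness check in point 2 (the name and signature are assumptions):

            // A restored job directory is only re-queued when all three files survived the restart.
            boolean isJobDirComplete(FileSystem fs, Path jobDir) throws IOException {
              return fs.exists(new Path(jobDir, "job.xml"))
                  && fs.exists(new Path(jobDir, "job.jar"))
                  && fs.exists(new Path(jobDir, "job.split"));
            }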


          Issues taken care of :
          1) Consider the following case :
          Reducers belonging to the old JT are still shuffling a map m while the JT gets restarted. m gets re-executed on a different host, say as m'. Consider m' checking in before m. Since m checks in later, it gets killed. The reducer which fetches from m now starts failing. Here the fetch-failure notifications will have no effect on the JT and hence there are no false notifications.
          2) Blacklisting of a tracker per job is based on the task failures on that host. Failed statuses are not cleared from the running jobs on the tracker and hence will be replayed, as per the design.
          3) If a TIP has failed earlier, it will fail again since all the failed task statuses will be replayed.


          Known issues :
          1) I have seen jobs getting stuck. I tried hard to reproduce it but I couldn't. Will keep testing the patch.
          2) The job runtime will change, as the runtime is calculated based on the time the job is created at the jobtracker. With a restarted jobtracker the old start time is lost.
          3) The task attempt id is now changed. It includes the jobtracker's start time and hence might affect the task output filters. Also, applications outside the framework will not be able to guess the attempt id, which they should not be able to do anyway.

          Amar Kamat added a comment -

          Attaching a review-patch implementing the above discussed design. Testing and optimizations are in progress. There is one known issue with the design and hence the patch is incomplete.

          Consider the following case [notation: JT@ti means the JT (re)started at time ti, Ti@TTj means task Ti completed on tracker TTj]:
          1) TT1 asks for a task and the JT@t1 schedules map M1 on TT1
          2) M1 finishes on TT1 and JT is updated
          3) TT2 asks for a task and the JT@t1 schedules reduce R1 on TT2
          4) R1 asks for map-completion-event and gets M1@TT1
          5) R1 adds M1@TT1 to the fetch list
          6) JT@t1 restarts and comes up as JT@t2.
          7) TT3 asks for a task and JT@t2, having lost M1's completion, re-schedules map M1 on TT3
          8) M1 finishes on TT3 and M1@TT3 is added as map-completion-event
          9) TT2 SYNCs up with JT@t2 and gets the map completion event
          10) R1 gets M1@TT3 and ignores it since it already has M1@TT1.
          11) TT1 goes down.

          The design prefers the old map output location and silently ignores the new task completion event. In such a case R1 has missed the new event and will keep re-trying the old location. Even though R1 will report fetch failures, they will be a no-op since JT@t2 doesn't know about M1@TT1.

          JT@t2 thinks that M1 is complete while R1@TT2 keeps waiting on M1's output, and hence the job will be stuck. Note that this is also true if TT1 joins back after M1@TT3 completes, in which case JT@t2 will delete the output of M1@TT1. The following change might help overcome this problem.

          Let the reducers fetch data for the same map from multiple sources (i.e. R1 will keep fetching data from M1@TT1 and also from M1@TT3). The one that finishes first will invalidate the other. One optimization is that the reducer can continue fetching from the old output (since the timestamp is always there) and switch to the new event only once there is a failure from the old one (i.e. keep M1@TT3 as a backup and keep fetching from M1@TT1 until that fails, after which it switches to M1@TT3).
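
          Purely illustrative bookkeeping for that fallback idea (nothing below is in the patch):

            import java.util.HashMap;
            import java.util.LinkedList;
            import java.util.Map;
            import org.apache.hadoop.mapred.TaskCompletionEvent;

            class BackupFetchList {
              // all known output locations per map TIP, oldest first (e.g. M1@TT1 then M1@TT3)
              private final Map<String, LinkedList<TaskCompletionEvent>> locationsPerMap =
                  new HashMap<String, LinkedList<TaskCompletionEvent>>();

              // called when fetches from the current (oldest) location keep failing
              TaskCompletionEvent nextLocation(String mapTipId) {
                LinkedList<TaskCompletionEvent> candidates = locationsPerMap.get(mapTipId);
                candidates.removeFirst();                                    // give up on M1@TT1
                return candidates.isEmpty() ? null : candidates.getFirst();  // fall back to M1@TT3
              }
            }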


          Thoughts?

          Amar Kamat added a comment -

          There is a deficiency in the SYNC algo mentioned above. The task completion events in the JT will now be reordered, so the copy of the task completion events that the TT holds no longer matches. Consider the following cases with the SYNC algo in place:

          1) A TT has reducers in SHUFFLE phase : Here the TT will have a partial copy of the previous version of the task completion events and hence will require the latest copy.

          2) A TT has all its reducers in the REDUCE phase before the JT restart and new reduce tasks assigned to it after the restart: here the TT will have a complete copy of the previous version of the task completion events, and hence it looks like it might not require the latest copy. But consider a case where some maps were lost and hence their output is not available. Since the ordering of the task completion events is different and some events in the JT might belong to re-executed maps, there is no good way to inform the TTs that a particular map was lost and that they should use the new task completion event. Currently (with trunk) this is not an issue because there will be just one copy of the task completion events in the lifetime of the job; hence the TT will always have the correct ordering of the completion events, and in case of re-executions the new event will always be at the end.


          One simple solution would be to force a rebuild of the completion-events copy at the TT on the SYNC action.

          Hemanth Yamijala added a comment -

          To clarify a bit: we considered two different approaches for providing persistence in the JobTracker. The first one was to persist completed task information to a log file, similar to the method in the NameNode edits log. The second one is what Amar has described above, where TaskTrackers send 'Task Reports' to the restarted JT on demand. We feel the second approach is preferable, as it scales better.

          However, the second approach does have a couple of issues:

          • Upon restart of the JT, before a TT has resent its task reports to the JT, other TTs could be given the same tasks to execute. This would cause re-executions, and the duplicates must be discarded. If this is a serious problem, we could make the JT wait for a while (described by Amar above). We feel it is OK to re-execute, as it appears that TTs will sync fast enough.
          • Upon restart, the order of Task Completion events is lost. This needs to be rebuilt from the task reports. The task reports could come in a different order from how the task completion events originally came. However, reduce tasks which have already fetched some events depend on this order. One way to handle this is to make the TT re-fetch all Task Completion events from the beginning upon a JT restart (a sketch follows this list). Then it can check which map outputs it has already shuffled, and not get them again. As the shuffle is far more expensive than re-fetching the events, we feel this overhead is manageable.
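
          A minimal sketch of that re-fetch filtering (refetchedEvents, copiedMapOutputs and scheduleFetch are hypothetical names):

            // After a restart the TT hands back the full rebuilt event list; the reduce only
            // schedules fetches for successful maps whose output it has not already copied.
            for (TaskCompletionEvent event : refetchedEvents) {
              if (event.getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED
                  && !copiedMapOutputs.contains(event.getTaskId())) {
                scheduleFetch(event);
              }
            }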

          Please do share your thoughts on whether these points make sense.

          Amar Kamat added a comment -

          III) The logic for detecting a lost TT should not rely on missing data structures but on some kind of bookkeeping. We can now use the 'missing data structures' logic for detecting when the TT should SYNC. Note that detecting a TT as lost (missing TT details) is different from declaring it as lost (a 10-minute gap in heartbeats).

          These are two different cases:
          1) A lost TT will have initial contact as false while its previous heartbeat is still present.
          2) A restarted JT will see initial contact as false while the previous heartbeat is also missing.
          Hence there is no need to fix the lost-TT logic.
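
          In code form, the two cases reduce to something like this (field names assumed):

            // initialContact and previousHeartbeat are assumed fields of the heartbeat handling code
            boolean lostTracker = !initialContact && previousHeartbeat != null;  // case 1
            boolean jtRestarted = !initialContact && previousHeartbeat == null;  // case 2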

          Amar Kamat added a comment -

          Here is a proposal:
          1) A job can be in one of the following states - queued, running, completed.
          Queued and running jobs need to survive JT restarts and hence will be represented as queued and running folders under a backup directory. The reason for doing this is basically that it is cleaner to distinguish queued and running jobs. We need to see whether we want this feature to be configurable (in terms of enabling backup mode and specifying the backup location) or to always check the queued/running folders under mapred.system.dir to auto-detect whether the JT was restarted. For now this backup directory will be mapred.system.dir.

          2) On job submission, the job first gets queued up (it goes into the job structures and is also persisted on the FS as queued). For now we will use a common structure to hold both the queued and running jobs. In the future (say, after the new scheduler comes in) we might need separate queues/lists for them.

          3) A job jumps from the queued state to the running state only when the JobInitThread selects it for initialization/running [consider an initialized/expanded job to be a running job]. As of now all jobs transition from queued to running immediately, but in the future the decision of which job to initialize will be more complex/involved.

          4) Running jobs need the following information for restarts:

             4.1) TIP info : What all TIPs are there in the job and what is their locality info. 
                     This could be rebuilt from job.xml which is in *mapred.system.dir*. Hence on JT restart, we should be careful 
                     while clearing the mapred system directory. Currently the JobTracker will switch to RESTART mode if there 
                     is some stale data in the queued/running backup folders. If we decide to keep the backup feature configurable then the
                     JT will also check if its enabled.
             4.2) Completed task statuses : (Sameer's suggestion) TaskStatus info can be obtained from the TTs. 
                     More details are stated below (see SYNC ALGO)
          

          5) SYNC ALGO : Algo for sync up the JT and TTs :

             5.1)  On Demand Sync : 
               Have SYNC operation for the TaskTrackers. Following are the ways to achieve on-demand sync
                 5.1.1) Greedy :
                     a) TT sends an old heartbeat to a new restarted JT. The JT on restart check the backup folders and detects
                          if its in restart mode or not.
                     b) Once the JT in restart mode receives a heartbeat which is not the *first* contact, it considers that the 
                         TT is from its previous incarnation and sends a SYNC command.
                     c) TT receives a SYNC operation, adds the completed statuses of running jobs to the current heartbeat 
                         and SYNCs up with the JT making this contact as *initial*.
                     d) JT receives the updated heartbeat as a new contact, updates the internal structures.
                     e) JT replies with new tasks if asked.
               5.1.2) Waited :
                     Similar to 5.1.1 but doesn't give out tasks immediately. Waits for some time and then serves out the tasks. 
                    The question to answer is how much to wait? How to detect that all the TTs have SYNCed?
               For 5.1.1, the rate at which the TTs SYNCs with the JT will be faster and hence the 
               overhead should not be much. Also we could avoid scheduling tasks on SYNC operation. Thoughts?
             5.2) Other options?
          

          6) Problems/Issues :
          I) Once the JT restarts, the JIP structures for previously completed jobs will be missing. Hence the web UI will now change in terms of completed jobs: earlier the JT showed the completed jobs, which on restart it will no longer be able to do. One workaround is to use the completed-job-store to store completed jobs and serve them from the job history on restarts.

          II) Consider the following scenario :

             1. JT schedules task1, task2 to TT1
             2. JT schedules task3, task4 to TT2
             3. TT1 informs JT about task1/task2 completion
             4. JT restarts 
             5. JT receives SYNC from TT1.
             6. JT syncs up and schedules task3 to TT1
             7. TT1 starts task3 and this might interfere with the side effect files of task3 on TT2.
             8. In the worst case task3 could be running on TT1 and JT schedules task3 on TT1 in which case the local folders will also 
                 clash.
          

          One way to overcome this is to include an identifier that distinguishes task attempts across JT restarts; we can use the JT's start timestamp as that identifier (see the sketch after this list).

          III) The logic for detecting a lost TT should not rely on missing data structures but on some kind of bookkeeping. We can now use the 'missing data structures' logic for detecting when the TT should SYNC. Note that detecting a TT as lost (missing TT details) is different from declaring it as lost (a 10-minute gap in heartbeats).
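
          As an illustration of the identifier idea in II (the format shown is only a guess at what the encoding could look like):

            // jtStartTime, jobSeq, tipIndex and tryNum are hypothetical variables; the point is only
            // that the JT start time becomes part of every attempt id, so attempts scheduled by
            // different JT incarnations can never clash on local dirs or side-effect files.
            String attemptId = "attempt_" + jtStartTime + "_" + jobSeq + "_m_" + tipIndex + "_" + tryNum;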


          So for now we should
          1) Have backup as non configurable and use mapred.system.dir as the backup folder with queued/running folders under it
          2) Have queuing logic just for persistence
          3) Use job-history for serving completed jobs upon restarts
          4) Change lost TT detection logic
          5) Use On-Demand:Greedy sync logic
          6) Task attempts carry encoded JT timestamp with them


          Thoughts?

          Devaraj Das added a comment -

          bq. I think we need to work out what is the intended behavior to re-start a job tracker, and what needs to be persisted to support that behavior.

          Agree. This jira is more about being able to save all Job states and being able to create images that can later be used to recreate the job data structures. Actually restarting a job would be the next step (a separate jira probably). This issue is about a framework to enable job restarts.

          bq. But at this time, I think it does not sound right to tweak HADOOP-1876 for functionality on which we don't have a clear consensus.

          Agree, and the proposal here doesn't intend to tweak HADOOP-1876. It could leverage some APIs from there (just a thought), but at this point it seems like an edits-log kind of mechanism should work well.

          Runping Qi added a comment -

          bq. There is one problem with this approach of periodic merges. We lose the information for completed tasks that complete in the interval between the last merge and the time when the JobTracker crashes.

          Are you suggesting that you intend to use the persisted job status as the basis for the job tracker to restart from where it crashed?
          I don't think that is what HADOOP-1876 was intended for, and I don't think its current implementation can support that.

          I think we need to work out what is the intended behavior to re-start a job tracker, and what needs to be persisted to support that behavior.
          It will be nice if the work for HADOOP-1876 can be re-used.
          But at this time, I think it does not sound right to tweak HADOOP-1876 for functionality on which we don't have a clear consensus.

          Devaraj Das added a comment -

          Doug, that makes sense. However, the concern there is that for each task update we need to do a sync to DFS (a sync because we want to be sure the info is in the DFS). That might be expensive when TIPs complete at a high rate, no? Also, even in this case, we most likely still need to do the edits-log kind of merge, since the file will just contain a bunch of updates.
          There is one problem with this approach of periodic merges. We lose the information for completed tasks that complete in the interval between the last merge and the time when the JobTracker crashes. So on the next restart the JobTracker would try to re-execute these tasks, but their outputs would already be present on the DFS, and conflicts will happen when save-output is invoked for the new attempts (this problem becomes a bit more complicated with side files in the picture). We should probably have a strategy (job-configurable?) to handle such cases on a per-path basis - OVERWRITE (if an output path already exists) or ACCEPT (accept what is already there). Assuming idempotent tasks, the default should probably be ACCEPT...
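
          Purely illustrative (no such knob exists today), the per-path strategy could be exposed as a job-level setting along these lines:

            // Hypothetical job-level policy for outputs left behind by a pre-crash attempt.
            enum ExistingOutputPolicy { OVERWRITE, ACCEPT }

            conf.set("mapred.task.existing.output.policy",
                     ExistingOutputPolicy.ACCEPT.toString());  // ACCEPT is the safe default for idempotent tasks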

          Doug Cutting added a comment -

          > we don't have appends [ ... ]

          We will though. So, if you don't use HDFS, it might still be worth trying to use the FileSystem API to talk to the local FS, so that a subsequent move to HDFS would be trivial, a matter of configuration. Perhaps we should start supporting append in the FileSystem API and in the local implementation now, to permit this?
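
          As a sketch of the configuration-only switch described above, the snippet below writes a job-state log through the FileSystem API; the property name (mapred.job.state.dir) and default path are illustrative assumptions, not existing Hadoop parameters, and it uses create() because append is not yet available.

              import java.io.IOException;
              import org.apache.hadoop.conf.Configuration;
              import org.apache.hadoop.fs.FSDataOutputStream;
              import org.apache.hadoop.fs.FileSystem;
              import org.apache.hadoop.fs.Path;

              class JobStateLogWriter {
                // Any file:// or hdfs:// URI can be plugged in here without changing
                // the writing code: the FileSystem is resolved from the path itself.
                static FSDataOutputStream open(Configuration conf) throws IOException {
                  Path log = new Path(conf.get("mapred.job.state.dir", "file:///tmp/jt-state"),
                                      "job-state.log");
                  FileSystem fs = log.getFileSystem(conf);  // local FS or HDFS, by config
                  return fs.create(log, true);              // overwrite; append not yet supported
                }
              }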

          Hide
          Hemanth Yamijala added a comment -

          Devaraj and I had a brief discussion today and came up with the following points:

          One requirement that we felt should be addressed is: if a job is partially complete when the JobTracker restarts, the user would expect to get all information about the completed tasks of this job transparently.

          To address this requirement, we would need to persist information about every completed task. To solve this, we can probably take an approach similar to what is followed by the NameNode edit logs mechanism. We could have a master image file that stores a snapshot of the current state of a running job. When tasks of the job change state, we could store the update immediately to a log. Periodically, we could merge these updates to the master image file.

          An alternative approach would be to update the image file periodically, batching updates. However in the interests of scale, and considering there may be frequent updates to tasks, we felt the earlier approach is a better one.

          Another point we considered was whether we could store this information to DFS, similar to HADOOP-1876. However, given that we don't have appends, and also that a JobTracker restart may make us lose information written to this file, we feel that may not work very well.

          Comments ?
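
          As a rough sketch of the image-plus-edits idea above (with hypothetical class, record, and file names, and ignoring the DFS sync and merge-scheduling concerns raised in the other comments), the recovery state could look like this:

              import java.io.File;
              import java.io.FileWriter;
              import java.io.IOException;
              import java.io.Writer;
              import java.util.HashMap;
              import java.util.Map;

              // Hypothetical recovery log: every task state change is appended to an
              // edits file, and the edits are periodically folded into an image file
              // that snapshots the whole job, mirroring the NameNode fsimage/edits model.
              class JobRecoveryLog {
                private final File image;   // full snapshot of task states
                private final File edits;   // incremental updates since the last snapshot
                private final Map<String, String> taskState = new HashMap<String, String>();

                JobRecoveryLog(File dir) {
                  this.image = new File(dir, "job.image");
                  this.edits = new File(dir, "job.edits");
                }

                // Called on every task state change; closed immediately so that a
                // crash loses at most the record being written.
                synchronized void logTaskUpdate(String taskId, String state) throws IOException {
                  taskState.put(taskId, state);
                  Writer w = new FileWriter(edits, true);   // append
                  w.write(taskId + "\t" + state + "\n");
                  w.close();
                }

                // Periodic merge: rewrite the image from the in-memory view and
                // truncate the edits log.
                synchronized void checkpoint() throws IOException {
                  Writer w = new FileWriter(image, false);
                  for (Map.Entry<String, String> e : taskState.entrySet()) {
                    w.write(e.getKey() + "\t" + e.getValue() + "\n");
                  }
                  w.close();
                  new FileWriter(edits, false).close();     // empty the edits file
                }
              }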


            People

            • Assignee:
              Amar Kamat
            • Reporter:
              Devaraj Das
            • Votes:
              0
            • Watchers:
              13
