Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.0.0-alpha
    • Fix Version/s: None
    • Component/s: resourcemanager
    • Labels: None

      Description

      This umbrella jira tracks the work needed to preserve critical state information and reload it upon RM restart.

      1. YARN-128.full-code.5.patch (255 kB, Bikas Saha)
      2. restart-fs-store-11-17.patch (17 kB, Bikas Saha)
      3. restart-zk-store-11-17.patch (61 kB, Bikas Saha)
      4. restart-12-11-zkstore.patch (21 kB, Bikas Saha)
      5. YARN-128.full-code-4.patch (179 kB, Bikas Saha)
      6. YARN-128.new-code-added-4.patch (78 kB, Bikas Saha)
      7. YARN-128.old-code-removed.4.patch (123 kB, Bikas Saha)
      8. YARN-128.full-code.3.patch (176 kB, Bikas Saha)
      9. YARN-128.new-code-added.3.patch (74 kB, Bikas Saha)
      10. YARN-128.old-code-removed.3.patch (123 kB, Bikas Saha)
      11. RMRestartPhase1.pdf (59 kB, Bikas Saha)
      12. YARN-128.patch (92 kB, Devaraj K)
      13. RM-recovery-initial-thoughts.txt (3 kB, Bikas Saha)
      14. MR-4343.1.patch (17 kB, Tsuyoshi OZAWA)

          Activity

          Bikas Saha added a comment -

          Will be posting a preliminary design sketch this week for comments.

          Tsuyoshi OZAWA added a comment -

          Bikas,

          The attached patch was originally created for MAPREDUCE-4343, which is marked as a duplicate of this ticket.

          The patch may be useful as a reference, so I attached it to this ticket.

          Bikas Saha added a comment -

          Thanks! I will take a look before posting the design.

          Sharad Agarwal added a comment -

          Arun/Bikas - what is the rationale for opening new tickets and marking the old ones as duplicates? Isn't MAPREDUCE-2713 already discussing the same thing?

          Tsuyoshi OZAWA added a comment -

          Sharad,

          MAPREDUCE-2713 is now marked as a dup of this ticket (MAPREDUCE-4326).

          Tsuyoshi OZAWA added a comment -

          Bikas,

          What's the status? I can help if you are having difficulty with the preliminary design sketch.

          Bikas Saha added a comment -

          I have been looking around at the code and jotted down notes on how this could be done. It's not good enough to post as a design. It's going to be a fairly non-trivial change and will take some time. I am planning to prototype something based on my notes before I post any proposal on the jira so that there is something correct and concrete in the proposal.
          In the meantime, if you have any ideas, please post them and I will be glad to study them.

          Tsuyoshi OZAWA added a comment -

          Yeah, deciding what to save to ZK or to the RM's local disk is not trivial.
          I'm going to look at the code too, and post my findings here.

          Tsuyoshi OZAWA added a comment -

          I've looked through the RM code and found that the current Recoverable interface supports storing the following state:
          1. Information about applications (application IDs and the info defined in ApplicationId.java and ApplicationSubmissionContext.java).
          2. Information about node managers (info about the Node Manager defined in RMNode.java).

          My questions are:
          1. Are the states enough to store? From my reading of the code, RMContext holds other state as well, but that state is recoverable without the store.
          2. When should the states be saved to the store?
          3. When is the interface getLastLoggedNodeId() used?

          IMHO, we should go step by step as follows:
          1. Define the RM states that are preserved in MemStore/DiskStore/ZKStore.
          2. Implement a version that can be resurrected after the RM crashes (e.g. DiskStore/ZKStore).
          Prototyping 2 and testing it will prove the correctness of 1.

          If you have any ideas, please let me know.

          Tsuyoshi OZAWA added a comment -

          s/Are the states enough to store/ Are the states enough to recover/

          Bikas Saha added a comment -

          I think the current implementation (actual code/commented code/todo's etc) looks like a prototype which may not be in sync with the current state of the functional code. So I am not sure about using it as is.
          Also, the implementation seems to be doing blocking calls to ZK etc and will likely end up being a bottleneck on RM threads/perf if a lot of state information needs to be synced to stable store.
          On that note, my gut feeling is that the RM state in practice is, in a sense, the sum total of the current state of the cluster as reflected in the NM's. So there may not be the need to store any state as long as the RM can recover the current state of the cluster from the NM's in a reasonable amount of time. The NM's have to re-sync with the RM after it comes back up anyway, so that is not extra overhead.
          Saving a lot of state would result in having to solve the same set of issues that the Namenode has to solve in order to maintain consistent, reliable and available saved state. IMO, for the RM we are better off avoiding those issues.
          The only state that needs to be saved, as far as I can see, is the information about all jobs that are not yet completed. This information is present only in the RM and so needs to be preserved across RM restart. Fortunately, this information is small and infrequently updated. So saving it synchronously in ZK may not be too much of an issue.

          Tsuyoshi OZAWA added a comment -

          > So there may not be the need to store any state as long as the RM can recover the current state of the cluster from the NM's in a reasonable amount of time.

          It's a good idea to avoid storing state that can be recovered without the store. It's uncertain whether that state can be recovered in a reasonable amount of time, so prototyping is needed.

          > The only state that needs to be saved, as far as I can see, is the information about all jobs that are not yet completed.

          I agree with you. I'll check whether the states of WIP jobs are defined correctly or not.

          > Also, the implementation seems to be doing blocking calls to ZK etc and will likely end up being a bottleneck on RM threads/perf if a lot of state information needs to be synced to stable store.

          I think that, to avoid becoming the bottleneck, the RM should have a dedicated thread for saving its state. The main thread can hand save requests to the dedicated thread through a queue or a similar mechanism, so that it never blocks. Using async APIs to save the state would also be effective, but the code can get complicated.
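
The dedicated-thread idea above can be sketched roughly as follows. This is only an illustration under stated assumptions: `AsyncStateSaver`, `StateStore`, and `saveAsync` are hypothetical names, not part of the YARN code base, and the store is a stand-in for a ZK- or disk-backed implementation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch; these names are not taken from the YARN code base.
class AsyncStateSaver {
    /** Stand-in for a ZK- or disk-backed store whose write() may block. */
    interface StateStore {
        void write(String key, String value) throws Exception;
    }

    // Sentinel request that tells the writer thread to exit.
    private static final String[] STOP = new String[0];

    private final BlockingQueue<String[]> queue = new LinkedBlockingQueue<>();
    private final Thread writer;

    AsyncStateSaver(StateStore store) {
        writer = new Thread(() -> {
            try {
                while (true) {
                    String[] request = queue.take(); // blocks only this thread
                    if (request == STOP) {
                        return;
                    }
                    try {
                        store.write(request[0], request[1]);
                    } catch (Exception e) {
                        // A real implementation would need a retry or fail-fast policy.
                        System.err.println("state write failed: " + e);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "rm-state-saver");
        writer.start();
    }

    /** Called from RM threads; enqueues the request and returns immediately. */
    void saveAsync(String key, String value) {
        queue.add(new String[] {key, value});
    }

    /** Lets already-queued writes finish, then stops the writer thread. */
    void shutdown() {
        queue.add(STOP);
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because a single thread drains the queue, writes reach the store in the order they were submitted, which keeps the sketch simpler than one built on async store APIs.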

          Bikas Saha added a comment -

          Attaching initial thoughts after reading the code.

          Vinod Kumar Vavilapalli added a comment -

          Pasting notes from Bikas inline for easier discussion.

          Basic Idea:

          Key idea is that the state of the cluster is its current state. So don't save all container info.
          RM on startup sets a recovery flag on. Informs scheduler via API.
          Re-create running AM info from persisted state. Running AM's will heartbeat to the RM and be asked to re-sync.
          Re-start AM's that have been lost. What about AM's that completed during restart? Re-running them should be a no-op.
          Ask running and re-started AM's to re-send all pending container requests to re-create pending request state.
          RM accepts new AM registrations and their requests.
          Scheduling pass is not performed when recovery flag is on.
          RM waits for nodes to heartbeat and give it container info.
          RM passes container info to scheduler so that the scheduler can re-create current allocation state.
          After recovery time threshold, reset recovery flag and start the scheduling pass. Normal from thereon.
          Schedulers could save their state and recover previous allocation information from that saved state.
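
One way to picture the recovery flag described in the list above is the sketch below. It assumes a fixed recovery window measured from RM start, and every name in it (`RecoveryAwareScheduler`, `onNodeHeartbeat`, and so on) is hypothetical rather than part of the YarnScheduler API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; this is not the YarnScheduler API.
class RecoveryAwareScheduler {
    private final long recoveryDeadlineMs;
    private boolean recovering = true; // set on RM startup
    private final Map<String, Integer> reportedContainers = new HashMap<>();
    private int schedulingPasses = 0;

    RecoveryAwareScheduler(long startMs, long recoveryWindowMs) {
        this.recoveryDeadlineMs = startMs + recoveryWindowMs;
    }

    /** Called on every node heartbeat with the node's running container count. */
    void onNodeHeartbeat(String nodeId, int runningContainers, long nowMs) {
        // Container info is always recorded so allocation state can be rebuilt.
        reportedContainers.put(nodeId, runningContainers);
        if (recovering && nowMs >= recoveryDeadlineMs) {
            recovering = false; // recovery time threshold reached
        }
        if (!recovering) {
            schedulingPasses++; // stands in for a real scheduling pass
        }
    }

    boolean isRecovering() {
        return recovering;
    }

    int schedulingPasses() {
        return schedulingPasses;
    }

    int reportedContainers(String nodeId) {
        return reportedContainers.getOrDefault(nodeId, 0);
    }
}
```

While the flag is on, heartbeats only feed container info into the scheduler; scheduling passes resume on the first heartbeat at or after the deadline.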

          What info comes in node heartbeats:

          Handle sequence number mismatch during recovery. On heartbeat from node send ReRegister command instead of Reboot. NodeManager should continue running containers during this time.
          RM sends commands back to clean up containers/applications. Can orphans be left behind on nodes after RM restart? Will NM be able to auto-clean containers?
          ApplicationAttemptId can be gotten from Container objects to map resources back to SchedulingApp.

          How to pause scheduling pass:

          Scheduling pass is triggered on NODE_UPDATE events that happen on node heartbeat. Easy to pause under recovery flag.
          YarnScheduler.allocate() is the API that needs to be changed.
          How to handle container release messages that were lost when RM was down? Will AM's get delivery failure and continue to resend indefinitely?

          How to re-create scheduler allocation state:

          On node re-register, RM passes container info to scheduler so that the scheduler can re-create current allocation state.
          Use CsQueue.recoverContainer() to recover previous allocations from currently running containers.
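
As a rough illustration of re-creating allocation state from node reports, the sketch below groups reported containers by application attempt and sums their memory. `AllocationRebuilder` and `ContainerReport` are hypothetical stand-ins; the real Container and scheduler classes carry much more information, and this is not how `CsQueue.recoverContainer()` is implemented.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; ContainerReport is a simplified stand-in for Container.
class AllocationRebuilder {
    static final class ContainerReport {
        final String appAttemptId;
        final int memoryMb;

        ContainerReport(String appAttemptId, int memoryMb) {
            this.appAttemptId = appAttemptId;
            this.memoryMb = memoryMb;
        }
    }

    /** Sums the memory of running containers per application attempt. */
    static Map<String, Integer> usedMemoryByAttempt(List<ContainerReport> reports) {
        Map<String, Integer> used = new HashMap<>();
        for (ContainerReport report : reports) {
            used.merge(report.appAttemptId, report.memoryMb, Integer::sum);
        }
        return used;
    }
}
```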

          How to re-synchronize pending requests with AM's:

          Need new AM-RM API to resend asks from AM to RM.
          Keep accumulating asks from AM's like it currently happens when allocate() is called.

          How to persist AM state:

          Store AM info in a persistent ZK node that uses version numbers to prevent out of order updates from other RM's. One ZK node per AM under a master RM ZK node. AM submission creates ZK node. Start and restart update ZK node. Completion clears ZK node.
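
The version-number idea can be sketched with an in-memory stand-in for the ZK nodes. ZooKeeper's own `setData` call takes an expected version and rejects the write on a mismatch; the class below only imitates that behaviour, and `VersionedAmStore` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for one ZK node per AM; not the ZooKeeper API.
class VersionedAmStore {
    private static final class Entry {
        final String data;
        final int version;

        Entry(String data, int version) {
            this.data = data;
            this.version = version;
        }
    }

    private final Map<String, Entry> nodes = new HashMap<>();

    /** AM submission creates the node at version 0; fails if it already exists. */
    synchronized boolean create(String appId, String data) {
        if (nodes.containsKey(appId)) {
            return false;
        }
        nodes.put(appId, new Entry(data, 0));
        return true;
    }

    /**
     * Start and restart update the node. The write is applied only if the
     * caller's expected version matches, so a stale RM cannot overwrite newer
     * state. Returns the new version, or -1 if the update was rejected.
     */
    synchronized int update(String appId, String data, int expectedVersion) {
        Entry current = nodes.get(appId);
        if (current == null || current.version != expectedVersion) {
            return -1;
        }
        int next = current.version + 1;
        nodes.put(appId, new Entry(data, next));
        return next;
    }

    /** Completion clears the node. */
    synchronized boolean remove(String appId) {
        return nodes.remove(appId) != null;
    }

    /** Returns the stored data, or null if the node does not exist. */
    synchronized String read(String appId) {
        Entry current = nodes.get(appId);
        return current == null ? null : current.data;
    }
}
```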

          Metrics:

          What needs to be done to maintain consistency across restarts? New app attempt would be a new attempt but what about recovered running apps?

          Security:

          What information about keys and tokens to persist across restart so that existing secure containers continue to run with new RM and new containers. ZK nodes themselves should be secure.

          Vinod Kumar Vavilapalli added a comment -

          +1 for most of your points. Some specific comments:

          > What about AM's that completed during restart. Re-running them should be a no-op.

          AMs should not finish themselves while the RM is down or recovering. They should just spin.

          > How to handle container releases messages that were lost when RM was down? Will AM's get delivery failure and continue to resend indefinitely?

          You mean release requests from AM? Like above, if AMs just spin, we don't have an issue.

          > Need new AM-RM API to resend asks from AM to RM.

          See AMResponse.getReboot(). That can be used to inform AMs to resend all details.

          > What information about keys and tokens to persist across restart so that existing secure containers continue to run with new RM and new containers.

          We already noted this as Java comments in the code. It needs to be put into proper documentation.

          > ZK nodes themselves should be secure.

          Good point. In the worst case, if ZK doesn't support security, we can rely on an RM-specific ZK instance and firewall rules.

          More requirements:

          • An upper bound (time) on recovery?
          • Writing to ZK shouldn't add more than x% (< 1-2%) to app latency?

          More state to save:

          • New app submissions should be persisted/accepted but not acted upon during recovery.

          Miscellaneous points:

          • I think we should add a new ServiceState called Recovering and use the same in RM.
          • Overall, clients, AMs and NMs should spin while the RM is down or doing recovery. We also need to handle fail-over of the RM, which should be done as part of a separate ticket.
          • When is recovery officially finished? When all running AMs sync up? I suppose so, that would be an upper bound equaling AM-expiry interval.
          • Need to think of how the RM-NM shared secret roll-over is affected, if RM is down for a significant amount of time.
          Robert Joseph Evans added a comment -

          > AMs should not finish themselves while the RM is down or recovering. They should just spin.

          +1 for that. If we let the MR AM finish, and then the RM comes up and tries to restart it, it will get confused because it will not find the job history log where it expects to see it, which will cause it to restart. It is then likely to find the output directory already populated with data, which could cause the job to fail. What is worse, it may not fail, because I think the output committer will ignore those errors. The first AM could inform Oozie that the job finished through a callback, and a second job may be launched and be reading the data at the time that the restarted first job is trying to write that data, which could cause inconsistent results or cause the second job to fail somewhat randomly.

          > An upper bound (time) on recovery?

          This is a bit difficult to determine because the RM is responsible for renewing tokens. Right now it will renew them when they only have about 10% of their time left before they expire. So it depends on how long the shortest token you have in flight is valid for before it needs to be renewed. In general all of the tokens I have seen are for 24 hours, so you would have about 2.4 hours to bring the RM back up and read in/start renewing all of the tokens or risk tokens expiring.
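
The 2.4 hour figure follows from simple arithmetic, sketched below. The 10% threshold and the 24 hour lifetime are the values quoted in the comment above; `TokenRenewalWindow` and its method are hypothetical names used only for illustration.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper; the percentage and lifetime come from the comment above.
class TokenRenewalWindow {
    /**
     * Time left on a token at the moment the RM would normally renew it,
     * given that renewal happens when this percentage of the lifetime remains.
     */
    static long remainingAtRenewalMs(long tokenLifetimeMs, int renewAtPercentLeft) {
        return tokenLifetimeMs * renewAtPercentLeft / 100;
    }

    /** Same value in whole minutes, for readability. */
    static long remainingAtRenewalMinutes(long tokenLifetimeMs, int renewAtPercentLeft) {
        return TimeUnit.MILLISECONDS.toMinutes(
            remainingAtRenewalMs(tokenLifetimeMs, renewAtPercentLeft));
    }
}
```

For a 24 hour token, `remainingAtRenewalMinutes(TimeUnit.HOURS.toMillis(24), 10)` gives 144 minutes, that is, 2.4 hours. The shortest-lived token in flight sets the actual bound.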

          Thomas Graves added a comment -

          > RM sends commands back to clean up containers/applications. Can orphans be left behind on nodes after RM restart? Will NM be able to auto-clean containers?

          Containers can currently be lost. See YARN-72 and YARN-73. Once it's changed so that the RM doesn't always reboot the NM's, that will get a bit better, but it's still possible, so we will have to handle it somehow. Since the NM could crash, it almost needs a way to check on startup what's running and at that point decide if it should clean them up. It does have a .pid file for the containers, but you would have to be sure that the process is the same one as when the NM went down.

          Thomas Graves added a comment -

          > What about AM's that completed during restart. Re-running them should be a no-op.

          > AMs should not finish themselves while the RM is down or recovering. They should just spin.

          Doesn't the RM still need to handle this? The client could stop the AM at any point by talking directly to it. Or, since anyone can write an AM, it could simply finish on its own. Or perhaps there is a timing issue on app finish. How does the RM tell the difference? We can have the MR client/AM handle this nicely, but even then there could be a bug or an expiry after so long. So perhaps if the AM is down it doesn't get restarted? That's probably not ideal if the app happens to go down at the same time as the RM though - like a rack gets rebooted or something - but otherwise you have to handle all the restart issues, like Bobby mentioned above.

          Robert Joseph Evans added a comment -

          The problem is that we cannot be truly backwards compatible when adding in this feature. We have to better define the lifecycle of an AM for it to be "well behaved" and properly handle RM recovery. I would say that if the client asks the AM to stop, it should still pause on unregister until it can successfully unregister, or until it can mark itself as "killed" in a persistent way, like with the job history log, so that when that AM is relaunched all it has to do is check a file on HDFS and then unregister. Perhaps the only way to be totally backwards compatible is for the AM to indicate when it registers whether it supports RM recovery or not. Or, to avoid any race conditions, the client would indicate this when it launches the AM. If it does not (legacy AMs), then the RM will not try to relaunch it if the AM goes down while the RM is recovering. If it does, then the AM will always be relaunched when the RM goes down.
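
Read narrowly, the compatibility rule proposed here reduces to a small decision, sketched below. This is one interpretation only: it assumes that an AM which is still running is re-synced rather than relaunched, and `RelaunchPolicy` and its parameters are hypothetical names.

```java
// Hypothetical sketch of one reading of the proposal; not YARN code.
class RelaunchPolicy {
    /**
     * Decides whether the RM should relaunch an AM after recovery.
     *
     * @param supportsRmRecovery declared at registration or at launch by the client
     * @param amStillRunning     whether the AM heartbeated after the RM came back
     */
    static boolean shouldRelaunch(boolean supportsRmRecovery, boolean amStillRunning) {
        if (amStillRunning) {
            return false; // assumption: a live AM is re-synced, not relaunched
        }
        // Legacy AMs that went down during recovery are left alone;
        // recovery-aware AMs are relaunched.
        return supportsRmRecovery;
    }
}
```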

          Devaraj K added a comment -

          Attaching the first version of the patch. I have tested it on a small cluster with the FIFO and Capacity schedulers by bringing the RM down and back up while an application was running; the application continued without any failures.

          Arun C Murthy added a comment -

          Agree with Bobby's concerns.

          For now I think the first step should be to merely restart all apps on RM restart, something similar to MR1 today.

          Bikas - can I pls suggest this as a first step? Thanks!

          Bikas Saha added a comment -

          Yeah. I have been thinking along similar lines too. Working on a refreshed proposal and code patch.

          Bikas Saha added a comment -

          Devaraj, I think the current approach+code based on zkstore (that YARN-128.patch builds on top of) has some significant issues wrt perf/scalability of ZK/future HA. The design outline attached to this jira calls out some of the issues. The next proposal document will help clarify a bit more I hope.

          Bikas Saha added a comment -

          Attaching a proposal doc and code for the first iteration. The proposal is along the same lines as the earlier initial design sketch but limits the first iteration of the work to restarting the applications after the RM comes back up. The reasoning and ideas are detailed in the doc.

          Attaching some code that implements the proposal. It includes a functional test that verifies the end-to-end scenario using an in-memory store. If everything looks good overall then I will tie up the loose ends and add more tests.

          For review, the code is broken into 1) removal of old code 2) new code + test. There are TODO comments in the code where folks could make suggestions. The code is attached in full for a build and test pass on Jenkins because my machine is having long host resolution timeouts. Any ideas on this?

          During the testing I found a bug in the CapacityScheduler because of which it fails to activate applications when resources are added to the cluster. Folks can comment on the fix. There is a separate test case that shows the bug and verifies the fix.

          Bikas Saha added a comment -

          Updating patches for new code and combined patch.
          Changes
          1) Code added to remove application data upon completion
          2) All TODO's examined and removed/fixed.
          3) Improved TestRMRestart and its readability
          4) Added more tests for RMAppAttemptTransitions
          5) Refactored RMStateStore into an abstract class so that it can implement common functionality to notify app attempt about async store operation completion

          Fix for capacity scheduler bug is still in the patch because it blocks test completion. The issue is also tracked in YARN-209
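
          A minimal sketch of the idea in change 5), assuming a base class that owns the "store, then notify the attempt" logic while subclasses only implement the actual write. All names here (SketchStateStore, StoreListener, SketchMemoryStore) are hypothetical and not taken from the patch, and the call runs synchronously for brevity, whereas the patch describes the store operation as async.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; these names do not come from the patch.
abstract class SketchStateStore {
    /** Callback used to tell an app attempt that its state was stored (or that storing failed). */
    interface StoreListener {
        void onStored(String attemptId, Exception errorOrNull);
    }

    // Storage-specific write, implemented by each concrete store.
    protected abstract void storeAttemptState(String attemptId, byte[] data) throws Exception;

    // Common logic kept in the base class: perform the write, then notify the listener.
    public final void storeAttempt(String attemptId, byte[] data, StoreListener listener) {
        Exception error = null;
        try {
            storeAttemptState(attemptId, data);
        } catch (Exception e) {
            error = e;
        }
        listener.onStored(attemptId, error);
    }
}

// In-memory variant, standing in for a store used by tests.
class SketchMemoryStore extends SketchStateStore {
    final Map<String, byte[]> saved = new HashMap<>();

    @Override
    protected void storeAttemptState(String attemptId, byte[] data) {
        saved.put(attemptId, data);
    }
}
```

          With this shape, a ZK or file system store would only need to override storeAttemptState; the notification path stays in one place.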

          Bikas Saha added a comment -

          Attaching rebased patches

          Bikas Saha added a comment -

          Attaching rebased patches + change RMStateStore to throw exception to notify about store errors.

          Arinto Murdopo added a comment -

          Based on YARN-128.full-code-4.patch, I have the following observations:

          1) In TestRMRestart.java, line 78, app1 and appState refer to the same instance because we are using memory to store the state (MemoryRMStateStore). Therefore, the assert will always be true.

          2) ApplicationState is stored when we invoke MockRM's submitApp method; more precisely, in the ClientRMService class, line 266. The state that we store contains the resource request from the client. In this case, the value of the resource request is 200. However, if we wait for some time, the value is updated to 1024 (the normalized value given by the Scheduler).

          3) Currently our school project is trying to persist the state in persistent storage, and the assert statement in our modified test class fails because our storage holds the resource value from before it was updated by the scheduler.

          Based on the above observations, should we update the persisted memory value with the new value assigned by the scheduler?
          Since we are going to restart both the ApplicationMaster and the NodeManager when the ResourceManager fails, I think the answer is no: we can use the original value requested by the user. But I'm not really sure about my own reasoning, so please comment on it. If the answer is yes, then we should wait until the Scheduler updates the resource value before persisting it to storage.
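
          A minimal sketch of the normalization mentioned in 2), assuming the scheduler rounds a memory request up to a multiple of its configured minimum allocation (a minimum of 1024 would explain 200 becoming 1024). The class and method names are hypothetical; this is not the scheduler's code.

```java
// Hypothetical helper illustrating "round up to a multiple of the minimum allocation".
class NormalizeSketch {
    static int normalize(int requestedMb, int minimumMb) {
        if (minimumMb <= 0) {
            throw new IllegalArgumentException("minimum allocation must be positive");
        }
        // Ceiling division, with at least one multiple of the minimum.
        int multiples = Math.max(1, (requestedMb + minimumMb - 1) / minimumMb);
        return multiples * minimumMb;
    }

    public static void main(String[] args) {
        System.out.println(normalize(200, 1024)); // prints 1024
        System.out.println(normalize(200, 512));  // prints 512
    }
}
```

          The second call shows why the normalized value depends on scheduler configuration rather than on what the client asked for: the same request of 200 maps to a different value under a different assumed minimum.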

          Bikas Saha added a comment -

          1) Unless I am mistaken, the test condition is correct. app1 is the app actually submitted while appState is the state retrieved from the store. By checking that both are the same, we are checking that the data that was supposed to be passed has actually been passed to the store and there is no bug in the transfer of that data. The assert will be false if the transfer does not happen or some other value gets passed by mistake. Does that help clarify?

          3) Which resource value is this? The one that is stored in ApplicationSubmissionContext->ContainerLaunchContext? In the patch, the ApplicationSubmissionContext is stored at the very beginning to ensure that the client does not have to submit the job again. Hence, the Resource set by the client is saved. I am not sure what your project is saving after the scheduling is done.
          You are right. We don't want to store the updated value since this updated value is a side-effect of the policy of the scheduler.

          I am not sure if this applies to your project. I will shortly be posting a ZooKeeper and an HDFS state store that you could use, unless you are using your own storage mechanism.

          Arinto Murdopo added a comment -

          1) Yes, I agree with your clarification. It works as you state when we are using persistent storage (not MemStore, but ZK, MySQL, a file or other persistent storage).
          However, when we are using MemStore, the stored object (appState) and app1 refer to the same instance since our "store" is memory. To test my argument, we can put a breakpoint on the assert statement that compares the ApplicationSubmissionContext, then use the IDE to change the value of any of appState's properties, e.g. the resource in ApplicationSubmissionContext. The corresponding app1 value (in this case the resource in app1's ApplicationSubmissionContext) will also be updated to the same value.

          3) Yes, it's the Resource in ApplicationSubmissionContext->ContainerLaunchContext.
          If we are saving the original resource value requested by the client, then the assert statement that compares ApplicationSubmissionContext will not pass.
          Let's say the client requests memory with a value of 200. We store this in our persistent storage. After we store it, the scheduler updates the resource to a value of 1024. In this case, the resource in the app1 instance will be 1024, but the resource stored in our storage is 200. Hence, the comparison will not pass with the current assert statement. Maybe we need to keep storing our original resource request in ApplicationSubmissionContext.

          Looking forward to your ZK and HDFS state store. The state store in our project uses MySQL Cluster.
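
          A small sketch of the aliasing described in 1), assuming an in-memory "store" that is just a map. The Context class and the method name are hypothetical stand-ins, not the classes from the patch. When the map keeps the submitted reference, a later update is visible through the store as well; when it keeps a copy, the stored value stays at what the client submitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of reference aliasing in an in-memory store.
class AliasingSketch {
    static final class Context {
        int memoryMb;

        Context(int memoryMb) {
            this.memoryMb = memoryMb;
        }

        Context copy() {
            return new Context(memoryMb);
        }
    }

    // Returns the value the store holds after the submitted object is later updated.
    static int storedValueAfterUpdate(boolean copyOnStore) {
        Map<String, Context> store = new HashMap<>();
        Context submitted = new Context(200);
        store.put("app1", copyOnStore ? submitted.copy() : submitted);
        submitted.memoryMb = 1024; // stands in for the scheduler's later update
        return store.get("app1").memoryMb;
    }

    public static void main(String[] args) {
        System.out.println(storedValueAfterUpdate(false)); // prints 1024 (same instance)
        System.out.println(storedValueAfterUpdate(true));  // prints 200 (independent copy)
    }
}
```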

          Tom White added a comment -

          Bikas, this looks good so far. Thanks for working on it. A few comments:

          • Is there a race condition in ResourceManager#recover where RMAppImpl#recover is called after the StartAppAttemptTransition from resubmitting the app? The problem would be that the earlier app attempts (from before the restart) would not be the first ones since the new attempt would get in first.
          • I think we need the concept of a 'killed' app attempt (when the system is at fault, not the app) as well as a 'failed' attempt, like we have in MR task attempts. Without the distinction a restart will count against the user's app attempts (default 1 retry) which is undesirable.
          • Rather than change the ResourceManager constructor, you could read the recoveryEnabled flag from the configuration.
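
          A minimal sketch of the killed/failed distinction from the second point, assuming only attempts that failed because of the app itself count against the retry limit. The enum and method names are hypothetical and the limit is passed in as a parameter rather than read from any real configuration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: only FAILED attempts count against the app's retry limit.
class AttemptRetrySketch {
    enum AttemptOutcome {
        FAILED, // the app was at fault
        KILLED  // the system was at fault, e.g. an RM restart
    }

    static boolean canRetry(List<AttemptOutcome> previous, int maxFailedAttempts) {
        long failed = previous.stream()
                .filter(outcome -> outcome == AttemptOutcome.FAILED)
                .count();
        return failed < maxFailedAttempts;
    }

    public static void main(String[] args) {
        // An attempt lost to a restart does not use up the app's only allowed failure.
        System.out.println(canRetry(Arrays.asList(AttemptOutcome.KILLED), 1)); // prints true
        System.out.println(canRetry(Arrays.asList(AttemptOutcome.FAILED), 1)); // prints false
    }
}
```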
          Bikas Saha added a comment -

          @Arinto
          Thanks for using the code!
          1) Yes. Both are the same object. But that is what the test is testing: that the context that got saved in the store is the same as the one the app was submitted with. We are doing this with an in-memory store that lets us examine the stored data and compare it with the real data. A real store would save this data, so this comparison would not be possible.
          3) Yes. It seems incorrect to store scheduler side-effects. E.g. upon restart, if the scheduler config makes the minimum container size 512, then again it will not match.
          I am attaching a patch for a ZK store that you can try. It applies on top of the current full patch.

          @Tom
          Thanks for reviewing!
          1) There is no race condition because the Dispatcher has not been started yet and hence the attempt start event has not been processed. There is a comment to that effect in the code.
          2) I agree. I had thought about it too. But it looks like the current behavior (before this patch) does this because it does not differentiate killed/failed attempts when deciding that the attempt retry limit has been reached. So I thought about leaving it for a separate jira which would be unrelated to this. Once that is done this code could use it and not count the restarted attempt. This patch is already huge. Does that sound good?
          3) Yes. That could be done. The constructor makes it easier to write tests without mangling configs.
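
          A toy sketch of the ordering argument in the reply to Tom's point 1), assuming events raised before the dispatcher starts are only queued. The DispatcherSketch class and its methods are hypothetical and much simpler than the real Dispatcher; the point is only that recovery runs before any queued attempt-start event is handled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: events are queued until start() is called.
class DispatcherSketch {
    private final Queue<String> pending = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    // Before start(), an event is queued but not processed.
    void enqueue(String event) {
        pending.add(event);
    }

    // Recovery acts directly on state and does not go through the queue.
    void recover(String attempt) {
        log.add("recovered " + attempt);
    }

    // Starting the dispatcher drains the queued events in order.
    void start() {
        while (!pending.isEmpty()) {
            log.add("handled " + pending.poll());
        }
    }

    public static void main(String[] args) {
        DispatcherSketch dispatcher = new DispatcherSketch();
        dispatcher.enqueue("start attempt 2"); // raised by resubmitting the app
        dispatcher.recover("attempt 1");       // earlier attempt restored from the store
        dispatcher.start();
        System.out.println(dispatcher.log); // prints [recovered attempt 1, handled start attempt 2]
    }
}
```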

          Tom White added a comment -

          You are right about there being no race - I missed the comment! I opened YARN-218 for the killed/failed distinction as I agree it can be tackled separately.

          Bikas Saha added a comment -

          Updated ZK and FileSystem store patches. FileSystem patch applies after ZK patch.

          Tom White added a comment -

          I had a quick look at the new patches and FileSystemRMStateStore and ZKRMStateStore seem to be missing default constructors, which StoreFactory needs. You might change the tests to use StoreFactory to construct the store instances to test this code path.
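
          A small sketch of why the default constructors matter, assuming StoreFactory instantiates the configured store class reflectively through its no-argument constructor. The classes below are hypothetical stand-ins, not the stores from the patch.

```java
import java.lang.reflect.Constructor;

// Hypothetical sketch of reflective instantiation through a no-arg constructor.
class StoreFactorySketch {
    static class WithDefaultCtor {
        WithDefaultCtor() {
        }
    }

    static class WithoutDefaultCtor {
        WithoutDefaultCtor(String uri) {
        }
    }

    static Object create(Class<?> storeClass) throws ReflectiveOperationException {
        // Throws NoSuchMethodException when the class has no no-arg constructor.
        Constructor<?> ctor = storeClass.getDeclaredConstructor();
        ctor.setAccessible(true);
        return ctor.newInstance();
    }

    static boolean canCreate(Class<?> storeClass) {
        try {
            create(storeClass);
            return true;
        } catch (ReflectiveOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canCreate(WithDefaultCtor.class));    // prints true
        System.out.println(canCreate(WithoutDefaultCtor.class)); // prints false
    }
}
```

          A test that builds the stores through the factory would hit the second case immediately, which is the code path suggested above.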

          Bikas Saha added a comment -

          Thanks for looking at the patches while work is still in progress. That helps a lot!
          Yes. I am working on that currently. The two stores also have a lot of duplicated code, which I am moving into the base class. I will soon create a few sub-tasks and post the final patches in them so that it's easier to review and commit them.

          Bikas Saha added a comment -

          Attaching the final patch with full changes for a test run. Can someone with access please trigger a test run on JIRA?
          Changes
          1) Completed handling of unmanaged AMs
          2) Refactored ZK and FileSystem store classes to move common logic into the base class and also integrate with the RM
          3) Test improvements
          I have tested manually on a single node with both the ZK and FileSystem stores (using HDFS) and ran a wordcount job across a restart.

          I will create sub-tasks of this jira to break the changes into logical pieces.

          Bikas Saha added a comment -

          Done creating sub-tasks and attaching final patches for review and commit.

          Arinto Murdopo added a comment -

          Tested YARN-128.full-code.5.patch using the ZooKeeper store, and the result is positive. The ResourceManager came back up properly after we killed it.
          Experiment overview:

          • ZK settings: 1 ZK server consisting of 3 different nodes
          • HDFS was in a single-node setting. YARN and HDFS were run on the same node.
          • Executed the bbp and pi examples from the generated hadoop distribution (we built and packaged the trunk and patch code)
          • Killed the ResourceManager process while bbp or pi was executing (using the Linux kill command) and started a new RM 3 seconds after we killed it.
          Robert Joseph Evans added a comment -

          Those are good results.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12554338/YARN-128.full-code.5.patch
          against trunk revision .

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 34 new or modified test files.

          -1 javac. The patch appears to cause the build to fail.

          Console output: https://builds.apache.org/job/PreCommit-YARN-Build/183//console

          This message is automatically generated.

          Bikas Saha added a comment -

          Thanks for using it Arinto and posting results!

          Strahinja Lazetic added a comment -

          Bikas, I have one question: since we reboot NMs and terminate all the running containers and AMs upon the RM restart, why do we need to keep track of the previous applications' attempts? Couldn't we just start "from scratch" instead of generating the next attempt id based on the last running one?

          Bikas Saha added a comment -

          Yes, we need to. This is because many things like failure tracking of AM attempts, job history, log and debug information are tied to attempts, and so we cannot forget them.
          Also, restarting everything is just the first step. We want to move towards a work-preserving restart (see doc on jira) and the current approach lays the groundwork for it.

          Yesha Vora added a comment -

          Succeeded job tries to restart after RM restart

          Yesha Vora added a comment -

          Reducer of sort job restarts from scratch partway through after RM restart

          Yesha Vora added a comment -

          Historyserver does not refresh the result of restarted jobs after RM restart

          Vinod Kumar Vavilapalli added a comment -

          Keeping it unassigned given multiple contributors. Removing target-version given it spanned across releases. Marked it as a feature.


            People

            • Assignee:
              Unassigned
              Reporter:
              Arun C Murthy
            • Votes:
              1
              Watchers:
              64
