Samza / SAMZA-58

Use YARN's AMRMClientAsync client library

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.6.0
    • Fix Version/s: 0.8.0
    • Component/s: yarn
    • Labels:
      None

      Description

      YARN 2.2.0 has a nice Async API for clients and AMs. This API didn't exist when we did the initial YARN integration for Samza. We should upgrade Samza to use these new APIs.

      The API is loosely based on Samza's own AM code, so we can probably strip out a lot of it (YarnAppMaster, mainly) and switch everything over to the callback-based API.

      For details, see:

      https://issues.apache.org/jira/browse/YARN-417

      This new API is used in DistributedShell now, so we can use that for testing.

      1. SAMZA-58.1.patch
        43 kB
        Zhijie Shen
      2. SAMZA-58.2.patch
        46 kB
        Zhijie Shen

        Activity

        zjshen Zhijie Shen added a comment -

        AMRMClientAsync can be used to talk to the RM; YarnAppMaster would then no longer need a while loop to invoke allocate, and the follow-up logic can be moved into a CallbackHandler implementation. Similarly, we can use NMClientAsync to replace NMClient. I'd like to work on this issue.
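        The shift described here can be sketched with a stdlib-only model: instead of the AM polling allocate() in a loop, a background thread drains events and invokes a registered handler. All names below are hypothetical stand-ins for the real AMRMClientAsync.CallbackHandler (which also has callbacks like onContainersAllocated and onShutdownRequest), not the actual YARN API.

        ```java
        import java.util.List;
        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.LinkedBlockingQueue;

        class CallbackModel {
          // Hypothetical stand-in for AMRMClientAsync.CallbackHandler.
          interface Handler {
            void onContainersCompleted(List<String> containerIds);
          }

          // Stand-in for the async client: a background thread drains events and
          // invokes the handler, so the caller no longer polls allocate() in a loop.
          static class AsyncClient {
            private final BlockingQueue<List<String>> events = new LinkedBlockingQueue<>();
            private final Handler handler;
            private Thread dispatcher;

            AsyncClient(Handler handler) { this.handler = handler; }

            void start() {
              dispatcher = new Thread(() -> {
                try {
                  while (true) handler.onContainersCompleted(events.take());
                } catch (InterruptedException e) { /* stop() was called */ }
              });
              dispatcher.setDaemon(true); // mirrors the daemon callback thread in the YARN client
              dispatcher.start();
            }

            // Simulates a heartbeat response arriving from the RM.
            void deliver(List<String> completedIds) { events.add(completedIds); }

            void stop() { dispatcher.interrupt(); }
          }
        }
        ```
        
        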

        Hide
        criccomini Chris Riccomini added a comment -

        Sounds good Zhijie Shen. The goal with YARN-417 was to get something similar to what we have here, so we can convert. Happy to have you take this JIRA.

        We'll also need this in order to better support HA RM when YARN 2.4 is released, so good timing.

        Hide
        zjshen Zhijie Shen added a comment -

        I've created a patch that uses AMRMClientAsync to replace AMRMClient. With this change, YarnAppMaster is no longer required, but I've kept the loop because I want to block SamzaAppMaster termination until all the listeners are done.

        Two related issues that I've found, but haven't fixed in this patch:
        1. The heartbeat interval is hard-coded to 1s. I kept it that way, but ideally it should be configurable. Thoughts?
        2. Samza always reports progress = 0 to YARN. This doesn't harm application execution, but users will be confused when monitoring Samza jobs' progress.

        NMClient can be replaced by NMClientAsync as well. However, we need to think through the logic once startContainer becomes non-blocking. Therefore, I'd prefer to solve it separately. Thoughts?

        Hide
        criccomini Chris Riccomini added a comment - - edited

        1. The heartbeat interval is hard-coded to 1s. I kept it that way, but ideally it should be configurable. Thoughts?

        Totally agree. I think we can add a setting in YarnConfig to tweak this.
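        A sketch of what such a setting could look like, using the `yarn.am.poll.interval.ms` key suggested later in this thread. The helper class below is a plain stdlib stand-in for illustration, not Samza's actual YarnConfig.

        ```java
        import java.util.Map;

        class PollIntervalConfig {
          // The currently hard-coded 1s heartbeat interval becomes the default.
          static final int DEFAULT_POLL_INTERVAL_MS = 1000;

          // Reads the interval from config, falling back to the default when unset.
          static int pollIntervalMs(Map<String, String> config) {
            String raw = config.get("yarn.am.poll.interval.ms");
            return raw == null ? DEFAULT_POLL_INTERVAL_MS : Integer.parseInt(raw);
          }
        }
        ```
        
        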

        2. Samza always reports progress = 0 to YARN. This doesn't harm application execution, but users will be confused when monitoring Samza jobs' progress.

        Yeah, I wasn't sure what to do there. YARN doesn't really handle infinitely running jobs well (or at least it didn't at the time I wrote this). Do you have any idea what the recommended approach is now? I've heard some people just put random numbers in there every time. Others I've heard slide it up and down like an animation.

        NMClient can be replaced by NMClientAsync as well. However, we need to think through the logic once startContainer becomes non-blocking. Therefore, I'd prefer to solve it separately. Thoughts?

        Agree. A separate ticket for the NMClient is probably better.

        Hide
        zjshen Zhijie Shen added a comment -

        Yeah, I wasn't sure what to do there. YARN doesn't really handle infinitely running jobs well (or at least it didn't at the time I wrote this). Do you have any idea what the recommended approach is now?

        A MapReduce job computes progress as follows: 1. the job is divided into several phases, each with a weight; 2. within the Map/Reduce phase, the sub-progress = # completed tasks / # total tasks. I'm not sure whether it's a good idea to let progress = completedTasks / taskCount in SamzaAppMasterState.
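        Reduced to a single phase, the calculation described above is just a ratio; the field names mirror those mentioned for SamzaAppMasterState but the class itself is a hypothetical sketch. (With AMRMClientAsync, this value would be returned from the handler's getProgress() callback.)

        ```java
        class Progress {
          // Single-phase version of the MapReduce formula: completed / total.
          // For a long-running Samza job, containers never complete, so this
          // would sit at 0 indefinitely.
          static float progress(int completedTasks, int taskCount) {
            return taskCount == 0 ? 0.0f : (float) completedTasks / taskCount;
          }
        }
        ```
        
        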

        Hide
        criccomini Chris Riccomini added a comment -

        A MapReduce job computes progress as follows: 1. the job is divided into several phases, each with a weight; 2. within the Map/Reduce phase, the sub-progress = # completed tasks / # total tasks. I'm not sure whether it's a good idea to let progress = completedTasks / taskCount in SamzaAppMasterState.

        I think the trick is that Samza jobs tend to be infinitely running, so the containers never complete. We could report progress the way you define, though. It's just that most jobs would report a progress of 0 forever.

        Also, at first glance, this patch looks good. Have you tried running it through hello-samza? We don't really have integration tests right now, but it'd be good to try it on a YARN grid first. If you can't, I can give it a shot, but it might take a bit to get some time to do it.

        Hide
        zjshen Zhijie Shen added a comment -

        Also, at first glance, this patch looks good. Have you tried running it through hello-samza? We don't really have integration tests right now, but it'd be good to try it on a YARN grid first. If you can't, I can give it a shot, but it might take a bit to get some time to do it.

        Thanks for your review. I haven't tried the patch with hello-samza yet. I can take care of it.

        Hide
        criccomini Chris Riccomini added a comment -

        I ran this patch against hello-samza, and it works great. I killed the container, and the AM restarted. I killed the AM, and the job restarted as expected. I killed the container 10x, and the AM shut itself down as expected.

        Looks good.

        Hide
        criccomini Chris Riccomini added a comment -

        So, I think the things that need to be updated are:

        1. Make DEFAULT_POLL_INTERVAL_MS configurable. You can add a yarn.am.poll.interval.ms config to YarnConfig.scala.
        2. The while-loop in run() seems strange now, since we're not heart-beating it anymore. Should we re-think this section of code?

        For (2), I think it's safe for us to remove the onEventLoop callback in the YarnAppMasterListener interface. If we do this, it seems to me that we don't really need that while loop anymore. I'm not sure how the other new async AMs handle this, but we essentially just want to amClient.start, listen for some form of a shutdown event (some exception, or shouldShutdown==true), and then call amClient.stop. I wish the amClient itself had a heartbeat callback. Do most other async AMs just sit in a loop like this as well?
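        The start / await-shutdown / stop shape described here can be sketched with a CountDownLatch in place of the while loop. The amClient calls are shown only as comments because the real AMRMClientAsync offers no blocking await; everything else is a stdlib sketch, not Samza's actual AM code.

        ```java
        import java.util.concurrent.CountDownLatch;
        import java.util.concurrent.TimeUnit;

        class AwaitShutdown {
          private final CountDownLatch shutdown = new CountDownLatch(1);

          // Would be invoked from a callback on error, resync, or shouldShutdown == true.
          void requestShutdown() { shutdown.countDown(); }

          // Returns true if a shutdown was requested within the timeout.
          boolean run(long timeoutMs) throws InterruptedException {
            // amClient.start() would go here.
            boolean finished = shutdown.await(timeoutMs, TimeUnit.MILLISECONDS);
            // Listener cleanup and amClient.stop() would go here.
            return finished;
          }
        }
        ```
        
        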

        Hide
        zjshen Zhijie Shen added a comment -

        Chris, thanks for reviewing and testing the patch!

        W.r.t. (2), I understand it seems weird to have such a dummy loop. However, the tricky part is that AMRMClientAsyncImpl uses a non-daemon thread (since the thread that creates amClient is non-daemon) to heartbeat with the ResourceManager, and a daemon thread to invoke the callbacks. If the main thread were not kept in the loop, then when any error happens on the heartbeat thread, or the ResourceManager notifies the AM to resync or shut down, the heartbeat thread would exit, and the process would exit once all non-daemon threads had exited, before the callback that stops the listeners and stops amClient had a chance to execute.
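        The detail this argument leans on is that a Java thread inherits its daemon status from the thread that creates it, which a small stdlib demo can show (the class name is just for illustration):

        ```java
        class DaemonDemo {
          // Spawns a child thread and reports whether it ended up a daemon.
          // A thread created by a non-daemon thread (like main) is non-daemon
          // by default, which is why the heartbeat thread keeps the JVM alive.
          static boolean childIsDaemon() throws InterruptedException {
            final boolean[] result = new boolean[1];
            Thread child = new Thread(() -> result[0] = Thread.currentThread().isDaemon());
            child.start();
            child.join();
            return result[0];
          }
        }
        ```
        
        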

        I don't know many applications, but I see that the demo application, distributed shell, sits in a loop in the main thread until the finish condition is met.

        Hide
        zjshen Zhijie Shen added a comment - - edited

        I've created a new patch (https://reviews.apache.org/r/19963/), which addresses Chris's comments. It is worth mentioning:

        1. I've still kept the loop, as I think it's necessary to prevent the process from exiting early.

        2. I've kept progress at 0, as we haven't settled on a correct notion of Samza progress.

        BTW, when we think about whether the loop is required, it's better not to rely on the implementation details of AMRMClientAsyncImpl, given they may change in the future. Ideally, AMRMClientAsync would have an API that blocks the caller until unregistration. Unfortunately, we don't have that on hand right now. Thoughts?

        Hide
        criccomini Chris Riccomini added a comment - - edited

        2. I've kept progress at 0, as we haven't settled on a correct notion of Samza progress.

        I've opened a separate JIRA to track this issue: SAMZA-228

        BTW, when we think about whether the loop is required, it's better not to rely on the implementation details of AMRMClientAsyncImpl, given they may change in the future. Ideally, AMRMClientAsync would have an API that blocks the caller until unregistration. Unfortunately, we don't have that on hand right now. Thoughts?

        Yea, I agree. It almost feels like there should be something like amClient.await, but I think this should be a YARN JIRA, rather than a Samza one. Given that the other AMs (i.e. the dist shell example) are using an event loop, I think it's fine to stick with what we've got.

        Hide
        criccomini Chris Riccomini added a comment -

        +1 Merged and committed.

        Thanks!

        Hide
        zjshen Zhijie Shen added a comment -

        Yea, I agree. It almost feels like there should be something like amClient.await, but I think this should be a YARN JIRA, rather than a Samza one.

        I filed a YARN ticket: YARN-1954. Let's see what the YARN folks think about it.


          People

          • Assignee:
            zjshen Zhijie Shen
            Reporter:
            criccomini Chris Riccomini
          • Votes: 0
          • Watchers: 2
