Apache Twill / TWILL-190

Restart of a TwillRunnable does not wait for the runnable to stop

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.6.0-incubating, 0.7.0-incubating
    • Fix Version/s: 0.8.0
    • Component/s: core, yarn
    • Labels:
      None

      Description

      Today, when a TwillRunnable is restarted, the call sends a stop message to the TwillRunnable and then starts a new TwillRunnable without waiting for the old one to finish stopping.

      This can leave a non-responding TwillRunnable container running, and can lead to issues such as two TwillRunnables with the same instance ID running at the same time.

      We should kill the containers that don't respond to the stop message.
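      The remedy proposed above (send the stop message, wait a bounded time for the runnable to stop, and only then kill it and start the replacement) can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Twill's implementation: `StopThenKill`, `Container`, `sendStop()`, `kill()` and `markStopped()` are hypothetical names, and a `CountDownLatch` stands in for whatever completion signal the application master actually receives.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Sketch: stop a container, wait a bounded time, and kill it if it does not stop. */
final class StopThenKill {

  /** Hypothetical container handle; not a Twill API. */
  static final class Container {
    private final CountDownLatch stopped = new CountDownLatch(1);
    private final boolean respondsToStop;
    volatile boolean killed;

    Container(boolean respondsToStop) {
      this.respondsToStop = respondsToStop;
    }

    /** Sends a stop request; a responsive container acknowledges it immediately. */
    void sendStop() {
      if (respondsToStop) {
        markStopped();
      }
    }

    /** Forcefully terminates the container. */
    void kill() {
      killed = true;
      markStopped();
    }

    /** Called by whoever observes that the container has exited. */
    void markStopped() {
      stopped.countDown();
    }

    boolean awaitStopped(long timeout, TimeUnit unit) throws InterruptedException {
      return stopped.await(timeout, unit);
    }
  }

  /**
   * Returns only after the container has stopped (or been killed), so a replacement
   * with the same instance id is never started while the old one may still run.
   */
  static void stopAndWait(Container c, long timeout, TimeUnit unit) throws InterruptedException {
    c.sendStop();
    if (!c.awaitStopped(timeout, unit)) {
      // The container ignored the stop request: kill it instead of leaking it.
      c.kill();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Container polite = new Container(true);
    stopAndWait(polite, 100, TimeUnit.MILLISECONDS);
    System.out.println("polite killed? " + polite.killed);  // polite killed? false

    Container stuck = new Container(false);
    stopAndWait(stuck, 100, TimeUnit.MILLISECONDS);
    System.out.println("stuck killed? " + stuck.killed);    // stuck killed? true
  }
}
```

      Making the stop call return only once the old container is gone is what prevents two instances with the same instance ID from overlapping; the timeout bounds how long a restart can be held up by an unresponsive container.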

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/2#discussion_r75177188

          --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java ---
          @@ -222,14 +231,17 @@ private void removeInstanceById(String runnableName, int instanceId) {

          Preconditions.checkState(containerId != null,
          "No container found for {} with instanceId = {}", runnableName, instanceId);
          + return controller;
          + }

          + // This method only stops a runnable using the controller.
          + // The cleanup of the state happens when handleCompleted() method runs for the runnable after the stop
          + // This method will block until handleCompleted() method runs or a timeout occurs
          + // Hence this method should not be called with a containerLock taken
          + private void stopInstanceAndWait(String runnableName, TwillContainerController controller) {
          LOG.info("Stopping service: {} {}", runnableName, controller.getRunId());
          + // This call will block until handleCompleted() method runs or a timeout occurs
          controller.stopAndWait();

          - containers.remove(runnableName, containerId);
          --- End diff ---

          Where do we do the removal of containers now?

          githubbot ASF GitHub Bot added a comment -

          Github user poornachandra commented on a diff in the pull request:

          https://github.com/apache/twill/pull/2#discussion_r75228168

          --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java ---
          @@ -222,14 +231,17 @@ private void removeInstanceById(String runnableName, int instanceId) {

          Preconditions.checkState(containerId != null,
          "No container found for {} with instanceId = {}", runnableName, instanceId);
          + return controller;
          + }

          + // This method only stops a runnable using the controller.
          + // The cleanup of the state happens when handleCompleted() method runs for the runnable after the stop
          + // This method will block until handleCompleted() method runs or a timeout occurs
          + // Hence this method should not be called with a containerLock taken
          + private void stopInstanceAndWait(String runnableName, TwillContainerController controller) {
          LOG.info("Stopping service: {} {}", runnableName, controller.getRunId());
          + // This call will block until handleCompleted() method runs or a timeout occurs
          controller.stopAndWait();

          - containers.remove(runnableName, containerId);
          --- End diff ---

          It is removed in the `handleCompleted()` method.

          githubbot ASF GitHub Bot added a comment -

          Github user poornachandra commented on a diff in the pull request:

          https://github.com/apache/twill/pull/2#discussion_r75257373

          --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java ---
          @@ -222,14 +231,17 @@ private void removeInstanceById(String runnableName, int instanceId) {

          Preconditions.checkState(containerId != null,
          "No container found for {} with instanceId = {}", runnableName, instanceId);
          + return controller;
          + }

          + // This method only stops a runnable using the controller.
          + // The cleanup of the state happens when handleCompleted() method runs for the runnable after the stop
          + // This method will block until handleCompleted() method runs or a timeout occurs
          + // Hence this method should not be called with a containerLock taken
          + private void stopInstanceAndWait(String runnableName, TwillContainerController controller) {
          LOG.info("Stopping service: {} {}", runnableName, controller.getRunId());
          + // This call will block until handleCompleted() method runs or a timeout occurs
          controller.stopAndWait();

          - containers.remove(runnableName, containerId);
          --- End diff ---

          Realized that we need to clean up state in case of timeout, so added it back.
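          Because cleanup can now happen in two places (in `handleCompleted()` on the normal path, and after `stopAndWait()` returns on a timeout), it helps if the removal is idempotent and cannot evict a newer container that reuses the same instance ID. A minimal sketch, assuming a `ConcurrentMap` from instance ID to container ID; `ContainerRegistry`, `onCompleted` and `removeAfterStop` are hypothetical names, not the PR's `RunningContainers` code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Sketch: container state removed either on completion or after a stop timeout. */
final class ContainerRegistry {
  // Hypothetical layout: instance id -> container id.
  private final ConcurrentMap<Integer, String> containers = new ConcurrentHashMap<>();

  void add(int instanceId, String containerId) {
    containers.put(instanceId, containerId);
  }

  /** Normal path: invoked when the RM reports the container as completed. */
  void onCompleted(int instanceId, String containerId) {
    // remove(key, value) only removes the mapping for this exact container, so a
    // replacement already registered under the same instance id is left alone.
    containers.remove(instanceId, containerId);
  }

  /** Fallback path: invoked after the stop-and-wait call returns, including on timeout. */
  void removeAfterStop(int instanceId, String containerId) {
    containers.remove(instanceId, containerId);  // no-op if onCompleted() already ran
  }

  boolean contains(int instanceId) {
    return containers.containsKey(instanceId);
  }

  public static void main(String[] args) {
    ContainerRegistry r = new ContainerRegistry();
    r.add(0, "container_1");
    r.onCompleted(0, "container_1");
    r.removeAfterStop(0, "container_1");  // second removal is harmless
    r.add(0, "container_2");              // restarted instance
    r.removeAfterStop(0, "container_1");  // stale cleanup does not evict it
    System.out.println(r.contains(0));    // true
  }
}
```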

          githubbot ASF GitHub Bot added a comment -

          Github user poornachandra commented on the issue:

          https://github.com/apache/twill/pull/2

          @chtyim All the tests pass now, please review.

          githubbot ASF GitHub Bot added a comment -

          Github user hsaputra commented on a diff in the pull request:

          https://github.com/apache/twill/pull/2#discussion_r75424881

          --- Diff: twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java ---
          @@ -153,21 +155,29 @@ public TwillContainerController start(RunId runId, int instanceId, Class<?> main
          .addCommand(firstCommand, command.toArray(new String[command.size()]))
          .launch();

          - TwillContainerControllerImpl controller = new TwillContainerControllerImpl(zkClient, runId, processController);
            + TwillContainerControllerImpl controller =
            + new TwillContainerControllerImpl(zkClient, runId, runtimeSpec.getName(), instanceId, processController);
            controller.start();
            return controller;
            }

          private static final class TwillContainerControllerImpl extends AbstractZKServiceController
          implements TwillContainerController {

          + private final String runnable;
          + private final int instanceId;
          private final ProcessController<Void> processController;
          + // This latch can be used to wait for container shutdown
          + private final CountDownLatch shutdownLatch;
          private volatile ContainerLiveNodeData liveData;

          - protected TwillContainerControllerImpl(ZKClient zkClient, RunId runId,
            + protected TwillContainerControllerImpl(ZKClient zkClient, RunId runId, String runnable, int instanceId,
            ProcessController<Void> processController) {
            super(runId, zkClient);
            + this.runnable = runnable;
            + this.instanceId = instanceId;
            this.processController = processController;
            + this.shutdownLatch = new CountDownLatch(1);
          --- End diff ---

          I am confused: so the `CountDownLatch` is owned as a private member and is being set in another method of the same instance?

          githubbot ASF GitHub Bot added a comment -

          Github user poornachandra commented on a diff in the pull request:

          https://github.com/apache/twill/pull/2#discussion_r75439711

          --- Diff: twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java ---
          @@ -153,21 +155,29 @@ public TwillContainerController start(RunId runId, int instanceId, Class<?> main
          .addCommand(firstCommand, command.toArray(new String[command.size()]))
          .launch();

          - TwillContainerControllerImpl controller = new TwillContainerControllerImpl(zkClient, runId, processController);
            + TwillContainerControllerImpl controller =
            + new TwillContainerControllerImpl(zkClient, runId, runtimeSpec.getName(), instanceId, processController);
            controller.start();
            return controller;
            }

          private static final class TwillContainerControllerImpl extends AbstractZKServiceController
          implements TwillContainerController {

          + private final String runnable;
          + private final int instanceId;
          private final ProcessController<Void> processController;
          + // This latch can be used to wait for container shutdown
          + private final CountDownLatch shutdownLatch;
          private volatile ContainerLiveNodeData liveData;

          - protected TwillContainerControllerImpl(ZKClient zkClient, RunId runId,
            + protected TwillContainerControllerImpl(ZKClient zkClient, RunId runId, String runnable, int instanceId,
            ProcessController<Void> processController) {
            super(runId, zkClient);
            + this.runnable = runnable;
            + this.instanceId = instanceId;
            this.processController = processController;
            + this.shutdownLatch = new CountDownLatch(1);
          --- End diff ---

          The thread that stops the container needs to wait until the container stops. The only reliable way to know whether the container has stopped is a notification from the RM during the heartbeat, which happens in a separate thread. Hence we wait on a `CountDownLatch` until the heartbeat thread calls the `completed()` method, where the latch is counted down.
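          The hand-off described here (the stopping thread blocks on a latch; the heartbeat thread counts it down when the RM reports completion) reduces to two threads sharing one `CountDownLatch`. A minimal sketch: `LatchHandoff` and `awaitShutdown` are hypothetical names, `completed()` is only a stand-in for the controller method named in the comment, and the `heartbeat` thread simply simulates the RM report after a short delay.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Sketch: one thread waits for a completion signal delivered by another thread. */
final class LatchHandoff {
  private final CountDownLatch shutdownLatch = new CountDownLatch(1);

  /** Called from the heartbeat thread when the RM reports the container completed. */
  void completed() {
    shutdownLatch.countDown();
  }

  /** Called from the stopping thread; returns true if completion arrived in time. */
  boolean awaitShutdown(long timeout, TimeUnit unit) throws InterruptedException {
    return shutdownLatch.await(timeout, unit);
  }

  public static void main(String[] args) throws InterruptedException {
    LatchHandoff controller = new LatchHandoff();

    // Simulated heartbeat thread: the RM "reports" completion after 50 ms.
    Thread heartbeat = new Thread(() -> {
      try {
        TimeUnit.MILLISECONDS.sleep(50);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      controller.completed();
    }, "heartbeat");
    heartbeat.start();

    // Stopping thread (here: main) blocks until completed() runs or 5 s pass.
    boolean stopped = controller.awaitShutdown(5, TimeUnit.SECONDS);
    System.out.println("stopped = " + stopped);  // stopped = true
    heartbeat.join();
  }
}
```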

          githubbot ASF GitHub Bot added a comment -

          Github user poornachandra commented on a diff in the pull request:

          https://github.com/apache/twill/pull/2#discussion_r75523615

          --- Diff: twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java ---
          @@ -177,7 +187,22 @@ protected void doStartUp() {

          @Override
          protected void doShutDown() {

          - // No-op
          + // Wait for sometime for the container to stop
          + // TODO: Use configurable value for stop time
          + int maxWaitSecs = Constants.APPLICATION_MAX_STOP_SECONDS;
          + try {
          + if (shutdownLatch.await(maxWaitSecs, TimeUnit.SECONDS)) {
          + return;
          + }
          + } catch (InterruptedException e) {
          + LOG.error("Got exception while waiting for runnable {}, instance {} to stop", runnable, instanceId);
          + // TODO: how do we handle the InterruptedException? Should we restore the interrupted status?
          + return;
          + }
          + // Container has not shutdown even after maxWaitSecs after sending stop message,
          + // we'll need to kill the container
          + LOG.warn("Killing runnable {}, instance {} after waiting {} secs", runnable, instanceId, maxWaitSecs);
          + kill();
          --- End diff ---

          In case of an exception during a kill, I'll add code to retry the kill a few times.
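          Retrying a failed kill a bounded number of times, as proposed in this comment, might look like the following. A minimal sketch: `KillRetry` and `Killer` are hypothetical names standing in for the controller's kill call, and the fixed back-off between attempts is an arbitrary choice, not something the PR specifies.

```java
import java.util.concurrent.TimeUnit;

/** Sketch: retry a kill operation a bounded number of times if it throws. */
final class KillRetry {

  /** Hypothetical kill action that may fail transiently. */
  @FunctionalInterface
  interface Killer {
    void kill() throws Exception;
  }

  /**
   * Attempts the kill up to maxAttempts times, sleeping backoffMillis between
   * failures. Returns true on the first success, false if every attempt failed.
   */
  static boolean killWithRetries(Killer killer, int maxAttempts, long backoffMillis)
      throws InterruptedException {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        killer.kill();
        return true;
      } catch (InterruptedException e) {
        throw e;  // never swallow an interrupt as if it were a kill failure
      } catch (Exception e) {
        System.err.printf("Kill attempt %d/%d failed: %s%n", attempt, maxAttempts, e);
        if (attempt < maxAttempts) {
          TimeUnit.MILLISECONDS.sleep(backoffMillis);
        }
      }
    }
    return false;
  }

  public static void main(String[] args) throws InterruptedException {
    int[] calls = {0};
    // Fails twice, then succeeds.
    boolean ok = killWithRetries(() -> {
      if (++calls[0] < 3) {
        throw new IllegalStateException("transient failure");
      }
    }, 5, 10);
    System.out.println(ok + " after " + calls[0] + " attempts");  // true after 3 attempts
  }
}
```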

          githubbot ASF GitHub Bot added a comment -

          Github user hsaputra commented on the issue:

          https://github.com/apache/twill/pull/2

          Hi @poornachandra, are you still working on updates to the PR, or is it ready for a follow-up review?

          githubbot ASF GitHub Bot added a comment -

          Github user poornachandra commented on the issue:

          https://github.com/apache/twill/pull/2

          @hsaputra The PR is ready for review. I would appreciate a review of the overall approach and structure. Right now I'm only adding code to better handle exceptions and adding more test cases. These commits will be small and will not change the basic structure of the code.

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/2#discussion_r75569082

          --- Diff: twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java ---
          @@ -177,7 +188,24 @@ protected void doStartUp() {

          @Override
          protected void doShutDown() {

          - // No-op
          + // Wait for sometime for the container to stop
          + // TODO: Use configurable value for stop time
          + int maxWaitSecs = Constants.APPLICATION_MAX_STOP_SECONDS;
          + try {
          + if (shutdownLatch.await(maxWaitSecs, TimeUnit.SECONDS)) {
          + return;
          + }
          + } catch (InterruptedException e) {
          + LOG.error("Got exception while waiting for runnable {}, instance {} to stop", runnable, instanceId);
          --- End diff ---

          I think it's better to ignore the interrupted exception, or at least clear the thread interrupt flag before calling `await`, because:

          1. you are not blocking forever anyway
          2. we do want to spend maximum effort letting the container finish its job before it gets killed.
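          One way to follow this suggestion without discarding the interrupt entirely is to keep waiting for the remaining time when interrupted, then restore the flag once the wait ends. A minimal JDK-only sketch; `UninterruptibleWait` is a hypothetical name. Guava's `Uninterruptibles.awaitUninterruptibly(CountDownLatch, long, TimeUnit)` follows the same pattern.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Sketch: wait up to a deadline even if interrupted, then restore the interrupt flag. */
final class UninterruptibleWait {

  static boolean awaitUninterruptibly(CountDownLatch latch, long timeout, TimeUnit unit) {
    boolean interrupted = false;
    try {
      long remainingNanos = unit.toNanos(timeout);
      long deadline = System.nanoTime() + remainingNanos;
      while (true) {
        try {
          // await() throws at once if the interrupt flag is set (and clears the flag).
          return latch.await(remainingNanos, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
          interrupted = true;  // remember it, but keep waiting for the remaining time
          remainingNanos = deadline - System.nanoTime();
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();  // callers can still observe the interrupt
      }
    }
  }

  public static void main(String[] args) {
    CountDownLatch latch = new CountDownLatch(1);
    Thread.currentThread().interrupt();  // simulate an interrupt arriving before the wait
    long start = System.nanoTime();
    boolean done = awaitUninterruptibly(latch, 200, TimeUnit.MILLISECONDS);
    long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    System.out.println("done=" + done + ", waited full timeout: " + (waitedMs >= 200)
        + ", interrupt restored: " + Thread.interrupted());
  }
}
```

          The design choice is to spend the full stop budget on a graceful shutdown, as the comment argues, while still letting the caller see that an interrupt happened.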

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/2#discussion_r75570480

          --- Diff: twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java ---
          @@ -225,5 +257,35 @@ public ContainerLiveNodeData getLiveNodeData() {
          public void kill() {
          processController.cancel();
          }

          +
          + private void killAndWait(int maxWaitSecs) {
          + try {
          + Stopwatch watch = new Stopwatch();
          + watch.start();
          + int tries = 0;
          + while (watch.elapsedTime(TimeUnit.SECONDS) < maxWaitSecs) {
          --- End diff ---

          Why do we need to kill it in a loop? Does that mean `processController.cancel()` doesn't guarantee that it will kill the container?

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/2#discussion_r75570747

          --- Diff: twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java ---
          @@ -225,5 +257,35 @@ public ContainerLiveNodeData getLiveNodeData() {
          public void kill() {
          processController.cancel();
          }

          +
          + private void killAndWait(int maxWaitSecs) {
          + try {
          + Stopwatch watch = new Stopwatch();
          + watch.start();
          + int tries = 0;
          + while (watch.elapsedTime(TimeUnit.SECONDS) < maxWaitSecs) {
          --- End diff ---

          If we have to loop, it's better to have it inside the `ProcessController.cancel()` method.
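          Moving the loop into the cancel call, as suggested here, means callers make a single call that either confirms the container is gone or gives up at a deadline. A minimal sketch, assuming a target that exposes an asynchronous kill request and a liveness check; `LoopingCanceller` and `Target` are hypothetical and are not Twill's `ProcessController` or a YARN API. It measures time with `System.nanoTime()` rather than the Guava `Stopwatch` used in the diff.

```java
import java.util.concurrent.TimeUnit;

/** Sketch: the retry loop lives inside cancel(), so callers make a single call. */
final class LoopingCanceller {

  /** Hypothetical view of a remote container; not a Twill or YARN API. */
  interface Target {
    void sendKill();     // asynchronous kill request that may be lost
    boolean isAlive();
  }

  private final Target target;
  private final long pollMillis;

  LoopingCanceller(Target target, long pollMillis) {
    this.target = target;
    this.pollMillis = pollMillis;
  }

  /**
   * Re-sends the kill request until the target reports it is no longer alive or
   * the timeout elapses. Returns true if the target is confirmed dead.
   */
  boolean cancel(long timeout, TimeUnit unit) throws InterruptedException {
    long deadline = System.nanoTime() + unit.toNanos(timeout);
    while (target.isAlive()) {
      if (System.nanoTime() - deadline >= 0) {
        return false;  // give up; the caller decides how to report it
      }
      target.sendKill();
      TimeUnit.MILLISECONDS.sleep(pollMillis);
    }
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    // A target that only dies on the third kill request.
    int[] kills = {0};
    Target flaky = new Target() {
      @Override public void sendKill() { kills[0]++; }
      @Override public boolean isAlive() { return kills[0] < 3; }
    };
    boolean dead = new LoopingCanceller(flaky, 10).cancel(1, TimeUnit.SECONDS);
    System.out.println("dead=" + dead + ", kill requests=" + kills[0]);  // dead=true, kill requests=3
  }
}
```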

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/2#discussion_r75571007

          --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java ---
          @@ -256,20 +261,36 @@ public void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> launcher
          @Override
          public void completed(List<YarnContainerStatus> completed) {
          for (YarnContainerStatus status : completed) {
          + handleCompleted(completed);
          ids.remove(status.getContainerId());
          }

          }
          };

          - runningContainers.stopAll();
          -
          - // Poll for 5 seconds to wait for containers to stop.
          - int count = 0;
          - while (!ids.isEmpty() && count++ < 5) {
          - amClient.allocate(0.0f, handler);
          - TimeUnit.SECONDS.sleep(1);
          - }

            + // Handle heartbeats during shutdown because runningContainers.stopAll() waits until
            + // handleCompleted() is called for every stopped runnable
            + ExecutorService stopPoller = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("stopPoller"));
            + stopPoller.execute(new Runnable() {
            + @Override
            + public void run() {
            + while (!ids.isEmpty()) {
            + try {
            + amClient.allocate(0.0f, handler);
            + TimeUnit.SECONDS.sleep(1);
            + } catch (InterruptedException e) {
            + // Exit on an interrupt
          --- End diff ---

          This comment is not accurate.
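          The shutdown arrangement in this hunk (a daemon thread keeps heartbeating so that completion notifications keep arriving while another thread blocks waiting for the stops) can be sketched with plain JDK executors. A minimal sketch: `StopPoller`, `heartbeat()` and `waitForStops()` are hypothetical names, `heartbeat()` stands in for `amClient.allocate(0.0f, handler)` and simply marks one pending container as completed per call, and the loop is written so that it really does exit when interrupted.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch: heartbeat on a daemon thread while the calling thread waits for all stops. */
final class StopPoller {
  // Container ids that have not yet been reported as completed.
  final Set<String> ids = ConcurrentHashMap.newKeySet();

  /** Hypothetical heartbeat: each call reports one pending container as completed. */
  void heartbeat() {
    Iterator<String> it = ids.iterator();
    if (it.hasNext()) {
      it.next();
      it.remove();
    }
  }

  /** Returns true if every container was reported completed within timeoutMillis. */
  boolean waitForStops(long pollMillis, long timeoutMillis) throws InterruptedException {
    ExecutorService poller = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "stopPoller");
      t.setDaemon(true);  // polling alone must not keep the JVM alive
      return t;
    });
    poller.execute(() -> {
      while (!ids.isEmpty()) {
        heartbeat();
        try {
          TimeUnit.MILLISECONDS.sleep(pollMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();  // preserve the interrupt status
          return;                              // and actually leave the loop
        }
      }
    });
    poller.shutdown();  // accept no new tasks; the poll loop keeps running
    try {
      // The PR blocks in runningContainers.stopAll() at this point instead.
      return poller.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS) && ids.isEmpty();
    } finally {
      poller.shutdownNow();  // interrupts the poll loop if it is still running
    }
  }

  public static void main(String[] args) throws InterruptedException {
    StopPoller p = new StopPoller();
    p.ids.addAll(Arrays.asList("container_1", "container_2", "container_3"));
    System.out.println("all stopped: " + p.waitForStops(10, 5_000));  // all stopped: true
  }
}
```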

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/2#discussion_r75571268

          --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java ---
          @@ -846,24 +875,31 @@ private boolean handleRestartRunnablesInstances(final Message message, final Run
          /**
          * Helper method to restart instances of runnables.
          */
          - private void restartRunnableInstances(String runnableName, @Nullable Set<Integer> instanceIds) {
          + private void restartRunnableInstances(String runnableName, @Nullable Set<Integer> instanceIds)
          + throws InterruptedException {
          --- End diff ---

          I don't think this method throws `InterruptedException`. This also means we don't have to change the `handleRestartRunnablesInstances` method.

          githubbot ASF GitHub Bot added a comment -

          Github user poornachandra commented on a diff in the pull request:

          https://github.com/apache/twill/pull/2#discussion_r75572116

          --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java ---
          @@ -846,24 +875,31 @@ private boolean handleRestartRunnablesInstances(final Message message, final Run
             /**
              * Helper method to restart instances of runnables.
              */
          -  private void restartRunnableInstances(String runnableName, @Nullable Set<Integer> instanceIds) {
          +  private void restartRunnableInstances(String runnableName, @Nullable Set<Integer> instanceIds)
          +    throws InterruptedException {
          — End diff –

          Good point, looks like I missed it during a refactor. Will remove it.

          githubbot ASF GitHub Bot added a comment -

          Github user poornachandra commented on a diff in the pull request:

          https://github.com/apache/twill/pull/2#discussion_r75572272

          — Diff: twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java —
          @@ -177,7 +188,24 @@ protected void doStartUp() {

             @Override
             protected void doShutDown() {
          -    // No-op
          +    // Wait for sometime for the container to stop
          +    // TODO: Use configurable value for stop time
          +    int maxWaitSecs = Constants.APPLICATION_MAX_STOP_SECONDS;
          +    try {
          +      if (shutdownLatch.await(maxWaitSecs, TimeUnit.SECONDS)) {
          +        return;
          +      }
          +    } catch (InterruptedException e) {
          +      LOG.error("Got exception while waiting for runnable {}, instance {} to stop", runnable, instanceId);
          — End diff –

          Done
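
          The bounded wait that replaces the old no-op in `doShutDown()` can be sketched on its own with a plain `java.util.concurrent.CountDownLatch`. This is a minimal sketch, not Twill's code: the class `ShutdownWaiter` and its methods are hypothetical, and it assumes the completion callback counts the latch down. The quoted diff is cut off right after the log statement, so what the real catch block does next is not shown; the sketch restores the thread's interrupt flag, the usual Java convention when an `InterruptedException` is caught and not rethrown.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper mirroring the bounded wait in doShutDown(); not Twill code.
final class ShutdownWaiter {
  private final CountDownLatch shutdownLatch = new CountDownLatch(1);

  // Invoked when the container reports completion; releases any waiter.
  void completed() {
    shutdownLatch.countDown();
  }

  // Returns true if the container stopped within the timeout. Returns false on
  // timeout or interruption; on interruption the interrupt flag is restored.
  boolean awaitStop(long timeout, TimeUnit unit) {
    try {
      return shutdownLatch.await(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

          Returning `false` on interruption lets the caller take the same path as a timeout (for example, killing the container), while the restored flag keeps the interruption visible to code further up the stack.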

          githubbot ASF GitHub Bot added a comment -

          Github user poornachandra commented on a diff in the pull request:

          https://github.com/apache/twill/pull/2#discussion_r75572275

          — Diff: twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java —
          @@ -225,5 +257,35 @@ public ContainerLiveNodeData getLiveNodeData() {
             public void kill() {
               processController.cancel();
             }
          +
          +  private void killAndWait(int maxWaitSecs) {
          +    try {
          +      Stopwatch watch = new Stopwatch();
          +      watch.start();
          +      int tries = 0;
          +      while (watch.elapsedTime(TimeUnit.SECONDS) < maxWaitSecs) {
          — End diff –

          Agreed, I'll make the change.

          githubbot ASF GitHub Bot added a comment -

          Github user poornachandra commented on the issue:

          https://github.com/apache/twill/pull/2

          @chtyim I have addressed all the other comments except moving the kill loop into the `ProcessController.cancel()` method.

          githubbot ASF GitHub Bot added a comment -

          GitHub user poornachandra opened a pull request:

          https://github.com/apache/twill/pull/4

          TWILL-190 Wait for Twill runnables to stop when restarting them

          Also kill non-responding Twill runnables after a timeout.

          JIRA - https://issues.apache.org/jira/browse/TWILL-190

          Opening against master this time.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/poornachandra/twill feature/kill-runnable

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/twill/pull/4.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4


          commit 740a144026b79903cf1460be447e88cfa8b7ea1d
          Author: poorna <poorna@cask.co>
          Date: 2016-08-17T08:12:36Z

          TWILL-190 Wait for Twill runnables to stop when restarting them, also kill non-responding Twill runnables after a timeout

          commit 1941ddf38836787bedfcfec834962398f624c204
          Author: poorna <poorna@cask.co>
          Date: 2016-08-25T20:08:36Z

          Reduce wait time when stopping or killing a runnable
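
          The restart ordering this pull request introduces (send the stop message, wait for the container to exit, kill it if it does not respond in time, and only then start the replacement) can be modeled in a few lines. This is a hedged sketch under stated assumptions: `Instance`, `Restarter` and `SimulatedInstance` are hypothetical types, not Twill's API, and the timeout is in milliseconds so the example runs quickly.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical abstraction of one runnable instance; not part of Twill's API.
interface Instance {
  void requestStop();                                   // send the stop message
  void kill();                                          // forcibly terminate the container
  boolean awaitTerminated(long timeout, TimeUnit unit); // true once the container has exited
}

final class Restarter {
  // Stop, wait, kill on timeout, and only then start the replacement, so two
  // instances with the same instance id are never running at the same time.
  static Instance restart(Instance old, Supplier<Instance> starter, long stopTimeoutMillis) {
    old.requestStop();
    if (!old.awaitTerminated(stopTimeoutMillis, TimeUnit.MILLISECONDS)) {
      old.kill(); // did not respond to the stop message in time
      if (!old.awaitTerminated(stopTimeoutMillis, TimeUnit.MILLISECONDS)) {
        throw new IllegalStateException("instance still running after kill");
      }
    }
    return starter.get();
  }
}

// In-memory instance for demonstration: it either honors the stop message or
// ignores it and only exits when killed. Every event is appended to log.
final class SimulatedInstance implements Instance {
  private final String name;
  private final boolean honorsStop;
  private final List<String> log;
  private final CountDownLatch terminated = new CountDownLatch(1);

  SimulatedInstance(String name, boolean honorsStop, List<String> log) {
    this.name = name;
    this.honorsStop = honorsStop;
    this.log = log;
    log.add("start " + name);
  }

  @Override public void requestStop() {
    log.add("stop " + name);
    if (honorsStop) {
      terminate();
    }
  }

  @Override public void kill() {
    log.add("kill " + name);
    terminate();
  }

  private void terminate() {
    log.add("terminated " + name);
    terminated.countDown();
  }

  @Override public boolean awaitTerminated(long timeout, TimeUnit unit) {
    try {
      return terminated.await(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

          One design choice differs from the quoted diffs: on timeout the quoted `killAndWait` only logs an error, whereas this sketch refuses to start the replacement, which rules out two instances with the same id at the cost of leaving that slot empty.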


          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/4#discussion_r76317898

          — Diff: twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java —
          @@ -220,5 +250,31 @@ public ContainerLiveNodeData getLiveNodeData() {
             public void kill() {
               processController.cancel();
             }
          +
          +  private void killAndWait(int maxWaitSecs) {
          +    Stopwatch watch = new Stopwatch();
          +    watch.start();
          +    int tries = 0;
          +    while (watch.elapsedTime(TimeUnit.SECONDS) < maxWaitSecs) {
          +      // Kill the application
          +      try {
          +        ++tries;
          +        kill();
          +      } catch (Exception e) {
          +        LOG.error("Exception while killing runnable {}, instance {}", runnable, instanceId, e);
          +      }
          +
          +      // Wait on the shutdownLatch,
          +      // if the runnable has stopped then the latch will be count down by completed() method
          +      if (Uninterruptibles.awaitUninterruptibly(shutdownLatch, 10, TimeUnit.SECONDS)) {
          +        // Runnable has stopped now
          +        return;
          +      }
          +    }
          +
          +    // Timeout reached, runnable has not stopped
          +    LOG.error("Failed to kill runnable {}, instance {} after {} tries", runnable, instanceId, tries);
          — End diff –

          Showing the number of tries is quite artificial, since the retry is based on time. I think it's better to just say that it failed to kill after n seconds.
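
          The suggestion can be illustrated with a time-bounded kill loop that reports elapsed seconds instead of an attempt count. This sketch swaps Guava's `Stopwatch` for `System.nanoTime()` and re-implements an uninterruptible latch wait with the same contract as Guava's `Uninterruptibles.awaitUninterruptibly` (keep waiting through interrupts, then restore the interrupt flag), so it needs only the JDK. The class `KillLoop`, its parameters, and the use of `java.util.logging` are assumptions for illustration; times are in milliseconds so the example runs quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

final class KillLoop {
  private static final Logger LOG = Logger.getLogger(KillLoop.class.getName());

  // Waits up to timeoutNanos for the latch, ignoring interrupts while waiting
  // and restoring the interrupt flag before returning.
  static boolean awaitUninterruptibly(CountDownLatch latch, long timeoutNanos) {
    boolean interrupted = false;
    long deadline = System.nanoTime() + timeoutNanos;
    try {
      while (true) {
        try {
          return latch.await(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
          interrupted = true; // remember it and retry with the remaining time
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  // Calls kill and waits on shutdownLatch, retrying every pollMillis until the
  // latch opens or maxWaitMillis has elapsed. Returns true if the runnable stopped.
  static boolean killAndWait(String runnable, int instanceId, Runnable kill,
                             CountDownLatch shutdownLatch, long maxWaitMillis, long pollMillis) {
    long start = System.nanoTime();
    long deadline = start + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
    while (System.nanoTime() < deadline) {
      try {
        kill.run();
      } catch (RuntimeException e) {
        LOG.warning("Exception while killing runnable " + runnable + ", instance " + instanceId + ": " + e);
      }
      // Cap each wait at the time remaining so the loop does not overshoot the deadline.
      long waitNanos = Math.min(TimeUnit.MILLISECONDS.toNanos(pollMillis), deadline - System.nanoTime());
      if (awaitUninterruptibly(shutdownLatch, waitNanos)) {
        return true; // the completion callback counted the latch down
      }
    }
    if (shutdownLatch.getCount() == 0) {
      return true; // stopped just as the deadline passed
    }
    long elapsedSecs = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start);
    LOG.severe(String.format("Failed to kill runnable %s, instance %d after %d seconds",
        runnable, instanceId, elapsedSecs));
    return false;
  }
}
```

          Capping each wait at the remaining time keeps the loop from overshooting `maxWaitMillis` by up to one poll interval, which the fixed 10-second wait in the diff can do when the deadline is close.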

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on a diff in the pull request:

          https://github.com/apache/twill/pull/4#discussion_r76318343

          — Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java —
          @@ -268,20 +270,33 @@ public void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> launcher
                 @Override
                 public void completed(List<YarnContainerStatus> completed) {
                   for (YarnContainerStatus status : completed) {
          +          handleCompleted(completed);
                     ids.remove(status.getContainerId());
                   }
                 }
               };

          -    runningContainers.stopAll();
          -
          -    // Poll for 5 seconds to wait for containers to stop.
          -    int count = 0;
          -    while (!ids.isEmpty() && count++ < 5) {
          -      amClient.allocate(0.0f, handler);
          -      TimeUnit.SECONDS.sleep(1);
          -    }
          +    // Handle heartbeats during shutdown because runningContainers.stopAll() waits until
          +    // handleCompleted() is called for every stopped runnable
          +    ExecutorService stopPoller = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("stopPoller"));
          +    stopPoller.execute(new Runnable() {
          +      @Override
          +      public void run() {
          +        while (!ids.isEmpty()) {
          +          try {
          +            amClient.allocate(0.0f, handler);
          +            TimeUnit.SECONDS.sleep(1);
          — End diff –

          We should check whether `ids` is already empty before sleeping: the call to `allocate` may already have emptied `ids` through the handler, and in that case there is no need to sleep for an extra second.
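
          The requested fix amounts to re-checking the set right after each heartbeat. A minimal sketch, assuming a hypothetical `Heartbeat` callback in place of `amClient.allocate(0.0f, handler)` and a thread-safe `Set` of container ids that the completion handler removes from; `StopPoller.pollUntilStopped` is not Twill's method. It returns the number of sleeps so the effect of the early exit is easy to observe.

```java
import java.util.Set;
import java.util.concurrent.TimeUnit;

final class StopPoller {
  // Hypothetical stand-in for one amClient.allocate(0.0f, handler) heartbeat;
  // the handler it drives may remove completed container ids from the set.
  interface Heartbeat {
    void allocate() throws Exception;
  }

  // Heartbeats until ids is empty, checking again right after each heartbeat
  // so no sleep happens once the last container has completed.
  // Returns how many times the loop slept.
  static int pollUntilStopped(Set<String> ids, Heartbeat heartbeat, long sleepMillis) {
    int sleeps = 0;
    while (!ids.isEmpty()) {
      try {
        heartbeat.allocate();
      } catch (Exception e) {
        // Assumption: log and keep polling (the quoted diff ends before its catch block).
        System.err.println("Heartbeat failed: " + e);
      }
      if (ids.isEmpty()) {
        break; // every container reported completion; skip the extra sleep
      }
      try {
        TimeUnit.MILLISECONDS.sleep(sleepMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt and stop polling
        break;
      }
      sleeps++;
    }
    return sleeps;
  }
}
```

          With two containers that complete on successive heartbeats, this loop sleeps once rather than twice; with the one-second sleep in the diff, that saves a second of shutdown time whenever the last completion arrives on a heartbeat.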

          githubbot ASF GitHub Bot added a comment -

          Github user chtyim commented on the issue:

          https://github.com/apache/twill/pull/4

          LGTM

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/twill/pull/4

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/twill/pull/2


            People

            • Assignee:
              poornachandra Poorna Chandra
              Reporter:
              poornachandra Poorna Chandra
            • Votes:
              0
              Watchers:
              2
