FLINK-5830: OutOfMemoryError during notify final state in TaskExecutor may cause job stuck

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: None
    • Labels:
      None

      Description

The scenario is as follows:

• The JobMaster tries to cancel all executions while processing a failed execution, and the TaskExecutor has already acknowledged the cancel RPC message.
• When the TaskExecutor notifies the final state, an OutOfMemoryError occurs in the AkkaRpcActor; the error is caught and only logged, so the final state is never sent.
• The JobMaster therefore never receives the final state and cannot trigger the restart strategy.

One solution is to catch the OutOfMemoryError and rethrow it, which shuts down the ActorSystem and makes the TaskExecutor process exit. The JobMaster can then be notified of the TaskExecutor failure and fail all the tasks, triggering a restart successfully.
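The rethrow approach described above can be sketched in a few lines. This is a hypothetical simplification, not the actual Flink `AkkaRpcActor` code: the handler name and the log-and-continue branch are illustrative only.

```java
// Hypothetical sketch of the proposed handling: a message handler that
// rethrows fatal errors instead of swallowing them, so the enclosing
// actor system shuts down and the process exits.
public class FatalErrorRethrowExample {

    /** Runs the handler; ordinary exceptions are logged and swallowed
     *  (keeping the actor alive), but Errors such as OutOfMemoryError
     *  are rethrown so the ActorSystem terminates and the TaskExecutor
     *  process exits, letting the JobMaster detect the failure. */
    public static void handleAsyncMessage(Runnable handler) {
        try {
            handler.run();
        } catch (Throwable t) {
            if (t instanceof Error) {
                // Fatal (includes OutOfMemoryError): let it bubble up.
                throw (Error) t;
            }
            // Non-fatal: log and continue, as before.
            System.err.println("Ignoring non-fatal exception: " + t);
        }
    }
}
```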


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user zhijiangW opened a pull request:

          https://github.com/apache/flink/pull/3360

          FLINK-5830[Distributed Coordination] Handle OutOfMemory error during process async message in akka rpc actor

If an OutOfMemoryError is caught while processing async messages in *AkkaRpcActor*, the behavior becomes ambiguous and RPC messages may be lost. If the lost message is the one notifying the final state from the *TaskExecutor*, the *JobMaster* can no longer receive the final state while processing the failing job, and the job may get stuck.

The solution is to catch this particular error in *AkkaRpcActor* and rethrow it, which shuts down the *ActorSystem* and exits the *TaskExecutor* process. The *JobMaster* can then become aware of that and restart the job if necessary.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zhijiangW/flink FLINK-5830

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3360.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3360


          commit 1365c6da1c456d764a3171c858bce81511ed8da5
          Author: 淘江 <taojiang.wzj@alibaba-inc.com>
          Date: 2017-02-20T09:54:54Z

          FLINK-5830[Distributed Coordination]Handle OutOfMemory error during process async message in akka rpc actor


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3360

          I would suggest that we adopt the following pattern for all the places like the one in this pull request where we catch Throwables:

```java
try {
    ...
} catch (Throwable t) {
    ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
    // the other handling logic...
}
```

This requires adding the function `rethrowIfFatalErrorOrOOM(Throwable)` to `ExceptionUtils`, similar to the method here: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java#L109
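A minimal sketch of what such a helper could look like. The exact set of error classes treated as fatal is an assumption here; the real Flink `ExceptionUtils` implementation may classify errors differently.

```java
// Hypothetical sketch of the proposed ExceptionUtils helper.
public final class ExceptionUtilsSketch {

    private ExceptionUtilsSketch() {}

    /** Rethrows t when it is a fatal JVM error or an OutOfMemoryError;
     *  otherwise returns so the caller can run its normal handling logic.
     *  (Which Error subclasses count as "fatal" is an assumption.) */
    public static void rethrowIfFatalErrorOrOOM(Throwable t) {
        if (t instanceof InternalError
                || t instanceof UnknownError
                || t instanceof OutOfMemoryError) {
            throw (Error) t;
        }
    }
}
```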

It would be even nicer if we could do something like Scala supports, but I think there is no better way to do this in Java than the one suggested above:
```scala
try { ... }
catch {
  case NonFatal(t) => // does not include OOM and internal errors
}
```

          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on the issue:

          https://github.com/apache/flink/pull/3360

@StephanEwen, thank you for the quick review!

That is a good idea to add a uniform way in the utils, so we can use it anywhere.

I will fix it per your suggestions later today.

          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on the issue:

          https://github.com/apache/flink/pull/3360

@StephanEwen, I have already submitted the modifications.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3360

          I think adding this safety net makes sense and protects against a corrupted state.

However, isn't the root cause of the described problem that the JobMaster-TaskExecutor communication does not tolerate a lost message? Maybe we should introduce an acknowledge message which signals the correct reception of the status message. If the response times out, we can either retry sending it or fail.
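The acknowledge-and-retry idea could be sketched roughly as below. This is not Flink code: the method name, parameters, and the use of `CompletableFuture` as the ack signal are all assumptions for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: resend a status message until the receiver
// acknowledges it, or give up after a bounded number of attempts.
public class AckRetrySketch {

    /** Sends via `send`, whose future completes when the receiver
     *  acknowledges. Retries when no ack arrives within timeoutMillis;
     *  returns false (caller should fail) once attempts are exhausted. */
    public static boolean sendWithRetry(
            Callable<CompletableFuture<Void>> send,
            int maxAttempts,
            long timeoutMillis) throws Exception {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                send.call().get(timeoutMillis, TimeUnit.MILLISECONDS);
                return true;                 // acknowledged
            } catch (TimeoutException e) {
                // no ack in time: fall through and resend
            }
        }
        return false;                        // give up: fail the operation
    }
}
```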

          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on the issue:

          https://github.com/apache/flink/pull/3360

Hi @tillrohrmann, thank you for the review and the constructive suggestions!

Let me explain the root cause of this issue first:

On the JobMaster side, it sends the cancel RPC message and gets the acknowledgement from the TaskExecutor, and the execution state transitions to *CANCELING*.

On the TaskExecutor side, it notifies the JobMaster of the final state before the task exits. The *notifyFinalState* can be divided into two steps:

• The akka actor executes the *RunAsync* message; this is a tell action, and it triggers *unregisterTaskAndNotifyFinalState*.
• *unregisterTaskAndNotifyFinalState* then triggers the RPC message *updateTaskExecutionState*; this is an ask action, so the mechanism can avoid lost messages.

The problem is that an OOM may occur before *updateTaskExecutionState* is triggered; this error is caught by *AkkaRpcActor*, which does nothing with it, interrupting the remaining steps. *updateTaskExecutionState* will never be executed.

For this key interaction between the TaskExecutor and the JobMaster, a lost message should not be tolerated, and I agree with your suggestions above. So there are two ideas for this improvement:

• Enhance the robustness of *notifyFinalState*. The current rethrow of the OOM is an easy option, but it causes the TaskExecutor to exit; there should be less costly alternatives.
• After getting the cancel acknowledgement, the JobMaster triggers a timeout to check the execution's final state. If the execution has not entered a final state within the timeout, the JobMaster can resend the message to the TaskExecutor to confirm the status.

I prefer the first way, which keeps the fix on one side and avoids complex interaction between the TaskExecutor and the JobMaster.

Looking forward to your further suggestions or ideas.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3360

Looking at this from another angle: if any scheduled Runnable ever lets an exception bubble out, can we still assume that the JobManager is in a sane state? Or should we actually make every uncaught exception in the RPC executors a fatal error and send a `notifyFatalError` to the `RpcEndpoint`?

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3360

Thanks for the clarification @zhijiangW. I now understand the problem: via `RpcEndpoint.runAsync` we effectively introduce another message which might get "lost" (e.g. due to an OOM error).

I agree with Stephan that it's hard to reason about the consistency of the `AkkaRpcActor`'s internal state once we see an exception. The conservative approach would probably be to let it terminate or to call `notifyFatalError` to handle it.

Related to this is also how we handle exceptions in `AkkaRpcActor.handleRpcInvocation`. There we catch all exceptions and simply send them to the caller. I think in this method we should only send the non-fatal exceptions back and terminate otherwise.

To follow a similar pattern for `handleRunAsync`, we could think about returning a `Future` from `RpcEndpoint.runAsync` which is completed with a non-fatal exception or once the `Runnable` has been executed. And in case we see a fatal exception, we terminate or call `notifyFatalError`. What do you think?
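The `runAsync`-returning-a-`Future` idea could look roughly like this. This is a standalone sketch, not Flink's RPC framework: the class, the single-thread executor standing in for the actor's main thread, and the `fatalError` field standing in for `notifyFatalError` are all assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: a tell-style runAsync that still returns a Future,
// completed when the Runnable has run, or completed exceptionally with a
// non-fatal exception; fatal errors additionally trigger a fatal-error
// notification (simulated here by a field).
public class RunAsyncWithFutureSketch {

    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private volatile Throwable fatalError;   // stand-in for notifyFatalError

    public CompletableFuture<Void> runAsync(Runnable runnable) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        mainThread.execute(() -> {
            try {
                runnable.run();
                done.complete(null);                  // executed successfully
            } catch (Throwable t) {
                if (t instanceof Error) {
                    fatalError = t;                   // would call notifyFatalError(t)
                }
                done.completeExceptionally(t);        // report to the caller
            }
        });
        return done;
    }

    public Throwable getFatalError() { return fatalError; }

    public void shutdown() { mainThread.shutdown(); }
}
```

The point of the design is that the caller can now distinguish "message executed", "message failed non-fatally", and "endpoint must die" instead of a fire-and-forget tell.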

          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on the issue:

          https://github.com/apache/flink/pull/3360

@StephanEwen, if the exception bubbles out and causes the TaskExecutor to exit as a result, I think the JobMaster can be assumed to end up in a sane state, based on its detection of the TaskExecutor failure.

The current solution only covers the OOM error; maybe it can be extended to any exception, because it is difficult to confirm the consistency of the internal state, and the conservative approach is to let it terminate, as @tillrohrmann said.

If I understand @tillrohrmann's suggestion correctly, the *RpcEndpoint.runAsync* method would be modified to return a *Future*, similar to *RpcEndpoint.callAsync* but still a *Tell* action in akka. This *Future* would be set as a field in *RunAsync* so that *AkkaRpcActor* can get it during handling. The *Future* can help determine whether the message was executed successfully or lost, enhancing the *Tell* mechanism. If a *Future* with a *Tell* action is better than the current *RpcEndpoint.callAsync*, which uses an *Ask* action, I will try to do that. Another option is to keep tolerating lost messages in the current *RpcEndpoint.runAsync*, reserving it for scenarios where efficiency matters more than safety; important interactions should resort to *RpcEndpoint.callAsync*. What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3360

          Merging this...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3360

          StephanEwen Stephan Ewen added a comment -

          Fixed in 527eabdd4fb3e34b0698b53ec9a7fb1348882791


            People

            • Assignee:
              zjwang zhijiang
              Reporter:
              zjwang zhijiang
• Votes:
  0
  Watchers:
  3
