FLINK-6295: Update suspended ExecutionGraph to lower latency

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Webfrontend
    • Labels: None

      Description

      ExecutionGraphHolder, which is used by many handlers, currently caches ExecutionGraph(s) in a WeakHashMap, whose entries are evicted only by garbage collection.

      When the JVM rarely runs GC, the eviction latency becomes very high, so the status of jobs and their tasks shown in the web frontend can fall out of sync with the real ones.

      LoadingCache is a commonly used cache implementation from the Guava library; its time-based eviction can lower the latency of status updates.
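      The time-based eviction proposed here can be sketched without Guava. Below is a minimal stdlib-only analogue; `TimedCache` and its members are illustrative names, not the Guava or Flink API, and time is passed in explicitly so the behavior is deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Minimal sketch of time-based eviction in the spirit of Guava's
// LoadingCache with expireAfterWrite (illustrative, not the real API).
class TimedCache<K, V> {
    private static final class Entry<V> {
        final V value;
        final long writtenAtMillis;
        Entry(V value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final Function<K, V> loader;

    TimedCache(long ttlMillis, Function<K, V> loader) {
        this.ttlMillis = ttlMillis;
        this.loader = loader;
    }

    /** Reload the value if it is absent or older than the TTL. */
    V get(K key, long nowMillis) {
        Entry<V> e = entries.get(key);
        if (e == null || nowMillis - e.writtenAtMillis >= ttlMillis) {
            V fresh = loader.apply(key);
            entries.put(key, new Entry<>(fresh, nowMillis));
            return fresh;
        }
        return e.value;
    }
}
```

      With a 30-second TTL, a stale entry would be served for at most 30 seconds instead of until the next GC.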


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user WangTaoTheTonic opened a pull request:

          https://github.com/apache/flink/pull/3709

          FLINK-6295: use LoadingCache instead of WeakHashMap to lower latency

          ExecutionGraphHolder, which is used by many handlers, currently caches ExecutionGraph(s) in a WeakHashMap, whose entries are evicted only by garbage collection.

          When the JVM rarely runs GC the latency is too high, leaving the status of jobs and their tasks mismatched with the real ones. (We once observed the web UI still showing tasks as cancelled/failed for *30+ minutes* after the tasks had actually returned to normal, until a GC happened.)

          LoadingCache is a commonly used cache implementation from the Guava library; its time-based eviction can lower the latency of status updates.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/WangTaoTheTonic/flink FLINK-6295

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3709.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3709


          commit d76ced06242623d150f9ad09205e2b92f910c1a1
          Author: WangTaoTheTonic <wangtao111@huawei.com>
          Date: 2017-04-11T11:48:52Z

          use LoadingCache instead of WeakHashMap to lower latency


          githubbot ASF GitHub Bot added a comment -

          Github user wenlong88 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3709#discussion_r111308326

          — Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java —

          ```
          @@ -48,7 +52,41 @@

           	private final FiniteDuration timeout;

          -	private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>();
          +	private AtomicReference<ActorGateway> jobManagerRef = new AtomicReference<>(null);
          +
          +	private final LoadingCache<JobID, AccessExecutionGraph> cache =
          +		CacheBuilder.newBuilder()
          +			.maximumSize(1000)
          +			.expireAfterWrite(30, TimeUnit.SECONDS)
          +			.build(new CacheLoader<JobID, AccessExecutionGraph>() {
          +				@Override
          +				public AccessExecutionGraph load(JobID jobID) throws Exception {
          +					try {
          +						if (jobManagerRef.get() != null) {
          +							Future<Object> future = jobManagerRef.get().ask(new JobManagerMessages.RequestJob(jobID), timeout);
          +							Object result = Await.result(future, timeout);
          +
          +							if (result instanceof JobManagerMessages.JobNotFound) {
          +								return null;
          ```
          — End diff —

          CacheLoader does not support returning null.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3709

          It shouldn't matter for the display in the web-frontend in which data structure the cached ExecutionGraphs are being held. We are caching the actual ExecutionGraph that the JobManager works with and not some copy, thus there is simply no way for the handler to work with outdated data. This implies that the entire premise of this issue is flawed.

          It's more likely some web-related issue, like the page not refreshing automatically or being cached by the browser for an inexplicably long time.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3709

          @wenlong88 LoadingCache can also cache and evict data like WeakHashMap does; as this implementation shows, it evicts entries after 30 seconds and fetches data when it doesn't contain the required key.

          @zentol You're right, the data structure used doesn't matter, while what is shown in the web frontend and how it is updated does. I don't think users can accept task status updates that are triggered only by JobManager GC (which could take a very long time).

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3709

          @WangTaoTheTonic You still haven't explained why the JobManager GC has anything to do with the update in the web-ui.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3709

          @zentol The execution graphs cached in `ExecutionGraphHolder`(which is backed by a WeakHashMap) will be evicted only when gc happens.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3709

          @WangTaoTheTonic I get that, but when the EG is garbage-collected it also means the job was removed from the JM. Not just from the set of running jobs, but also from the history of finished jobs. While that does mean it can still be displayed in the web-ui until the GC happens, it doesn't explain that the display of task states is outdated. Especially since you say the tasks were actually running later on, which contradicts the idea that the EG was GC'd in the first place.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3709

          @zentol No, you're wrong.

          If you take a look at `ExecutionGraphHolder`, you'll find that the graphs in it are built from messages answered by the JobManager, which means there's no reference to them from the JobManager, only from handlers in the Netty web backend. Once those handlers hold no reference, the graphs can be garbage collected regardless of whether the actual job is running or recovering.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3709

          The web backend and JM work with the same ActorSystem; as a result all transmitted objects are neither serialized nor copied but simply passed around through a local akka channel, which means that they are, in fact, the exact same object.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3709

          If this were not the case you'd get NotSerializableExceptions when attempting to transfer the EG.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3709

          I'm not an Akka expert. As we observed, the status of cancelled tasks is updated to running only when GC happens in the JM.
          Way to reproduce:
          1. launch a Flink job in HA mode
          2. restart ZooKeeper (to make tasks fail)
          3. after the tasks recover, check whether their status is running or cancelled (if a GC happens, the task status shown in the web frontend matches the actual states; otherwise the status is delayed, which can be inconsistent with the backend)

          We observed this phenomenon in YARN mode, and it is fixed by this patch.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3709

          That this happens when HA is enabled is a really important detail; I finally figured out what happens.

          Here's roughly what's going on:

          • EG A is cached by the EGHolder for ID_A
          • ZK goes down
          • the JM revokes leadership and throws out all jobs, specifically EG A stored under ID_A
          • ZK starts up again
          • a JM (in your case the same one) gets leadership back
          • the JM recovers the jobs, which means creating a new EG B, whose ID is ID_A again

          When a new request hits the EGHolder cache for ID_A there's still the old EG cached. That old EG will remain there until the GC kicks in, from which point on the new EG is used.
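          The staleness scenario above can be simulated with plain maps standing in for the holder's cache and the JobManager's job store (all names are hypothetical; this is not Flink code):

```java
import java.util.HashMap;
import java.util.Map;

// The holder-style cache keeps serving the old graph cached under ID_A
// even after the "JobManager" has recovered the job as a new graph
// under the same JobID.
class StaleCacheDemo {
    static final Map<String, String> cache = new HashMap<>();      // stands in for the EGHolder cache
    static final Map<String, String> jobManager = new HashMap<>(); // graphs the "JM" currently holds

    /** Like the WeakHashMap-backed holder: serve the cached graph if present. */
    static String lookup(String jobId) {
        return cache.computeIfAbsent(jobId, jobManager::get);
    }

    static String scenario() {
        jobManager.put("ID_A", "EG_A");
        lookup("ID_A");                 // EG_A is now cached under ID_A
        jobManager.put("ID_A", "EG_B"); // recovery: new graph, same JobID
        return lookup("ID_A");          // the stale EG_A is still served
    }
}
```

          Only once the cache entry is dropped (by GC in the real holder) does a lookup reach the new graph.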

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3709

          Give me a bit of time to think of other ways to solve this; we are trying to reduce the usage of Guava, so let's see if there isn't another way. 30 seconds still seems like a long time to serve an old EG.

          WangTao Tao Wang added a comment -

          Thanks for the digging. I guess that's the reason.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3709

          Assuming that some JM will be granted leadership and recovers the suspended jobs, the easiest solution would be to simply check in the EGHolder whether the cached job is in the SUSPENDED state, and remove it from the cache if it is.

          WangTao Tao Wang added a comment -

          I doubt it. The EGH is simply a cache; I don't think it should be assigned too heavy a task. Another concern is that JM failover is not the only thing affecting task status.


          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3709

          In my opinion the EGHolder is simply a cache which should not be assigned too complicated a task.

          If we add the check logic, how far should it go? Will other events affect task status? I believe there are more concerns if we add it.

          This fix only changes an internal data structure and is decoupled from both the JobManager and the web frontend.

          I am not sure why we are reducing the usage of Guava, but it doesn't sound like a very good idea.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3709

          @WangTaoTheTonic Because everyone uses Guava, which results in dependency conflicts again and again.

          What do you mean by "how long it should be"? We remove the job from the cache and that's it. If more requests for that job come in, nothing will be returned, resulting in the response you get when querying a non-existing job, which is an accurate representation of the state of the JobManager. If the same JM recovers the job, it is no longer in a SUSPENDED state and will be added to the cache again. If another JM picks the job up, the web UI will be redirected to that JM and everything's fine.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3709

          I mean, who's in charge of updating the EGHolder? The EGHolder itself or the JobManager? The EGHolder doesn't sense job status changes unless it queries the JobManager periodically.

          If the JobManager took the responsibility, it would be a listener design pattern, I guess? Wouldn't that be too complicated, since the EGHolder is now just a lightweight cache?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3709

          Eh... in charge?

          Whenever anything related to a job is requested from the web UI, the EGHolder is accessed.

          Suppose you have the job info page (/jobs/:jobid) open in a browser. The WebUI periodically sends requests to the backend, which asks the EGHolder, which in turn asks the JM if it doesn't find the job in the cache. Now, if we remove the suspended EG, we will in fact keep polling the JM until the job is recovered.

          This is actually the same behavior you would get if the job were suspended and the GC/Guava cache kicked in right away, or if the job were resumed on another JM but you weren't refreshing the web UI (which should redirect to the current leader).

          So for adding entries nothing changes; for removing entries the GC is still mostly in charge; we're just adding a small 2-line branch that invalidates suspended ExecutionGraphs whenever a handler accesses the EGHolder.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3709

          Ok, I think I've got your point.

          Now, using a WeakHashMap, we add entries when the map doesn't contain the requested EG's id, and invalid entries are removed when GC happens.

          By adding the `small 2-line branch` you suggest, we add entries the same way as before, but check whether an entry is valid when it's accessed by a handler, and update/remove it if it's invalid. Is that right?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3709

          @WangTaoTheTonic Yes, that is correct. @zentol's suggestion should work.

          On access, if the `JobStatus` is suspended, remove the entry from the `WeakHashMap`.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3709

          That means every time the EGHolder receives a request, it will check whether the job in the request is suspended or not, right? That would make the cache in the EGHolder meaningless.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3709

          @WangTaoTheTonic The purpose of the cache is to reduce queries to the JobManager; and since the state of the job is available through the ExecutionGraph the cache still fulfills its purpose.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3709

          @zentol How do we know whether a requested job is suspended or not, since the status of jobs in the backend is always changing?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3709

          @WangTaoTheTonic It doesn't matter that the job status is ever changing; we only care about the state at the time of the request.

          There are 2 cases to consider when accessing the cache for a given ID:

          *a) An EG was cached for the given ID*

          In this case we can check the state of the job via `AccessExecutionGraph#getState`. Modify this block in `ExecutionGraphHolder`

          ```
          if (cached != null) {
              return cached;
          }
          ```
          to this
          ```
          if (cached != null) {
              if (cached.getState() == JobStatus.SUSPENDED) {
                  cache.remove(jid);
              }
              return cached;
          }
          ```
          and you're done.

          *b) No EG was cached for the given ID*

          In this case the status doesn't matter, you ask the JM and if it returns an EG you add it to the cache. We don't care whether this EG is suspended because it will be removed with the next request that comes in.
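
          The two cases above can be condensed into a minimal runnable sketch (the `Graph` class, `lookup` method, and `fetchFromJobManager` helper are illustrative stand-ins for `AccessExecutionGraph`, `ExecutionGraphHolder#getExecutionGraph`, and the JobManager query — not the actual Flink code):

          ```java
          import java.util.HashMap;
          import java.util.Map;

          public class Main {
              enum JobStatus { RUNNING, SUSPENDED }

              // Minimal stand-in for AccessExecutionGraph: only the state matters here.
              static class Graph {
                  final JobStatus state;
                  Graph(JobStatus state) { this.state = state; }
                  JobStatus getState() { return state; }
              }

              static final Map<String, Graph> cache = new HashMap<>();

              // Case a): a cached suspended graph is evicted but still returned for
              // this request; the next request re-fetches from the JobManager.
              static Graph lookup(String jid) {
                  Graph cached = cache.get(jid);
                  if (cached != null) {
                      if (cached.getState() == JobStatus.SUSPENDED) {
                          cache.remove(jid);
                      }
                      return cached;
                  }
                  // Case b): ask the JobManager (simulated here) and cache the result.
                  Graph fresh = fetchFromJobManager(jid);
                  cache.put(jid, fresh);
                  return fresh;
              }

              static Graph fetchFromJobManager(String jid) {
                  return new Graph(JobStatus.RUNNING); // simulated JobManager answer
              }

              public static void main(String[] args) {
                  cache.put("job-1", new Graph(JobStatus.SUSPENDED));
                  System.out.println(lookup("job-1").getState());  // SUSPENDED (served once)
                  System.out.println(cache.containsKey("job-1"));  // false (evicted)
                  System.out.println(lookup("job-1").getState());  // RUNNING (re-fetched)
              }
          }
          ```

          Note the deliberate asymmetry: a suspended graph is still returned once, so the current request gets an answer, but it is evicted so the next request pulls a fresh graph from the JobManager.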

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3709

          I got it, but I still have one question: what about other state transitions, e.g. when a job is cancelling or failing?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3709

          Other states are irrelevant since they don't result in a new ExecutionGraph being created for the same JobID. For those cases the existing behavior is perfectly fine.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3709

          My main concern is that the status shown in the web frontend doesn't match the actual state in the backend.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3709

          Why would the states be out of sync for non-suspended ExecutionGraphs? As I said before, the JobManager and web backend are working on the same object.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3709

          All right. I'll change it as you suggest and verify the result. Thanks for the comments and advice.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3709

          I've tested it and the function is OK. Please check if it's good to go, thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3709

          @WangTaoTheTonic I think the big source of confusion is the following: the cache does not cache any status. It really duplicates the pointer to the live `ExecutionGraph` object (the `AccessExecutionGraph` and the `ExecutionGraph` are the same here; the names are an artifact of an earlier approach to create a History Server).

          The only case that is problematic is the case where there are multiple execution graphs, which happens upon leader change.

          Another way to fix this would have been to remove the graph from the cache whenever leader status is lost.
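
          That "push"-style alternative could be sketched roughly as follows; the `onRevokeLeadership` hook name is an assumption for illustration, not an actual Flink API:

          ```java
          import java.util.concurrent.ConcurrentHashMap;

          public class Main {
              // Stand-in cache mapping job IDs to graphs; the real holder keys by JobID.
              static final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();

              // Hypothetical hook: when this JobManager loses leadership, every cached
              // graph may belong to a stale leader, so drop them all; the next request
              // then re-fetches from whichever JobManager is the new leader.
              static void onRevokeLeadership() {
                  cache.clear();
              }

              public static void main(String[] args) {
                  cache.put("job-1", new Object());
                  onRevokeLeadership();
                  System.out.println(cache.isEmpty()); // true
              }
          }
          ```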

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3709

          @StephanEwen Sure. The current fix is like a "pull", while what you suggest is a "push" approach. Both of them can fix the issue; they just differ in how the EGs get updated.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3709

          merging.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3709

          Zentol Chesnay Schepler added a comment -

          1.3: 49d0e4321df1853d00e26593556c34acb9bed3d2


            People

            • Assignee:
              WangTao Tao Wang
              Reporter:
              WangTao Tao Wang
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development