  Flink / FLINK-5971

JobLeaderIdService should time out registered jobs

    Details

      Description

      The JobLeaderIdService has no mechanism to time out inactive jobs. At the moment, it relies on the RunningJobsRegistry, which only gives a heuristic answer.

      We should remove the RunningJobsRegistry and instead register a timeout for each job that has no associated job leader.
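The proposed mechanism (a timeout armed for every job without a known leader, cancelled once a leader appears, and removing the job if it fires) can be sketched in Java as below. This is a minimal hypothetical sketch, not Flink's implementation: `InactiveJobRegistry`, `addJob`, `leaderGained` and `leaderLost` are illustrative names, job IDs are plain strings, and it assumes all state changes can be funnelled through a single scheduler thread, which rules out races between a firing timeout and a leader notification.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: every registered job without a known leader has a
// pending timeout. All state is confined to one scheduler thread, so the
// timeout task and leader notifications never run concurrently.
final class InactiveJobRegistry {
    private final ScheduledExecutorService mainThread = Executors.newSingleThreadScheduledExecutor();
    private final long timeoutMillis;
    private final Set<String> registeredJobs = new HashSet<>();
    private final Map<String, ScheduledFuture<?>> timeouts = new HashMap<>();

    InactiveJobRegistry(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Registers a job; it starts out without a leader, so its timeout is armed. */
    void addJob(String jobId) {
        mainThread.execute(() -> {
            registeredJobs.add(jobId);
            armTimeout(jobId);
        });
    }

    /** A leader became known: the job is active, so its timeout is cancelled. */
    void leaderGained(String jobId) {
        mainThread.execute(() -> cancelTimeout(jobId));
    }

    /** The leader went away: the job is inactive again, so the timeout is re-armed. */
    void leaderLost(String jobId) {
        mainThread.execute(() -> {
            if (registeredJobs.contains(jobId)) {
                armTimeout(jobId);
            }
        });
    }

    /** Queries the registration state on the main thread. */
    boolean isRegistered(String jobId) throws InterruptedException, ExecutionException {
        return mainThread.submit(() -> registeredJobs.contains(jobId)).get();
    }

    void shutdown() {
        mainThread.shutdownNow();
    }

    // Only ever runs on mainThread.
    private void armTimeout(String jobId) {
        cancelTimeout(jobId);
        timeouts.put(jobId, mainThread.schedule(() -> {
            // Still leaderless when the timeout fired: remove the job.
            timeouts.remove(jobId);
            registeredJobs.remove(jobId);
        }, timeoutMillis, TimeUnit.MILLISECONDS));
    }

    // Only ever runs on mainThread. cancel(false) suffices: the timeout task
    // cannot be running right now because this code occupies the only thread.
    private void cancelTimeout(String jobId) {
        ScheduledFuture<?> pending = timeouts.remove(jobId);
        if (pending != null) {
            pending.cancel(false);
        }
    }
}
```

Confining the state to one thread trades some throughput for simplicity; touching the same timeout state from several threads instead requires explicit synchronization.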


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3488

          FLINK-5971 [flip-6] Add timeout for registered jobs on the ResourceManager

          This PR introduces a timeout for inactive jobs on the ResourceManager. A job is inactive
          if no active leader is known for it. If a job times out, it is removed from the
          ResourceManager. Additionally, this PR removes the JobLeaderIdService's dependency on
          the RunningJobsRegistry.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink jobLeaderIdServiceTimeoutJobs

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3488.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3488


          commit 6fb0e239921ca10bac218d62155b08fb96e3725d
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-03-06T15:57:43Z

          FLINK-5971 [flip-6] Add timeout for registered jobs on the ResourceManager

          This PR introduces a timeout for inactive jobs on the ResourceManager. A job is inactive
          if no active leader is known for it. If a job times out, it is removed from the
          ResourceManager. Additionally, this PR removes the JobLeaderIdService's dependency on
          the RunningJobsRegistry.


          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3488#discussion_r104937304

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java —
          @@ -0,0 +1,37 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.configuration;
          +
          +import org.apache.flink.annotation.PublicEvolving;
          +
          +/**
          + * The set of configuration options relating to the ResourceManager
          + */
          +@PublicEvolving
          +public class ResourceManagerOptions {
          +
          + public static final ConfigOption<String> JOB_TIMEOUT = ConfigOptions
          + .key("resourcemanager.job.timeout")
          — End diff –

          Maybe we can change the name to something like "inactive_job.timeout" or "idle_job.timeout" to make this config more specific.

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3488#discussion_r105067829

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java —
          @@ -283,5 +302,31 @@ public void handleError(Exception exception) {
          JobLeaderIdListener.class.getSimpleName(), exception);
          }
          }
          +
          + private void activateTimeout() {
          + if (timeoutId != null) {
          + cancelTimeout();
          + }
          +
          + final UUID newTimeoutId = UUID.randomUUID();
          +
          + timeoutId = newTimeoutId;
          +
          + timeoutFuture = scheduledExecutor.schedule(new Runnable() {
          + @Override
          + public void run() {
          + listenerJobLeaderIdActions.notifyJobTimeout(jobId, newTimeoutId);
          — End diff –

          I think this thread is different from the leader retrieval thread, which calls "notifyLeaderAddress", right?
          So is it possible that, when a job is close to timing out and a leader address notification arrives, both threads try to call `cancelTimeout`? An NPE would be thrown if one of them calls `timeoutFuture.cancel(true)` while the other has already set `timeoutFuture = null`.
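One way to close the race described above is to guard `timeoutId` and `timeoutFuture` with a lock, so that check, cancel and clear happen atomically, and to keep the UUID token so that a timeout which fires after being cancelled or replaced cannot be acted upon. This is a hypothetical Java sketch, not the fix that was merged: `JobTimeout`, `onTimeout` and `tryClaimTimeout` are illustrative names, and job IDs are plain strings.

```java
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical sketch: a per-job timeout that may be activated and cancelled
// concurrently by the leader-retrieval thread and the timeout thread.
final class JobTimeout {
    private final Object lock = new Object();
    private final ScheduledExecutorService scheduler;
    private final String jobId;
    private final long timeoutMillis;
    private final BiConsumer<String, UUID> onTimeout; // receives (jobId, timeoutId)

    // Guarded by 'lock': only read, cancelled and cleared while holding it.
    private UUID timeoutId;
    private ScheduledFuture<?> timeoutFuture;

    JobTimeout(ScheduledExecutorService scheduler, String jobId, long timeoutMillis,
               BiConsumer<String, UUID> onTimeout) {
        this.scheduler = scheduler;
        this.jobId = jobId;
        this.timeoutMillis = timeoutMillis;
        this.onTimeout = onTimeout;
    }

    /** Arms a fresh timeout, replacing any pending one. */
    void activateTimeout() {
        synchronized (lock) {
            cancelTimeoutLocked();
            final UUID newTimeoutId = UUID.randomUUID();
            timeoutId = newTimeoutId;
            timeoutFuture = scheduler.schedule(
                    () -> onTimeout.accept(jobId, newTimeoutId),
                    timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Called when a leader is found; safe from any thread, any number of times. */
    void cancelTimeout() {
        synchronized (lock) {
            cancelTimeoutLocked();
        }
    }

    /**
     * Atomically checks that 'id' is still the current timeout and consumes it.
     * A cancelled or replaced timeout is rejected, so it can never be acted upon.
     */
    boolean tryClaimTimeout(UUID id) {
        synchronized (lock) {
            if (!id.equals(timeoutId)) {
                return false; // stale: cancelled or replaced in the meantime
            }
            timeoutId = null;
            timeoutFuture = null;
            return true;
        }
    }

    private void cancelTimeoutLocked() {
        if (timeoutFuture != null) {
            // No interrupt needed: if the task is already running, its
            // now-stale id is rejected by tryClaimTimeout.
            timeoutFuture.cancel(false);
        }
        timeoutFuture = null;
        timeoutId = null;
    }
}
```

Because the null check and the `cancel` call happen under the same lock as the assignment `timeoutFuture = null`, two threads calling `cancelTimeout` at the same time can no longer observe a half-cleared state.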

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3488#discussion_r105068068

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java —
          @@ -0,0 +1,37 @@
          +package org.apache.flink.configuration;
          +
          +import org.apache.flink.annotation.PublicEvolving;
          +
          +/**
          + * The set of configuration options relating to the ResourceManager
          + */
          +@PublicEvolving
          +public class ResourceManagerOptions {
          +
          + public static final ConfigOption<String> JOB_TIMEOUT = ConfigOptions
          + .key("resourcemanager.job.timeout")
          — End diff –

          Can we have a more specific name like "inactive_job.timeout"? Otherwise it looks like normal jobs will also time out after 5 minutes.
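`JOB_TIMEOUT` is declared as a `ConfigOption<String>`, so its value has to be turned into a duration before a timeout can be scheduled. The diff excerpt does not show how this is done; the `TimeoutParser` below is a hypothetical illustration (not a Flink class) that accepts values such as "5 minutes", the duration mentioned in the comment above, and supports only a small, assumed set of unit names.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: parses "<amount> <unit>" strings such as "5 minutes".
final class TimeoutParser {
    private static final Pattern FORMAT = Pattern.compile("\\s*(\\d+)\\s*([a-zA-Z]+)\\s*");

    static Duration parse(String value) {
        Matcher m = FORMAT.matcher(value);
        if (!m.matches()) {
            throw new IllegalArgumentException("Cannot parse timeout: " + value);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2).toLowerCase(Locale.ROOT)) {
            case "ms": case "millis": case "milliseconds":
                return Duration.ofMillis(amount);
            case "s": case "sec": case "second": case "seconds":
                return Duration.ofSeconds(amount);
            case "min": case "mins": case "minute": case "minutes":
                return Duration.ofMinutes(amount);
            case "h": case "hour": case "hours":
                return Duration.ofHours(amount);
            default:
                throw new IllegalArgumentException("Unknown time unit in: " + value);
        }
    }
}
```

For example, `TimeoutParser.parse("5 minutes")` yields `Duration.ofMinutes(5)`, while a value like "five minutes" is rejected with an `IllegalArgumentException`.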

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3488#discussion_r105068133

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java —
          @@ -28,6 +28,10 @@
          @PublicEvolving
          public class AkkaOptions {

          + public static final ConfigOption<String> AKKA_ASK_TIMEOUT = ConfigOptions
          — End diff –

          Maybe add a comment to be consistent with the other options in this class?

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3488#discussion_r105117165

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java —
          @@ -28,6 +28,10 @@
          @PublicEvolving
          public class AkkaOptions {

          + public static final ConfigOption<String> AKKA_ASK_TIMEOUT = ConfigOptions
          — End diff –

          True, will add it.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3488#discussion_r105117904

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java —
          @@ -0,0 +1,37 @@
          +package org.apache.flink.configuration;
          +
          +import org.apache.flink.annotation.PublicEvolving;
          +
          +/**
          + * The set of configuration options relating to the ResourceManager
          + */
          +@PublicEvolving
          +public class ResourceManagerOptions {
          +
          + public static final ConfigOption<String> JOB_TIMEOUT = ConfigOptions
          + .key("resourcemanager.job.timeout")
          — End diff –

          Hmm, not sure whether it's clear to the user what an inactive job is. Will add JavaDocs, though.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3488#discussion_r105118398

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java —
          @@ -283,5 +302,31 @@ public void handleError(Exception exception) {
          JobLeaderIdListener.class.getSimpleName(), exception);
          }
          }
          +
          + private void activateTimeout() {
          + if (timeoutId != null) {
          + cancelTimeout();
          + }
          +
          + final UUID newTimeoutId = UUID.randomUUID();
          +
          + timeoutId = newTimeoutId;
          +
          + timeoutFuture = scheduledExecutor.schedule(new Runnable() {
          + @Override
          + public void run() {
          + listenerJobLeaderIdActions.notifyJobTimeout(jobId, newTimeoutId);
          — End diff –

          Yes, I think you're right. Good point. Will fix the problem.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3488

          Thanks a lot for your review @KurtYoung. I've addressed your comments and updated the PR.

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on the issue:

          https://github.com/apache/flink/pull/3488

          +1 to merge.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3488

          Merging this PR.

          till.rohrmann Till Rohrmann added a comment -

          Fixed via fcd264a707d3dd8ef4247825752c8639732c943c

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3488


            People

            • Assignee:
              till.rohrmann Till Rohrmann
              Reporter:
              till.rohrmann Till Rohrmann
            • Votes:
              0
              Watchers:
              2
