diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 839765c..058153c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -58,7 +58,7 @@
public static final String IPC_PREFIX = YARN_PREFIX + "ipc.";
/** Factory to create client IPC classes.*/
- public static final String IPC_CLIENT_FACTORY_CLASS =
+ public static final String IPC_CLIENT_FACTORY_CLASS =
IPC_PREFIX + "client.factory.class";
public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS =
"org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl";
@@ -86,6 +86,9 @@
////////////////////////////////
public static final String RM_PREFIX = "yarn.resourcemanager.";
+ public static final String RM_CLUSTER_ID = RM_PREFIX + "cluster.id";
+ public static final String DEFAULT_RM_CLUSTER_ID = "yarn-cluster";
+
/** The address of the applications manager interface in the RM.*/
public static final String RM_ADDRESS =
RM_PREFIX + "address";
@@ -294,6 +297,38 @@
RM_WEBAPP_ADDRESS,
RM_WEBAPP_HTTPS_ADDRESS));
+ public static final String RM_HA_FENCER = RM_HA_PREFIX + "fencer";
+
+ public static final String AUTO_FAILOVER_PREFIX =
+ RM_HA_PREFIX + "automatic-failover.";
+
+ public static final String AUTO_FAILOVER_ENABLED =
+ AUTO_FAILOVER_PREFIX + "enabled";
+ public static final boolean DEFAULT_AUTO_FAILOVER_ENABLED = false;
+
+ public static final String AUTO_FAILOVER_EMBEDDED =
+ AUTO_FAILOVER_PREFIX + "embedded";
+ public static final boolean DEFAULT_AUTO_FAILOVER_EMBEDDED = true;
+
+ public static final String AUTO_FAILOVER_ZK_QUORUM =
+ AUTO_FAILOVER_PREFIX + "zk-quorum";
+
+ public static final String AUTO_FAILOVER_ZK_TIMEOUT_MS =
+ AUTO_FAILOVER_PREFIX + "zk-timeout-ms";
+ public static final long DEFAULT_AUTO_FAILOVER_ZK_TIMEOUT_MS = 10000;
+
+ public static final String AUTO_FAILOVER_ZK_BASE_PATH =
+ AUTO_FAILOVER_PREFIX + "zk-base-path";
+ public static final String DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH =
+ "/yarn-leader-election";
+
+ public static final String AUTO_FAILOVER_ZK_ACL =
+ AUTO_FAILOVER_PREFIX + "zk-acl";
+ public static final String DEFAULT_AUTO_FAILOVER_ZK_ACL = "world:anyone:rwcda";
+
+ public static final String AUTO_FAILOVER_ZK_AUTH =
+ AUTO_FAILOVER_PREFIX + "zk-auth";
+
////////////////////////////////
// RM state store configs
////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/YarnBadConfigurationException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/YarnBadConfigurationException.java
new file mode 100644
index 0000000..33389fd
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/YarnBadConfigurationException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class YarnBadConfigurationException extends YarnRuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public YarnBadConfigurationException(String message) {
+ super(message);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
index df77486..00d5124 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
@@ -126,3 +126,13 @@ message RMStateVersionProto {
optional int32 major_version = 1;
optional int32 minor_version = 2;
}
+
+//////////////////////////////////////////////////////////////////
+///////////// RM Failover related records ////////////////////////
+//////////////////////////////////////////////////////////////////
+message RMActiveNodeInfoProto {
+ required int32 port = 1;
+ required string hostname = 2;
+ required string clusterid = 3;
+ required string rmId = 4;
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java
index 74cb499..41472ea 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client;
import org.apache.hadoop.ha.BadFencingConfigurationException;
+import org.apache.hadoop.ha.FenceMethod;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.NodeFencer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -28,6 +29,7 @@
public class RMHAServiceTarget extends HAServiceTarget {
private InetSocketAddress haAdminServiceAddress;
+ private NodeFencer fencer;
public RMHAServiceTarget(YarnConfiguration conf)
throws IOException {
@@ -35,6 +37,10 @@ public RMHAServiceTarget(YarnConfiguration conf)
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
+
+ if (conf.get(YarnConfiguration.RM_HA_FENCER) != null) {
+ fencer = NodeFencer.create(conf, YarnConfiguration.RM_HA_FENCER);
+ }
}
@Override
@@ -50,13 +56,14 @@ public InetSocketAddress getZKFCAddress() {
@Override
public NodeFencer getFencer() {
- // TODO (YARN-1026): Hook up fencing implementation
- return null;
+ return fencer;
}
@Override
public void checkFencingConfigured()
throws BadFencingConfigurationException {
- // TODO (YARN-1026): Based on fencing implementation
+ if (fencer == null) {
+ throw new BadFencingConfigurationException("Fencer not configured");
+ }
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index c43dc1a..0830f8f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -425,6 +425,14 @@
+ Fencing mechanism to use when HA is enabled. Multiple methods
+ can be provided, one per line. Fencing mechanisms must implement
+ org.apache.hadoop.ha.FenceMethod
+ yarn.resourcemanager.ha.fencer
+
+
+
+
The maximum number of completed applications RM keeps.
yarn.resourcemanager.max-completed-applications
10000
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DummyFencer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DummyFencer.java
new file mode 100644
index 0000000..d41542f
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DummyFencer.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.ha.BadFencingConfigurationException;
+import org.apache.hadoop.ha.FenceMethod;
+import org.apache.hadoop.ha.HAServiceTarget;
+
+/** Dummy fencer to be used for implicit fencing */
+public class DummyFencer extends Configured implements FenceMethod {
+
+ @Override
+ public void checkArgs(String args) throws BadFencingConfigurationException {
+ // No need to check
+ }
+
+ @Override
+ public boolean tryFence(HAServiceTarget target, String args)
+ throws BadFencingConfigurationException {
+ // Trivially fenced
+ return true;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKActiveStandbyElectorService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKActiveStandbyElectorService.java
new file mode 100644
index 0000000..3909ebd
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKActiveStandbyElectorService.java
@@ -0,0 +1,240 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ActiveStandbyElector;
+import org.apache.hadoop.ha.BadFencingConfigurationException;
+import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ZKUtil;
+import org.apache.hadoop.yarn.client.RMHAServiceTarget;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnBadConfigurationException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
+import org.apache.zookeeper.data.ACL;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+
+public class RMZKActiveStandbyElectorService extends AbstractService
+ implements ActiveStandbyElector.ActiveStandbyElectorCallback {
+ private static final Log LOG =
+ LogFactory.getLog(RMZKActiveStandbyElectorService.class.getName());
+
+ private AdminService service;
+ private String clusterId;
+
+ private byte[] localActiveNodeInfoBytes;
+ private ActiveStandbyElector elector;
+ private ElectorThread electorThread;
+
+ RMZKActiveStandbyElectorService(AdminService service) {
+ super(RMZKActiveStandbyElectorService.class.getName());
+ this.service = service;
+ }
+
+ @Override
+ protected synchronized void serviceInit(Configuration conf)
+ throws Exception {
+ conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf);
+
+ String zkQuorum = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_QUORUM);
+ if (zkQuorum == null) {
+ throw new YarnBadConfigurationException("Embedded automatic failover " +
+ "is enabled, but " + YarnConfiguration.AUTO_FAILOVER_ZK_QUORUM +
+ "is not set");
+ }
+
+ long zkSessionTimeout = conf.getLong(
+ YarnConfiguration.AUTO_FAILOVER_ZK_TIMEOUT_MS,
+ YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_TIMEOUT_MS);
+
+ String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
+ YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
+ clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID,
+ YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
+ String electionZNode = zkBasePath + "/" + clusterId;
+
+ String zkAclConf = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_ACL,
+ YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_ACL);
+ zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
+ List zkAcls;
+ try {
+ zkAcls = ZKUtil.parseACLs(zkAclConf);
+ } catch (ZKUtil.BadAclFormatException bafe) {
+ throw new YarnBadConfigurationException(
+ YarnConfiguration.AUTO_FAILOVER_ZK_ACL + "has ill-formatted ACLs");
+ }
+
+ // Parse authentication from configuration.
+ String zkAuthConf = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_AUTH);
+ zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
+ List zkAuths;
+ if (zkAuthConf != null) {
+ zkAuths = ZKUtil.parseAuth(zkAuthConf);
+ } else {
+ zkAuths = Collections.emptyList();
+ }
+
+ elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
+ electionZNode, zkAcls, zkAuths, this);
+ if (elector.parentZNodeExists()) {
+ try {
+ getRemoteHAServiceTarget(elector.getActiveData());
+ } catch (YarnRuntimeException yre) {
+ // Incorrect active data, clear existing parentZNode
+ elector.clearParentZNode();
+ }
+ }
+ elector.ensureParentZNode();
+
+ localActiveNodeInfoBytes =
+ localActiveNodeInfoBytes((YarnConfiguration) conf);
+ electorThread = new ElectorThread();
+ electorThread.setDaemon(true);
+
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected synchronized void serviceStart() throws Exception {
+ if (electorThread != null) {
+ electorThread.start();
+ }
+ super.serviceStart();
+ }
+
+ @Override
+ protected synchronized void serviceStop() throws Exception {
+ if (electorThread != null) {
+ electorThread.interrupt();
+ electorThread.join();
+ }
+ super.serviceStop();
+ }
+
+ class ElectorThread extends Thread {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // do nothing
+ }
+
+ //noinspection SynchronizeOnNonFinalField
+ synchronized (elector) {
+ switch(service.haState) {
+ case ACTIVE:
+ case STANDBY:
+ elector.joinElection(localActiveNodeInfoBytes);
+ default:
+ elector.quitElection(false);
+ }
+ }
+ }
+ }
+ }
+
+ private byte[] localActiveNodeInfoBytes(YarnConfiguration conf)
+ throws IOException {
+ RMHAServiceTarget localRMHAServiceTarget =
+ new RMHAServiceTarget(new YarnConfiguration(conf));
+ String rmId = conf.get(YarnConfiguration.RM_HA_ID);
+
+ InetSocketAddress addr = localRMHAServiceTarget.getAddress();
+
+ return YarnServerResourceManagerServiceProtos.RMActiveNodeInfoProto
+ .newBuilder()
+ .setHostname(addr.getHostName())
+ .setPort(addr.getPort())
+ .setClusterid(clusterId)
+ .setRmId(rmId)
+ .build()
+ .toByteArray();
+ }
+
+ @Override
+ public void becomeActive() throws ServiceFailedException {
+ try {
+ service.transitionToActive();
+ } catch (Exception e) {
+ // Failed to become active, let the elector know
+ throw new ServiceFailedException("RM could not transition to Active", e);
+ }
+ }
+
+ @Override
+ public void becomeStandby() {
+ try {
+ service.transitionToStandby(true);
+ } catch (Exception e) {
+ // Log the exception. The fencer should be able to fence this node
+ LOG.error("RM could not transition to Standby mode", e);
+ }
+ }
+
+ @Override
+ public void enterNeutralMode() {
+ /**
+ * Possibly due to transient connection issues. Do nothing.
+ * TODO: Might want to keep track of how long in this state and transition
+ * to standby.
+ */
+ }
+
+ @Override
+ public void notifyFatalError(String errorMessage) {
+ LOG.fatal("Received " + errorMessage);
+ throw new YarnRuntimeException(errorMessage);
+ }
+
+ @Override
+ public void fenceOldActive(byte[] oldActiveData) {
+ RMHAServiceTarget target = getRemoteHAServiceTarget(oldActiveData);
+ try {
+ target.checkFencingConfigured();
+ } catch (BadFencingConfigurationException e) {
+ throw new YarnBadConfigurationException(e.getMessage());
+ }
+
+ if (!target.getFencer().fence(target)) {
+ throw new YarnRuntimeException("Could not fence old active");
+ }
+ }
+
+ private RMHAServiceTarget getRemoteHAServiceTarget(byte[] data) {
+ YarnServerResourceManagerServiceProtos.RMActiveNodeInfoProto proto;
+ try {
+ proto = YarnServerResourceManagerServiceProtos.RMActiveNodeInfoProto
+ .parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ throw new YarnRuntimeException(
+ "Invalid data in ZK: " + StringUtils.byteToHexString(data));
+ }
+
+ // Check if the passed proto corresponds to an RM in the same cluster
+ if (!proto.getClusterid().equals(clusterId)) {
+ throw new YarnRuntimeException("Mismatched cluster! The other RM seems " +
+ "to be from a different cluster. Current cluster = " + clusterId +
+ "Other RM's cluster = " + proto.getClusterid());
+ }
+
+ RMHAServiceTarget ret;
+ try {
+ YarnConfiguration conf = new YarnConfiguration(getConfig());
+ conf.set(YarnConfiguration.RM_HA_ID, proto.getRmId());
+ ret = new RMHAServiceTarget(conf);
+ } catch (IOException e) {
+ throw new YarnRuntimeException("Couldn't create RMHAServiceTarget", e);
+ }
+ return ret;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 597d18c..42184e5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -43,10 +43,12 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnBadConfigurationException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
@@ -188,6 +190,26 @@ protected void serviceInit(Configuration conf) throws Exception {
addService(adminService);
rmContext.setRMAdminService(adminService);
+ if (HAUtil.isHAEnabled(conf)) {
+ // HA enabled
+ if (conf.getBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED,
+ YarnConfiguration.DEFAULT_AUTO_FAILOVER_ENABLED)) {
+ // Automatic failover enabled
+ if (conf.getBoolean(YarnConfiguration.AUTO_FAILOVER_EMBEDDED,
+ YarnConfiguration.DEFAULT_AUTO_FAILOVER_EMBEDDED)) {
+ // Embedded automatic failover enabled
+ RMZKActiveStandbyElectorService electorService =
+ new RMZKActiveStandbyElectorService(adminService);
+ addService(electorService);
+ } else {
+ // TODO (YARN-1177):
+ throw new YarnBadConfigurationException("Currently, " +
+ "automatic failover can only be through embedded ZK-based " +
+ "ActiveStandbyElector");
+ }
+ }
+ }
+
super.serviceInit(conf);
}