diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 839765c..058153c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -58,7 +58,7 @@ public static final String IPC_PREFIX = YARN_PREFIX + "ipc."; /** Factory to create client IPC classes.*/ - public static final String IPC_CLIENT_FACTORY_CLASS = + public static final String IPC_CLIENT_FACTORY_CLASS = IPC_PREFIX + "client.factory.class"; public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS = "org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl"; @@ -86,6 +86,9 @@ //////////////////////////////// public static final String RM_PREFIX = "yarn.resourcemanager."; + public static final String RM_CLUSTER_ID = RM_PREFIX + "cluster.id"; + public static final String DEFAULT_RM_CLUSTER_ID = "yarn-cluster"; + /** The address of the applications manager interface in the RM.*/ public static final String RM_ADDRESS = RM_PREFIX + "address"; @@ -294,6 +297,38 @@ RM_WEBAPP_ADDRESS, RM_WEBAPP_HTTPS_ADDRESS)); + public static final String RM_HA_FENCER = RM_HA_PREFIX + "fencer"; + + public static final String AUTO_FAILOVER_PREFIX = + RM_HA_PREFIX + "automatic-failover."; + + public static final String AUTO_FAILOVER_ENABLED = + AUTO_FAILOVER_PREFIX + "enabled"; + public static final boolean DEFAULT_AUTO_FAILOVER_ENABLED = false; + + public static final String AUTO_FAILOVER_EMBEDDED = + AUTO_FAILOVER_PREFIX + "embedded"; + public static final boolean DEFAULT_AUTO_FAILOVER_EMBEDDED = true; + + public static final String AUTO_FAILOVER_ZK_QUORUM = + AUTO_FAILOVER_PREFIX + "zk-quorum"; + + public static final String AUTO_FAILOVER_ZK_TIMEOUT_MS = + AUTO_FAILOVER_PREFIX + "zk-timeout-ms"; + public static final long DEFAULT_AUTO_FAILOVER_ZK_TIMEOUT_MS = 10000; + + public static final String AUTO_FAILOVER_ZK_BASE_PATH = + AUTO_FAILOVER_PREFIX + "zk-base-path"; + public static final String DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH = + "/yarn-leader-election"; + + public static final String AUTO_FAILOVER_ZK_ACL = + AUTO_FAILOVER_PREFIX + "zk-acl"; + public static final String DEFAULT_AUTO_FAILOVER_ZK_ACL = "world:anyone:rwcda"; + + public static final String AUTO_FAILOVER_ZK_AUTH = + AUTO_FAILOVER_PREFIX + "zk-auth"; + //////////////////////////////// // RM state store configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/YarnBadConfigurationException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/YarnBadConfigurationException.java new file mode 100644 index 0000000..33389fd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/YarnBadConfigurationException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class YarnBadConfigurationException extends YarnRuntimeException { + private static final long serialVersionUID = 1L; + + public YarnBadConfigurationException(String message) { + super(message); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index df77486..00d5124 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -126,3 +126,13 @@ message RMStateVersionProto { optional int32 major_version = 1; optional int32 minor_version = 2; } + +////////////////////////////////////////////////////////////////// +///////////// RM Failover related records //////////////////////// +////////////////////////////////////////////////////////////////// +message RMActiveNodeInfoProto { + required int32 port = 1; + required string hostname = 2; + required string clusterid = 3; + required string rmId = 4; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java index 74cb499..41472ea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client; import org.apache.hadoop.ha.BadFencingConfigurationException; +import org.apache.hadoop.ha.FenceMethod; import org.apache.hadoop.ha.HAServiceTarget; import org.apache.hadoop.ha.NodeFencer; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -28,6 +29,7 @@ public class RMHAServiceTarget extends HAServiceTarget { private InetSocketAddress haAdminServiceAddress; + private NodeFencer fencer; public RMHAServiceTarget(YarnConfiguration conf) throws IOException { @@ -35,6 +37,10 @@ public RMHAServiceTarget(YarnConfiguration conf) YarnConfiguration.RM_ADMIN_ADDRESS, YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, YarnConfiguration.DEFAULT_RM_ADMIN_PORT); + + if (conf.get(YarnConfiguration.RM_HA_FENCER) != null) { + fencer = NodeFencer.create(conf, YarnConfiguration.RM_HA_FENCER); + } } @Override @@ -50,13 +56,14 @@ public InetSocketAddress getZKFCAddress() { @Override public NodeFencer getFencer() { - // TODO (YARN-1026): Hook up fencing implementation - return null; + return fencer; } @Override public void checkFencingConfigured() throws BadFencingConfigurationException { - // TODO (YARN-1026): Based on fencing implementation + if (fencer == null) { + throw new BadFencingConfigurationException("Fencer not configured"); + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index c43dc1a..0830f8f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -425,6 +425,14 @@ + Fencing mechanism to use when HA is enabled. Multiple methods + can be provided, one per line. Fencing mechanisms must implement + org.apache.hadoop.ha.FenceMethod + yarn.resourcemanager.ha.fencer + + + + The maximum number of completed applications RM keeps. yarn.resourcemanager.max-completed-applications 10000 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DummyFencer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DummyFencer.java new file mode 100644 index 0000000..d41542f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DummyFencer.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.ha.BadFencingConfigurationException; +import org.apache.hadoop.ha.FenceMethod; +import org.apache.hadoop.ha.HAServiceTarget; + +/** Dummy fencer to be used for implicit fencing */ +public class DummyFencer extends Configured implements FenceMethod { + + @Override + public void checkArgs(String args) throws BadFencingConfigurationException { + // No need to check + } + + @Override + public boolean tryFence(HAServiceTarget target, String args) + throws BadFencingConfigurationException { + // Trivially fenced + return true; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKActiveStandbyElectorService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKActiveStandbyElectorService.java new file mode 100644 index 0000000..3909ebd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKActiveStandbyElectorService.java @@ -0,0 +1,240 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.ActiveStandbyElector; +import org.apache.hadoop.ha.BadFencingConfigurationException; +import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ZKUtil; +import org.apache.hadoop.yarn.client.RMHAServiceTarget; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnBadConfigurationException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; +import org.apache.zookeeper.data.ACL; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.List; + +public class RMZKActiveStandbyElectorService extends AbstractService + implements ActiveStandbyElector.ActiveStandbyElectorCallback { + private static final Log LOG = + LogFactory.getLog(RMZKActiveStandbyElectorService.class.getName()); + + private AdminService service; + private String clusterId; + + private byte[] localActiveNodeInfoBytes; + private ActiveStandbyElector elector; + private ElectorThread electorThread; + + RMZKActiveStandbyElectorService(AdminService service) { + super(RMZKActiveStandbyElectorService.class.getName()); + this.service = service; + } + + @Override + protected synchronized void serviceInit(Configuration conf) + throws Exception { + conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf); + + String zkQuorum = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_QUORUM); + if (zkQuorum == null) { + throw new YarnBadConfigurationException("Embedded automatic failover " + + "is enabled, but " + YarnConfiguration.AUTO_FAILOVER_ZK_QUORUM + + "is not set"); + } + + long zkSessionTimeout = conf.getLong( + YarnConfiguration.AUTO_FAILOVER_ZK_TIMEOUT_MS, + YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_TIMEOUT_MS); + + String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH, + YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH); + clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID, + YarnConfiguration.DEFAULT_RM_CLUSTER_ID); + String electionZNode = zkBasePath + "/" + clusterId; + + String zkAclConf = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_ACL, + YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_ACL); + zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf); + List zkAcls; + try { + zkAcls = ZKUtil.parseACLs(zkAclConf); + } catch (ZKUtil.BadAclFormatException bafe) { + throw new YarnBadConfigurationException( + YarnConfiguration.AUTO_FAILOVER_ZK_ACL + "has ill-formatted ACLs"); + } + + // Parse authentication from configuration. + String zkAuthConf = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_AUTH); + zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf); + List zkAuths; + if (zkAuthConf != null) { + zkAuths = ZKUtil.parseAuth(zkAuthConf); + } else { + zkAuths = Collections.emptyList(); + } + + elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout, + electionZNode, zkAcls, zkAuths, this); + if (elector.parentZNodeExists()) { + try { + getRemoteHAServiceTarget(elector.getActiveData()); + } catch (YarnRuntimeException yre) { + // Incorrect active data, clear existing parentZNode + elector.clearParentZNode(); + } + } + elector.ensureParentZNode(); + + localActiveNodeInfoBytes = + localActiveNodeInfoBytes((YarnConfiguration) conf); + electorThread = new ElectorThread(); + electorThread.setDaemon(true); + + super.serviceInit(conf); + } + + @Override + protected synchronized void serviceStart() throws Exception { + if (electorThread != null) { + electorThread.start(); + } + super.serviceStart(); + } + + @Override + protected synchronized void serviceStop() throws Exception { + if (electorThread != null) { + electorThread.interrupt(); + electorThread.join(); + } + super.serviceStop(); + } + + class ElectorThread extends Thread { + @Override + public void run() { + while (true) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // do nothing + } + + //noinspection SynchronizeOnNonFinalField + synchronized (elector) { + switch(service.haState) { + case ACTIVE: + case STANDBY: + elector.joinElection(localActiveNodeInfoBytes); + default: + elector.quitElection(false); + } + } + } + } + } + + private byte[] localActiveNodeInfoBytes(YarnConfiguration conf) + throws IOException { + RMHAServiceTarget localRMHAServiceTarget = + new RMHAServiceTarget(new YarnConfiguration(conf)); + String rmId = conf.get(YarnConfiguration.RM_HA_ID); + + InetSocketAddress addr = localRMHAServiceTarget.getAddress(); + + return YarnServerResourceManagerServiceProtos.RMActiveNodeInfoProto + .newBuilder() + .setHostname(addr.getHostName()) + .setPort(addr.getPort()) + .setClusterid(clusterId) + .setRmId(rmId) + .build() + .toByteArray(); + } + + @Override + public void becomeActive() throws ServiceFailedException { + try { + service.transitionToActive(); + } catch (Exception e) { + // Failed to become active, let the elector know + throw new ServiceFailedException("RM could not transition to Active", e); + } + } + + @Override + public void becomeStandby() { + try { + service.transitionToStandby(true); + } catch (Exception e) { + // Log the exception. The fencer should be able to fence this node + LOG.error("RM could not transition to Standby mode", e); + } + } + + @Override + public void enterNeutralMode() { + /** + * Possibly due to transient connection issues. Do nothing. + * TODO: Might want to keep track of how long in this state and transition + * to standby. + */ + } + + @Override + public void notifyFatalError(String errorMessage) { + LOG.fatal("Received " + errorMessage); + throw new YarnRuntimeException(errorMessage); + } + + @Override + public void fenceOldActive(byte[] oldActiveData) { + RMHAServiceTarget target = getRemoteHAServiceTarget(oldActiveData); + try { + target.checkFencingConfigured(); + } catch (BadFencingConfigurationException e) { + throw new YarnBadConfigurationException(e.getMessage()); + } + + if (!target.getFencer().fence(target)) { + throw new YarnRuntimeException("Could not fence old active"); + } + } + + private RMHAServiceTarget getRemoteHAServiceTarget(byte[] data) { + YarnServerResourceManagerServiceProtos.RMActiveNodeInfoProto proto; + try { + proto = YarnServerResourceManagerServiceProtos.RMActiveNodeInfoProto + .parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new YarnRuntimeException( + "Invalid data in ZK: " + StringUtils.byteToHexString(data)); + } + + // Check if the passed proto corresponds to an RM in the same cluster + if (!proto.getClusterid().equals(clusterId)) { + throw new YarnRuntimeException("Mismatched cluster! The other RM seems " + + "to be from a different cluster. Current cluster = " + clusterId + + "Other RM's cluster = " + proto.getClusterid()); + } + + RMHAServiceTarget ret; + try { + YarnConfiguration conf = new YarnConfiguration(getConfig()); + conf.set(YarnConfiguration.RM_HA_ID, proto.getRmId()); + ret = new RMHAServiceTarget(conf); + } catch (IOException e) { + throw new YarnRuntimeException("Couldn't create RMHAServiceTarget", e); + } + return ret; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 597d18c..42184e5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -43,10 +43,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnBadConfigurationException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; @@ -188,6 +190,26 @@ protected void serviceInit(Configuration conf) throws Exception { addService(adminService); rmContext.setRMAdminService(adminService); + if (HAUtil.isHAEnabled(conf)) { + // HA enabled + if (conf.getBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, + YarnConfiguration.DEFAULT_AUTO_FAILOVER_ENABLED)) { + // Automatic failover enabled + if (conf.getBoolean(YarnConfiguration.AUTO_FAILOVER_EMBEDDED, + YarnConfiguration.DEFAULT_AUTO_FAILOVER_EMBEDDED)) { + // Embedded automatic failover enabled + RMZKActiveStandbyElectorService electorService = + new RMZKActiveStandbyElectorService(adminService); + addService(electorService); + } else { + // TODO (YARN-1177): + throw new YarnBadConfigurationException("Currently, " + + "automatic failover can only be through embedded ZK-based " + + "ActiveStandbyElector"); + } + } + } + super.serviceInit(conf); }