diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index 00d5124..bee5f8c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -133,6 +133,7 @@ message RMStateVersionProto { message RMActiveNodeInfoProto { required int32 port = 1; required string hostname = 2; - required string clusterid = 3; + required string clusterId = 3; required string rmId = 4; + optional int32 zkfcPort = 5; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKActiveStandbyElectorService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKActiveStandbyElectorService.java index b1fd06a..563aed2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKActiveStandbyElectorService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKActiveStandbyElectorService.java @@ -1,6 +1,5 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -9,20 +8,16 @@ import org.apache.hadoop.ha.FailoverController; import org.apache.hadoop.ha.ServiceFailedException; import org.apache.hadoop.ha.ZKFCProtocol; -import org.apache.hadoop.ha.ZKFailoverController; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.yarn.client.RMHAServiceTarget; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnBadConfigurationException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.ACL; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.Collections; import java.util.List; @@ -249,16 +244,7 @@ public void run() { private byte[] targetToData(RMHAServiceTarget target, String rmId) throws IOException { - InetSocketAddress addr = target.getAddress(); - - return YarnServerResourceManagerServiceProtos.RMActiveNodeInfoProto - .newBuilder() - .setHostname(addr.getHostName()) - .setPort(addr.getPort()) - .setClusterid(clusterId) - .setRmId(rmId) - .build() - .toByteArray(); + return RMZKUtils.targetToData(target, clusterId, rmId); } @Override @@ -312,30 +298,6 @@ public void fenceOldActive(byte[] oldActiveData) { } private RMHAServiceTarget dataToTarget(byte[] data) { - YarnServerResourceManagerServiceProtos.RMActiveNodeInfoProto proto; - try { - proto = YarnServerResourceManagerServiceProtos.RMActiveNodeInfoProto - .parseFrom(data); - } catch (InvalidProtocolBufferException e) { - throw new YarnRuntimeException( - "Invalid data in ZK: " + StringUtils.byteToHexString(data)); - } - - // Check if the passed proto corresponds to an RM in the same cluster - if (!proto.getClusterid().equals(clusterId)) { - throw new YarnRuntimeException("Mismatched cluster! The other RM seems " + - "to be from a different cluster. Current cluster = " + clusterId + - "Other RM's cluster = " + proto.getClusterid()); - } - - RMHAServiceTarget ret; - try { - YarnConfiguration conf = new YarnConfiguration(getConfig()); - conf.set(YarnConfiguration.RM_HA_ID, proto.getRmId()); - ret = new RMHAServiceTarget(conf); - } catch (IOException e) { - throw new YarnRuntimeException("Couldn't create RMHAServiceTarget", e); - } - return ret; + return RMZKUtils.dataToTarget(data, getConfig()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKFailoverController.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKFailoverController.java new file mode 100644 index 0000000..75818bf --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKFailoverController.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager; + +import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceTarget; +import org.apache.hadoop.ha.ZKFailoverController; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.client.RMHAServiceTarget; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMActiveNodeInfoProto; +import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; + +@InterfaceAudience.Private +public class RMZKFailoverController extends ZKFailoverController { + + private static final Log LOG = LogFactory.getLog(RMZKFailoverController.class); + + private AccessControlList adminAcl; + private final RMHAServiceTarget localRMServiceTarget; + + @Override + protected HAServiceTarget dataToTarget(byte[] data) { + RMActiveNodeInfoProto proto; + try { + proto = RMActiveNodeInfoProto.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new YarnRuntimeException( + "Invalid data in ZK: " + StringUtils.byteToHexString(data)); + } + + // Check if the passed proto corresponds to an RM in the same cluster + if (!proto.getClusterid().equals(localRMServiceTarget.getClusterId())) { + throw new YarnRuntimeException("Mismatched cluster! The other RM seems " + + "to be from a different cluster. Current cluster = " + + localRMServiceTarget.getClusterId() + + "Other RM's cluster = " + proto.getClusterid()); + } + + RMHAServiceTarget ret = null; + try { + ret = new RMHAServiceTarget((YarnConfiguration)conf, proto.getRmId()); + } catch (IOException e) { + throw new YarnRuntimeException("Couldn't create RMHAServiceTarget", e); + } + ret.setAutoFailoverPort(proto.getZkfcPort()); + return ret; + } + + @Override + protected byte[] targetToData(HAServiceTarget target) { + InetSocketAddress addr = target.getAddress(); + + return RMActiveNodeInfoProto.newBuilder() + .setHostname(addr.getHostName()) + .setPort(addr.getPort()) + .setZkfcPort(target.getZKFCAddress().getPort()) + .setClusterid(localRMServiceTarget.getClusterId()) + .setRmId(localRMServiceTarget.getRmId()) + .build() + .toByteArray(); + } + + @Override + protected InetSocketAddress getRpcAddressToBindTo() { + int zkfcPort = getZkfcPort(conf); + return new InetSocketAddress(localTarget.getAddress().getAddress(), + zkfcPort); + } + + + @Override + protected PolicyProvider getPolicyProvider() { + return new RMPolicyProvider(); + } + + static int getZkfcPort(Configuration conf) { + return conf.getInt(YarnConfiguration.RM_HA_AUTOMATIC_FAILOVER_PORT, + YarnConfiguration.DEFAULT_RM_HA_AUTOMATIC_FAILOVER_PORT); + } + + public static RMZKFailoverController create(Configuration conf) + throws IOException { + YarnConfiguration localRMConf = new YarnConfiguration(conf); + if (!HAUtil.isHAEnabled(localRMConf)) { + throw new HadoopIllegalArgumentException( + "HA is not enabled for this ResourceManager."); + } + + String rmId = HAUtil.getRMHAId(localRMConf); + RMHAServiceTarget localTarget = new RMHAServiceTarget(localRMConf, rmId); + return new RMZKFailoverController(localRMConf, localTarget); + } + + private RMZKFailoverController(Configuration conf, + RMHAServiceTarget localTarget) { + super(conf, localTarget); + this.localRMServiceTarget = localTarget; + this.adminAcl = new AccessControlList(conf.get( + YarnConfiguration.YARN_ADMIN_ACL, + YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); + LOG.info("Failover controller configured for NameNode " + localTarget); + } + + @Override + protected void initRPC() throws IOException { + super.initRPC(); + localRMServiceTarget.setAutoFailoverPort(rpcServer.getAddress().getPort()); + } + + @Override + public void loginAsFCUser() throws IOException { + // No need to do anything in YARN + } + + @Override + protected String getScopeInsideParentNode() { + return localRMServiceTarget.getClusterId(); + } + + @Override + protected void checkRpcAdminAccess() throws IOException { + try { + RMServerUtils.verifyAccess(adminAcl, "ZKFC", LOG); + } catch (YarnException e) { + throw new IOException(e); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java new file mode 100644 index 0000000..ce3a822 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java @@ -0,0 +1,77 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.client.RMHAServiceTarget; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class RMZKUtils { + public static RMHAServiceTarget dataToTarget(byte[] data, + Configuration conf) { + YarnServerResourceManagerServiceProtos.RMActiveNodeInfoProto proto; + try { + proto = YarnServerResourceManagerServiceProtos.RMActiveNodeInfoProto + .parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new YarnRuntimeException( + "Invalid data in ZK: " + StringUtils.byteToHexString(data)); + } + + String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID, + YarnConfiguration.DEFAULT_RM_CLUSTER_ID); + // Check if the passed proto corresponds to an RM in the same cluster + if (!proto.getClusterid().equals(clusterId)) { + throw new YarnRuntimeException("Mismatched cluster! The other RM seems " + + "to be from a different cluster. Current cluster = " + clusterId + + "Other RM's cluster = " + proto.getClusterid()); + } + + RMHAServiceTarget ret; + try { + YarnConfiguration targetConf = new YarnConfiguration(conf); + targetConf.set(YarnConfiguration.RM_HA_ID, proto.getRmId()); + ret = new RMHAServiceTarget(targetConf); + } catch (IOException e) { + throw new YarnRuntimeException("Couldn't create RMHAServiceTarget", e); + } + return ret; + } + + public static byte[] targetToData(RMHAServiceTarget target, + String clusterId, String rmId) throws IOException { + InetSocketAddress addr = target.getAddress(); + + return YarnServerResourceManagerServiceProtos.RMActiveNodeInfoProto + .newBuilder() + .setHostname(addr.getHostName()) + .setPort(addr.getPort()) + .setClusterid(clusterId) + .setRmId(rmId) + .build() + .toByteArray(); + } +}