diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java index bd08efb..d23df79 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java @@ -72,6 +72,20 @@ private static int getPermFromString(String permString) { } /** + * Helper method to remove a subset of permissions (remove) from a + * given set (perms). + * @param perms The permissions flag to remove from. Should be an OR of a + * some combination of {@link ZooDefs.Perms} + * @param remove The permissions to be removed. Should be an OR of a + * some combination of {@link ZooDefs.Perms} + * @return A permissions flag that is an OR of {@link ZooDefs.Perms} + * present in perms and not present in remove + */ + public static int removeSpecificPerms(int perms, int remove) { + return perms ^ remove; + } + + /** * Parse comma separated list of ACL entries to secure generated nodes, e.g. * sasl:hdfs/host1@MY.DOMAIN:cdrwa,sasl:hdfs/host2@MY.DOMAIN:cdrwa * diff --git hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java index 1d14326..52d10ca 100644 --- hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java +++ hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.util.List; -import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.util.ZKUtil.BadAclFormatException; import org.apache.hadoop.util.ZKUtil.ZKAuthInfo; import org.apache.zookeeper.ZooDefs.Perms; @@ -76,6 +75,14 @@ private static void badAcl(String acls, String expectedErr) { } @Test + public void testRemoveSpecificPerms() { + int perms = Perms.ALL; + int remove = Perms.CREATE; + int newPerms = ZKUtil.removeSpecificPerms(perms, remove); + assertEquals("Removal failed", 0, newPerms & Perms.CREATE); + } + + @Test public void testGoodACLs() { List result = ZKUtil.parseACLs( "sasl:hdfs/host1@MY.DOMAIN:cdrwa, sasl:hdfs/host2@MY.DOMAIN:ca"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java index 5512dcd..fdec509 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java @@ -193,8 +193,8 @@ private static String getConfKeyForRMInstance(String prefix, return addSuffix(prefix, getRMHAId(conf)); } - private static String getConfValueForRMInstance(String prefix, - Configuration conf) { + public static String getConfValueForRMInstance(String prefix, + Configuration conf) { String confKey = getConfKeyForRMInstance(prefix, conf); String retVal = conf.getTrimmed(confKey); if (LOG.isTraceEnabled()) { @@ -205,8 +205,8 @@ private static String getConfValueForRMInstance(String prefix, return retVal; } - static String getConfValueForRMInstance(String prefix, String defaultValue, - Configuration conf) { + public static String getConfValueForRMInstance( + String prefix, String defaultValue, Configuration conf) { String value = getConfValueForRMInstance(prefix, conf); return (value == null) ? defaultValue : value; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 09f6b6e..1e4c719 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -328,6 +328,8 @@ ZK_STATE_STORE_PREFIX + "acl"; public static final String DEFAULT_ZK_RM_STATE_STORE_ACL = "world:anyone:rwcda"; + public static final String ZK_RM_STATE_STORE_ROOT_NODE_ACL = + ZK_STATE_STORE_PREFIX + "root-node.acl"; /** The maximum number of completed applications RM keeps. */ public static final String RM_MAX_COMPLETED_APPLICATIONS = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 86501ad..7f6e050 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -279,7 +279,11 @@ Host:Port of the ZooKeeper server where RM state will be stored. This must be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore - as the value for yarn.resourcemanager.store.class + as the value for yarn.resourcemanager.store.class. ZKRMStateStore + is implicitly fenced, meaning a single ResourceManager is + able to use the store at any point in time. More details on this, along + with setting up appropriate ACLs is discussed under the description for + yarn.resourcemanager.zk.state-store.root-node.acl. yarn.resourcemanager.zk.state-store.address @@ -321,6 +325,31 @@ + + ACLs to be used for the root znode when using ZKRMStateStore in a HA + scenario for fencing. + + ZKRMStateStore supports implicit fencing to allow a single + ResourceManager write-access to the store. For fencing, the + ResourceManagers in the cluster share read-write-admin privileges on the + root node, but the Active ResourceManager claims exclusive create-delete + permissions. + + By default, when this property is not set, we use the ACLs from + yarn.resourcemanager.zk.state-store.acl for shared admin access and + rm-address:cluster-timestamp for username-based exclusive create-delete + access. + + This property allows users to set ACLs of their choice instead of using + the default mechanism. For fencing to work, the ACLs should be + carefully set differently on each ResourceManger such that all the + ResourceManagers have shared admin access and the Active ResourceManger + takes over (exclusively) the create-delete access. + + yarn.resourcemanager.zk.state-store.root-node.acl + + + URI pointing to the location of the FileSystem path where RM state will be stored. This must be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java index c74b282..f801203 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java @@ -67,7 +67,9 @@ protected HAServiceState haState = HAServiceState.INITIALIZING; private AccessControlList adminAcl; private Server haAdminServer; - private boolean haEnabled; + + @InterfaceAudience.Private + boolean haEnabled; public RMHAProtocolService(ResourceManager resourceManager) { super("RMHAProtocolService"); @@ -174,7 +176,8 @@ public synchronized void monitorHealth() } } - private synchronized void transitionToActive() throws Exception { + @InterfaceAudience.Private + synchronized void transitionToActive() throws Exception { if (haState == HAServiceState.ACTIVE) { LOG.info("Already in active state"); return; @@ -205,7 +208,8 @@ public synchronized void transitionToActive(StateChangeRequestInfo reqInfo) } } - private synchronized void transitionToStandby(boolean initialize) + @InterfaceAudience.Private + synchronized void transitionToStandby(boolean initialize) throws Exception { if (haState == HAServiceState.STANDBY) { LOG.info("Already in standby state"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 41b119e..11fdf4a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreOperationFailedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreOperationFailedEventType; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -163,6 +165,10 @@ public ResourceManager() { super("ResourceManager"); } + + public RMHAProtocolService getHAService() { + return this.haService; + } public RMContext getRMContext() { return this.rmContext; @@ -216,6 +222,11 @@ protected NMTokenSecretManagerInRM createNMTokenSecretManager( return new SchedulerEventDispatcher(this.scheduler); } + protected RMStateStoreOperationFailedEventDispatcher + createRMStateStoreOperationFailedEventDispatcher() { + return new RMStateStoreOperationFailedEventDispatcher(haService); + } + protected Dispatcher createDispatcher() { return new AsyncDispatcher(); } @@ -339,6 +350,8 @@ protected void serviceInit(Configuration configuration) throws Exception { try { rmStore.init(conf); rmStore.setRMDispatcher(rmDispatcher); + rmDispatcher.register(RMStateStoreOperationFailedEventType.class, + createRMStateStoreOperationFailedEventDispatcher()); } catch (Exception e) { // the Exception from stateStore.init() needs to be handled for // HA and we need to give up master status if we got fenced @@ -633,6 +646,32 @@ public void handle(SchedulerEvent event) { } @Private + public static class RMStateStoreOperationFailedEventDispatcher implements + EventHandler { + private final RMHAProtocolService haService; + + public RMStateStoreOperationFailedEventDispatcher( + RMHAProtocolService haService) { + this.haService = haService; + } + + @Override + public void handle(RMStateStoreOperationFailedEvent event) { + if (event.getType() == RMStateStoreOperationFailedEventType.FENCED && + haService.haEnabled) { + try { + haService.transitionToStandby(true); + return; + } catch (Exception e) { + // Do nothing. RM is going to shutdown + } + } + + ExitUtil.terminate(1, event.getCause()); + } + } + + @Private public static final class ApplicationEventDispatcher implements EventHandler { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 5a7c7dc..a42e50c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; + import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -384,9 +385,13 @@ protected abstract void updateApplicationAttemptStateInternal(String attemptId, */ public synchronized void storeRMDelegationTokenAndSequenceNumber( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, - int latestSequenceNumber) throws Exception { - storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate, - latestSequenceNumber); + int latestSequenceNumber) { + try { + storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate, + latestSequenceNumber); + } catch (Exception e) { + notifyStoreOperationFailed(e); + } } /** @@ -402,9 +407,12 @@ protected abstract void storeRMDelegationTokenAndSequenceNumberState( * RMDTSecretManager call this to remove the state of a delegation token */ public synchronized void removeRMDelegationToken( - RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) - throws Exception { - removeRMDelegationTokenState(rmDTIdentifier); + RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) { + try { + removeRMDelegationTokenState(rmDTIdentifier); + } catch (Exception e) { + notifyStoreOperationFailed(e); + } } /** @@ -417,9 +425,12 @@ protected abstract void removeRMDelegationTokenState( /** * RMDTSecretManager call this to store the state of a master key */ - public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) - throws Exception { - storeRMDTMasterKeyState(delegationKey); + public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) { + try { + storeRMDTMasterKeyState(delegationKey); + } catch (Exception e) { + notifyStoreOperationFailed(e); + } } /** @@ -433,9 +444,12 @@ protected abstract void storeRMDTMasterKeyState(DelegationKey delegationKey) /** * RMDTSecretManager call this to remove the state of a master key */ - public synchronized void removeRMDTMasterKey(DelegationKey delegationKey) - throws Exception { - removeRMDTMasterKeyState(delegationKey); + public synchronized void removeRMDTMasterKey(DelegationKey delegationKey) { + try { + removeRMDTMasterKeyState(delegationKey); + } catch (Exception e) { + notifyStoreOperationFailed(e); + } } /** @@ -542,13 +556,12 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { } } catch (Exception e) { LOG.error("Error storing app: " + appId, e); - storedException = e; - } finally { - if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { - notifyDoneStoringApplication(appId, storedException); - } else { - notifyDoneUpdatingApplication(appId, storedException); - } + notifyStoreOperationFailed(e); + } + if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { + notifyDoneStoringApplication(appId, storedException); + } else { + notifyDoneUpdatingApplication(appId, storedException); } } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT) || event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) { @@ -593,17 +606,16 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { .toString(), attemptStateData); } } catch (Exception e) { - LOG - .error("Error storing appAttempt: " + attemptState.getAttemptId(), e); - storedException = e; - } finally { - if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) { - notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), + LOG.error( + "Error storing appAttempt: " + attemptState.getAttemptId(), e); + notifyStoreOperationFailed(e); + } + if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) { + notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), storedException); - } else { - notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), + } else { + notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), storedException); - } } } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) { ApplicationState appState = @@ -615,15 +627,31 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { removeApplicationState(appState); } catch (Exception e) { LOG.error("Error removing app: " + appId, e); - removedException = e; - } finally { - notifyDoneRemovingApplcation(appId, removedException); + notifyStoreOperationFailed(e); } + notifyDoneRemovingApplcation(appId, removedException); } else { LOG.error("Unknown RMStateStoreEvent type: " + event.getType()); } } + /** + * In {#handleStoreEvent}, this method is called to notify the + * ResourceManager that the store operation has failed. + * @param failureCause the exception due to which the operation failed + */ + private void notifyStoreOperationFailed(Exception failureCause) { + RMStateStoreOperationFailedEventType type; + if (failureCause instanceof StoreFencedException) { + type = RMStateStoreOperationFailedEventType.FENCED; + } else { + type = RMStateStoreOperationFailedEventType.FAILED; + } + + rmDispatcher.getEventHandler().handle( + new RMStateStoreOperationFailedEvent(type, failureCause)); + } + @SuppressWarnings("unchecked") /** * In (@link handleStoreEvent}, this method is called to notify the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEvent.java new file mode 100644 index 0000000..5a98a48 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEvent.java @@ -0,0 +1,35 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class RMStateStoreOperationFailedEvent extends AbstractEvent { + private Exception cause; + + RMStateStoreOperationFailedEvent( + RMStateStoreOperationFailedEventType type, Exception cause) { + super(type); + this.cause = cause; + } + + public Exception getCause() { + return this.cause; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEventType.java new file mode 100644 index 0000000..f5a7dc5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEventType.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +public enum RMStateStoreOperationFailedEventType { + FENCED, // Store operation failed because it was fenced + FAILED // Store operation failed for no known reason +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFencedException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFencedException.java new file mode 100644 index 0000000..1f8eb16 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFencedException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +public class StoreFencedException extends YarnException { + private static final long serialVersionUID = 1L; + + public StoreFencedException() { + super("RMStateStore has been fenced"); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 628d260..3db677b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -23,7 +23,9 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.commons.logging.Log; @@ -31,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -38,11 +41,14 @@ import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.RMHAServiceTarget; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -53,11 +59,14 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; import com.google.common.annotations.VisibleForTesting; +import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; @Private @Unstable @@ -83,6 +92,55 @@ protected ZooKeeper zkClient; private ZooKeeper oldZkClient; + /** Fencing related variables */ + private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK"; + private String fencingNodePath; + private Op createFencingNodePathOp; + private Op deleteFencingNodePathOp; + + @VisibleForTesting + List zkRootNodeAcl; + private boolean useDefaultFencingScheme = false; + public static final int CREATE_DELETE_PERMS = + ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE; + private final String zkRootNodeAuthScheme = + new DigestAuthenticationProvider().getScheme(); + + private String zkRootNodeUsername; + private String zkRootNodePassword; + + /** + * Given the {@link Configuration} and {@link ACL}s used (zkAcl) for + * ZooKeeper access, construct the {@link ACL}s for the store's root node. + * In the constructed {@link ACL}, all the users allowed by zkAcl are given + * rwa access, while the current RM has exclude create-delete access. + * + * To be called only when HA is enabled and the configuration doesn't set ACL + * for the root node. + */ + @VisibleForTesting + @Private + @Unstable + protected List constructZkRootNodeACL( + Configuration conf, List sourceACLs) throws NoSuchAlgorithmException { + List zkRootNodeAcl = new ArrayList(); + for (ACL acl : sourceACLs) { + zkRootNodeAcl.add(new ACL( + ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS), + acl.getId())); + } + + zkRootNodeUsername = HAUtil.getConfValueForRMInstance( + YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, conf); + zkRootNodePassword = Long.toString(ResourceManager.getClusterTimeStamp()); + Id rmId = new Id(zkRootNodeAuthScheme, + DigestAuthenticationProvider.generateDigest( + zkRootNodeUsername + ":" + zkRootNodePassword)); + zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId)); + return zkRootNodeAcl; + } + @Override public synchronized void initInternal(Configuration conf) throws Exception { zkHostPort = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS); @@ -116,6 +174,29 @@ public synchronized void initInternal(Configuration conf) throws Exception { zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME; rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT; rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT; + + /* Initialize fencing related paths, acls, and ops */ + fencingNodePath = zkRootNodePath + "/" + FENCING_LOCK; + createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl, + CreateMode.PERSISTENT); + deleteFencingNodePathOp = Op.delete(fencingNodePath, -1); + if (HAUtil.isHAEnabled(conf)) { + String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance + (YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf); + if (zkRootNodeAclConf != null) { + zkRootNodeAclConf = ZKUtil.resolveConfIndirection(zkRootNodeAclConf); + try { + zkRootNodeAcl = ZKUtil.parseACLs(zkRootNodeAclConf); + } catch (ZKUtil.BadAclFormatException bafe) { + LOG.error("Invalid format for " + + YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL); + throw bafe; + } + } else { + useDefaultFencingScheme = true; + zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl); + } + } } @Override @@ -126,20 +207,76 @@ public synchronized void startInternal() throws Exception { // ensure root dirs exist createRootDir(znodeWorkingPath); createRootDir(zkRootNodePath); + if (HAUtil.isHAEnabled(getConfig())){ + fence(); + } createRootDir(rmDTSecretManagerRoot); createRootDir(rmAppRoot); } - private void createRootDir(String rootPath) throws Exception { + private void createRootDir(final String rootPath) throws Exception { + // For root dirs, we shouldn't use the doMulti helper methods try { - createWithRetries(rootPath, null, zkAcl, CreateMode.PERSISTENT); + new ZKAction() { + @Override + public String run() throws KeeperException, InterruptedException { + return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT); + } + }.runWithRetries(); } catch (KeeperException ke) { - if (ke.code() != Code.NODEEXISTS) { + if (ke.code() == Code.NODEEXISTS) { + LOG.debug(rootPath + "znode already exists!"); + } else { throw ke; } } } + private void logRootNodeAcls(String prefix) throws KeeperException, + InterruptedException { + Stat getStat = new Stat(); + List getAcls = zkClient.getACL(zkRootNodePath, getStat); + + StringBuilder builder = new StringBuilder(); + builder.append(prefix); + for (ACL acl : getAcls) { + builder.append(acl.toString()); + } + builder.append(getStat.toString()); + LOG.debug(builder.toString()); + } + + private synchronized void fence() throws Exception { + if (LOG.isTraceEnabled()) { + logRootNodeAcls("Before fencing\n"); + } + + new ZKAction() { + @Override + public Void run() throws KeeperException, InterruptedException { + zkClient.setACL(zkRootNodePath, zkRootNodeAcl, -1); + return null; + } + }.runWithRetries(); + + // delete fencingnodepath + new ZKAction() { + @Override + public Void run() throws KeeperException, InterruptedException { + try { + zkClient.multi(Collections.singletonList(deleteFencingNodePathOp)); + } catch (KeeperException.NoNodeException nne) { + LOG.info("Fencing node " + fencingNodePath + " doesn't exist to delete"); + } + return null; + } + }.runWithRetries(); + + if (LOG.isTraceEnabled()) { + logRootNodeAcls("After fencing\n"); + } + } + private synchronized void closeZkClients() throws IOException { if (zkClient != null) { try { @@ -176,7 +313,8 @@ public synchronized RMState loadState() throws Exception { private synchronized void loadRMDTSecretManagerState(RMState rmState) throws Exception { - List childNodes = zkClient.getChildren(rmDTSecretManagerRoot, true); + List childNodes = + getChildrenWithRetries(rmDTSecretManagerRoot, true); for (String childNodeName : childNodes) { if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { @@ -209,7 +347,7 @@ private synchronized void loadRMDTSecretManagerState(RMState rmState) } private synchronized void loadRMAppState(RMState rmState) throws Exception { - List childNodes = zkClient.getChildren(rmAppRoot, true); + List childNodes = getChildrenWithRetries(rmAppRoot, true); List attempts = new ArrayList(); for (String childNodeName : childNodes) { @@ -466,6 +604,8 @@ public void process(WatchedEvent event) { } @VisibleForTesting + @Private + @Unstable public synchronized void processWatchEvent(WatchedEvent event) throws Exception { Event.EventType eventType = event.getType(); @@ -506,65 +646,71 @@ public synchronized void processWatchEvent(WatchedEvent event) } @VisibleForTesting + @Private + @Unstable String getNodePath(String root, String nodeName) { return (root + "/" + nodeName); } - @VisibleForTesting - public String createWithRetries( - final String path, final byte[] data, final List acl, - final CreateMode mode) throws Exception { - return new ZKAction() { - @Override - public String run() throws KeeperException, InterruptedException { - return zkClient.create(path, data, acl, mode); - } - }.runWithRetries(); - } - - private void deleteWithRetries(final String path, final int version) - throws Exception { + /** + * Helper method that creates fencing node, executes the passed operations, + * and deletes the fencing node. + */ + private synchronized void doMultiWithRetries( + final List opList) throws Exception { + final List execOpList = new ArrayList(opList.size() + 2); + execOpList.add(createFencingNodePathOp); + execOpList.addAll(opList); + execOpList.add(deleteFencingNodePathOp); new ZKAction() { @Override public Void run() throws KeeperException, InterruptedException { - /** - * Call exists() to leave a watch on the node denoted by path. - * Delete node if exists. To pass the existence information to the - * caller, call delete irrespective of whether node exists or not. - */ - if (zkClient.exists(path, true) == null) { - LOG.error("Trying to delete a path (" + path - + ") that doesn't exist."); - } - zkClient.delete(path, version); + zkClient.multi(execOpList); return null; } }.runWithRetries(); } - private void doMultiWithRetries(final ArrayList opList) throws Exception { - new ZKAction() { - @Override - public Void run() throws KeeperException, InterruptedException { - zkClient.multi(opList); - return null; + /** + * Helper method that creates fencing node, executes the passed operation, + * and deletes the fencing node. + */ + private void doMultiWithRetries(final Op op) throws Exception { + doMultiWithRetries(Collections.singletonList(op)); + } + + @VisibleForTesting + @Private + @Unstable + public void createWithRetries( + final String path, final byte[] data, final List acl, + final CreateMode mode) throws Exception { + doMultiWithRetries(Op.create(path, data, acl, mode)); + } + + private void deleteWithRetries(final String path, final int version) + throws Exception { + try { + doMultiWithRetries(Op.delete(path, version)); + } catch (KeeperException.NoNodeException nne) { + // We tried to delete a node that doesn't exist + if (LOG.isDebugEnabled()) { + LOG.debug("Attempted to delete a non-existing znode " + path); } - }.runWithRetries(); + } } @VisibleForTesting + @Private + @Unstable public void setDataWithRetries(final String path, final byte[] data, final int version) throws Exception { - new ZKAction() { - @Override - public Void run() throws KeeperException, InterruptedException { - zkClient.setData(path, data, version); - return null; - } - }.runWithRetries(); + doMultiWithRetries(Op.setData(path, data, version)); } @VisibleForTesting + @Private + @Unstable public byte[] getDataWithRetries(final String path, final boolean watch) throws Exception { return new ZKAction() { @@ -576,6 +722,16 @@ public Void run() throws KeeperException, InterruptedException { }.runWithRetries(); } + private List getChildrenWithRetries( + final String path, final boolean watch) throws Exception { + return new ZKAction>() { + @Override + List run() throws KeeperException, InterruptedException { + return zkClient.getChildren(path, watch); + } + }.runWithRetries(); + } + private abstract class ZKAction { // run() expects synchronization on ZKRMStateStore.this abstract T run() throws KeeperException, InterruptedException; @@ -596,11 +752,29 @@ T runWithCheck() throws Exception { } } + private boolean shouldRetry(Code code) { + switch (code) { + case CONNECTIONLOSS: + case OPERATIONTIMEOUT: + return true; + default: + break; + } + return false; + } + T runWithRetries() throws Exception { int retry = 0; while (true) { try { return runWithCheck(); + } catch (KeeperException.NoAuthException nae) { + if (HAUtil.isHAEnabled(getConfig())) { + // NoAuthException possibly means that this store is fenced due to + // another RM becoming active. Even if not, + // it is safer to assume we have been fenced + throw new StoreFencedException(); + } } catch (KeeperException ke) { if (shouldRetry(ke.code()) && ++retry < numRetries) { continue; @@ -611,17 +785,6 @@ T runWithRetries() throws Exception { } } - private static boolean shouldRetry(Code code) { - switch (code) { - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - return true; - default: - break; - } - return false; - } - private synchronized void createConnection() throws IOException, InterruptedException { closeZkClients(); @@ -629,6 +792,10 @@ private synchronized void createConnection() retries++) { try { zkClient = getNewZooKeeper(); + if (useDefaultFencingScheme) { + zkClient.addAuthInfo(zkRootNodeAuthScheme, new String( + zkRootNodeUsername + ":" + zkRootNodePassword).getBytes()); + } } catch (IOException ioe) { // Retry in case of network failures LOG.info("Failed to connect to the ZooKeeper on attempt - " + @@ -646,6 +813,8 @@ private synchronized void createConnection() // protected to mock for testing @VisibleForTesting + @Private + @Unstable protected synchronized ZooKeeper getNewZooKeeper() throws IOException, InterruptedException { ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index a6929a8..da7f137 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -18,15 +18,32 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.zookeeper.ZooKeeper; import org.junit.Test; @@ -56,7 +73,7 @@ public ZooKeeper getNewZooKeeper() throws IOException { public RMStateStore getRMStateStore() throws Exception { String workingZnode = "/Test"; - YarnConfiguration conf = new YarnConfiguration(); + Configuration conf = new YarnConfiguration(); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); this.client = createClient(); @@ -77,4 +94,79 @@ public void testZKRMStateStoreRealZK() throws Exception { testRMAppStateStore(zkTester); testRMDTSecretManagerStateStore(zkTester); } + + private void setHAConfForRM(Configuration conf, String rmId, int adminPort) { + conf.set(YarnConfiguration.RM_HA_ID, rmId); + for (String rpcAddress : HAUtil.RPC_ADDRESS_CONF_KEYS) { + conf.set(HAUtil.addSuffix(rpcAddress, rmId), "localhost:0"); + } + conf.set(YarnConfiguration.RM_HA_ADMIN_ADDRESS, "localhost:" + adminPort); + } + + @SuppressWarnings("unchecked") + @Test + public void testFencing() throws Exception { + StateChangeRequestInfo req = new StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName()); + conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort); + + setHAConfForRM(conf, "rm1", 1234); + ResourceManager rm1 = new ResourceManager(); + rm1.init(conf); + rm1.start(); + rm1.getHAService().transitionToActive(req); + assertEquals("RM with ZKStore didn't start", + Service.STATE.STARTED, rm1.getServiceState()); + assertEquals("RM should be Active", + HAServiceProtocol.HAServiceState.ACTIVE, + rm1.getHAService().getServiceStatus().getState()); + + setHAConfForRM(conf, "rm2", 2345); + ResourceManager rm2 = new ResourceManager(); + rm2.init(conf); + rm2.start(); + rm2.getHAService().transitionToActive(req); + assertEquals("RM with ZKStore didn't start", + Service.STATE.STARTED, rm2.getServiceState()); + assertEquals("RM should be Active", + HAServiceProtocol.HAServiceState.ACTIVE, + rm2.getHAService().getServiceStatus().getState()); + + // Try submitting an application with rm1 + Map mockMap = mock(Map.class); + ApplicationSubmissionContext asc = + ApplicationSubmissionContext.newInstance( + ApplicationId.newInstance(1000, 1), + "testApplication", // app Name + "default", // queue name + Priority.newInstance(0), + ContainerLaunchContext.newInstance(mockMap, mockMap, + new ArrayList(), mockMap, mock(ByteBuffer.class), + mockMap), + false, // unmanaged AM + true, // cancelTokens + 1, // max app attempts + Resource.newInstance(1024, 1)); + ClientRMService rmService = rm1.getClientRMService(); + rmService.submitApplication(SubmitApplicationRequest.newInstance(asc)); + + for (int i = 0; i < 30; i++) { + if (HAServiceProtocol.HAServiceState.ACTIVE == rm1.getHAService() + .getServiceStatus().getState()) { + Thread.sleep(100); + } + } + assertEquals("RM should have been fenced", + HAServiceProtocol.HAServiceState.STANDBY, + rm1.getHAService().getServiceStatus().getState()); + assertEquals("RM should be Active", + HAServiceProtocol.HAServiceState.ACTIVE, + rm2.getHAService().getServiceStatus().getState()); + } }