diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java index bd08efb..510485f 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java @@ -71,6 +71,17 @@ private static int getPermFromString(String permString) { return perm; } + /** + * Removes a subset of permissions (remove) from a given set (perms). + * Bits of remove that are not set in perms are ignored, never switched on. + * @param perms bitmask of ZooKeeper permissions to remove from + * @param remove bitmask of the permissions to clear + * @return perms with every bit that is set in remove cleared + */ + public static int removeSpecificPerms(int perms, int remove) { + return perms & ~remove; + } + /** * Parse comma separated list of ACL entries to secure generated nodes, e.g. * sasl:hdfs/host1@MY.DOMAIN:cdrwa,sasl:hdfs/host2@MY.DOMAIN:cdrwa diff --git hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java index 1d14326..52d10ca 100644 --- hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java +++ hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.util.List; -import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.util.ZKUtil.BadAclFormatException; import org.apache.hadoop.util.ZKUtil.ZKAuthInfo; import org.apache.zookeeper.ZooDefs.Perms; @@ -76,6 +75,16 @@ private static void badAcl(String acls, String expectedErr) { } @Test + public void testRemoveSpecificPerms() { + int perms = Perms.ALL; + int remove = Perms.CREATE; + int newPerms = ZKUtil.removeSpecificPerms(perms, remove); + assertEquals("Removal failed", 0, newPerms & Perms.CREATE); + assertEquals("Absent perms must not be added", Perms.READ, + ZKUtil.removeSpecificPerms(Perms.READ, Perms.CREATE | Perms.DELETE)); + } + + @Test public void testGoodACLs() { List<ACL> result = ZKUtil.parseACLs( "sasl:hdfs/host1@MY.DOMAIN:cdrwa, sasl:hdfs/host2@MY.DOMAIN:ca"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java index 18f9896..d4771de 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java @@ -78,8 +78,8 @@ public static String getRMHAId(Configuration conf) { return rmId; } - private static String getConfValueForRMInstance(String prefix, - Configuration conf) { + public static String getConfValueForRMInstance(String prefix, + Configuration conf) { String confKey = addSuffix(prefix, getRMHAId(conf)); String retVal = conf.get(confKey); if (LOG.isTraceEnabled()) { @@ -90,8 +90,8 @@ private static String getConfValueForRMInstance(String prefix, return retVal; } - static String getConfValueForRMInstance(String prefix, String defaultValue, - Configuration conf) { + public static String getConfValueForRMInstance( + String prefix, String defaultValue, Configuration conf) { String value = getConfValueForRMInstance(prefix, conf); return (value == null) ? defaultValue : value; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index be9e301..fc42beb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -315,6 +315,8 @@ ZK_STATE_STORE_PREFIX + "acl"; public static final String DEFAULT_ZK_RM_STATE_STORE_ACL = "world:anyone:rwcda"; + public static final String ZK_RM_STATE_STORE_ROOT_NODE_ACL = + ZK_STATE_STORE_PREFIX + "root-node.acl"; /** The maximum number of completed applications RM keeps. 
*/ public static final String RM_MAX_COMPLETED_APPLICATIONS = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 86501ad..f1e7b23 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -321,6 +321,19 @@ </property> <property> + <description>ACLs to be used for the root znode when using + ZKRMStateStore; primarily meant for implicitly fencing the other + RM from making any changes to the store. For fencing, users are + required to set up ACLs in a way that both RMs have read-write-admin + access, while create-delete access is exclusive to one RM. In an HA + setting, when not set, an RM-specific ACL is generated to ensure two + RMs don't use the store at the same time. In a non-HA setting, + when not set, the ACL set for all znodes via + yarn.resourcemanager.zk.state-store.acl is used by default.</description> + <name>yarn.resourcemanager.zk.state-store.root-node.acl</name> + </property> + + <property> <description>URI pointing to the location of the FileSystem path where RM state will be stored. 
This must be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 41c95d3..eaf73b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -29,11 +29,13 @@ import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -47,16 +49,21 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event; 
+import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; @Private @@ -83,6 +90,53 @@ protected ZooKeeper zkClient; private ZooKeeper oldZkClient; + /** Fencing related variables */ + private static final String FENCING_NODE_NAME = "OpInProgress"; + private String fencingNodePath; + private Op createFencingNodePathOp; + private Op deleteFencingNodePathOp; + + @VisibleForTesting + List<ACL> zkRootNodeAcl; + private boolean useDefaultFencingScheme = false; + public static final int CREATE_DELETE_PERMS = + ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE; + private final String zkRootNodeAuthScheme = + new DigestAuthenticationProvider().getScheme(); + + private String zkRootNodeUsername; + private String zkRootNodePassword; + + /** + * Given the {@link Configuration} and {@link ACL}s used (zkAcl) for + * ZooKeeper access, construct the {@link ACL}s for the store's root node. + * In the constructed {@link ACL}, all the users allowed by zkAcl are given + * rwa access, while the current RM has exclusive create-delete access. + * + * To be called only when HA is enabled and the configuration doesn't set ACL + * for the root node. 
+ */ + @VisibleForTesting + protected List<ACL> constructZkRootNodeACL( + Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException { + List<ACL> zkRootNodeAcl = new ArrayList<ACL>(); + for (ACL acl : sourceACLs) { + zkRootNodeAcl.add(new ACL( + ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS), + acl.getId())); + } + + zkRootNodeUsername = HAUtil.getConfValueForRMInstance( + YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, conf); + zkRootNodePassword = Long.toString(ResourceManager.getClusterTimeStamp()); + Id rmId = new Id(zkRootNodeAuthScheme, + DigestAuthenticationProvider.generateDigest( + zkRootNodeUsername + ":" + zkRootNodePassword)); + zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId)); + return zkRootNodeAcl; + } + @Override public synchronized void initInternal(Configuration conf) throws Exception { zkHostPort = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS); @@ -116,6 +170,29 @@ public synchronized void initInternal(Configuration conf) throws Exception { zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME; rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT; rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT; + + /* Initialize fencing related paths, acls, and ops */ + fencingNodePath = zkRootNodePath + "/" + FENCING_NODE_NAME; + createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl, + CreateMode.PERSISTENT); + deleteFencingNodePathOp = Op.delete(fencingNodePath, -1); + if (HAUtil.isHAEnabled(conf)) { + String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance + (YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf); + if (zkRootNodeAclConf != null) { + zkRootNodeAclConf = ZKUtil.resolveConfIndirection(zkRootNodeAclConf); + try { + zkRootNodeAcl = ZKUtil.parseACLs(zkRootNodeAclConf); + } catch (ZKUtil.BadAclFormatException bafe) { + LOG.error("Invalid format for " + + YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL); + throw bafe; + } + } else { + 
useDefaultFencingScheme = true; + zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl); + } + } } @Override @@ -126,20 +203,76 @@ public synchronized void startInternal() throws Exception { // ensure root dirs exist createRootDir(znodeWorkingPath); createRootDir(zkRootNodePath); + if (HAUtil.isHAEnabled(getConfig())) { + fence(); + } createRootDir(rmDTSecretManagerRoot); createRootDir(rmAppRoot); } - private void createRootDir(String rootPath) throws Exception { + private void createRootDir(final String rootPath) throws Exception { + // For root dirs, we shouldn't use the doMulti helper methods try { - createWithRetries(rootPath, null, zkAcl, CreateMode.PERSISTENT); + new ZKAction<String>() { + @Override + public String run() throws KeeperException, InterruptedException { + return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT); + } + }.runWithRetries(); } catch (KeeperException ke) { - if (ke.code() != Code.NODEEXISTS) { + if (ke.code() == Code.NODEEXISTS) { + LOG.debug(rootPath + " znode already exists!"); + } else { throw ke; } } } + private void logRootNodeAcls(String prefix) throws KeeperException, + InterruptedException { + Stat getStat = new Stat(); + List<ACL> getAcls = zkClient.getACL(zkRootNodePath, getStat); + + StringBuilder builder = new StringBuilder(); + builder.append(prefix); + for (ACL acl : getAcls) { + builder.append(acl.toString()); + } + builder.append(getStat.toString()); + LOG.trace(builder.toString()); + } + + private synchronized void fence() throws Exception { + if (LOG.isTraceEnabled()) { + logRootNodeAcls("Before fencing\n"); + } + + new ZKAction<Void>() { + @Override + public Void run() throws KeeperException, InterruptedException { + zkClient.setACL(zkRootNodePath, zkRootNodeAcl, -1); + return null; + } + }.runWithRetries(); + + // delete fencingnodepath + new ZKAction<Void>() { + @Override + public Void run() throws KeeperException, InterruptedException { + try { + zkClient.multi(Collections.singletonList(deleteFencingNodePathOp)); + } catch 
(KeeperException.NoNodeException nne) { + LOG.info("Fencing node " + fencingNodePath + " doesn't exist to delete"); + } + return null; + } + }.runWithRetries(); + + if (LOG.isTraceEnabled()) { + logRootNodeAcls("After fencing\n"); + } + } + private synchronized void closeZkClients() throws IOException { if (zkClient != null) { try { @@ -176,7 +309,8 @@ public synchronized RMState loadState() throws Exception { private synchronized void loadRMDTSecretManagerState(RMState rmState) throws Exception { - List<String> childNodes = zkClient.getChildren(rmDTSecretManagerRoot, true); + List<String> childNodes = + getChildrenWithRetries(rmDTSecretManagerRoot, true); for (String childNodeName : childNodes) { if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { @@ -209,7 +343,7 @@ private synchronized void loadRMDTSecretManagerState(RMState rmState) } private synchronized void loadRMAppState(RMState rmState) throws Exception { - List<String> childNodes = zkClient.getChildren(rmAppRoot, true); + List<String> childNodes = getChildrenWithRetries(rmAppRoot, true); List attempts = new ArrayList(); for (String childNodeName : childNodes) { @@ -476,58 +610,66 @@ String getNodePath(String root, String nodeName) { return (root + "/" + nodeName); } - @VisibleForTesting - public String createWithRetries( - final String path, final byte[] data, final List<ACL> acl, - final CreateMode mode) throws Exception { - return new ZKAction<String>() { + /** + * Helper method that creates fencing node, executes the passed operations, + * and deletes the fencing node. 
+ */ + private synchronized void doMultiWithRetries( + final List<Op> opList) throws Exception { + final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2); + execOpList.add(createFencingNodePathOp); + execOpList.addAll(opList); + execOpList.add(deleteFencingNodePathOp); + new ZKAction<Void>() { @Override - public String run() throws KeeperException, InterruptedException { - return zkClient.create(path, data, acl, mode); + public Void run() throws KeeperException, InterruptedException { + zkClient.multi(execOpList); + return null; } }.runWithRetries(); } + /** + * Helper method that creates fencing node, executes the passed operation, + * and deletes the fencing node. + */ + private void doMultiWithRetries(final Op op) throws Exception { + doMultiWithRetries(Collections.singletonList(op)); + } + + @VisibleForTesting + public void createWithRetries( + final String path, final byte[] data, final List<ACL> acl, + final CreateMode mode) throws Exception { + doMultiWithRetries(Op.create(path, data, acl, mode)); + } + private void deleteWithRetries(final String path, final int version) throws Exception { + /* + * Call exists() to leave a watch on the node denoted by path. To pass the + * existence information to the caller, call delete irrespective of whether + * node exists or not. + */ new ZKAction<Void>() { @Override public Void run() throws KeeperException, InterruptedException { - /** - * Call exists() to leave a watch on the node denoted by path. - * Delete node if exists. To pass the existence information to the - * caller, call delete irrespective of whether node exists or not. 
- */ if (zkClient.exists(path, true) == null) { LOG.error("Trying to delete a path (" + path + ") that doesn't exist."); } - zkClient.delete(path, version); return null; } }.runWithRetries(); - } - private void doMultiWithRetries(final ArrayList<Op> opList) throws Exception { - new ZKAction<Void>() { - @Override - public Void run() throws KeeperException, InterruptedException { - zkClient.multi(opList); - return null; - } - }.runWithRetries(); + /* Perform actual delete */ + doMultiWithRetries(Op.delete(path, version)); } @VisibleForTesting public void setDataWithRetries(final String path, final byte[] data, final int version) throws Exception { - new ZKAction<Void>() { - @Override - public Void run() throws KeeperException, InterruptedException { - zkClient.setData(path, data, version); - return null; - } - }.runWithRetries(); + doMultiWithRetries(Op.setData(path, data, version)); } @VisibleForTesting @@ -542,6 +684,16 @@ public Void run() throws KeeperException, InterruptedException { }.runWithRetries(); } + private List<String> getChildrenWithRetries( + final String path, final boolean watch) throws Exception { + return new ZKAction<List<String>>() { + @Override + List<String> run() throws KeeperException, InterruptedException { + return zkClient.getChildren(path, watch); + } + }.runWithRetries(); + } + private abstract class ZKAction<T> { // run() expects synchronization on ZKRMStateStore.this abstract T run() throws KeeperException, InterruptedException; @@ -562,6 +714,17 @@ T runWithCheck() throws Exception { } } + private boolean shouldRetry(Code code) { + switch (code) { + case CONNECTIONLOSS: + case OPERATIONTIMEOUT: + return true; + default: + break; + } + return false; + } + T runWithRetries() throws Exception { int retry = 0; while (true) { @@ -577,17 +740,6 @@ T runWithRetries() throws Exception { } } - private static boolean shouldRetry(Code code) { - switch (code) { - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - return true; - default: - break; - } - return false; - } - private synchronized 
void createConnection() throws IOException, InterruptedException { closeZkClients(); @@ -595,6 +747,10 @@ private synchronized void createConnection() retries++) { try { zkClient = getNewZooKeeper(); + if (useDefaultFencingScheme) { + zkClient.addAuthInfo(zkRootNodeAuthScheme, + (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes()); + } } catch (IOException ioe) { // Retry in case of network failures LOG.info("Failed to connect to the ZooKeeper on attempt - " + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java index 23939de..8d37f71 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java @@ -86,6 +86,7 @@ protected void storeNewMasterKey(DelegationKey newKey) { rmContext.getStateStore().storeRMDTMasterKey(newKey); } catch (Exception e) { LOG.error("Error in storing master key with KeyID: " + newKey.getKeyId()); + LOG.error("Exception stack trace", e); ExitUtil.terminate(1, e); } }