diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java
index bd08efb..510485f 100644
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java
@@ -71,6 +71,10 @@ private static int getPermFromString(String permString) {
return perm;
}
+ /**
+  * Returns {@code perms} with every permission bit in {@code remove} cleared.
+  * Bits in {@code remove} that are not set in {@code perms} are ignored, so
+  * the result is always a subset of {@code perms}.
+  *
+  * @param perms the original permission bitmask
+  * @param remove bitmask of the permissions to strip
+  * @return {@code perms} without the permissions named in {@code remove}
+  */
+ public static int removeSpecificPerms(int perms, int remove) {
+   // AND-NOT clears bits; XOR would turn on any bit that was not already set.
+   return perms & ~remove;
+ }
+
/**
* Parse comma separated list of ACL entries to secure generated nodes, e.g.
* sasl:hdfs/host1@MY.DOMAIN:cdrwa,sasl:hdfs/host2@MY.DOMAIN:cdrwa
diff --git hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java
index 1d14326..52d10ca 100644
--- hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java
+++ hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java
@@ -24,7 +24,6 @@
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.util.ZKUtil.BadAclFormatException;
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
import org.apache.zookeeper.ZooDefs.Perms;
@@ -76,6 +75,14 @@ private static void badAcl(String acls, String expectedErr) {
}
@Test
+ public void testRemoveSpecificPerms() {
+   // Strip CREATE from the full mask and verify that the bit is no longer set.
+   final int stripped = ZKUtil.removeSpecificPerms(Perms.ALL, Perms.CREATE);
+   final int createBit = stripped & Perms.CREATE;
+   assertEquals("Removal failed", 0, createBit);
+ }
+
+ @Test
public void testGoodACLs() {
List result = ZKUtil.parseACLs(
"sasl:hdfs/host1@MY.DOMAIN:cdrwa, sasl:hdfs/host2@MY.DOMAIN:ca");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
index 18f9896..d4771de 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
@@ -78,8 +78,8 @@ public static String getRMHAId(Configuration conf) {
return rmId;
}
- private static String getConfValueForRMInstance(String prefix,
- Configuration conf) {
+ public static String getConfValueForRMInstance(String prefix,
+ Configuration conf) {
String confKey = addSuffix(prefix, getRMHAId(conf));
String retVal = conf.get(confKey);
if (LOG.isTraceEnabled()) {
@@ -90,8 +90,8 @@ private static String getConfValueForRMInstance(String prefix,
return retVal;
}
- static String getConfValueForRMInstance(String prefix, String defaultValue,
- Configuration conf) {
+ /**
+ * Looks up the RM-instance-specific value for {@code prefix}, falling back
+ * to a default. Delegates to the two-argument overload, which reads the key
+ * formed by suffixing {@code prefix} with this RM's HA id.
+ *
+ * @param prefix configuration key prefix to be suffixed with the RM HA id
+ * @param defaultValue value to return when the lookup yields null
+ * @param conf configuration to read from
+ * @return the configured value, or {@code defaultValue} if it is null
+ */
+ public static String getConfValueForRMInstance(
+ String prefix, String defaultValue, Configuration conf) {
String value = getConfValueForRMInstance(prefix, conf);
return (value == null) ? defaultValue : value;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index be9e301..fc42beb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -315,6 +315,8 @@
ZK_STATE_STORE_PREFIX + "acl";
public static final String DEFAULT_ZK_RM_STATE_STORE_ACL =
"world:anyone:rwcda";
+ public static final String ZK_RM_STATE_STORE_ROOT_NODE_ACL =
+ ZK_STATE_STORE_PREFIX + "root-node.acl";
/** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 86501ad..f1e7b23 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -321,6 +321,19 @@
+ <description>ACLs to be used for the root znode when using
+ ZKRMStateStore. They are primarily meant for implicitly fencing the other
+ RM from making any changes to the store. For fencing, users are
+ required to set up ACLs such that both RMs have read-write-admin
+ access, but only one of them at a time has create-delete access. In an HA
+ setting, when not set, an RM-specific ACL is generated to ensure two
+ RMs don't use the store at the same time. In a non-HA setting,
+ when not set, the ACL set for all znodes via
+ yarn.resourcemanager.zk.state-store.acl is used by default.</description>
+ <name>yarn.resourcemanager.zk.state-store.root-node.acl</name>
+
+
+
URI pointing to the location of the FileSystem path where
RM state will be stored. This must be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 41c95d3..eaf73b3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -29,11 +29,13 @@
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -47,16 +49,21 @@
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
@Private
@@ -83,6 +90,53 @@
protected ZooKeeper zkClient;
private ZooKeeper oldZkClient;
+ /**
+ * Fencing related variables. The fencing node is a child of the store's root
+ * znode; doMultiWithRetries brackets every batch of operations with a create
+ * and a delete of that node.
+ */
+ private static final String FENCING_NODE_NAME = "OpInProgress";
+ // Absolute path of the fencing node: zkRootNodePath + "/" + FENCING_NODE_NAME.
+ private String fencingNodePath;
+ // Pre-built ops for creating/deleting the fencing node; set in initInternal.
+ private Op createFencingNodePathOp;
+ private Op deleteFencingNodePathOp;
+
+ // ACLs that fence() applies to the root znode. Populated in initInternal only
+ // when HA is enabled: parsed from configuration, or built by
+ // constructZkRootNodeACL when no root-node ACL is configured.
+ @VisibleForTesting
+ List zkRootNodeAcl;
+ // True when the root-node ACL was generated rather than configured; in that
+ // case createConnection adds this RM's username:password auth info to the
+ // ZooKeeper client.
+ private boolean useDefaultFencingScheme = false;
+ // Permission mask covering exactly CREATE and DELETE.
+ public static final int CREATE_DELETE_PERMS =
+ ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
+ // Scheme name reported by ZooKeeper's DigestAuthenticationProvider.
+ private final String zkRootNodeAuthScheme =
+ new DigestAuthenticationProvider().getScheme();
+
+ // Credentials for the generated root-node ACL; assigned in
+ // constructZkRootNodeACL (RM address from configuration, cluster timestamp).
+ private String zkRootNodeUsername;
+ private String zkRootNodePassword;
+
+ /**
+ * Given the {@link Configuration} and {@link ACL}s used (zkAcl) for
+ * ZooKeeper access, construct the {@link ACL}s for the store's root node.
+ * In the constructed {@link ACL}, all the users allowed by zkAcl are given
+ * rwa access, while the current RM has exclusive create-delete access.
+ *
+ * To be called only when HA is enabled and the configuration doesn't set ACL
+ * for the root node.
+ *
+ * As a side effect, this stores the generated credentials in
+ * zkRootNodeUsername and zkRootNodePassword.
+ *
+ * @param conf configuration used to look up this RM's address
+ * @param sourceACLs ACLs to derive the root node ACLs from
+ * @return the derived ACLs followed by a create-delete entry for this RM
+ * @throws NoSuchAlgorithmException declared for the digest generation call
+ */
+ @VisibleForTesting
+ protected List constructZkRootNodeACL(
+ Configuration conf, List sourceACLs) throws NoSuchAlgorithmException {
+ // Local list; it shadows the zkRootNodeAcl field, which the caller assigns.
+ List zkRootNodeAcl = new ArrayList();
+ for (ACL acl : sourceACLs) {
+ // Keep the identity of each source entry; its permissions are passed
+ // through ZKUtil.removeSpecificPerms with the create-delete mask.
+ zkRootNodeAcl.add(new ACL(
+ ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
+ acl.getId()));
+ }
+
+ // Username is this RM's configured address (or the default RM address);
+ // password is the cluster timestamp rendered as a decimal string.
+ zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
+ YarnConfiguration.RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
+ zkRootNodePassword = Long.toString(ResourceManager.getClusterTimeStamp());
+ // Digest identity built from "username:password" under the digest scheme.
+ Id rmId = new Id(zkRootNodeAuthScheme,
+ DigestAuthenticationProvider.generateDigest(
+ zkRootNodeUsername + ":" + zkRootNodePassword));
+ // Only this RM's digest identity receives create-delete on the root node.
+ zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId));
+ return zkRootNodeAcl;
+ }
+
@Override
public synchronized void initInternal(Configuration conf) throws Exception {
zkHostPort = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS);
@@ -116,6 +170,29 @@ public synchronized void initInternal(Configuration conf) throws Exception {
zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME;
rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT;
rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT;
+
+ /* Initialize fencing related paths, acls, and ops */
+ fencingNodePath = zkRootNodePath + "/" + FENCING_NODE_NAME;
+ createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl,
+ CreateMode.PERSISTENT);
+ deleteFencingNodePathOp = Op.delete(fencingNodePath, -1);
+ if (HAUtil.isHAEnabled(conf)) {
+ String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
+ (YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
+ if (zkRootNodeAclConf != null) {
+ zkRootNodeAclConf = ZKUtil.resolveConfIndirection(zkRootNodeAclConf);
+ try {
+ zkRootNodeAcl = ZKUtil.parseACLs(zkRootNodeAclConf);
+ } catch (ZKUtil.BadAclFormatException bafe) {
+ LOG.error("Invalid format for " +
+ YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL);
+ throw bafe;
+ }
+ } else {
+ useDefaultFencingScheme = true;
+ zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
+ }
+ }
}
@Override
@@ -126,20 +203,76 @@ public synchronized void startInternal() throws Exception {
// ensure root dirs exist
createRootDir(znodeWorkingPath);
createRootDir(zkRootNodePath);
+ if (HAUtil.isHAEnabled(getConfig())){
+ fence();
+ }
createRootDir(rmDTSecretManagerRoot);
createRootDir(rmAppRoot);
}
+ /**
+ * Creates a persistent znode at {@code rootPath} with the store-wide ACLs,
+ * treating an already existing node as success.
+ *
+ * @param rootPath absolute path of the znode to create
+ * @throws Exception any failure other than the node already existing
+ */
- private void createRootDir(String rootPath) throws Exception {
+ private void createRootDir(final String rootPath) throws Exception {
+ // For root dirs, we shouldn't use the doMulti helper methods: they wrap
+ // every call with create/delete of the fencing node, which lives under
+ // the store's root znode.
try {
- createWithRetries(rootPath, null, zkAcl, CreateMode.PERSISTENT);
+ new ZKAction() {
+ @Override
+ public String run() throws KeeperException, InterruptedException {
+ return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT);
+ }
+ }.runWithRetries();
} catch (KeeperException ke) {
- if (ke.code() != Code.NODEEXISTS) {
+ if (ke.code() == Code.NODEEXISTS) {
+ // Separate the path from the message text with a space.
+ LOG.debug(rootPath + " znode already exists!");
+ } else {
throw ke;
}
}
}
+ /**
+ * Logs, at debug level, the ACLs and {@link Stat} read back from the
+ * store's root znode.
+ * NOTE(review): callers guard this with LOG.isTraceEnabled() although the
+ * message is emitted via LOG.debug — confirm the intended level.
+ *
+ * @param prefix text placed at the start of the log message
+ * @throws KeeperException if the getACL call on the root znode fails
+ * @throws InterruptedException if the ZooKeeper call is interrupted
+ */
+ private void logRootNodeAcls(String prefix) throws KeeperException,
+ InterruptedException {
+ // The Stat object is handed to getACL and logged afterwards.
+ Stat getStat = new Stat();
+ List getAcls = zkClient.getACL(zkRootNodePath, getStat);
+
+ // Message layout: prefix, each ACL back to back, then the stat.
+ StringBuilder builder = new StringBuilder();
+ builder.append(prefix);
+ for (ACL acl : getAcls) {
+ builder.append(acl.toString());
+ }
+ builder.append(getStat.toString());
+ LOG.debug(builder.toString());
+ }
+
+ /**
+ * Fences the store for this RM: replaces the ACLs on the root znode with
+ * zkRootNodeAcl and then deletes the fencing node if it exists. When trace
+ * logging is enabled, the root znode's ACLs are logged before and after.
+ *
+ * @throws Exception if either ZooKeeper step fails after retries
+ */
+ private synchronized void fence() throws Exception {
+ if (LOG.isTraceEnabled()) {
+ logRootNodeAcls("Before fencing\n");
+ }
+
+ // Step 1: install the root-node ACLs. -1 is passed as the ACL version
+ // (ZooKeeper documents -1 as "match any version").
+ new ZKAction() {
+ @Override
+ public Void run() throws KeeperException, InterruptedException {
+ zkClient.setACL(zkRootNodePath, zkRootNodeAcl, -1);
+ return null;
+ }
+ }.runWithRetries();
+
+ // Step 2: delete fencingnodepath. A missing node is only logged.
+ // NOTE(review): presumably this cleans up a fencing node left behind by
+ // another RM — confirm.
+ new ZKAction() {
+ @Override
+ public Void run() throws KeeperException, InterruptedException {
+ try {
+ zkClient.multi(Collections.singletonList(deleteFencingNodePathOp));
+ } catch (KeeperException.NoNodeException nne) {
+ LOG.info("Fencing node " + fencingNodePath + " doesn't exist to delete");
+ }
+ return null;
+ }
+ }.runWithRetries();
+
+ if (LOG.isTraceEnabled()) {
+ logRootNodeAcls("After fencing\n");
+ }
+ }
+
private synchronized void closeZkClients() throws IOException {
if (zkClient != null) {
try {
@@ -176,7 +309,8 @@ public synchronized RMState loadState() throws Exception {
private synchronized void loadRMDTSecretManagerState(RMState rmState)
throws Exception {
- List childNodes = zkClient.getChildren(rmDTSecretManagerRoot, true);
+ List childNodes =
+ getChildrenWithRetries(rmDTSecretManagerRoot, true);
for (String childNodeName : childNodes) {
if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
@@ -209,7 +343,7 @@ private synchronized void loadRMDTSecretManagerState(RMState rmState)
}
private synchronized void loadRMAppState(RMState rmState) throws Exception {
- List childNodes = zkClient.getChildren(rmAppRoot, true);
+ List childNodes = getChildrenWithRetries(rmAppRoot, true);
List attempts =
new ArrayList();
for (String childNodeName : childNodes) {
@@ -476,58 +610,66 @@ String getNodePath(String root, String nodeName) {
return (root + "/" + nodeName);
}
- @VisibleForTesting
- public String createWithRetries(
- final String path, final byte[] data, final List acl,
- final CreateMode mode) throws Exception {
- return new ZKAction() {
+ /**
+ * Helper method that creates fencing node, executes the passed operations,
+ * and deletes the fencing node.
+ */
+ private synchronized void doMultiWithRetries(
+ final List opList) throws Exception {
+ final List execOpList = new ArrayList(opList.size() + 2);
+ execOpList.add(createFencingNodePathOp);
+ execOpList.addAll(opList);
+ execOpList.add(deleteFencingNodePathOp);
+ new ZKAction() {
@Override
- public String run() throws KeeperException, InterruptedException {
- return zkClient.create(path, data, acl, mode);
+ public Void run() throws KeeperException, InterruptedException {
+ zkClient.multi(execOpList);
+ return null;
}
}.runWithRetries();
}
+ /**
+ * Single-operation convenience overload: wraps {@code op} in a one-element
+ * list and hands it to the list-based doMultiWithRetries, which creates the
+ * fencing node, executes the passed operation, and deletes the fencing node.
+ *
+ * @param op the ZooKeeper operation to execute
+ * @throws Exception if the list-based overload fails
+ */
+ private void doMultiWithRetries(final Op op) throws Exception {
+ doMultiWithRetries(Collections.singletonList(op));
+ }
+
+ /**
+ * Creates a znode by submitting a create op through doMultiWithRetries, so
+ * the create is bracketed by the fencing-node create and delete ops.
+ *
+ * @param path path of the znode to create
+ * @param data initial data for the znode
+ * @param acl ACLs to set on the znode
+ * @param mode create mode for the znode
+ * @throws Exception if doMultiWithRetries fails
+ */
+ @VisibleForTesting
+ public void createWithRetries(
+ final String path, final byte[] data, final List acl,
+ final CreateMode mode) throws Exception {
+ doMultiWithRetries(Op.create(path, data, acl, mode));
+ }
+
private void deleteWithRetries(final String path, final int version)
throws Exception {
+ /**
+ * Call exists() to leave a watch on the node denoted by path. To pass the
+ * existence information to the caller, call delete irrespective of whether
+ * node exists or not.
+ */
new ZKAction() {
@Override
public Void run() throws KeeperException, InterruptedException {
- /**
- * Call exists() to leave a watch on the node denoted by path.
- * Delete node if exists. To pass the existence information to the
- * caller, call delete irrespective of whether node exists or not.
- */
if (zkClient.exists(path, true) == null) {
LOG.error("Trying to delete a path (" + path
+ ") that doesn't exist.");
}
- zkClient.delete(path, version);
return null;
}
}.runWithRetries();
- }
- private void doMultiWithRetries(final ArrayList opList) throws Exception {
- new ZKAction() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- zkClient.multi(opList);
- return null;
- }
- }.runWithRetries();
+ /** Perform actual delete */
+ doMultiWithRetries(Op.delete(path, version));
}
@VisibleForTesting
public void setDataWithRetries(final String path, final byte[] data,
final int version) throws Exception {
- new ZKAction() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- zkClient.setData(path, data, version);
- return null;
- }
- }.runWithRetries();
+ doMultiWithRetries(Op.setData(path, data, version));
}
@VisibleForTesting
@@ -542,6 +684,16 @@ public Void run() throws KeeperException, InterruptedException {
}.runWithRetries();
}
+ /**
+  * Lists the children of {@code path} through {@code runWithRetries}.
+  *
+  * @param path znode whose children are listed
+  * @param watch whether to leave a watch on {@code path}
+  * @return the names of the child znodes
+  * @throws Exception if the call fails after retries
+  */
+ private List<String> getChildrenWithRetries(
+     final String path, final boolean watch) throws Exception {
+   // Type arguments restored: the bare "ZKAction>" was not valid Java.
+   return new ZKAction<List<String>>() {
+     @Override
+     List<String> run() throws KeeperException, InterruptedException {
+       return zkClient.getChildren(path, watch);
+     }
+   }.runWithRetries();
+ }
+
private abstract class ZKAction {
// run() expects synchronization on ZKRMStateStore.this
abstract T run() throws KeeperException, InterruptedException;
@@ -562,6 +714,17 @@ T runWithCheck() throws Exception {
}
}
+ /**
+  * Tells whether an operation that failed with {@code code} should be retried.
+  *
+  * @param code result code of the failed ZooKeeper call
+  * @return true for CONNECTIONLOSS and OPERATIONTIMEOUT, false otherwise
+  */
+ private boolean shouldRetry(Code code) {
+   switch (code) {
+     case CONNECTIONLOSS:
+     case OPERATIONTIMEOUT:
+       return true;
+     default:
+       return false;
+   }
+ }
+
T runWithRetries() throws Exception {
int retry = 0;
while (true) {
@@ -577,17 +740,6 @@ T runWithRetries() throws Exception {
}
}
- private static boolean shouldRetry(Code code) {
- switch (code) {
- case CONNECTIONLOSS:
- case OPERATIONTIMEOUT:
- return true;
- default:
- break;
- }
- return false;
- }
-
private synchronized void createConnection()
throws IOException, InterruptedException {
closeZkClients();
@@ -595,6 +747,10 @@ private synchronized void createConnection()
retries++) {
try {
zkClient = getNewZooKeeper();
+ if (useDefaultFencingScheme) {
+ zkClient.addAuthInfo(zkRootNodeAuthScheme, new String(
+ zkRootNodeUsername + ":" + zkRootNodePassword).getBytes());
+ }
} catch (IOException ioe) {
// Retry in case of network failures
LOG.info("Failed to connect to the ZooKeeper on attempt - " +
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
index 23939de..8d37f71 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
@@ -86,6 +86,7 @@ protected void storeNewMasterKey(DelegationKey newKey) {
rmContext.getStateStore().storeRMDTMasterKey(newKey);
} catch (Exception e) {
LOG.error("Error in storing master key with KeyID: " + newKey.getKeyId());
+ LOG.error("Exception stack trace", e);
ExitUtil.terminate(1, e);
}
}