diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java index 9a031af30fb..e1efcb5e8b6 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java @@ -26,6 +26,8 @@ import org.apache.curator.framework.AuthInfo; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.transaction.CuratorTransaction; +import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; import org.apache.curator.retry.RetryNTimes; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -54,7 +56,6 @@ /** Curator for ZooKeeper. */ private CuratorFramework curator; - public ZKCuratorManager(Configuration config) throws IOException { this.conf = config; } @@ -119,7 +120,6 @@ public void close() { /** * Start the connection to the ZooKeeper ensemble. - * @param conf Configuration for the connection. * @throws IOException If the connection cannot be started. */ public void start() throws IOException { @@ -128,7 +128,6 @@ public void start() throws IOException { /** * Start the connection to the ZooKeeper ensemble. - * @param conf Configuration for the connection. * @param authInfos List of authentication keys. * @throws IOException If the connection cannot be started. 
*/ @@ -337,4 +336,87 @@ public boolean delete(final String path) throws Exception { public static String getNodePath(String root, String nodeName) { return root + "/" + nodeName; } + + public void safeCreate(String path, byte[] data, List acl, + CreateMode mode, List fencingACL, String fencingNodePath) + throws Exception { + if (!exists(path)) { + SafeTransaction transaction = createTransaction(fencingACL, + fencingNodePath); + transaction.create(path, data, acl, mode); + transaction.commit(); + } + } + + /** + * Deletes the path. Checks for existence of path as well. + * @param path Path to be deleted. + * @throws Exception if any problem occurs while performing deletion. + */ + public void safeDelete(final String path, List fencingACL, + String fencingNodePath) throws Exception { + if (exists(path)) { + SafeTransaction transaction = createTransaction(fencingACL, + fencingNodePath); + transaction.delete(path); + transaction.commit(); + } + } + + public void safeSetData(String path, byte[] data, int version, + List fencingACL, String fencingNodePath) + throws Exception { + SafeTransaction transaction = createTransaction(fencingACL, + fencingNodePath); + transaction.setData(path, data, version); + transaction.commit(); + } + + public SafeTransaction createTransaction(List fencingACL, + String fencingNodePath) throws Exception { + return new SafeTransaction(fencingACL, fencingNodePath); + } + + /** + * Use curator transactions to ensure zk-operations are performed in an all + * or nothing fashion. This is equivalent to using ZooKeeper#multi. + * + * TODO (YARN-3774): Curator 3.0 introduces CuratorOp similar to Op. We'll + * have to rewrite this inner class when we adopt that. 
+ */ + public class SafeTransaction { + private CuratorTransactionFinal transactionFinal; + private String fencingNodePath; + + SafeTransaction(List fencingACL, String fencingNodePath) + throws Exception { + this.fencingNodePath = fencingNodePath; + CuratorTransaction transaction = curator.inTransaction(); + transactionFinal = transaction.create() + .withMode(CreateMode.PERSISTENT).withACL(fencingACL) + .forPath(fencingNodePath, new byte[0]).and(); + } + + public void commit() throws Exception { + transactionFinal = transactionFinal.delete() + .forPath(fencingNodePath).and(); + transactionFinal.commit(); + } + + public void create(String path, byte[] data, List acl, CreateMode mode) + throws Exception { + transactionFinal = transactionFinal.create() + .withMode(mode).withACL(acl).forPath(path, data).and(); + } + + public void delete(String path) throws Exception { + transactionFinal = transactionFinal.delete().forPath(path).and(); + } + + public void setData(String path, byte[] data, int version) + throws Exception { + transactionFinal = transactionFinal.setData() + .withVersion(version).forPath(path, data).and(); + } + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index a445e756433..ac67dcd9a86 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -22,8 +22,6 @@ import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.transaction.CuratorTransaction; -import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -31,6 +29,7 @@ import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.util.curator.ZKCuratorManager; +import org.apache.hadoop.util.curator.ZKCuratorManager.SafeTransaction; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; @@ -416,9 +415,10 @@ protected synchronized void storeVersion() throws Exception { ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); if (exists(versionNodePath)) { - safeSetData(versionNodePath, data, -1); + zkManager.safeSetData(versionNodePath, data, -1, zkAcl, fencingNodePath); } else { - safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT, + zkAcl, fencingNodePath); } } @@ -447,12 +447,14 @@ public synchronized long getAndIncrementEpoch() throws Exception { // increment epoch and store it byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() .toByteArray(); - safeSetData(epochNodePath, storeData, -1); + zkManager.safeSetData(epochNodePath, storeData, -1, zkAcl, + fencingNodePath); } else { // initialize epoch node with 1 for the next time. 
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() .toByteArray(); - safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(epochNodePath, storeData, zkAcl, + CreateMode.PERSISTENT, zkAcl, fencingNodePath); } return currentEpoch; @@ -721,7 +723,7 @@ private void checkRemoveParentAppNode(String appIdPath, int splitIndex) // No apps stored under parent path. if (children != null && children.isEmpty()) { try { - safeDelete(parentAppNode); + zkManager.safeDelete(parentAppNode, zkAcl, fencingNodePath); if (LOG.isDebugEnabled()) { LOG.debug("No leaf app node exists. Removing parent node " + parentAppNode); @@ -749,7 +751,8 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId, byte[] appStateData = appStateDataPB.getProto().toByteArray(); if (appStateData.length <= zknodeLimit) { - safeCreate(nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(nodeCreatePath, appStateData, zkAcl, + CreateMode.PERSISTENT, zkAcl, fencingNodePath); } else { if (LOG.isDebugEnabled()) { LOG.debug("Application state data size for " + appId + " is " @@ -780,7 +783,8 @@ protected synchronized void updateApplicationStateInternal( String rootNode = getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex); if (!exists(rootNode)) { - safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT, + zkAcl, fencingNodePath); } } } @@ -794,9 +798,11 @@ protected synchronized void updateApplicationStateInternal( byte[] appStateData = appStateDataPB.getProto().toByteArray(); if (pathExists) { - safeSetData(nodeUpdatePath, appStateData, -1); + zkManager.safeSetData(nodeUpdatePath, appStateData, -1, zkAcl, + fencingNodePath); } else { - safeCreate(nodeUpdatePath, appStateData, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(nodeUpdatePath, appStateData, zkAcl, + CreateMode.PERSISTENT, zkAcl, fencingNodePath); if 
(LOG.isDebugEnabled()) { LOG.debug("Path " + nodeUpdatePath + " for " + appId + " didn't " + "exist. Creating a new znode to update the application state."); @@ -839,9 +845,11 @@ private void handleApplicationAttemptStateOp( switch (operation) { case UPDATE: if (exists(path)) { - safeSetData(path, attemptStateData, -1); + zkManager.safeSetData(path, attemptStateData, -1, zkAcl, + fencingNodePath); } else { - safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(path, attemptStateData, zkAcl, + CreateMode.PERSISTENT, zkAcl, fencingNodePath); if (LOG.isDebugEnabled()) { LOG.debug("Path " + path + " for " + appAttemptId + " didn't exist." + " Created a new znode to update the application attempt state."); @@ -849,10 +857,11 @@ private void handleApplicationAttemptStateOp( } break; case STORE: - safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT, + zkAcl, fencingNodePath); break; case REMOVE: - safeDelete(path); + zkManager.safeDelete(path, zkAcl, fencingNodePath); break; default: break; @@ -930,10 +939,10 @@ private void removeApp(String removeAppId, boolean safeRemove, for (ApplicationAttemptId attemptId : attempts) { String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString()); - safeDelete(attemptRemovePath); + zkManager.safeDelete(attemptRemovePath, zkAcl, fencingNodePath); } } - safeDelete(appIdRemovePath); + zkManager.safeDelete(appIdRemovePath, zkAcl, fencingNodePath); } else { CuratorFramework curatorFramework = zkManager.getCurator(); curatorFramework.delete().deletingChildrenIfNeeded(). 
@@ -947,7 +956,7 @@ private void removeApp(String removeAppId, boolean safeRemove, protected synchronized void storeRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception { - SafeTransaction trx = new SafeTransaction(); + SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath); addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false); trx.commit(); } @@ -964,14 +973,14 @@ protected synchronized void removeRMDelegationTokenState( + rmDTIdentifier.getSequenceNumber()); } - safeDelete(nodeRemovePath); + zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath); } @Override protected synchronized void updateRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) throws Exception { - SafeTransaction trx = new SafeTransaction(); + SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath); String nodeRemovePath = getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); @@ -1035,8 +1044,8 @@ protected synchronized void storeRMDTMasterKeyState( ByteArrayOutputStream os = new ByteArrayOutputStream(); try(DataOutputStream fsOut = new DataOutputStream(os)) { delegationKey.write(fsOut); - safeCreate(nodeCreatePath, os.toByteArray(), zkAcl, - CreateMode.PERSISTENT); + zkManager.safeCreate(nodeCreatePath, os.toByteArray(), zkAcl, + CreateMode.PERSISTENT, zkAcl, fencingNodePath); } } @@ -1051,7 +1060,7 @@ protected synchronized void removeRMDTMasterKeyState( LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); } - safeDelete(nodeRemovePath); + zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath); } @Override @@ -1078,7 +1087,8 @@ protected synchronized void storeOrUpdateAMRMTokenSecretManagerState( AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState); byte[] stateData = data.getProto().toByteArray(); - safeSetData(amrmTokenSecretManagerRoot, stateData, -1); + 
zkManager.safeSetData(amrmTokenSecretManagerRoot, stateData, -1, zkAcl, + fencingNodePath); } @Override @@ -1092,12 +1102,12 @@ protected synchronized void removeReservationState(String planName, + " for" + " plan " + planName); } - safeDelete(reservationPath); + zkManager.safeDelete(reservationPath, zkAcl, fencingNodePath); List reservationNodes = getChildren(planNodePath); if (reservationNodes.isEmpty()) { - safeDelete(planNodePath); + zkManager.safeDelete(planNodePath, zkAcl, fencingNodePath); } } @@ -1105,7 +1115,7 @@ protected synchronized void removeReservationState(String planName, protected synchronized void storeReservationState( ReservationAllocationStateProto reservationAllocation, String planName, String reservationIdName) throws Exception { - SafeTransaction trx = new SafeTransaction(); + SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath); addOrUpdateReservationState(reservationAllocation, planName, reservationIdName, trx, false); trx.commit(); @@ -1191,7 +1201,8 @@ private String getLeafAppIdNodePath(String appId, String rootNode, getNodePath(rootNode, nodeName.substring(0, splitIdx)); if (createParentIfNotExists && !exists(rootNodePath)) { try { - safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT); + zkManager.safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT, + zkAcl, fencingNodePath); } catch (KeeperException.NodeExistsException e) { if (LOG.isDebugEnabled()) { LOG.debug("Unable to create app parent node " + rootNodePath + @@ -1248,76 +1259,6 @@ void delete(final String path) throws Exception { zkManager.delete(path); } - private void safeCreate(String path, byte[] data, List acl, - CreateMode mode) throws Exception { - if (!exists(path)) { - SafeTransaction transaction = new SafeTransaction(); - transaction.create(path, data, acl, mode); - transaction.commit(); - } - } - - /** - * Deletes the path. Checks for existence of path as well. - * @param path Path to be deleted. 
- * @throws Exception if any problem occurs while performing deletion. - */ - private void safeDelete(final String path) throws Exception { - if (exists(path)) { - SafeTransaction transaction = new SafeTransaction(); - transaction.delete(path); - transaction.commit(); - } - } - - private void safeSetData(String path, byte[] data, int version) - throws Exception { - SafeTransaction transaction = new SafeTransaction(); - transaction.setData(path, data, version); - transaction.commit(); - } - - /** - * Use curator transactions to ensure zk-operations are performed in an all - * or nothing fashion. This is equivalent to using ZooKeeper#multi. - * - * TODO (YARN-3774): Curator 3.0 introduces CuratorOp similar to Op. We ll - * have to rewrite this inner class when we adopt that. - */ - private class SafeTransaction { - private CuratorTransactionFinal transactionFinal; - - SafeTransaction() throws Exception { - CuratorFramework curatorFramework = zkManager.getCurator(); - CuratorTransaction transaction = curatorFramework.inTransaction(); - transactionFinal = transaction.create() - .withMode(CreateMode.PERSISTENT).withACL(zkAcl) - .forPath(fencingNodePath, new byte[0]).and(); - } - - public void commit() throws Exception { - transactionFinal = transactionFinal.delete() - .forPath(fencingNodePath).and(); - transactionFinal.commit(); - } - - public void create(String path, byte[] data, List acl, CreateMode mode) - throws Exception { - transactionFinal = transactionFinal.create() - .withMode(mode).withACL(acl).forPath(path, data).and(); - } - - public void delete(String path) throws Exception { - transactionFinal = transactionFinal.delete().forPath(path).and(); - } - - public void setData(String path, byte[] data, int version) - throws Exception { - transactionFinal = transactionFinal.setData() - .withVersion(version).forPath(path, data).and(); - } - } - /** * Helper class that periodically attempts creating a znode to ensure that * this RM continues to be the Active. 
@@ -1332,7 +1273,7 @@ public void run() { try { while (!isFencedState()) { // Create and delete fencing node - new SafeTransaction().commit(); + zkManager.createTransaction(zkAcl, fencingNodePath).commit(); Thread.sleep(zkSessionTimeout); } } catch (InterruptedException ie) {