From 6fa0b83a211ee4868e0392ea86e6f231e794a738 Mon Sep 17 00:00:00 2001 From: Garry Weng Date: Thu, 14 Apr 2016 15:44:22 +0800 Subject: [PATCH] node labels store in zookeeper --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 10 + .../yarn/nodelabels/ZookeeperNodeLabelsStore.java | 826 +++++++++++++++++++++ .../nodelabels/TestZooKeeperNodeLabelsStore.java | 204 +++++ 3 files changed, 1040 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/ZookeeperNodeLabelsStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestZooKeeperNodeLabelsStore.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8acee57..4ad5e38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2293,6 +2293,16 @@ public static boolean isAclEnabled(Configuration conf) { + "enabled"; public static final boolean DEFAULT_NODE_LABELS_ENABLED = false; + public static final String ZK_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX + + "zk-store.root-dir"; + + public static final String ZK_NODE_LABELS_STORE_DIR = NODE_LABELS_PREFIX + + "zk-store.labels-dir"; + + public static final String ZK_NODETOLABELS_STORE_DIR = NODE_LABELS_PREFIX + + "zk-store.nodetolabels-dir"; + + public static final String NODELABEL_CONFIGURATION_TYPE = NODE_LABELS_PREFIX + "configuration-type"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/ZookeeperNodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/ZookeeperNodeLabelsStore.java new file mode 100644 index 0000000..69d6b27 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/ZookeeperNodeLabelsStore.java @@ -0,0 +1,826 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ZKUtil; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.Watcher.Event; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.KeeperException.Code; + +/** + * Follow the logic of NonAppendableFSNodeLabelStore, snapshot the state of + * CommonNodeLabelsStore directly. + * 1.Since the limit of ZNode size is 1M, we use hashcode of nodeId to divide + * the relationship of nodeid and labels to several buckets + * 2.Since the zkClient.multi only limit 1M data to guarantee atomic write operation, + * we use tag the latest version to guarantee write finished or not. In that way, we can + * store more nodelabel info. + * + */ +public class ZookeeperNodeLabelsStore extends NodeLabelsStore { + + protected static final Log LOG = LogFactory + .getLog(ZookeeperNodeLabelsStore.class); + + private static final int DEFAULT_NODEID_BUCKETS_NUM = 60; + + protected static final String DEFAULT_DIR_NAME = "node-labels"; + protected static final String DEFAULT_BUCKET_PREFIX = "nodeid-bucket"; + protected static final String DEFAULT_NODE_LABEL_LATEST_VERSION = + "node-label-latest-version"; + + private String zkWorkingPath; + + private String defaultzkLatestLabelsPath; + private String zkLatestLabelsPath; + private String zkNewLabelsPath; + + private String defaultzkLatestNodeToLabelsPath; + private String zkLatestNodeToLabelsPath; + private String zkNewNodeToLabelsPath; + + private String latestVersion; + private String newVersion; + + private List zkAcl; + private List zkAuths; + private ZooKeeper zkClient; + private String zkHost; + private int zkSessionTimeout; + private int numRetries; + private long zkResyncWaitTime; + private long zkRetryInterval; + + /* + * activeZkClient is not used to do actual operations, it is only used to + * verify client session for watched events and it gets activated into + * zkClient on connection event. + */ + private ZooKeeper activeZkClient; + + private boolean isCheckNodeIdBucketNumConsistent = false; + private boolean checkNodeIdBucketNumConsistentResult = false; + + private String currentSerialNum; + + private String getDefaultZKNodeLabelsRootDir() throws IOException { + return YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH + "/" + + DEFAULT_DIR_NAME; + } + + private String getDefaultZKNodeLabelsDir() throws IOException { + return YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH + "/" + + DEFAULT_DIR_NAME + "/labels"; + } + + private String getDefaultZKNodeToLabelsDir() throws IOException { + return YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH + "/" + + DEFAULT_DIR_NAME + "/nodeToLabels"; + } + + private void getLatestNodeLabelPath() throws Exception { + byte[] version = + getData(zkWorkingPath + "/" + DEFAULT_NODE_LABEL_LATEST_VERSION, null); + if (version == null) { + zkLatestLabelsPath = defaultzkLatestLabelsPath; + zkLatestNodeToLabelsPath = defaultzkLatestNodeToLabelsPath; + } else { + latestVersion = new String(version); + zkLatestLabelsPath = defaultzkLatestLabelsPath + latestVersion; + zkLatestNodeToLabelsPath = + defaultzkLatestNodeToLabelsPath + latestVersion; + } + LOG.info("get latest labels path " + zkLatestLabelsPath + + " and node to labels path " + zkLatestNodeToLabelsPath); + } + + private void getNewNodeLabelPath() { + // get a not exist version + try { + zkNewLabelsPath = + createSequentialDirWithParents(defaultzkLatestLabelsPath); + if (zkNewLabelsPath == null || zkNewLabelsPath.isEmpty()) { + LOG.error("get New NodeLabel Path fail, the new path is empty."); + return; + } + newVersion = + zkNewLabelsPath.substring(defaultzkLatestLabelsPath.length()); + zkNewNodeToLabelsPath = defaultzkLatestNodeToLabelsPath + newVersion; + createDirWithParents(zkNewNodeToLabelsPath); + } catch (Exception e) { + LOG.error("get New NodeLabel Path fail. "); + e.printStackTrace(); + } + } + + private Stat setLatestNodeLabelVersion(String version) throws Exception { + return setData( + getNodePath(zkWorkingPath, DEFAULT_NODE_LABEL_LATEST_VERSION), + version.getBytes(), -1); + } + + @Override + public void init(Configuration conf) throws Exception { + zkHost = conf.get(YarnConfiguration.RM_ZK_ADDRESS); + if (zkHost == null) { + throw new YarnRuntimeException("No server address specified for " + + "zookeeper state store for node labels recovery. " + + YarnConfiguration.RM_ZK_ADDRESS + " is not configured."); + } + + zkSessionTimeout = + conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, + YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); + zkAcl = getZKAcls(conf); + zkAuths = getZKAuths(conf); + + numRetries = + conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES, + YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES); + + zkRetryInterval = + conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS); + + zkResyncWaitTime = zkRetryInterval * numRetries; + + zkWorkingPath = + conf.get(YarnConfiguration.ZK_NODE_LABELS_STORE_ROOT_DIR, + getDefaultZKNodeLabelsRootDir()); + defaultzkLatestLabelsPath = + conf.get(YarnConfiguration.ZK_NODE_LABELS_STORE_DIR, + getDefaultZKNodeLabelsDir()); + defaultzkLatestNodeToLabelsPath = + conf.get(YarnConfiguration.ZK_NODETOLABELS_STORE_DIR, + getDefaultZKNodeToLabelsDir()); + + createConnection(); + + if (null == existsPath(zkWorkingPath, null)) { + createDirWithParents(zkWorkingPath); + } + + if (null == existsPath( + getNodePath(zkWorkingPath, DEFAULT_NODE_LABEL_LATEST_VERSION), null)) { + createDirWithParents(getNodePath(zkWorkingPath, + DEFAULT_NODE_LABEL_LATEST_VERSION)); + } + + } + + public boolean initNodeIdBuckets(String path) { + try { + if (null == existsPath(path, null)) { + createDirWithParents(path); + } + List execOpList = new ArrayList(); + Op createOp = null; + for (int i = 0; i < DEFAULT_NODEID_BUCKETS_NUM; i++) { + String nodeLabelPath = getNodePath(path, getPartitionZnodeName(i)); + createOp = Op.create(nodeLabelPath, null, zkAcl, CreateMode.PERSISTENT); + execOpList.add(createOp); + } + doStoreMulti(execOpList); + return true; + } catch (Exception e) { + LOG.error("initionlize NodeId Buckets fail."); + e.printStackTrace(); + } + return false; + } + + @Override + public void close() throws IOException { + closeZkClients(); + } + + String getNodePath(String root, String nodeName) { + return (root + "/" + nodeName); + } + + @Override + public void updateNodeToLabelsMappings(Map> nodeToLabels) + throws IOException { + writeNewMirror(); + } + + @Override + public void storeNewClusterNodeLabels(List label) throws IOException { + writeNewMirror(); + } + + @Override + public void removeClusterNodeLabels(Collection labels) + throws IOException { + writeNewMirror(); + } + + @Override + public void recover() throws IOException { + + + try { + getLatestNodeLabelPath(); + if (null == existsPath(zkLatestLabelsPath, null) + || null == existsPath(zkLatestNodeToLabelsPath, null)) { + LOG.error("Latest LabelsPath " + zkLatestLabelsPath + " not exist or " + + "Latest NodeToLabelsPath " + zkLatestNodeToLabelsPath + + " not exist"); + if (latestVersion == null || latestVersion.isEmpty()) {// ignore cold + // start case + return; + } else { + throw new IOException( + "recover node label path not exist , recover node label abort"); + } + } + byte[] data = getData(zkLatestLabelsPath, null); + AddToClusterNodeLabelsRequestPBImpl nodeLabels = + data != null ? new AddToClusterNodeLabelsRequestPBImpl( + AddToClusterNodeLabelsRequestProto.parseFrom(data)) + : new AddToClusterNodeLabelsRequestPBImpl(); + + List currentLabels = nodeLabels.getNodeLabels(); + mgr.addToCluserNodeLabels(currentLabels); + + Map> NodeIdToLabelsMap = getNodeToLabelsMapFromZK(); + mgr.replaceLabelsOnNode(NodeIdToLabelsMap); + + } catch (Exception e) { + LOG.error("recover labels from zookeeper " + zkLatestLabelsPath + + " fails"); + e.printStackTrace(); + throw new IOException( + "get data from zookeeper fail, recover node label abort"); + } + } + + private boolean checkNodeIdBucketNumConsistent() { + if (isCheckNodeIdBucketNumConsistent) { + return checkNodeIdBucketNumConsistentResult; + } + + try { + Set nodeToLabelsSet = new HashSet(); + List nodeToLabelsList = getChildren(zkLatestNodeToLabelsPath, null); + nodeToLabelsSet = new HashSet(nodeToLabelsList); + isCheckNodeIdBucketNumConsistent = true; + if (nodeToLabelsSet.size() != 0 + && nodeToLabelsSet.size() != DEFAULT_NODEID_BUCKETS_NUM) { + LOG.fatal("the size of nodeToLabels buckets " + nodeToLabelsSet.size() + + " is not same as the default " + DEFAULT_NODEID_BUCKETS_NUM + + " stop to recover"); + checkNodeIdBucketNumConsistentResult = false; + return false; + } + checkNodeIdBucketNumConsistentResult = true; + return true; + } catch (Exception e) { + LOG.error("access zookeeper " + zkLatestLabelsPath + " fails"); + } + return false; + } + + + @SuppressWarnings("unchecked") + private void writeNewMirror() throws IOException { + + ReentrantReadWriteLock.ReadLock readLock = mgr.readLock; + + getNewNodeLabelPath(); + if (!initNodeIdBuckets(zkNewNodeToLabelsPath)) { + LOG.error("node id buckets initilize fails"); + return; + } + + try { + readLock.lock(); + + List nodeLabels = mgr.getClusterNodeLabels(); + byte[] data = + ((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest + .newInstance(nodeLabels)).getProto().toByteArray(); + + setData(zkNewLabelsPath, data, -1); + + Map> nodeToLabels = mgr.getNodeLabels(); + HashMap> nodeToLabelsBuckets[] = + new HashMap[DEFAULT_NODEID_BUCKETS_NUM]; + + for (Map.Entry> entry : nodeToLabels.entrySet()) { + int iteration = + Math.abs(entry.getKey().hashCode() % DEFAULT_NODEID_BUCKETS_NUM); + if (nodeToLabelsBuckets[iteration] == null) { + nodeToLabelsBuckets[iteration] = new HashMap>(); + } + nodeToLabelsBuckets[iteration].put(entry.getKey(), entry.getValue()); + } + for (int i = 0; i < DEFAULT_NODEID_BUCKETS_NUM; i++) { + String nodeLabelPath = + getNodePath(zkNewNodeToLabelsPath, getPartitionZnodeName(i)); + + data = + nodeToLabelsBuckets[i] != null ? ((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest + .newInstance(nodeToLabelsBuckets[i])).getProto().toByteArray() + : null; + + setData(nodeLabelPath, data, -1); + } + getLatestNodeLabelPath(); + String zkOldLabelsPath = zkLatestLabelsPath; + String zkOldNodeToLabelsPath = zkLatestNodeToLabelsPath; + setLatestNodeLabelVersion(newVersion); + cleanUpOldVersion(zkOldLabelsPath, zkOldNodeToLabelsPath); + } catch (Exception e) { + LOG.error("Execute NodeToLabels update in zookeeper fail " + + zkLatestNodeToLabelsPath); + e.printStackTrace(); + } finally { + readLock.unlock(); + } + } + + private Map> getNodeToLabelsMapFromZK() { + + Map> nodeToLabelsMap = + new HashMap>(); + + try { + Set nodeToLabelsSet = new HashSet(); + + List nodeToLabelsList = getChildren(zkLatestNodeToLabelsPath, null); + nodeToLabelsSet = new HashSet(nodeToLabelsList); + + for (String nodeIdString : nodeToLabelsSet) { + // skip improper node + int index = getPartitionNumFromZnodeName(nodeIdString); + if (!nodeIdString.startsWith(DEFAULT_BUCKET_PREFIX) || index == -1) { + LOG.info("Skip improper node " + nodeIdString + " without prefix " + + DEFAULT_BUCKET_PREFIX); + continue; + } + + String nodeToLabelPath = getNodePath(zkLatestNodeToLabelsPath, nodeIdString); + byte[] nodeToLabelData = getData(nodeToLabelPath, null); + + ReplaceLabelsOnNodeRequest nodeIdToLabels = + nodeToLabelData != null ? new ReplaceLabelsOnNodeRequestPBImpl( + ReplaceLabelsOnNodeRequestProto.parseFrom(nodeToLabelData)) + : null; + if (nodeIdToLabels != null) { + nodeToLabelsMap.putAll(nodeIdToLabels.getNodeToLabels()); + } + } + } catch (Exception e) { + LOG.error("recover labels from zookeeper " + zkLatestLabelsPath + " fails"); + } + return nodeToLabelsMap; + } + + private String getPartitionZnodeName(int i) { + return DEFAULT_BUCKET_PREFIX + i; + } + + private int getPartitionNumFromZnodeName(String znodeName) { + int result = -1; + try { + String numString = znodeName.substring(DEFAULT_BUCKET_PREFIX.length()); + result = Integer.parseInt(numString); + } catch (NumberFormatException e) { + LOG.error("fail to get partition num from Znode name"); + } + return result; + } + + private List getZKAcls(Configuration conf) throws Exception { + // Parse authentication from configuration. + String zkAclConf = + conf.get(YarnConfiguration.RM_ZK_ACL, + YarnConfiguration.DEFAULT_RM_ZK_ACL); + try { + zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf); + return ZKUtil.parseACLs(zkAclConf); + } catch (Exception e) { + LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL); + throw e; + } + } + + private List getZKAuths(Configuration conf) + throws Exception { + String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH); + try { + zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf); + if (zkAuthConf != null) { + return ZKUtil.parseAuth(zkAuthConf); + } else { + return Collections.emptyList(); + } + } catch (Exception e) { + LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH); + throw e; + } + } + + private synchronized void createConnection() throws IOException, + InterruptedException { + closeZkClients(); + for (int retries = 0; retries < numRetries && zkClient == null; retries++) { + try { + activeZkClient = getNewZooKeeper(); + zkClient = activeZkClient; + for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { + zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth()); + } + } catch (IOException ioe) { + // Retry in case of network failures + LOG.info("Failed to connect to the ZooKeeper on attempt - " + + (retries + 1)); + ioe.printStackTrace(); + } + } + if (zkClient == null) { + LOG.error("Unable to connect to Zookeeper"); + throw new YarnRuntimeException("Unable to connect to Zookeeper"); + } + notifyAll(); + LOG.info("Created new ZK connection"); + } + + private final class ForwardingWatcher implements Watcher { + private ZooKeeper watchedZkClient; + + public ForwardingWatcher(ZooKeeper client) { + this.watchedZkClient = client; + } + + @Override + public void process(WatchedEvent event) { + try { + processWatchEvent(watchedZkClient, event); + } catch (Throwable t) { + LOG.error("Failed to process watcher event " + event + ": " + + StringUtils.stringifyException(t)); + } + } + } + + // use watch to handle the SyncConnected, Disconnected, Expired event. + private synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) + throws Exception { + // only process watcher event from current ZooKeeper Client session. + if (zk != activeZkClient) { + LOG.info("Ignore watcher event type: " + event.getType() + " with state:" + + event.getState() + " for path:" + event.getPath() + + " from old session"); + return; + } + + Event.EventType eventType = event.getType(); + LOG.info("Watcher event type: " + eventType + " with state:" + + event.getState() + " for path:" + event.getPath() + " for " + this); + + if (eventType == Event.EventType.None) { + + // the connection state has changed + switch (event.getState()) { + case SyncConnected: + LOG.info("ZKRMStateStore Session connected"); + if (zkClient == null) { + // the SyncConnected must be from the client that sent Disconnected + zkClient = activeZkClient; + notifyAll(); + LOG.info("ZKRMStateStore Session restored"); + } + break; + case Disconnected: + LOG.info("ZKRMStateStore Session disconnected"); + zkClient = null; + break; + case Expired: + // the connection got terminated because of session timeout + // call listener to reconnect + LOG.info("ZKRMStateStore Session expired"); + createConnection(); + break; + default: + LOG.error("Unexpected Zookeeper" + " watch event state: " + + event.getState()); + break; + } + } + } + + private synchronized ZooKeeper getNewZooKeeper() throws IOException, + InterruptedException { + ZooKeeper zk = new ZooKeeper(zkHost, zkSessionTimeout, null); + zk.register(new ForwardingWatcher(zk)); + return zk; + } + + private synchronized void closeZkClients() throws IOException { + zkClient = null; + if (activeZkClient != null) { + try { + activeZkClient.close(); + } catch (InterruptedException e) { + throw new IOException("Interrupted while closing ZK", e); + } + activeZkClient = null; + } + } + + private Stat existsPath(final String path, final Watcher watch) + throws Exception { + return new ZKAction() { + @Override + Stat run() throws KeeperException, InterruptedException { + return zkClient.exists(path, watch); + } + }.runWithRetries(); + } + + private void createDir(final String path) throws Exception { + new ZKAction() { + @Override + Void run() throws KeeperException, InterruptedException { + zkClient.create(path, null, zkAcl, CreateMode.PERSISTENT); + return null; + } + }.runWithRetries(); + } + + private String createSequentialDir(final String path) throws Exception { + return new ZKAction() { + @Override + String run() throws KeeperException, InterruptedException { + return zkClient.create(path, null, zkAcl, + CreateMode.PERSISTENT_SEQUENTIAL); + } + }.runWithRetries(); + } + + private String createSequentialDirWithParents(String path) throws Exception { + if (path.length() > 0 && path.lastIndexOf("/") > 0) { + String temp = path.substring(0, path.lastIndexOf("/")); + createDirWithParents(temp); + return createSequentialDir(path); + } else { + return ""; + } + } + + private void createDirWithParents(String path) throws Exception { + if (path.length() > 0 && null == existsPath(path, null)) { + String temp = path.substring(0, path.lastIndexOf("/")); + createDirWithParents(temp); + createDir(path); + } else { + return; + } + } + + private List getChildren(final String path, final Watcher watch) + throws Exception { + return new ZKAction>() { + @Override + List run() throws KeeperException, InterruptedException { + return zkClient.getChildren(path, watch); + } + }.runWithRetries(); + } + + private byte[] getData(final String path, final Watcher watch) + throws Exception { + return new ZKAction() { + @Override + public byte[] run() throws KeeperException, InterruptedException { + return zkClient.getData(path, watch, null); + } + }.runWithRetries(); + } + + private Stat setData(final String path, final byte[] data, final int version) + throws Exception { + return new ZKAction() { + @Override + Stat run() throws KeeperException, InterruptedException { + return zkClient.setData(path, data, version); + } + }.runWithRetries(); + } + + private synchronized void doStoreMulti(final List opList) + throws Exception { + new ZKAction() { + @Override + public Void run() throws KeeperException, InterruptedException { + zkClient.multi(opList); + return null; + } + }.runWithRetries(); + } + + private synchronized void delPath(final String path, final int version) + throws Exception { + new ZKAction() { + @Override + public Void run() throws KeeperException, InterruptedException { + zkClient.delete(path, version); + return null; + } + }.runWithRetries(); + } + + private synchronized void delPathWithChildren(String path, int version) + throws Exception { + List children = this.getChildren(path, null); + for (String child : children) { + delPathWithChildren(getNodePath(path, child), version); + } + delPath(path, version); + } + + private void cleanUpOldVersion(String zkOldLabelsPath,String zkOldNodeToLabelsPath) { + try{ + delPathWithChildren(zkOldLabelsPath, -1); + delPathWithChildren(zkOldNodeToLabelsPath, -1); + } catch (Exception e) { + LOG.error("clean up old version fail, zkOldLabelsPath " + zkOldLabelsPath + + " zkOldNodeToLabelsPath " + zkOldNodeToLabelsPath); + e.printStackTrace(); + } + } + + + private abstract class ZKAction { + private boolean hasDeleteNodeOp = false; + + void setHasDeleteNodeOp(boolean hasDeleteOp) { + this.hasDeleteNodeOp = hasDeleteOp; + } + + // run() expects synchronization on ZKRMStateStore.this + abstract T run() throws KeeperException, InterruptedException; + + T runWithCheck() throws Exception { + long startTime = System.currentTimeMillis(); + synchronized (ZookeeperNodeLabelsStore.this) { + while (zkClient == null) { + ZookeeperNodeLabelsStore.this.wait(zkResyncWaitTime); + if (zkClient != null) { + break; + } + if (System.currentTimeMillis() - startTime > zkResyncWaitTime) { + throw new IOException("Wait for ZKClient creation timed out"); + } + } + return run(); + } + } + + private boolean shouldRetry(Code code) { + switch (code) { + case CONNECTIONLOSS: + case OPERATIONTIMEOUT: + return true; + default: + break; + } + return false; + } + + private boolean shouldRetryWithNewConnection(Code code) { + // For fast recovery, we choose to close current connection after + // SESSIONMOVED occurs. Latest state of a zknode path is ensured by + // following zk.sync(path) operation. + switch (code) { + case SESSIONEXPIRED: + case SESSIONMOVED: + return true; + default: + break; + } + return false; + } + + T runWithRetries() throws Exception { + int retry = 0; + while (true) { + try { + return runWithCheck(); + } catch (KeeperException ke) { + if (ke.code() == Code.NODEEXISTS) { + LOG.info("znode already exists!"); + return null; + } + if (hasDeleteNodeOp && ke.code() == Code.NONODE) { + LOG.info("znode has already been deleted!"); + return null; + } + + LOG.info("Exception while executing a ZK operation.", ke); + retry++; + if (shouldRetry(ke.code()) && retry < numRetries) { + LOG.info("Retrying operation on ZK. Retry no. " + retry); + Thread.sleep(zkRetryInterval); + continue; + } + if (shouldRetryWithNewConnection(ke.code()) && retry < numRetries) { + LOG.info("Retrying operation on ZK with new Connection. " + + "Retry no. " + retry); + Thread.sleep(zkRetryInterval); + createConnection(); + syncInternal(ke.getPath()); + continue; + } + LOG.info("Maxed out ZK retries. Giving up!"); + throw ke; + } + } + } + + class ZKSyncOperationCallback implements AsyncCallback.VoidCallback { + @Override + public void processResult(int rc, String path, Object ctx) { + if (rc == Code.OK.intValue()) { + LOG.info("ZooKeeper sync operation succeeded. path: " + path); + } else { + LOG.fatal("ZooKeeper sync operation failed. Waiting for session " + + "timeout. path: " + path); + } + } + } + + private void syncInternal(final String path) throws InterruptedException { + if (path == null) { + LOG.error("the sync path is null."); + return; + } + final ZKSyncOperationCallback cb = new ZKSyncOperationCallback(); + final String pathForSync = path; + try { + new ZKAction() { + @Override + Void run() throws KeeperException, InterruptedException { + zkClient.sync(pathForSync, cb, null); + return null; + } + }.runWithRetries(); + } catch (Exception e) { + LOG.fatal("sync failed."); + } + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestZooKeeperNodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestZooKeeperNodeLabelsStore.java new file mode 100644 index 0000000..8fd8729 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestZooKeeperNodeLabelsStore.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; + +public class TestZooKeeperNodeLabelsStore extends NodeLabelTestBase { + MockNodeLabelManager mgr = null; + Configuration conf = null; + + private static class MockNodeLabelManager extends + CommonNodeLabelsManager { + @Override + protected void initDispatcher(Configuration conf) { + super.dispatcher = new InlineDispatcher(); + } + + @Override + protected void startDispatcher() { + // do nothing + } + + @Override + protected void stopDispatcher() { + // do nothing + } + } + + private ZookeeperNodeLabelsStore getStore() { + return (ZookeeperNodeLabelsStore) mgr.store; + } + + @Before + public void before() throws IOException { + mgr = new MockNodeLabelManager(); + conf = new Configuration(); + conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_IMPL_CLASS, + "org.apache.hadoop.yarn.nodelabels.ZookeeperNodeLabelsStore"); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, "127.0.0.1:2181"); + conf.set(YarnConfiguration.RM_ZK_NUM_RETRIES, "10"); + conf.set(YarnConfiguration.RM_ZK_TIMEOUT_MS, "100000000"); + conf.set(YarnConfiguration.NODE_LABELS_ENABLED, "true"); + + mgr.init(conf); + mgr.start(); + } + + @After + public void after() throws IOException { + mgr.stop(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(timeout = 100000000) + public void testRecoverWithMirror() throws Exception { + mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p1", "p2", "p3")); + mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p4")); + mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p5", "p6")); + mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n1"), toSet("p1"), + toNodeId("n2"), toSet("p2"))); + mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"), + toNodeId("n4"), toSet("p4"), toNodeId("n5"), toSet("p5"), + toNodeId("n6"), toSet("p6"), toNodeId("n7"), toSet("p6"))); + + /* + * node -> partition p1: n1 p2: n2 p3: n3 p4: n4 p5: n5 p6: n6, n7 + */ + + mgr.removeFromClusterNodeLabels(toSet("p1")); + mgr.removeFromClusterNodeLabels(Arrays.asList("p3", "p5")); + + /* + * After removed p2: n2 p4: n4 p6: n6, n7 + */ + // shutdown mgr and start a new mgr + mgr.stop(); + + mgr = new MockNodeLabelManager(); + mgr.init(conf); + mgr.start(); + System.out.println(mgr.getClusterNodeLabels().size()); + // check variables + Assert.assertEquals(3, mgr.getClusterNodeLabels().size()); + Assert.assertTrue(mgr.getClusterNodeLabelNames().containsAll( + Arrays.asList("p2", "p4", "p6"))); + + assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"), + toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"), + toNodeId("n7"), toSet("p6"))); + + // stutdown mgr and start a new mgr + mgr.stop(); + mgr = new MockNodeLabelManager(); + mgr.init(conf); + mgr.start(); + + // check variables + Assert.assertEquals(3, mgr.getClusterNodeLabels().size()); + Assert.assertTrue(mgr.getClusterNodeLabelNames().containsAll( + Arrays.asList("p2", "p4", "p6"))); + + assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"), + toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"), + toNodeId("n7"), toSet("p6"))); + mgr.stop(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(timeout = 100000000) + public void testMoreNodesWithLabels() throws Exception { + mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p1", "p2", "p3")); + mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p4")); + mgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("p5", "p6")); + mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n1"), toSet("p1"), + toNodeId("n2"), toSet("p2"))); + mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"), + toNodeId("n4"), toSet("p4"), toNodeId("n5"), toSet("p5"), + toNodeId("n6"), toSet("p6"), toNodeId("n7"), toSet("p6"))); + + Map> BigNodeLabels = new HashMap>(); + Set tempSet1 = new HashSet(); + tempSet1.add("p2"); + for (int i = 8; i < 100001; i++) { + BigNodeLabels.put(toNodeId("n" + i), tempSet1); + } + mgr.replaceLabelsOnNode(BigNodeLabels); + /* + * node -> partition p1: n1 p2: n2 p3: n3 p4: n4 p5: n5 p6: n6, n7 + */ + + mgr.removeFromClusterNodeLabels(toSet("p1")); + mgr.removeFromClusterNodeLabels(Arrays.asList("p3", "p5")); + + Assert.assertEquals(mgr.getNodeLabels().size(), 99997); + + /* + * After removed p2: n2 p4: n4 p6: n6, n7 + */ + // shutdown mgr and start a new mgr + mgr.stop(); + + mgr = new MockNodeLabelManager(); + mgr.init(conf); + mgr.start(); + + // check variables + Assert.assertEquals(3, mgr.getClusterNodeLabels().size()); + Assert.assertTrue(mgr.getClusterNodeLabelNames().containsAll( + Arrays.asList("p2", "p4", "p6"))); + + assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"), + toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"), + toNodeId("n7"), toSet("p6"))); + Assert.assertEquals(mgr.getNodeLabels().size(), 99997); + // stutdown mgr and start a new mgr + mgr.stop(); + mgr = new MockNodeLabelManager(); + mgr.init(conf); + mgr.start(); + + // check variables + Assert.assertEquals(3, mgr.getClusterNodeLabels().size()); + Assert.assertTrue(mgr.getClusterNodeLabelNames().containsAll( + Arrays.asList("p2", "p4", "p6"))); + + assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"), + toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"), + toNodeId("n7"), toSet("p6"))); + mgr.stop(); + } +} -- 1.9.4.msysgit.0