diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index cc8ba6e..70affa0 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -1058,7 +1058,7 @@ public class HConnectionManager { } if (this.zooKeeper != null) { LOG.info("Closed zookeeper sessionid=0x" + - Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); this.zooKeeper.close(); this.zooKeeper = null; } diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index f82e4a0..eed62ee 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -994,7 +994,7 @@ public class AssignmentManager extends ZooKeeperListener { LOG.debug("rs=" + (RegionState)ctx + ", server=" + this.destination.getServerName()); // Async exists to set a watcher so we'll get triggered when // unassigned node changes. - this.zkw.getRecoverableZooKeeper().getZooKeeper().exists(path, this.zkw, + this.zkw.getZooKeeper().exists(path, this.zkw, new ExistsUnassignedAsyncCallback(this.counter), ctx); } } diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index c4aec4e..250a8cf 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -348,7 +348,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { LOG.info("Server active/primary master; " + this.address + ", sessionid=0x" + - Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) + + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) + ", cluster-up flag was=" + wasUp); } diff --git a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index adedd34..dada818 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -173,7 +173,6 @@ public class SplitLogManager extends ZooKeeperListener { * @throws IOException * if there was an error while splitting any log file * @return cumulative size of the logfiles split - * @throws KeeperException */ public long splitLogDistributed(final Path logDir) throws IOException { this.fs = logDir.getFileSystem(conf); @@ -324,8 +323,7 @@ public class SplitLogManager extends ZooKeeperListener { private void getDataSetWatch(String path, Long retry_count) { - this.watcher.getRecoverableZooKeeper().getZooKeeper(). - getData(path, this.watcher, + this.watcher.getZooKeeper().getData(path, this.watcher, new GetDataAsyncCallback(), retry_count); tot_mgr_get_data_queued.incrementAndGet(); } @@ -479,8 +477,7 @@ public class SplitLogManager extends ZooKeeperListener { private void deleteNode(String path, Long retries) { tot_mgr_node_delete_queued.incrementAndGet(); - this.watcher.getRecoverableZooKeeper().getZooKeeper(). - delete(path, -1, new DeleteAsyncCallback(), + this.watcher.getZooKeeper().delete(path, -1, new DeleteAsyncCallback(), retries); } @@ -507,18 +504,12 @@ public class SplitLogManager extends ZooKeeperListener { /** * signal the workers that a task was resubmitted by creating the * RESCAN node. - * @throws KeeperException */ private void createRescanNode(long retries) { - try{ - watcher.getRecoverableZooKeeper().create(ZKSplitLog.getRescanNode(watcher), - TaskState.TASK_UNASSIGNED.get(serverName), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL, - new CreateRescanAsyncCallback(), new Long(retries)); - } catch(KeeperException e) { - LOG.error("Zookeeper Exception: " + e.getCause()); - e.printStackTrace(); - } + watcher.getZooKeeper().create(ZKSplitLog.getRescanNode(watcher), + TaskState.TASK_UNASSIGNED.get(serverName), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL, + new CreateRescanAsyncCallback(), new Long(retries)); } private void createRescanSuccess(String path) { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index cdc62ac..d0a1e11 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -888,7 +888,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, LOG.info("Serving as " + this.serverInfo.getServerName() + ", RPC listening on " + this.server.getListenerAddress() + ", sessionid=0x" + - Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); isOnline = true; } catch (Throwable e) { this.isOnline = false; diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 563d3d5..e9d0f0a 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -323,7 +323,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { */ private boolean ownTask(boolean isFirstTime) { try { - Stat stat = this.watcher.getRecoverableZooKeeper().setData(currentTask, + Stat stat = this.watcher.getZooKeeper().setData(currentTask, TaskState.TASK_OWNED.get(serverName), currentVersion); if (stat == null) { LOG.warn("zk.setData() returned null for path " + currentTask); @@ -386,9 +386,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { } void getDataSetWatchAsync() { - this.watcher.getRecoverableZooKeeper().getZooKeeper(). - getData(currentTask, this.watcher, - new GetDataAsyncCallback(), null); + this.watcher.getZooKeeper().getData(currentTask, this.watcher, + new GetDataAsyncCallback(), null); tot_wkr_get_data_queued.incrementAndGet(); } diff --git a/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/src/main/java/org/apache/hadoop/hbase/util/Bytes.java index 4b81825..d106275 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/Bytes.java +++ b/src/main/java/org/apache/hadoop/hbase/util/Bytes.java @@ -300,16 +300,7 @@ public class Bytes { public static String toStringBinary(final byte [] b) { return toStringBinary(b, 0, b.length); } - - /** - * The same as {@link #toStringBinary(byte[])}, but returns a string "null" - * if given a null argument. - */ - public static String toStringBinarySafe(final byte [] b) { - if (b == null) - return "null"; - return toStringBinary(b, 0, b.length); - } + /** * Write a printable representation of a byte array. Non-printable * characters are hex escaped in the format \\x%02X, eg: diff --git a/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java b/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java deleted file mode 100644 index 6f9cbc0..0000000 --- a/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.apache.hadoop.hbase.util; - -import java.util.concurrent.TimeUnit; - -public class RetryCounter { - private final int maxRetries; - private int retriesRemaining; - private final int retryIntervalMillis; - private final TimeUnit timeUnit; - - public RetryCounter(int maxRetries, - int retryIntervalMillis, TimeUnit timeUnit) { - this.maxRetries = maxRetries; - this.retriesRemaining = maxRetries; - this.retryIntervalMillis = retryIntervalMillis; - this.timeUnit = timeUnit; - } - - public int getMaxRetries() { - return maxRetries; - } - - public void sleepUntilNextRetry() throws InterruptedException { - timeUnit.sleep(retryIntervalMillis); - } - - public boolean shouldRetry() { - return retriesRemaining > 0; - } - - public void useRetry() { - retriesRemaining--; - } - - public int getAttemptTimes() { - return maxRetries-retriesRemaining+1; - } -} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java b/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java deleted file mode 100644 index 5c9e404..0000000 --- a/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.apache.hadoop.hbase.util; - -import java.util.concurrent.TimeUnit; - -public class RetryCounterFactory { - private final int maxRetries; - private final int retryIntervalMillis; - - public RetryCounterFactory(int maxRetries, int retryIntervalMillis) { - this.maxRetries = maxRetries; - this.retryIntervalMillis = retryIntervalMillis; - } - - public RetryCounter create() { - return - new RetryCounter( - maxRetries, retryIntervalMillis, TimeUnit.MILLISECONDS - ); - } -} diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java deleted file mode 100644 index e61d3d3..0000000 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ /dev/null @@ -1,650 +0,0 @@ -package org.apache.hadoop.hbase.zookeeper; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.RetryCounter; -import org.apache.hadoop.hbase.util.RetryCounterFactory; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.AsyncCallback.StatCallback; -import org.apache.zookeeper.AsyncCallback.StringCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; -import org.apache.zookeeper.ZooKeeper.States; - -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling - * To handle recoverable errors, developers need to realize that there are two - * classes of requests: idempotent and non-idempotent requests. Read requests - * and unconditional sets and deletes are examples of idempotent requests, they - * can be reissued with the same results. - * (Although, the delete may throw a NoNodeException on reissue its effect on - * the ZooKeeper state is the same.) Non-idempotent requests need special - * handling, application and library writers need to keep in mind that they may - * need to encode information in the data or name of znodes to detect when a - * retries. A simple example is a create that uses a sequence flag. - * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection - * loss exception, that process will reissue another - * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a - * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be - * that x-109 was the result of the previous create, so the process actually - * owns both x-109 and x-111. An easy way around this is to use "x-process id-" - * when doing the create. If the process is using an id of 352, before reissuing - * the create it will do a getChildren("/") and see "x-222-1", "x-542-30", - * "x-352-109", x-333-110". The process will know that the original create - * succeeded an the znode it created is "x-352-109". - * - * Reuse part of the code from calligraphus project in the datafreeway team - */ - -public class RecoverableZooKeeper { - private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class); - // the actual ZooKeeper client instance - private ZooKeeper zk; - private final RetryCounterFactory retryCounterFactory; - // An identifier of this process in the cluster - private final String identifier; - private final byte[] id; - private int retryIntervalMillis; - - private static final int ID_OFFSET = Bytes.SIZEOF_INT; - // the magic number is to be backward compatible - private static final byte MAGIC =(byte) 0XFF; - private static final int MAGIC_OFFSET = Bytes.SIZEOF_BYTE; - - public RecoverableZooKeeper(String quorumServers,int seesionTimeout, - Watcher watcher, int maxRetries, int retryIntervalMillis) - throws IOException { - this.zk = new ZooKeeper(quorumServers, seesionTimeout, watcher); - this.retryCounterFactory = - new RetryCounterFactory(maxRetries, retryIntervalMillis); - this.retryIntervalMillis = retryIntervalMillis; - - // the identifier = processID@hostName - this.identifier = ManagementFactory.getRuntimeMXBean().getName(); - LOG.info("The identifier of this process is "+identifier); - this.id = Bytes.toBytes(identifier); - } - - /** - * delete is an idempotent operation. Retry before throw out exception. - * This function will not throw out NoNodeException if the path is not existed - * @param path - * @param version - * @throws InterruptedException - * @throws KeeperException - */ - public void delete(String path, int version) - throws InterruptedException, KeeperException { - RetryCounter retryCounter = retryCounterFactory.create(); - boolean isRetry = false; // False for first attempt, true for all retries. - while (true) { - try { - zk.delete(path, version); - return; - } catch (KeeperException e) { - switch (e.code()) { - case NONODE: - if (isRetry) { - LOG.info("Node " + path + " already deleted. Assuming that a " + - "previous attempt succeeded."); - return; - } - LOG.warn("Node " + path + " already deleted, and this is not a " + - "retry"); - throw e; - - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - LOG.warn("Possibly transient ZooKeeper exception: " + e); - if (!retryCounter.shouldRetry()) { - LOG.error("ZooKeeper delete failed after " - + retryCounter.getMaxRetries() + " retries"); - throw e; - } - break; - - default: - throw e; - } - } - LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " + - "ZooKeeper after sleeping "+retryIntervalMillis+" ms"); - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); - isRetry = true; - } - } - - /** - * exists is an idempotent operation. Retry before throw out exception - * @param path - * @param watcher - * @return - * @throws KeeperException - * @throws InterruptedException - */ - public Stat exists(String path, Watcher watcher) - throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); - while (true) { - try { - return zk.exists(path, watcher); - } catch (KeeperException e) { - switch (e.code()) { - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - LOG.warn("Possibly transient ZooKeeper exception: " + e); - if (!retryCounter.shouldRetry()) { - LOG.error("ZooKeeper exists failed after " - + retryCounter.getMaxRetries() + " retries"); - throw e; - } - break; - - default: - throw e; - } - } - LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " + - "ZooKeeper after sleeping "+retryIntervalMillis+" ms"); - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); - } - } - - /** - * exists is an idempotent operation. Retry before throw out exception - * @param path - * @param watch - * @return - * @throws KeeperException - * @throws InterruptedException - */ - public Stat exists(String path, boolean watch) - throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); - while (true) { - try { - return zk.exists(path, watch); - } catch (KeeperException e) { - switch (e.code()) { - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - LOG.warn("Possibly transient ZooKeeper exception: " + e); - if (!retryCounter.shouldRetry()) { - LOG.error("ZooKeeper exists failed after " - + retryCounter.getMaxRetries() + " retries"); - throw e; - } - break; - - default: - throw e; - } - } - LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " + - "ZooKeeper after sleeping "+retryIntervalMillis+" ms"); - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); - } - } - - /** - * getChildren is an idempotent operation. Retry before throw out exception - * @param path - * @param watcher - * @return - * @throws KeeperException - * @throws InterruptedException - */ - public List getChildren(String path, Watcher watcher) - throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); - while (true) { - try { - return zk.getChildren(path, watcher); - } catch (KeeperException e) { - switch (e.code()) { - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - LOG.warn("Possibly transient ZooKeeper exception: " + e); - if (!retryCounter.shouldRetry()) { - LOG.error("ZooKeeper getChildren failed after " - + retryCounter.getMaxRetries() + " retries"); - throw e; - } - break; - - default: - throw e; - } - } - LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " + - "ZooKeeper after sleeping "+retryIntervalMillis+" ms"); - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); - } - } - - /** - * getChildren is an idempotent operation. Retry before throw out exception - * @param path - * @param watch - * @return - * @throws KeeperException - * @throws InterruptedException - */ - public List getChildren(String path, boolean watch) - throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); - while (true) { - try { - return zk.getChildren(path, watch); - } catch (KeeperException e) { - switch (e.code()) { - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - LOG.warn("Possibly transient ZooKeeper exception: " + e); - if (!retryCounter.shouldRetry()) { - LOG.error("ZooKeeper getChildren failed after " - + retryCounter.getMaxRetries() + " retries"); - throw e; - } - break; - - default: - throw e; - } - } - LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " + - "ZooKeeper after sleeping "+retryIntervalMillis+" ms"); - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); - } - } - - /** - * getData is an idempotent operation. Retry before throw out exception - * @param path - * @param watcher - * @param stat - * @return - * @throws KeeperException - * @throws InterruptedException - */ - public byte[] getData(String path, Watcher watcher, Stat stat) - throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); - while (true) { - try { - byte[] revData = zk.getData(path, watcher, stat); - return this.removeMetaData(revData); - } catch (KeeperException e) { - switch (e.code()) { - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - LOG.warn("Possibly transient ZooKeeper exception: " + e); - if (!retryCounter.shouldRetry()) { - LOG.error("ZooKeeper getData failed after " - + retryCounter.getMaxRetries() + " retries"); - throw e; - } - break; - - default: - throw e; - } - } - LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " + - "ZooKeeper after sleeping "+retryIntervalMillis+" ms"); - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); - } - } - - /** - * getData is an idemnpotent operation. Retry before throw out exception - * @param path - * @param watch - * @param stat - * @return - * @throws KeeperException - * @throws InterruptedException - */ - public byte[] getData(String path, boolean watch, Stat stat) - throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); - while (true) { - try { - byte[] revData = zk.getData(path, watch, stat); - return this.removeMetaData(revData); - } catch (KeeperException e) { - switch (e.code()) { - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - LOG.warn("Possibly transient ZooKeeper exception: " + e); - if (!retryCounter.shouldRetry()) { - LOG.error("ZooKeeper getData failed after " - + retryCounter.getMaxRetries() + " retries"); - throw e; - } - break; - - default: - throw e; - } - } - LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " + - "ZooKeeper after sleeping "+retryIntervalMillis+" ms"); - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); - } - } - - /** - * setData is NOT an idempotent operation. Retry may cause BadVersion Exception - * Adding an identifier field into the data to check whether - * badversion is caused by the result of previous correctly setData - * @param path - * @param data - * @param version - * @return - * @throws KeeperException - * @throws InterruptedException - */ - public Stat setData(String path, byte[] data, int version) - throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); - - byte[] newData = appendMetaData(data); - while (true) { - try { - return zk.setData(path, newData, version); - } catch (KeeperException e) { - switch (e.code()) { - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - LOG.warn("Possibly transient ZooKeeper exception: " + e); - if (!retryCounter.shouldRetry()) { - LOG.error("ZooKeeper setData failed after " - + retryCounter.getMaxRetries() + " retries"); - throw e; - } - break; - case BADVERSION: - // try to verify whether the previous setData success or not - try{ - Stat stat = new Stat(); - byte[] revData = zk.getData(path, false, stat); - int idLength = Bytes.toInt(revData, ID_OFFSET); - int dataLength = revData.length-ID_OFFSET-idLength; - int dataOffset = ID_OFFSET+idLength; - - if(Bytes.compareTo(revData, ID_OFFSET, id.length, - revData, dataOffset, dataLength) == 0) { - // the bad version is caused by previous successful setData - return stat; - } - } catch(KeeperException keeperException){ - // the ZK is not reliable at this moment. just throw out exception - throw keeperException; - } - - // throw out other exceptions and verified bad version exceptions - default: - throw e; - } - } - LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " + - "ZooKeeper after sleeping "+retryIntervalMillis+" ms"); - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); - } - } - - /** - *

- * NONSEQUENTIAL create is idempotent operation. - * Retry before throw out exceptions. - * But this function will not throw out the NodeExist exception back to the - * application. - *

- *

- * But SEQUENTIAL is NOT idempotent operation. It is necessary to add - * identifier to the path to verify, whether the previous one is successful - * or not. - *

- * - * @param path - * @param data - * @param acl - * @param createMode - * @return - * @throws KeeperException - * @throws InterruptedException - */ - public String create(String path, byte[] data, List acl, - CreateMode createMode) throws KeeperException, InterruptedException { - byte[] newData = appendMetaData(data); - switch (createMode) { - case EPHEMERAL: - case PERSISTENT: - return createNonSequential(path, newData, acl, createMode); - - case EPHEMERAL_SEQUENTIAL: - case PERSISTENT_SEQUENTIAL: - return createSequential(path, newData, acl, createMode); - - default: - throw new IllegalArgumentException("Unrecognized CreateMode: " + - createMode); - } - } - - private String createNonSequential(String path, byte[] data, List acl, - CreateMode createMode) throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); - boolean isRetry = false; // False for first attempt, true for all retries. - while (true) { - try { - return zk.create(path, data, acl, createMode); - } catch (KeeperException e) { - switch (e.code()) { - case NODEEXISTS: - if (isRetry) { - // If the connection was lost, there is still a possibility that - // we have successfully created the node at our previous attempt, - // so we read the node and compare. - byte[] currentData = zk.getData(path, false, null); - if (currentData != null && - Bytes.compareTo(currentData, data) == 0) { - // We successfully created a non-sequential node - return path; - } - LOG.error("Node " + path + " already exists with " + - Bytes.toStringBinarySafe(currentData) + ", could not write " + - Bytes.toStringBinarySafe(data)); - throw e; - } - LOG.error("Node " + path + " already exists and this is not a " + - "retry"); - throw e; - - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - LOG.warn("Possibly transient ZooKeeper exception: " + e); - if (!retryCounter.shouldRetry()) { - LOG.error("ZooKeeper create failed after " - + retryCounter.getMaxRetries() + " retries"); - throw e; - } - break; - - default: - throw e; - } - } - LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " + - "ZooKeeper after sleeping "+retryIntervalMillis+" ms"); - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); - isRetry = true; - } - } - - private String createSequential(String path, byte[] data, - List acl, CreateMode createMode) - throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); - boolean first = true; - String newPath = path+this.identifier; - while (true) { - try { - if (!first) { - // Check if we succeeded on a previous attempt - String previousResult = findPreviousSequentialNode(newPath); - if (previousResult != null) { - return previousResult; - } - } - first = false; - return zk.create(newPath, data, acl, createMode); - } catch (KeeperException e) { - switch (e.code()) { - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - LOG.warn("Possibly transient ZooKeeper exception: " + e); - if (!retryCounter.shouldRetry()) { - LOG.error("ZooKeeper create failed after " - + retryCounter.getMaxRetries() + " retries"); - throw e; - } - break; - - default: - throw e; - } - } - LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " + - "ZooKeeper after sleeping "+retryIntervalMillis+" ms"); - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); - } - } - - private String findPreviousSequentialNode(String path) - throws KeeperException, InterruptedException { - int lastSlashIdx = path.lastIndexOf('/'); - assert(lastSlashIdx != -1); - String parent = path.substring(0, lastSlashIdx); - String nodePrefix = path.substring(lastSlashIdx+1); - - List nodes = zk.getChildren(parent, false); - List matching = filterByPrefix(nodes, nodePrefix); - for (String node : matching) { - String nodePath = parent + "/" + node; - Stat stat = zk.exists(nodePath, false); - if (stat != null) { - return nodePath; - } - } - return null; - } - - public byte[] removeMetaData(byte[] data) { - if(data == null || data.length == 0) { - return data; - } - // check the magic data; to be backward compatible - byte magic = data[0]; - if(magic != MAGIC) { - return data; - } - - int idLength = Bytes.toInt(data, MAGIC_OFFSET); - int dataLength = data.length-MAGIC_OFFSET-ID_OFFSET-idLength; - int dataOffset = MAGIC_OFFSET+ID_OFFSET+idLength; - - byte[] newData = new byte[dataLength]; - System.arraycopy(data, dataOffset, newData, 0, dataLength); - - return newData; - - } - - private byte[] appendMetaData(byte[] data) { - if(data == null){ - return null; - } - - byte[] newData = new byte[MAGIC_OFFSET+ID_OFFSET+id.length+data.length]; - int pos = 0; - pos = Bytes.putByte(newData, pos, MAGIC); - pos = Bytes.putInt(newData, pos, id.length); - pos = Bytes.putBytes(newData, pos, id, 0, id.length); - pos = Bytes.putBytes(newData, pos, data, 0, data.length); - - return newData; - } - - public long getSessionId() { - return zk.getSessionId(); - } - - public void close() throws InterruptedException { - zk.close(); - } - - public States getState() { - return zk.getState(); - } - - public ZooKeeper getZooKeeper() { - return zk; - } - - public byte[] getSessionPasswd() { - return zk.getSessionPasswd(); - } - - public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) { - this.zk.sync(path, null, null); - } - - - /** - * Filters the given node list by the given prefixes. - * This method is all-inclusive--if any element in the node list starts - * with any of the given prefixes, then it is included in the result. - * - * @param nodes the nodes to filter - * @param prefixes the prefixes to include in the result - * @return list of every element that starts with one of the prefixes - */ - private static List filterByPrefix(List nodes, - String... prefixes) { - List lockChildren = new ArrayList(); - for (String child : nodes){ - for (String prefix : prefixes){ - if (child.startsWith(prefix)){ - lockChildren.add(child); - break; - } - } - } - return lockChildren; - } - -} diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 1504d1f..7f5b377 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -72,20 +72,20 @@ public class ZKUtil { * @return connection to zookeeper * @throws IOException if unable to connect to zk or config problem */ - public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher) + public static ZooKeeper connect(Configuration conf, Watcher watcher) throws IOException { Properties properties = ZKConfig.makeZKProps(conf); String ensemble = ZKConfig.getZKQuorumServersString(properties); return connect(conf, ensemble, watcher); } - public static RecoverableZooKeeper connect(Configuration conf, String ensemble, + public static ZooKeeper connect(Configuration conf, String ensemble, Watcher watcher) throws IOException { return connect(conf, ensemble, watcher, ""); } - public static RecoverableZooKeeper connect(Configuration conf, String ensemble, + public static ZooKeeper connect(Configuration conf, String ensemble, Watcher watcher, final String descriptor) throws IOException { if(ensemble == null) { @@ -94,11 +94,7 @@ public class ZKUtil { int timeout = conf.getInt("zookeeper.session.timeout", 180 * 1000); LOG.debug(descriptor + " opening connection to ZooKeeper with ensemble (" + ensemble + ")"); - int retry = conf.getInt("zookeeper.recovery.retry", 3); - int retryIntervalMillis = - conf.getInt("zookeeper.recovery.retry.intervalmill", 1000); - return new RecoverableZooKeeper(ensemble, timeout, watcher, - retry, retryIntervalMillis); + return new ZooKeeper(ensemble, timeout, watcher); } // @@ -233,7 +229,7 @@ public class ZKUtil { public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw); + Stat s = zkw.getZooKeeper().exists(znode, zkw); LOG.debug(zkw.prefix("Set watcher on existing znode " + znode)); return s != null ? true : false; } catch (KeeperException e) { @@ -261,7 +257,7 @@ public class ZKUtil { public static int checkExists(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - Stat s = zkw.getRecoverableZooKeeper().exists(znode, null); + Stat s = zkw.getZooKeeper().exists(znode, null); return s != null ? s.getVersion() : -1; } catch (KeeperException e) { LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e); @@ -298,7 +294,7 @@ public class ZKUtil { ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - List children = zkw.getRecoverableZooKeeper().getChildren(znode, zkw); + List children = zkw.getZooKeeper().getChildren(znode, zkw); return children; } catch(KeeperException.NoNodeException ke) { LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " + @@ -390,7 +386,7 @@ public class ZKUtil { List children = null; try { // List the children without watching - children = zkw.getRecoverableZooKeeper().getChildren(znode, null); + children = zkw.getZooKeeper().getChildren(znode, null); } catch(KeeperException.NoNodeException nne) { return null; } catch(InterruptedException ie) { @@ -464,7 +460,7 @@ public class ZKUtil { public static boolean nodeHasChildren(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - return !zkw.getRecoverableZooKeeper().getChildren(znode, null).isEmpty(); + return !zkw.getZooKeeper().getChildren(znode, null).isEmpty(); } catch(KeeperException.NoNodeException ke) { LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " + "because node does not exist (not an error)")); @@ -496,7 +492,7 @@ public class ZKUtil { public static int getNumberOfChildren(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - Stat stat = zkw.getRecoverableZooKeeper().exists(znode, null); + Stat stat = zkw.getZooKeeper().exists(znode, null); return stat == null ? 0 : stat.getNumChildren(); } catch(KeeperException e) { LOG.warn(zkw.prefix("Unable to get children of node " + znode)); @@ -518,7 +514,7 @@ public class ZKUtil { public static byte [] getData(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, null); + byte [] data = zkw.getZooKeeper().getData(znode, null, null); logRetrievedMsg(zkw, znode, data, false); return data; } catch (KeeperException.NoNodeException e) { @@ -550,7 +546,7 @@ public class ZKUtil { public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - byte [] data = zkw.getRecoverableZooKeeper().getData(znode, zkw, null); + byte [] data = zkw.getZooKeeper().getData(znode, zkw, null); logRetrievedMsg(zkw, znode, data, true); return data; } catch (KeeperException.NoNodeException e) { @@ -587,7 +583,7 @@ public class ZKUtil { Stat stat) throws KeeperException { try { - byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, stat); + byte [] data = zkw.getZooKeeper().getData(znode, null, stat); logRetrievedMsg(zkw, znode, data, false); return data; } catch (KeeperException.NoNodeException e) { @@ -650,7 +646,7 @@ public class ZKUtil { byte [] data, int expectedVersion) throws KeeperException { try { - zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion); + zkw.getZooKeeper().setData(znode, data, expectedVersion); } catch(InterruptedException ie) { zkw.interruptedException(ie); } @@ -709,7 +705,7 @@ public class ZKUtil { byte [] data, int expectedVersion) throws KeeperException, KeeperException.NoNodeException { try { - return zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion) != null; + return zkw.getZooKeeper().setData(znode, data, expectedVersion) != null; } catch (InterruptedException e) { zkw.interruptedException(e); return false; @@ -781,7 +777,7 @@ public class ZKUtil { String znode, byte [] data) throws KeeperException { try { - zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (KeeperException.NodeExistsException nee) { if(!watchAndCheckExists(zkw, znode)) { @@ -820,11 +816,11 @@ public class ZKUtil { ZooKeeperWatcher zkw, String znode, byte [] data) throws KeeperException { try { - zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (KeeperException.NodeExistsException nee) { try { - zkw.getRecoverableZooKeeper().exists(znode, zkw); + zkw.getZooKeeper().exists(znode, zkw); } catch (InterruptedException e) { zkw.interruptedException(e); return false; @@ -857,9 +853,9 @@ public class ZKUtil { String znode, byte [] data) throws KeeperException, KeeperException.NodeExistsException { try { - zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - return zkw.getRecoverableZooKeeper().exists(znode, zkw).getVersion(); + return zkw.getZooKeeper().exists(znode, zkw).getVersion(); } catch (InterruptedException e) { zkw.interruptedException(e); return -1; @@ -884,7 +880,7 @@ public class ZKUtil { public static void asyncCreate(ZooKeeperWatcher zkw, String znode, byte [] data, final AsyncCallback.StringCallback cb, final Object ctx) { - zkw.getRecoverableZooKeeper().getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, cb, ctx); } @@ -902,7 +898,7 @@ public class ZKUtil { String znode) throws KeeperException { try { - RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper(); + ZooKeeper zk = zkw.getZooKeeper(); if (zk.exists(znode, false) == null) { zk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -910,7 +906,7 @@ public class ZKUtil { } catch(KeeperException.NodeExistsException nee) { } catch(KeeperException.NoAuthException nee){ try { - if (null == zkw.getRecoverableZooKeeper().exists(znode, false)) { + if (null == zkw.getZooKeeper().exists(znode, false)) { // If we failed to create the file and it does not already exist. throw(nee); } @@ -940,7 +936,7 @@ public class ZKUtil { if(znode == null) { return; } - zkw.getRecoverableZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, + zkw.getZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch(KeeperException.NodeExistsException nee) { return; @@ -972,7 +968,7 @@ public class ZKUtil { int version) throws KeeperException { try { - zkw.getRecoverableZooKeeper().delete(node, version); + zkw.getZooKeeper().delete(node, version); return true; } catch(KeeperException.BadVersionException bve) { return false; @@ -991,7 +987,7 @@ public class ZKUtil { public static void deleteNodeFailSilent(ZooKeeperWatcher zkw, String node) throws KeeperException { try { - zkw.getRecoverableZooKeeper().delete(node, -1); + zkw.getZooKeeper().delete(node, -1); } catch(KeeperException.NoNodeException nne) { } catch(InterruptedException ie) { zkw.interruptedException(ie); @@ -1013,7 +1009,7 @@ public class ZKUtil { deleteNodeRecursively(zkw, joinZNode(node, child)); } } - zkw.getRecoverableZooKeeper().delete(node, -1); + zkw.getZooKeeper().delete(node, -1); } catch(InterruptedException ie) { zkw.interruptedException(ie); } diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 53276f6..dc471c4 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -59,7 +59,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable { private String quorum; // zookeeper connection - private RecoverableZooKeeper recoverableZooKeeper; + private ZooKeeper zooKeeper; // abortable in case of zk failure private Abortable abortable; @@ -120,7 +120,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable { this.identifier = descriptor; this.abortable = abortable; setNodeNames(conf); - this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, descriptor); + this.zooKeeper = ZKUtil.connect(conf, quorum, this, descriptor); try { // Create all the necessary "directories" of znodes // TODO: Move this to an init method somewhere so not everyone calls it? @@ -129,28 +129,40 @@ public class ZooKeeperWatcher implements Watcher, Abortable { // Apparently this is recoverable. Retry a while. // See http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling // TODO: Generalize out in ZKUtil. - try { - ZKUtil.createAndFailSilent(this, baseZNode); - - } catch (KeeperException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Zookeeper Create Failed because "+e.getMessage()); + long wait = conf.getLong("hbase.zookeeper.recoverable.waittime", 10000); + long finished = System.currentTimeMillis() + wait; + KeeperException ke = null; + do { + try { + ZKUtil.createAndFailSilent(this, baseZNode); + ke = null; + break; + } catch (KeeperException.ConnectionLossException e) { + if (LOG.isDebugEnabled() && (isFinishedRetryingRecoverable(finished))) { + LOG.debug("Retrying zk create for another " + + (finished - System.currentTimeMillis()) + + "ms; set 'hbase.zookeeper.recoverable.waittime' to change " + + "wait time); " + e.getMessage()); + } + ke = e; } - throw new ZooKeeperConnectionException("HBase is able to connect to" + - " ZooKeeper but the connection closes immediately. This could be" + - " a sign that the server has too many connections (30 is the" + - " default). Consider inspecting your ZK server logs for that" + - " error and then make sure you are reusing HBaseConfiguration" + - " as often as you can. See HTable's javadoc for more information.", - e); - } finally { + } while (isFinishedRetryingRecoverable(finished)); + // Convert connectionloss exception to ZKCE. + if (ke != null) { try { // If we don't close it, the zk connection managers won't be killed - this.recoverableZooKeeper.close(); + this.zooKeeper.close(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.warn("Interrupted while closing", e); } + throw new ZooKeeperConnectionException("HBase is able to connect to" + + " ZooKeeper but the connection closes immediately. This could be" + + " a sign that the server has too many connections (30 is the" + + " default). Consider inspecting your ZK server logs for that" + + " error and then make sure you are reusing HBaseConfiguration" + + " as often as you can. See HTable's javadoc for more information.", + ke); } ZKUtil.createAndFailSilent(this, assignmentZNode); ZKUtil.createAndFailSilent(this, rsZNode); @@ -226,8 +238,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable { * Get the connection to ZooKeeper. * @return connection reference to zookeeper */ - public RecoverableZooKeeper getRecoverableZooKeeper() { - return recoverableZooKeeper; + public ZooKeeper getZooKeeper() { + return zooKeeper; } /** @@ -312,16 +324,16 @@ public class ZooKeeperWatcher implements Watcher, Abortable { this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000); while (System.currentTimeMillis() < finished) { Threads.sleep(1); - if (this.recoverableZooKeeper != null) break; + if (this.zooKeeper != null) break; } - if (this.recoverableZooKeeper == null) { + if (this.zooKeeper == null) { LOG.error("ZK is null on connection event -- see stack trace " + "for the stack trace when constructor was called on this zkw", this.constructorCaller); throw new NullPointerException("ZK is null"); } this.identifier = this.identifier + "-0x" + - Long.toHexString(this.recoverableZooKeeper.getSessionId()); + Long.toHexString(this.zooKeeper.getSessionId()); // Update our identifier. Otherwise ignore. LOG.debug(this.identifier + " connected"); break; @@ -356,7 +368,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable { * is up-to-date from when we begin the operation. */ public void sync(String path) { - this.recoverableZooKeeper.sync(path, null, null); + this.zooKeeper.sync(path, null, null); } /** @@ -407,8 +419,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable { */ public void close() { try { - if (recoverableZooKeeper != null) { - recoverableZooKeeper.close(); + if (zooKeeper != null) { + zooKeeper.close(); // super.close(); } } catch (InterruptedException e) { diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 1bdd541..8a64439 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -1028,7 +1028,7 @@ public class HBaseTestingUtility { Configuration c = new Configuration(this.conf); String quorumServers = ZKConfig.getZKQuorumServersString(c); int sessionTimeout = 5 * 1000; // 5 seconds - ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper(); + ZooKeeper zk = nodeZK.getZooKeeper(); byte[] password = zk.getSessionPasswd(); long sessionID = zk.getSessionId(); diff --git a/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java index 476343a..ffa1710 100644 --- a/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java +++ b/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java @@ -99,8 +99,8 @@ public class TestZooKeeper { int sessionTimeout = 5 * 1000; // 5 seconds HConnection connection = HConnectionManager.getConnection(c); ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher(); - long sessionID = connectionZK.getRecoverableZooKeeper().getSessionId(); - byte[] password = connectionZK.getRecoverableZooKeeper().getSessionPasswd(); + long sessionID = connectionZK.getZooKeeper().getSessionId(); + byte[] password = connectionZK.getZooKeeper().getSessionPasswd(); ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout, EmptyWatcher.instance, sessionID, password); zk.close(); @@ -112,14 +112,14 @@ public class TestZooKeeper { // Check that the old ZK conenction is closed, means we did expire System.err.println("ZooKeeper should have timed out"); - LOG.info("state=" + connectionZK.getRecoverableZooKeeper().getState()); - Assert.assertTrue(connectionZK.getRecoverableZooKeeper().getState().equals( + LOG.info("state=" + connectionZK.getZooKeeper().getState()); + Assert.assertTrue(connectionZK.getZooKeeper().getState().equals( States.CLOSED)); // Check that the client recovered ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher(); - LOG.info("state=" + newConnectionZK.getRecoverableZooKeeper().getState()); - Assert.assertTrue(newConnectionZK.getRecoverableZooKeeper().getState().equals( + LOG.info("state=" + newConnectionZK.getZooKeeper().getState()); + Assert.assertTrue(newConnectionZK.getZooKeeper().getState().equals( States.CONNECTED)); } diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 9a88855..4aab4b9 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -198,7 +198,7 @@ public class TestSplitLogManager { LOG.info("TestOrphanTaskAcquisition"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); - zkw.getRecoverableZooKeeper().create(tasknode, + zkw.getZooKeeper().create(tasknode, TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -231,7 +231,7 @@ public class TestSplitLogManager { " startup"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); //create an unassigned orphan task - zkw.getRecoverableZooKeeper().create(tasknode, + zkw.getZooKeeper().create(tasknode, TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); @@ -391,7 +391,7 @@ public class TestSplitLogManager { // create an orphan task in OWNED state String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1"); - zkw.getRecoverableZooKeeper().create(tasknode1, + zkw.getZooKeeper().create(tasknode1, TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index ebba2be..ce15168 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -129,7 +129,7 @@ public class TestSplitLogWorker { public void testAcquireTaskAtStartup() throws Exception { LOG.info("testAcquireTaskAtStartup"); - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"), + zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"), TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -160,7 +160,7 @@ public class TestSplitLogWorker { public void testRaceForTask() throws Exception { LOG.info("testRaceForTask"); - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"), + zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"), TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -199,7 +199,7 @@ public class TestSplitLogWorker { Thread.sleep(100); // this time create a task node after starting the splitLogWorker - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"), + zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"), TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -226,7 +226,7 @@ public class TestSplitLogWorker { Thread.yield(); // let the worker start Thread.sleep(100); - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"), + zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"), TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -234,7 +234,7 @@ public class TestSplitLogWorker { // now the worker is busy doing the above task // create another task - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"), + zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"), TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -261,7 +261,7 @@ public class TestSplitLogWorker { Thread.yield(); // let the worker start Thread.sleep(100); - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"), + zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"), TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -274,7 +274,7 @@ public class TestSplitLogWorker { waitForCounter(tot_wkr_preempt_task, 0, 1, 1000); // create a RESCAN node - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"), + zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"), TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 8678384..7fc44e9 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -135,7 +135,7 @@ public class TestSplitTransactionOnCluster { hri.getEncodedName()); Stat stat = null; for (int i = 0; i < 10; i++) { - stat = t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); + stat = t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false); LOG.info("Stat for znode path=" + path + ": " + stat); if (stat == null) break; org.apache.hadoop.hbase.util.Threads.sleep(100); @@ -195,7 +195,7 @@ public class TestSplitTransactionOnCluster { String path = ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(), hri.getEncodedName()); Stat stats = - t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); + t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false); LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats); RegionTransitionData rtd = ZKAssign.getData(t.getConnection().getZooKeeperWatcher(), @@ -220,7 +220,7 @@ public class TestSplitTransactionOnCluster { assertTrue(daughters.contains(r)); } // Finally assert that the ephemeral SPLIT znode was cleaned up. - stats = t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); + stats = t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false); LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats); assertTrue(stats == null); } finally {