diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 0b9deac..9e4b021 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -1706,8 +1706,8 @@ public class HConnectionManager { } this.servers.clear(); if (this.zooKeeper != null) { - LOG.info("Closed zookeeper sessionid=0x" - + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); + LOG.info("Closed zookeeper sessionid=0x" + + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); this.zooKeeper.close(); this.zooKeeper = null; } diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 51cbc79..49d1e7c 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -1145,7 +1145,7 @@ public class AssignmentManager extends ZooKeeperListener { LOG.debug("rs=" + (RegionState)ctx + ", server=" + this.destination.toString()); // Async exists to set a watcher so we'll get triggered when // unassigned node changes. 
- this.zkw.getZooKeeper().exists(path, this.zkw, + this.zkw.getRecoverableZooKeeper().getZooKeeper().exists(path, this.zkw, new ExistsUnassignedAsyncCallback(this.counter, destination), ctx); } } diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 64c45fc..b727c7c 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -370,7 +370,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { LOG.info("Server active/primary master; " + this.serverName + ", sessionid=0x" + - Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) + + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) + ", cluster-up flag was=" + wasUp); } diff --git a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 930a6f5..096b133 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -197,6 +197,7 @@ public class SplitLogManager extends ZooKeeperListener { * @throws IOException * if there was an error while splitting any log file * @return cumulative size of the logfiles split + * @throws KeeperException */ public long splitLogDistributed(final Path logDir) throws IOException { List logDirs = new ArrayList(); @@ -370,7 +371,8 @@ public class SplitLogManager extends ZooKeeperListener { private void getDataSetWatch(String path, Long retry_count) { - this.watcher.getZooKeeper().getData(path, this.watcher, + this.watcher.getRecoverableZooKeeper().getZooKeeper(). 
+ getData(path, this.watcher, new GetDataAsyncCallback(), retry_count); tot_mgr_get_data_queued.incrementAndGet(); } @@ -524,7 +526,8 @@ public class SplitLogManager extends ZooKeeperListener { private void deleteNode(String path, Long retries) { tot_mgr_node_delete_queued.incrementAndGet(); - this.watcher.getZooKeeper().delete(path, -1, new DeleteAsyncCallback(), + this.watcher.getRecoverableZooKeeper().getZooKeeper(). + delete(path, -1, new DeleteAsyncCallback(), retries); } @@ -551,9 +554,11 @@ public class SplitLogManager extends ZooKeeperListener { /** * signal the workers that a task was resubmitted by creating the * RESCAN node. + * @throws KeeperException */ private void createRescanNode(long retries) { - watcher.getZooKeeper().create(ZKSplitLog.getRescanNode(watcher), + this.watcher.getRecoverableZooKeeper().getZooKeeper(). + create(ZKSplitLog.getRescanNode(watcher), TaskState.TASK_UNASSIGNED.get(serverName), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, new CreateRescanAsyncCallback(), new Long(retries)); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 06ed8f6..395d93f 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -879,7 +879,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, LOG.info("Serving as " + this.serverNameFromMasterPOV + ", RPC listening on " + this.isa + ", sessionid=0x" + - Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); isOnline = true; } catch (Throwable e) { this.isOnline = false; diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 843873e..d61f6e8 100644 --- 
a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -329,7 +329,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
    */
   private boolean ownTask(boolean isFirstTime) {
     try {
-      Stat stat = this.watcher.getZooKeeper().setData(currentTask,
+      Stat stat = this.watcher.getRecoverableZooKeeper().setData(currentTask,
           TaskState.TASK_OWNED.get(serverName), currentVersion);
       if (stat == null) {
         LOG.warn("zk.setData() returned null for path " + currentTask);
@@ -392,8 +392,9 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
   }
 
   void getDataSetWatchAsync() {
-    this.watcher.getZooKeeper().getData(currentTask, this.watcher,
-      new GetDataAsyncCallback(), null);
+    this.watcher.getRecoverableZooKeeper().getZooKeeper().
+      getData(currentTask, this.watcher,
+      new GetDataAsyncCallback(), null);
     tot_wkr_get_data_queued.incrementAndGet();
   }
 
diff --git a/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
index 2a2585d..f5f742f 100644
--- a/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
+++ b/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
@@ -310,7 +310,16 @@ public class Bytes {
   public static String toStringBinary(final byte [] b) {
     return toStringBinary(b, 0, b.length);
   }
-
+
+  /**
+   * Null-tolerant variant of {@link #toStringBinary(byte[])}: returns the
+   * literal string "null" when {@code b} is null instead of dereferencing it.
+   */
+  public static String toStringBinarySafe(final byte [] b) {
+    if (b == null)
+      return "null";
+    return toStringBinary(b, 0, b.length);
+  }
   /**
    * Write a printable representation of a byte array.
Non-printable * characters are hex escaped in the format \\x%02X, eg: diff --git a/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java b/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java new file mode 100644 index 0000000..61ea552 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java @@ -0,0 +1,57 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.TimeUnit;
+
+public class RetryCounter { // counts down a fixed retry budget for one operation
+  private final int maxRetries; // budget supplied to the constructor
+  private int retriesRemaining; // starts at maxRetries; useRetry() decrements it
+  private final int retryIntervalMillis; // sleep length, interpreted in timeUnit
+  private final TimeUnit timeUnit; // unit applied by sleepUntilNextRetry()
+
+  public RetryCounter(int maxRetries,
+  int retryIntervalMillis, TimeUnit timeUnit) {
+    this.maxRetries = maxRetries;
+    this.retriesRemaining = maxRetries;
+    this.retryIntervalMillis = retryIntervalMillis;
+    this.timeUnit = timeUnit;
+  }
+
+  public int getMaxRetries() { // the budget this counter was created with
+    return maxRetries;
+  }
+
+  public void sleepUntilNextRetry() throws InterruptedException { // blocks the caller
+    timeUnit.sleep(retryIntervalMillis);
+  }
+
+  public boolean shouldRetry() { // true while at least one retry is left
+    return retriesRemaining > 0;
+  }
+
+  public void useRetry() { // consumes one retry; no lower bound is enforced
+    retriesRemaining--;
+  }
+
+  public int getAttemptTimes() { // 1 before any retry is consumed, then 2, 3, ...
+    return maxRetries-retriesRemaining+1;
+  }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java b/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java
new file mode 100644
index 0000000..445234e
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.TimeUnit;
+
+public class RetryCounterFactory { // creates RetryCounters that share one configuration
+  private final int maxRetries; // budget given to every counter created
+  private final int retryIntervalMillis; // sleep between retries, in milliseconds
+
+  public RetryCounterFactory(int maxRetries, int retryIntervalMillis) {
+    this.maxRetries = maxRetries;
+    this.retryIntervalMillis = retryIntervalMillis;
+  }
+
+  public RetryCounter create() { // fresh counter with the full budget, millisecond unit
+    return new RetryCounter(
+      maxRetries, retryIntervalMillis, TimeUnit.MILLISECONDS
+    );
+  }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
new file mode 100644
index 0000000..d8cc8aa
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
@@ -0,0 +1,661 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * A ZooKeeper wrapper that can handle 'recoverable' errors.
+ * To handle recoverable errors, developers need to realize that there are two
+ * classes of requests: idempotent and non-idempotent requests. Read requests
+ * and unconditional sets and deletes are examples of idempotent requests; they
+ * can be reissued with the same results.
+ * (Although the delete may throw a NoNodeException on reissue, its effect on
+ * the ZooKeeper state is the same.) Non-idempotent requests need special
+ * handling: application and library writers need to keep in mind that they may
+ * need to encode information in the data or name of znodes to detect
+ * retries. A simple example is a create that uses a sequence flag.
+ * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
+ * loss exception, that process will reissue another
+ * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
+ * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111; now it could be
+ * that x-109 was the result of the previous create, so the process actually
+ * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
+ * when doing the create. If the process is using an id of 352, before reissuing
+ * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
+ * "x-352-109", "x-333-110". The process will know that the original create
+ * succeeded and the znode it created is "x-352-109".
+ * @see <a href="http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling">ZooKeeper ErrorHandling</a>
+ */
+public class RecoverableZooKeeper {
+  private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
+  // the actual ZooKeeper client instance; every call in this class delegates to it
+  private ZooKeeper zk;
+  private final RetryCounterFactory retryCounterFactory;
+  // An identifier of this process in the cluster (the RuntimeMXBean name)
+  private final String identifier;
+  private final byte[] id; // identifier as bytes; written into the metadata header
+  private int retryIntervalMillis; // used here only in the retry log messages
+
+  private static final int ID_OFFSET = Bytes.SIZEOF_INT; // size of the id-length int (a length, not an offset)
+  // first byte of data written through this class; data without it is treated as legacy
+  private static final byte MAGIC =(byte) 0XFF;
+  private static final int MAGIC_OFFSET = Bytes.SIZEOF_BYTE; // size of the magic byte
+
+  public RecoverableZooKeeper(String quorumServers, int seesionTimeout,
+      Watcher watcher, int maxRetries, int retryIntervalMillis)
+  throws IOException {
+    this.zk = new ZooKeeper(quorumServers, seesionTimeout, watcher);
+    this.retryCounterFactory =
+      new RetryCounterFactory(maxRetries, retryIntervalMillis);
+    this.retryIntervalMillis = retryIntervalMillis;
+
+    // the identifier is typically processID@hostName (JVM-dependent; TODO confirm)
+    this.identifier = ManagementFactory.getRuntimeMXBean().getName();
+    LOG.info("The identifier of this process is " + identifier);
+    this.id = Bytes.toBytes(identifier);
+  }
+
+  /**
+   * delete is an idempotent operation, so it is retried before an exception is thrown.
+   * A NoNodeException is swallowed only on a retry; on the first attempt it is rethrown.
+   * @param path znode to delete
+   * @param version expected version, passed unchanged to ZooKeeper#delete
+   * @throws InterruptedException if interrupted in ZooKeeper or while sleeping between retries
+   * @throws KeeperException on non-retryable errors, or once the retries are used up
+   */
+  public void delete(String path, int version)
+  throws InterruptedException, KeeperException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean isRetry = false; // False for first attempt, true for all retries.
+    while (true) {
+      try {
+        zk.delete(path, version);
+        return;
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case NONODE:
+            if (isRetry) {
+              LOG.info("Node " + path + " already deleted. Assuming that a " +
+                  "previous attempt succeeded.");
+              return;
+            }
+            LOG.warn("Node " + path + " already deleted, and this is not a " +
+                "retry");
+            throw e;
+
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper delete failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+      isRetry = true;
+    }
+  }
+
+  /**
+   * exists is an idempotent operation.
The call is retried before an exception is thrown
+   * @param path znode path to check
+   * @param watcher watcher handed to ZooKeeper#exists together with the path
+   * @return the Stat returned by ZooKeeper#exists for the path
+   * @throws KeeperException on non-retryable errors, or once the retries are used up
+   * @throws InterruptedException if interrupted in ZooKeeper or while sleeping between retries
+   */
+  public Stat exists(String path, Watcher watcher)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.exists(path, watcher);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper exists failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * exists is an idempotent operation. The call is retried before an exception is thrown
+   * @param path znode path to check
+   * @param watch watch flag handed to ZooKeeper#exists together with the path
+   * @return the Stat returned by ZooKeeper#exists for the path
+   * @throws KeeperException on non-retryable errors, or once the retries are used up
+   * @throws InterruptedException if interrupted in ZooKeeper or while sleeping between retries
+   */
+  public Stat exists(String path, boolean watch)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.exists(path, watch);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper exists failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * getChildren is an idempotent operation.
The call is retried before an exception is thrown
+   * @param path znode whose children are listed
+   * @param watcher watcher handed to ZooKeeper#getChildren together with the path
+   * @return the child names returned by ZooKeeper#getChildren
+   * @throws KeeperException on non-retryable errors, or once the retries are used up
+   * @throws InterruptedException if interrupted in ZooKeeper or while sleeping between retries
+   */
+  public List<String> getChildren(String path, Watcher watcher)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.getChildren(path, watcher);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper getChildren failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * getChildren is an idempotent operation. The call is retried before an exception is thrown
+   * @param path znode whose children are listed
+   * @param watch watch flag handed to ZooKeeper#getChildren together with the path
+   * @return the child names returned by ZooKeeper#getChildren
+   * @throws KeeperException on non-retryable errors, or once the retries are used up
+   * @throws InterruptedException if interrupted in ZooKeeper or while sleeping between retries
+   */
+  public List<String> getChildren(String path, boolean watch)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.getChildren(path, watch);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper getChildren failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * getData is an idempotent operation.
The call is retried before an exception is thrown
+   * @param path znode to read
+   * @param watcher watcher handed to ZooKeeper#getData together with the path
+   * @param stat Stat object handed to ZooKeeper#getData
+   * @return the node data after removeMetaData has stripped any metadata header
+   * @throws KeeperException on non-retryable errors, or once the retries are used up
+   * @throws InterruptedException if interrupted in ZooKeeper or while sleeping between retries
+   */
+  public byte[] getData(String path, Watcher watcher, Stat stat)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        byte[] revData = zk.getData(path, watcher, stat);
+        return this.removeMetaData(revData);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper getData failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * getData is an idempotent operation.
The call is retried before an exception is thrown
+   * @param path znode to read
+   * @param watch watch flag handed to ZooKeeper#getData together with the path
+   * @param stat Stat object handed to ZooKeeper#getData
+   * @return the node data after removeMetaData has stripped any metadata header
+   * @throws KeeperException on non-retryable errors, or once the retries are used up
+   * @throws InterruptedException if interrupted in ZooKeeper or while sleeping between retries
+   */
+  public byte[] getData(String path, boolean watch, Stat stat)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        byte[] revData = zk.getData(path, watch, stat);
+        return this.removeMetaData(revData);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper getData failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * setData is NOT an idempotent operation.
A retried write may fail with BadVersion.
+   * When a retry fails that way, the node is re-read and compared with the bytes
+   * this call tried to write; a match means the earlier attempt had succeeded.
+   * @param path znode to update
+   * @param data payload; it is wrapped by appendMetaData before being written
+   * @param version expected znode version, passed unchanged to ZooKeeper#setData
+   * @return the Stat of the successful write, or of the verifying read after a retry
+   * @throws KeeperException on non-retryable errors, or once the retries are used up
+   * @throws InterruptedException if interrupted in ZooKeeper or while sleeping between retries
+   */
+  public Stat setData(String path, byte[] data, int version)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    byte[] newData = appendMetaData(data);
+    boolean isRetry = false; // False for first attempt, true for all retries.
+    while (true) {
+      try {
+        return zk.setData(path, newData, version);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper setData failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+          case BADVERSION:
+            // only a retry can collide with this call's own earlier write
+            if (isRetry) {
+              // try to verify whether the previous setData success or not
+              try{
+                Stat stat = new Stat();
+                byte[] revData = zk.getData(path, false, stat);
+                if (java.util.Arrays.equals(revData, newData)) {
+                  // the bad version is caused by previous successful setData
+                  return stat;
+                }
+              } catch(KeeperException keeperException){
+                // the ZK is not reliable at this moment. just throw out exception
+                throw keeperException;
+              }
+            }
+
+          // throw out other exceptions and verified bad version exceptions
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+      isRetry = true;
+    }
+  }
+
+  /**
+   *

+   * A NONSEQUENTIAL create is retried on connection loss or operation timeout.
+   * If a retry then sees NodeExists and the node already holds the data being
+   * written, the earlier attempt is assumed to have succeeded and the path is
+   * returned; any other NodeExists is rethrown to the caller.
+   *
+   *
+   * A SEQUENTIAL create is NOT idempotent, so this process's identifier is appended
+   * to the path; before each retry the parent's children are scanned for a node
+   * with that prefix left behind by an earlier attempt.
+   *
+   *
+   * @param path znode path (for sequential modes the process identifier is appended)
+   * @param data payload; it is wrapped by appendMetaData before being written
+   * @param acl ACL list passed unchanged to ZooKeeper#create
+   * @param createMode one of the four CreateMode values handled in the switch below
+   * @return the path of the created (or previously created) node
+   * @throws KeeperException on non-retryable errors, or once the retries are used up
+   * @throws InterruptedException if interrupted in ZooKeeper or while sleeping between retries
+   */
+  public String create(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode)
+  throws KeeperException, InterruptedException {
+    byte[] newData = appendMetaData(data);
+    switch (createMode) {
+      case EPHEMERAL:
+      case PERSISTENT:
+        return createNonSequential(path, newData, acl, createMode);
+
+      case EPHEMERAL_SEQUENTIAL:
+      case PERSISTENT_SEQUENTIAL:
+        return createSequential(path, newData, acl, createMode);
+
+      default:
+        throw new IllegalArgumentException("Unrecognized CreateMode: " +
+            createMode);
+    }
+  }
+
+  private String createNonSequential(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode) throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean isRetry = false; // False for first attempt, true for all retries.
+    while (true) {
+      try {
+        return zk.create(path, data, acl, createMode);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case NODEEXISTS:
+            if (isRetry) {
+              // If the connection was lost, there is still a possibility that
+              // we have successfully created the node at our previous attempt,
+              // so we read the node and compare.
+              byte[] currentData = zk.getData(path, false, null);
+              if (currentData != null &&
+                  Bytes.compareTo(currentData, data) == 0) {
+                // We successfully created a non-sequential node
+                return path;
+              }
+              LOG.error("Node " + path + " already exists with " +
+                  Bytes.toStringBinarySafe(currentData) + ", could not write " +
+                  Bytes.toStringBinarySafe(data));
+              throw e;
+            }
+            LOG.error("Node " + path + " already exists and this is not a " +
+                "retry");
+            throw e;
+
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper create failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+      isRetry = true;
+    }
+  }
+
+  private String createSequential(String path, byte[] data,
+      List<ACL> acl, CreateMode createMode)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean first = true;
+    String newPath = path+this.identifier; // prefix that marks nodes created by this process
+    while (true) {
+      try {
+        if (!first) {
+          // Check if we succeeded on a previous attempt
+          String previousResult = findPreviousSequentialNode(newPath);
+          if (previousResult != null) {
+            return previousResult;
+          }
+        }
+        first = false;
+        return zk.create(newPath, data, acl, createMode);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper create failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  private String findPreviousSequentialNode(String path) // null when no matching child exists
+    throws KeeperException, InterruptedException {
+    int lastSlashIdx = path.lastIndexOf('/');
+    assert(lastSlashIdx != -1);
+    String parent = path.substring(0, lastSlashIdx);
+    String nodePrefix = path.substring(lastSlashIdx+1);
+
+    List<String> nodes = zk.getChildren(parent, false);
+    List<String> matching = filterByPrefix(nodes, nodePrefix);
+    for (String node : matching) {
+      String nodePath = parent + "/" + node;
+      Stat stat = zk.exists(nodePath, false);
+      if (stat != null) {
+        return nodePath;
+      }
+    }
+    return null;
+  }
+
+  public byte[] removeMetaData(byte[] data) { // inverse of appendMetaData
+    if(data == null || data.length == 0) {
+      return data;
+    }
+    // check the magic data; to be backward compatible
+    byte magic = data[0];
+    if(magic != MAGIC) {
+      return data;
+    }
+
+    int idLength = Bytes.toInt(data, MAGIC_OFFSET);
+    int dataLength = data.length-MAGIC_OFFSET-ID_OFFSET-idLength;
+    int dataOffset = MAGIC_OFFSET+ID_OFFSET+idLength;
+
+    byte[] newData = new byte[dataLength];
+    System.arraycopy(data, dataOffset, newData, 0, dataLength);
+
+    return newData;
+
+  }
+
+  private byte[] appendMetaData(byte[] data) { // layout: MAGIC, id length, id, data
+    if(data == null){
+      return null;
+    }
+
+    byte[] newData = new byte[MAGIC_OFFSET+ID_OFFSET+id.length+data.length];
+    int pos = 0;
+    pos = Bytes.putByte(newData, pos, MAGIC);
+    pos = Bytes.putInt(newData, pos, id.length);
+    pos = Bytes.putBytes(newData, pos, id, 0, id.length);
+    pos = Bytes.putBytes(newData, pos, data, 0, data.length);
+
+    return newData;
+  }
+
+  public long getSessionId() {
+    return zk.getSessionId();
+  }
+
+  public void close() throws InterruptedException {
+    zk.close();
+  }
+
+  public States getState() {
+    return zk.getState();
+  }
+
+  public ZooKeeper getZooKeeper() { // raw client; calls made on it bypass the retry logic
+    return zk;
+  }
+
+  public byte[] getSessionPasswd() {
+    return zk.getSessionPasswd();
+  }
+
+  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) {
+    this.zk.sync(path, cb, ctx); // forward the caller's callback and context
+  }
+
+  /**
+   * Filters the given node list by the given prefixes.
+   * This method is all-inclusive--if any element in the node list starts
+   * with any of the given prefixes, then it is included in the result.
+   *
+   * @param nodes the nodes to filter
+   * @param prefixes the prefixes to include in the result
+   * @return list of every element that starts with one of the prefixes
+   */
+  private static List<String> filterByPrefix(List<String> nodes,
+      String... prefixes) {
+    List<String> lockChildren = new ArrayList<String>();
+    for (String child : nodes){
+      for (String prefix : prefixes){
+        if (child.startsWith(prefix)){
+          lockChildren.add(child);
+          break;
+        }
+      }
+    }
+    return lockChildren;
+  }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 57cd101..f237d3c 100644
--- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -70,20 +70,20 @@ public class ZKUtil {
    * @return connection to zookeeper
    * @throws IOException if unable to connect to zk or config problem
    */
-  public static ZooKeeper connect(Configuration conf, Watcher watcher)
+  public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
   throws IOException {
     Properties properties = ZKConfig.makeZKProps(conf);
     String ensemble = ZKConfig.getZKQuorumServersString(properties);
     return connect(conf, ensemble, watcher);
   }
 
-  public static ZooKeeper connect(Configuration conf, String ensemble,
+  public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
       Watcher watcher)
   throws IOException {
     return connect(conf, ensemble, watcher, "");
   }
 
-  public static ZooKeeper connect(Configuration conf, String ensemble,
+  public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
       Watcher watcher, final String descriptor)
   throws IOException {
     if(ensemble ==
null) { @@ -92,7 +92,11 @@ public class ZKUtil { int timeout = conf.getInt("zookeeper.session.timeout", 180 * 1000); LOG.debug(descriptor + " opening connection to ZooKeeper with ensemble (" + ensemble + ")"); - return new ZooKeeper(ensemble, timeout, watcher); + int retry = conf.getInt("zookeeper.recovery.retry", 3); + int retryIntervalMillis = + conf.getInt("zookeeper.recovery.retry.intervalmill", 1000); + return new RecoverableZooKeeper(ensemble, timeout, watcher, + retry, retryIntervalMillis); } // @@ -214,7 +218,7 @@ public class ZKUtil { public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - Stat s = zkw.getZooKeeper().exists(znode, zkw); + Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw); LOG.debug(zkw.prefix("Set watcher on existing znode " + znode)); return s != null ? true : false; } catch (KeeperException e) { @@ -242,7 +246,7 @@ public class ZKUtil { public static int checkExists(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - Stat s = zkw.getZooKeeper().exists(znode, null); + Stat s = zkw.getRecoverableZooKeeper().exists(znode, null); return s != null ? 
s.getVersion() : -1; } catch (KeeperException e) { LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e); @@ -279,7 +283,7 @@ public class ZKUtil { ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - List children = zkw.getZooKeeper().getChildren(znode, zkw); + List children = zkw.getRecoverableZooKeeper().getChildren(znode, zkw); return children; } catch(KeeperException.NoNodeException ke) { LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " + @@ -339,7 +343,7 @@ public class ZKUtil { List children = null; try { // List the children without watching - children = zkw.getZooKeeper().getChildren(znode, null); + children = zkw.getRecoverableZooKeeper().getChildren(znode, null); } catch(KeeperException.NoNodeException nne) { return null; } catch(InterruptedException ie) { @@ -389,7 +393,7 @@ public class ZKUtil { public static boolean nodeHasChildren(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - return !zkw.getZooKeeper().getChildren(znode, null).isEmpty(); + return !zkw.getRecoverableZooKeeper().getChildren(znode, null).isEmpty(); } catch(KeeperException.NoNodeException ke) { LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " + "because node does not exist (not an error)")); @@ -421,7 +425,7 @@ public class ZKUtil { public static int getNumberOfChildren(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - Stat stat = zkw.getZooKeeper().exists(znode, null); + Stat stat = zkw.getRecoverableZooKeeper().exists(znode, null); return stat == null ? 
0 : stat.getNumChildren(); } catch(KeeperException e) { LOG.warn(zkw.prefix("Unable to get children of node " + znode)); @@ -443,7 +447,7 @@ public class ZKUtil { public static byte [] getData(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - byte [] data = zkw.getZooKeeper().getData(znode, null, null); + byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, null); logRetrievedMsg(zkw, znode, data, false); return data; } catch (KeeperException.NoNodeException e) { @@ -475,7 +479,7 @@ public class ZKUtil { public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode) throws KeeperException { try { - byte [] data = zkw.getZooKeeper().getData(znode, zkw, null); + byte [] data = zkw.getRecoverableZooKeeper().getData(znode, zkw, null); logRetrievedMsg(zkw, znode, data, true); return data; } catch (KeeperException.NoNodeException e) { @@ -512,7 +516,7 @@ public class ZKUtil { Stat stat) throws KeeperException { try { - byte [] data = zkw.getZooKeeper().getData(znode, null, stat); + byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, stat); logRetrievedMsg(zkw, znode, data, false); return data; } catch (KeeperException.NoNodeException e) { @@ -549,7 +553,7 @@ public class ZKUtil { byte [] data, int expectedVersion) throws KeeperException { try { - zkw.getZooKeeper().setData(znode, data, expectedVersion); + zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion); } catch(InterruptedException ie) { zkw.interruptedException(ie); } @@ -583,7 +587,7 @@ public class ZKUtil { byte [] data, int expectedVersion) throws KeeperException, KeeperException.NoNodeException { try { - return zkw.getZooKeeper().setData(znode, data, expectedVersion) != null; + return zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion) != null; } catch (InterruptedException e) { zkw.interruptedException(e); return false; @@ -654,7 +658,7 @@ public class ZKUtil { String znode, byte [] data) throws KeeperException { try { - 
zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (KeeperException.NodeExistsException nee) { if(!watchAndCheckExists(zkw, znode)) { @@ -693,11 +697,11 @@ public class ZKUtil { ZooKeeperWatcher zkw, String znode, byte [] data) throws KeeperException { try { - zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (KeeperException.NodeExistsException nee) { try { - zkw.getZooKeeper().exists(znode, zkw); + zkw.getRecoverableZooKeeper().exists(znode, zkw); } catch (InterruptedException e) { zkw.interruptedException(e); return false; @@ -730,9 +734,9 @@ public class ZKUtil { String znode, byte [] data) throws KeeperException, KeeperException.NodeExistsException { try { - zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - return zkw.getZooKeeper().exists(znode, zkw).getVersion(); + return zkw.getRecoverableZooKeeper().exists(znode, zkw).getVersion(); } catch (InterruptedException e) { zkw.interruptedException(e); return -1; @@ -757,7 +761,7 @@ public class ZKUtil { public static void asyncCreate(ZooKeeperWatcher zkw, String znode, byte [] data, final AsyncCallback.StringCallback cb, final Object ctx) { - zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, cb, ctx); } @@ -775,7 +779,7 @@ public class ZKUtil { String znode) throws KeeperException { try { - ZooKeeper zk = zkw.getZooKeeper(); + RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper(); if (zk.exists(znode, false) == null) { zk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -783,7 +787,7 @@ public class ZKUtil { } 
catch(KeeperException.NodeExistsException nee) { } catch(KeeperException.NoAuthException nee){ try { - if (null == zkw.getZooKeeper().exists(znode, false)) { + if (null == zkw.getRecoverableZooKeeper().exists(znode, false)) { // If we failed to create the file and it does not already exist. throw(nee); } @@ -813,7 +817,7 @@ public class ZKUtil { if(znode == null) { return; } - zkw.getZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, + zkw.getRecoverableZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch(KeeperException.NodeExistsException nee) { return; @@ -845,7 +849,7 @@ public class ZKUtil { int version) throws KeeperException { try { - zkw.getZooKeeper().delete(node, version); + zkw.getRecoverableZooKeeper().delete(node, version); return true; } catch(KeeperException.BadVersionException bve) { return false; @@ -864,7 +868,7 @@ public class ZKUtil { public static void deleteNodeFailSilent(ZooKeeperWatcher zkw, String node) throws KeeperException { try { - zkw.getZooKeeper().delete(node, -1); + zkw.getRecoverableZooKeeper().delete(node, -1); } catch(KeeperException.NoNodeException nne) { } catch(InterruptedException ie) { zkw.interruptedException(ie); @@ -886,7 +890,7 @@ public class ZKUtil { deleteNodeRecursively(zkw, joinZNode(node, child)); } } - zkw.getZooKeeper().delete(node, -1); + zkw.getRecoverableZooKeeper().delete(node, -1); } catch(InterruptedException ie) { zkw.interruptedException(ie); } diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 4fa82c0..30b8317 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.zookeeper; import java.io.IOException; import java.util.List; -import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; 
import org.apache.commons.logging.Log; @@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; /** * Acts as the single ZooKeeper Watcher. One instance of this is instantiated @@ -58,7 +56,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable { private String quorum; // zookeeper connection - private ZooKeeper zooKeeper; + private RecoverableZooKeeper recoverableZooKeeper; // abortable in case of zk failure private Abortable abortable; @@ -116,51 +114,11 @@ public class ZooKeeperWatcher implements Watcher, Abortable { this.identifier = descriptor; this.abortable = abortable; setNodeNames(conf); - this.zooKeeper = ZKUtil.connect(conf, quorum, this, descriptor); + this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, descriptor); try { // Create all the necessary "directories" of znodes // TODO: Move this to an init method somewhere so not everyone calls it? - - // The first call against zk can fail with connection loss. Seems common. - // Apparently this is recoverable. Retry a while. - // See http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling - // TODO: Generalize out in ZKUtil. 
- long wait = conf.getLong(HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME, - HConstants.DEFAULT_ZOOKEPER_RECOVERABLE_WAITIME); - long finished = System.currentTimeMillis() + wait; - KeeperException ke = null; - do { - try { - ZKUtil.createAndFailSilent(this, baseZNode); - ke = null; - break; - } catch (KeeperException.ConnectionLossException e) { - if (LOG.isDebugEnabled() && (isFinishedRetryingRecoverable(finished))) { - LOG.debug("Retrying zk create for another " + - (finished - System.currentTimeMillis()) + - "ms; set 'hbase.zookeeper.recoverable.waittime' to change " + - "wait time); " + e.getMessage()); - } - ke = e; - } - } while (isFinishedRetryingRecoverable(finished)); - // Convert connectionloss exception to ZKCE. - if (ke != null) { - try { - // If we don't close it, the zk connection managers won't be killed - this.zooKeeper.close(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warn("Interrupted while closing", e); - } - throw new ZooKeeperConnectionException("HBase is able to connect to" + - " ZooKeeper but the connection closes immediately. This could be" + - " a sign that the server has too many connections (30 is the" + - " default). Consider inspecting your ZK server logs for that" + - " error and then make sure you are reusing HBaseConfiguration" + - " as often as you can. See HTable's javadoc for more information.", - ke); - } + ZKUtil.createAndFailSilent(this, baseZNode); ZKUtil.createAndFailSilent(this, assignmentZNode); ZKUtil.createAndFailSilent(this, rsZNode); ZKUtil.createAndFailSilent(this, tableZNode); @@ -235,8 +193,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable { * Get the connection to ZooKeeper. 
* @return connection reference to zookeeper */ - public ZooKeeper getZooKeeper() { - return zooKeeper; + public RecoverableZooKeeper getRecoverableZooKeeper() { + return recoverableZooKeeper; } /** @@ -321,16 +279,16 @@ public class ZooKeeperWatcher implements Watcher, Abortable { this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000); while (System.currentTimeMillis() < finished) { Threads.sleep(1); - if (this.zooKeeper != null) break; + if (this.recoverableZooKeeper != null) break; } - if (this.zooKeeper == null) { + if (this.recoverableZooKeeper == null) { LOG.error("ZK is null on connection event -- see stack trace " + "for the stack trace when constructor was called on this zkw", this.constructorCaller); throw new NullPointerException("ZK is null"); } this.identifier = this.identifier + "-0x" + - Long.toHexString(this.zooKeeper.getSessionId()); + Long.toHexString(this.recoverableZooKeeper.getSessionId()); // Update our identifier. Otherwise ignore. LOG.debug(this.identifier + " connected"); break; @@ -365,7 +323,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable { * is up-to-date from when we begin the operation. 
*/ public void sync(String path) { - this.zooKeeper.sync(path, null, null); + this.recoverableZooKeeper.sync(path, null, null); } /** @@ -408,8 +366,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable { */ public void close() { try { - if (zooKeeper != null) { - zooKeeper.close(); + if (recoverableZooKeeper != null) { + recoverableZooKeeper.close(); // super.close(); } } catch (InterruptedException e) { diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 6af0ecf..57a100b 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -1126,7 +1126,7 @@ public class HBaseTestingUtility { Configuration c = new Configuration(this.conf); String quorumServers = ZKConfig.getZKQuorumServersString(c); int sessionTimeout = 5 * 1000; // 5 seconds - ZooKeeper zk = nodeZK.getZooKeeper(); + ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper(); byte[] password = zk.getSessionPasswd(); long sessionID = zk.getSessionId(); diff --git a/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java index 088f9c0..2f136dc 100644 --- a/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java +++ b/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java @@ -99,9 +99,8 @@ public class TestZooKeeper { int sessionTimeout = 5 * 1000; // 5 seconds HConnection connection = HConnectionManager.getConnection(c); ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher(); - long sessionID = connectionZK.getZooKeeper().getSessionId(); - byte [] password = connectionZK.getZooKeeper().getSessionPasswd(); - + long sessionID = connectionZK.getRecoverableZooKeeper().getSessionId(); + byte[] password = connectionZK.getRecoverableZooKeeper().getSessionPasswd(); ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout, EmptyWatcher.instance, 
sessionID, password); LOG.info("Session timeout=" + zk.getSessionTimeout() + @@ -116,15 +115,16 @@ public class TestZooKeeper { // Check that the old ZK connection is closed, means we did expire System.err.println("ZooKeeper should have timed out"); - String state = connectionZK.getZooKeeper().getState().toString(); - Assert.assertTrue("State=" + state, - connectionZK.getZooKeeper().getState().equals(States.CLOSED)); + String state = connectionZK.getRecoverableZooKeeper().getState().toString(); + LOG.info("state=" + connectionZK.getRecoverableZooKeeper().getState()); + Assert.assertTrue(connectionZK.getRecoverableZooKeeper().getState(). + equals(States.CLOSED)); // Check that the client recovered ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher(); - LOG.info("state=" + newConnectionZK.getZooKeeper().getState()); - Assert.assertTrue(newConnectionZK.getZooKeeper().getState().equals( - States.CONNECTED)); + LOG.info("state=" + newConnectionZK.getRecoverableZooKeeper().getState()); + Assert.assertTrue(newConnectionZK.getRecoverableZooKeeper().getState().equals( + States.CONNECTED)); } @Test @@ -272,4 +272,4 @@ public class TestZooKeeper { ZKUtil.createAndFailSilent(zk2, aclZnode); } -} \ No newline at end of file +} diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 4aab4b9..9a88855 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -198,7 +198,7 @@ public class TestSplitLogManager { LOG.info("TestOrphanTaskAcquisition"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); - zkw.getZooKeeper().create(tasknode, + zkw.getRecoverableZooKeeper().create(tasknode, TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -231,7 +231,7 @@ public class TestSplitLogManager { " startup"); 
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); //create an unassigned orphan task - zkw.getZooKeeper().create(tasknode, + zkw.getRecoverableZooKeeper().create(tasknode, TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); @@ -391,7 +391,7 @@ public class TestSplitLogManager { // create an orphan task in OWNED state String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1"); - zkw.getZooKeeper().create(tasknode1, + zkw.getRecoverableZooKeeper().create(tasknode1, TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index b54a11d..75ea09a 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -129,7 +129,7 @@ public class TestSplitLogWorker { LOG.info("testAcquireTaskAtStartup"); ZKSplitLog.Counters.resetCounters(); - zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"), + zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"), TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -161,7 +161,7 @@ public class TestSplitLogWorker { LOG.info("testRaceForTask"); ZKSplitLog.Counters.resetCounters(); - zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"), + zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"), TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -200,7 +200,7 @@ public class TestSplitLogWorker { Thread.sleep(100); // this time create a task node after starting the splitLogWorker - zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"), + 
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"), TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -228,7 +228,7 @@ public class TestSplitLogWorker { Thread.yield(); // let the worker start Thread.sleep(100); - zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"), + zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"), TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -236,7 +236,7 @@ public class TestSplitLogWorker { // now the worker is busy doing the above task // create another task - zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"), + zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"), TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -264,7 +264,7 @@ public class TestSplitLogWorker { Thread.yield(); // let the worker start Thread.sleep(100); - zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"), + zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"), TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -277,7 +277,7 @@ public class TestSplitLogWorker { waitForCounter(tot_wkr_preempt_task, 0, 1, 1000); // create a RESCAN node - zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"), + zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"), TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index a750787..b8ab708 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ 
b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -139,7 +139,7 @@ public class TestSplitTransactionOnCluster { String path = ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(), hri.getEncodedName()); Stat stats = - t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false); + t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats); RegionTransitionData rtd = ZKAssign.getData(t.getConnection().getZooKeeperWatcher(), @@ -162,7 +162,7 @@ public class TestSplitTransactionOnCluster { assertTrue(daughters.contains(r)); } // Finally assert that the ephemeral SPLIT znode was cleaned up. - stats = t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false); + stats = t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false); LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats); assertTrue(stats == null); } finally {