diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index 8ddac6e..ff909d6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -23,8 +23,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.RetryCounter; -import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.hadoop.hbase.util.BasicRetryAccountant; +import org.apache.hadoop.hbase.util.RetryAccountant; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -76,7 +76,7 @@ public class RecoverableZooKeeper { private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class); // the actual ZooKeeper client instance volatile private ZooKeeper zk; - private final RetryCounterFactory retryCounterFactory; + private final BasicRetryAccountantFactory retryCounterFactory; // An identifier of this process in the cluster private final String identifier; private final byte[] id; @@ -111,7 +111,7 @@ public class RecoverableZooKeeper { // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should. 
this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); this.retryCounterFactory = - new RetryCounterFactory(maxRetries, retryIntervalMillis); + new BasicRetryAccountantFactory(maxRetries, retryIntervalMillis); if (identifier == null || identifier.length() == 0) { // the identifier = processID@hostName @@ -146,7 +146,7 @@ public class RecoverableZooKeeper { */ public void delete(String path, int version) throws InterruptedException, KeeperException { - RetryCounter retryCounter = retryCounterFactory.create(); + RetryAccountant retryCounter = retryCounterFactory.create(); boolean isRetry = false; // False for first attempt, true for all retries. while (true) { try { @@ -173,8 +173,7 @@ public class RecoverableZooKeeper { throw e; } } - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); + retryCounter.sleep(); isRetry = true; } } @@ -185,7 +184,7 @@ public class RecoverableZooKeeper { */ public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); + RetryAccountant retryCounter = retryCounterFactory.create(); while (true) { try { return zk.exists(path, watcher); @@ -201,8 +200,7 @@ public class RecoverableZooKeeper { throw e; } } - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); + retryCounter.sleep(); } } @@ -212,7 +210,7 @@ public class RecoverableZooKeeper { */ public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); + RetryAccountant retryCounter = retryCounterFactory.create(); while (true) { try { return zk.exists(path, watch); @@ -228,17 +226,15 @@ public class RecoverableZooKeeper { throw e; } } - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); + retryCounter.sleep(); } } - private void retryOrThrow(RetryCounter retryCounter, KeeperException e, + private void retryOrThrow(RetryAccountant retryCounter, KeeperException e, 
String opName) throws KeeperException { LOG.warn("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e); - if (!retryCounter.shouldRetry()) { - LOG.error("ZooKeeper " + opName + " failed after " - + retryCounter.getMaxRetries() + " retries"); + if (!retryCounter.retry()) { + LOG.error("ZooKeeper " + opName + " failed " + retryCounter); throw e; } } @@ -249,7 +245,7 @@ public class RecoverableZooKeeper { */ public List getChildren(String path, Watcher watcher) throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); + RetryAccountant retryCounter = retryCounterFactory.create(); while (true) { try { return zk.getChildren(path, watcher); @@ -265,8 +261,7 @@ public class RecoverableZooKeeper { throw e; } } - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); + retryCounter.sleep(); } } @@ -276,7 +271,7 @@ public class RecoverableZooKeeper { */ public List getChildren(String path, boolean watch) throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); + RetryAccountant retryCounter = retryCounterFactory.create(); while (true) { try { return zk.getChildren(path, watch); @@ -292,8 +287,7 @@ public class RecoverableZooKeeper { throw e; } } - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); + retryCounter.sleep(); } } @@ -303,7 +297,7 @@ public class RecoverableZooKeeper { */ public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); + RetryAccountant retryCounter = retryCounterFactory.create(); while (true) { try { byte[] revData = zk.getData(path, watcher, stat); @@ -320,8 +314,7 @@ public class RecoverableZooKeeper { throw e; } } - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); + retryCounter.sleep(); } } @@ -331,7 +324,7 @@ public class RecoverableZooKeeper { */ public byte[] getData(String path, 
boolean watch, Stat stat) throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); + RetryAccountant retryCounter = retryCounterFactory.create(); while (true) { try { byte[] revData = zk.getData(path, watch, stat); @@ -348,8 +341,7 @@ public class RecoverableZooKeeper { throw e; } } - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); + retryCounter.sleep(); } } @@ -361,7 +353,7 @@ public class RecoverableZooKeeper { */ public Stat setData(String path, byte[] data, int version) throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); + RetryAccountant retryCounter = retryCounterFactory.create(); byte[] newData = appendMetaData(data); boolean isRetry = false; while (true) { @@ -394,8 +386,7 @@ public class RecoverableZooKeeper { throw e; } } - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); + retryCounter.sleep(); isRetry = true; } } @@ -436,7 +427,7 @@ public class RecoverableZooKeeper { private String createNonSequential(String path, byte[] data, List acl, CreateMode createMode) throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); + RetryAccountant retryCounter = retryCounterFactory.create(); boolean isRetry = false; // False for first attempt, true for all retries. 
while (true) { try { @@ -473,8 +464,7 @@ public class RecoverableZooKeeper { throw e; } } - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); + retryCounter.sleep(); isRetry = true; } } @@ -482,7 +472,7 @@ public class RecoverableZooKeeper { private String createSequential(String path, byte[] data, List acl, CreateMode createMode) throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); + RetryAccountant retryCounter = retryCounterFactory.create(); boolean first = true; String newPath = path+this.identifier; while (true) { @@ -508,8 +498,7 @@ public class RecoverableZooKeeper { throw e; } } - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); + retryCounter.sleep(); } } /** @@ -545,7 +534,7 @@ public class RecoverableZooKeeper { */ public List multi(Iterable ops) throws KeeperException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); + RetryAccountant retryCounter = retryCounterFactory.create(); Iterable multiOps = prepareZKMulti(ops); while (true) { try { @@ -562,8 +551,7 @@ public class RecoverableZooKeeper { throw e; } } - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); + retryCounter.sleep(); } } @@ -671,4 +659,22 @@ public class RecoverableZooKeeper { public String getIdentifier() { return identifier; } -} + + /** + * Factory to create {@link BasicRetryAccountant}s. Saves having to have maxRetries and + * retryIntervalMillis available everywhere. 
+ */ + static class BasicRetryAccountantFactory { + private final int maxRetries; + private final int retryIntervalMillis; + + BasicRetryAccountantFactory(int maxRetries, int retryIntervalMillis) { + this.maxRetries = maxRetries; + this.retryIntervalMillis = retryIntervalMillis; + } + + RetryAccountant create() { + return new BasicRetryAccountant(maxRetries, retryIntervalMillis); + } + } +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BasicRetryAccountant.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BasicRetryAccountant.java new file mode 100644 index 0000000..701e80c --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BasicRetryAccountant.java @@ -0,0 +1,93 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A retry accountant that allows up to {@code maxRetries} retries, sleeping + * {@code retryIntervalMillis * 2^attempt} ms (attempt floored at 1) between each attempt. 
+ */ +@InterfaceAudience.Private +public class BasicRetryAccountant implements RetryAccountant { + private static final Log LOG = LogFactory.getLog(BasicRetryAccountant.class); + private final int maxRetries; + private int retriesRemaining; + final int retryIntervalMillis; + final long retryStart = System.currentTimeMillis(); + private final TimeUnit timeUnit = TimeUnit.MILLISECONDS; + + public BasicRetryAccountant(int maxRetries, int retryIntervalMillis) { + this.maxRetries = maxRetries; + this.retriesRemaining = maxRetries; + this.retryIntervalMillis = retryIntervalMillis; + } + + int getMaxRetries() { + return maxRetries; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.util.RetryAccountant#sleep() + */ + @Override + public void sleep() throws InterruptedException { + long sleepTime = calculateSleepTime(); + if (sleepTime > 0) { + logSleep(sleepTime); + timeUnit.sleep(sleepTime); + } + useRetry(); + } + + void logSleep(final long sleepTime) { + LOG.info("Sleeping " + sleepTime + "ms after retry #" + getAttempt() + "..."); + } + + long calculateSleepTime() { + int attempt = getAttempt(); + attempt = attempt <= 0? 
1: attempt; + return (long) (retryIntervalMillis * Math.pow(2, attempt)); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.util.RetryAccountant#retry() + */ + @Override + public boolean retry() { + return retriesRemaining > 0; + } + + void useRetry() { + retriesRemaining--; + } + + int getAttempt() { + return maxRetries - retriesRemaining; + } + + public String toString() { + return "Done=" + !retry() + ", maxRetries=" + maxRetries + ", tried=" + getAttempt() + + ", elapsedTime=" + (System.currentTimeMillis() - this.retryStart) + "ms"; + } +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/MaxTimeRetryAccountant.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/MaxTimeRetryAccountant.java new file mode 100644 index 0000000..6cd06a1 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/MaxTimeRetryAccountant.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HConstants; + +import com.google.common.base.Preconditions; + +/** + * A retry accountant that does {@code maxRetries} or {@code maxTime}, whichever + * comes first, waiting {@code retryIntervalMillis * n} where n is a function of + * how many retries have happened so far. See {@link HConstants#RETRY_BACKOFF} for input on + * backoff function. + */ +@InterfaceAudience.Private +public class MaxTimeRetryAccountant extends BasicRetryAccountant { + private static final Log LOG = LogFactory.getLog(MaxTimeRetryAccountant.class); + private static final Random RANDOM = new Random(); + final long endTime; + + public MaxTimeRetryAccountant(int maxRetries, int maxTime, int retryIntervalMillis) { + super(maxRetries, retryIntervalMillis); + Preconditions.checkArgument(maxTime > 0); + this.endTime = this.retryStart + maxTime; + } + + @Override + long calculateSleepTime() { + long sleepTime = getPauseTime(this.retryIntervalMillis, getAttempt()); + long now = System.currentTimeMillis(); + long diff = endTime - now; + diff = diff < 0? 0: diff; + return Math.min(diff, sleepTime); + } + + @Override + public boolean retry() { + return System.currentTimeMillis() < endTime && super.retry(); + } + + public String toString() { + return super.toString() + ", remaining=" + (this.endTime - System.currentTimeMillis()) + "ms"; + } + + /** + * Calculate pause time. + * Built on {@link HConstants#RETRY_BACKOFF}. 
+ * @param pause base pause, multiplied by the backoff factor picked by {@code tries} + * @param tries retries so far; indexes the backoff table, capped at its last entry + * @return How long to wait after {@code tries} retries, including up to 1% random jitter + */ + static long getPauseTime(final long pause, final int tries) { + // Copied from ConnectionUtil in hbase-client. 
+ int ntries = tries; + if (ntries >= HConstants.RETRY_BACKOFF.length) { + ntries = HConstants.RETRY_BACKOFF.length - 1; + } + + long normalPause = pause * HConstants.RETRY_BACKOFF[ntries]; + long jitter = (long)(normalPause * RANDOM.nextFloat() * 0.01f); // 1% possible jitter + return normalPause + jitter; + } +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryAccountant.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryAccountant.java new file mode 100644 index 0000000..c173e43 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryAccountant.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +/** + * Keep account of retries. Call {@link #retry()} to test if a retry should happen. After + * an unsuccessful attempt, call {@link #sleep()}: it waits until it is time for the next retry + * and also ups the internal counters, so there is no separate call to consume a retry. + */ +public interface RetryAccountant { + /** + * Sleep between retries (how long we sleep is implementation specific). 
Calls to sleep will + * also up internal counters because it is presumed that the call to sleep is because a + * retry failed. + * @throws InterruptedException if the thread is interrupted while sleeping + */ + public abstract void sleep() throws InterruptedException; + + /** + * @return True if we should retry + */ + public abstract boolean retry(); +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java deleted file mode 100644 index 7790362..0000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.util; - -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; - -@InterfaceAudience.Private -public class RetryCounter { - private static final Log LOG = LogFactory.getLog(RetryCounter.class); - private final int maxRetries; - private int retriesRemaining; - private final int retryIntervalMillis; - private final TimeUnit timeUnit; - - public RetryCounter(int maxRetries, - int retryIntervalMillis, TimeUnit timeUnit) { - this.maxRetries = maxRetries; - this.retriesRemaining = maxRetries; - this.retryIntervalMillis = retryIntervalMillis; - this.timeUnit = timeUnit; - } - - public int getMaxRetries() { - return maxRetries; - } - - /** - * Sleep for a exponentially back off time - * @throws InterruptedException - */ - public void sleepUntilNextRetry() throws InterruptedException { - int attempts = getAttemptTimes(); - long sleepTime = (long) (retryIntervalMillis * Math.pow(2, attempts)); - LOG.info("Sleeping " + sleepTime + "ms before retry #" + attempts + "..."); - timeUnit.sleep(sleepTime); - } - - public boolean shouldRetry() { - return retriesRemaining > 0; - } - - public void useRetry() { - retriesRemaining--; - } - - public int getAttemptTimes() { - return maxRetries-retriesRemaining+1; - } -} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java deleted file mode 100644 index 59edf96..0000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.util; - -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.classification.InterfaceAudience; - -@InterfaceAudience.Private -public class RetryCounterFactory { - private final int maxRetries; - private final int retryIntervalMillis; - - public RetryCounterFactory(int maxRetries, int retryIntervalMillis) { - this.maxRetries = maxRetries; - this.retryIntervalMillis = retryIntervalMillis; - } - - public RetryCounter create() { - return new RetryCounter( - maxRetries, retryIntervalMillis, TimeUnit.MILLISECONDS - ); - } -} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestMaxTimeRetryAccountant.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestMaxTimeRetryAccountant.java new file mode 100644 index 0000000..331c5d7 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestMaxTimeRetryAccountant.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.*; + +import org.junit.Test; +import org.mortbay.log.Log; + +public class TestMaxTimeRetryAccountant { + /** + * Assert that even if interval is large, we'll not wait that long if max time is less than interval + * @throws InterruptedException if interrupted while sleeping + */ + @Test + public void testMaxTimeBigInterval() throws InterruptedException { + final int maxTime = 10; + final int interval = 1000; + long startTime = System.currentTimeMillis(); + RetryAccountant accountant = new MaxTimeRetryAccountant(1000, maxTime, interval); + while (accountant.retry()) { + accountant.sleep(); + } + long endTime = System.currentTimeMillis(); + Log.info(accountant.toString()); + long elapsed = endTime - startTime; + assertTrue(elapsed >= maxTime); + assertTrue(elapsed < interval); + } + + @Test + public void testMaxTime() throws InterruptedException { + final int maxTime = 10; + long startTime = System.currentTimeMillis(); + RetryAccountant accountant = new MaxTimeRetryAccountant(1000, maxTime, 1); + while (accountant.retry()) { + accountant.sleep(); + } + long endTime = System.currentTimeMillis(); + Log.info(accountant.toString()); + assertTrue("" + (endTime - startTime), (endTime - startTime) >= maxTime); + } + + @Test + public void testMaxTries() throws InterruptedException { + final int maxTime = 100000; + long startTime = System.currentTimeMillis(); + RetryAccountant accountant = new MaxTimeRetryAccountant(4, maxTime, 1); + while (accountant.retry()) { + 
accountant.sleep(); + } + long endTime = 
System.currentTimeMillis(); + Log.info(accountant.toString()); + assertTrue((endTime - startTime) < maxTime); + } +} \ No newline at end of file