From 2ceade1a6666b10a0290159be0e5130534976b3c Mon Sep 17 00:00:00 2001 From: Konstantin Ryakhovskiy Date: Sat, 2 Jul 2016 17:34:35 +0200 Subject: [PATCH] HBASE-14422 enhance logging and change threshold --- .../client/PreemptiveFastFailInterceptor.java | 55 +- .../hbase/client/TestFastFailWithoutTestUtil.java | 638 +++++++++++++++++++++ hbase-client/src/test/resources/log4j.properties | 3 +- 3 files changed, 692 insertions(+), 4 deletions(-) create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java index fed87c1..d8de321 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; @@ -237,9 +239,22 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { // if fInfo is null --> The server is considered good. // If the server is bad, wait long enough to believe that the server is // down. - return (fInfo != null && - EnvironmentEdgeManager.currentTime() > - (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec)); + + long currentTime = EnvironmentEdgeManager.currentTime(); + long firstFailure = fInfo == null ? 
0 : fInfo.timeOfFirstFailureMilliSec; + long ffThreshold = this.fastFailThresholdMilliSec; + + TraceTempUtil.log("intercept#inFastFail();"); + TraceTempUtil.log("current: %d; first failure: %d; threshold: %d", + currentTime, firstFailure, ffThreshold); + long diff = currentTime - (firstFailure + ffThreshold); + TraceTempUtil.log("difference: %d", diff); + + return (fInfo != null && currentTime > (firstFailure + ffThreshold)); + +// return (fInfo != null && +// EnvironmentEdgeManager.currentTime() > +// (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec)); } /** @@ -357,3 +372,37 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { return "PreemptiveFastFailInterceptor"; } } + +class TraceTempUtil { + + private static final boolean enabled = true; + + public static ThreadFactory getNamedThreadFactory(final String prefix) { + return new ThreadFactory() { + private final AtomicInteger counter = new AtomicInteger(0); + @Override + public Thread newThread(Runnable r) { + return new Thread(r, prefix + "-" + counter.incrementAndGet()); + } + }; + } + + public static void log(String message) { + if (!enabled) + return; + final String tname = Thread.currentThread().getName(); + + if (tname.endsWith("-2")) + message = String.format("\t\t\t\t\t\t\t\t\t\t\t%s %s", tname, message); + else + message = String.format("%s %s", tname, message); + System.out.println(message); + } + + public static void log(String format, Object... 
args) { + if (!enabled) + return; + String message = String.format(format, args); + log(message); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java new file mode 100644 index 0000000..04488e9 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java @@ -0,0 +1,638 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.*; + +import java.io.EOFException; +import java.io.IOException; +import java.io.SyncFailedException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; +import java.nio.channels.ClosedChannelException; +import java.util.concurrent.*; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; +import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.ipc.RemoteException; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ SmallTests.class, ClientTests.class }) +public class TestFastFailWithoutTestUtil { + private static final Log LOG = LogFactory.getLog(TestFastFailWithoutTestUtil.class); + + @Test + public void testInterceptorFactoryMethods() { + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); + RetryingCallerInterceptorFactory interceptorFactory = new RetryingCallerInterceptorFactory( + conf); + + RetryingCallerInterceptor interceptorBeforeCast = interceptorFactory + .build(); + assertTrue("We should be 
getting a PreemptiveFastFailInterceptor", + interceptorBeforeCast instanceof PreemptiveFastFailInterceptor); + PreemptiveFastFailInterceptor interceptor = (PreemptiveFastFailInterceptor) interceptorBeforeCast; + + RetryingCallerInterceptorContext contextBeforeCast = interceptor + .createEmptyContext(); + assertTrue( + "We should be getting a FastFailInterceptorContext since we are interacting with the" + + " PreemptiveFastFailInterceptor", + contextBeforeCast instanceof FastFailInterceptorContext); + + FastFailInterceptorContext context = (FastFailInterceptorContext) contextBeforeCast; + assertTrue(context != null); + + conf = HBaseConfiguration.create(); + interceptorFactory = new RetryingCallerInterceptorFactory(conf); + + interceptorBeforeCast = interceptorFactory.build(); + assertTrue( + "We should be getting a NoOpRetryableCallerInterceptor since we disabled PFFE", + interceptorBeforeCast instanceof NoOpRetryableCallerInterceptor); + + contextBeforeCast = interceptorBeforeCast.createEmptyContext(); + assertTrue( + "We should be getting a NoOpRetryingInterceptorContext from NoOpRetryableCallerInterceptor", + contextBeforeCast instanceof NoOpRetryingInterceptorContext); + + assertTrue(context != null); + } + + @Test + public void testInterceptorContextClear() { + PreemptiveFastFailInterceptor interceptor = createPreemptiveInterceptor(); + FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor + .createEmptyContext(); + context.clear(); + assertFalse(context.getCouldNotCommunicateWithServer().booleanValue()); + assertEquals(context.didTry(), false); + assertEquals(context.getFailureInfo(), null); + assertEquals(context.getServer(), null); + assertEquals(context.getTries(), 0); + } + + @Test + public void testInterceptorContextPrepare() throws IOException { + PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil + .createPreemptiveInterceptor(); + FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor 
+ .createEmptyContext(); + RetryingCallable callable = new RegionServerCallable(null, + null, null) { + @Override + public Boolean call(int callTimeout) throws Exception { + return true; + } + + @Override + protected HRegionLocation getLocation() { + return new HRegionLocation(null, ServerName.valueOf("localhost", 1234, + 987654321)); + } + }; + context.prepare(callable); + ServerName server = getSomeServerName(); + assertEquals(context.getServer(), server); + context.clear(); + context.prepare(callable, 2); + assertEquals(context.getServer(), server); + } + + @Ignore @Test + public void testInterceptorIntercept50Times() throws IOException, + InterruptedException { + for (int i = 0; i < 50; i++) { + testInterceptorIntercept(); + } + } + + public void testInterceptorIntercept() throws IOException, + InterruptedException { + Configuration conf = HBaseConfiguration.create(); + long CLEANUP_TIMEOUT = 50; + long FAST_FAIL_THRESHOLD = 10; + conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); + conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS, + CLEANUP_TIMEOUT); + conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, + FAST_FAIL_THRESHOLD); + + PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil + .createPreemptiveInterceptor(conf); + FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor + .createEmptyContext(); + + RetryingCallable callable = getDummyRetryingCallable(getSomeServerName()); + + // Lets simulate some work flow here. 
+ int tries = 0; + context.prepare(callable, tries); + interceptor.intercept(context); + interceptor.handleFailure(context, new ConnectException( + "Failed to connect to server")); + interceptor.updateFailureInfo(context); + assertTrue("Interceptor should have updated didTry to true", + context.didTry()); + assertTrue( + "The call shouldn't have been successful if there was a ConnectException", + context.getCouldNotCommunicateWithServer().booleanValue()); + assertNull( + "Once a failure is identified, the first time the FailureInfo is generated for the server," + + " but it is not assigned to the context yet. It would be assigned on the next" + + " intercept.", context.getFailureInfo()); + assertEquals(context.getTries(), tries); + assertFalse( + "We are still in the first attempt and so we dont set this variable to true yet.", + context.isRetryDespiteFastFailMode()); + + Thread.sleep(FAST_FAIL_THRESHOLD + 1); // We sleep so as to make sure that + // we + // actually consider this server as a + // dead server in the next attempt. + tries++; + + context.prepare(callable, tries); + interceptor.intercept(context); + interceptor.handleFailure(context, new ConnectException( + "Failed to connect to server")); + interceptor.updateFailureInfo(context); + assertTrue("didTru should remain true", context.didTry()); + assertTrue( + "The call shouldn't have been successful if there was a ConnectException", + context.getCouldNotCommunicateWithServer().booleanValue()); + assertNotNull( + "The context this time is updated with a failureInfo, since we already gave it a try.", + context.getFailureInfo()); + assertEquals(context.getTries(), tries); + assertTrue( + "Since we are alone here we would be given the permission to retryDespiteFailures.", + context.isRetryDespiteFastFailMode()); + context.clear(); + + Thread.sleep(CLEANUP_TIMEOUT); // Lets try and cleanup the data in the fast + // fail failure maps. 
+ + tries++; + + context.clear(); + context.prepare(callable, tries); + interceptor.occasionallyCleanupFailureInformation(); + assertNull("The cleanup should have cleared the server", + interceptor.repeatedFailuresMap.get(context.getServer())); + interceptor.intercept(context); + interceptor.handleFailure(context, new ConnectException( + "Failed to connect to server")); + interceptor.updateFailureInfo(context); + assertTrue("didTru should remain true", context.didTry()); + assertTrue( + "The call shouldn't have been successful if there was a ConnectException", + context.getCouldNotCommunicateWithServer().booleanValue()); + assertNull("The failureInfo is cleared off from the maps.", + context.getFailureInfo()); + assertEquals(context.getTries(), tries); + assertFalse( + "Since we are alone here we would be given the permission to retryDespiteFailures.", + context.isRetryDespiteFastFailMode()); + context.clear(); + + } + + private RetryingCallable getDummyRetryingCallable( + ServerName someServerName) { + return new RegionServerCallable(null, null, null) { + @Override + public T call(int callTimeout) throws Exception { + return null; + } + + @Override + protected HRegionLocation getLocation() { + return new HRegionLocation(null, serverName); + } + }; + } + + @Test + public void testExceptionsIdentifiedByInterceptor() throws IOException { + Throwable[] networkexceptions = new Throwable[] { + new ConnectException("Mary is unwell"), + new SocketTimeoutException("Mike is too late"), + new ClosedChannelException(), + new SyncFailedException("Dave is not on the same page"), + new TimeoutException("Mike is late again"), + new EOFException("This is the end... 
"), + new ConnectionClosingException("Its closing") }; + final String INDUCED = "Induced"; + Throwable[] nonNetworkExceptions = new Throwable[] { + new IOException("Bob died"), + new RemoteException("Bob's cousin died", null), + new NoSuchMethodError(INDUCED), new NullPointerException(INDUCED), + new DoNotRetryIOException(INDUCED), new Error(INDUCED) }; + + Configuration conf = HBaseConfiguration.create(); + long CLEANUP_TIMEOUT = 0; + long FAST_FAIL_THRESHOLD = 1000000; + conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); + conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS, + CLEANUP_TIMEOUT); + conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, + FAST_FAIL_THRESHOLD); + for (Throwable e : networkexceptions) { + PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil + .createPreemptiveInterceptor(conf); + FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor + .createEmptyContext(); + + RetryingCallable callable = getDummyRetryingCallable(getSomeServerName()); + context.prepare(callable, 0); + interceptor.intercept(context); + interceptor.handleFailure(context, e); + interceptor.updateFailureInfo(context); + assertTrue( + "The call shouldn't have been successful if there was a ConnectException", + context.getCouldNotCommunicateWithServer().booleanValue()); + } + for (Throwable e : nonNetworkExceptions) { + try { + PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil + .createPreemptiveInterceptor(conf); + FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor + .createEmptyContext(); + + RetryingCallable callable = getDummyRetryingCallable(getSomeServerName()); + context.prepare(callable, 0); + interceptor.intercept(context); + interceptor.handleFailure(context, e); + interceptor.updateFailureInfo(context); + assertFalse( + "The call shouldn't have been successful if there was a ConnectException", + 
context.getCouldNotCommunicateWithServer().booleanValue()); + } catch (NoSuchMethodError t) { + assertTrue("Exception not induced", t.getMessage().contains(INDUCED)); + } catch (NullPointerException t) { + assertTrue("Exception not induced", t.getMessage().contains(INDUCED)); + } catch (DoNotRetryIOException t) { + assertTrue("Exception not induced", t.getMessage().contains(INDUCED)); + } catch (Error t) { + assertTrue("Exception not induced", t.getMessage().contains(INDUCED)); + } + } + } + + protected static PreemptiveFastFailInterceptor createPreemptiveInterceptor( + Configuration conf) { + conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); + RetryingCallerInterceptorFactory interceptorFactory = new RetryingCallerInterceptorFactory( + conf); + RetryingCallerInterceptor interceptorBeforeCast = interceptorFactory + .build(); + return (PreemptiveFastFailInterceptor) interceptorBeforeCast; + } + + static PreemptiveFastFailInterceptor createPreemptiveInterceptor() { + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); + return createPreemptiveInterceptor(conf); + } + + @Test(timeout = 120000) + public void testPreemptiveFastFailException50Times() + throws InterruptedException, ExecutionException { + for (int i = 0; i < 50; i++) { + TraceTempUtil.log("#%d=======================================================================", i); + testPreemptiveFastFailException(); + } + } + + /*** + * This test tries to create a thread interleaving of the 2 threads trying to do a + * Retrying operation using a {@link PreemptiveFastFailInterceptor}. The goal here is to make sure + * that the second thread will be attempting the operation while the first thread is in the + * process of making an attempt after it has marked the server in fast fail. 
+ * + * The thread execution is as follows: + * The PreemptiveFastFailInterceptor is extended in this test to achieve a good interleaving + * behavior without using any thread sleeps. + * + * Privileged Thread 1 NonPrivileged Thread 2 + * + * Retry 0 : intercept + * + * Retry 0 : handleFailure + * latches[0].countDown + * latches2[0].await + * latches[0].await + * intercept : Retry 0 + * + * handleFailure : Retry 0 + * + * updateFailureInfo : Retry 0 + * latches2[0].countDown + * + * Retry 0 : updateFailureInfo + * + * Retry 1 : intercept + * + * Retry 1 : handleFailure + * latches[1].countDown + * latches2[1].await + * + * latches[1].await + * intercept : Retry 1 + * (throws PFFE) + * handleFailure : Retry 1 + * + * updateFailureInfo : Retry 1 + * latches2[1].countDown + * Retry 1 : updateFailureInfo + * + * + * See getInterceptor() for details on how the interceptor implementation achieves this + * thread interleaving. + * + * We need 2 sets of latches of size RETRIES + 1. 
We use an AtomicInteger done to make sure that + * we short circuit the Thread 1 after we hit the PFFE on Thread 2 + * + * + * @throws InterruptedException + * @throws ExecutionException + */ + private void testPreemptiveFastFailException() throws InterruptedException, + ExecutionException { + LOG.debug("Setting up the counters to start the test"); + priviRetryCounter.set(0); + nonPriviRetryCounter.set(0); + done.set(0); + + for (int i = 0; i <= RETRIES; i++) { + latches[i] = new CountDownLatch(1); + latches2[i] = new CountDownLatch(1); + } + + PreemptiveFastFailInterceptor interceptor = getInterceptor(); + + final RpcRetryingCaller priviCaller = getRpcRetryingCaller( + PAUSE_TIME, RETRIES, interceptor); + final RpcRetryingCaller nonPriviCaller = getRpcRetryingCaller( + PAUSE_TIME, RETRIES, interceptor); + + LOG.debug("Submitting the thread 1"); + Future priviFuture = executor.submit(new Callable() { + @Override + public Boolean call() throws Exception { + try { + isPriviThreadLocal.get().set(true); + priviCaller + .callWithRetries( + getRetryingCallable(serverName, exception), + CLEANUP_TIMEOUT); + } catch (RetriesExhaustedException e) { + return true; + } catch (PreemptiveFastFailException e) { + return false; + } + return false; + } + }); + LOG.debug("Submitting the thread 2"); + Future nonPriviFuture = executor.submit(new Callable() { + @Override + public Boolean call() throws Exception { + try { + isPriviThreadLocal.get().set(false); + nonPriviCaller.callWithRetries( + getRetryingCallable(serverName, exception), + CLEANUP_TIMEOUT); + } catch (PreemptiveFastFailException e) { + return true; + } + return false; + } + }); + LOG.debug("Waiting for Thread 2 to finish"); + try { + nonPriviFuture.get(30, TimeUnit.SECONDS); + assertTrue(nonPriviFuture.get()); + } catch (TimeoutException e) { + Threads.printThreadInfo(System.out, + "This should not hang but seems to sometimes...FIX! 
Here is a thread dump!"); + } + + LOG.debug("Waiting for Thread 1 to finish"); + try { + priviFuture.get(30, TimeUnit.SECONDS); + assertTrue(priviFuture.get()); + } catch (TimeoutException e) { + // There is something wrong w/ the latching but don't have time to fix. If it times out, just + // let it go for now till someone has time to look. Meantime, here is a thread dump. + Threads.printThreadInfo(System.out, + "This should not hang but seems to sometimes...FIX! Here is a thread dump!"); + } + + // Now that the server is in fast fail mode, let's try to make contact with the + // server with a third thread. And make sure that when there is no + // exception, + // the fast fail gets cleared up. + assertTrue(interceptor.isServerInFailureMap(serverName)); + final RpcRetryingCaller priviCallerNew = getRpcRetryingCaller( + PAUSE_TIME, RETRIES, interceptor); + executor.submit(new Callable() { + @Override + public Boolean call() throws Exception { + priviCallerNew.callWithRetries( + getRetryingCallable(serverName, null), CLEANUP_TIMEOUT); + return false; + } + }).get(); + assertFalse("The server was supposed to be removed from the map", + interceptor.isServerInFailureMap(serverName)); + } + + ExecutorService executor = + Executors.newCachedThreadPool(TraceTempUtil.getNamedThreadFactory("TT")); + + /** + * Some timeouts to make the test execution reasonable. + */ + final int PAUSE_TIME = 10; + final int RETRIES = 3; + final int CLEANUP_TIMEOUT = 10000; + final long FAST_FAIL_THRESHOLD = PAUSE_TIME / 5; + final int RPC_TIMEOUT = 1; + + /** + * The latches necessary to make the thread interleaving possible. 
+ */ + final CountDownLatch[] latches = new CountDownLatch[RETRIES + 1]; + final CountDownLatch[] latches2 = new CountDownLatch[RETRIES + 1]; + final AtomicInteger done = new AtomicInteger(0); + + /** + * Global retry counters that give us an idea about which iteration of the retry we are in + */ + final AtomicInteger priviRetryCounter = new AtomicInteger(); + final AtomicInteger nonPriviRetryCounter = new AtomicInteger(); + final ServerName serverName = getSomeServerName(); + + /** + * The variable which is used as an identifier within the 2 threads. + */ + public final ThreadLocal isPriviThreadLocal = new ThreadLocal() { + @Override + public AtomicBoolean initialValue() { + return new AtomicBoolean(true); + } + }; + final Exception exception = new ConnectionClosingException("The current connection is closed"); + + public PreemptiveFastFailInterceptor getInterceptor() { + final Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); + conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS, + CLEANUP_TIMEOUT); + conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, + FAST_FAIL_THRESHOLD); + + return new PreemptiveFastFailInterceptor( + conf) { + @Override + public void updateFailureInfo(RetryingCallerInterceptorContext context) { + boolean pffe = false; + if (!isPriviThreadLocal.get().get()) { + pffe = !((FastFailInterceptorContext)context).isRetryDespiteFastFailMode(); + } + TraceTempUtil.log("pffe: %b; updateFailureInfo()", pffe); + if (isPriviThreadLocal.get().get()) { + try { + // Thread 2 should be done by 2 iterations. We should short circuit Thread 1 because + // Thread 2 would be dead and can't do a countdown. 
+ if (done.get() <= 1) { + TraceTempUtil.log("L2[%d].await(); updateFailureInfo()", priviRetryCounter.get()); + latches2[priviRetryCounter.get()].await(); + } + } catch (InterruptedException e) { + fail(); + } + } + super.updateFailureInfo(context); + if (!isPriviThreadLocal.get().get()) { + TraceTempUtil.log("pffe: %b, done: %d, increment: %s", pffe, done.get(), + pffe ? "done = " + (done.get() + 1) : "done=" + done.get()); + if (pffe) done.incrementAndGet(); + TraceTempUtil.log("L2[%d].countDown(); updateFailureInfo()", nonPriviRetryCounter.get()); + latches2[nonPriviRetryCounter.get()].countDown(); + } + } + + @Override + public void intercept(RetryingCallerInterceptorContext context) + throws PreemptiveFastFailException { + if (!isPriviThreadLocal.get().get()) { + try { + TraceTempUtil.log("L1[%d].await(); intercept()", nonPriviRetryCounter.get()); + latches[nonPriviRetryCounter.getAndIncrement()].await(); + } catch (InterruptedException e) { + fail(); + } + } + super.intercept(context); + } + + @Override + public void handleFailure(RetryingCallerInterceptorContext context, + Throwable t) throws IOException { + super.handleFailure(context, t); + if (isPriviThreadLocal.get().get()) { + TraceTempUtil.log("L1[%d].countDown(); handleFailure()", priviRetryCounter.get()); + latches[priviRetryCounter.getAndIncrement()].countDown(); + } + } + }; + } + + public RpcRetryingCaller getRpcRetryingCaller(int pauseTime, + int retries, RetryingCallerInterceptor interceptor) { + return new RpcRetryingCallerImpl(pauseTime, retries, interceptor, 9, RPC_TIMEOUT) { + @Override + public Void callWithRetries(RetryingCallable callable, + int callTimeout) throws IOException, RuntimeException { + Void ret = super.callWithRetries(callable, callTimeout); + return ret; + } + }; + } + + protected static ServerName getSomeServerName() { + return ServerName.valueOf("localhost", 1234, 987654321); + } + + private RegionServerCallable getRetryingCallable( + final ServerName serverName, final 
Exception e) { + return new RegionServerCallable(null, null, null) { + @Override + public void prepare(boolean reload) throws IOException { + this.location = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, + serverName); + } + + @Override + public Void call(int callTimeout) throws Exception { + if (e != null) + throw e; + return null; + } + + @Override + protected HRegionLocation getLocation() { + return new HRegionLocation(null, serverName); + } + + @Override + public void throwable(Throwable t, boolean retrying) { + // Do nothing + } + + @Override + public long sleep(long pause, int tries) { + return ConnectionUtils.getPauseTime(pause, tries + 1); + } + }; + } + + +} diff --git a/hbase-client/src/test/resources/log4j.properties b/hbase-client/src/test/resources/log4j.properties index 4eeeb2c..d6d97bf 100644 --- a/hbase-client/src/test/resources/log4j.properties +++ b/hbase-client/src/test/resources/log4j.properties @@ -56,7 +56,8 @@ log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): log4j.logger.org.apache.hadoop=WARN log4j.logger.org.apache.zookeeper=ERROR -log4j.logger.org.apache.hadoop.hbase=DEBUG +log4j.logger.org.apache.hadoop.hbase=WARN +log4j.logger.org.apache.hadoop.hbase.client=TRACE #These two settings are workarounds against spurious logs from the minicluster. #See HBASE-4709 -- 1.8.3.1