From f27c0a94557bb767f4532f543f110976a91101c6 Mon Sep 17 00:00:00 2001
From: Victoria Dudin
Date: Fri, 27 Feb 2015 19:49:08 -0800
Subject: [PATCH] Added unit test, corrected existing parallel execution test

---
 .../org/apache/hadoop/hbase/util/HBaseFsck.java    | 82 +++++++++++++++++++---
 .../apache/hadoop/hbase/util/TestHBaseFsck.java    | 46 +++++++++++-
 2 files changed, 116 insertions(+), 12 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index a8b60cd..06168b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util;
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.InetAddress;
@@ -59,6 +60,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.TreeMultimap;
 import com.google.protobuf.ServiceException;
+
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -197,7 +199,8 @@ public class HBaseFsck extends Configured implements Closeable {
   private static final int DEFAULT_MAX_MERGE = 5;
   private static final String TO_BE_LOADED = "to_be_loaded";
   private static final String HBCK_LOCK_FILE = "hbase-hbck.lock";
-
+  private static final int MAX_LOCK_FILE_ATTEMPTS = 5;
+  private static final int LOCK_FILE_ATTEMPT_SLEEP_INTERVAL = 200;

   /**********************
    * Internal resources
@@ -290,6 +293,8 @@ public class HBaseFsck extends Configured implements Closeable {
       new HashMap<TableName, Set<String>>();
   private Map<TableName, TableState> tableStates =
       new HashMap<TableName, TableState>();
+  private final RetryCounterFactory lockFileRetryCounterFactory;
+

   /**
    * Constructor
@@ -311,6 +316,9 @@ public class HBaseFsck extends Configured implements Closeable {
     int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
     executor = new ScheduledThreadPoolExecutor(numThreads,
         Threads.newDaemonThreadFactory("hbasefsck"));
+    lockFileRetryCounterFactory = new RetryCounterFactory(
+      getConf().getInt("hbase.hbck.lockfile.attempts", MAX_LOCK_FILE_ATTEMPTS),
+      getConf().getInt("hbase.hbck.lockfile.attempt.sleep.interval", LOCK_FILE_ATTEMPT_SLEEP_INTERVAL));
   }

   /**
@@ -328,9 +336,17 @@ public class HBaseFsck extends Configured implements Closeable {
     super(conf);
     errors = getErrorReporter(getConf());
     this.executor = exec;
+    lockFileRetryCounterFactory = new RetryCounterFactory(
+      getConf().getInt("hbase.hbck.lockfile.attempts", MAX_LOCK_FILE_ATTEMPTS),
+      getConf().getInt("hbase.hbck.lockfile.attempt.sleep.interval",
+        LOCK_FILE_ATTEMPT_SLEEP_INTERVAL));
   }

   private class FileLockCallable implements Callable<FSDataOutputStream> {
+    RetryCounter retryCounter;
+    public FileLockCallable(RetryCounter retryCounter) {
+      this.retryCounter = retryCounter;
+    }
     @Override
     public FSDataOutputStream call() throws IOException {
       try {
@@ -340,7 +356,7 @@ public class HBaseFsck extends Configured implements Closeable {
         Path tmpDir = new Path(FSUtils.getRootDir(getConf()), HConstants.HBASE_TEMP_DIRECTORY);
         fs.mkdirs(tmpDir);
         HBCK_LOCK_PATH = new Path(tmpDir, HBCK_LOCK_FILE);
-        final FSDataOutputStream out = FSUtils.create(fs, HBCK_LOCK_PATH, defaultPerms, false);
+        final FSDataOutputStream out = createFileWithRetries(fs, HBCK_LOCK_PATH, defaultPerms);
         out.writeBytes(InetAddress.getLocalHost().toString());
         out.flush();
         return out;
@@ -352,6 +368,33 @@ public class HBaseFsck extends Configured implements Closeable {
         }
       }
     }
+
+    private FSDataOutputStream createFileWithRetries(final FileSystem fs,
+        final Path hbckLockFilePath, final FsPermission defaultPerms)
+        throws IOException {
+
+      IOException exception = null;
+      do {
+        try {
+          return FSUtils.create(fs, hbckLockFilePath, defaultPerms, false);
+        } catch (IOException ioe) {
+          LOG.info("Failed to create lock file " + hbckLockFilePath.getName()
+              + ", try=" + (retryCounter.getAttemptTimes() + 1) + " of "
+              + retryCounter.getMaxAttempts());
+          LOG.debug(ioe);
+          try {
+            exception = ioe;
+            retryCounter.sleepUntilNextRetry();
+          } catch (InterruptedException ie) {
+            throw (InterruptedIOException) new InterruptedIOException(
+                "Can't create lock file " + hbckLockFilePath.getName())
+                .initCause(ie);
+          }
+        }
+      } while (retryCounter.shouldRetry());
+
+      throw exception;
+    }
   }

   /**
@@ -361,7 +404,8 @@ public class HBaseFsck extends Configured implements Closeable {
    * @throws IOException
    */
   private FSDataOutputStream checkAndMarkRunningHbck() throws IOException {
-    FileLockCallable callable = new FileLockCallable();
+    RetryCounter retryCounter = lockFileRetryCounterFactory.create();
+    FileLockCallable callable = new FileLockCallable(retryCounter);
     ExecutorService executor = Executors.newFixedThreadPool(1);
     FutureTask<FSDataOutputStream> futureTask = new FutureTask<FSDataOutputStream>(callable);
     executor.execute(futureTask);
@@ -385,14 +429,30 @@ public class HBaseFsck extends Configured implements Closeable {
   }

   private void unlockHbck() {
-    if(hbckLockCleanup.compareAndSet(true, false)){
-      IOUtils.closeStream(hbckOutFd);
-      try{
-        FSUtils.delete(FSUtils.getCurrentFileSystem(getConf()), HBCK_LOCK_PATH, true);
-      } catch(IOException ioe) {
-        LOG.warn("Failed to delete " + HBCK_LOCK_PATH);
-        LOG.debug(ioe);
-      }
+    if (hbckLockCleanup.compareAndSet(true, false)) {
+      RetryCounter retryCounter = lockFileRetryCounterFactory.create();
+      do {
+        try {
+          IOUtils.closeStream(hbckOutFd);
+          FSUtils.delete(FSUtils.getCurrentFileSystem(getConf()),
+              HBCK_LOCK_PATH, true);
+          return;
+        } catch (IOException ioe) {
+          LOG.warn("Failed to delete " + HBCK_LOCK_PATH + ", try="
+              + (retryCounter.getAttemptTimes() + 1) + " of "
+              + retryCounter.getMaxAttempts());
+          LOG.debug(ioe);
+          try {
+            retryCounter.sleepUntilNextRetry();
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while deleting lock file " +
+                HBCK_LOCK_PATH);
+            return;
+          }
+        }
+      } while (retryCounter.shouldRetry());
+    }
   }

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
index 0d3a94e..8023549 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
@@ -555,8 +555,10 @@ public class TestHBaseFsck {
       boolean fail = true;
       @Override
       public HBaseFsck call(){
+        Configuration c = new Configuration(conf);
+        c.setInt("hbase.hbck.lockfile.attempts", 1);
         try{
-          return doFsck(conf, false);
+          return doFsck(c, false);
         } catch(Exception e){
           if (e.getMessage().contains("Duplicate hbck")) {
             fail = false;
@@ -584,6 +586,48 @@ public class TestHBaseFsck {
       assert(h2.getRetCode() >= 0);
     }
   }
+
+  /**
+   * This test makes sure that, with retries enabled, both parallel hbck instances can complete successfully.
+   *
+   * @throws Exception
+   */
+  @Test (timeout=180000)
+  public void testParallelWithRetriesHbck() throws Exception {
+    final ExecutorService service;
+    final Future<HBaseFsck> hbck1, hbck2;
+
+    class RunHbck implements Callable<HBaseFsck> {
+      boolean fail = true;
+      @Override
+      public HBaseFsck call() {
+        try {
+          return doFsck(conf, false);
+        } catch (Exception e) {
+          if (e.getMessage().contains("Duplicate hbck")) {
+            fail = false;
+          }
+        }
+        // If we reach here, then an exception was caught
+        if (fail) fail();
+        return null;
+      }
+    }
+    service = Executors.newFixedThreadPool(2);
+    hbck1 = service.submit(new RunHbck());
+    hbck2 = service.submit(new RunHbck());
+    service.shutdown();
+    // Wait up to 15 seconds for both hbck calls to finish.
+    service.awaitTermination(15, TimeUnit.SECONDS);
+    HBaseFsck h1 = hbck1.get();
+    HBaseFsck h2 = hbck2.get();
+    // Both should be successful
+    assertNotNull(h1);
+    assertNotNull(h2);
+    assert(h1.getRetCode() >= 0);
+    assert(h2.getRetCode() >= 0);
+
+  }

   /**
    * This create and fixes a bad table with regions that have a duplicate
-- 
2.2.2
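
Note on the retry pattern: createFileWithRetries() above is a bounded retry
loop around FSUtils.create(): attempt the create, remember the IOException,
sleep via RetryCounter, and rethrow the last failure once the attempt budget
is spent. The self-contained sketch below shows the same control flow with no
HBase dependencies, so it can be read and compiled on its own. The class and
method names are illustrative only (not part of the patch), it assumes
maxAttempts >= 1, and it reduces RetryCounter's backoff policy to a fixed
Thread.sleep().

import java.io.IOException;
import java.io.InterruptedIOException;

public final class BoundedRetrySketch {

  /** Functional stand-in for the I/O action being retried. */
  interface IoAction<T> {
    T run() throws IOException;
  }

  static <T> T withRetries(IoAction<T> action, int maxAttempts, long sleepMillis)
      throws IOException {
    IOException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.run();           // success: hand back the result
      } catch (IOException ioe) {
        last = ioe;                    // remember the most recent failure
        if (attempt < maxAttempts) {
          try {
            Thread.sleep(sleepMillis); // pause before the next attempt
          } catch (InterruptedException ie) {
            // Same convention as the patch: surface interruption as an
            // InterruptedIOException so callers see a checked IOException.
            throw (InterruptedIOException) new InterruptedIOException(
                "interrupted while retrying").initCause(ie);
          }
        }
      }
    }
    throw last;                        // budget spent: rethrow the last failure
  }

  private BoundedRetrySketch() {
  }
}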
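On configuration: the patch wires the loop to two new keys,
"hbase.hbck.lockfile.attempts" (default 5) and
"hbase.hbck.lockfile.attempt.sleep.interval" (default 200, in milliseconds).
The corrected test sets the attempt count to 1 so the losing hbck instance
fails fast with "Duplicate hbck" instead of retrying until the lock holder
finishes. A hypothetical operator-side tuning, using only standard
Hadoop/HBase API (HBaseConfiguration.create() and Configuration.setInt()),
might look like:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public final class HbckLockRetryTuning {
  /** Returns a Configuration with a larger hbck lock-file retry budget. */
  public static Configuration tunedConf() {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt("hbase.hbck.lockfile.attempts", 10);                // default 5
    conf.setInt("hbase.hbck.lockfile.attempt.sleep.interval", 500); // default 200 ms
    return conf;
  }

  private HbckLockRetryTuning() {
  }
}

How long the second instance actually waits between attempts depends on
RetryCounter's backoff policy, so treat attempts * interval only as a rough,
order-of-magnitude estimate of the total retry window.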