From 46b30d75009255ad5da7b103abcbb5fa799a0fec Mon Sep 17 00:00:00 2001
From: Ted Yu
Date: Fri, 19 Sep 2014 14:56:24 -0700
Subject: [PATCH] HBASE-11405 Multiple invocations of hbck in parallel disables balancer permanently (bharath v)

Amending-Author: Sean Busbey
---
 .../org/apache/hadoop/hbase/util/HBaseFsck.java   | 86 +++++++++++++++++++-
 .../apache/hadoop/hbase/HBaseTestingUtility.java  | 59 +++++++++++++-
 .../apache/hadoop/hbase/util/TestHBaseFsck.java   | 50 ++++++++++++
 3 files changed, 191 insertions(+), 4 deletions(-)

diff --git a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index e028b56..469c98e 100644
--- a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -21,6 +21,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.net.InetAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,6 +44,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -50,10 +52,12 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -97,7 +101,9 @@ import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
 import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
@@ -163,6 +169,8 @@ public class HBaseFsck extends Configured implements Tool {
   private static final int DEFAULT_OVERLAPS_TO_SIDELINE = 2;
   private static final int DEFAULT_MAX_MERGE = 5;
   private static final String TO_BE_LOADED = "to_be_loaded";
+  private static final String HBCK_LOCK_FILE = "hbase-hbck.lock";
+
 
   /**********************
    * Internal resources
@@ -177,6 +185,12 @@
   private long startMillis = System.currentTimeMillis();
   private HFileCorruptionChecker hfcc;
   private int retcode = 0;
+  private Path HBCK_LOCK_PATH;
+  private FSDataOutputStream hbckOutFd;
+  // This lock is to prevent cleanup of the hbck lock file happening twice,
+  // from both the ShutdownHook and the main code.
+  // We clean up only if the connect() call was successful.
+  private final AtomicBoolean hbckLockCleanup = new AtomicBoolean(false);
 
   /***********
    * Options
@@ -277,10 +291,78 @@
   }
 
   /**
+   * This method maintains a lock using a file. If the file already exists, the creation fails and we return null.
+   *
+   * @return FSDataOutputStream object corresponding to the newly opened lock file
+   * @throws IOException if an IO failure occurs
+   */
+  private FSDataOutputStream checkAndMarkRunningHbck() throws IOException {
+    try {
+      FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
+      FsPermission defaultPerms = FSUtils.getFilePermissions(fs, getConf(),
+          HConstants.DATA_FILE_UMASK_KEY);
+      Path tmpDir = new Path(FSUtils.getRootDir(getConf()), HConstants.HBASE_TEMP_DIRECTORY);
+      fs.mkdirs(tmpDir);
+      HBCK_LOCK_PATH = new Path(tmpDir, HBCK_LOCK_FILE);
+      final FSDataOutputStream out = FSUtils.create(fs, HBCK_LOCK_PATH, defaultPerms, false);
+      out.writeBytes(InetAddress.getLocalHost().toString());
+      out.flush();
+      return out;
+    } catch (IOException exception) {
+      RemoteException e = null;
+      if (exception instanceof RemoteException) {
+        e = (RemoteException) exception;
+      } else if (exception.getCause() instanceof RemoteException) {
+        e = (RemoteException) exception.getCause();
+      }
+      if (null != e && AlreadyBeingCreatedException.class.getName().equals(e.getClassName())) {
+        return null;
+      }
+      throw exception;
+    }
+  }
+
+  private void unlockHbck() {
+    if (hbckLockCleanup.compareAndSet(true, false)) {
+      IOUtils.closeStream(hbckOutFd);
+      try {
+        FSUtils.delete(FSUtils.getCurrentFileSystem(getConf()), HBCK_LOCK_PATH, true);
+      } catch (IOException ioe) {
+        LOG.warn("Failed to delete " + HBCK_LOCK_PATH);
+        LOG.debug(ioe);
+      }
+    }
+  }
+
+  /**
    * To repair region consistency, one must call connect() in order to repair
    * online state.
    */
   public void connect() throws IOException {
+
+    // Check if another instance of hbck is running
+    hbckOutFd = checkAndMarkRunningHbck();
+    if (hbckOutFd == null) {
+      setRetCode(-1);
+      LOG.error("Another instance of hbck is running, exiting this instance. [If you are sure" +
+          " no other instance is running, delete the lock file " +
+          HBCK_LOCK_PATH + " and rerun the tool]");
+      throw new IOException("Duplicate hbck - Abort");
+    }
+
+    // Make sure to clean up the lock
+    hbckLockCleanup.set(true);
+
+    // Add a shutdown hook to this thread, in case the user tries to
+    // kill hbck with a ctrl-c; we want to clean up the lock so that
+    // it is available for subsequent runs
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      public void run() {
+        unlockHbck();
+      }
+    });
+    LOG.debug("Launching hbck");
+
     admin = new HBaseAdmin(getConf());
     meta = new HTable(getConf(), HConstants.META_TABLE_NAME);
     status = admin.getMaster().getClusterStatus();
@@ -462,6 +544,9 @@
     offlineReferenceFileRepair();
 
+    // Remove the hbck lock
+    unlockHbck();
+
     // Print table summary
     printTableSummary(tablesInfo);
     return errors.summarize();
   }
@@ -3691,7 +3776,6 @@
     URI defaultFs = hbasedir.getFileSystem(conf).getUri();
     conf.set("fs.defaultFS", defaultFs.toString());     // for hadoop 0.21+
     conf.set("fs.default.name", defaultFs.toString()); // for hadoop 0.20
-
     int ret = ToolRunner.run(new HBaseFsck(conf), args);
     System.exit(ret);
   }
diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index c2bcd80..fc54826 100644
--- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -25,7 +25,9 @@ import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.UnknownHostException;
@@ -88,6 +90,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.WatchedEvent;
@@ -199,12 +202,18 @@
     // a hbase checksum verification failure will cause unit tests to fail
     ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
 
-    setHDFSClientRetryProperty();
+    setHDFSClientRetry(1);
   }
 
-  private void setHDFSClientRetryProperty() {
-    this.conf.setInt("hdfs.client.retries.number", 1);
+  /**
+   * Controls how many attempts the HDFS client will make in the face of failures.
+   */
+  public void setHDFSClientRetry(final int retries) {
+    this.conf.setInt("hdfs.client.retries.number", retries);
     HBaseFileSystem.setRetryCounts(conf);
+    if (0 == retries) {
+      makeDFSClientNonRetrying();
+    }
   }
 
   /**
@@ -1924,6 +1933,50 @@
     }
   }
 
+  void makeDFSClientNonRetrying() {
+    if (null == this.dfsCluster) {
+      LOG.debug("dfsCluster has not started, can't make client non-retrying.");
+      return;
+    }
+    try {
+      final FileSystem filesystem = this.dfsCluster.getFileSystem();
+      if (!(filesystem instanceof DistributedFileSystem)) {
+        LOG.debug("dfsCluster is not backed by a DistributedFileSystem, can't make client non-retrying.");
+        return;
+      }
+      // rely on FileSystem.CACHE to alter how we talk via DFSClient
+      final DistributedFileSystem fs = (DistributedFileSystem) filesystem;
+      // retrieve the backing DFSClient instance
+      final Field dfsField = fs.getClass().getDeclaredField("dfs");
+      dfsField.setAccessible(true);
+      final Class<?> dfsClazz = dfsField.getType();
+      final DFSClient dfs = DFSClient.class.cast(dfsField.get(fs));
+
+      // expose the method for creating direct RPC connections
+      final Method createRPCNamenode = dfsClazz.getDeclaredMethod("createRPCNamenode", InetSocketAddress.class, Configuration.class, UserGroupInformation.class);
+      createRPCNamenode.setAccessible(true);
+
+      // grab the DFSClient instance's backing connection information
+      final Field nnField = dfsClazz.getDeclaredField("nnAddress");
+      nnField.setAccessible(true);
+      final InetSocketAddress nnAddress = InetSocketAddress.class.cast(nnField.get(dfs));
+      final Field confField = dfsClazz.getDeclaredField("conf");
+      confField.setAccessible(true);
+      final Configuration conf = Configuration.class.cast(confField.get(dfs));
+      final Field ugiField = dfsClazz.getDeclaredField("ugi");
+      ugiField.setAccessible(true);
+      final UserGroupInformation ugi = UserGroupInformation.class.cast(ugiField.get(dfs));
+
+      // replace the proxy for the namenode RPC with a direct instance
+      final Field namenodeField = dfsClazz.getDeclaredField("namenode");
+      namenodeField.setAccessible(true);
+      namenodeField.set(dfs, createRPCNamenode.invoke(null, nnAddress, conf, ugi));
+      LOG.debug("Set DFSClient namenode to bare RPC");
+    } catch (Exception exception) {
+      LOG.info("Could not alter DFSClient to be non-retrying.", exception);
+    }
+  }
+
   /**
    * Wait until all regions for a table in .META. have a non-empty
    * info:server, up to 60 seconds. This means all regions have been deployed,
diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java b/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
index 4ec70cf..3e95a5e 100644
--- a/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
+++ b/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
@@ -35,8 +35,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
@@ -116,6 +120,7 @@
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setBoolean("hbase.master.distributed.log.splitting", false);
     TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.setHDFSClientRetry(0);
   }
 
   @AfterClass
@@ -539,6 +544,51 @@
   }
 
   /**
+   * This test makes sure that two instances of hbck cannot run in parallel.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testParallelHbck() throws Exception {
+    final ExecutorService service;
+    final Future<HBaseFsck> hbck1, hbck2;
+
+    class RunHbck implements Callable<HBaseFsck> {
+      boolean fail = true;
+      public HBaseFsck call() {
+        try {
+          return doFsck(conf, false);
+        } catch (Exception e) {
+          if (e.getMessage() != null && e.getMessage().contains("Duplicate hbck")) {
+            fail = false;
+          } else {
+            LOG.fatal("hbck failed.", e);
+          }
+        }
+        // If we reach here, then an exception was caught
+        if (fail) fail();
+        return null;
+      }
+    }
+    service = Executors.newFixedThreadPool(2);
+    hbck1 = service.submit(new RunHbck());
+    hbck2 = service.submit(new RunHbck());
+    service.shutdown();
+    // wait up to 15 seconds for both hbck calls to finish
+    service.awaitTermination(15, TimeUnit.SECONDS);
+    HBaseFsck h1 = hbck1.get();
+    HBaseFsck h2 = hbck2.get();
+    // Make sure only one of the calls was successful
+    assertTrue(h1 == null || h2 == null);
+    if (h1 != null) {
+      assertTrue(h1.getRetCode() >= 0);
+    }
+    if (h2 != null) {
+      assertTrue(h2.getRetCode() >= 0);
+    }
+  }
+
+  /**
    * This creates and fixes a bad table with regions that have a duplicate
    * start key
    */
-- 
1.7.10.2 (Apple Git-33)
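
A note on the pattern: the whole locking scheme rests on one filesystem primitive, an exclusive create (create with overwrite=false fails if the file already exists). For reference, here is the same idea reduced to a standalone helper, a minimal sketch only: the class and method names (FsLockSketch, tryLock, unlock) are invented for illustration and are not part of the patch, and it uses the plain Hadoop FileSystem API rather than the FSUtils helpers the patch goes through.

  import java.io.IOException;
  import java.net.InetAddress;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class FsLockSketch {
    private final FileSystem fs;
    private final Path lockPath;
    private FSDataOutputStream lockStream;

    public FsLockSketch(Configuration conf, Path lockPath) throws IOException {
      this.fs = FileSystem.get(conf);
      this.lockPath = lockPath;
    }

    /** Returns true if this process took the lock, false if it is already held. */
    public synchronized boolean tryLock() {
      try {
        // overwrite=false: create() fails if the file already exists, which is
        // what provides the mutual exclusion across processes.
        lockStream = fs.create(lockPath, false);
        // Record who holds the lock, so a stale lock can be traced by hand.
        lockStream.writeBytes(InetAddress.getLocalHost().toString());
        lockStream.flush();
        return true;
      } catch (IOException e) {
        // On HDFS this is typically a FileAlreadyExistsException or a
        // RemoteException wrapping AlreadyBeingCreatedException.
        return false;
      }
    }

    /** Idempotent release; plays the role of the AtomicBoolean guard in unlockHbck(). */
    public synchronized void unlock() throws IOException {
      if (lockStream != null) {
        lockStream.close();
        fs.delete(lockPath, false);
        lockStream = null;
      }
    }
  }

With the patch applied, starting a second hbck while one is running makes checkAndMarkRunningHbck() return null in the second process, which then logs the "Another instance of hbck is running" error, sets a negative retcode, and aborts with "Duplicate hbck - Abort"; that is exactly the behavior testParallelHbck() asserts.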