commit db0b95db5d1e7739bec728df1d77f423f92db44b Author: nspiegelberg Date: 52 seconds ago Make HBCK utility faster diff --git a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index 8b64738..55423af 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -29,6 +29,9 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -70,6 +73,9 @@ public class HBaseFsck { public static final long DEFAULT_TIME_LAG = 60000; // default value of 1 minute public static final long DEFAULT_SLEEP_BEFORE_RERUN = 10000; + private static final int MAX_NUM_THREADS = 50; // #threads to contact regions + private static final long THREADS_KEEP_ALIVE_SECONDS = 60; + private static final Log LOG = LogFactory.getLog(HBaseFsck.class.getName()); private Configuration conf; @@ -89,6 +95,9 @@ public class HBaseFsck { private static boolean summary = false; // if we want to print less output // Empty regioninfo qualifiers in .META. private Set emptyRegionInfoQualifiers = new HashSet(); + private int numThreads = MAX_NUM_THREADS; + + ThreadPoolExecutor executor; // threads to retrieve data from regionservers /** * Constructor @@ -104,6 +113,11 @@ public class HBaseFsck { HBaseAdmin admin = new HBaseAdmin(conf); status = admin.getMaster().getClusterStatus(); connection = admin.getConnection(); + + numThreads = conf.getInt("hbasefsck.numthreads", numThreads); + executor = new ThreadPoolExecutor(0, numThreads, + THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, + new LinkedBlockingQueue()); } /** @@ -236,7 +250,7 @@ public class HBaseFsck { * Scan HDFS for all regions, recording their information into * regionInfo */ - void checkHdfs() throws IOException { + void checkHdfs() throws IOException, InterruptedException { Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); FileSystem fs = rootDir.getFileSystem(conf); @@ -259,35 +273,19 @@ public class HBaseFsck { } // level 1: /* + WorkItemHdfsDir[] dirs = new WorkItemHdfsDir[tableDirs.size()]; + int num = 0; for (FileStatus tableDir : tableDirs) { - String tableName = tableDir.getPath().getName(); - // ignore hidden files - if (tableName.startsWith(".") && - !tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME))) - continue; - // level 2: //* - FileStatus[] regionDirs = fs.listStatus(tableDir.getPath()); - for (FileStatus regionDir : regionDirs) { - String encodedName = regionDir.getPath().getName(); - // ignore directories that aren't hexadecimal - if (!encodedName.toLowerCase().matches("[0-9a-f]+")) continue; - - HbckInfo hbi = getOrCreateInfo(encodedName); - hbi.foundRegionDir = regionDir; - - // Set a flag if this region contains only edits - // This is special case if a region is left after split - hbi.onlyEdits = true; - FileStatus[] subDirs = fs.listStatus(regionDir.getPath()); - if (subDirs != null) { - Path ePath = HLog.getRegionDirRecoveredEditsDir(regionDir.getPath()); - for (FileStatus subDir : subDirs) { - String sdName = subDir.getPath().getName(); - if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) { - hbi.onlyEdits = false; - break; - } - } + dirs[num] = new WorkItemHdfsDir(this, fs, errors, tableDir); + executor.execute(dirs[num]); + num++; + } + + // wait for all directories to be done + for (int i = 0; i < num; i++) { + synchronized (dirs[i]) { + while (!dirs[i].isDone()) { + dirs[i].wait(); } } } @@ -322,37 +320,24 @@ public class HBaseFsck { * @throws IOException if a remote or network exception occurs */ void processRegionServers(Collection regionServerList) - throws IOException { + throws IOException, InterruptedException { - // loop to contact each region server - for (HServerInfo rsinfo: regionServerList) { - errors.progress(); - try { - HRegionInterface server = connection.getHRegionConnection( - rsinfo.getServerAddress()); + WorkItemRegion[] work = new WorkItemRegion[regionServerList.size()]; + int num = 0; - // list all online regions from this region server - List regions = server.getOnlineRegions(); - if (details) { - errors.detail("RegionServer: " + rsinfo.getServerName() + - " number of regions: " + regions.size()); - for (HRegionInfo rinfo: regions) { - errors.detail(" " + rinfo.getRegionNameAsString() + - " id: " + rinfo.getRegionId() + - " encoded_name: " + rinfo.getEncodedName() + - " start: " + Bytes.toStringBinary(rinfo.getStartKey()) + - " end: " + Bytes.toStringBinary(rinfo.getEndKey())); - } - } - - // check to see if the existance of this region matches the region in META - for (HRegionInfo r:regions) { - HbckInfo hbi = getOrCreateInfo(r.getEncodedName()); - hbi.deployedOn.add(rsinfo.getServerAddress()); + // loop to contact each region server in parallel + for (HServerInfo rsinfo:regionServerList) { + work[num] = new WorkItemRegion(this, rsinfo, errors, connection); + executor.execute(work[num]); + num++; + } + + // wait for all submitted tasks to be done + for (int i = 0; i < num; i++) { + synchronized (work[i]) { + while (!work[i].isDone()) { + work[i].wait(); } - } catch (IOException e) { // unable to connect to the region server. - errors.reportError("\nRegionServer:" + rsinfo.getServerName() + - " Unable to fetch region information. " + e); } } } @@ -611,7 +596,7 @@ public class HBaseFsck { * region name. If the region has not been seen yet, a new entry is added * and returned. */ - private HbckInfo getOrCreateInfo(String name) { + private synchronized HbckInfo getOrCreateInfo(String name) { HbckInfo hbi = regionInfo.get(name); if (hbi == null) { hbi = new HbckInfo(null); @@ -779,7 +764,11 @@ public class HBaseFsck { this.metaEntry = metaEntry; } - public String toString() { + public synchronized void addServer(HServerAddress server) { + this.deployedOn.add(server); + } + + public synchronized String toString() { if (metaEntry != null) { return metaEntry.getRegionNameAsString(); } else if (foundRegionDir != null) { @@ -822,7 +811,7 @@ public class HBaseFsck { public int errorCount = 0; private int showProgress; - public void reportError(String message) { + public synchronized void reportError(String message) { if (!summary) { System.out.println("ERROR: " + message); } @@ -830,7 +819,7 @@ public class HBaseFsck { showProgress = 0; } - public int summarize() { + public synchronized int summarize() { System.out.println(Integer.toString(errorCount) + " inconsistencies detected."); if (errorCount == 0) { @@ -842,20 +831,20 @@ public class HBaseFsck { } } - public void print(String message) { + public synchronized void print(String message) { if (!summary) { System.out.println(message); } } - public void detail(String message) { + public synchronized void detail(String message) { if (details) { System.out.println(message); } showProgress = 0; } - public void progress() { + public synchronized void progress() { if (showProgress++ == 10) { if (!summary) { System.out.print("."); @@ -866,6 +855,137 @@ public class HBaseFsck { } /** + * Contact a region server and get all information from it + */ + static class WorkItemRegion implements Runnable { + private HBaseFsck hbck; + private HServerInfo rsinfo; + private ErrorReporter errors; + private HConnection connection; + private boolean done; + + WorkItemRegion(HBaseFsck hbck, HServerInfo info, + ErrorReporter errors, HConnection connection) { + this.hbck = hbck; + this.rsinfo = info; + this.errors = errors; + this.connection = connection; + this.done = false; + } + + // is this task done? + synchronized boolean isDone() { + return done; + } + + @Override + public synchronized void run() { + errors.progress(); + try { + HRegionInterface server = connection.getHRegionConnection( + rsinfo.getServerAddress()); + + // list all online regions from this region server + List regions = server.getOnlineRegions(); + if (details) { + errors.detail("RegionServer: " + rsinfo.getServerName() + + " number of regions: " + regions.size()); + for (HRegionInfo rinfo: regions) { + errors.detail(" " + rinfo.getRegionNameAsString() + + " id: " + rinfo.getRegionId() + + " encoded_name: " + rinfo.getEncodedName() + + " start: " + Bytes.toStringBinary(rinfo.getStartKey()) + + " end: " + Bytes.toStringBinary(rinfo.getEndKey())); + } + } + + // check to see if the existance of this region matches the region in META + for (HRegionInfo r:regions) { + HbckInfo hbi = hbck.getOrCreateInfo(r.getEncodedName()); + hbi.addServer(rsinfo.getServerAddress()); + } + } catch (IOException e) { // unable to connect to the region server. + errors.reportError("RegionServer: " + rsinfo.getServerName() + + " Unable to fetch region information. " + e); + } finally { + done = true; + notifyAll(); // wakeup anybody waiting for this item to be done + } + } + } + + /** + * Contact hdfs and get all information about spcified table directory. + */ + static class WorkItemHdfsDir implements Runnable { + private HBaseFsck hbck; + private FileStatus tableDir; + private ErrorReporter errors; + private FileSystem fs; + private boolean done; + + WorkItemHdfsDir(HBaseFsck hbck, FileSystem fs, ErrorReporter errors, + FileStatus status) { + this.hbck = hbck; + this.fs = fs; + this.tableDir = status; + this.errors = errors; + this.done = false; + } + + synchronized boolean isDone() { + return done; + } + + @Override + public synchronized void run() { + try { + String tableName = tableDir.getPath().getName(); + // ignore hidden files + if (tableName.startsWith(".") && + !tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME))) + return; + // level 2: /
/* + FileStatus[] regionDirs = fs.listStatus(tableDir.getPath()); + for (FileStatus regionDir : regionDirs) { + String encodedName = regionDir.getPath().getName(); + + // ignore directories that aren't hexadecimal + if (!encodedName.toLowerCase().matches("[0-9a-f]+")) continue; + + HbckInfo hbi = hbck.getOrCreateInfo(encodedName); + synchronized (hbi) { + if (hbi.foundRegionDir != null) { + errors.print("Directory " + encodedName + " duplicate??" + + hbi.foundRegionDir); + } + hbi.foundRegionDir = regionDir; + + // Set a flag if this region contains only edits + // This is special case if a region is left after split + hbi.onlyEdits = true; + FileStatus[] subDirs = fs.listStatus(regionDir.getPath()); + Path ePath = HLog.getRegionDirRecoveredEditsDir(regionDir.getPath()); + for (FileStatus subDir : subDirs) { + String sdName = subDir.getPath().getName(); + if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) { + hbi.onlyEdits = false; + break; + } + } + } + } + } catch (IOException e) { // unable to connect to the region server. + errors.reportError("Table Directory: " + tableDir.getPath().getName() + + " Unable to fetch region information. " + e); + } finally { + done = true; + notifyAll(); + } + } + } + + /** * Display the full report from fsck. * This displays all live and dead region servers, and all known regions. */