diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 1fb64a2..63b8663 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -1767,20 +1767,28 @@ public class HBaseFsck extends Configured implements Closeable {
     throws IOException, KeeperException, InterruptedException {
     // Divide the checks in two phases. One for default/primary replicas and another
     // for the non-primary ones. Keeps code cleaner this way.
+
+    List<CheckRegionConsistencyWorkItem> workItems =
+        new ArrayList<CheckRegionConsistencyWorkItem>(regionInfoMap.size());
     for (java.util.Map.Entry<String, HbckInfo> e: regionInfoMap.entrySet()) {
       if (e.getValue().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
-        checkRegionConsistency(e.getKey(), e.getValue());
+        workItems.add(new CheckRegionConsistencyWorkItem(e.getKey(), e.getValue()));
       }
     }
+    checkRegionConsistencyConcurrently(workItems);
+
     boolean prevHdfsCheck = shouldCheckHdfs();
     setCheckHdfs(false); //replicas don't have any hdfs data
     // Run a pass over the replicas and fix any assignment issues that exist on the currently
     // deployed/undeployed replicas.
+    List<CheckRegionConsistencyWorkItem> replicaWorkItems =
+        new ArrayList<CheckRegionConsistencyWorkItem>(regionInfoMap.size());
     for (java.util.Map.Entry<String, HbckInfo> e: regionInfoMap.entrySet()) {
       if (e.getValue().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
-        checkRegionConsistency(e.getKey(), e.getValue());
+        replicaWorkItems.add(new CheckRegionConsistencyWorkItem(e.getKey(), e.getValue()));
       }
     }
+    checkRegionConsistencyConcurrently(replicaWorkItems);
     setCheckHdfs(prevHdfsCheck);

     if (shouldCheckHdfs()) {
@@ -1789,6 +1797,51 @@ public class HBaseFsck extends Configured implements Closeable {
   }

   /**
+   * Check consistency of all regions using multiple threads concurrently.
+   */
+  private void checkRegionConsistencyConcurrently(
+      final List<CheckRegionConsistencyWorkItem> workItems)
+      throws IOException, KeeperException, InterruptedException {
+    if (workItems.size() == 0) {
+      return;  // nothing to check
+    }
+
+    List<Future<Void>> workFutures = executor.invokeAll(workItems);
+    for (Future<Void> f : workFutures) {
+      try {
+        f.get();
+      } catch (ExecutionException e1) {
+        LOG.warn("Could not check region consistency", e1.getCause());
+        if (e1.getCause() instanceof IOException) {
+          throw (IOException) e1.getCause();
+        } else if (e1.getCause() instanceof KeeperException) {
+          throw (KeeperException) e1.getCause();
+        } else if (e1.getCause() instanceof InterruptedException) {
+          throw (InterruptedException) e1.getCause();
+        } else {
+          throw new IOException(e1.getCause());
+        }
+      }
+    }
+  }
+
+  class CheckRegionConsistencyWorkItem implements Callable<Void> {
+    private final String key;
+    private final HbckInfo hbi;
+
+    CheckRegionConsistencyWorkItem(String key, HbckInfo hbi) {
+      this.key = key;
+      this.hbi = hbi;
+    }
+
+    @Override
+    public synchronized Void call() throws Exception {
+      checkRegionConsistency(key, hbi);
+      return null;
+    }
+  }
+
+  /**
    * Check and fix table states, assumes full info available:
    * - tableInfos
    * - empty tables loaded
@@ -2129,40 +2182,55 @@ public class HBaseFsck extends Configured implements Closeable {
       HRegionInfo hri = hbi.getHdfsHRI();
       TableInfo tableInfo = tablesInfo.get(hri.getTable());
       if (tableInfo.regionsFromMeta.isEmpty()) {
-        for (HbckInfo h : regionInfoMap.values()) {
-          if (hri.getTable().equals(h.getTableName())) {
-            if (h.metaEntry != null) tableInfo.regionsFromMeta
-                .add((HRegionInfo) h.metaEntry);
+        // Make sure that the list is populated in a thread-safe way. Populating the list
+        // should only happen once per table.
+        synchronized (tableInfo.regionsFromMeta) {
+          if (tableInfo.regionsFromMeta.isEmpty()) {
+            for (HbckInfo h : regionInfoMap.values()) {
+              if (hri.getTable().equals(h.getTableName())) {
+                if (h.metaEntry != null) {
+                  tableInfo.regionsFromMeta.add((HRegionInfo) h.metaEntry);
+                }
+              }
+            }
+            Collections.sort(tableInfo.regionsFromMeta);
           }
-        }
-        Collections.sort(tableInfo.regionsFromMeta);
+        } // end of synchronized(tableInfo.regionsFromMeta)
       }
-      for (HRegionInfo region : tableInfo.regionsFromMeta) {
-        if (Bytes.compareTo(region.getStartKey(), hri.getStartKey()) <= 0
+      // Make sure that the list is traversed in a thread-safe way.
+      synchronized (tableInfo.regionsFromMeta) {
+        for (HRegionInfo region : tableInfo.regionsFromMeta) {
+          if (Bytes.compareTo(region.getStartKey(), hri.getStartKey()) <= 0
             && (region.getEndKey().length == 0 || Bytes.compareTo(region.getEndKey(),
               hri.getEndKey()) >= 0)
             && Bytes.compareTo(region.getStartKey(), hri.getEndKey()) <= 0) {
-          if(region.isSplit() || region.isOffline()) continue;
-          Path regionDir = hbi.getHdfsRegionDir();
-          FileSystem fs = regionDir.getFileSystem(getConf());
-          List<Path> familyDirs = FSUtils.getFamilyDirs(fs, regionDir);
-          for (Path familyDir : familyDirs) {
-            List<Path> referenceFilePaths = FSUtils.getReferenceFilePaths(fs, familyDir);
-            for (Path referenceFilePath : referenceFilePaths) {
-              Path parentRegionDir =
+            if (region.isSplit() || region.isOffline()) {
+              continue;
+            }
+            Path regionDir = hbi.getHdfsRegionDir();
+            FileSystem fs = regionDir.getFileSystem(getConf());
+            List<Path> familyDirs = FSUtils.getFamilyDirs(fs, regionDir);
+            for (Path familyDir : familyDirs) {
+              List<Path> referenceFilePaths = FSUtils.getReferenceFilePaths(fs, familyDir);
+              for (Path referenceFilePath : referenceFilePaths) {
+                Path parentRegionDir =
                   StoreFileInfo.getReferredToFile(referenceFilePath).getParent().getParent();
-              if (parentRegionDir.toString().endsWith(region.getEncodedName())) {
-                LOG.warn(hri + " start and stop keys are in the range of " + region
+                if (parentRegionDir.toString().endsWith(region.getEncodedName())) {
+                  LOG.warn(hri + " start and stop keys are in the range of " + region
                     + ". The region might not be cleaned up from hdfs when region " + region
                     + " split failed. Hence deleting from hdfs.");
-                HRegionFileSystem.deleteRegionFromFileSystem(getConf(), fs,
-                    regionDir.getParent(), hri);
-                return;
+                  HRegionFileSystem.deleteRegionFromFileSystem(
+                      getConf(),
+                      fs,
+                      regionDir.getParent(),
+                      hri);
+                  return;
+                }
               }
             }
           }
         }
-      }
+      } // end of synchronized(tableInfo.regionsFromMeta)
       LOG.info("Patching hbase:meta with .regioninfo: " + hbi.getHdfsHRI());
       int numReplicas = admin.getTableDescriptor(hbi.getTableName()).getRegionReplication();
       HBaseFsckRepair.fixMetaHoleOnlineAndAddReplicas(getConf(), hbi.getHdfsHRI(),
@@ -2493,7 +2561,8 @@ public class HBaseFsck extends Configured implements Closeable {
         TreeMultimap.create(RegionSplitCalculator.BYTES_COMPARATOR, cmp);

     // list of regions derived from meta entries.
-    final List<HRegionInfo> regionsFromMeta = new ArrayList<HRegionInfo>();
+    final List<HRegionInfo> regionsFromMeta =
+        Collections.synchronizedList(new ArrayList<HRegionInfo>());

     TableInfo(TableName name) {
       this.tableName = name;
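
Reviewer note: the heart of this patch is the fan-out in checkRegionConsistencyConcurrently(): each region check becomes a Callable<Void> work item, the whole batch goes to ExecutorService.invokeAll(), and get() is then called on every Future so any failure surfaces and its cause can be unwrapped back into the checked exceptions the caller already declares. Below is a minimal standalone sketch of that pattern; Check and runChecksConcurrently are hypothetical stand-ins, not HBaseFsck API, and KeeperException is omitted to keep the sketch dependency-free.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class InvokeAllSketch {

      // Hypothetical work item, standing in for CheckRegionConsistencyWorkItem.
      static class Check implements Callable<Void> {
        private final String key;

        Check(String key) {
          this.key = key;
        }

        @Override
        public Void call() throws Exception {
          // The real work item would run one region consistency check here.
          System.out.println("checking " + key);
          return null;
        }
      }

      static void runChecksConcurrently(ExecutorService executor, List<Check> workItems)
          throws IOException, InterruptedException {
        if (workItems.isEmpty()) {
          return; // nothing to check
        }
        // invokeAll() blocks until every task has completed or failed.
        List<Future<Void>> futures = executor.invokeAll(workItems);
        for (Future<Void> f : futures) {
          try {
            // get() returns immediately here; it only serves to surface failures.
            f.get();
          } catch (ExecutionException e) {
            // Unwrap the cause and rethrow the checked exceptions the caller
            // declares, mirroring the IOException/KeeperException handling above.
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
              throw (IOException) cause;
            } else if (cause instanceof InterruptedException) {
              throw (InterruptedException) cause;
            } else {
              throw new IOException(cause);
            }
          }
        }
      }

      public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
          List<Check> items = new ArrayList<Check>();
          for (String key : new String[] {"r1", "r2", "r3"}) {
            items.add(new Check(key));
          }
          runChecksConcurrently(executor, items);
        } finally {
          executor.shutdown();
        }
      }
    }

One consequence of this design worth noting: invokeAll() waits for the entire batch to finish before any failure is rethrown, so unlike the old sequential loop, later checks still run even when an early one fails; the caller still sees the first failure encountered among the Futures.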
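
The synchronization added in the last two hunks leans on two properties of Collections.synchronizedList that are easy to miss: individual calls such as add() and isEmpty() are atomic, but iteration is not, so any traversal must hold the list's own monitor (the Collections.synchronizedList javadoc is explicit about this); and an emptiness check followed by population must be re-checked under the lock, or two racing work items could both fill and sort regionsFromMeta. A hypothetical sketch of both idioms, with illustrative names (regions, populateIfEmpty, containsPrefix) that are not HBaseFsck code:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class SynchronizedListSketch {

      // Shared, lazily populated list, mirroring TableInfo.regionsFromMeta.
      private final List<String> regions =
          Collections.synchronizedList(new ArrayList<String>());

      // Populate at most once, even when several threads race to get here.
      void populateIfEmpty(List<String> source) {
        if (regions.isEmpty()) {         // cheap pre-check without the lock
          synchronized (regions) {       // take the list's own monitor
            if (regions.isEmpty()) {     // re-check under the lock
              regions.addAll(source);
              Collections.sort(regions);
            }
          }
        }
      }

      // Iteration over a synchronized list is not atomic; hold the monitor
      // for the whole traversal, as the patch does around regionsFromMeta.
      boolean containsPrefix(String prefix) {
        synchronized (regions) {
          for (String r : regions) {
            if (r.startsWith(prefix)) {
              return true;
            }
          }
        }
        return false;
      }
    }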