diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index ad62d71..c9ef4b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -533,6 +533,47 @@ public class HFile {
   }
 
   /**
+   * Returns true if the specified file has a valid HFile Trailer.
+   * @param fs filesystem
+   * @param path Path to file to verify
+   * @return true if the file has a valid HFile Trailer, otherwise false
+   * @throws IOException if failed to read from the underlying stream
+   */
+  public static boolean isHFileFormat(final FileSystem fs, final Path path) throws IOException {
+    return isHFileFormat(fs, fs.getFileStatus(path));
+  }
+
+  /**
+   * Returns true if the specified file has a valid HFile Trailer.
+   * @param fs filesystem
+   * @param fileStatus the file to verify
+   * @return true if the file has a valid HFile Trailer, otherwise false
+   * @throws IOException if failed to read from the underlying stream
+   */
+  public static boolean isHFileFormat(final FileSystem fs, final FileStatus fileStatus)
+      throws IOException {
+    final Path path = fileStatus.getPath();
+    final long size = fileStatus.getLen();
+    FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path);
+    try {
+      boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
+      assert !isHBaseChecksum; // Initially we must read with FS checksum.
+      FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
+      return true;
+    } catch (IllegalArgumentException e) {
+      return false;
+    } catch (IOException e) {
+      throw e;
+    } finally {
+      try {
+        fsdis.close();
+      } catch (Throwable t) {
+        LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t);
+      }
+    }
+  }
+
+  /**
    * Metadata for this file. Conjured by the writer. Read in by the reader.
    */
   public static class FileInfo implements SortedMap<byte[], byte[]> {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 45f57b3..0bed818 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -89,6 +89,7 @@ import java.util.Collection;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -246,7 +247,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   {
     doBulkLoad(hfofDir, table.getConnection().getAdmin(), table, table.getRegionLocator());
   }
-  
+
   /**
    * Perform a bulk load of the given directory into the given
    * pre-existing table. This method is not threadsafe.
@@ -282,15 +283,22 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     discoverLoadQueue(queue, hfofDir);
     // check whether there is invalid family name in HFiles to be bulkloaded
     Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
-    ArrayList<String> familyNames = new ArrayList<String>();
+    ArrayList<String> familyNames = new ArrayList<String>(families.size());
     for (HColumnDescriptor family : families) {
       familyNames.add(family.getNameAsString());
     }
     ArrayList<String> unmatchedFamilies = new ArrayList<String>();
-    for (LoadQueueItem lqi : queue) {
+    Iterator<LoadQueueItem> queueIter = queue.iterator();
+    while (queueIter.hasNext()) {
+      LoadQueueItem lqi = queueIter.next();
       String familyNameInHFile = Bytes.toString(lqi.family);
       if (!familyNames.contains(familyNameInHFile)) {
-        unmatchedFamilies.add(familyNameInHFile);
+        if (HFile.isHFileFormat(lqi.hfilePath.getFileSystem(getConf()), lqi.hfilePath)) {
+          unmatchedFamilies.add(familyNameInHFile);
+        } else {
+          LOG.warn("the file " + lqi + " doesn't seem to be an hfile. skipping");
+          queueIter.remove();
+        }
       }
     }
     if (unmatchedFamilies.size() > 0) {
@@ -729,7 +737,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       throw e;
     }
   }
-  
+
   private boolean isSecureBulkLoadEndpointAvailable() {
     String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
     return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
@@ -945,7 +953,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         HTable table = (HTable) connection.getTable(tableName);) {
       doBulkLoad(hfofDir, table);
     }
-    
+
     return 0;
   }
 
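For reference, a minimal usage sketch of the new HFile.isHFileFormat() check outside of LoadIncrementalHFiles, e.g. pre-filtering a bulk-load output directory before queueing it. Only HFile.isHFileFormat(fs, fileStatus) comes from the patch above; the HFileFormatCheck class and listValidHFiles() helper are illustrative and not part of this change.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.HFile;

public class HFileFormatCheck {
  /**
   * Returns only the files directly under dir that carry a valid HFile trailer,
   * mirroring the skip logic added to LoadIncrementalHFiles above.
   */
  public static List<Path> listValidHFiles(FileSystem fs, Path dir) throws IOException {
    List<Path> valid = new ArrayList<Path>();
    for (FileStatus stat : fs.listStatus(dir)) {
      // Directories and non-HFile files (e.g. _SUCCESS markers) are skipped.
      if (stat.isFile() && HFile.isHFileFormat(fs, stat)) {
        valid.add(stat.getPath());
      }
    }
    return valid;
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Path dir = new Path(args[0]); // e.g. a per-family output dir of HFileOutputFormat2
    FileSystem fs = dir.getFileSystem(conf);
    for (Path p : listValidHFiles(fs, dir)) {
      System.out.println("valid hfile: " + p);
    }
  }
}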