Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (revision 1471030) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (working copy) @@ -187,7 +187,7 @@ /** * Perform a bulk load of the given directory into the given * pre-existing table. This method is not threadsafe. - * + * * @param hfofDir the directory that was provided as the output path * of a job using HFileOutputFormat * @param table the table to load into @@ -220,6 +220,26 @@ Deque queue = new LinkedList(); try { discoverLoadQueue(queue, hfofDir); + // Reject the load if any queued HFile's family is not one of the table's column families + Collection families = table.getTableDescriptor().getFamilies(); + ArrayList familyNames = new ArrayList(); + for (HColumnDescriptor family : families) { + familyNames.add(family.getNameAsString()); + } + ArrayList unmatchedFamilies = new ArrayList(); + for (LoadQueueItem lqi : queue) { + String familyNameInHFile = Bytes.toString(lqi.family); + if (!familyNames.contains(familyNameInHFile)) { + unmatchedFamilies.add(familyNameInHFile); + } + } + if (unmatchedFamilies.size() > 0) { + String msg = "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " + + unmatchedFamilies + "; valid family names of table " + + Bytes.toString(table.getTableName()) + " are: " + familyNames; + LOG.error(msg); + throw new IOException(msg); + } int count = 0; if (queue.isEmpty()) { @@ -358,7 +378,7 @@ Set>> splittingFutures = new HashSet>>(); while (!queue.isEmpty()) { final LoadQueueItem item = queue.remove(); - + final Callable> call = new Callable>() { public List call() throws Exception { List splits = groupOrSplit(regionGroups, item, table, startEndKeys); @@ -492,12 +512,12 @@ * Attempts to do an atomic load of many hfiles into a 
region. If it fails, * it returns a list of hfiles that need to be retried. If it is successful * it will return an empty list. - * + * * NOTE: To maintain row atomicity guarantees, region server callable should * succeed atomically and fails atomically. - * + * * Protected for testing. - * + * * @return empty list if success, list of items to retry on recoverable * failure */ @@ -650,7 +670,7 @@ private boolean doesTableExist(String tableName) throws Exception { return hbAdmin.tableExists(tableName); } - + /* * Infers region boundaries for a new table. * Parameter: @@ -658,29 +678,29 @@ * If a key is a start key of a file, then it maps to +1 * If a key is an end key of a file, then it maps to -1 * Algo: - * 1) Poll on the keys in order: - * a) Keep adding the mapped values to these keys (runningSum) + * 1) Poll on the keys in order: + * a) Keep adding the mapped values to these keys (runningSum) * b) Each time runningSum reaches 0, add the start Key from when the runningSum had started to a boundary list. - * 2) Return the boundary list. + * 2) Return the boundary list. */ public static byte[][] inferBoundaries(TreeMap bdryMap) { ArrayList keysArray = new ArrayList(); int runningValue = 0; byte[] currStartKey = null; boolean firstBoundary = true; - + for (Map.Entry item: bdryMap.entrySet()) { if (runningValue == 0) currStartKey = item.getKey(); runningValue += item.getValue(); if (runningValue == 0) { if (!firstBoundary) keysArray.add(currStartKey); firstBoundary = false; - } + } } - + return keysArray.toArray(new byte[0][0]); } - + /* * If the table is created for the first time, then "completebulkload" reads the files twice. * More modifications necessary if we want to avoid doing it. 
@@ -706,7 +726,7 @@ // Build a set of keys byte[][] keys; TreeMap map = new TreeMap(Bytes.BYTES_COMPARATOR); - + for (FileStatus stat : familyDirStatuses) { if (!stat.isDir()) { LOG.warn("Skipping non-directory " + stat.getPath()); @@ -716,10 +736,10 @@ // Skip _logs, etc if (familyDir.getName().startsWith("_")) continue; byte[] family = familyDir.getName().getBytes(); - + hcd = new HColumnDescriptor(family); htd.addFamily(hcd); - + Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir)); for (Path hfile : hfiles) { if (hfile.getName().startsWith("_")) continue; @@ -739,7 +759,7 @@ LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last)); - + // To eventually infer start key-end key boundaries Integer value = map.containsKey(first)? map.get(first):0; map.put(first, value+1); @@ -751,7 +771,7 @@ } } } - + keys = LoadIncrementalHFiles.inferBoundaries(map); this.hbAdmin.createTable(htd,keys); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java (revision 1471030) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java (working copy) @@ -53,6 +53,7 @@ public class TestLoadIncrementalHFiles { private static final byte[] QUALIFIER = Bytes.toBytes("myqual"); private static final byte[] FAMILY = Bytes.toBytes("myfam"); + private static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found"; private static final byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("ddd"), @@ -188,6 +189,11 @@ HBaseAdmin admin = new HBaseAdmin(util.getConfiguration()); HTableDescriptor htd = new HTableDescriptor(TABLE); + // set real family name to upper case on purpose to simulate the case that + // family name in HFiles is 
invalid + HColumnDescriptor family = + new HColumnDescriptor(Bytes.toBytes(new String(FAMILY).toUpperCase())); + htd.addFamily(family); admin.createTable(htd, SPLIT_KEYS); HTable table = new HTable(util.getConfiguration(), TABLE); @@ -198,6 +204,11 @@ assertTrue("Loading into table with non-existent family should have failed", false); } catch (Exception e) { assertTrue("IOException expected", e instanceof IOException); + // further check whether the exception message is correct + String errMsg = e.getMessage(); + assertTrue("Incorrect exception message, expected message: [" + + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY + "], current message: [" + errMsg + "]", + errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY)); } table.close(); admin.close();