diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 86a84a4..0084878 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -118,6 +118,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
   public final static String CREATE_TABLE_CONF_KEY = "create.table";
 
+  // We use a '.' prefix which is ignored when walking directory trees
+  // above. It is an invalid family name.
+  final static String TMP_DIR = ".tmp";
+
   private int maxFilesPerRegionPerFamily;
   private boolean assignSeqIds;
 
@@ -201,6 +205,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       }
       Path familyDir = familyStat.getPath();
       byte[] familyName = familyDir.getName().getBytes();
+      // Skip invalid family
+      try {
+        HColumnDescriptor.isLegalFamilyName(familyName);
+      }
+      catch (IllegalArgumentException e) {
+        LOG.warn("Skipping invalid " + familyStat.getPath());
+        continue;
+      }
       TFamily family = visitor.bulkFamily(familyName);
 
       FileStatus[] hfileStatuses = fs.listStatus(familyDir);
@@ -632,9 +644,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       byte[] splitKey) throws IOException {
     final Path hfilePath = item.hfilePath;
-    // We use a '_' prefix which is ignored when walking directory trees
-    // above.
-    final String TMP_DIR = "_tmp";
     Path tmpDir = item.hfilePath.getParent();
     if (!tmpDir.getName().equals(TMP_DIR)) {
       tmpDir = new Path(tmpDir, TMP_DIR);
     }
@@ -661,6 +670,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
     lqis.add(new LoadQueueItem(item.family, botOut));
     lqis.add(new LoadQueueItem(item.family, topOut));
+    // If the current item is already the result of previous splits,
+    // we don't need it anymore. Clean up to save space.
+    // It is not part of the original input files.
+    try {
+      tmpDir = item.hfilePath.getParent();
+      if (tmpDir.getName().equals(TMP_DIR)) {
+        fs.delete(item.hfilePath, false);
+      }
+    } catch (IOException e) {
+      LOG.warn("Unable to delete temporary split file " + item.hfilePath);
+    }
     LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
     return lqis;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index 32e3058..0975fd2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MapReduceTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -425,6 +427,42 @@ public class TestLoadIncrementalHFilesSplitRecovery {
   }
 
   /**
+   * This test creates a table with many small regions. The bulk load files
+   * will be split multiple times before all of them can be loaded successfully.
+   */
+  @Test (timeout=120000)
+  public void testSplitTmpFileCleanUp() throws Exception {
+    final TableName table = TableName.valueOf("splitTmpFileCleanUp");
+    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
+        Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"),
+        Bytes.toBytes("row_00000040"), Bytes.toBytes("row_00000050")};
+    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+      setupTableWithSplitkeys(table, 10, SPLIT_KEYS);
+
+      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
+
+      // create HFiles
+      Path bulk = buildBulkFiles(table, 2);
+      try (Table t = connection.getTable(table);
+          RegionLocator locator = connection.getRegionLocator(table);
+          Admin admin = connection.getAdmin()) {
+        lih.doBulkLoad(bulk, admin, t, locator);
+      }
+      // family path
+      Path tmpPath = new Path(bulk, family(0));
+      // TMP_DIR under family path
+      tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR);
+      FileSystem fs = bulk.getFileSystem(util.getConfiguration());
+      // HFiles have been split, so TMP_DIR exists
+      assertTrue(fs.exists(tmpPath));
+      // TMP_DIR should have been cleaned up
+      assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
+          FSUtils.listStatus(fs, tmpPath));
+      assertExpectedTable(connection, table, ROWCOUNT, 2);
+    }
+  }
+
+  /**
    * This simulates an remote exception which should cause LIHF to exit with an
    * exception.
    */