From 570fb1cd0e9f22bd0ef2b8e6abd879f163716e8c Mon Sep 17 00:00:00 2001
From: Nick Dimiduk
Date: Mon, 11 Mar 2013 16:04:47 -0700
Subject: [PATCH] HBASE-4285 cleanup ImportTsv partitions file litter

ImportTsv leaves a 'partitions_blah' file sitting in the user home
directory after it completes. This patch cleans up creation of the
partitions file, including moving it to /tmp and doing away with the
symlink business. It also sets the file to be deleted on FS close/JVM
exit, but local testing indicates this flag is not respected by HDFS.

This will affect any job that makes use of
HFileOutputFormat#configureIncrementalLoad.
---
 .../hbase/mapreduce/IntegrationTestImportTsv.java | 22 ++++++++-
 .../hadoop/hbase/mapreduce/HFileOutputFormat.java | 53 +++++++++++-----------
 2 files changed, 47 insertions(+), 28 deletions(-)

diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
index 5b14deb..49540e6 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
@@ -5,6 +5,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Set;
@@ -15,6 +16,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
@@ -25,6 +27,7 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -137,6 +140,18 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
     }
   }
 
+  /**
+   * Confirm the absence of the {@link TotalOrderPartitioner} partitions file.
+   */
+  protected static void validateDeletedPartitionsFile(Configuration conf) throws IOException {
+    if (!conf.getBoolean(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, false))
+      return;
+
+    FileSystem fs = FileSystem.get(conf);
+    Path partitionsFile = new Path(TotalOrderPartitioner.getPartitionFile(conf));
+    assertFalse("Failed to clean up partitions file.", fs.exists(partitionsFile));
+  }
+
   @Test
   public void testGenerateAndLoad() throws Exception {
     String table = NAME + "-" + UUID.randomUUID();
@@ -155,8 +170,13 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
 
     // run the job, complete the load.
     util.createTable(table, cf);
-    TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
+    Tool t = TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
     doLoadIncrementalHFiles(hfiles, table);
+
+    // validate post-conditions
+    validateDeletedPartitionsFile(t.getConf());
+
+    // clean up after ourselves.
     util.deleteTable(table);
     util.cleanupDataTestDirOnTestFS(table);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
index 0eb2005..f86ea26 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.util.ArrayList;
@@ -37,7 +35,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -54,9 +51,9 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -267,13 +264,12 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue>
   private static void writePartitions(Configuration conf, Path partitionsPath,
       List<ImmutableBytesWritable> startKeys) throws IOException {
+    LOG.info("Writing partition information to " + partitionsPath);
     if (startKeys.isEmpty()) {
       throw new IllegalArgumentException("No regions passed");
     }
@@ -325,7 +321,6 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue>
     List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
     LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
         "to match current region count");
     job.setNumReduceTasks(startKeys.size());
-    Path partitionsPath = new Path(job.getWorkingDirectory(),
-        "partitions_" + UUID.randomUUID());
-    LOG.info("Writing partition information to " + partitionsPath);
-
-    FileSystem fs = partitionsPath.getFileSystem(conf);
-    writePartitions(conf, partitionsPath, startKeys);
-    partitionsPath.makeQualified(fs);
-
-    URI cacheUri;
-    try {
-      cacheUri = new URI(partitionsPath.toString() + "#" + TotalOrderPartitioner.DEFAULT_PATH);
-    } catch (URISyntaxException e) {
-      throw new IOException(e);
-    }
-    DistributedCache.addCacheFile(cacheUri, conf);
-    DistributedCache.createSymlink(conf);
-
+    configurePartitioner(job, startKeys);
     // Set compression algorithms based on column families
     configureCompression(table, conf);
     configureBloomType(table, conf);
@@ -415,7 +395,26 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue>
+  /**
+   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
+   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
+   */
+  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
+      throws IOException {
+
+    // create the partitions file
+    FileSystem fs = FileSystem.get(job.getConfiguration());
+    Path partitionsPath = new Path("/tmp", "partitions_" + UUID.randomUUID());
+    fs.makeQualified(partitionsPath);
+    fs.deleteOnExit(partitionsPath);
+    writePartitions(job.getConfiguration(), partitionsPath, splitPoints);
+
+    // configure job to use it
+    job.setPartitionerClass(TotalOrderPartitioner.class);
+    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath);
+  }
+
   /**
    * Serialize column family to compression algorithm map to configuration.
    * Invoked while configuring the MR job for incremental load.
-- 
1.8.1
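
Note (not part of the patch): the commit message points out that any job going
through HFileOutputFormat#configureIncrementalLoad picks up the new behavior,
i.e. its partitions file now lands under /tmp and is flagged for delete-on-exit
rather than being left in the user's home directory and symlinked via the
distributed cache. A minimal sketch of such a bulk-load driver follows; the
driver class, mapper, column family "d", table name, and input/output paths are
placeholder assumptions, not code from this patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver {

  /** Hypothetical mapper: turns each input line into a single KeyValue keyed on the line. */
  static class LineToKeyValueMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      byte[] row = Bytes.toBytes(line.toString());
      KeyValue kv = new KeyValue(row, Bytes.toBytes("d"), Bytes.toBytes("c"), row);
      context.write(new ImmutableBytesWritable(row), kv);
    }
  }

  public static void main(String[] args) throws Exception {
    // args: <input path> <output path> <table name>
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "bulk-load-example");
    job.setJarByClass(BulkLoadDriver.class);

    job.setMapperClass(LineToKeyValueMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // This call now routes partition setup through configurePartitioner():
    // the TotalOrderPartitioner partitions file is written under /tmp and
    // marked deleteOnExit, so no partitions_* litter is left behind.
    HTable table = new HTable(conf, args[2]);
    HFileOutputFormat.configureIncrementalLoad(job, table);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}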