diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index ced074e..0daec04 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -123,6 +124,9 @@ public class HFileOutputFormat2
   private static final String OUTPUT_TABLE_NAME_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.table.name";
 
+  public static final String STORAGE_POLICY_PROPERTY = "hbase.hstore.storagepolicy";
+  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
+
   @Override
   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
@@ -190,7 +194,9 @@ public class HFileOutputFormat2
 
       // If this is a new column family, verify that the directory exists
       if (wl == null) {
-        fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
+        Path cfPath = new Path(outputdir, Bytes.toString(family));
+        fs.mkdirs(cfPath);
+        configureStoragePolicy(conf, fs, family, cfPath);
       }
 
       // If any of the HFiles for the column families has reached
@@ -345,6 +351,29 @@ public class HFileOutputFormat2
     };
   }
 
+  /**
+   * Configure block storage policy for CF after the directory is created.
+   */
+  public static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
+      byte[] family, Path cfPath) {
+    if (null == conf || null == fs || null == family || null == cfPath) {
+      return;
+    }
+
+    String policy =
+        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(family),
+          conf.get(STORAGE_POLICY_PROPERTY));
+    if (null != policy && !policy.trim().isEmpty()) {
+      try {
+        if (fs instanceof DistributedFileSystem) {
+          ((DistributedFileSystem) fs).setStoragePolicy(cfPath, policy.trim());
+        }
+      } catch (Throwable e) {
+        LOG.warn("failed to set block storage policy of [" + cfPath + "] to [" + policy + "]", e);
+      }
+    }
+  }
+
   /*
    * Data structure to hold a Writer and amount of data written on it.
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index a824962..db9883f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -84,6 +85,10 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -1159,5 +1164,70 @@ public class TestHFileOutputFormat2 {
 
     }
   }
 
+  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
+  public void testBlockStoragePolicy() throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
+    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
+    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(FAMILIES[0]),
+      "ONE_SSD");
+    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
+    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
+    util.startMiniDFSCluster(3);
+    FileSystem fs = util.getDFSCluster().getFileSystem();
+    try {
+      fs.mkdirs(cf1Dir);
+      fs.mkdirs(cf2Dir);
+
+      // the original block storage policy would be NULL
+      String spA = getStoragePolicyName(fs, cf1Dir);
+      String spB = getStoragePolicyName(fs, cf2Dir);
+      LOG.debug("Storage policy of cf 0: [" + spA + "].");
+      LOG.debug("Storage policy of cf 1: [" + spB + "].");
+      assertNull(spA);
+      assertNull(spB);
+
+      // alter table cf schema to change storage policies
+      HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[0], cf1Dir);
+      HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[1], cf2Dir);
+      spA = getStoragePolicyName(fs, cf1Dir);
+      spB = getStoragePolicyName(fs, cf2Dir);
+      LOG.debug("Storage policy of cf 0: [" + spA + "].");
+      LOG.debug("Storage policy of cf 1: [" + spB + "].");
+      assertNotNull(spA);
+      assertEquals("ONE_SSD", spA);
+      assertNotNull(spB);
+      assertEquals("ALL_SSD", spB);
+    } finally {
+      fs.delete(cf1Dir, true);
+      fs.delete(cf2Dir, true);
+      util.shutdownMiniDFSCluster();
+    }
+  }
+
+  private String getStoragePolicyName(FileSystem fs, Path path) {
+    try {
+      if (fs instanceof DistributedFileSystem) {
+        DistributedFileSystem dfs = (DistributedFileSystem) fs;
+        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
+        if (null != status) {
+          byte storagePolicyId = status.getStoragePolicy();
+          if (storagePolicyId != BlockStoragePolicySuite.ID_UNSPECIFIED) {
+            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
+            for (BlockStoragePolicy policy : policies) {
+              if (policy.getId() == storagePolicyId) {
+                return policy.getName();
+              }
+            }
+          }
+        }
+      }
+    } catch (Throwable e) {
+      LOG.warn("failed to get block storage policy of [" + path + "]", e);
+    }
+
+    return null;
+  }
+
 }
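
Usage note (not part of the patch): a minimal sketch of how a bulk-load driver could set the new keys before configuring the job; the family name "cf1" and the final configureIncrementalLoad() step are illustrative assumptions, not taken from this change.

    // Hypothetical driver snippet; "cf1" is an example column family name.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;

    Configuration conf = HBaseConfiguration.create();
    // Default policy for every column family directory the job writes.
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
    // Per-family override, keyed as hbase.hstore.storagepolicy.<family>.
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "cf1", "ONE_SSD");
    // ... then set up the job as usual, e.g. HFileOutputFormat2.configureIncrementalLoad(...).

The per-family key takes precedence over the global one because configureStoragePolicy() resolves the policy with conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + family, conf.get(STORAGE_POLICY_PROPERTY)).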