diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java index fb58360..b0c9d35 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java @@ -43,10 +43,13 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; @@ -140,6 +143,54 @@ public class HFileSystem extends FilterFileSystem { } /** + * Set the source path (directory/file) to the specified storage policy.
+ * "LAZY_PERSIST", "ALL_SSD", "ONE_SSD", "HOT", "WARM", + * "COLD"
+ *
+ * See {@link org.apache.hadoop.hdfs.protocol.HdfsConstants} for more details. + * @param path The source path (directory/file). + * @param policyName The name of the storage policy. + */ + public void setStoragePolicy(Path path, String policyName) { + try { + if (this.fs instanceof DistributedFileSystem) { + ((DistributedFileSystem) this.fs).setStoragePolicy(path, policyName); + } + } catch (Throwable e) { + LOG.warn("failed to set block storage policy of [" + path + "] to [" + policyName + "]", e); + } + } + + /** + * Get the storage policy of the source path (directory/file). + * @param path The source path (directory/file). + * @return Storage policy name. + */ + public String getStoragePolicy(Path path) { + try { + if (this.fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem) this.fs; + HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); + if (null != status) { + byte storagePolicyId = status.getStoragePolicy(); + if (storagePolicyId != BlockStoragePolicySuite.ID_UNSPECIFIED) { + BlockStoragePolicy[] policies = dfs.getStoragePolicies(); + for (BlockStoragePolicy policy : policies) { + if (policy.getId() == storagePolicyId) { + return policy.getName(); + } + } + } + } + } + } catch (Throwable e) { + LOG.warn("failed to get block storage policy of [" + path + "]", e); + } + + return null; + } + + /** * Are we verifying checksums in HBase? * @return True, if hbase is configured to verify checksums, * otherwise false. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index a62b6d7..8ca50ec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -175,6 +175,36 @@ public class HRegionFileSystem { } /** + * Set the directory of CF to the specified storage policy.
+ * "LAZY_PERSIST", "ALL_SSD", "ONE_SSD", "HOT", "WARM", + * "COLD"
+ *
+ * See {@link org.apache.hadoop.hdfs.protocol.HdfsConstants} for more details. + * @param familyName The name of column family. + * @param policyName The name of the storage policy. + */ + public void setStoragePolicy(String familyName, String policyName) { + if (this.fs instanceof HFileSystem) { + Path storeDir = getStoreDir(familyName); + ((HFileSystem) this.fs).setStoragePolicy(storeDir, policyName); + } + } + + /** + * Get the storage policy of the directory of CF. + * @param familyName The name of column family. + * @return Storage policy name. + */ + public String getStoragePolicy(String familyName) { + if (this.fs instanceof HFileSystem) { + Path storeDir = getStoreDir(familyName); + return ((HFileSystem) this.fs).getStoragePolicy(storeDir); + } + + return null; + } + + /** * Returns the store files available for the family. * This methods performs the filtering based on the valid store files. * @param familyName Column Family Name diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 57ca3f1..0a173d8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -120,6 +120,7 @@ public class HStore implements Store { public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY = "hbase.server.compactchecker.interval.multiplier"; public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles"; + public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy"; public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000; public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7; @@ -219,6 +220,17 @@ public class HStore implements Store { .addBytesMap(family.getValues()); this.blocksize = family.getBlocksize(); + // set block storage policy for store directory + String policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY); + if (null != policyName && !policyName.trim().isEmpty()) { + if (LOG.isTraceEnabled()) { + LOG.trace("set block storage policy of [" + family.getNameAsString() + "] to [" + + policyName + "]"); + } + + this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim()); + } + this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding()); @@ -993,9 +1005,10 @@ public class HStore implements Store { } HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag, cryptoContext); + Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString()); StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, this.getFileSystem()) - .withFilePath(fs.createTempName()) + .withOutputDir(familyTempDir) .withComparator(comparator) .withBloomType(family.getBloomFilterType()) .withMaxKeyCount(maxKeyCount) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 891a59d..d169488 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -629,6 +630,18 @@ public class StoreFile { fs.mkdirs(dir); } + // set block storage policy for temp path + String policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY); + if (null != policyName && !policyName.trim().isEmpty()) { + if (LOG.isTraceEnabled()) { + LOG.trace("set block storage policy of [" + dir + "] to [" + policyName + "]"); + } + + if (this.fs instanceof HFileSystem) { + ((HFileSystem)this.fs).setStoragePolicy(dir, policyName.trim()); + } + } + if (filePath == null) { filePath = getUniqueFile(fs, dir); if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index fcc9fc3..3119511 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -174,9 +174,13 @@ public class TestCompaction { assertEquals(compactionThreshold, s.getStorefilesCount()); assertTrue(s.getStorefilesSize() > 15*1000); // and no new store files persisted past compactStores() + // only one empty dir exists in temp dir FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir()); + assertEquals(1, ls.length); + Path storeTempDir = new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY)); + assertTrue(r.getFilesystem().exists(storeTempDir)); + ls = r.getFilesystem().listStatus(storeTempDir); assertEquals(0, ls.length); - } finally { // don't mess up future tests r.writestate.writesEnabled = true; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 23f5e48..2a87e14 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -818,7 +818,7 @@ public class TestHRegion { assertEquals(3, region.getStore(family).getStorefilesCount()); // now find the compacted file, and manually add it to the recovered edits - Path tmpDir = region.getRegionFileSystem().getTempDir(); + Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family)); FileStatus[] files = FSUtils.listStatus(fs, tmpDir); String errorMsg = "Expected to find 1 file in the region temp directory " + "from the compaction, could not find any"; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java index 5f792fa..6f83ae6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java @@ -21,11 +21,14 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.URI; import java.util.Collection; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,14 +39,20 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.PerformanceEvaluation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.util.Progressable; - import org.junit.Test; import org.junit.experimental.categories.Category; @@ -51,6 +60,105 @@ import org.junit.experimental.categories.Category; public class TestHRegionFileSystem { private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final Log LOG = LogFactory.getLog(TestHRegionFileSystem.class); + private static final byte[][] FAMILIES = { + Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")), + Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) }; + private static final TableName TABLE_NAME = TableName.valueOf("TestTable"); + + @Test + public void testBlockStoragePolicy() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + Configuration conf = TEST_UTIL.getConfiguration(); + TEST_UTIL.startMiniCluster(); + HTable table = TEST_UTIL.createTable(TABLE_NAME, FAMILIES); + try(Admin admin = TEST_UTIL.getConnection().getAdmin()) { + assertEquals("Should start with empty table", 0, TEST_UTIL.countRows(table)); + FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); + Path tableDir = FSUtils.getTableDir(TEST_UTIL.getDefaultRootDirPath(), table.getName()); + List regionDirs = FSUtils.getRegionDirs(fs, tableDir); + assertEquals(1, regionDirs.size()); + List familyDirs = FSUtils.getFamilyDirs(fs, regionDirs.get(0)); + assertEquals(2, familyDirs.size()); + HRegionInfo hri = table.getRegionLocator().getAllRegionLocations().get(0).getRegionInfo(); + HRegionFileSystem regionFs = new HRegionFileSystem(conf, new HFileSystem(fs), tableDir, hri); + + // the original block storage policy would be NULL + String spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0])); + String spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1])); + LOG.debug("Storage policy of cf 0: [" + spA + "]."); + LOG.debug("Storage policy of cf 1: [" + spB + "]."); + assertNull(spA); + assertNull(spB); + + // alter table cf schema to change storage policies + HColumnDescriptor hcdA = new HColumnDescriptor(Bytes.toString(FAMILIES[0])); + hcdA.setValue(HStore.BLOCK_STORAGE_POLICY_KEY, "ONE_SSD"); + admin.modifyColumnFamily(TABLE_NAME, hcdA); + while (TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates() + .isRegionsInTransition()) { + Thread.sleep(200); + LOG.debug("Waiting on table to finish schema altering"); + } + HColumnDescriptor hcdB = new HColumnDescriptor(Bytes.toString(FAMILIES[1])); + hcdB.setValue(HStore.BLOCK_STORAGE_POLICY_KEY, "ALL_SSD"); + admin.modifyColumnFamily(TABLE_NAME, hcdB); + while (TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates() + .isRegionsInTransition()) { + Thread.sleep(200); + LOG.debug("Waiting on table to finish schema altering"); + } + spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0])); + spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1])); + LOG.debug("Storage policy of cf 0: [" + spA + "]."); + LOG.debug("Storage policy of cf 1: [" + spB + "]."); + assertNotNull(spA); + assertEquals("ONE_SSD", spA); + assertNotNull(spB); + assertEquals("ALL_SSD", spB); + + // flush memstore snapshot into 3 files + for (long i = 0; i < 3; i++) { + Put put = new Put(Bytes.toBytes(i)); + put.addColumn(FAMILIES[0], Bytes.toBytes(i), Bytes.toBytes(i)); + table.put(put); + table.flushCommits(); + admin.flush(TABLE_NAME); + } + // there should be 3 files in store dir + Path storePath = regionFs.getStoreDir(Bytes.toString(FAMILIES[0])); + FileStatus[] storeFiles = FSUtils.listStatus(fs, storePath); + assertNotNull(storeFiles); + assertEquals(3, storeFiles.length); + // store temp dir still exists but empty + Path storeTempDir = new Path(regionFs.getTempDir(), Bytes.toString(FAMILIES[0])); + assertTrue(fs.exists(storeTempDir)); + FileStatus[] tempFiles = FSUtils.listStatus(fs, storeTempDir); + assertNull(tempFiles); + // storage policy of cf temp dir and 3 store files should be ONE_SSD + assertEquals("ONE_SSD", + ((HFileSystem) regionFs.getFileSystem()).getStoragePolicy(storeTempDir)); + for (FileStatus status : storeFiles) { + assertEquals("ONE_SSD", + ((HFileSystem) regionFs.getFileSystem()).getStoragePolicy(status.getPath())); + } + + // change storage policies by calling raw api directly + regionFs.setStoragePolicy(Bytes.toString(FAMILIES[0]), "ALL_SSD"); + regionFs.setStoragePolicy(Bytes.toString(FAMILIES[1]), "ONE_SSD"); + spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0])); + spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1])); + LOG.debug("Storage policy of cf 0: [" + spA + "]."); + LOG.debug("Storage policy of cf 1: [" + spB + "]."); + assertNotNull(spA); + assertEquals("ALL_SSD", spA); + assertNotNull(spB); + assertEquals("ONE_SSD", spB); + } finally { + table.close(); + TEST_UTIL.deleteTable(TABLE_NAME); + TEST_UTIL.shutdownMiniCluster(); + } + } @Test public void testOnDiskRegionCreation() throws IOException { diff --git a/pom.xml b/pom.xml index 5da039f..26bf475 100644 --- a/pom.xml +++ b/pom.xml @@ -1155,7 +1155,7 @@ 3.0.3 ${compileSource} - 2.5.1 + 2.6.0 3.0.0-SNAPSHOT 1.2 1.9