diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
index fb58360..b0c9d35 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
@@ -43,10 +43,13 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
@@ -140,6 +143,54 @@ public class HFileSystem extends FilterFileSystem {
}
/**
+   * Set the storage policy of the source path (directory/file).
+   * Valid policy names are "LAZY_PERSIST", "ALL_SSD", "ONE_SSD",
+   * "HOT", "WARM" and "COLD".
+   *
+   * See {@link org.apache.hadoop.hdfs.protocol.HdfsConstants} for more details.
+   * @param path The source path (directory/file).
+   * @param policyName The name of the storage policy.
+   */
+  public void setStoragePolicy(Path path, String policyName) {
+    try {
+      if (this.fs instanceof DistributedFileSystem) {
+        ((DistributedFileSystem) this.fs).setStoragePolicy(path, policyName);
+      }
+    } catch (Throwable e) {
+      LOG.warn("Failed to set block storage policy of [" + path + "] to [" + policyName + "]", e);
+    }
+  }
+
+  /**
+   * Get the storage policy of the source path (directory/file).
+   * @param path The source path (directory/file).
+   * @return The storage policy name, or null if it cannot be determined.
+   */
+  public String getStoragePolicy(Path path) {
+    try {
+      if (this.fs instanceof DistributedFileSystem) {
+        DistributedFileSystem dfs = (DistributedFileSystem) this.fs;
+        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
+        if (status != null) {
+          byte storagePolicyId = status.getStoragePolicy();
+          if (storagePolicyId != BlockStoragePolicySuite.ID_UNSPECIFIED) {
+            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
+            for (BlockStoragePolicy policy : policies) {
+              if (policy.getId() == storagePolicyId) {
+                return policy.getName();
+              }
+            }
+          }
+        }
+      }
+    } catch (Throwable e) {
+      LOG.warn("Failed to get block storage policy of [" + path + "]", e);
+    }
+
+    return null;
+  }
+
+  /**
* Are we verifying checksums in HBase?
* @return True, if hbase is configured to verify checksums,
* otherwise false.
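
For context (not part of the patch): the two helpers above delegate to DistributedFileSystem#setStoragePolicy and DistributedFileSystem#getStoragePolicies, which first shipped with HDFS archival storage in Hadoop 2.6.0 (hence the hadoop-two.version bump in pom.xml at the end of this patch). A minimal, hypothetical sketch of exercising them directly; the class name, path and policy are illustrative:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.fs.HFileSystem;

public class StoragePolicyProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Wrap whatever FileSystem hbase.rootdir points at; the helpers only act on HDFS.
    HFileSystem hfs = new HFileSystem(FileSystem.get(conf));
    Path storeDir = new Path(args[0]);               // e.g. a column family directory

    hfs.setStoragePolicy(storeDir, "ONE_SSD");       // logs a WARN and moves on if it fails
    String policy = hfs.getStoragePolicy(storeDir);  // "ONE_SSD", or null if unset/unsupported
    System.out.println(storeDir + " -> " + policy);
  }
}
```

On a non-HDFS filesystem both calls degrade gracefully: setStoragePolicy is a no-op and getStoragePolicy returns null.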
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index a62b6d7..8ca50ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -175,6 +175,36 @@ public class HRegionFileSystem {
}
/**
+   * Set the storage policy of the store directory for the given column family.
+   * Valid policy names are "LAZY_PERSIST", "ALL_SSD", "ONE_SSD",
+   * "HOT", "WARM" and "COLD".
+   *
+   * See {@link org.apache.hadoop.hdfs.protocol.HdfsConstants} for more details.
+   * @param familyName The name of the column family.
+   * @param policyName The name of the storage policy.
+   */
+  public void setStoragePolicy(String familyName, String policyName) {
+    if (this.fs instanceof HFileSystem) {
+      Path storeDir = getStoreDir(familyName);
+      ((HFileSystem) this.fs).setStoragePolicy(storeDir, policyName);
+    }
+  }
+
+  /**
+   * Get the storage policy of the store directory for the given column family.
+   * @param familyName The name of the column family.
+   * @return The storage policy name, or null if it cannot be determined.
+   */
+  public String getStoragePolicy(String familyName) {
+    if (this.fs instanceof HFileSystem) {
+      Path storeDir = getStoreDir(familyName);
+      return ((HFileSystem) this.fs).getStoragePolicy(storeDir);
+    }
+
+    return null;
+  }
+
+  /**
* Returns the store files available for the family.
* This methods performs the filtering based on the valid store files.
* @param familyName Column Family Name
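
A hedged usage sketch (not part of the patch) for the per-family wrappers above, assuming a region that is already open on this server; the family name and helper class are illustrative:

```java
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;

public final class FamilyPolicyExample {
  /** Pin the "cf" family's store directory to ALL_SSD and report what HDFS thinks it is. */
  public static String moveFamilyToSsd(HRegion region) {
    HRegionFileSystem regionFs = region.getRegionFileSystem();
    regionFs.setStoragePolicy("cf", "ALL_SSD");   // resolves the family's store dir internally
    return regionFs.getStoragePolicy("cf");       // "ALL_SSD", or null if fs is not an HFileSystem
  }
}
```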
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 57ca3f1..0a173d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -120,6 +120,7 @@ public class HStore implements Store {
public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
"hbase.server.compactchecker.interval.multiplier";
public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
+ public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
@@ -219,6 +220,17 @@ public class HStore implements Store {
.addBytesMap(family.getValues());
this.blocksize = family.getBlocksize();
+    // set block storage policy for store directory
+    String policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY);
+    if (policyName != null && !policyName.trim().isEmpty()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("set block storage policy of [" + family.getNameAsString() + "] to ["
+            + policyName + "]");
+      }
+
+      this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim());
+    }
+
this.dataBlockEncoder =
new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
@@ -993,9 +1005,10 @@ public class HStore implements Store {
}
HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
cryptoContext);
+ Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString());
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
this.getFileSystem())
- .withFilePath(fs.createTempName())
+ .withOutputDir(familyTempDir)
.withComparator(comparator)
.withBloomType(family.getBloomFilterType())
.withMaxKeyCount(maxKeyCount)
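
Because HStore reads the new key through its CompoundConfiguration, which layers the family's schema values (the addBytesMap(family.getValues()) call above), hbase.hstore.block.storage.policy can be set globally in hbase-site.xml or per column family in the table descriptor. A hedged sketch of the per-family route, mirroring what the new test does; the table and family names are illustrative:

```java
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class AlterFamilyStoragePolicy {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Admin admin = conn.getAdmin()) {
      HColumnDescriptor hcd = new HColumnDescriptor("cf");           // illustrative family
      hcd.setValue("hbase.hstore.block.storage.policy", "ONE_SSD");  // HStore.BLOCK_STORAGE_POLICY_KEY
      // The alter reopens the regions; each HStore picks up the policy when it is constructed.
      admin.modifyColumnFamily(TableName.valueOf("TestTable"), hcd);
    }
  }
}
```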
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 891a59d..d169488 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -629,6 +630,18 @@ public class StoreFile {
fs.mkdirs(dir);
}
+      // set block storage policy for temp path
+      String policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
+      if (policyName != null && !policyName.trim().isEmpty()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("set block storage policy of [" + dir + "] to [" + policyName + "]");
+        }
+
+        if (this.fs instanceof HFileSystem) {
+          ((HFileSystem) this.fs).setStoragePolicy(dir, policyName.trim());
+        }
+      }
+
if (filePath == null) {
filePath = getUniqueFile(fs, dir);
if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
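
The temp-path policy matters because HDFS chooses storage media when blocks are written; the later rename of the finished HFile from the family's temp dir into the store directory does not move any blocks. A hedged sketch of that flush/compaction flow using plain FileSystem calls; all paths and the class name are illustrative:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;

public class TempDirPolicyFlow {
  public static void flow(Configuration conf, FileSystem fs) throws Exception {
    HFileSystem hfs = new HFileSystem(fs);
    Path familyTempDir = new Path("/hbase/data/default/TestTable/region/.tmp/cf");
    Path storeDir = new Path("/hbase/data/default/TestTable/region/cf");
    hfs.setStoragePolicy(familyTempDir, "ONE_SSD");  // what the builder above now does for dir

    Path tmpFile = new Path(familyTempDir, "hfile.tmp");
    try (FSDataOutputStream out = fs.create(tmpFile)) {
      out.writeBytes("cells...");                    // blocks are placed per the temp dir's policy
    }
    // Committing the file keeps its existing block placement.
    fs.rename(tmpFile, new Path(storeDir, tmpFile.getName()));
  }
}
```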
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index fcc9fc3..3119511 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -174,9 +174,13 @@ public class TestCompaction {
assertEquals(compactionThreshold, s.getStorefilesCount());
assertTrue(s.getStorefilesSize() > 15*1000);
// and no new store files persisted past compactStores()
+ // the region temp dir should contain only the store's (empty) temp dir
FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
+ assertEquals(1, ls.length);
+ Path storeTempDir = new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
+ assertTrue(r.getFilesystem().exists(storeTempDir));
+ ls = r.getFilesystem().listStatus(storeTempDir);
assertEquals(0, ls.length);
-
} finally {
// don't mess up future tests
r.writestate.writesEnabled = true;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 23f5e48..2a87e14 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -818,7 +818,7 @@ public class TestHRegion {
assertEquals(3, region.getStore(family).getStorefilesCount());
// now find the compacted file, and manually add it to the recovered edits
- Path tmpDir = region.getRegionFileSystem().getTempDir();
+ Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family));
FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
String errorMsg = "Expected to find 1 file in the region temp directory "
+ "from the compaction, could not find any";
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
index 5f792fa..6f83ae6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
@@ -21,11 +21,14 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,14 +39,20 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.PerformanceEvaluation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.util.Progressable;
-
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -51,6 +60,105 @@ import org.junit.experimental.categories.Category;
public class TestHRegionFileSystem {
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final Log LOG = LogFactory.getLog(TestHRegionFileSystem.class);
+ private static final byte[][] FAMILIES = {
+ Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
+ Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
+ private static final TableName TABLE_NAME = TableName.valueOf("TestTable");
+
+ @Test
+ public void testBlockStoragePolicy() throws Exception {
+ TEST_UTIL = new HBaseTestingUtility();
+ Configuration conf = TEST_UTIL.getConfiguration();
+ TEST_UTIL.startMiniCluster();
+ HTable table = TEST_UTIL.createTable(TABLE_NAME, FAMILIES);
+ try(Admin admin = TEST_UTIL.getConnection().getAdmin()) {
+ assertEquals("Should start with empty table", 0, TEST_UTIL.countRows(table));
+ FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
+ Path tableDir = FSUtils.getTableDir(TEST_UTIL.getDefaultRootDirPath(), table.getName());
+ List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
+ assertEquals(1, regionDirs.size());
+ List<Path> familyDirs = FSUtils.getFamilyDirs(fs, regionDirs.get(0));
+ assertEquals(2, familyDirs.size());
+ HRegionInfo hri = table.getRegionLocator().getAllRegionLocations().get(0).getRegionInfo();
+ HRegionFileSystem regionFs = new HRegionFileSystem(conf, new HFileSystem(fs), tableDir, hri);
+
+ // the initial block storage policy is unspecified, so the getters return null
+ String spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0]));
+ String spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1]));
+ LOG.debug("Storage policy of cf 0: [" + spA + "].");
+ LOG.debug("Storage policy of cf 1: [" + spB + "].");
+ assertNull(spA);
+ assertNull(spB);
+
+ // alter table cf schema to change storage policies
+ HColumnDescriptor hcdA = new HColumnDescriptor(Bytes.toString(FAMILIES[0]));
+ hcdA.setValue(HStore.BLOCK_STORAGE_POLICY_KEY, "ONE_SSD");
+ admin.modifyColumnFamily(TABLE_NAME, hcdA);
+ while (TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+ .isRegionsInTransition()) {
+ Thread.sleep(200);
+ LOG.debug("Waiting on table to finish schema altering");
+ }
+ HColumnDescriptor hcdB = new HColumnDescriptor(Bytes.toString(FAMILIES[1]));
+ hcdB.setValue(HStore.BLOCK_STORAGE_POLICY_KEY, "ALL_SSD");
+ admin.modifyColumnFamily(TABLE_NAME, hcdB);
+ while (TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+ .isRegionsInTransition()) {
+ Thread.sleep(200);
+ LOG.debug("Waiting on table to finish schema altering");
+ }
+ spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0]));
+ spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1]));
+ LOG.debug("Storage policy of cf 0: [" + spA + "].");
+ LOG.debug("Storage policy of cf 1: [" + spB + "].");
+ assertNotNull(spA);
+ assertEquals("ONE_SSD", spA);
+ assertNotNull(spB);
+ assertEquals("ALL_SSD", spB);
+
+ // flush memstore snapshot into 3 files
+ for (long i = 0; i < 3; i++) {
+ Put put = new Put(Bytes.toBytes(i));
+ put.addColumn(FAMILIES[0], Bytes.toBytes(i), Bytes.toBytes(i));
+ table.put(put);
+ table.flushCommits();
+ admin.flush(TABLE_NAME);
+ }
+ // there should be 3 files in store dir
+ Path storePath = regionFs.getStoreDir(Bytes.toString(FAMILIES[0]));
+ FileStatus[] storeFiles = FSUtils.listStatus(fs, storePath);
+ assertNotNull(storeFiles);
+ assertEquals(3, storeFiles.length);
+ // store temp dir still exists but empty
+ Path storeTempDir = new Path(regionFs.getTempDir(), Bytes.toString(FAMILIES[0]));
+ assertTrue(fs.exists(storeTempDir));
+ FileStatus[] tempFiles = FSUtils.listStatus(fs, storeTempDir);
+ assertNull(tempFiles);
+ // storage policy of cf temp dir and 3 store files should be ONE_SSD
+ assertEquals("ONE_SSD",
+ ((HFileSystem) regionFs.getFileSystem()).getStoragePolicy(storeTempDir));
+ for (FileStatus status : storeFiles) {
+ assertEquals("ONE_SSD",
+ ((HFileSystem) regionFs.getFileSystem()).getStoragePolicy(status.getPath()));
+ }
+
+ // change storage policies by calling raw api directly
+ regionFs.setStoragePolicy(Bytes.toString(FAMILIES[0]), "ALL_SSD");
+ regionFs.setStoragePolicy(Bytes.toString(FAMILIES[1]), "ONE_SSD");
+ spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0]));
+ spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1]));
+ LOG.debug("Storage policy of cf 0: [" + spA + "].");
+ LOG.debug("Storage policy of cf 1: [" + spB + "].");
+ assertNotNull(spA);
+ assertEquals("ALL_SSD", spA);
+ assertNotNull(spB);
+ assertEquals("ONE_SSD", spB);
+ } finally {
+ table.close();
+ TEST_UTIL.deleteTable(TABLE_NAME);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+ }
@Test
public void testOnDiskRegionCreation() throws IOException {
diff --git a/pom.xml b/pom.xml
index 5da039f..26bf475 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1155,7 +1155,7 @@
3.0.3
${compileSource}
- 2.5.1
+ 2.6.0
3.0.0-SNAPSHOT
1.2
1.9