From 02d82c924ea8e86adc635120736788a65548736b Mon Sep 17 00:00:00 2001
From: Ashish Singhi
Date: Thu, 30 Jul 2015 13:28:14 +0530
Subject: [PATCH] HBASE-14154 DFS Replication should be configurable at column
 family level

---
 .../org/apache/hadoop/hbase/HColumnDescriptor.java | 22 ++++++++
 .../hadoop/hbase/io/hfile/AbstractHFileWriter.java |  2 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  8 +++
 .../hbase/regionserver/HRegionFileSystem.java      |  2 +-
 .../java/org/apache/hadoop/hbase/util/FSUtils.java | 21 ++++----
 .../apache/hadoop/hbase/TestHColumnDescriptor.java |  2 +
 .../org/apache/hadoop/hbase/client/TestAdmin1.java | 59 ++++++++++++++++++++++
 .../hadoop/hbase/client/TestFromClientSide.java    |  5 +-
 .../org/apache/hadoop/hbase/util/TestFSUtils.java  |  6 +--
 hbase-shell/src/main/ruby/hbase/admin.rb           |  3 ++
 hbase-shell/src/main/ruby/shell/commands/create.rb |  1 +
 11 files changed, 114 insertions(+), 17 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
index e4fe896..c094dbc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
@@ -118,6 +118,8 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
   public static final String DEFAULT_COMPRESSION =
     Compression.Algorithm.NONE.getName();
 
+  public static final String DFS_REPLICATION = "DFS_REPLICATION";
+
   /**
    * Default value of the flag that enables data block encoding on disk, as
    * opposed to encoding in cache only. We encode blocks everywhere by default,
@@ -1392,4 +1394,24 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
     setValue(Bytes.toBytes(ENCRYPTION_KEY), keyBytes);
     return this;
   }
+
+  /**
+   * @return the replication factor set for this CF, or 0 if not set. 0 indicates that the user
+   *         has not explicitly set a replication factor for this CF, so the default replication
+   *         factor configured in the file system is used.
+   */
+  public short getDFSReplication() {
+    String rf = getValue(DFS_REPLICATION);
+    return rf == null ? 0 : Short.valueOf(rf);
+  }
+
+  /**
+   * Set the replication factor for the hfile(s) belonging to this family.
+   * @param replication number of times the hfile(s) belonging to this CF should be replicated
+   * @return this (for chained invocation)
+   */
+  public HColumnDescriptor setDFSReplication(short replication) {
+    setValue(DFS_REPLICATION, Short.toString(replication));
+    return this;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
index b1f7038..8ccd9e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
@@ -263,6 +263,6 @@ public abstract class AbstractHFileWriter implements HFile.Writer {
       FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
     FsPermission perms = FSUtils.getFilePermissions(fs, conf,
         HConstants.DATA_FILE_UMASK_KEY);
-    return FSUtils.create(fs, path, perms, favoredNodes);
+    return FSUtils.create(conf, fs, path, perms, favoredNodes);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 6d21573..9fca49b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1967,6 +1967,14 @@ MasterServices, Server {
         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
       }
 
+      // check data replication factor
+      if (hcd.getDFSReplication() < 0) {
+        String message =
+            "HFile Replication for column family " + hcd.getNameAsString()
+                + " must be greater than or equal to zero.";
+        warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
+      }
+
       // TODO: should we check coprocessors and encryption ?
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index f743a01..d2d34d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -755,7 +755,7 @@ public class HRegionFileSystem {
     // First check to get the permissions
     FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
     // Write the RegionInfo file content
-    FSDataOutputStream out = FSUtils.create(fs, regionInfoFile, perms, null);
+    FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null);
     try {
       out.write(content);
     } finally {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 57eeb7c..0795460 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -286,11 +286,12 @@ public abstract class FSUtils {
    * <li>overwrite the file if it exists</li>
    * <li>apply the umask in the configuration (if it is enabled)</li>
    * <li>use the fs configured buffer size (or 4096 if not set)</li>
-   * <li>use the default replication</li>
+   * <li>use the configured column family replication or default replication if 0</li>
    * <li>use the default block size</li>
    * <li>not track progress</li>
    * </ol>
    *
+   * @param conf the configuration to use
    * @param fs {@link FileSystem} on which to write the file
    * @param path {@link Path} to the file to write
    * @param perm permissions
@@ -298,23 +299,21 @@
    * @return output stream to the created file
    * @throws IOException if the file cannot be created
    */
-  public static FSDataOutputStream create(FileSystem fs, Path path,
+  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
     if (fs instanceof HFileSystem) {
       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
       if (backingFs instanceof DistributedFileSystem) {
         // Try to use the favoredNodes version via reflection to allow backwards-
         // compatibility.
+        short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION, "0"));
         try {
-          return (FSDataOutputStream) (DistributedFileSystem.class
-              .getDeclaredMethod("create", Path.class, FsPermission.class,
-                  boolean.class, int.class, short.class, long.class,
-                  Progressable.class, InetSocketAddress[].class)
-              .invoke(backingFs, path, perm, true,
-                  getDefaultBufferSize(backingFs),
-                  getDefaultReplication(backingFs, path),
-                  getDefaultBlockSize(backingFs, path),
-                  null, favoredNodes));
+          return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
+              Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
+              Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
+              getDefaultBufferSize(backingFs),
+              replication > 0 ? replication : getDefaultReplication(backingFs, path),
+              getDefaultBlockSize(backingFs, path), null, favoredNodes));
         } catch (InvocationTargetException ite) {
           // Function was properly called, but threw its own exception.
          throw new IOException(ite.getCause());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
index af9c519..9403bbc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
@@ -53,6 +53,7 @@ public class TestHColumnDescriptor {
     hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
     hcd.setBloomFilterType(BloomType.ROW);
     hcd.setCompressionType(Algorithm.SNAPPY);
+    hcd.setDFSReplication((short) v);
 
     byte [] bytes = hcd.toByteArray();
 
@@ -69,6 +70,7 @@ public class TestHColumnDescriptor {
     assertTrue(deserializedHcd.getCompressionType().equals(Compression.Algorithm.SNAPPY));
     assertTrue(deserializedHcd.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF));
     assertTrue(deserializedHcd.getBloomFilterType().equals(BloomType.ROW));
+    assertEquals(v, deserializedHcd.getDFSReplication());
   }
 
   @Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
index efdc777..5206c9e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
@@ -51,7 +51,11 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -1194,4 +1198,59 @@ public class TestAdmin1 {
     this.admin.deleteTable(tableName);
   }
 
+
+  /*
+   * Test DFS replication for column families, where one CF has the default replication (3) and
+   * the other is set to 1.
+   */
+  @Test(timeout = 300000)
+  public void testHFileReplication() throws Exception {
+    TableName name = TableName.valueOf("testHFileReplication");
+    String fn1 = "rep1";
+    HColumnDescriptor hcd1 = new HColumnDescriptor(fn1);
+    hcd1.setDFSReplication((short) 1);
+    String fn = "defaultRep";
+    HColumnDescriptor hcd = new HColumnDescriptor(fn);
+    HTableDescriptor htd = new HTableDescriptor(name);
+    htd.addFamily(hcd);
+    htd.addFamily(hcd1);
+    HTable table = TEST_UTIL.createTable(htd, null);
+    TEST_UTIL.waitTableAvailable(name.toBytes());
+    Put p = new Put(Bytes.toBytes("defaultRep_rk"));
+    byte[] q1 = Bytes.toBytes("q1");
+    byte[] v1 = Bytes.toBytes("v1");
+    p.add(Bytes.toBytes(fn), q1, v1);
+    List<Put> puts = new ArrayList<Put>(2);
+    puts.add(p);
+    p = new Put(Bytes.toBytes("rep1_rk"));
+    p.add(Bytes.toBytes(fn1), q1, v1);
+    puts.add(p);
+    try {
+      table.put(puts);
+      admin.flush(name.getNameAsString());
+
+      List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegions(name);
+      for (HRegion r : regions) {
+        Store store = r.getStore(Bytes.toBytes(fn));
+        for (StoreFile sf : store.getStorefiles()) {
+          assertTrue(sf.toString().contains(fn));
+          assertTrue("Column family " + fn + " should have 3 copies",
+            FSUtils.getDefaultReplication(TEST_UTIL.getTestFileSystem(), sf.getPath()) == (sf
+              .getFileInfo().getFileStatus().getReplication()));
+        }
+
+        store = r.getStore(Bytes.toBytes(fn1));
+        for (StoreFile sf : store.getStorefiles()) {
+          assertTrue(sf.toString().contains(fn1));
+          assertTrue("Column family " + fn1 + " should have only 1 copy", 1 == sf.getFileInfo()
+            .getFileStatus().getReplication());
+        }
+      }
+    } finally {
+      if (admin.isTableEnabled(name)) {
+        this.admin.disableTable(name);
+        this.admin.deleteTable(name);
+      }
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 7eec037..dbc2404 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -43,7 +43,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.log4j.Level;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -102,6 +101,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
 import org.junit.After;
@@ -5521,6 +5521,9 @@ public class TestFromClientSide {
     hcd.setScope(0);
     checkTableIsLegal(htd);
 
+    hcd.setDFSReplication((short) -1);
+    checkTableIsIllegal(htd);
+
     // check the conf settings to disable sanity checks
     htd.setMemStoreFlushSize(0);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 8a6be4b..2b5b536 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -259,7 +259,7 @@ public class TestFSUtils {
     // then that the correct file is created
     Path p = new Path("target" + File.separator + UUID.randomUUID().toString());
     try {
-      FSDataOutputStream out = FSUtils.create(fs, p, filePerm, null);
+      FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
       out.close();
       FileStatus stat = fs.getFileStatus(p);
       assertEquals(new FsPermission("700"), stat.getPermission());
@@ -281,13 +281,13 @@
     Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
     Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
     try {
-      FSDataOutputStream out = FSUtils.create(fs, p, perms, null);
+      FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
       out.close();
       assertTrue("The created file should be present", FSUtils.isExists(fs, p));
       // delete the file with recursion as false. Only the file will be deleted.
       FSUtils.delete(fs, p, false);
       // Create another file
-      FSDataOutputStream out1 = FSUtils.create(fs, p1, perms, null);
+      FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
       out1.close();
       // delete the file with recursion as false. Still the file only will be deleted
       FSUtils.delete(fs, p1, true);
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index 151a566..00708e9 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -766,6 +766,9 @@ module Hbase
         set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
         set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
+        family.setDFSReplication(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.
+          HColumnDescriptor::DFS_REPLICATION))) if arg.include?(org.apache.hadoop.hbase.
+          HColumnDescriptor::DFS_REPLICATION)
 
         arg.each_key do |unknown_key|
           puts("Unknown argument ignored for column family %s: %s" % [name, unknown_key])
diff --git a/hbase-shell/src/main/ruby/shell/commands/create.rb b/hbase-shell/src/main/ruby/shell/commands/create.rb
index cd0fdea..9739bea 100644
--- a/hbase-shell/src/main/ruby/shell/commands/create.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/create.rb
@@ -50,6 +50,7 @@ Examples:
   hbase> # SPLITALGO ("HexStringSplit", "UniformSplit" or classname)
   hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
   hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit', CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}}
+  hbase> create 't1', {NAME => 'f1', DFS_REPLICATION => 1}
 
 You can also keep around a reference to the created table:
-- 
1.9.2.msysgit.0
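
For illustration only (not part of the patch): a minimal sketch of how the new per-family
replication setting might be used from Java client code once this change is in place. The table
and family names below are hypothetical; the calls follow the 0.98-era client API together with
the setDFSReplication method added above.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HColumnDescriptor;
  import org.apache.hadoop.hbase.HTableDescriptor;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.HBaseAdmin;

  public class DfsReplicationUsageSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      HBaseAdmin admin = new HBaseAdmin(conf);
      try {
        // Hypothetical table with two families: 'archive' keeps a single HDFS replica,
        // 'live' leaves DFS_REPLICATION unset (0), so the file system default applies.
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("usage_sketch"));
        HColumnDescriptor archive = new HColumnDescriptor("archive");
        archive.setDFSReplication((short) 1); // hfiles of this family get one replica
        HColumnDescriptor live = new HColumnDescriptor("live"); // getDFSReplication() == 0
        htd.addFamily(archive);
        htd.addFamily(live);
        admin.createTable(htd);
      } finally {
        admin.close();
      }
    }
  }

The shell equivalent is the DFS_REPLICATION attribute shown in the create.rb help above.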