From 8c26e2fbce8e10fa2f07e96ff9ce0e4113ce90d1 Mon Sep 17 00:00:00 2001
From: Ashish Singhi
Date: Thu, 30 Jul 2015 13:22:45 +0530
Subject: [PATCH] HBASE-14154 DFS Replication should be configurable at column
 family level

---
 .../org/apache/hadoop/hbase/HColumnDescriptor.java | 22 ++++++++
 .../apache/hadoop/hbase/TestHColumnDescriptor.java |  2 +
 .../apache/hadoop/hbase/TestHTableDescriptor.java  |  4 ++
 .../hadoop/hbase/io/hfile/AbstractHFileWriter.java |  2 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  8 +++
 .../hbase/regionserver/HRegionFileSystem.java      |  2 +-
 .../java/org/apache/hadoop/hbase/util/FSUtils.java | 21 ++++----
 .../org/apache/hadoop/hbase/client/TestAdmin1.java | 58 ++++++++++++++++++++++
 .../hadoop/hbase/client/TestFromClientSide.java    |  6 ++-
 .../org/apache/hadoop/hbase/util/TestFSUtils.java  |  6 +--
 hbase-shell/src/main/ruby/hbase/admin.rb           |  3 ++
 hbase-shell/src/main/ruby/shell/commands/create.rb |  1 +
 12 files changed, 117 insertions(+), 18 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
index 47bafc4..53e4f37 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
@@ -130,6 +130,8 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
   public static final String DEFAULT_COMPRESSION = Compression.Algorithm.NONE.getName();
 
+  public static final String DFS_REPLICATION = "DFS_REPLICATION";
+
   /**
    * Default value of the flag that enables data block encoding on disk, as
    * opposed to encoding in cache only. We encode blocks everywhere by default,
@@ -1526,4 +1528,24 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
     setValue(Bytes.toBytes(ENCRYPTION_KEY), keyBytes);
     return this;
   }
+
+  /**
+   * @return replication factor set for this CF or 0 if not set. 0 indicates that the user has
+   * not explicitly set any replication factor for this CF, hence the default replication factor
+   * set in the file system will be used.
+   */
+  public short getDFSReplication() {
+    String rf = getValue(DFS_REPLICATION);
+    return rf == null ? 0 : Short.valueOf(rf);
+  }
+
+  /**
+   * Set the replication factor for the hfile(s) belonging to this family
+   * @param replication number of times the hfile(s) belonging to this CF should be replicated
+   * @return this (for chained invocation)
+   */
+  public HColumnDescriptor setDFSReplication(short replication) {
+    setValue(DFS_REPLICATION, Short.toString(replication));
+    return this;
+  }
 }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
index 8e23f97..2314ec3 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
@@ -53,6 +53,7 @@ public class TestHColumnDescriptor {
     hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
     hcd.setBloomFilterType(BloomType.ROW);
     hcd.setCompressionType(Algorithm.SNAPPY);
+    hcd.setDFSReplication((short) v);
 
     byte [] bytes = hcd.toByteArray();
 
@@ -69,6 +70,7 @@ public class TestHColumnDescriptor {
     assertTrue(deserializedHcd.getCompressionType().equals(Compression.Algorithm.SNAPPY));
     assertTrue(deserializedHcd.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF));
     assertTrue(deserializedHcd.getBloomFilterType().equals(BloomType.ROW));
+    assertEquals(v, deserializedHcd.getDFSReplication());
   }
 
   @Test
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
index 9bf06fb..0e580d8 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
@@ -231,12 +231,16 @@ public class TestHTableDescriptor {
     byte[] familyName = Bytes.toBytes("cf");
     HColumnDescriptor hcd = new HColumnDescriptor(familyName);
     hcd.setBlocksize(1000);
+    hcd.setDFSReplication((short) 3);
     htd.addFamily(hcd);
     assertEquals(1000, htd.getFamily(familyName).getBlocksize());
+    assertEquals(3, htd.getFamily(familyName).getDFSReplication());
     hcd = new HColumnDescriptor(familyName);
     hcd.setBlocksize(2000);
+    hcd.setDFSReplication((short) 1);
     htd.modifyFamily(hcd);
     assertEquals(2000, htd.getFamily(familyName).getBlocksize());
+    assertEquals(1, htd.getFamily(familyName).getDFSReplication());
   }
 
   @Test(expected=IllegalArgumentException.class)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
index 52491e6..93e1837 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
@@ -261,6 +261,6 @@ public abstract class AbstractHFileWriter implements HFile.Writer {
       FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
     FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
-    return FSUtils.create(fs, path, perms, favoredNodes);
+    return FSUtils.create(conf, fs, path, perms, favoredNodes);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index bb2470c..d805c8a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1580,6 +1580,14 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
       }
 
+      // check data replication factor
+      if (hcd.getDFSReplication() < 0) {
+        String message =
+            "HFile Replication for column family " + hcd.getNameAsString()
+                + " must be greater than or equal to zero.";
+        warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
+      }
+
       // TODO: should we check coprocessors and encryption ?
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index ac3e512..8a93c19 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -770,7 +770,7 @@ public class HRegionFileSystem {
     // First check to get the permissions
     FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
     // Write the RegionInfo file content
-    FSDataOutputStream out = FSUtils.create(fs, regionInfoFile, perms, null);
+    FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null);
     try {
       out.write(content);
     } finally {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 5540569..8fc6af7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -362,11 +362,12 @@ public abstract class FSUtils {
   * <li>overwrite the file if it exists</li>
   * <li>apply the umask in the configuration (if it is enabled)</li>
   * <li>use the fs configured buffer size (or 4096 if not set)</li>
-  * <li>use the default replication</li>
+  * <li>use the configured column family replication or default replication if 0</li>
   * <li>use the default block size</li>
   * <li>not track progress</li>
   * </ol>
   *
+  * @param conf configuration
   * @param fs {@link FileSystem} on which to write the file
   * @param path {@link Path} to the file to write
   * @param perm permissions
@@ -374,23 +375,21 @@ public abstract class FSUtils {
   * @return output stream to the created file
   * @throws IOException if the file cannot be created
   */
-  public static FSDataOutputStream create(FileSystem fs, Path path,
+  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
     if (fs instanceof HFileSystem) {
       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
       if (backingFs instanceof DistributedFileSystem) {
         // Try to use the favoredNodes version via reflection to allow backwards-
         // compatibility.
+        short replication = Short.parseShort(conf.get(HColumnDescriptor.DFS_REPLICATION, "0"));
         try {
-          return (FSDataOutputStream) (DistributedFileSystem.class
-              .getDeclaredMethod("create", Path.class, FsPermission.class,
-                  boolean.class, int.class, short.class, long.class,
-                  Progressable.class, InetSocketAddress[].class)
-              .invoke(backingFs, path, perm, true,
-                  getDefaultBufferSize(backingFs),
-                  getDefaultReplication(backingFs, path),
-                  getDefaultBlockSize(backingFs, path),
-                  null, favoredNodes));
+          return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
+            Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
+            Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
+            getDefaultBufferSize(backingFs),
+            replication > 0 ? replication : getDefaultReplication(backingFs, path),
+            getDefaultBlockSize(backingFs, path), null, favoredNodes));
         } catch (InvocationTargetException ite) {
           // Function was properly called, but threw it's own exception.
           throw new IOException(ite.getCause());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
index a2d8690..a365220 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKTableStateClientSideReader;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
@@ -61,6 +62,8 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
@@ -1349,4 +1352,59 @@ public class TestAdmin1 {
     this.admin.deleteTable(tableName);
   }
+
+  /*
+   * Test DFS replication for column families, where one CF has default replication (3) and the
+   * other is set to 1.
+   */
+  @Test(timeout = 300000)
+  public void testHFileReplication() throws Exception {
+    TableName name = TableName.valueOf("testHFileReplication");
+    String fn1 = "rep1";
+    HColumnDescriptor hcd1 = new HColumnDescriptor(fn1);
+    hcd1.setDFSReplication((short) 1);
+    String fn = "defaultRep";
+    HColumnDescriptor hcd = new HColumnDescriptor(fn);
+    HTableDescriptor htd = new HTableDescriptor(name);
+    htd.addFamily(hcd);
+    htd.addFamily(hcd1);
+    Table table = TEST_UTIL.createTable(htd, null);
+    TEST_UTIL.waitTableAvailable(name);
+    Put p = new Put(Bytes.toBytes("defaultRep_rk"));
+    byte[] q1 = Bytes.toBytes("q1");
+    byte[] v1 = Bytes.toBytes("v1");
+    p.addColumn(Bytes.toBytes(fn), q1, v1);
+    List<Put> puts = new ArrayList<Put>(2);
+    puts.add(p);
+    p = new Put(Bytes.toBytes("rep1_rk"));
+    p.addColumn(Bytes.toBytes(fn1), q1, v1);
+    puts.add(p);
+    try {
+      table.put(puts);
+      admin.flush(name);
+
+      List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegions(name);
+      for (HRegion r : regions) {
+        Store store = r.getStore(Bytes.toBytes(fn));
+        for (StoreFile sf : store.getStorefiles()) {
+          assertTrue(sf.toString().contains(fn));
+          assertTrue("Column family " + fn + " should have 3 copies",
+            FSUtils.getDefaultReplication(TEST_UTIL.getTestFileSystem(), sf.getPath()) == (sf
+                .getFileInfo().getFileStatus().getReplication()));
+        }
+
+        store = r.getStore(Bytes.toBytes(fn1));
+        for (StoreFile sf : store.getStorefiles()) {
+          assertTrue(sf.toString().contains(fn1));
+          assertTrue("Column family " + fn1 + " should have only 1 copy", 1 == sf.getFileInfo()
+              .getFileStatus().getReplication());
+        }
+      }
+    } finally {
+      if (admin.isTableEnabled(name)) {
+        this.admin.disableTable(name);
+        this.admin.deleteTable(name);
+      }
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 0b793b4..1ac6b2e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -43,7 +43,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.log4j.Level;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -92,7 +91,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.regionserver.Region;
@@ -103,6 +101,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
 import org.junit.After;
@@ -5581,6 +5580,9 @@ public class TestFromClientSide {
     hcd.setScope(0);
     checkTableIsLegal(htd);
 
+    hcd.setDFSReplication((short) -1);
+    checkTableIsIllegal(htd);
+
     // check the conf settings to disable sanity checks
     htd.setMemStoreFlushSize(0);
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index c501477..2699292 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -259,7 +259,7 @@ public class TestFSUtils {
     // then that the correct file is created
     Path p = new Path("target" + File.separator + UUID.randomUUID().toString());
     try {
-      FSDataOutputStream out = FSUtils.create(fs, p, filePerm, null);
+      FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
       out.close();
       FileStatus stat = fs.getFileStatus(p);
       assertEquals(new FsPermission("700"), stat.getPermission());
@@ -281,13 +281,13 @@ public class TestFSUtils {
     Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
     Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
     try {
-      FSDataOutputStream out = FSUtils.create(fs, p, perms, null);
+      FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
       out.close();
       assertTrue("The created file should be present", FSUtils.isExists(fs, p));
       // delete the file with recursion as false. Only the file will be deleted.
       FSUtils.delete(fs, p, false);
       // Create another file
-      FSDataOutputStream out1 = FSUtils.create(fs, p1, perms, null);
+      FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
       out1.close();
       // delete the file with recursion as false. Still the file only will be deleted
       FSUtils.delete(fs, p1, true);
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index 84cf619..451f924 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -788,6 +788,9 @@ module Hbase
       set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
       set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
+      family.setDFSReplication(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.
+        HColumnDescriptor::DFS_REPLICATION))) if arg.include?(org.apache.hadoop.hbase.
+        HColumnDescriptor::DFS_REPLICATION)
 
       arg.each_key do |unknown_key|
         puts("Unknown argument ignored for column family %s: %s" % [name, unknown_key])
diff --git a/hbase-shell/src/main/ruby/shell/commands/create.rb b/hbase-shell/src/main/ruby/shell/commands/create.rb
index ab3a3d1..a5a125e 100644
--- a/hbase-shell/src/main/ruby/shell/commands/create.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/create.rb
@@ -50,6 +50,7 @@ Examples:
   hbase> # SPLITALGO ("HexStringSplit", "UniformSplit" or classname)
   hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
   hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit', REGION_REPLICATION => 2, CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}}
+  hbase> create 't1', {NAME => 'f1', DFS_REPLICATION => 1}
 
 You can also keep around a reference to the created table:
-- 
1.9.2.msysgit.0
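
Example client usage (for reference; not part of the patch itself): the sketch below shows how the new per-family setting could be exercised from the Java client once this change is applied. It is a minimal sketch only; the table name "t1", the family names "hot" and "cold", and the DfsReplicationExample class are illustrative, while HColumnDescriptor#setDFSReplication is the method added above and the connection plumbing is the standard HBase 1.x client API.

  // Illustrative only: table and family names are made up for this sketch.
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HColumnDescriptor;
  import org.apache.hadoop.hbase.HTableDescriptor;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Admin;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;

  public class DfsReplicationExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      try (Connection connection = ConnectionFactory.createConnection(conf);
           Admin admin = connection.getAdmin()) {
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));

        // "hot" leaves DFS_REPLICATION unset, so getDFSReplication() returns 0 and its
        // hfiles keep the file system's default replication factor.
        HColumnDescriptor hot = new HColumnDescriptor("hot");

        // "cold" stores its hfiles with a single HDFS replica.
        HColumnDescriptor cold = new HColumnDescriptor("cold");
        cold.setDFSReplication((short) 1);

        htd.addFamily(hot);
        htd.addFamily(cold);
        admin.createTable(htd);
      }
    }
  }

The same setting is exposed in the shell through the DFS_REPLICATION family attribute, as shown in the create.rb help text above (create 't1', {NAME => 'f1', DFS_REPLICATION => 1}).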