From 4d191eb28bad84faef0b607607a7d67a17d66309 Mon Sep 17 00:00:00 2001
From: Ashish Singhi
Date: Fri, 24 Jul 2015 16:56:55 +0530
Subject: [PATCH] HBASE-14154 DFS Replication should be configurable at column
 family level

---
 .../org/apache/hadoop/hbase/HColumnDescriptor.java | 13 +++++
 .../apache/hadoop/hbase/TestHColumnDescriptor.java |  3 +-
 .../apache/hadoop/hbase/TestHTableDescriptor.java  |  4 ++
 .../hadoop/hbase/io/hfile/HFileWriterImpl.java     |  8 +--
 .../org/apache/hadoop/hbase/master/HMaster.java    |  8 +++
 .../hbase/regionserver/HRegionFileSystem.java      |  2 +-
 .../java/org/apache/hadoop/hbase/util/FSUtils.java | 23 ++++-----
 .../org/apache/hadoop/hbase/client/TestAdmin1.java | 59 +++++++++++++++++++++-
 .../hadoop/hbase/client/TestFromClientSide.java    |  5 ++
 .../org/apache/hadoop/hbase/util/TestFSUtils.java  |  8 +--
 hbase-shell/src/main/ruby/hbase/admin.rb           |  3 ++
 hbase-shell/src/main/ruby/shell/commands/create.rb |  1 +
 12 files changed, 114 insertions(+), 23 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
index ece5fb8..ea33ce8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
@@ -129,6 +129,8 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
   public static final byte[] MOB_THRESHOLD_BYTES = Bytes.toBytes(MOB_THRESHOLD);
   public static final long DEFAULT_MOB_THRESHOLD = 100 * 1024; // 100k
 
+  public static final String DFS_REPLICATION = "DFS_REPLICATION";
+
   /**
    * Default compression type.
    */
@@ -1226,4 +1228,15 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
     setValue(IS_MOB_BYTES, Bytes.toBytes(isMobEnabled));
     return this;
   }
+
+  /** Return the replication factor for the family, or null if not set */
+  public String getDFSReplication() {
+    return getValue(DFS_REPLICATION);
+  }
+
+  /** Set the replication factor to hfile(s) belonging to this family */
+  public HColumnDescriptor setDFSReplication(short replication) {
+    setValue(DFS_REPLICATION, Short.toString(replication));
+    return this;
+  }
 }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
index 1180954..4bc4d2d 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java
@@ -62,7 +62,7 @@ public class TestHColumnDescriptor {
     hcd.setCompressionType(Algorithm.SNAPPY);
     hcd.setMobEnabled(true);
     hcd.setMobThreshold(1000L);
-
+    hcd.setDFSReplication((short) v);
 
     byte [] bytes = hcd.toByteArray();
     HColumnDescriptor deserializedHcd = HColumnDescriptor.parseFrom(bytes);
@@ -80,6 +80,7 @@ public class TestHColumnDescriptor {
     assertTrue(deserializedHcd.getBloomFilterType().equals(BloomType.ROW));
     assertEquals(hcd.isMobEnabled(), deserializedHcd.isMobEnabled());
     assertEquals(hcd.getMobThreshold(), deserializedHcd.getMobThreshold());
+    assertEquals(v, Integer.parseInt(deserializedHcd.getDFSReplication()));
   }
 
   @Test
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
index 418afe2..75496f6 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
@@ -232,12 +232,16 @@ public class TestHTableDescriptor {
     byte[] familyName = Bytes.toBytes("cf");
     HColumnDescriptor hcd = new HColumnDescriptor(familyName);
     hcd.setBlocksize(1000);
+    hcd.setDFSReplication((short) 3);
     htd.addFamily(hcd);
     assertEquals(1000, htd.getFamily(familyName).getBlocksize());
+    assertEquals("3", htd.getFamily(familyName).getDFSReplication());
     hcd = new HColumnDescriptor(familyName);
     hcd.setBlocksize(2000);
+    hcd.setDFSReplication((short) 1);
     htd.modifyFamily(hcd);
     assertEquals(2000, htd.getFamily(familyName).getBlocksize());
+    assertEquals("1", htd.getFamily(familyName).getDFSReplication());
   }
 
   @Test(expected=IllegalArgumentException.class)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 6a5a9d1..0d1c441 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -268,7 +268,7 @@ public class HFileWriterImpl implements HFile.Writer {
       FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
     FsPermission perms = FSUtils.getFilePermissions(fs, conf,
         HConstants.DATA_FILE_UMASK_KEY);
-    return FSUtils.create(fs, path, perms, favoredNodes);
+    return FSUtils.create(conf, fs, path, perms, favoredNodes);
   }
 
   /** Additional initialization steps */
@@ -326,13 +326,13 @@ public class HFileWriterImpl implements HFile.Writer {
       doCacheOnWrite(lastDataBlockOffset);
     }
   }
-  
+
   /**
    * Try to return a Cell that falls between <code>left</code> and
    * <code>right</code> but that is shorter; i.e. takes up less space. This
    * trick is used building HFile block index. Its an optimization. It does not
    * always work. In this case we'll just return the <code>right</code> cell.
-   * 
+   *
    * @param comparator
    *          Comparator to use.
    * @param left
    * @param right
    * @return A cell that sorts between <code>left</code> and <code>right</code>.
    */
@@ -415,7 +415,7 @@ public class HFileWriterImpl implements HFile.Writer {
     // No opportunity for optimization. Just return right key.
     return right;
   }
-  
+
   /**
    * @param leftArray
    * @param leftOffset
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index f7d839b..c81c335 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1583,6 +1583,14 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
     }
 
+    // check data replication factor
+    if (hcd.getDFSReplication() != null && Integer.parseInt(hcd.getDFSReplication()) <= 0) {
+      String message =
+          "HFile Replication for column family " + hcd.getNameAsString()
+              + " must be greater than zero.";
+      warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
+    }
+
     // TODO: should we check coprocessors and encryption ?
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index a62b6d7..b16738f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -770,7 +770,7 @@ public class HRegionFileSystem {
     // First check to get the permissions
     FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
     // Write the RegionInfo file content
-    FSDataOutputStream out = FSUtils.create(fs, regionInfoFile, perms, null);
+    FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null);
     try {
       out.write(content);
     } finally {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 6d10351..7eef67c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -363,11 +363,12 @@ public abstract class FSUtils {
    * <li>overwrite the file if it exists</li>
    * <li>apply the umask in the configuration (if it is enabled)</li>
    * <li>use the fs configured buffer size (or 4096 if not set)</li>
-   * <li>use the default replication</li>
+   * <li>use the configured column family replication or default replication if not set</li>
    * <li>use the default block size</li>
    * <li>not track progress</li>
    * </ol>
    *
+   * @param conf configurations
    * @param fs {@link FileSystem} on which to write the file
    * @param path {@link Path} to the file to write
    * @param perm permissions
@@ -375,23 +376,21 @@
    * @return output stream to the created file
    * @throws IOException if the file cannot be created
    */
-  public static FSDataOutputStream create(FileSystem fs, Path path,
+  public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
     if (fs instanceof HFileSystem) {
       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
       if (backingFs instanceof DistributedFileSystem) {
         // Try to use the favoredNodes version via reflection to allow backwards-
         // compatibility.
+        String replication = conf.get(HColumnDescriptor.DFS_REPLICATION);
         try {
-          return (FSDataOutputStream) (DistributedFileSystem.class
-              .getDeclaredMethod("create", Path.class, FsPermission.class,
-                  boolean.class, int.class, short.class, long.class,
-                  Progressable.class, InetSocketAddress[].class)
-              .invoke(backingFs, path, perm, true,
-                  getDefaultBufferSize(backingFs),
-                  getDefaultReplication(backingFs, path),
-                  getDefaultBlockSize(backingFs, path),
-                  null, favoredNodes));
+          return (FSDataOutputStream) (DistributedFileSystem.class.getDeclaredMethod("create",
+            Path.class, FsPermission.class, boolean.class, int.class, short.class, long.class,
+            Progressable.class, InetSocketAddress[].class).invoke(backingFs, path, perm, true,
+            getDefaultBufferSize(backingFs), replication != null ? Short.parseShort(replication)
+                : getDefaultReplication(backingFs, path), getDefaultBlockSize(backingFs, path),
+            null, favoredNodes));
         } catch (InvocationTargetException ite) {
           // Function was properly called, but threw it's own exception.
           throw new IOException(ite.getCause());
@@ -2059,7 +2058,7 @@
   /**
    * @param c
    * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
-   * @throws IOException 
+   * @throws IOException
    */
   public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
       throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
index 572b72a..33c151d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin1.java
@@ -30,7 +30,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -57,9 +56,12 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -1336,4 +1338,59 @@ public class TestAdmin1 {
     this.admin.deleteTable(tableName);
   }
+
+  /*
+   * Test DFS replication for column families, where one CF has default replication(3) and the other
+   * is set to 1.
+   */
+  @Test(timeout = 300000)
+  public void testHFileReplication() throws Exception {
+    TableName name = TableName.valueOf("testHFileReplication");
+    String fn1 = "rep1";
+    HColumnDescriptor hcd1 = new HColumnDescriptor(fn1);
+    hcd1.setDFSReplication((short) 1);
+    String fn = "defaultRep";
+    HColumnDescriptor hcd = new HColumnDescriptor(fn);
+    HTableDescriptor htd = new HTableDescriptor(name);
+    htd.addFamily(hcd);
+    htd.addFamily(hcd1);
+    Table table = TEST_UTIL.createTable(htd, null);
+    TEST_UTIL.waitTableAvailable(name);
+    Put p = new Put(Bytes.toBytes("defaultRep_rk"));
+    byte[] q1 = Bytes.toBytes("q1");
+    byte[] v1 = Bytes.toBytes("v1");
+    p.addColumn(Bytes.toBytes(fn), q1, v1);
+    List<Put> puts = new ArrayList<Put>(2);
+    puts.add(p);
+    p = new Put(Bytes.toBytes("rep1_rk"));
+    p.addColumn(Bytes.toBytes(fn1), q1, v1);
+    puts.add(p);
+    try {
+      table.put(puts);
+      admin.flush(name);
+
+      List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegions(name);
+      for (HRegion r : regions) {
+        Store store = r.getStore(Bytes.toBytes(fn));
+        for (StoreFile sf : store.getStorefiles()) {
+          assertTrue(sf.toString().contains(fn));
+          assertTrue("Column family " + fn + " should have 3 copies",
+            FSUtils.getDefaultReplication(TEST_UTIL.getTestFileSystem(), sf.getPath()) == (sf
+                .getFileInfo().getFileStatus().getReplication()));
+        }
+
+        store = r.getStore(Bytes.toBytes(fn1));
+        for (StoreFile sf : store.getStorefiles()) {
+          assertTrue(sf.toString().contains(fn1));
+          assertTrue("Column family " + fn1 + " should have only 1 copy", 1 == sf.getFileInfo()
+              .getFileStatus().getReplication());
+        }
+      }
+    } finally {
+      if (admin.isTableEnabled(name)) {
+        this.admin.disableTable(name);
+        this.admin.deleteTable(name);
+      }
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 6dc4394..2a20ce1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -5521,6 +5521,11 @@ public class TestFromClientSide {
     hcd.setScope(0);
     checkTableIsLegal(htd);
+    hcd.setDFSReplication((short) 0);
+    checkTableIsIllegal(htd);
+    hcd.setDFSReplication((short) -1);
+    checkTableIsIllegal(htd);
+
     // check the conf settings to disable sanity checks
     htd.setMemStoreFlushSize(0);
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 173b0f9..6a35e84 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -266,7 +266,7 @@ public class TestFSUtils {
     // then that the correct file is created
     Path p = new Path("target" + File.separator + UUID.randomUUID().toString());
     try {
-      FSDataOutputStream out = FSUtils.create(fs, p, filePerm, null);
+      FSDataOutputStream out = FSUtils.create(conf, fs, p, filePerm, null);
       out.close();
       FileStatus stat = fs.getFileStatus(p);
       assertEquals(new FsPermission("700"), stat.getPermission());
@@ -288,13 +288,13 @@ public class TestFSUtils {
     Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file);
     Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file);
     try {
-      FSDataOutputStream out = FSUtils.create(fs, p, perms, null);
+      FSDataOutputStream out = FSUtils.create(conf, fs, p, perms, null);
       out.close();
       assertTrue("The created file should be present", FSUtils.isExists(fs, p));
       // delete the file with recursion as false. Only the file will be deleted.
       FSUtils.delete(fs, p, false);
       // Create another file
-      FSDataOutputStream out1 = FSUtils.create(fs, p1, perms, null);
+      FSDataOutputStream out1 = FSUtils.create(conf, fs, p1, perms, null);
       out1.close();
       // delete the file with recursion as false. Still the file only will be deleted
       FSUtils.delete(fs, p1, true);
@@ -487,7 +487,7 @@ public class TestFSUtils {
       res = e;
     }
     assertTrue("Error reading beyond file boundary.", res != null);
-    
+
     stm.close();
   }
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index f23ec88..ed4f093 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -790,6 +790,9 @@ module Hbase
       set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
       set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
+      family.setDFSReplication(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.
+        HColumnDescriptor::DFS_REPLICATION))) if arg.include?(org.apache.hadoop.hbase.
+        HColumnDescriptor::DFS_REPLICATION)
 
       arg.each_key do |unknown_key|
         puts("Unknown argument ignored for column family %s: %s" % [name, unknown_key])
diff --git a/hbase-shell/src/main/ruby/shell/commands/create.rb b/hbase-shell/src/main/ruby/shell/commands/create.rb
index fca42cb..ab149bf 100644
--- a/hbase-shell/src/main/ruby/shell/commands/create.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/create.rb
@@ -50,6 +50,7 @@ Examples:
   hbase> # SPLITALGO ("HexStringSplit", "UniformSplit" or classname)
   hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
   hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit', REGION_REPLICATION => 2, CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}}
+  hbase> create 't1', {NAME => 'f1', DFS_REPLICATION => 1}
 
 You can also keep around a reference to the created table:
-- 
1.9.2.msysgit.0
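
For quick reference outside the patch itself, below is a minimal client-side sketch of how the new per-family setting could be used once the patch is applied. It is illustrative only: the table name "t1", the family names "cf1"/"cf2", and the bare-bones connection handling are assumptions, not part of this change; only HColumnDescriptor#setDFSReplication and the DFS_REPLICATION attribute come from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class DfsReplicationUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
      // Family whose hfiles are written with a DFS replication factor of 1.
      HColumnDescriptor lowRep = new HColumnDescriptor("cf1");
      lowRep.setDFSReplication((short) 1);
      htd.addFamily(lowRep);
      // Family left untouched, so its hfiles keep the HDFS default replication (typically 3).
      htd.addFamily(new HColumnDescriptor("cf2"));
      admin.createTable(htd);
    }
  }
}

The shell equivalent is the DFS_REPLICATION example added to create.rb above.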