Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1339029) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -19,20 +19,6 @@ */ package org.apache.hadoop.hbase; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.lang.reflect.Field; -import java.security.MessageDigest; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.NavigableSet; -import java.util.UUID; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Jdk14Logger; @@ -40,22 +26,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl; -import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.*; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -70,18 +44,26 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.KeeperException.NodeExistsException; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.Field; +import java.security.MessageDigest; +import java.util.*; + +import static org.junit.Assert.assertTrue; + /** - * Facility for testing HBase. Replacement for - * old HBaseTestCase and HBaseClusterTestCase functionality. - * Create an instance and keep it around testing HBase. This class is - * meant to be your one-stop shop for anything you might need testing. Manages - * one cluster at a time only. - * Depends on log4j being on classpath and - * hbase-site.xml for logging and test-run configuration. It does not set - * logging levels nor make changes to configuration parameters. + * Facility for testing HBase. Replacement for old HBaseTestCase and + * HBaseClusterTestCase functionality. Create an instance and keep it + * around testing HBase. This class is meant to be your one-stop shop + * for anything you might need testing. Manages one cluster at a time + * only. Depends on log4j being on classpath and hbase-site.xml for + * logging and test-run configuration. 
It does not set logging levels + * nor make changes to configuration parameters. */ public class HBaseTestingUtility { private static final Log LOG = LogFactory.getLog(HBaseTestingUtility.class); @@ -105,11 +87,12 @@ private File clusterTestDir = null; /** - * System property key to get test directory value. - * Name is as it is because mini dfs has hard-codings to put test data here. - * It should NOT be used directly in HBase, as it's a property used in - * mini dfs. - * @deprecated can be used only with mini dfs + * System property key to get test directory value. Name is as it is + * because mini dfs has hard-codings to put test data here. It should + * NOT be used directly in HBase, as it's a property used in mini + * dfs. + * + * @deprecated can be used only with mini dfs */ private static final String TEST_DIRECTORY_KEY = "test.build.data"; @@ -124,18 +107,28 @@ */ public static final String DEFAULT_BASE_TEST_DIRECTORY = "target/test-data"; - /** Compression algorithms to use in parameterized JUnit 4 tests */ + /** + * Compression algorithms to use in parameterized JUnit 4 tests + */ public static final List COMPRESSION_ALGORITHMS_PARAMETERIZED = - Arrays.asList(new Object[][] { - { Compression.Algorithm.NONE }, - { Compression.Algorithm.GZ } + Arrays.asList(new Object[][]{ + {Compression.Algorithm.NONE}, + {Compression.Algorithm.GZ} }); - /** Compression algorithms to use in testing */ - public static final Compression.Algorithm[] COMPRESSION_ALGORITHMS ={ - Compression.Algorithm.NONE, Compression.Algorithm.GZ - }; + /** + * Compression algorithms to use in testing + */ + public static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { + Compression.Algorithm.NONE, Compression.Algorithm.GZ + }; + // Copied from HBaseTestCase to allow us to generate storefiles in compaction test + protected static final char FIRST_CHAR = 'a'; + protected static final char LAST_CHAR = 'z'; + protected static final String PUNCTUATION = "~`@#$%^&*()-_+=:;',.<>/?[]{}|"; + protected static final byte[] START_KEY_BYTES = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}; + public HBaseTestingUtility() { this(HBaseConfiguration.create()); } @@ -145,14 +138,16 @@ } /** - * Returns this classes's instance of {@link Configuration}. Be careful how - * you use the returned Configuration since {@link HConnection} instances - * can be shared. The Map of HConnections is keyed by the Configuration. If - * say, a Connection was being used against a cluster that had been shutdown, - * see {@link #shutdownMiniCluster()}, then the Connection will no longer - * be wholesome. Rather than use the return direct, its usually best to - * make a copy and use that. Do - * Configuration c = new Configuration(INSTANCE.getConfiguration()); + * Returns this classes's instance of {@link Configuration}. Be + * careful how you use the returned Configuration since {@link + * HConnection} instances can be shared. The Map of HConnections is + * keyed by the Configuration. If say, a Connection was being used + * against a cluster that had been shutdown, see {@link + * #shutdownMiniCluster()}, then the Connection will no longer be + * wholesome. Rather than use the return direct, its usually best to + * make a copy and use that. Do Configuration c = new + * Configuration(INSTANCE.getConfiguration()); + * * @return Instance of Configuration. 
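/*
 * A minimal sketch of the pattern recommended above: copy the shared
 * Configuration before changing anything on it. The property and table name
 * used here are illustrative only.
 */
HBaseTestingUtility util = new HBaseTestingUtility();
Configuration copy = new Configuration(util.getConfiguration());
copy.setInt("hbase.client.retries.number", 1); // tweak the copy, never the shared instance
HTable table = new HTable(copy, "exampleTable"); // table assumed to have been created already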
*/ public Configuration getConfiguration() { @@ -161,9 +156,9 @@ /** * @return Where to write test data on local filesystem; usually - * {@link #DEFAULT_BASE_TEST_DIRECTORY} - * Should not be used by the unit tests, hence its's private. - * Unit test will use a subdirectory of this directory. + * {@link #DEFAULT_BASE_TEST_DIRECTORY} Should not be used by + * the unit tests, hence its's private. Unit test will use a + * subdirectory of this directory. * @see #setupDataTestDir() * @see #getTestFileSystem() */ @@ -176,24 +171,24 @@ /** * @return Where to write test data on local filesystem, specific to - * the test. Useful for tests that do not use a cluster. - * Creates it if it does not exist already. + * the test. Useful for tests that do not use a cluster. + * Creates it if it does not exist already. * @see #getTestFileSystem() */ public Path getDataTestDir() { - if (dataTestDir == null){ + if (dataTestDir == null) { setupDataTestDir(); } return new Path(dataTestDir.getAbsolutePath()); } /** - * @return Where the DFS cluster will write data on the local subsystem. - * Creates it if it does not exist already. + * @return Where the DFS cluster will write data on the local + * subsystem. Creates it if it does not exist already. * @see #getTestFileSystem() */ public Path getClusterTestDir() { - if (clusterTestDir == null){ + if (clusterTestDir == null) { setupClusterTestDir(); } return new Path(clusterTestDir.getAbsolutePath()); @@ -202,8 +197,8 @@ /** * @param subdirName * @return Path to a subdirectory named subdirName under - * {@link #getDataTestDir()}. - * Does *NOT* create it if it does not exist. + * {@link #getDataTestDir()}. Does *NOT* create it if it does + * not exist. */ public Path getDataTestDir(final String subdirName) { return new Path(getDataTestDir(), subdirName); @@ -213,11 +208,12 @@ * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. * Give it a random name so can have many concurrent tests running if * we need to. It needs to amend the {@link #TEST_DIRECTORY_KEY} - * System property, as it's what minidfscluster bases - * it data dir on. Moding a System property is not the way to do concurrent - * instances -- another instance could grab the temporary - * value unintentionally -- but not anything can do about it at moment; - * single instance only is how the minidfscluster works. + * System property, as it's what minidfscluster bases it data dir on. + * Moding a System property is not the way to do concurrent instances + * -- another instance could grab the temporary value unintentionally + * -- but not anything can do about it at moment; single instance only + * is how the minidfscluster works. + * * @return The calculated data test build directory. */ private void setupDataTestDir() { @@ -228,7 +224,7 @@ } String randomStr = UUID.randomUUID().toString(); - Path testPath= new Path( + Path testPath = new Path( getBaseTestDir(), randomStr ); @@ -256,7 +252,8 @@ } /** - * @throws IOException If a cluster -- zk, dfs, or hbase -- already running. + * @throws IOException If a cluster -- zk, dfs, or hbase -- already + * running. */ public void isRunningCluster() throws IOException { if (dfsCluster == null) return; @@ -266,29 +263,30 @@ /** * Start a minidfscluster. + * * @param servers How many DNs to start. + * @return The mini dfs cluster created. * @throws Exception * @see {@link #shutdownMiniDFSCluster()} - * @return The mini dfs cluster created. 
*/ public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception { return startMiniDFSCluster(servers, null); } /** - * Start a minidfscluster. - * This is useful if you want to run datanode on distinct hosts for things - * like HDFS block location verification. - * If you start MiniDFSCluster without host names, all instances of the - * datanodes will have the same host name. + * Start a minidfscluster. This is useful if you want to run datanode + * on distinct hosts for things like HDFS block location verification. + * If you start MiniDFSCluster without host names, all instances of + * the datanodes will have the same host name. + * * @param hosts hostnames DNs to run on. + * @return The mini dfs cluster created. * @throws Exception * @see {@link #shutdownMiniDFSCluster()} - * @return The mini dfs cluster created. */ public MiniDFSCluster startMiniDFSCluster(final String hosts[]) throws Exception { - if ( hosts != null && hosts.length != 0) { + if (hosts != null && hosts.length != 0) { return startMiniDFSCluster(hosts.length, hosts); } else { return startMiniDFSCluster(1, null); @@ -296,16 +294,16 @@ } /** - * Start a minidfscluster. - * Can only create one. + * Start a minidfscluster. Can only create one. + * * @param servers How many DNs to start. - * @param hosts hostnames DNs to run on. + * @param hosts hostnames DNs to run on. + * @return The mini dfs cluster created. * @throws Exception * @see {@link #shutdownMiniDFSCluster()} - * @return The mini dfs cluster created. */ public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[]) - throws Exception { + throws Exception { // Check that there is not already a cluster running isRunningCluster(); @@ -340,8 +338,9 @@ } /** - * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} - * or does nothing. + * Shuts down instance created by call to {@link + * #startMiniDFSCluster(int)} or does nothing. + * * @throws Exception */ public void shutdownMiniDFSCluster() throws Exception { @@ -355,51 +354,56 @@ /** * Call this if you only want a zk cluster. - * @see #startMiniZKCluster() if you want zk + dfs + hbase mini cluster. + * + * @return zk cluster started. * @throws Exception + * @see #startMiniZKCluster() if you want zk + dfs + hbase mini + * cluster. * @see #shutdownMiniZKCluster() - * @return zk cluster started. */ public MiniZooKeeperCluster startMiniZKCluster() throws Exception { return startMiniZKCluster(1); } - + /** * Call this if you only want a zk cluster. + * * @param zooKeeperServerNum - * @see #startMiniZKCluster() if you want zk + dfs + hbase mini cluster. + * @return zk cluster started. * @throws Exception + * @see #startMiniZKCluster() if you want zk + dfs + hbase mini + * cluster. * @see #shutdownMiniZKCluster() - * @return zk cluster started. 
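/*
 * Sketch: a ZooKeeper-only mini cluster for tests that need zk but neither
 * DFS nor HBase. Error handling and test-framework plumbing are omitted.
 */
HBaseTestingUtility util = new HBaseTestingUtility();
MiniZooKeeperCluster zkCluster = util.startMiniZKCluster(); // single server by default
try {
  // exercise ZooKeeper-dependent code against zkCluster here
} finally {
  util.shutdownMiniZKCluster();
}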
*/ - public MiniZooKeeperCluster startMiniZKCluster(int zooKeeperServerNum) - throws Exception { + public MiniZooKeeperCluster startMiniZKCluster(int zooKeeperServerNum) + throws Exception { File zkClusterFile = new File(getClusterTestDir().toString()); return startMiniZKCluster(zkClusterFile, zooKeeperServerNum); } - + private MiniZooKeeperCluster startMiniZKCluster(final File dir) throws Exception { - return startMiniZKCluster(dir,1); + return startMiniZKCluster(dir, 1); } - - private MiniZooKeeperCluster startMiniZKCluster(final File dir, - int zooKeeperServerNum) - throws Exception { + + private MiniZooKeeperCluster startMiniZKCluster(final File dir, + int zooKeeperServerNum) + throws Exception { if (this.zkCluster != null) { throw new IOException("Cluster already running at " + dir); } this.passedZkCluster = false; this.zkCluster = new MiniZooKeeperCluster(this.getConfiguration()); - int clientPort = this.zkCluster.startup(dir,zooKeeperServerNum); + int clientPort = this.zkCluster.startup(dir, zooKeeperServerNum); this.conf.set("hbase.zookeeper.property.clientPort", Integer.toString(clientPort)); return this.zkCluster; } /** - * Shuts down zk cluster created by call to {@link #startMiniZKCluster(File)} - * or does nothing. + * Shuts down zk cluster created by call to {@link + * #startMiniZKCluster(File)} or does nothing. + * * @throws IOException * @see #startMiniZKCluster() */ @@ -412,8 +416,9 @@ /** * Start up a minicluster of hbase, dfs, and zookeeper. + * + * @return Mini hbase cluster instance created. * @throws Exception - * @return Mini hbase cluster instance created. * @see {@link #shutdownMiniDFSCluster()} */ public MiniHBaseCluster startMiniCluster() throws Exception { @@ -422,70 +427,78 @@ /** * Start up a minicluster of hbase, optionally dfs, and zookeeper. - * Modifies Configuration. Homes the cluster data directory under a random - * subdirectory in a directory under System property test.build.data. - * Directory is cleaned up on exit. - * @param numSlaves Number of slaves to start up. We'll start this many - * datanodes and regionservers. If numSlaves is > 1, then make sure - * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise - * bind errors. + * Modifies Configuration. Homes the cluster data directory under a + * random subdirectory in a directory under System property + * test.build.data. Directory is cleaned up on exit. + * + * @param numSlaves Number of slaves to start up. We'll start this + * many datanodes and regionservers. If numSlaves is + * > 1, then make sure hbase.regionserver.info.port + * is -1 (i.e. no ui per regionserver) otherwise bind + * errors. + * @return Mini hbase cluster instance created. * @throws Exception * @see {@link #shutdownMiniCluster()} - * @return Mini hbase cluster instance created. */ public MiniHBaseCluster startMiniCluster(final int numSlaves) - throws Exception { + throws Exception { return startMiniCluster(1, numSlaves); } - + /** * start minicluster + * + * @return Mini hbase cluster instance created. * @throws Exception * @see {@link #shutdownMiniCluster()} - * @return Mini hbase cluster instance created. */ public MiniHBaseCluster startMiniCluster(final int numMasters, - final int numSlaves) - throws Exception { + final int numSlaves) + throws Exception { return startMiniCluster(numMasters, numSlaves, null); } - - + + /** * Start up a minicluster of hbase, optionally dfs, and zookeeper. - * Modifies Configuration. 
Homes the cluster data directory under a random - * subdirectory in a directory under System property test.build.data. - * Directory is cleaned up on exit. - * @param numMasters Number of masters to start up. We'll start this many - * hbase masters. If numMasters > 1, you can find the active/primary master - * with {@link MiniHBaseCluster#getMaster()}. - * @param numSlaves Number of slaves to start up. We'll start this many - * regionservers. If dataNodeHosts == null, this also indicates the number of - * datanodes to start. If dataNodeHosts != null, the number of datanodes is - * based on dataNodeHosts.length. - * If numSlaves is > 1, then make sure - * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise - * bind errors. - * @param dataNodeHosts hostnames DNs to run on. - * This is useful if you want to run datanode on distinct hosts for things - * like HDFS block location verification. - * If you start MiniDFSCluster without host names, - * all instances of the datanodes will have the same host name. + * Modifies Configuration. Homes the cluster data directory under a + * random subdirectory in a directory under System property + * test.build.data. Directory is cleaned up on exit. + * + * @param numMasters Number of masters to start up. We'll start + * this many hbase masters. If numMasters > 1, + * you can find the active/primary master with + * {@link MiniHBaseCluster#getMaster()}. + * @param numSlaves Number of slaves to start up. We'll start + * this many regionservers. If dataNodeHosts == + * null, this also indicates the number of + * datanodes to start. If dataNodeHosts != null, + * the number of datanodes is based on + * dataNodeHosts.length. If numSlaves is > 1, + * then make sure hbase.regionserver.info.port is + * -1 (i.e. no ui per regionserver) otherwise + * bind errors. + * @param dataNodeHosts hostnames DNs to run on. This is useful if you + * want to run datanode on distinct hosts for + * things like HDFS block location verification. + * If you start MiniDFSCluster without host + * names, all instances of the datanodes will + * have the same host name. + * @return Mini hbase cluster instance created. * @throws Exception * @see {@link #shutdownMiniCluster()} - * @return Mini hbase cluster instance created. */ public MiniHBaseCluster startMiniCluster(final int numMasters, - final int numSlaves, final String[] dataNodeHosts) - throws Exception { + final int numSlaves, final String[] dataNodeHosts) + throws Exception { int numDataNodes = numSlaves; - if ( dataNodeHosts != null && dataNodeHosts.length != 0) { + if (dataNodeHosts != null && dataNodeHosts.length != 0) { numDataNodes = dataNodeHosts.length; } - + LOG.info("Starting up minicluster with " + numMasters + " master(s) and " + - numSlaves + " regionserver(s) and " + numDataNodes + " datanode(s)"); + numSlaves + " regionserver(s) and " + numDataNodes + " datanode(s)"); // If we already put up a cluster, fail. isRunningCluster(); @@ -504,9 +517,11 @@ } /** - * Starts up mini hbase cluster. Usually used after call to - * {@link #startMiniCluster(int, int)} when doing stepped startup of clusters. - * Usually you won't want this. You'll usually want {@link #startMiniCluster()}. + * Starts up mini hbase cluster. Usually used after call to {@link + * #startMiniCluster(int, int)} when doing stepped startup of + * clusters. Usually you won't want this. You'll usually want {@link + * #startMiniCluster()}. + * * @param numMasters * @param numSlaves * @return Reference to the hbase mini hbase cluster. 
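/*
 * Sketch of the typical lifecycle, assuming a JUnit 4 test class: one shared
 * HBaseTestingUtility starts the mini cluster once per class and tears it
 * down afterwards. Table, family and row names below are illustrative.
 */
public class ExampleMiniClusterTest {
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(); // zk + dfs + one master + one regionserver
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void testPutThenGet() throws Exception {
    byte[] family = Bytes.toBytes("f");
    HTable table = TEST_UTIL.createTable(Bytes.toBytes("example"), family);
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(family, Bytes.toBytes("q"), Bytes.toBytes("value"));
    table.put(put);
    Result result = table.get(new Get(Bytes.toBytes("row1")));
    assertTrue(Bytes.equals(Bytes.toBytes("value"),
        result.getValue(family, Bytes.toBytes("q"))));
  }
}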
@@ -515,8 +530,8 @@ * @see {@link #startMiniCluster()} */ public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, - final int numSlaves) - throws IOException, InterruptedException { + final int numSlaves) + throws IOException, InterruptedException { // Now do the mini hbase cluster. Set the hbase.rootdir in config. createRootDir(); Configuration c = new Configuration(this.conf); @@ -535,8 +550,10 @@ } /** - * Starts the hbase cluster up again after shutting it down previously in a - * test. Use this if you want to keep dfs/zk up and just stop/start hbase. + * Starts the hbase cluster up again after shutting it down previously + * in a test. Use this if you want to keep dfs/zk up and just + * stop/start hbase. + * * @param servers number of region servers * @throws IOException */ @@ -554,8 +571,8 @@ } /** - * @return Current mini hbase cluster. Only has something in it after a call - * to {@link #startMiniCluster()}. + * @return Current mini hbase cluster. Only has something in it after + * a call to {@link #startMiniCluster()}. * @see #startMiniCluster() */ public MiniHBaseCluster getMiniHBaseCluster() { @@ -564,13 +581,14 @@ /** * Stops mini hbase, zk, and hdfs clusters. + * * @throws IOException * @see {@link #startMiniCluster(int)} */ public void shutdownMiniCluster() throws Exception { LOG.info("Shutting down minicluster"); shutdownMiniHBaseCluster(); - if (!this.passedZkCluster){ + if (!this.passedZkCluster) { shutdownMiniZKCluster(); } shutdownMiniDFSCluster(); @@ -579,7 +597,7 @@ if (this.clusterTestDir != null && this.clusterTestDir.exists()) { // Need to use deleteDirectory because File.delete required dir is empty. if (!FSUtils.deleteDirectory(FileSystem.getLocal(this.conf), - new Path(this.clusterTestDir.toString()))) { + new Path(this.clusterTestDir.toString()))) { LOG.warn("Failed delete of " + this.clusterTestDir.toString()); } this.clusterTestDir = null; @@ -588,7 +606,9 @@ } /** - * Shutdown HBase mini cluster. Does not shutdown zk or dfs if running. + * Shutdown HBase mini cluster. Does not shutdown zk or dfs if + * running. + * * @throws IOException */ public void shutdownMiniHBaseCluster() throws IOException { @@ -603,19 +623,21 @@ /** * Returns the path to the default root dir the minicluster uses. * Note: this does not cause the root dir to be created. + * * @return Fully qualified path for the default hbase root dir * @throws IOException */ public Path getDefaultRootDirPath() throws IOException { - FileSystem fs = FileSystem.get(this.conf); - return new Path(fs.makeQualified(fs.getHomeDirectory()),"hbase"); + FileSystem fs = FileSystem.get(this.conf); + return new Path(fs.makeQualified(fs.getHomeDirectory()), "hbase"); } /** - * Creates an hbase rootdir in user home directory. Also creates hbase - * version file. Normally you won't make use of this method. Root hbasedir - * is created for you as part of mini cluster startup. You'd only use this - * method if you were doing manual operation. + * Creates an hbase rootdir in user home directory. Also creates + * hbase version file. Normally you won't make use of this method. + * Root hbasedir is created for you as part of mini cluster startup. + * You'd only use this method if you were doing manual operation. 
+ * * @return Fully qualified path to hbase root dir * @throws IOException */ @@ -630,6 +652,7 @@ /** * Flushes all caches in the mini hbase cluster + * * @throws IOException */ public void flush() throws IOException { @@ -638,51 +661,55 @@ /** * Flushes all caches in the mini hbase cluster + * * @throws IOException */ - public void flush(byte [] tableName) throws IOException { + public void flush(byte[] tableName) throws IOException { this.hbaseCluster.flushcache(tableName); } /** * Create a table. + * * @param tableName * @param family * @return An HTable instance for the created table. * @throws IOException */ public HTable createTable(byte[] tableName, byte[] family) - throws IOException{ + throws IOException { return createTable(tableName, new byte[][]{family}); } /** * Create a table. + * * @param tableName * @param families * @return An HTable instance for the created table. * @throws IOException */ public HTable createTable(byte[] tableName, byte[][] families) - throws IOException { + throws IOException { return createTable(tableName, families, - new Configuration(getConfiguration())); + new Configuration(getConfiguration())); } /** * Create a table. + * * @param tableName * @param families - * @param c Configuration to use + * @param c Configuration to use * @return An HTable instance for the created table. * @throws IOException */ public HTable createTable(byte[] tableName, byte[][] families, - final Configuration c) - throws IOException { + final Configuration c) + throws IOException { HTableDescriptor desc = new HTableDescriptor(tableName); - for(byte[] family : families) { + for (byte[] family : families) { desc.addFamily(new HColumnDescriptor(family)); } HBaseAdmin admin = getHBaseAdmin(); @@ -693,25 +720,26 @@ /** * Create a table. + * * @param tableName * @param families - * @param c Configuration to use + * @param c Configuration to use * @param numVersions * @return An HTable instance for the created table. * @throws IOException */ public HTable createTable(byte[] tableName, byte[][] families, - final Configuration c, int numVersions) - throws IOException { + final Configuration c, int numVersions) + throws IOException { HTableDescriptor desc = new HTableDescriptor(tableName); - for(byte[] family : families) { + for (byte[] family : families) { HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions, - HColumnDescriptor.DEFAULT_COMPRESSION, - HColumnDescriptor.DEFAULT_IN_MEMORY, - HColumnDescriptor.DEFAULT_BLOCKCACHE, - HColumnDescriptor.DEFAULT_BLOCKSIZE, HColumnDescriptor.DEFAULT_TTL, - HColumnDescriptor.DEFAULT_BLOOMFILTER, - HColumnDescriptor.DEFAULT_REPLICATION_SCOPE); + HColumnDescriptor.DEFAULT_COMPRESSION, + HColumnDescriptor.DEFAULT_IN_MEMORY, + HColumnDescriptor.DEFAULT_BLOCKCACHE, + HColumnDescriptor.DEFAULT_BLOCKSIZE, HColumnDescriptor.DEFAULT_TTL, + HColumnDescriptor.DEFAULT_BLOOMFILTER, + HColumnDescriptor.DEFAULT_REPLICATION_SCOPE); desc.addFamily(hcd); } HBaseAdmin admin = getHBaseAdmin(); @@ -722,6 +750,7 @@ /** * Create a table. + * * @param tableName * @param family * @param numVersions @@ -729,12 +758,13 @@ * @throws IOException */ public HTable createTable(byte[] tableName, byte[] family, int numVersions) - throws IOException { + throws IOException { return createTable(tableName, new byte[][]{family}, numVersions); } /** * Create a table. 
+ * * @param tableName * @param families * @param numVersions @@ -742,17 +772,17 @@ * @throws IOException */ public HTable createTable(byte[] tableName, byte[][] families, - int numVersions) - throws IOException { + int numVersions) + throws IOException { HTableDescriptor desc = new HTableDescriptor(tableName); for (byte[] family : families) { HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions, - HColumnDescriptor.DEFAULT_COMPRESSION, - HColumnDescriptor.DEFAULT_IN_MEMORY, - HColumnDescriptor.DEFAULT_BLOCKCACHE, - HColumnDescriptor.DEFAULT_BLOCKSIZE, HColumnDescriptor.DEFAULT_TTL, - HColumnDescriptor.DEFAULT_BLOOMFILTER, - HColumnDescriptor.DEFAULT_REPLICATION_SCOPE); + HColumnDescriptor.DEFAULT_COMPRESSION, + HColumnDescriptor.DEFAULT_IN_MEMORY, + HColumnDescriptor.DEFAULT_BLOCKCACHE, + HColumnDescriptor.DEFAULT_BLOCKSIZE, HColumnDescriptor.DEFAULT_TTL, + HColumnDescriptor.DEFAULT_BLOOMFILTER, + HColumnDescriptor.DEFAULT_REPLICATION_SCOPE); desc.addFamily(hcd); } HBaseAdmin admin = getHBaseAdmin(); @@ -763,6 +793,7 @@ /** * Create a table. + * * @param tableName * @param families * @param numVersions @@ -770,16 +801,16 @@ * @throws IOException */ public HTable createTable(byte[] tableName, byte[][] families, - int numVersions, int blockSize) throws IOException { + int numVersions, int blockSize) throws IOException { HTableDescriptor desc = new HTableDescriptor(tableName); for (byte[] family : families) { HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions, - HColumnDescriptor.DEFAULT_COMPRESSION, - HColumnDescriptor.DEFAULT_IN_MEMORY, - HColumnDescriptor.DEFAULT_BLOCKCACHE, - blockSize, HColumnDescriptor.DEFAULT_TTL, - HColumnDescriptor.DEFAULT_BLOOMFILTER, - HColumnDescriptor.DEFAULT_REPLICATION_SCOPE); + HColumnDescriptor.DEFAULT_COMPRESSION, + HColumnDescriptor.DEFAULT_IN_MEMORY, + HColumnDescriptor.DEFAULT_BLOCKCACHE, + blockSize, HColumnDescriptor.DEFAULT_TTL, + HColumnDescriptor.DEFAULT_BLOOMFILTER, + HColumnDescriptor.DEFAULT_REPLICATION_SCOPE); desc.addFamily(hcd); } HBaseAdmin admin = getHBaseAdmin(); @@ -790,6 +821,7 @@ /** * Create a table. 
+ * * @param tableName * @param families * @param numVersions @@ -797,18 +829,18 @@ * @throws IOException */ public HTable createTable(byte[] tableName, byte[][] families, - int[] numVersions) - throws IOException { + int[] numVersions) + throws IOException { HTableDescriptor desc = new HTableDescriptor(tableName); int i = 0; for (byte[] family : families) { HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions[i], - HColumnDescriptor.DEFAULT_COMPRESSION, - HColumnDescriptor.DEFAULT_IN_MEMORY, - HColumnDescriptor.DEFAULT_BLOCKCACHE, - HColumnDescriptor.DEFAULT_BLOCKSIZE, HColumnDescriptor.DEFAULT_TTL, - HColumnDescriptor.DEFAULT_BLOOMFILTER, - HColumnDescriptor.DEFAULT_REPLICATION_SCOPE); + HColumnDescriptor.DEFAULT_COMPRESSION, + HColumnDescriptor.DEFAULT_IN_MEMORY, + HColumnDescriptor.DEFAULT_BLOCKCACHE, + HColumnDescriptor.DEFAULT_BLOCKSIZE, HColumnDescriptor.DEFAULT_TTL, + HColumnDescriptor.DEFAULT_BLOOMFILTER, + HColumnDescriptor.DEFAULT_REPLICATION_SCOPE); desc.addFamily(hcd); i++; } @@ -820,6 +852,7 @@ /** * Drop an existing table + * * @param tableName existing table */ public void deleteTable(byte[] tableName) throws IOException { @@ -831,15 +864,16 @@ /** * Provide an existing table name to truncate + * * @param tableName existing table * @return HTable to that new table * @throws IOException */ - public HTable truncateTable(byte [] tableName) throws IOException { + public HTable truncateTable(byte[] tableName) throws IOException { HTable table = new HTable(getConfiguration(), tableName); Scan scan = new Scan(); ResultScanner resScan = table.getScanner(scan); - for(Result res : resScan) { + for (Result res : resScan) { Delete del = new Delete(res.getRow()); table.delete(del); } @@ -850,6 +884,7 @@ /** * Load table with rows from 'aaa' to 'zzz'. + * * @param t Table * @param f Family * @return Count of rows loaded. @@ -875,15 +910,17 @@ t.flushCommits(); return rowCount; } + /** * Load region with rows from 'aaa' to 'zzz'. + * * @param r Region * @param f Family * @return Count of rows loaded. * @throws IOException */ public int loadRegion(final HRegion r, final byte[] f) - throws IOException { + throws IOException { byte[] k = new byte[3]; int rowCount = 0; for (byte b1 = 'a'; b1 <= 'z'; b1++) { @@ -934,13 +971,13 @@ /** * Creates many regions names "aaa" to "zzz". * - * @param table The table to use for the data. - * @param columnFamily The family to insert the data into. + * @param table The table to use for the data. + * @param columnFamily The family to insert the data into. * @return count of regions created. * @throws IOException When creating the regions fails. */ public int createMultiRegions(HTable table, byte[] columnFamily) - throws IOException { + throws IOException { return createMultiRegions(getConfiguration(), table, columnFamily); } @@ -958,20 +995,22 @@ /** * Creates many regions names "aaa" to "zzz". - * @param c Configuration to use. - * @param table The table to use for the data. - * @param columnFamily The family to insert the data into. + * + * @param c Configuration to use. + * @param table The table to use for the data. + * @param columnFamily The family to insert the data into. * @return count of regions created. * @throws IOException When creating the regions fails. 
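/*
 * Sketch combining the table helpers above: create a table that keeps three
 * versions per cell and fill it with the 'aaa'..'zzz' rows via loadTable.
 * TEST_UTIL is assumed to be an HBaseTestingUtility with a running mini
 * cluster; the table and family names are illustrative.
 */
byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2") };
HTable t = TEST_UTIL.createTable(Bytes.toBytes("loaded"), families, 3); // 3 versions per cell
int rowsLoaded = TEST_UTIL.loadTable(t, Bytes.toBytes("f1")); // count of rows written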
*/ public int createMultiRegions(final Configuration c, final HTable table, - final byte[] columnFamily) - throws IOException { + final byte[] columnFamily) + throws IOException { return createMultiRegions(c, table, columnFamily, KEYS); } /** * Creates the specified number of regions in the specified table. + * * @param c * @param table * @param family @@ -980,27 +1019,28 @@ * @throws IOException */ public int createMultiRegions(final Configuration c, final HTable table, - final byte [] family, int numRegions) - throws IOException { - if (numRegions < 3) throw new IOException("Must create at least 3 regions"); - byte [] startKey = Bytes.toBytes("aaaaa"); - byte [] endKey = Bytes.toBytes("zzzzz"); - byte [][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); - byte [][] regionStartKeys = new byte[splitKeys.length+1][]; - for (int i=0;i createMultiRegionsInMeta(final Configuration conf, - final HTableDescriptor htd, byte [][] startKeys) - throws IOException { + final HTableDescriptor htd, byte[][] startKeys) + throws IOException { HTable meta = new HTable(conf, HConstants.META_TABLE_NAME); Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR); List newRegions = new ArrayList(startKeys.length); @@ -1067,7 +1108,7 @@ for (int i = 0; i < startKeys.length; i++) { int j = (i + 1) % startKeys.length; HRegionInfo hri = new HRegionInfo(htd.getName(), startKeys[i], - startKeys[j]); + startKeys[j]); Put put = new Put(hri.getRegionName()); put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, Writables.getBytes(hri)); @@ -1112,10 +1153,10 @@ ResultScanner s = t.getScanner(new Scan()); for (Result result : s) { HRegionInfo info = Writables.getHRegionInfo( - result.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER)); + result.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER)); if (Bytes.compareTo(info.getTableName(), tableName) == 0) { LOG.info("getMetaTableRows: row -> " + - Bytes.toStringBinary(result.getRow())); + Bytes.toStringBinary(result.getRow())); rows.add(result.getRow()); } } @@ -1125,24 +1166,24 @@ } /** - * Tool to get the reference to the region server object that holds the - * region of the specified user table. - * It first searches for the meta rows that contain the region of the - * specified table, then gets the index of that RS, and finally retrieves - * the RS's reference. + * Tool to get the reference to the region server object that holds + * the region of the specified user table. It first searches for the + * meta rows that contain the region of the specified table, then gets + * the index of that RS, and finally retrieves the RS's reference. + * * @param tableName user table to lookup in .META. * @return region server that holds it, null if the row doesn't exist * @throws IOException */ public HRegionServer getRSForFirstRegionInTable(byte[] tableName) - throws IOException { + throws IOException { List metaRows = getMetaTableRows(tableName); if (metaRows == null || metaRows.isEmpty()) { return null; } LOG.debug("Found " + metaRows.size() + " rows for table " + Bytes.toString(tableName)); - byte [] firstrow = metaRows.get(0); + byte[] firstrow = metaRows.get(0); LOG.debug("FirstRow=" + Bytes.toString(firstrow)); int index = hbaseCluster.getServerWith(firstrow); return hbaseCluster.getRegionServerThreads().get(index).getRegionServer(); @@ -1161,7 +1202,7 @@ /** * Starts a MiniMRCluster. * - * @param servers The number of TaskTracker's to start. + * @param servers The number of TaskTracker's to start. 
* @throws IOException When starting the cluster fails. */ public void startMiniMapReduceCluster(final int servers) throws IOException { @@ -1174,7 +1215,7 @@ FileSystem.get(c).getUri().toString(), 1); LOG.info("Mini mapreduce cluster started"); c.set("mapred.job.tracker", - mrCluster.createJobConf().get("mapred.job.tracker")); + mrCluster.createJobConf().get("mapred.job.tracker")); /* this for mrv2 support */ conf.set("mapreduce.framework.name", "yarn"); } @@ -1196,7 +1237,7 @@ /** * Switches the logger for the given class to DEBUG level. * - * @param clazz The class for which to switch to debug logging. + * @param clazz The class for which to switch to debug logging. */ public void enableDebug(Class clazz) { Log l = LogFactory.getLog(clazz); @@ -1209,6 +1250,7 @@ /** * Expire the Master's session + * * @throws Exception */ public void expireMasterSession() throws Exception { @@ -1218,6 +1260,7 @@ /** * Expire a region server's session + * * @param index which RS * @throws Exception */ @@ -1232,7 +1275,7 @@ } public void expireSession(ZooKeeperWatcher nodeZK, Server server, - boolean checkStatus) throws Exception { + boolean checkStatus) throws Exception { Configuration c = new Configuration(this.conf); String quorumServers = ZKConfig.getZKQuorumServersString(c); int sessionTimeout = 5 * 1000; // 5 seconds @@ -1241,7 +1284,7 @@ long sessionID = zk.getSessionId(); ZooKeeper newZK = new ZooKeeper(quorumServers, - sessionTimeout, EmptyWatcher.instance, sessionID, password); + sessionTimeout, EmptyWatcher.instance, sessionID, password); newZK.close(); final long sleep = sessionTimeout * 5L; LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID) + @@ -1271,14 +1314,14 @@ * @throws IOException */ public HBaseAdmin getHBaseAdmin() - throws IOException { + throws IOException { return new HBaseAdmin(new Configuration(getConfiguration())); } /** * Closes the named region. * - * @param regionName The region to close. + * @param regionName The region to close. * @throws IOException */ public void closeRegion(String regionName) throws IOException { @@ -1288,7 +1331,7 @@ /** * Closes the named region. * - * @param regionName The region to close. + * @param regionName The region to close. * @throws IOException */ public void closeRegion(byte[] regionName) throws IOException { @@ -1300,8 +1343,8 @@ /** * Closes the region containing the given row. * - * @param row The row to find the containing region. - * @param table The table to find the region. + * @param row The row to find the containing region. + * @param table The table to find the region. * @throws IOException */ public void closeRegionByRow(String row, HTable table) throws IOException { @@ -1311,8 +1354,8 @@ /** * Closes the region containing the given row. * - * @param row The row to find the containing region. - * @param table The table to find the region. + * @param row The row to find the containing region. + * @param table The table to find the region. 
* @throws IOException */ public void closeRegionByRow(byte[] row, HTable table) throws IOException { @@ -1342,7 +1385,7 @@ * @throws IOException */ public boolean cleanupTestDir() throws IOException { - if (dataTestDir == null ){ + if (dataTestDir == null) { return false; } else { boolean ret = deleteDir(getDataTestDir()); @@ -1357,7 +1400,7 @@ * @throws IOException */ public boolean cleanupTestDir(final String subdir) throws IOException { - if (dataTestDir == null){ + if (dataTestDir == null) { return false; } return deleteDir(getDataTestDir(subdir)); @@ -1377,29 +1420,30 @@ } public void waitTableAvailable(byte[] table, long timeoutMillis) - throws InterruptedException, IOException { + throws InterruptedException, IOException { HBaseAdmin admin = getHBaseAdmin(); long startWait = System.currentTimeMillis(); while (!admin.isTableAvailable(table)) { assertTrue("Timed out waiting for table " + Bytes.toStringBinary(table), - System.currentTimeMillis() - startWait < timeoutMillis); + System.currentTimeMillis() - startWait < timeoutMillis); Thread.sleep(200); } admin.close(); } /** - * Make sure that at least the specified number of region servers - * are running + * Make sure that at least the specified number of region servers are + * running + * * @param num minimum number of region servers that should be running * @return True if we started some servers * @throws IOException */ public boolean ensureSomeRegionServersAvailable(final int num) - throws IOException { + throws IOException { boolean startedServer = false; - for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i - * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/hlog.1276627923013 block blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683 failed 4 times. Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry... + * 2010-06-15 11:52:28,511 WARN [DataStreamer for file + * /hbase/.logs/hlog.1276627923013 block + * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): + * Error + * Recovery for block + * blk_928005470262850423_1021 failed because recovery from primary + * datanode + * 127.0.0.1:53683 failed 4 times. + * Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry... * + * * @param stream A DFSClient.DFSOutputStream. * @param max * @throws NoSuchFieldException @@ -1449,10 +1504,10 @@ * @throws IllegalArgumentException */ public static void setMaxRecoveryErrorCount(final OutputStream stream, - final int max) { + final int max) { try { - Class [] clazzes = DFSClient.class.getDeclaredClasses(); - for (Class clazz: clazzes) { + Class[] clazzes = DFSClient.class.getDeclaredClasses(); + for (Class clazz : clazzes) { String className = clazz.getSimpleName(); if (className.equals("DFSOutputStream")) { if (clazz.isInstance(stream)) { @@ -1472,21 +1527,22 @@ /** * Wait until countOfRegion in .META. have a non-empty - * info:server. This means all regions have been deployed, master has been - * informed and updated .META. with the regions deployed server. + * info:server. This means all regions have been deployed, master has + * been informed and updated .META. with the regions deployed server. + * * @param countOfRegions How many regions in .META. 
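/*
 * Sketch: pre-split a table into the default "aaa".."zzz" regions and then
 * block until .META. shows a server for each of them. Assumes TEST_UTIL is a
 * running mini cluster and this is the only user table, since the wait counts
 * all assigned user regions listed in .META.
 */
HTable presplit = TEST_UTIL.createTable(Bytes.toBytes("presplit"), Bytes.toBytes("f"));
int regionCount = TEST_UTIL.createMultiRegions(presplit, Bytes.toBytes("f"));
TEST_UTIL.waitUntilAllRegionsAssigned(regionCount);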
* @throws IOException */ public void waitUntilAllRegionsAssigned(final int countOfRegions) - throws IOException { + throws IOException { HTable meta = new HTable(getConfiguration(), HConstants.META_TABLE_NAME); while (true) { int rows = 0; Scan scan = new Scan(); scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); ResultScanner s = meta.getScanner(scan); - for (Result r = null; (r = s.next()) != null;) { - byte [] b = + for (Result r = null; (r = s.next()) != null; ) { + byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); if (b == null || b.length <= 0) { break; @@ -1504,15 +1560,16 @@ } /** - * Do a small get/scan against one store. This is required because store - * has no actual methods of querying itself, and relies on StoreScanner. + * Do a small get/scan against one store. This is required because + * store has no actual methods of querying itself, and relies on + * StoreScanner. */ public static List getFromStoreFile(Store store, Get get) throws IOException { MultiVersionConsistencyControl.resetThreadReadPoint(); Scan scan = new Scan(get); InternalScanner scanner = (InternalScanner) store.getScanner(scan, - scan.getFamilyMap().get(store.getFamily().getName())); + scan.getFamilyMap().get(store.getFamily().getName())); List result = new ArrayList(); scanner.next(result); @@ -1528,47 +1585,50 @@ } /** - * Do a small get/scan against one store. This is required because store - * has no actual methods of querying itself, and relies on StoreScanner. + * Do a small get/scan against one store. This is required because + * store has no actual methods of querying itself, and relies on + * StoreScanner. */ public static List getFromStoreFile(Store store, - byte [] row, + byte[] row, NavigableSet columns - ) throws IOException { + ) throws IOException { Get get = new Get(row); Map> s = get.getFamilyMap(); s.put(store.getFamily().getName(), columns); - return getFromStoreFile(store,get); + return getFromStoreFile(store, get); } - + /** * Gets a ZooKeeperWatcher. + * * @param TEST_UTIL */ public static ZooKeeperWatcher getZooKeeperWatcher( - HBaseTestingUtility TEST_UTIL) throws ZooKeeperConnectionException, - IOException { + HBaseTestingUtility TEST_UTIL) throws ZooKeeperConnectionException, + IOException { ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), - "unittest", new Abortable() { - boolean aborted = false; + "unittest", new Abortable() { + boolean aborted = false; - @Override - public void abort(String why, Throwable e) { - aborted = true; - throw new RuntimeException("Fatal ZK error, why=" + why, e); - } + @Override + public void abort(String why, Throwable e) { + aborted = true; + throw new RuntimeException("Fatal ZK error, why=" + why, e); + } - @Override - public boolean isAborted() { - return aborted; - } - }); + @Override + public boolean isAborted() { + return aborted; + } + }); return zkw; } - + /** * Creates a znode with OPENED state. 
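/*
 * Sketch for the Store read helper above: query one Store directly from a
 * region-server test. The 'store', 'row' and 'family' variables are assumed
 * to come from the surrounding test, and the qualifier is illustrative.
 */
Get get = new Get(row);
get.addColumn(family, Bytes.toBytes("q"));
List<KeyValue> results = HBaseTestingUtility.getFromStoreFile(store, get);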
+ * * @param TEST_UTIL * @param region * @param serverName @@ -1579,16 +1639,375 @@ * @throws NodeExistsException */ public static ZooKeeperWatcher createAndForceNodeToOpenedState( - HBaseTestingUtility TEST_UTIL, HRegion region, - ServerName serverName) throws ZooKeeperConnectionException, - IOException, KeeperException, NodeExistsException { + HBaseTestingUtility TEST_UTIL, HRegion region, + ServerName serverName) throws ZooKeeperConnectionException, + IOException, KeeperException, NodeExistsException { ZooKeeperWatcher zkw = getZooKeeperWatcher(TEST_UTIL); ZKAssign.createNodeOffline(zkw, region.getRegionInfo(), serverName); int version = ZKAssign.transitionNodeOpening(zkw, region - .getRegionInfo(), serverName); + .getRegionInfo(), serverName); ZKAssign.transitionNodeOpened(zkw, region.getRegionInfo(), serverName, - version); + version); return zkw; } - + + // Copied from HBaseTestCase to allow us to generate storefiles in compaction test + + /** + * Implementors can flushcache. + */ + public static interface FlushCache { + /** + * @throws IOException + */ + public void flushcache() throws IOException; + } + + /** + * Interface used by tests so can do common operations against an + * HTable or an HRegion. + *

+ * TOOD: Come up w/ a better name for this interface. + */ + public static interface Incommon { + /** + * @param delete + * @param lockid + * @param writeToWAL + * @throws IOException + */ + public void delete(Delete delete, Integer lockid, boolean writeToWAL) + throws IOException; + + /** + * @param put + * @throws IOException + */ + public void put(Put put) throws IOException; + + public Result get(Get get) throws IOException; + + /** + * @param family + * @param qualifiers + * @param firstRow + * @param ts + * @return scanner for specified columns, first row and timestamp + * @throws IOException + */ + public ScannerIncommon getScanner(byte[] family, byte[][] qualifiers, + byte[] firstRow, long ts) + throws IOException; + } + + /** + * A class that makes a {@link Incommon} out of a {@link HRegion} + */ + public static class HRegionIncommon implements Incommon, FlushCache { + final HRegion region; + + /** + * @param HRegion + */ + public HRegionIncommon(final HRegion HRegion) { + this.region = HRegion; + } + + public void put(Put put) throws IOException { + region.put(put); + } + + public void delete(Delete delete, Integer lockid, boolean writeToWAL) + throws IOException { + this.region.delete(delete, lockid, writeToWAL); + } + + public Result get(Get get) throws IOException { + return region.get(get, null); + } + + public ScannerIncommon getScanner(byte[] family, byte[][] qualifiers, + byte[] firstRow, long ts) + throws IOException { + Scan scan = new Scan(firstRow); + if (qualifiers == null || qualifiers.length == 0) { + scan.addFamily(family); + } else { + for (int i = 0; i < qualifiers.length; i++) { + scan.addColumn(HConstants.CATALOG_FAMILY, qualifiers[i]); + } + } + scan.setTimeRange(0, ts); + return new + InternalScannerIncommon(region.getScanner(scan)); + } + + public Result get(Get get, Integer lockid) throws IOException { + return this.region.get(get, lockid); + } + + + public void flushcache() throws IOException { + this.region.flushcache(); + } + } + + /** + * A class that makes a {@link Incommon} out of a {@link HTable} + */ + public static class HTableIncommon implements Incommon { + final HTable table; + + /** + * @param table + */ + public HTableIncommon(final HTable table) { + super(); + this.table = table; + } + + public void put(Put put) throws IOException { + table.put(put); + } + + + public void delete(Delete delete, Integer lockid, boolean writeToWAL) + throws IOException { + this.table.delete(delete); + } + + public Result get(Get get) throws IOException { + return table.get(get); + } + + public ScannerIncommon getScanner(byte[] family, byte[][] qualifiers, + byte[] firstRow, long ts) + throws IOException { + Scan scan = new Scan(firstRow); + if (qualifiers == null || qualifiers.length == 0) { + scan.addFamily(family); + } else { + for (int i = 0; i < qualifiers.length; i++) { + scan.addColumn(HConstants.CATALOG_FAMILY, qualifiers[i]); + } + } + scan.setTimeRange(0, ts); + return new + ClientScannerIncommon(table.getScanner(scan)); + } + } + + public interface ScannerIncommon + extends Iterable { + public boolean next(List values) + throws IOException; + + public void close() throws IOException; + } + + public static class ClientScannerIncommon implements ScannerIncommon { + ResultScanner scanner; + + public ClientScannerIncommon(ResultScanner scanner) { + this.scanner = scanner; + } + + public boolean next(List values) + throws IOException { + Result results = scanner.next(); + if (results == null) { + return false; + } + values.clear(); + 
values.addAll(results.list()); + return true; + } + + public void close() throws IOException { + scanner.close(); + } + + @SuppressWarnings("unchecked") + public Iterator iterator() { + return scanner.iterator(); + } + } + + public static class InternalScannerIncommon implements ScannerIncommon { + InternalScanner scanner; + + public InternalScannerIncommon(InternalScanner scanner) { + this.scanner = scanner; + } + + public boolean next(List results) + throws IOException { + return scanner.next(results); + } + + public void close() throws IOException { + scanner.close(); + } + + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + } + + + /** + * Add content to region r on the passed column + * column. Adds data of the from 'aaa', 'aab', etc where + * key and value are the same. + * + * @param r + * @param columnFamily + * @return count of what we added. + * @throws IOException + */ + public static long addContent(final HRegion r, final byte[] columnFamily) + throws IOException { + byte[] startKey = r.getRegionInfo().getStartKey(); + byte[] endKey = r.getRegionInfo().getEndKey(); + byte[] startKeyBytes = startKey; + if (startKeyBytes == null || startKeyBytes.length == 0) { + startKeyBytes = START_KEY_BYTES; + } + return addContent(new HRegionIncommon(r), Bytes.toString(columnFamily), null, + startKeyBytes, endKey, -1); + } + + + /** + * Add content to region r on the passed column + * column. Adds data of the from 'aaa', 'aab', etc where + * key and value are the same. + * + * @param updater An instance of {@link Incommon}. + * @param columnFamily + * @return count of what we added. + * @throws IOException + */ + public static long addContent(final Incommon updater, + final String columnFamily) throws IOException { + return addContent(updater, columnFamily, START_KEY_BYTES, null); + } + + public static long addContent(final Incommon updater, final String family, + final String column) throws IOException { + return addContent(updater, family, column, START_KEY_BYTES, null); + } + + /** + * Add content to region r on the passed column + * column. Adds data of the from 'aaa', 'aab', etc where + * key and value are the same. + * + * @param updater An instance of {@link Incommon}. + * @param columnFamily + * @param startKeyBytes Where to start the rows inserted + * @param endKey Where to stop inserting rows. + * @return count of what we added. + * @throws IOException + */ + public static long addContent(final Incommon updater, final String columnFamily, + final byte[] startKeyBytes, final byte[] endKey) + throws IOException { + return addContent(updater, columnFamily, null, startKeyBytes, endKey, -1); + } + + public static long addContent(final Incommon updater, final String family, + final String column, final byte[] startKeyBytes, + final byte[] endKey) throws IOException { + return addContent(updater, family, column, startKeyBytes, endKey, -1); + } + + /** + * Add content to region r on the passed column + * column. Adds data of the from 'aaa', 'aab', etc where + * key and value are the same. + * + * @param updater An instance of {@link Incommon}. + * @param column + * @param startKeyBytes Where to start the rows inserted + * @param endKey Where to stop inserting rows. + * @param ts Timestamp to write the content with. + * @return count of what we added. 
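/*
 * Sketch of how a compaction test can drive these helpers: wrap an HRegion in
 * HRegionIncommon, write the 'aaa'..'zzz' rows, and flush after each batch so
 * every batch lands in its own store file. 'region' and 'family' are assumed
 * to be created by the surrounding test.
 */
HBaseTestingUtility.HRegionIncommon loader =
    new HBaseTestingUtility.HRegionIncommon(region);
for (int i = 0; i < 3; i++) {
  HBaseTestingUtility.addContent(loader, Bytes.toString(family)); // same key range, new timestamps
  loader.flushcache();                                            // one store file per pass
}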
+ * @throws IOException + */ + public static long addContent(final Incommon updater, + final String columnFamily, + final String column, + final byte[] startKeyBytes, final byte[] endKey, final long ts) + throws IOException { + long count = 0; + // Add rows of three characters. The first character starts with the + // 'a' character and runs up to 'z'. Per first character, we run the + // second character over same range. And same for the third so rows + // (and values) look like this: 'aaa', 'aab', 'aac', etc. + char secondCharStart = (char) startKeyBytes[1]; + char thirdCharStart = (char) startKeyBytes[2]; + EXIT: + for (char c = (char) startKeyBytes[0]; c <= LAST_CHAR; c++) { + for (char d = secondCharStart; d <= LAST_CHAR; d++) { + for (char e = thirdCharStart; e <= LAST_CHAR; e++) { + byte[] t = new byte[]{(byte) c, (byte) d, (byte) e}; + if (endKey != null && endKey.length > 0 + && Bytes.compareTo(endKey, t) <= 0) { + break EXIT; + } + try { + Put put; + if (ts != -1) { + put = new Put(t, ts, null); + } else { + put = new Put(t); + } + try { + StringBuilder sb = new StringBuilder(); + if (column != null && column.contains(":")) { + sb.append(column); + } else { + if (columnFamily != null) { + sb.append(columnFamily); + if (!columnFamily.endsWith(":")) { + sb.append(":"); + } + if (column != null) { + sb.append(column); + } + } + } + byte[][] split = + KeyValue.parseColumn(Bytes.toBytes(sb.toString())); + if (split.length == 1) { + put.add(split[0], new byte[0], t); + } else { + put.add(split[0], split[1], t); + } + updater.put(put); + count++; + } catch (RuntimeException ex) { + ex.printStackTrace(); + throw ex; + } catch (IOException ex) { + ex.printStackTrace(); + throw ex; + } + } catch (RuntimeException ex) { + ex.printStackTrace(); + throw ex; + } catch (IOException ex) { + ex.printStackTrace(); + throw ex; + } + } + // Set start character back to FIRST_CHAR after we've done first loop. 
+ thirdCharStart = FIRST_CHAR; + } + secondCharStart = FIRST_CHAR; + } + return count; + } + } Index: src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java (revision 1339029) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java (working copy) @@ -22,7 +22,6 @@ import java.io.File; import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.util.Iterator; import java.util.List; import java.util.NavigableMap; @@ -39,10 +38,8 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -59,11 +56,6 @@ **/ private static final String TEST_DIRECTORY_KEY = "test.build.data"; -/* - protected final static byte [] fam1 = Bytes.toBytes("colfamily1"); - protected final static byte [] fam2 = Bytes.toBytes("colfamily2"); - protected final static byte [] fam3 = Bytes.toBytes("colfamily3"); -*/ protected final static byte [] fam1 = Bytes.toBytes("colfamily11"); protected final static byte [] fam2 = Bytes.toBytes("colfamily21"); protected final static byte [] fam3 = Bytes.toBytes("colfamily31"); @@ -238,357 +230,7 @@ return htd; } - /** - * Add content to region r on the passed column - * column. - * Adds data of the from 'aaa', 'aab', etc where key and value are the same. - * @param r - * @param columnFamily - * @throws IOException - * @return count of what we added. - */ - protected static long addContent(final HRegion r, final byte [] columnFamily) - throws IOException { - byte [] startKey = r.getRegionInfo().getStartKey(); - byte [] endKey = r.getRegionInfo().getEndKey(); - byte [] startKeyBytes = startKey; - if (startKeyBytes == null || startKeyBytes.length == 0) { - startKeyBytes = START_KEY_BYTES; - } - return addContent(new HRegionIncommon(r), Bytes.toString(columnFamily), null, - startKeyBytes, endKey, -1); - } - - /** - * Add content to region r on the passed column - * column. - * Adds data of the from 'aaa', 'aab', etc where key and value are the same. - * @param updater An instance of {@link Incommon}. - * @param columnFamily - * @throws IOException - * @return count of what we added. - */ - protected static long addContent(final Incommon updater, - final String columnFamily) throws IOException { - return addContent(updater, columnFamily, START_KEY_BYTES, null); - } - - protected static long addContent(final Incommon updater, final String family, - final String column) throws IOException { - return addContent(updater, family, column, START_KEY_BYTES, null); - } - - /** - * Add content to region r on the passed column - * column. - * Adds data of the from 'aaa', 'aab', etc where key and value are the same. - * @param updater An instance of {@link Incommon}. - * @param columnFamily - * @param startKeyBytes Where to start the rows inserted - * @param endKey Where to stop inserting rows. - * @return count of what we added. 
- * @throws IOException - */ - protected static long addContent(final Incommon updater, final String columnFamily, - final byte [] startKeyBytes, final byte [] endKey) - throws IOException { - return addContent(updater, columnFamily, null, startKeyBytes, endKey, -1); - } - - protected static long addContent(final Incommon updater, final String family, - final String column, final byte [] startKeyBytes, - final byte [] endKey) throws IOException { - return addContent(updater, family, column, startKeyBytes, endKey, -1); - } - - /** - * Add content to region r on the passed column - * column. - * Adds data of the from 'aaa', 'aab', etc where key and value are the same. - * @param updater An instance of {@link Incommon}. - * @param column - * @param startKeyBytes Where to start the rows inserted - * @param endKey Where to stop inserting rows. - * @param ts Timestamp to write the content with. - * @return count of what we added. - * @throws IOException - */ - protected static long addContent(final Incommon updater, - final String columnFamily, - final String column, - final byte [] startKeyBytes, final byte [] endKey, final long ts) - throws IOException { - long count = 0; - // Add rows of three characters. The first character starts with the - // 'a' character and runs up to 'z'. Per first character, we run the - // second character over same range. And same for the third so rows - // (and values) look like this: 'aaa', 'aab', 'aac', etc. - char secondCharStart = (char)startKeyBytes[1]; - char thirdCharStart = (char)startKeyBytes[2]; - EXIT: for (char c = (char)startKeyBytes[0]; c <= LAST_CHAR; c++) { - for (char d = secondCharStart; d <= LAST_CHAR; d++) { - for (char e = thirdCharStart; e <= LAST_CHAR; e++) { - byte [] t = new byte [] {(byte)c, (byte)d, (byte)e}; - if (endKey != null && endKey.length > 0 - && Bytes.compareTo(endKey, t) <= 0) { - break EXIT; - } - try { - Put put; - if(ts != -1) { - put = new Put(t, ts, null); - } else { - put = new Put(t); - } - try { - StringBuilder sb = new StringBuilder(); - if (column != null && column.contains(":")) { - sb.append(column); - } else { - if (columnFamily != null) { - sb.append(columnFamily); - if (!columnFamily.endsWith(":")) { - sb.append(":"); - } - if (column != null) { - sb.append(column); - } - } - } - byte[][] split = - KeyValue.parseColumn(Bytes.toBytes(sb.toString())); - if(split.length == 1) { - put.add(split[0], new byte[0], t); - } else { - put.add(split[0], split[1], t); - } - updater.put(put); - count++; - } catch (RuntimeException ex) { - ex.printStackTrace(); - throw ex; - } catch (IOException ex) { - ex.printStackTrace(); - throw ex; - } - } catch (RuntimeException ex) { - ex.printStackTrace(); - throw ex; - } catch (IOException ex) { - ex.printStackTrace(); - throw ex; - } - } - // Set start character back to FIRST_CHAR after we've done first loop. - thirdCharStart = FIRST_CHAR; - } - secondCharStart = FIRST_CHAR; - } - return count; - } - - /** - * Implementors can flushcache. - */ - public static interface FlushCache { - /** - * @throws IOException - */ - public void flushcache() throws IOException; - } - - /** - * Interface used by tests so can do common operations against an HTable - * or an HRegion. - * - * TOOD: Come up w/ a better name for this interface. 
- */ - public static interface Incommon { - /** - * - * @param delete - * @param lockid - * @param writeToWAL - * @throws IOException - */ - public void delete(Delete delete, Integer lockid, boolean writeToWAL) - throws IOException; - - /** - * @param put - * @throws IOException - */ - public void put(Put put) throws IOException; - - public Result get(Get get) throws IOException; - - /** - * @param family - * @param qualifiers - * @param firstRow - * @param ts - * @return scanner for specified columns, first row and timestamp - * @throws IOException - */ - public ScannerIncommon getScanner(byte [] family, byte [][] qualifiers, - byte [] firstRow, long ts) - throws IOException; - } - - /** - * A class that makes a {@link Incommon} out of a {@link HRegion} - */ - public static class HRegionIncommon implements Incommon, FlushCache { - final HRegion region; - - /** - * @param HRegion - */ - public HRegionIncommon(final HRegion HRegion) { - this.region = HRegion; - } - - public void put(Put put) throws IOException { - region.put(put); - } - - public void delete(Delete delete, Integer lockid, boolean writeToWAL) - throws IOException { - this.region.delete(delete, lockid, writeToWAL); - } - - public Result get(Get get) throws IOException { - return region.get(get, null); - } - - public ScannerIncommon getScanner(byte [] family, byte [][] qualifiers, - byte [] firstRow, long ts) - throws IOException { - Scan scan = new Scan(firstRow); - if(qualifiers == null || qualifiers.length == 0) { - scan.addFamily(family); - } else { - for(int i=0; i { - public boolean next(List values) - throws IOException; - - public void close() throws IOException; - } - - public static class ClientScannerIncommon implements ScannerIncommon { - ResultScanner scanner; - public ClientScannerIncommon(ResultScanner scanner) { - this.scanner = scanner; - } - - public boolean next(List values) - throws IOException { - Result results = scanner.next(); - if (results == null) { - return false; - } - values.clear(); - values.addAll(results.list()); - return true; - } - - public void close() throws IOException { - scanner.close(); - } - - @SuppressWarnings("unchecked") - public Iterator iterator() { - return scanner.iterator(); - } - } - - public static class InternalScannerIncommon implements ScannerIncommon { - InternalScanner scanner; - - public InternalScannerIncommon(InternalScanner scanner) { - this.scanner = scanner; - } - - public boolean next(List results) - throws IOException { - return scanner.next(results); - } - - public void close() throws IOException { - scanner.close(); - } - - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - } - -// protected void assertCellEquals(final HRegion region, final byte [] row, + // protected void assertCellEquals(final HRegion region, final byte [] row, // final byte [] column, final long timestamp, final String value) // throws IOException { // Map result = region.getFull(row, null, timestamp, 1, null); Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestScanner.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestScanner.java (revision 1339029) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestScanner.java (working copy) @@ -27,13 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HBaseTestCase; -import org.apache.hadoop.hbase.HColumnDescriptor; -import 
org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -47,6 +41,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.HBaseTestingUtility.HRegionIncommon; +import static org.apache.hadoop.hbase.HBaseTestingUtility.addContent; +import org.apache.hadoop.hbase.HBaseTestingUtility.ScannerIncommon; /** * Test of a long-lived scanner validating as we go. */ Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java (revision 1339029) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java (working copy) @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; @@ -162,7 +163,7 @@ long ... expected) throws IOException { store.forceMajor = forcemajor; - List actual = store.compactSelection(candidates); + List actual = store.compactSelection(candidates, Store.NO_PRIORITY); store.forceMajor = false; assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual))); } @@ -192,7 +193,7 @@ */ // don't exceed max file compact threshold assertEquals(maxFiles, - store.compactSelection(sfCreate(7,6,5,4,3,2,1)).size()); + store.compactSelection(sfCreate(7,6,5,4,3,2,1), Store.NO_PRIORITY).size()); /* MAJOR COMPACTION */ // if a major compaction has been forced, then compact everything @@ -204,7 +205,7 @@ // don't exceed max file compact threshold, even with major compaction store.forceMajor = true; assertEquals(maxFiles, - store.compactSelection(sfCreate(7,6,5,4,3,2,1)).size()); + store.compactSelection(sfCreate(7,6,5,4,3,2,1), Store.NO_PRIORITY).size()); store.forceMajor = false; // if we exceed maxCompactSize, downgrade to minor @@ -225,11 +226,13 @@ compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12); // reference files should obey max file compact to avoid OOM assertEquals(maxFiles, - store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1)).size()); + store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1), Store.NO_PRIORITY).size()); // empty case compactEquals(new ArrayList() /* empty */); // empty case (because all files are too big) compactEquals(sfCreate(tooBig, tooBig) /* empty */); + } + } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (revision 1339029) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (working copy) @@ -19,19 +19,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.spy; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; 
-import java.util.Map; -import java.util.Set; - -import static org.junit.Assert.fail; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataOutputStream; @@ -39,23 +26,27 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; -import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.AfterClass; -import org.junit.BeforeClass; + +import org.apache.hadoop.hbase.HBaseTestingUtility.HRegionIncommon; +import static org.apache.hadoop.hbase.HBaseTestingUtility.addContent; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + + /** * Test compactions */ @@ -66,22 +57,26 @@ private HRegion r = null; private Path compactionDir = null; private Path regionCompactionDir = null; - private static final byte [] COLUMN_FAMILY = fam1; - private final byte [] STARTROW = Bytes.toBytes(START_KEY); - private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY; + private static final byte[] COLUMN_FAMILY = fam1; + private final byte[] STARTROW = Bytes.toBytes(START_KEY); + private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY; private int compactionThreshold; private byte[] firstRowBytes, secondRowBytes, thirdRowBytes; final private byte[] col1, col2; + private static final long MAX_FILES_TO_COMPACT = 10; - /** constructor */ + /** + * constructor + */ public TestCompaction() throws Exception { super(); // Set cache flush size to 1MB - conf.setInt("hbase.hregion.memstore.flush.size", 1024*1024); + conf.setInt("hbase.hregion.memstore.flush.size", 1024 * 1024); conf.setInt("hbase.hregion.memstore.block.multiplier", 100); compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); + this.conf.setLong("hbase.hstore.compaction.max.size", MAX_FILES_TO_COMPACT); firstRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING); secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING); @@ -110,9 +105,9 @@ } /** - * Test that on a major compaction, if all cells are expired or deleted, then - * we'll end up with no product. Make sure scanner over region returns - * right answer in this case - and that it just basically works. + * Test that on a major compaction, if all cells are expired or deleted, then we'll end up with no product. Make sure + * scanner over region returns right answer in this case - and that it just basically works. + * * @throws IOException */ public void testMajorCompactingToNoOutput() throws IOException { @@ -127,7 +122,7 @@ boolean result = s.next(results); r.delete(new Delete(results.get(0).getRow()), null, false); if (!result) break; - } while(true); + } while (true); // Flush r.flushcache(); // Major compact. 
@@ -139,13 +134,13 @@ boolean result = s.next(results); if (!result) break; counter++; - } while(true); + } while (true); assertEquals(0, counter); } /** - * Run compaction and flushing memstore - * Assert deletes get cleaned up. + * Run compaction and flushing memstore Assert deletes get cleaned up. + * * @throws Exception */ public void testMajorCompaction() throws Exception { @@ -164,7 +159,7 @@ assertEquals(compactionThreshold, result.size()); // see if CompactionProgress is in place but null - for (Store store: this.r.stores.values()) { + for (Store store : this.r.stores.values()) { assertNull(store.getCompactionProgress()); } @@ -173,19 +168,19 @@ // see if CompactionProgress has done its thing on at least one store int storeCount = 0; - for (Store store: this.r.stores.values()) { + for (Store store : this.r.stores.values()) { CompactionProgress progress = store.getCompactionProgress(); - if( progress != null ) { + if (progress != null) { ++storeCount; - assert(progress.currentCompactedKVs > 0); - assert(progress.totalCompactingKVs > 0); + assert (progress.currentCompactedKVs > 0); + assert (progress.totalCompactingKVs > 0); } - assert(storeCount > 0); + assert (storeCount > 0); } // look at the second row // Increment the least significant character so we get to next row. - byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING); + byte[] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING); secondRowBytes[START_KEY_BYTES.length - 1]++; // Always 3 versions if that is what max versions is. @@ -198,42 +193,42 @@ // should result in a compacted store file that has no references to the // deleted row. Delete delete = new Delete(secondRowBytes, System.currentTimeMillis(), null); - byte [][] famAndQf = {COLUMN_FAMILY, null}; + byte[][] famAndQf = {COLUMN_FAMILY, null}; delete.deleteFamily(famAndQf[0]); r.delete(delete, null, true); // Assert deleted. - result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null ); + result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null); assertTrue("Second row should have been deleted", result.isEmpty()); r.flushcache(); - result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null ); + result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null); assertTrue("Second row should have been deleted", result.isEmpty()); // Add a bit of data and flush. Start adding at 'bbb'. createSmallerStoreFile(this.r); r.flushcache(); // Assert that the second row is still deleted. - result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null ); + result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null); assertTrue("Second row should still be deleted", result.isEmpty()); // Force major compaction. r.compactStores(true); assertEquals(r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size(), 1); - result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null ); + result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null); assertTrue("Second row should still be deleted", result.isEmpty()); // Make sure the store files do have some 'aaa' keys in them -- exactly 3. // Also, that compacted store files do not have any secondRowBytes because // they were deleted. 
- verifyCounts(3,0); + verifyCounts(3, 0); // Multiple versions allowed for an entry, so the delete isn't enough // Lower TTL and expire to ensure that all our entries have been wiped final int ttlInSeconds = 1; - for (Store store: this.r.stores.values()) { + for (Store store : this.r.stores.values()) { store.ttl = ttlInSeconds * 1000; } Thread.sleep(ttlInSeconds * 1000); @@ -247,12 +242,14 @@ Delete deleteRow = new Delete(secondRowBytes); testMinorCompactionWithDelete(deleteRow); } + public void testMinorCompactionWithDeleteColumn1() throws Exception { Delete dc = new Delete(secondRowBytes); /* delete all timestamps in the column */ dc.deleteColumns(fam2, col2); testMinorCompactionWithDelete(dc); } + public void testMinorCompactionWithDeleteColumn2() throws Exception { Delete dc = new Delete(secondRowBytes); dc.deleteColumn(fam2, col2); @@ -265,11 +262,13 @@ //testMinorCompactionWithDelete(dc, 2); testMinorCompactionWithDelete(dc, 3); } + public void testMinorCompactionWithDeleteColumnFamily() throws Exception { Delete deleteCF = new Delete(secondRowBytes); deleteCF.deleteFamily(fam2); testMinorCompactionWithDelete(deleteCF); } + public void testMinorCompactionWithDeleteVersion1() throws Exception { Delete deleteVersion = new Delete(secondRowBytes); deleteVersion.deleteColumns(fam2, col2, 2); @@ -278,6 +277,7 @@ */ testMinorCompactionWithDelete(deleteVersion, 1); } + public void testMinorCompactionWithDeleteVersion2() throws Exception { Delete deleteVersion = new Delete(secondRowBytes); deleteVersion.deleteColumn(fam2, col2, 1); @@ -299,6 +299,7 @@ private void testMinorCompactionWithDelete(Delete delete) throws Exception { testMinorCompactionWithDelete(delete, 0); } + private void testMinorCompactionWithDelete(Delete delete, int expectedResultsAfterDelete) throws Exception { HRegionIncommon loader = new HRegionIncommon(r); for (int i = 0; i < compactionThreshold + 1; i++) { @@ -360,25 +361,25 @@ private void verifyCounts(int countRow1, int countRow2) throws Exception { int count1 = 0; int count2 = 0; - for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) { + for (StoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) { HFileScanner scanner = f.getReader().getScanner(false, false); scanner.seekTo(); do { - byte [] row = scanner.getKeyValue().getRow(); + byte[] row = scanner.getKeyValue().getRow(); if (Bytes.equals(row, STARTROW)) { count1++; - } else if(Bytes.equals(row, secondRowBytes)) { + } else if (Bytes.equals(row, secondRowBytes)) { count2++; } - } while(scanner.next()); + } while (scanner.next()); } - assertEquals(countRow1,count1); - assertEquals(countRow2,count2); + assertEquals(countRow1, count1); + assertEquals(countRow2, count2); } /** - * Verify that you can stop a long-running compaction - * (used during RS shutdown) + * Verify that you can stop a long-running compaction (used during RS shutdown) + * * @throws Exception */ public void testInterruptCompaction() throws Exception { @@ -386,12 +387,12 @@ // lower the polling interval for this test int origWI = Store.closeCheckInterval; - Store.closeCheckInterval = 10*1000; // 10 KB + Store.closeCheckInterval = 10 * 1000; // 10 KB try { // Create a couple store files w/ 15KB (over 10KB interval) - int jmax = (int) Math.ceil(15.0/compactionThreshold); - byte [] pad = new byte[1000]; // 1 KB chunk + int jmax = (int) Math.ceil(15.0 / compactionThreshold); + byte[] pad = new byte[1000]; // 1 KB chunk for (int i = 0; i < compactionThreshold; i++) { HRegionIncommon loader = new HRegionIncommon(r); Put 
p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); @@ -418,7 +419,7 @@ // ensure that the compaction stopped, all old files are intact, Store s = r.stores.get(COLUMN_FAMILY); assertEquals(compactionThreshold, s.getStorefilesCount()); - assertTrue(s.getStorefilesSize() > 15*1000); + assertTrue(s.getStorefilesSize() > 15 * 1000); // and no new store files persisted past compactStores() FileStatus[] ls = FileSystem.get(conf).listStatus(r.getTmpDir()); assertEquals(0, ls.length); @@ -431,7 +432,7 @@ // Delete all Store information once done using for (int i = 0; i < compactionThreshold; i++) { Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i))); - byte [][] famAndQf = {COLUMN_FAMILY, null}; + byte[][] famAndQf = {COLUMN_FAMILY, null}; delete.deleteFamily(famAndQf[0]); r.delete(delete, null, true); } @@ -440,7 +441,7 @@ // Multiple versions allowed for an entry, so the delete isn't enough // Lower TTL and expire to ensure that all our entries have been wiped final int ttlInSeconds = 1; - for (Store store: this.r.stores.values()) { + for (Store store : this.r.stores.values()) { store.ttl = ttlInSeconds * 1000; } Thread.sleep(ttlInSeconds * 1000); @@ -452,7 +453,7 @@ private int count() throws IOException { int count = 0; - for (StoreFile f: this.r.stores. + for (StoreFile f : this.r.stores. get(COLUMN_FAMILY_TEXT).getStorefiles()) { HFileScanner scanner = f.getReader().getScanner(false, false); if (!scanner.seekTo()) { @@ -460,7 +461,7 @@ } do { count++; - } while(scanner.next()); + } while (scanner.next()); } return count; } @@ -474,7 +475,7 @@ private void createSmallerStoreFile(final HRegion region) throws IOException { HRegionIncommon loader = new HRegionIncommon(region); addContent(loader, Bytes.toString(COLUMN_FAMILY), ("" + - "bbb").getBytes(), null); + "bbb").getBytes(), null); loader.flushcache(); } @@ -514,4 +515,35 @@ fail("testCompactionWithCorruptResult failed since no exception was" + "thrown while completing a corrupt file"); } + + /** + * Test for HBASE-5920 - Test user requested major compactions always occurring + */ + public void testNonUserMajorCompactionRequest() throws Exception { + Store store = r.getStore(COLUMN_FAMILY); + createStoreFile(r); + for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) { + createStoreFile(r); + } + store.triggerMajorCompaction(); + + CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY); + assertNotNull("Expected to receive a compaction request", request); + assertEquals("System-requested major compaction should not occur if there are too many store files", false, request.isMajor()); + } + + /** + * Test for HBASE-5920 + */ + public void testUserMajorCompactionRequest() throws IOException{ + Store store = r.getStore(COLUMN_FAMILY); + createStoreFile(r); + for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) { + createStoreFile(r); + } + store.triggerMajorCompaction(); + CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER); + assertNotNull("Expected to receive a compaction request", request); + assertEquals("User-requested major compaction should always occur, even if there are too many store files", true, request.isMajor()); + } } Index: src/test/java/org/apache/hadoop/hbase/TimestampTestBase.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/TimestampTestBase.java (revision 1339029) +++ src/test/java/org/apache/hadoop/hbase/TimestampTestBase.java (working copy) @@ -26,6 +26,10 @@ import org.apache.hadoop.hbase.client.Result; import 
org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.HBaseTestingUtility.Incommon;
+import org.apache.hadoop.hbase.HBaseTestingUtility.FlushCache;
+import org.apache.hadoop.hbase.HBaseTestingUtility.ScannerIncommon;
+
 /**
  * Tests user specifiable time stamps putting, getting and scanning. Also
  * tests same in presence of deletes. Test cores are written so can be
Index: src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java	(revision 1339029)
+++ src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java	(working copy)
@@ -30,9 +30,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestCase.FlushCache;
-import org.apache.hadoop.hbase.HBaseTestCase.HTableIncommon;
-import org.apache.hadoop.hbase.HBaseTestCase.Incommon;
+import org.apache.hadoop.hbase.HBaseTestingUtility.FlushCache;
+import org.apache.hadoop.hbase.HBaseTestingUtility.HTableIncommon;
+import org.apache.hadoop.hbase.HBaseTestingUtility.Incommon;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
Index: src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java	(revision 1339029)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java	(working copy)
@@ -49,12 +49,6 @@
   private final ThreadPoolExecutor splits;
   private final long throttleSize;
 
-  /* The default priority for user-specified compaction requests.
-   * The user gets top priority unless we have blocking compactions. (Pri <= 0)
-   */
-  public static final int PRIORITY_USER = 1;
-  public static final int NO_PRIORITY = Integer.MIN_VALUE;
-
   /**
    * Splitting should not take place if the total number of regions exceed this.
    * This is not a hard limit to the number of regions but it is a guideline to
@@ -145,7 +139,7 @@
   public synchronized boolean requestSplit(final HRegion r) {
     // don't split regions that are blocking
-    if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) {
+    if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
       byte[] midKey = r.checkSplit();
       if (midKey != null) {
         requestSplit(r, midKey);
@@ -174,13 +168,13 @@
   public synchronized void requestCompaction(final HRegion r,
       final String why) {
     for (Store s : r.getStores().values()) {
-      requestCompaction(r, s, why, NO_PRIORITY);
+      requestCompaction(r, s, why, Store.NO_PRIORITY);
     }
   }
 
   public synchronized void requestCompaction(final HRegion r, final Store s,
       final String why) {
-    requestCompaction(r, s, why, NO_PRIORITY);
+    requestCompaction(r, s, why, Store.NO_PRIORITY);
   }
 
   public synchronized void requestCompaction(final HRegion r, final String why,
@@ -201,10 +195,10 @@
     if (this.server.isStopped()) {
       return;
     }
-    CompactionRequest cr = s.requestCompaction();
+    CompactionRequest cr = s.requestCompaction(priority);
     if (cr != null) {
       cr.setServer(server);
-      if (priority != NO_PRIORITY) {
+      if (priority != Store.NO_PRIORITY) {
         cr.setPriority(priority);
       }
       ThreadPoolExecutor pool = largeCompactions;
@@ -222,6 +216,9 @@
             + (why != null && !why.isEmpty() ? "; Because: " + why : "")
             + "; " + this);
       }
+    } else {
+      if (LOG.isDebugEnabled())
+        LOG.debug("Not compacting " + r.getRegionNameAsString() + " because compaction request was cancelled");
     }
   }
Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(revision 1339029)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(working copy)
@@ -88,6 +88,13 @@
  */
 public class Store implements HeapSize {
   static final Log LOG = LogFactory.getLog(Store.class);
+
+  /* The default priority for user-specified compaction requests.
+   * The user gets top priority unless we have blocking compactions. (Pri <= 0)
+   */
+  public static final int PRIORITY_USER = 1;
+  public static final int NO_PRIORITY = Integer.MIN_VALUE;
+
   protected final MemStore memstore;
   // This stores directory in the filesystem.
   private final Path homedir;
@@ -969,7 +976,7 @@
     return ret;
   }
 
-  public CompactionRequest requestCompaction() {
+  public CompactionRequest requestCompaction(int priority) {
     // don't even select for compaction if writes are disabled
     if (!this.region.areWritesEnabled()) {
       return null;
@@ -1000,7 +1007,7 @@
           // coprocessor is overriding normal file selection
           filesToCompact = candidates;
         } else {
-          filesToCompact = compactSelection(candidates);
+          filesToCompact = compactSelection(candidates, priority);
         }
 
         if (region.getCoprocessorHost() != null) {
@@ -1031,7 +1038,7 @@
         }
 
         // everything went better than expected. create a compaction request
-        int pri = getCompactPriority();
+        int pri = getCompactPriority(priority);
         ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
       }
     } catch (IOException ex) {
@@ -1064,11 +1071,13 @@
    *    "hbase.hstore.compaction.max"
    *      max files to compact at once (avoids OOM)
    *
+   *
    * @param candidates candidate files, ordered from oldest to newest
+   * @param priority
    * @return subset copy of candidate list that meets compaction criteria
    * @throws IOException
    */
-  List<StoreFile> compactSelection(List<StoreFile> candidates)
+  List<StoreFile> compactSelection(List<StoreFile> candidates, int priority)
       throws IOException {
     // ASSUMPTION!!! filesCompacting is locked when calling this function
 
@@ -1103,8 +1112,14 @@
       }
 
       // major compact on user action or age (caveat: we have too many files)
-      boolean majorcompaction = filesToCompact.size() < this.maxFilesToCompact
-        && (forcemajor || isMajorCompaction(filesToCompact));
+      // Force a major compaction if this is a user-requested major compaction,
+      // or if we do not have too many files to compact and this was requested
+      // as a major compaction
+      boolean majorcompaction = (forcemajor && priority == PRIORITY_USER) ||
+        (filesToCompact.size() < this.maxFilesToCompact
+          && (forcemajor || isMajorCompaction(filesToCompact)));
+      LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
+        this.storeNameStr + ": Initiating " + (majorcompaction ? "major" : "minor") + " compaction");
 
       if (!majorcompaction && !hasReferences(filesToCompact)) {
         // we're doing a minor compaction, let's see what files are applicable
@@ -1113,6 +1128,11 @@
 
       // skip selection algorithm if we don't have enough files
       if (filesToCompact.size() < this.minFilesToCompact) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Not compacting files because we only have " + filesToCompact.size() +
+            " files ready for compaction. Need " + this.minFilesToCompact + " to initiate.");
+
+        }
         return Collections.emptyList();
       }
@@ -1169,10 +1189,17 @@
         return Collections.emptyList();
       }
     } else {
-      // all files included in this compaction, up to max
-      if (filesToCompact.size() > this.maxFilesToCompact) {
-        int pastMax = filesToCompact.size() - this.maxFilesToCompact;
-        filesToCompact.subList(0, pastMax).clear();
+      if (!majorcompaction) {
+        // all files included in this compaction, up to max
+        if (filesToCompact.size() > this.maxFilesToCompact) {
+          int pastMax = filesToCompact.size() - this.maxFilesToCompact;
+          filesToCompact.subList(0, pastMax).clear();
+        }
+      } else if (filesToCompact.size() > this.maxFilesToCompact) {
+        LOG.debug("Warning, compacting more than " + this.maxFilesToCompact + " files, probably because of a user-requested major compaction");
+        if (priority != PRIORITY_USER) {
+          LOG.error("Compacting more than max files on a non user-requested compaction");
+        }
       }
     }
     return filesToCompact;
@@ -1808,9 +1835,15 @@
 
   /**
    * @return The priority that this store should have in the compaction queue
+   * @param priority
    */
-  public int getCompactPriority() {
-    return this.blockingStoreFileCount - this.storefiles.size();
+  public int getCompactPriority(int priority) {
+    // If this is a user-requested compaction, leave this at the highest priority
+    if (priority == PRIORITY_USER) {
+      return PRIORITY_USER;
+    } else {
+      return this.blockingStoreFileCount - this.storefiles.size();
+    }
   }
 
   HRegion getHRegion() {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java	(revision 1339029)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java	(working copy)
@@ -181,7 +181,7 @@
         if (completed) {
           server.getMetrics().addCompaction(now - start, this.totalSize);
           // degenerate case: blocked regions require recursive enqueues
-          if (s.getCompactPriority() <= 0) {
+          if (s.getCompactPriority(Store.NO_PRIORITY) <= 0) {
             server.compactSplitThread
               .requestCompaction(r, s, "Recursive enqueue");
           } else {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 1339029)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -38,7 +38,6 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
@@ -151,8 +150,6 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
 import org.codehaus.jackson.map.ObjectMapper;
-import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
-import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
@@ -2648,9 +2645,10 @@
     if (major) {
       region.triggerMajorCompaction();
     }
+    LOG.trace("User-triggered compaction requested for region " + region.getRegionNameAsString());
     compactSplitThread.requestCompaction(region, "User-triggered "
         + (major ? "major " : "") + "compaction",
-        CompactSplitThread.PRIORITY_USER);
+        Store.PRIORITY_USER);
   }
 
   /** @return the info server */
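// ---------------------------------------------------------------------------
// Illustrative sketch only, not part of the patch above: a minimal client-side
// example of the code path this change affects. With the patch applied, an
// explicit major-compaction request from the admin API is forwarded by the
// region server with Store.PRIORITY_USER, so Store.compactSelection() keeps it
// a major compaction even when the store holds more than
// "hbase.hstore.compaction.max" files; a system-scheduled request in the same
// situation is downgraded to a minor compaction instead. The table name below
// is a placeholder.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class UserMajorCompactionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
      // Asynchronous request: each region server queues a user-priority major
      // compaction for every region of the table.
      admin.majorCompact("usertable");
    } finally {
      admin.close();
    }
  }
}
// ---------------------------------------------------------------------------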