diff --git src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
index beaff97..408db79 100644
--- src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
+++ src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
@@ -185,7 +185,7 @@ public class CatalogTracker {
this(zk, conf, HConnectionManager.getConnection(conf), abortable, defaultTimeout);
}
- CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf,
+ public CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf,
HConnection connection, Abortable abortable, final int defaultTimeout)
throws IOException {
this.connection = connection;
@@ -310,16 +310,6 @@ public class CatalogTracker {
}
/**
- * Waits indefinitely for availability of -ROOT-. Used during
- * cluster startup.
- * @throws InterruptedException if interrupted while waiting
- */
- public void waitForRoot()
- throws InterruptedException {
- this.rootRegionTracker.blockUntilAvailable();
- }
-
- /**
* Gets the current location for -ROOT- if available and waits
* for up to the specified timeout if not immediately available. Returns null
* if the timeout elapses before root is available.
@@ -330,7 +320,7 @@ public class CatalogTracker {
* @throws NotAllMetaRegionsOnlineException if root not available before
* timeout
*/
- ServerName waitForRoot(final long timeout)
+ public ServerName waitForRoot(final long timeout)
throws InterruptedException, NotAllMetaRegionsOnlineException {
ServerName sn = rootRegionTracker.waitRootRegionLocation(timeout);
if (sn == null) {
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index b52e5d3..91562f7 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -628,7 +628,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
// Log size
long totalSize = heapSize();
long freeSize = maxSize - totalSize;
- LruBlockCache.LOG.debug("LRU Stats: " +
+ LruBlockCache.LOG.debug("Stats: " +
"total=" + StringUtils.byteDesc(totalSize) + ", " +
"free=" + StringUtils.byteDesc(freeSize) + ", " +
"max=" + StringUtils.byteDesc(this.maxSize) + ", " +
@@ -636,11 +636,11 @@ public class LruBlockCache implements BlockCache, HeapSize {
"accesses=" + stats.getRequestCount() + ", " +
"hits=" + stats.getHitCount() + ", " +
"hitRatio=" +
- (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) +
+ (stats.getHitCount() == 0 ? "0" : StringUtils.formatPercent(stats.getHitRatio(), 2)) + ", " +
"cachingAccesses=" + stats.getRequestCachingCount() + ", " +
"cachingHits=" + stats.getHitCachingCount() + ", " +
"cachingHitsRatio=" +
- (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) +
+ (stats.getHitCachingCount() == 0 ? "0" : StringUtils.formatPercent(stats.getHitCachingRatio(), 2)) + ", " +
"evictions=" + stats.getEvictionCount() + ", " +
"evicted=" + stats.getEvictedCount() + ", " +
"evictedPerRun=" + stats.evictedPerEviction());
diff --git src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index d47ef10..013f528 100644
--- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -346,6 +346,9 @@ public class AssignmentManager extends ZooKeeperListener {
// Returns servers who have not checked in (assumed dead) and their regions
Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers = rebuildUserRegions(onlineServers);
+ // This method will assign all user regions if this is a clean cluster startup;
+ // otherwise it will reconstitute master state and clean up any leftovers from
+ // the previous master process.
processDeadServersAndRegionsInTransition(deadServers);
// Recover the tables that were not fully moved to DISABLED state.
@@ -380,7 +383,8 @@ public class AssignmentManager extends ZooKeeperListener {
/**
* Process all regions that are in transition in zookeeper and also
* processes the list of dead servers by scanning the META.
- * Used by master joining an cluster.
+ * Used by a master joining a cluster.  If this looks like a clean cluster
+ * startup, all user regions will be assigned.
* @param deadServers
* Map of dead servers and their regions. Can be null.
* @throws KeeperException
@@ -395,8 +399,7 @@ public class AssignmentManager extends ZooKeeperListener {
// Run through all regions. If they are not assigned and not in RIT, then
// its a clean cluster startup, else its a failover.
for (Map.Entry<HRegionInfo, ServerName> e: this.regions.entrySet()) {
- if (!e.getKey().isMetaTable()
- && e.getValue() != null) {
+ if (!e.getKey().isMetaTable() && e.getValue() != null) {
LOG.debug("Found " + e + " out on cluster");
this.failover = true;
break;
@@ -2127,7 +2130,7 @@ public class AssignmentManager extends ZooKeeperListener {
public void waitForAssignment(HRegionInfo regionInfo)
throws InterruptedException {
synchronized(regions) {
- while(!regions.containsKey(regionInfo)) {
+ while(!this.master.isStopped() && !regions.containsKey(regionInfo)) {
// We should receive a notification, but it's
// better to have a timeout to recheck the condition here:
// it lowers the impact of a race condition if any
diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index cd1755f..154db42 100644
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -43,6 +44,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -51,6 +53,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
@@ -58,6 +61,7 @@ import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -154,6 +158,9 @@ Server {
// RPC server for the HMaster
private final RpcServer rpcServer;
+ // Set after we've called HBaseServer#openServer and are ready to receive RPCs.
+ // Set back to false after we stop rpcServer.  Used by tests.
+ private volatile boolean rpcServerOpen = false;
/**
* This servers address.
@@ -290,17 +297,18 @@ Server {
this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
this.rpcServer.startThreads();
this.metrics = new MasterMetrics(getServerName().toString());
- // initialize instant schema change settings
- this.supportInstantSchemaChanges = conf.getBoolean(
- "hbase.instant.schema.alter.enabled", false);
- if (supportInstantSchemaChanges) {
- LOG.info("Instant schema change enabled. All schema alter operations will " +
- "happen through ZK.");
- }
- else {
- LOG.info("Instant schema change disabled. All schema alter operations will " +
- "happen normally.");
- }
+ this.supportInstantSchemaChanges = getSupportInstantSchemaChanges(conf);
+ }
+
+ /**
+ * Check whether instant schema change is enabled.
+ * @param c Configuration to check.
+ * @return True if instant schema change is enabled.
+ */
+ private boolean getSupportInstantSchemaChanges(final Configuration c) {
+ boolean b = c.getBoolean("hbase.instant.schema.alter.enabled", false);
+ LOG.debug("Instant schema change enabled=" + b + ".");
+ return b;
}
/**
@@ -418,7 +426,7 @@ Server {
*/
private void initializeZKBasedSystemTrackers() throws IOException,
InterruptedException, KeeperException {
- this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf,
+ this.catalogTracker = createCatalogTracker(this.zooKeeper, this.conf,
this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE));
this.catalogTracker.start();
@@ -452,8 +460,27 @@ Server {
", cluster-up flag was=" + wasUp);
}
+ /**
+ * Create CatalogTracker.
+ * In its own method so it can be intercepted and mocked in tests.
+ * @param zk If zk is null, we'll create an instance (and shut it down
+ * when {@link #stop()} is called); else we'll use what is passed.
+ * @param conf
+ * @param abortable If we get a fatal exception we'll call abort on this. May be null.
+ * If it is, we'll use the Connection associated with the passed
+ * {@link Configuration} as our {@link Abortable}.
+ * @param defaultTimeout Timeout to use.  Pass zero for no timeout
+ * ({@link Object#wait(long)} waits forever when passed 0).
+ * @throws IOException
+ */
+ CatalogTracker createCatalogTracker(final ZooKeeperWatcher zk,
+ final Configuration conf, Abortable abortable, final int defaultTimeout)
+ throws IOException {
+ return new CatalogTracker(zk, conf, abortable, defaultTimeout);
+ }
+
// Check if we should stop every second.
- private Sleeper stopSleeper = new Sleeper(1000, this);
+ private Sleeper stopSleeper = new Sleeper(100, this);
private void loop() {
while (!this.stopped) {
stopSleeper.sleep();
@@ -505,7 +532,7 @@ Server {
this.executorService = new ExecutorService(getServerName().toString());
- this.serverManager = new ServerManager(this, this);
+ this.serverManager = createServerManager(this, this);
status.setStatus("Initializing ZK system trackers");
initializeZKBasedSystemTrackers();
@@ -537,7 +564,7 @@ Server {
splitLogAfterStartup(this.fileSystemManager, onlineServers);
// Make sure root and meta assigned before proceeding.
- assignRootAndMeta(status);
+ if (!assignRootAndMeta(status)) return;
serverShutdownHandlerEnabled = true;
this.serverManager.expireDeadNotExpiredServers();
@@ -596,14 +623,30 @@ Server {
}
/**
+ * Create a {@link ServerManager} instance.
+ * @param master
+ * @param services
+ * @return An instance of {@link ServerManager}
+ * @throws ZooKeeperConnectionException
+ * @throws IOException
+ */
+ ServerManager createServerManager(final Server master,
+ final MasterServices services)
+ throws IOException {
+ // We put this out here in a method so we can do a Mockito.spy and stub it
+ // out with a mocked-up ServerManager.
+ return new ServerManager(master, services);
+ }
+
+ /**
* Check -ROOT- and .META. are assigned. If not,
* assign them.
* @throws InterruptedException
* @throws IOException
* @throws KeeperException
- * @return Count of regions we assigned.
+ * @return True if -ROOT- and .META. are healthy and assigned; false if the
+ * master is stopping
*/
- int assignRootAndMeta(MonitoredTask status)
+ boolean assignRootAndMeta(MonitoredTask status)
throws InterruptedException, IOException, KeeperException {
int assigned = 0;
long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
@@ -617,8 +660,9 @@ Server {
currentRootServer = this.catalogTracker.getRootLocation();
splitLogAndExpireIfOnline(currentRootServer);
this.assignmentManager.assignRoot();
- this.catalogTracker.waitForRoot();
- //This guarantees that the transition has completed
+ // Make sure a -ROOT- location is set.
+ if (!isRootLocation()) return false;
+ // This guarantees that the transition assigning -ROOT- has completed
this.assignmentManager.waitForAssignment(HRegionInfo.ROOT_REGIONINFO);
assigned++;
} else {
@@ -629,6 +673,8 @@ Server {
// Enable the ROOT table if on process fail over the RS containing ROOT
// was active.
enableCatalogTables(Bytes.toString(HConstants.ROOT_TABLE_NAME));
+ // Check for stopped, just in case
+ if (this.stopped) return false;
LOG.info("-ROOT- assigned=" + assigned + ", rit=" + rit +
", location=" + catalogTracker.getRootLocation());
@@ -658,7 +704,25 @@ Server {
LOG.info(".META. assigned=" + assigned + ", rit=" + rit +
", location=" + catalogTracker.getMetaLocation());
status.setStatus("META and ROOT assigned.");
- return assigned;
+ return true;
+ }
+
+ /**
+ * @return True if a -ROOT- location is available; false if the master is stopping
+ * @throws InterruptedException
+ */
+ private boolean isRootLocation() throws InterruptedException {
+ // Cycle up here in master rather than down in catalogtracker so we can
+ // check the master stopped flag every so often.
+ while (!this.stopped) {
+ try {
+ if (this.catalogTracker.waitForRoot(100) != null) break;
+ } catch (NotAllMetaRegionsOnlineException e) {
+ // Ignore. I know -ROOT- is not online yet.
+ }
+ }
+ // We got here because we exited the above loop: either -ROOT- was located
+ // or the master is stopping.
+ return !this.stopped;
}
private void enableCatalogTables(String catalogTableName) {
@@ -793,7 +857,7 @@ Server {
* as OOMEs; it should be lightly loaded. See what HRegionServer does if
* need to install an unexpected exception handler.
*/
- private void startServiceThreads() throws IOException{
+ void startServiceThreads() throws IOException{
// Start the executor service pools
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
@@ -831,10 +895,18 @@ Server {
// Start allowing requests to happen.
this.rpcServer.openServer();
+ this.rpcServerOpen = true;
if (LOG.isDebugEnabled()) {
LOG.debug("Started service threads");
}
+ }
+ /**
+ * Use this to figure out when it is OK to send in RPCs.  Used by tests.
+ * @return True if we have successfully run {@link HBaseServer#openServer()}
+ */
+ boolean isRpcServerOpen() {
+ return this.rpcServerOpen;
}
private void stopServiceThreads() {
@@ -842,6 +914,7 @@ Server {
LOG.debug("Stopping service threads");
}
if (this.rpcServer != null) this.rpcServer.stop();
+ this.rpcServerOpen = false;
// Clean up and close up shop
if (this.logCleaner!= null) this.logCleaner.interrupt();
if (this.infoServer != null) {
@@ -908,7 +981,7 @@ Server {
final long serverStartCode, final long serverCurrentTime)
throws IOException {
// Register with server manager
- InetAddress ia = HBaseServer.getRemoteIp();
+ InetAddress ia = getRemoteInetAddress(port, serverStartCode);
ServerName rs = this.serverManager.regionServerStartup(ia, port,
serverStartCode, serverCurrentTime);
// Send back some config info
@@ -919,6 +992,17 @@ Server {
}
/**
+ * @return The remote side's InetAddress
+ * @throws UnknownHostException
+ */
+ InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
+ throws UnknownHostException {
+ // Do it out here in its own little method so we can fake an address when
+ // mocking up in tests.
+ return HBaseServer.getRemoteIp();
+ }
+
+ /**
* @return Subset of configuration to pass initializing regionservers: e.g.
* the filesystem to use and root directory to use.
*/
@@ -1255,8 +1339,6 @@ Server {
LOG.debug("Getting AlterStatus from SchemaChangeTracker for table = "
+ Bytes.toString(tableName) + " Alter Status = "
+ alterStatus.toString());
- int numberPending = alterStatus.getNumberOfRegionsToProcess() -
- alterStatus.getNumberOfRegionsProcessed();
return new Pair<Integer, Integer>(alterStatus.getNumberOfRegionsProcessed(),
alterStatus.getNumberOfRegionsToProcess());
} else {
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
new file mode 100644
index 0000000..264a7ae
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
@@ -0,0 +1,257 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.mockito.Mockito;
+
+/**
+ * Compact passed set of files.
+ */
+public class CompactionTool implements Tool {
+ private static final Log LOG = LogFactory.getLog(CompactionTool.class);
+ private Configuration conf;
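+ // Progress of the most recent compaction run by this tool; null until
+ // compact() has been called.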
+ private CompactionProgress progress;
+
+ CompactionTool() {
+ super();
+ }
+
+ /**
+ * Do a minor/major compaction on an explicit set of storefiles from a Store.
+ *
+ * @param store Store the files belong to
+ * @param filesToCompact which files to compact
+ * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
+ * @param maxId Readers maximum sequence id.
+ * @return Product of compaction or null if all cells expired or deleted and
+ * nothing made it through the compaction.
+ * @throws IOException
+ */
+ StoreFile.Writer compact(final Store store,
+ final Collection<StoreFile> filesToCompact,
+ final boolean majorCompaction, final long maxId)
+ throws IOException {
+ // Calculate maximum key count after compaction (for blooms)
+ // Also calculate earliest put timestamp if major compaction
+ int maxKeyCount = 0;
+ long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+ for (StoreFile file: filesToCompact) {
+ StoreFile.Reader r = file.getReader();
+ if (r == null) {
+ LOG.warn("Null reader for " + file.getPath());
+ continue;
+ }
+ // NOTE: getFilterEntries could cause under-sized blooms if the user
+ // switches bloom type (e.g. from ROW to ROWCOL)
+ long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())?
+ r.getFilterEntries() : r.getEntries();
+ maxKeyCount += keyCount;
+ // For major compactions calculate the earliest put timestamp of all
+ // involved storefiles. This is used to remove family delete marker during
+ // compaction.
+ if (majorCompaction) {
+ byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+ if (tmp == null) {
+ // There's a file with no information, must be an old one
+ // assume we have very old puts
+ earliestPutTs = HConstants.OLDEST_TIMESTAMP;
+ } else {
+ earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Compacting " + file +
+ ", keycount=" + keyCount +
+ ", bloomtype=" + r.getBloomFilterType().toString() +
+ ", size=" + StringUtils.humanReadableInt(r.length()) +
+ ", encoding=" + r.getHFileReader().getEncodingOnDisk() +
+ (majorCompaction? ", earliestPutTs=" + earliestPutTs: ""));
+ }
+ }
+
+ // keep track of compaction progress
+ this.progress = new CompactionProgress(maxKeyCount);
+
+ // For each file, obtain a scanner:
+ List<StoreFileScanner> scanners = StoreFileScanner
+ .getScannersForStoreFiles(filesToCompact, false, false, true);
+
+ // Get some configs
+ int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
+ Compression.Algorithm compression = store.getFamily().getCompression();
+ // Avoid overriding compression setting for major compactions if the user
+ // has not specified it separately
+ Compression.Algorithm compactionCompression =
+ (store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE) ?
+ store.getFamily().getCompactionCompression(): compression;
+ // Make the instantiation lazy in case compaction produces no product; i.e.
+ // where all source cells are expired or deleted.
+ StoreFile.Writer writer = null;
+ // Find the smallest read point across all the Scanners.
+ long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
+ MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
+ try {
+ InternalScanner scanner = null;
+ try {
+ Scan scan = new Scan();
+ scan.setMaxVersions(store.getFamily().getMaxVersions());
+ /* Include deletes, unless we are doing a major compaction */
+ scanner = new StoreScanner(store, scan, scanners,
+ majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
+ smallestReadPoint, earliestPutTs);
+ if (store.getHRegion().getCoprocessorHost() != null) {
+ InternalScanner cpScanner =
+ store.getHRegion().getCoprocessorHost().preCompact(store, scanner);
+ // NULL scanner returned from coprocessor hooks means skip normal processing
+ if (cpScanner == null) {
+ return null;
+ }
+ scanner = cpScanner;
+ }
+
+ int bytesWritten = 0;
+ // Since scanner.next() can return 'false' but still be delivering data,
+ // we have to use a do/while loop.
+ ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
+ // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
+ boolean hasMore;
+ do {
+ hasMore = scanner.next(kvs, compactionKVMax);
+ if (writer == null && !kvs.isEmpty()) {
+ writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true);
+ }
+ if (writer != null) {
+ // output to writer:
+ for (KeyValue kv : kvs) {
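+ // KVs at or below the smallest read point are visible to every scanner,
+ // so their memstore timestamp can safely be zeroed to save space.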
+ if (kv.getMemstoreTS() <= smallestReadPoint) {
+ kv.setMemstoreTS(0);
+ }
+ writer.append(kv);
+ // update progress per key
+ ++progress.currentCompactedKVs;
+
+ // check periodically to see if a system stop is requested
+ if (Store.closeCheckInterval > 0) {
+ bytesWritten += kv.getLength();
+ if (bytesWritten > Store.closeCheckInterval) {
+ bytesWritten = 0;
+ if (!store.getHRegion().areWritesEnabled()) {
+ writer.close();
+ store.getFileSystem().delete(writer.getPath(), false);
+ throw new InterruptedIOException(
+ "Aborting compaction of store " + store +
+ " in region " + store.getHRegion() +
+ " because user requested stop.");
+ }
+ }
+ }
+ }
+ }
+ kvs.clear();
+ } while (hasMore);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ }
+ } finally {
+ if (writer != null) {
+ writer.appendMetadata(maxId, majorCompaction);
+ writer.close();
+ }
+ }
+ return writer;
+ }
+
+ CompactionProgress getProgress() {
+ return this.progress;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ @Override
+ public void setConf(Configuration c) {
+ this.conf = c;
+ }
+
+ void usage() {
+
+ }
+
+ @Override
+ public int run(final String[] args) throws Exception {
+ if (args.length == 0) usage();
+ FileSystem fs = FileSystem.get(getConf());
+ final List<FileStatus> files = new ArrayList<FileStatus>();
+ for (String s: args) {
+ files.add(fs.getFileStatus(new Path(s)));
+ }
+ // Pass anything for hbase.rootdir. We should not be writing anything there.
+ // We are going to fake out Store on where everything is.
+ Path hbaseRootdir = new Path(getConf().get("hbase.tmp.dir"));
+ // TODO: Let this be configurable from command-line setting versions, etc.
+ HColumnDescriptor hcd = new HColumnDescriptor("f");
+ HRegionInfo hri = new HRegionInfo(Bytes.toBytes("t"));
+ // Mock up an HRegion instance.
+ HRegion mockedHRegion = Mockito.mock(HRegion.class);
+ Mockito.when(mockedHRegion.getRegionInfo()).thenReturn(hri);
+ // Create a Store with the check of hbase.rootdir blanked out and have it
+ // return our list of files instead of having Store search its home dir.
+ Store s = new Store(hbaseRootdir, mockedHRegion, hcd, fs, getConf()) {
+ @Override
+ FileStatus[] getStoreFiles() throws IOException {
+ // Return the files passed on the command line rather than scanning the
+ // (faked) store home directory.
+ return files.toArray(new FileStatus[files.size()]);
+ }
+
+ @Override
+ Path createStoreHomeDir(FileSystem fs, Path homedir) throws IOException {
+ return homedir;
+ }
+ };
+ // Now we have a Store, run a compaction of passed files.
+ try {
+ CompactSelection selection =
+ new CompactSelection(getConf(), new ArrayList<StoreFile>(s.getStorefiles()));
+ // TODO: Take major compact or not from command-line.
+ CompactionRequest cr =
+ new CompactionRequest(mockedHRegion, s, selection, true, 0);
+ s.compact(cr);
+ } finally {
+ s.close();
+ }
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(HBaseConfiguration.create(),
+ new CompactionTool(), args));
+ }
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 4efdc6b..683d955 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5334,11 +5334,11 @@ public class HRegion implements HeapSize { // , Writable{
final HLog log = new HLog(fs, logdir, oldLogDir, c);
try {
processTable(fs, tableDir, log, c, majorCompact);
- } finally {
+ } finally {
log.close();
// TODO: is this still right?
BlockCache bc = new CacheConfig(c).getBlockCache();
if (bc != null) bc.shutdown();
- }
+ }
}
}
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index dcede5a..6fe2891 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -132,9 +131,6 @@ public class Store extends SchemaConfigured implements HeapSize {
private volatile long totalUncompressedBytes = 0L;
private final Object flushLock = new Object();
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final String storeNameStr;
- private CompactionProgress progress;
- private final int compactionKVMax;
private final boolean verifyBulkLoads;
// not private for testing
@@ -152,10 +148,6 @@ public class Store extends SchemaConfigured implements HeapSize {
new CopyOnWriteArraySet<ChangedReadersObserver>();
private final int blocksize;
- /** Compression algorithm for flush files and minor compaction */
- private final Compression.Algorithm compression;
- /** Compression algorithm for major compaction */
- private final Compression.Algorithm compactionCompression;
private HFileDataBlockEncoder dataBlockEncoder;
/** Checksum configuration */
@@ -165,6 +157,8 @@ public class Store extends SchemaConfigured implements HeapSize {
// Comparing KeyValues
final KeyValue.KVComparator comparator;
+ private final CompactionTool compactionTool;
+
/**
* Constructor
* @param basedir qualified path under which the region directory lives;
@@ -179,52 +173,37 @@ public class Store extends SchemaConfigured implements HeapSize {
protected Store(Path basedir, HRegion region, HColumnDescriptor family,
FileSystem fs, Configuration conf)
throws IOException {
- super(conf, region.getTableDesc().getNameAsString(),
+ super(conf, region.getRegionInfo().getTableNameAsString(),
Bytes.toString(family.getName()));
- HRegionInfo info = region.regionInfo;
+ HRegionInfo info = region.getRegionInfo();
this.fs = fs;
- this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
- if (!this.fs.exists(this.homedir)) {
- if (!this.fs.mkdirs(this.homedir))
- throw new IOException("Failed create of: " + this.homedir.toString());
- }
+ // Assemble the store's home directory.
+ Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
+ // Ensure it exists.
+ this.homedir = createStoreHomeDir(this.fs, p);
this.region = region;
this.family = family;
this.conf = conf;
this.blocksize = family.getBlocksize();
- this.compression = family.getCompression();
- // avoid overriding compression setting for major compactions if the user
- // has not specified it separately
- this.compactionCompression =
- (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
- family.getCompactionCompression() : this.compression;
this.dataBlockEncoder =
new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
family.getDataBlockEncoding());
this.comparator = info.getComparator();
- // getTimeToLive returns ttl in seconds. Convert to milliseconds.
- this.ttl = family.getTimeToLive();
- if (ttl == HConstants.FOREVER) {
- // default is unlimited ttl.
- ttl = Long.MAX_VALUE;
- } else if (ttl == -1) {
- ttl = Long.MAX_VALUE;
- } else {
- // second -> ms adjust for user data
- this.ttl *= 1000;
- }
+ // Get TTL
+ this.ttl = getTTL(family);
// used by ScanQueryMatcher
long timeToPurgeDeletes =
Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
- LOG.info("time to purge deletes set to " + timeToPurgeDeletes +
+ LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
"ms in store " + this);
+ // Why not just pass an HColumnDescriptor in here altogether?  Even if we
+ // have to clone it?
scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
family.getMaxVersions(), ttl, family.getKeepDeletedCells(),
timeToPurgeDeletes, this.comparator);
this.memstore = new MemStore(conf, this.comparator);
- this.storeNameStr = getColumnFamilyName();
// By default, compact if storefile.count >= minFilesToCompact
this.minFilesToCompact = Math.max(2,
@@ -241,7 +220,6 @@ public class Store extends SchemaConfigured implements HeapSize {
this.region.memstoreFlushSize);
this.maxCompactSize
= conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
- this.compactionKVMax = conf.getInt("hbase.hstore.compaction.kv.max", 10);
this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify",
false);
@@ -256,6 +234,48 @@ public class Store extends SchemaConfigured implements HeapSize {
this.checksumType = getChecksumType(conf);
// initilize bytes per checksum
this.bytesPerChecksum = getBytesPerChecksum(conf);
+ // Create a compaction tool instance
+ this.compactionTool = new CompactionTool();
+ this.compactionTool.setConf(this.conf);
+ }
+
+ /**
+ * @param family The column family descriptor to read the TTL from.
+ * @return TTL in milliseconds; Long.MAX_VALUE if the family's TTL is FOREVER or -1.
+ */
+ long getTTL(final HColumnDescriptor family) {
+ // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
+ long ttl = family.getTimeToLive();
+ if (ttl == HConstants.FOREVER) {
+ // Default is unlimited ttl.
+ ttl = Long.MAX_VALUE;
+ } else if (ttl == -1) {
+ ttl = Long.MAX_VALUE;
+ } else {
+ // Second -> ms adjust for user data
+ ttl *= 1000;
+ }
+ return ttl;
+ }
+
+ /**
+ * Create this store's home directory if it does not already exist.
+ * @param fs
+ * @param homedir
+ * @return The passed homedir Path.
+ * @throws IOException
+ */
+ Path createStoreHomeDir(final FileSystem fs,
+ final Path homedir) throws IOException {
+ if (!fs.exists(homedir)) {
+ if (!fs.mkdirs(homedir))
+ throw new IOException("Failed create of: " + homedir.toString());
+ }
+ return homedir;
+ }
+
+ FileSystem getFileSystem() {
+ return this.fs;
}
/**
@@ -316,7 +336,7 @@ public class Store extends SchemaConfigured implements HeapSize {
* Return the directory in which this store stores its
* StoreFiles
*/
- public Path getHomedir() {
+ Path getHomedir() {
return homedir;
}
@@ -335,6 +355,10 @@ public class Store extends SchemaConfigured implements HeapSize {
this.dataBlockEncoder = blockEncoder;
}
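+ /**
+  * @return All files under this store's home directory.  In its own method
+  * so tests and tools (e.g. CompactionTool) can override where files come from.
+  */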
+ FileStatus [] getStoreFiles() throws IOException {
+ return FSUtils.listStatus(this.fs, this.homedir, null);
+ }
+
/**
* Creates an unsorted list of StoreFile loaded in parallel
* from the given directory.
@@ -342,7 +366,7 @@ public class Store extends SchemaConfigured implements HeapSize {
*/
private List<StoreFile> loadStoreFiles() throws IOException {
ArrayList<StoreFile> results = new ArrayList<StoreFile>();
- FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null);
+ FileStatus files[] = getStoreFiles();
if (files == null || files.length == 0) {
return results;
@@ -630,7 +654,7 @@ public class Store extends SchemaConfigured implements HeapSize {
storeFileCloserThreadPool.shutdownNow();
}
}
- LOG.debug("closed " + this.storeNameStr);
+ LOG.info("Closed " + this);
return result;
} finally {
this.lock.writeLock().unlock();
@@ -806,7 +830,7 @@ public class Store extends SchemaConfigured implements HeapSize {
*/
private StoreFile.Writer createWriterInTmp(int maxKeyCount)
throws IOException {
- return createWriterInTmp(maxKeyCount, this.compression, false);
+ return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
}
/*
@@ -815,7 +839,7 @@ public class Store extends SchemaConfigured implements HeapSize {
* @param isCompaction whether we are creating a new file in a compaction
* @return Writer for a new StoreFile in the tmp dir.
*/
- private StoreFile.Writer createWriterInTmp(int maxKeyCount,
+ StoreFile.Writer createWriterInTmp(int maxKeyCount,
Compression.Algorithm compression, boolean isCompaction)
throws IOException {
final CacheConfig writerCacheConf;
@@ -980,15 +1004,15 @@ public class Store extends SchemaConfigured implements HeapSize {
// Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
- + this.storeNameStr + " of "
+ + this + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+ StringUtils.humanReadableInt(cr.getSize()));
StoreFile sf = null;
try {
- StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(),
- maxId);
+ StoreFile.Writer writer =
+ this.compactionTool.compact(this, filesToCompact, cr.isMajor(), maxId);
// Move the compaction into place.
sf = completeCompaction(filesToCompact, writer);
if (region.getCoprocessorHost() != null) {
@@ -1001,7 +1025,7 @@ public class Store extends SchemaConfigured implements HeapSize {
}
LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
- + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+ + filesToCompact.size() + " file(s) in " + this + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ " into " +
(sf == null ? "none" : sf.getPath().getName()) +
@@ -1048,7 +1072,8 @@ public class Store extends SchemaConfigured implements HeapSize {
try {
// Ready to go. Have list of files to compact.
- StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
+ StoreFile.Writer writer =
+ this.compactionTool.compact(this, filesToCompact, isMajor, maxId);
// Move the compaction into place.
StoreFile sf = completeCompaction(filesToCompact, writer);
if (region.getCoprocessorHost() != null) {
@@ -1097,10 +1122,10 @@ public class Store extends SchemaConfigured implements HeapSize {
}
/** getter for CompactionProgress object
- * @return CompactionProgress object
+ * @return CompactionProgress object; can be null
*/
public CompactionProgress getCompactionProgress() {
- return this.progress;
+ return this.compactionTool.getProgress();
}
/*
@@ -1152,19 +1177,19 @@ public class Store extends SchemaConfigured implements HeapSize {
if (sf.isMajorCompaction() &&
(this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping major compaction of " + this.storeNameStr +
+ LOG.debug("Skipping major compaction of " + this +
" because one (major) compacted file only and oldestTime " +
oldest + "ms is < ttl=" + this.ttl);
}
} else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
- LOG.debug("Major compaction triggered on store " + this.storeNameStr +
+ LOG.debug("Major compaction triggered on store " + this +
", because keyvalues outdated; time since last major compaction " +
(now - lowTimestamp) + "ms");
result = true;
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Major compaction triggered on store " + this.storeNameStr +
+ LOG.debug("Major compaction triggered on store " + this +
"; time since last major compaction " + (now - lowTimestamp) + "ms");
}
result = true;
@@ -1344,7 +1369,7 @@ public class Store extends SchemaConfigured implements HeapSize {
if (compactSelection.getFilesToCompact().isEmpty()) {
LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
- this.storeNameStr + ": no store files to compact");
+ this + ": no store files to compact");
compactSelection.emptyFileList();
return compactSelection;
}
@@ -1420,7 +1445,7 @@ public class Store extends SchemaConfigured implements HeapSize {
// if we don't have enough files to compact, just wait
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipped compaction of " + this.storeNameStr
+ LOG.debug("Skipped compaction of " + this
+ ". Only " + (end - start) + " file(s) of size "
+ StringUtils.humanReadableInt(totalSize)
+ " have met compaction criteria.");
@@ -1440,142 +1465,6 @@ public class Store extends SchemaConfigured implements HeapSize {
}
/**
- * Do a minor/major compaction on an explicit set of storefiles in a Store.
- * Uses the scan infrastructure to make it easy.
- *
- * @param filesToCompact which files to compact
- * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
- * @param maxId Readers maximum sequence id.
- * @return Product of compaction or null if all cells expired or deleted and
- * nothing made it through the compaction.
- * @throws IOException
- */
- StoreFile.Writer compactStore(final Collection filesToCompact,
- final boolean majorCompaction, final long maxId)
- throws IOException {
- // calculate maximum key count after compaction (for blooms)
- int maxKeyCount = 0;
- long earliestPutTs = HConstants.LATEST_TIMESTAMP;
- for (StoreFile file : filesToCompact) {
- StoreFile.Reader r = file.getReader();
- if (r != null) {
- // NOTE: getFilterEntries could cause under-sized blooms if the user
- // switches bloom type (e.g. from ROW to ROWCOL)
- long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
- ? r.getFilterEntries() : r.getEntries();
- maxKeyCount += keyCount;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Compacting " + file +
- ", keycount=" + keyCount +
- ", bloomtype=" + r.getBloomFilterType().toString() +
- ", size=" + StringUtils.humanReadableInt(r.length()) +
- ", encoding=" + r.getHFileReader().getEncodingOnDisk());
- }
- }
- // For major compactions calculate the earliest put timestamp
- // of all involved storefiles. This is used to remove
- // family delete marker during the compaction.
- if (majorCompaction) {
- byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
- if (tmp == null) {
- // there's a file with no information, must be an old one
- // assume we have very old puts
- earliestPutTs = HConstants.OLDEST_TIMESTAMP;
- } else {
- earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
- }
- }
- }
-
- // keep track of compaction progress
- progress = new CompactionProgress(maxKeyCount);
-
- // For each file, obtain a scanner:
- List scanners = StoreFileScanner
- .getScannersForStoreFiles(filesToCompact, false, false, true);
-
- // Make the instantiation lazy in case compaction produces no product; i.e.
- // where all source cells are expired or deleted.
- StoreFile.Writer writer = null;
- // Find the smallest read point across all the Scanners.
- long smallestReadPoint = region.getSmallestReadPoint();
- MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
- try {
- InternalScanner scanner = null;
- try {
- Scan scan = new Scan();
- scan.setMaxVersions(family.getMaxVersions());
- /* include deletes, unless we are doing a major compaction */
- scanner = new StoreScanner(this, scan, scanners,
- majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
- smallestReadPoint, earliestPutTs);
- if (region.getCoprocessorHost() != null) {
- InternalScanner cpScanner = region.getCoprocessorHost().preCompact(
- this, scanner);
- // NULL scanner returned from coprocessor hooks means skip normal processing
- if (cpScanner == null) {
- return null;
- }
-
- scanner = cpScanner;
- }
-
- int bytesWritten = 0;
- // since scanner.next() can return 'false' but still be delivering data,
- // we have to use a do/while loop.
- ArrayList kvs = new ArrayList();
- // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
- boolean hasMore;
- do {
- hasMore = scanner.next(kvs, this.compactionKVMax);
- if (writer == null && !kvs.isEmpty()) {
- writer = createWriterInTmp(maxKeyCount, this.compactionCompression,
- true);
- }
- if (writer != null) {
- // output to writer:
- for (KeyValue kv : kvs) {
- if (kv.getMemstoreTS() <= smallestReadPoint) {
- kv.setMemstoreTS(0);
- }
- writer.append(kv);
- // update progress per key
- ++progress.currentCompactedKVs;
-
- // check periodically to see if a system stop is requested
- if (Store.closeCheckInterval > 0) {
- bytesWritten += kv.getLength();
- if (bytesWritten > Store.closeCheckInterval) {
- bytesWritten = 0;
- if (!this.region.areWritesEnabled()) {
- writer.close();
- fs.delete(writer.getPath(), false);
- throw new InterruptedIOException(
- "Aborting compaction of store " + this +
- " in region " + this.region +
- " because user requested stop.");
- }
- }
- }
- }
- }
- kvs.clear();
- } while (hasMore);
- } finally {
- if (scanner != null) {
- scanner.close();
- }
- }
- } finally {
- if (writer != null) {
- writer.appendMetadata(maxId, majorCompaction);
- writer.close();
- }
- }
- return writer;
- }
-
- /**
* Validates a store file by opening and closing it. In HFileV2 this should
* not be an expensive operation.
*
@@ -1677,7 +1566,7 @@ public class Store extends SchemaConfigured implements HeapSize {
}
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
- LOG.error("Failed replacing compacted files in " + this.storeNameStr +
+ LOG.error("Failed replacing compacted files in " + this +
". Compacted file is " + (result == null? "none": result.toString()) +
". Files replaced " + compactedFiles.toString() +
" some of which may have been already removed", e);
@@ -1962,7 +1851,7 @@ public class Store extends SchemaConfigured implements HeapSize {
return mk.getRow();
}
} catch(IOException e) {
- LOG.warn("Failed getting store size for " + this.storeNameStr, e);
+ LOG.warn("Failed getting store size for " + this, e);
} finally {
this.lock.readLock().unlock();
}
@@ -2008,7 +1897,7 @@ public class Store extends SchemaConfigured implements HeapSize {
@Override
public String toString() {
- return this.storeNameStr;
+ return getColumnFamilyName();
}
/**
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
index 553eee0..4444d42 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
@@ -52,5 +52,4 @@ public class CompactionProgress {
public float getProgressPct() {
return currentCompactedKVs / totalCompactingKVs;
}
-
}
diff --git src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
index d2329e1..885625b 100644
--- src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
+++ src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
@@ -25,9 +25,6 @@ import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.ChecksumFactory;
-
/**
* Checksum types. The Checksum type is a one byte number
* that stores a representation of the checksum algorithm
@@ -70,7 +67,7 @@ public enum ChecksumType {
ctor = ChecksumFactory.newConstructor(PURECRC32);
LOG.info("Checksum using " + PURECRC32);
} catch (Exception e) {
- LOG.info(PURECRC32 + " not available.");
+ LOG.trace(PURECRC32 + " not available.");
}
try {
// The default checksum class name is java.util.zip.CRC32.
@@ -80,7 +77,7 @@ public enum ChecksumType {
LOG.info("Checksum can use " + JDKCRC);
}
} catch (Exception e) {
- LOG.warn(JDKCRC + " not available. ", e);
+ LOG.trace(JDKCRC + " not available.");
}
}
@@ -113,7 +110,7 @@ public enum ChecksumType {
ctor = ChecksumFactory.newConstructor(PURECRC32C);
LOG.info("Checksum can use " + PURECRC32C);
} catch (Exception e) {
- LOG.info(PURECRC32C + " not available. ");
+ LOG.trace(PURECRC32C + " not available.");
}
}
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java
index a929e31..33e4e71 100644
--- src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.zookeeper.KeeperException;
/**
* Tracks the root region server location node in zookeeper.
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 7f97b01..0078ebc 100644
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -523,7 +523,9 @@ public class ZKUtil {
logRetrievedMsg(zkw, znode, data, watcherSet);
return data;
} catch (KeeperException.NoNodeException e) {
- LOG.debug(zkw.prefix("Unable to get data of znode " + znode + " " +
+ // This log can get pretty annoying when we cycle on 100ms waits.
+ // Enable trace if you really want to see it.
+ LOG.trace(zkw.prefix("Unable to get data of znode " + znode + " " +
"because node does not exist (not an error)"));
return null;
} catch (KeeperException e) {
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index 79b6604..0f83655 100644
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -22,9 +22,7 @@ package org.apache.hadoop.hbase.zookeeper;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -80,9 +78,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
// negotiation to complete
public CountDownLatch saslLatch = new CountDownLatch(1);
- // set of unassigned nodes watched
- private Set unassignedNodes = new HashSet();
-
// node names
// base znode for this cluster
@@ -179,10 +174,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
}
}
- private boolean isFinishedRetryingRecoverable(final long finished) {
- return System.currentTimeMillis() < finished;
- }
-
@Override
public String toString() {
return this.identifier;
diff --git src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
index 90fa45a..533b2bf 100644
--- src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
+++ src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
@@ -340,7 +340,7 @@ public class TestCatalogTracker {
// Now test waiting on root location getting set.
Thread t = new WaitOnMetaThread(ct);
- startWaitAliveThenWaitItLives(t, 1000);
+ startWaitAliveThenWaitItLives(t, 1);
// Set a root location.
hsa = setRootLocation();
// Join the thread... should exit shortly.
@@ -511,12 +511,15 @@ public class TestCatalogTracker {
}
void doWaiting() throws InterruptedException {
- this.ct.waitForRoot();
+ try {
+ while (this.ct.waitForRoot(100) == null);
+ } catch (NotAllMetaRegionsOnlineException e) {
+ // Ignore.
+ }
}
}
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
-}
-
+}
\ No newline at end of file
diff --git src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
new file mode 100644
index 0000000..d2b3060
--- /dev/null
+++ src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -0,0 +1,605 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.MultiAction;
+import org.apache.hadoop.hbase.client.MultiResponse;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Exec;
+import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.ProtocolSignature;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A mock RegionServer implementation.
+ * Use this when you can't bend Mockito to your liking (e.g. returning a null
+ * result when 'scanning' until the master times out, and then returning a
+ * coherent meta row result thereafter).  Has some facility for faking gets
+ * and scans.  See {@link #setGetResult(byte[], byte[], Result)} for how to
+ * fill the backing data store that the gets pull from.
+ */
+class MockRegionServer implements HRegionInterface, RegionServerServices {
+ private final ServerName sn;
+ private final ZooKeeperWatcher zkw;
+ private final Configuration conf;
+ private final Random random = new Random();
+
+ /**
+ * Map of regions to map of rows and {@link Result}s.  Used as data source when
+ * {@link MockRegionServer#get(byte[], Get)} is called.  Because we have a byte[]
+ * key, we need to use a TreeMap and provide a Comparator.  Use
+ * {@link #setGetResult(byte[], byte[], Result)} to fill this map.
+ */
+ private final Map<byte [], Map<byte [], Result>> gets =
+ new TreeMap<byte [], Map<byte [], Result>>(Bytes.BYTES_COMPARATOR);
+
+ /**
+ * Map of regions to results to return when scanning.
+ */
+ private final Map<byte [], Result []> nexts =
+ new TreeMap<byte [], Result []>(Bytes.BYTES_COMPARATOR);
+
+ /**
+ * Data structure that holds a region name and the index used while scanning.
+ */
+ class RegionNameAndIndex {
+ private final byte[] regionName;
+ private int index = 0;
+
+ RegionNameAndIndex(final byte[] regionName) {
+ this.regionName = regionName;
+ }
+
+ byte[] getRegionName() {
+ return this.regionName;
+ }
+
+ int getThenIncrement() {
+ int currentIndex = this.index;
+ this.index++;
+ return currentIndex;
+ }
+ }
+
+ /**
+ * Outstanding scanners and their offset into nexts
+ */
+ private final Map<Long, RegionNameAndIndex> scannersAndOffsets =
+ new HashMap<Long, RegionNameAndIndex>();
+
+ /**
+ * @param sn Name of this mock regionserver
+ * @throws IOException
+ * @throws ZooKeeperConnectionException
+ */
+ MockRegionServer(final Configuration conf, final ServerName sn)
+ throws ZooKeeperConnectionException, IOException {
+ this.sn = sn;
+ this.conf = conf;
+ this.zkw = new ZooKeeperWatcher(conf, sn.toString(), this, true);
+ }
+
+ /**
+ * Use this method to fill the backing data source used by {@link #get(byte[], Get)}.
+ * @param regionName
+ * @param row
+ * @param r
+ */
+ void setGetResult(final byte [] regionName, final byte [] row, final Result r) {
+ Map<byte [], Result> value = this.gets.get(regionName);
+ if (value == null) {
+ // If no value already, create one.  Needs to be a TreeMap because we are
+ // using a byte array as the key.  Not thread safe.
+ value = new TreeMap<byte [], Result>(Bytes.BYTES_COMPARATOR);
+ this.gets.put(regionName, value);
+ }
+ value.put(row, r);
+ }
+
+ /**
+ * Use this method to set what a scanner will return as we step through it with next.
+ * @param regionName
+ * @param rs
+ */
+ void setNextResults(final byte [] regionName, final Result [] rs) {
+ this.nexts.put(regionName, rs);
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean isStopped() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ throw new RuntimeException(this.sn + ": " + why, e);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return false;
+ }
+
+ @Override
+ public HRegionInfo getRegionInfo(byte[] regionName) {
+ // Just return this.  Calls to getRegionInfo are usually to test that the
+ // connection to the regionserver does reasonable things, so it should be
+ // safe to return anything.
+ return HRegionInfo.ROOT_REGIONINFO;
+ }
+
+ @Override
+ public void flushRegion(byte[] regionName) throws IllegalArgumentException,
+ IOException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void flushRegion(byte[] regionName, long ifOlderThanTS)
+ throws IllegalArgumentException, IOException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public long getLastFlushTime(byte[] regionName) {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public List<String> getStoreFileList(byte[] regionName, byte[] columnFamily)
+ throws IllegalArgumentException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public List<String> getStoreFileList(byte[] regionName,
+ byte[][] columnFamilies) throws IllegalArgumentException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public List<String> getStoreFileList(byte[] regionName)
+ throws IllegalArgumentException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Result getClosestRowBefore(byte[] regionName, byte[] row,
+ byte[] family) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Result get(byte[] regionName, Get get) throws IOException {
+ Map<byte [], Result> m = this.gets.get(regionName);
+ if (m == null) return null;
+ return m.get(get.getRow());
+ }
+
+ @Override
+ public boolean exists(byte[] regionName, Get get) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void put(byte[] regionName, Put put) throws IOException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public int put(byte[] regionName, List<Put> puts) throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void delete(byte[] regionName, Delete delete) throws IOException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public int delete(byte[] regionName, List<Delete> deletes)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public boolean checkAndPut(byte[] regionName, byte[] row, byte[] family,
+ byte[] qualifier, byte[] value, Put put) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean checkAndDelete(byte[] regionName, byte[] row, byte[] family,
+ byte[] qualifier, byte[] value, Delete delete) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public long incrementColumnValue(byte[] regionName, byte[] row,
+ byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public Result append(byte[] regionName, Append append) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Result increment(byte[] regionName, Increment increment)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
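+ // Hand back a random scanner id and remember which region the scanner runs
+ // against so next(scannerId) can walk the Results loaded via setNextResults.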
+ @Override
+ public long openScanner(byte[] regionName, Scan scan) throws IOException {
+ long scannerId = this.random.nextLong();
+ this.scannersAndOffsets.put(scannerId, new RegionNameAndIndex(regionName));
+ return scannerId;
+ }
+
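+ // Step through the canned Results for this scanner's region one row per
+ // call, returning null once they are exhausted or none were set.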
+ @Override
+ public Result next(long scannerId) throws IOException {
+ RegionNameAndIndex rnai = this.scannersAndOffsets.get(scannerId);
+ int index = rnai.getThenIncrement();
+ Result [] results = this.nexts.get(rnai.getRegionName());
+ if (results == null) return null;
+ return index < results.length? results[index]: null;
+ }
+
+ @Override
+ public Result [] next(long scannerId, int numberOfRows) throws IOException {
+ // Just return one result whatever they ask for.
+ Result r = next(scannerId);
+ return r == null? null: new Result [] {r};
+ }
+
+ @Override
+ public void close(final long scannerId) throws IOException {
+ this.scannersAndOffsets.remove(scannerId);
+ }
+
+ @Override
+ public long lockRow(byte[] regionName, byte[] row) throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void unlockRow(byte[] regionName, long lockId) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public List<HRegionInfo> getOnlineRegions() throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public HServerInfo getHServerInfo() throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public <R> MultiResponse multi(MultiAction<R> multi) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
+ byte[] regionName) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public RegionOpeningState openRegion(HRegionInfo region) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public RegionOpeningState openRegion(HRegionInfo region,
+ int versionOfOfflineNode) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void openRegions(List<HRegionInfo> regions) throws IOException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public boolean closeRegion(HRegionInfo region) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean closeRegion(HRegionInfo region, int versionOfClosingNode)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean closeRegion(HRegionInfo region, boolean zk)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean closeRegion(byte[] encodedRegionName, boolean zk)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void flushRegion(HRegionInfo regionInfo)
+ throws NotServingRegionException, IOException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void splitRegion(HRegionInfo regionInfo)
+ throws NotServingRegionException, IOException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
+ throws NotServingRegionException, IOException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void compactRegion(HRegionInfo regionInfo, boolean major)
+ throws NotServingRegionException, IOException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void replicateLogEntries(Entry[] entries) throws IOException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public ExecResult execCoprocessor(byte[] regionName, Exec call)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean checkAndPut(byte[] regionName, byte[] row, byte[] family,
+ byte[] qualifier, CompareOp compareOp,
+ WritableByteArrayComparable comparator, Put put) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean checkAndDelete(byte[] regionName, byte[] row, byte[] family,
+ byte[] qualifier, CompareOp compareOp,
+ WritableByteArrayComparable comparator, Delete delete)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries()
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public byte[][] rollHLogWriter() throws IOException,
+ FailedLogCloseException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
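+ // Closing the ZooKeeperWatcher is the only cleanup this mock needs on stop.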
+ @Override
+ public void stop(String why) {
+ this.zkw.close();
+ }
+
+ @Override
+ public void addToOnlineRegions(HRegion r) {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public boolean removeFromOnlineRegions(String encodedRegionName) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public HRegion getFromOnlineRegions(String encodedRegionName) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public List<HRegion> getOnlineRegions(byte[] tableName) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void refreshRegion(HRegion hRegion) throws IOException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return this.conf;
+ }
+
+ @Override
+ public ZooKeeperWatcher getZooKeeper() {
+ return this.zkw;
+ }
+
+ @Override
+ public CatalogTracker getCatalogTracker() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ServerName getServerName() {
+ return this.sn;
+ }
+
+ @Override
+ public boolean isStopping() {
+ return false;
+ }
+
+ @Override
+ public HLog getWAL() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public CompactionRequestor getCompactionRequester() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public FlushRequester getFlushRequester() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public RegionServerAccounting getRegionServerAccounting() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void postOpenDeployTasks(HRegion r, CatalogTracker ct, boolean daughter)
+ throws KeeperException, IOException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public RpcServer getRpcServer() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Map<byte[], Boolean> getRegionsInTransitionInRS() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public FileSystem getFileSystem() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void mutateRow(byte[] regionName, RowMutations rm) throws IOException {
+ // TODO Auto-generated method stub
+ }
+}
\ No newline at end of file
diff --git src/test/java/org/apache/hadoop/hbase/master/Mocking.java src/test/java/org/apache/hadoop/hbase/master/Mocking.java
new file mode 100644
index 0000000..676d6bb
--- /dev/null
+++ src/test/java/org/apache/hadoop/hbase/master/Mocking.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertNotSame;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Package scoped mocking utility.
+ */
+public class Mocking {
+ /**
+ * @param sn ServerName to use making startcode and server in meta
+ * @param hri Region to serialize into HRegionInfo
+ * @return A mocked up Result that fakes a Get on a row in the
+ * .META. table.
+ * @throws IOException
+ */
+ static Result getMetaTableRowResult(final HRegionInfo hri,
+ final ServerName sn)
+ throws IOException {
+ // TODO: Move to a utilities class. More than one test case can make use
+ // of this facility.
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+ Writables.getBytes(hri)));
+ kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
+ Bytes.toBytes(sn.getHostAndPort())));
+ kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
+ Bytes.toBytes(sn.getStartcode())));
+ return new Result(kvs);
+ }
+
+ /**
+ * Fakes the regionserver-side zk transitions of a region open.
+ * @param w ZooKeeperWatcher to use.
+ * @param sn Name of the regionserver doing the 'opening'
+ * @param hri Region we're 'opening'.
+ * @throws KeeperException
+ */
+ static void fakeRegionServerRegionOpenInZK(final ZooKeeperWatcher w,
+ final ServerName sn, final HRegionInfo hri)
+ throws KeeperException {
+ // Wait till we see the OFFLINE zk node before we proceed.
+ while (!ZKAssign.verifyRegionState(w, hri, EventType.M_ZK_REGION_OFFLINE)) {
+ Threads.sleep(1);
+ }
+ // Get current versionid else will fail on transition from OFFLINE to OPENING below
+ int versionid = ZKAssign.getVersion(w, hri);
+ assertNotSame(-1, versionid);
+ // The ugliness below is what the openregionhandler on the RS side does. I
+ // looked at exposing the method over in openregionhandler but it's just a
+ // one-liner and it's deep over in another package, so just repeat it below.
+ versionid = ZKAssign.transitionNode(w, hri, sn,
+ EventType.M_ZK_REGION_OFFLINE, EventType.RS_ZK_REGION_OPENING, versionid);
+ assertNotSame(-1, versionid);
+ // Move znode from OPENING to OPENED as RS does on successful open.
+ versionid = ZKAssign.transitionNodeOpened(w, hri, sn, versionid);
+ assertNotSame(-1, versionid);
+ // We should be done now. The master open handler will notice the
+ // transition and remove this region's znode.
+ }
+}
diff --git src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index 841649a..91dce36 100644
--- src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -145,7 +144,7 @@ public class TestAssignmentManager {
/**
* Test a balance going on at same time as a master failover
- *
+ *
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
@@ -383,7 +382,7 @@ public class TestAssignmentManager {
// Make an RS Interface implementation. Make it so a scanner can go against it.
HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
// Get a meta row result that has region up on SERVERNAME_A
- Result r = getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
+ Result r = Mocking.getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
Mockito.when(implementation.openScanner((byte [])Mockito.any(), (Scan)Mockito.any())).
thenReturn(System.currentTimeMillis());
// Return a good result first and then return null to indicate end of scan
@@ -420,31 +419,6 @@ public class TestAssignmentManager {
}
/**
- * @param sn ServerName to use making startcode and server in meta
- * @param hri Region to serialize into HRegionInfo
- * @return A mocked up Result that fakes a Get on a row in the
- * .META. table.
- * @throws IOException
- */
- private Result getMetaTableRowResult(final HRegionInfo hri,
- final ServerName sn)
- throws IOException {
- // TODO: Move to a utilities class. More than one test case can make use
- // of this facility.
- List<KeyValue> kvs = new ArrayList<KeyValue>();
- kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
- HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
- Writables.getBytes(hri)));
- kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
- HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
- Bytes.toBytes(sn.getHostAndPort())));
- kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
- HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
- Bytes.toBytes(sn.getStartcode())));
- return new Result(kvs);
- }
-
- /**
* Create and startup executor pools. Start same set as master does (just
* run a few less).
* @param name Name to give our executor
@@ -507,8 +481,8 @@ public class TestAssignmentManager {
* @param region region to be created as offline
* @param serverName server event originates from
* @return Version of znode created.
- * @throws KeeperException
- * @throws IOException
+ * @throws KeeperException
+ * @throws IOException
*/
// Copied from SplitTransaction rather than open the method over there in
// the regionserver package.
@@ -567,7 +541,7 @@ public class TestAssignmentManager {
// with an encoded name by doing a Get on .META.
HRegionInterface ri = Mockito.mock(HRegionInterface.class);
// Get a meta row result that has region up on SERVERNAME_A for REGIONINFO
- Result r = getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
+ Result r = Mocking.getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
Mockito.when(ri .openScanner((byte[]) Mockito.any(), (Scan) Mockito.any())).
thenReturn(System.currentTimeMillis());
// Return good result 'r' first and then return null to indicate end of scan
diff --git src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
new file mode 100644
index 0000000..1f7853e
--- /dev/null
+++ src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerLoad;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.RootLocationEditor;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Stand up the master and fake its environment to test various aspects of
+ * master function. Does NOT spin up a mini hbase nor mini dfs cluster (it does
+ * put up a zk cluster but that is usually fast compared to a full cluster).
+ * It should also be possible to inject faults at points hard to reach in a full cluster.
+ * TODO: Speed up the zk connection by Master. It pauses 5 seconds establishing
+ * session.
+ */
+public class TestMasterNoCluster {
+ private static final HBaseTestingUtility TESTUTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Configuration c = TESTUTIL.getConfiguration();
+ // We use local filesystem. Set it so it writes into the testdir.
+ c.set(HConstants.HBASE_DIR, TESTUTIL.getDataTestDir().toString());
+ // Startup a mini zk cluster.
+ TESTUTIL.startMiniZKCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TESTUTIL.shutdownMiniZKCluster();
+ }
+
+ @After
+ public void tearDown()
+ throws KeeperException, ZooKeeperConnectionException, IOException {
+ // Make sure zk is clean before we run the next test.
+ ZooKeeperWatcher zkw = new ZooKeeperWatcher(TESTUTIL.getConfiguration(),
+ "@Before", new Abortable() {
+ @Override
+ public void abort(String why, Throwable e) {
+ throw new RuntimeException(why, e);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return false;
+ }
+ });
+ ZKUtil.deleteNodeRecursively(zkw, zkw.baseZNode);
+ zkw.close();
+ }
+
+ /**
+ * Test starting the master then stopping it before it's fully up.
+ * @throws IOException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testStopDuringStart()
+ throws IOException, KeeperException, InterruptedException {
+ HMaster master = new HMaster(TESTUTIL.getConfiguration());
+ master.start();
+ // Immediately have it stop. We used to hang in assigning root.
+ master.stopMaster();
+ master.join();
+ }
+
+ /**
+ * Test master failover.
+ * Start up three fake regionservers and a master.
+ * @throws IOException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testFailover()
+ throws IOException, KeeperException, InterruptedException {
+ final long now = System.currentTimeMillis();
+ // Names for our three servers. Make the port numbers match the hostnames;
+ // this comes in handy below when we need to figure out which server to respond as.
+ final ServerName sn0 = new ServerName("0.example.org", 0, now);
+ final ServerName sn1 = new ServerName("1.example.org", 1, now);
+ final ServerName sn2 = new ServerName("2.example.org", 2, now);
+ final ServerName [] sns = new ServerName [] {sn0, sn1, sn2};
+ // Put up the mock servers
+ final Configuration conf = TESTUTIL.getConfiguration();
+ final MockRegionServer rs0 = new MockRegionServer(conf, sn0);
+ final MockRegionServer rs1 = new MockRegionServer(conf, sn1);
+ final MockRegionServer rs2 = new MockRegionServer(conf, sn2);
+ // Put some data into the servers. Make it look like sn0 hosts the root
+ // w/ an entry that points to sn1 as the host of .META. Load sn1's .META.
+ // with rows saying a table named 't' has a few regions hosted on sn2.
+ RootLocationEditor.setRootLocation(rs0.getZooKeeper(), rs0.getServerName());
+ byte [] rootregion = Bytes.toBytes("-ROOT-,,0");
+ rs0.setGetResult(rootregion, HRegionInfo.FIRST_META_REGIONINFO.getRegionName(),
+ Mocking.getMetaTableRowResult(HRegionInfo.FIRST_META_REGIONINFO,
+ rs1.getServerName()));
+ final byte [] tableName = Bytes.toBytes("t");
+ Result [] results = new Result [] {
+ Mocking.getMetaTableRowResult(
+ new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HBaseTestingUtility.KEYS[1]),
+ rs2.getServerName()),
+ Mocking.getMetaTableRowResult(
+ new HRegionInfo(tableName, HBaseTestingUtility.KEYS[1], HBaseTestingUtility.KEYS[2]),
+ rs2.getServerName()),
+ Mocking.getMetaTableRowResult(new HRegionInfo(tableName, HBaseTestingUtility.KEYS[2],
+ HConstants.EMPTY_END_ROW),
+ rs2.getServerName())
+ };
+ rs1.setNextResults(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), results);
+
+ // Create master. Subclass to override a few methods so we can insert mocks
+ // and get notification on transitions. We need to fake out any rpcs the
+ // master does opening/closing regions. Also need to fake out the address
+ // of the 'remote' mocked up regionservers.
+ HMaster master = new HMaster(conf) {
+ InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
+ throws UnknownHostException {
+ // Return different address dependent on port passed.
+ ServerName sn = sns[port];
+ return InetAddress.getByAddress(sn.getHostname(),
+ new byte [] {10, 0, 0, (byte)sn.getPort()});
+ }
+
+ @Override
+ ServerManager createServerManager(Server master, MasterServices services)
+ throws IOException {
+ ServerManager sm = super.createServerManager(master, services);
+ // Spy on the created servermanager
+ ServerManager spy = Mockito.spy(sm);
+ // Fake a successful open.
+ Mockito.doReturn(RegionOpeningState.OPENED).when(spy).
+ sendRegionOpen((ServerName)Mockito.any(), (HRegionInfo)Mockito.any(),
+ Mockito.anyInt());
+ return spy;
+ }
+
+ @Override
+ CatalogTracker createCatalogTracker(ZooKeeperWatcher zk,
+ Configuration conf, Abortable abortable, int defaultTimeout)
+ throws IOException {
+ // Insert a mock for the connection used by the CatalogTracker. Any
+ // regionserver should do. Use TESTUTIL.getConfiguration rather than
+ // the conf from the master; that conf will already have an HConnection
+ // associated with it, so the mocking of a connection below would fail.
+ HConnection connection =
+ HConnectionTestingUtility.getMockedConnectionAndDecorate(TESTUTIL.getConfiguration(),
+ rs0, rs0.getServerName(), HRegionInfo.ROOT_REGIONINFO);
+ return new CatalogTracker(zk, conf, connection, abortable, defaultTimeout);
+ }
+ };
+ master.start();
+
+ try {
+ // Wait till master is up ready for RPCs.
+ while (!master.isRpcServerOpen()) Threads.sleep(10);
+ // Fake out the master: make it look like there are regionservers out there by reporting in.
+ for (int i = 0; i < sns.length; i++) {
+ master.regionServerReport(sns[i].getVersionedBytes(), new HServerLoad());
+ }
+ // Master should now come up.
+ while (!master.isInitialized()) {Threads.sleep(10);}
+ assertTrue(master.isInitialized());
+ } finally {
+ rs0.stop("Test is done");
+ rs1.stop("Test is done");
+ rs2.stop("Test is done");
+ master.stopMaster();
+ master.join();
+ }
+ }
+
+ /**
+ * Test starting master getting it up post initialized state using mocks.
+ * @throws IOException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testCatalogDeploys()
+ throws IOException, KeeperException, InterruptedException {
+ final Configuration conf = TESTUTIL.getConfiguration();
+ final long now = System.currentTimeMillis();
+ // Name for our single mocked up regionserver.
+ final ServerName sn = new ServerName("0.example.org", 0, now);
+ // Here is our mocked up regionserver. Create it now; we need it when
+ // setting up the master next.
+ final MockRegionServer rs0 = new MockRegionServer(conf, sn);
+
+ // Create master. Subclass to override a few methods so we can insert mocks
+ // and get notification on transitions. We need to fake out any rpcs the
+ // master does opening/closing regions. Also need to fake out the address
+ // of the 'remote' mocked up regionservers.
+ HMaster master = new HMaster(conf) {
+ InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
+ throws UnknownHostException {
+ // Interject an unchecked, nonsense InetAddress; i.e. no resolve.
+ return InetAddress.getByAddress(rs0.getServerName().getHostname(),
+ new byte [] {10, 0, 0, 0});
+ }
+
+ @Override
+ ServerManager createServerManager(Server master, MasterServices services)
+ throws IOException {
+ ServerManager sm = super.createServerManager(master, services);
+ // Spy on the created servermanager
+ ServerManager spy = Mockito.spy(sm);
+ // Fake a successful open.
+ Mockito.doReturn(RegionOpeningState.OPENED).when(spy).
+ sendRegionOpen((ServerName)Mockito.any(), (HRegionInfo)Mockito.any(),
+ Mockito.anyInt());
+ return spy;
+ }
+
+ @Override
+ CatalogTracker createCatalogTracker(ZooKeeperWatcher zk,
+ Configuration conf, Abortable abortable, int defaultTimeout)
+ throws IOException {
+ // Insert a mock for the connection used by the CatalogTracker. Use
+ // TESTUTIL.getConfiguration rather than the conf from the master; the
+ // conf will already have an HConnection associated with it, so the mocking
+ // of a connection below would fail.
+ HConnection connection =
+ HConnectionTestingUtility.getMockedConnectionAndDecorate(TESTUTIL.getConfiguration(),
+ rs0, rs0.getServerName(), HRegionInfo.ROOT_REGIONINFO);
+ return new CatalogTracker(zk, conf, connection, abortable, defaultTimeout);
+ }
+ };
+ master.start();
+
+ try {
+ // Wait till master is up ready for RPCs.
+ while (!master.isRpcServerOpen()) Threads.sleep(10);
+ // Fake out the master: make it look like there is a regionserver out there by checking in.
+ MapWritable mw = master.regionServerStartup(rs0.getServerName().getPort(),
+ rs0.getServerName().getStartcode(), now);
+ // Assert hostname is as expected.
+ String rshostname =
+ mw.get(new Text(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)).toString();
+ assertEquals(rs0.getServerName().getHostname(), rshostname);
+ // Now the master knows at least one regionserver has checked in, so it'll
+ // wait a while to see if more report and, when none do, will assign root and
+ // meta to this single server. It will do an rpc open, but we've
+ // mocked that above in our master override to return 'success'. As part of
+ // the region open, the master will have set an unassigned znode for the region
+ // up in zk for the regionserver to transition. Let's do that now to
+ // complete the fake of a successful open.
+ Mocking.fakeRegionServerRegionOpenInZK(rs0.getZooKeeper(),
+ rs0.getServerName(), HRegionInfo.ROOT_REGIONINFO);
+ // Need to set the root location to rs0. Usually the regionserver does this
+ // once it's figured it just opened the root region, by setting the root
+ // location up in zk. Since we're mocking the regionserver, we need to do this
+ // ourselves.
+ RootLocationEditor.setRootLocation(rs0.getZooKeeper(), rs0.getServerName());
+ // Do the same transitions for .META. (presuming the master has by now
+ // assigned .META. to rs0).
+ Mocking.fakeRegionServerRegionOpenInZK(rs0.getZooKeeper(),
+ rs0.getServerName(), HRegionInfo.FIRST_META_REGIONINFO);
+ // Now trigger our mock regionserver to start returning a row when we
+ // go to get the .META. entry in -ROOT-. We do it by setting into
+ // our MockRegionServer some data to be returned when there is a get on the
+ // -ROOT- table (up to this point it's been returning null, making the master
+ // think nothing is assigned, not even .META.). The region for the -ROOT- table
+ // we hardcode below. It's always the same, at least in tests. We need to do
+ // this because the CatalogTracker runs inside Master initialization to
+ // confirm .META. has a server.
+ byte [] rootregion = Bytes.toBytes("-ROOT-,,0");
+ rs0.setGetResult(rootregion, HRegionInfo.FIRST_META_REGIONINFO.getRegionName(),
+ Mocking.getMetaTableRowResult(HRegionInfo.FIRST_META_REGIONINFO,
+ rs0.getServerName()));
+ // Master should now come up.
+ while (!master.isInitialized()) {Threads.sleep(10);}
+ assertTrue(master.isInitialized());
+ } finally {
+ rs0.stop("Test is done");
+ master.stopMaster();
+ master.join();
+ }
+ }
+}
\ No newline at end of file