diff --git src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java index beaff97..408db79 100644 --- src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java +++ src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java @@ -185,7 +185,7 @@ public class CatalogTracker { this(zk, conf, HConnectionManager.getConnection(conf), abortable, defaultTimeout); } - CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf, + public CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf, HConnection connection, Abortable abortable, final int defaultTimeout) throws IOException { this.connection = connection; @@ -310,16 +310,6 @@ public class CatalogTracker { } /** - * Waits indefinitely for availability of -ROOT-. Used during - * cluster startup. - * @throws InterruptedException if interrupted while waiting - */ - public void waitForRoot() - throws InterruptedException { - this.rootRegionTracker.blockUntilAvailable(); - } - - /** * Gets the current location for -ROOT- if available and waits * for up to the specified timeout if not immediately available. Returns null * if the timeout elapses before root is available. @@ -330,7 +320,7 @@ public class CatalogTracker { * @throws NotAllMetaRegionsOnlineException if root not available before * timeout */ - ServerName waitForRoot(final long timeout) + public ServerName waitForRoot(final long timeout) throws InterruptedException, NotAllMetaRegionsOnlineException { ServerName sn = rootRegionTracker.waitRootRegionLocation(timeout); if (sn == null) { diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index b52e5d3..91562f7 100644 --- src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -628,7 +628,7 @@ public class LruBlockCache implements BlockCache, HeapSize { // Log size long totalSize = heapSize(); long freeSize = maxSize - totalSize; - LruBlockCache.LOG.debug("LRU Stats: " + + LruBlockCache.LOG.debug("Stats: " + "total=" + StringUtils.byteDesc(totalSize) + ", " + "free=" + StringUtils.byteDesc(freeSize) + ", " + "max=" + StringUtils.byteDesc(this.maxSize) + ", " + @@ -636,11 +636,11 @@ public class LruBlockCache implements BlockCache, HeapSize { "accesses=" + stats.getRequestCount() + ", " + "hits=" + stats.getHitCount() + ", " + "hitRatio=" + - (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + + (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " + "cachingAccesses=" + stats.getRequestCachingCount() + ", " + "cachingHits=" + stats.getHitCachingCount() + ", " + "cachingHitsRatio=" + - (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) + + (stats.getHitCachingCount() == 0 ? 
"0" : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2)+ ", ")) + ", " + "evictions=" + stats.getEvictionCount() + ", " + "evicted=" + stats.getEvictedCount() + ", " + "evictedPerRun=" + stats.evictedPerEviction()); diff --git src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index d47ef10..013f528 100644 --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -346,6 +346,9 @@ public class AssignmentManager extends ZooKeeperListener { // Returns servers who have not checked in (assumed dead) and their regions Map>> deadServers = rebuildUserRegions(onlineServers); + // This method will assign all user regions if a clean server startup or + // it will reconstitute master state and cleanup any leftovers from + // previous master process. processDeadServersAndRegionsInTransition(deadServers); // Recover the tables that were not fully moved to DISABLED state. @@ -380,7 +383,8 @@ public class AssignmentManager extends ZooKeeperListener { /** * Process all regions that are in transition in zookeeper and also * processes the list of dead servers by scanning the META. - * Used by master joining an cluster. + * Used by master joining an cluster. If we figure this is a clean cluster + * startup, will assign all user regions. * @param deadServers * Map of dead servers and their regions. Can be null. * @throws KeeperException @@ -395,8 +399,7 @@ public class AssignmentManager extends ZooKeeperListener { // Run through all regions. If they are not assigned and not in RIT, then // its a clean cluster startup, else its a failover. for (Map.Entry e: this.regions.entrySet()) { - if (!e.getKey().isMetaTable() - && e.getValue() != null) { + if (!e.getKey().isMetaTable() && e.getValue() != null) { LOG.debug("Found " + e + " out on cluster"); this.failover = true; break; @@ -2127,7 +2130,7 @@ public class AssignmentManager extends ZooKeeperListener { public void waitForAssignment(HRegionInfo regionInfo) throws InterruptedException { synchronized(regions) { - while(!regions.containsKey(regionInfo)) { + while(!this.master.isStopped() && !regions.containsKey(regionInfo)) { // We should receive a notification, but it's // better to have a timeout to recheck the condition here: // it lowers the impact of a race condition if any diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java index cd1755f..154db42 100644 --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -24,6 +24,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -43,6 +44,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -51,6 +53,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.HTableDescriptor; import 
org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; @@ -58,6 +61,7 @@ import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.UnknownRegionException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.HConnectionManager; @@ -154,6 +158,9 @@ Server { // RPC server for the HMaster private final RpcServer rpcServer; + // Set after we've called HBaseServer#openServer and ready to receive RPCs. + // Set back to false after we stop rpcServer. Used by tests. + private volatile boolean rpcServerOpen = false; /** * This servers address. @@ -290,17 +297,18 @@ Server { this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true); this.rpcServer.startThreads(); this.metrics = new MasterMetrics(getServerName().toString()); - // initialize instant schema change settings - this.supportInstantSchemaChanges = conf.getBoolean( - "hbase.instant.schema.alter.enabled", false); - if (supportInstantSchemaChanges) { - LOG.info("Instant schema change enabled. All schema alter operations will " + - "happen through ZK."); - } - else { - LOG.info("Instant schema change disabled. All schema alter operations will " + - "happen normally."); - } + this.supportInstantSchemaChanges = getSupportInstantSchemaChanges(conf); + } + + /** + * Get whether instant schema change is on or not. + * @param c + * @return True if instant schema enabled. + */ + private boolean getSupportInstantSchemaChanges(final Configuration c) { + boolean b = c.getBoolean("hbase.instant.schema.alter.enabled", false); + LOG.debug("Instant schema change enabled=" + b + "."); + return b; } /** @@ -418,7 +426,7 @@ Server { */ private void initializeZKBasedSystemTrackers() throws IOException, InterruptedException, KeeperException { - this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, + this.catalogTracker = createCatalogTracker(this.zooKeeper, this.conf, this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE)); this.catalogTracker.start(); @@ -452,8 +460,27 @@ Server { ", cluster-up flag was=" + wasUp); } + /** + * Create CatalogTracker. + * In its own method so can intercept and mock it over in tests. + * @param zk If zk is null, we'll create an instance (and shut it down + * when {@link #stop()} is called) else we'll use what is passed. + * @param conf + * @param abortable If fatal exception we'll call abort on this. May be null. + * If it is we'll use the Connection associated with the passed + * {@link Configuration} as our {@link Abortable}. + * @param defaultTimeout Timeout to use. Pass zero for no timeout + * ({@link Object#wait(long)} when passed a 0 waits for ever). + * @throws IOException + */ + CatalogTracker createCatalogTracker(final ZooKeeperWatcher zk, + final Configuration conf, Abortable abortable, final int defaultTimeout) + throws IOException { + return new CatalogTracker(zk, conf, abortable, defaultTimeout); + } + // Check if we should stop every second. 
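The createCatalogTracker hook introduced above (like the createServerManager hook added further down) is the overridable-factory-method test seam the patch's own javadoc describes: production code always goes through the factory, and a test subclasses or spies the master to hand back a mock instead of a real tracker. A minimal, self-contained sketch of that pattern, with names that are purely illustrative and not part of HBase:

// Overridable-factory-method test seam in miniature. "Master" and "Tracker"
// are illustrative stand-ins, not the HBase classes touched by this patch.
class Master {
  private Tracker tracker;

  void initialize() {
    // Production code always obtains the collaborator through the factory.
    this.tracker = createTracker();
    this.tracker.start();
  }

  // Package-private so a test in the same package (or a Mockito spy) can
  // substitute its own instance.
  Tracker createTracker() {
    return new Tracker();
  }
}

class Tracker {
  void start() {
    System.out.println("real tracker started");
  }
}

class FactorySeamDemo {
  public static void main(String[] args) {
    // The "test" overrides only the factory method and leaves the rest alone.
    Master master = new Master() {
      @Override
      Tracker createTracker() {
        return new Tracker() {
          @Override
          void start() {
            System.out.println("stub tracker started");
          }
        };
      }
    };
    master.initialize();
  }
}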
- private Sleeper stopSleeper = new Sleeper(1000, this); + private Sleeper stopSleeper = new Sleeper(100, this); private void loop() { while (!this.stopped) { stopSleeper.sleep(); @@ -505,7 +532,7 @@ Server { this.executorService = new ExecutorService(getServerName().toString()); - this.serverManager = new ServerManager(this, this); + this.serverManager = createServerManager(this, this); status.setStatus("Initializing ZK system trackers"); initializeZKBasedSystemTrackers(); @@ -537,7 +564,7 @@ Server { splitLogAfterStartup(this.fileSystemManager, onlineServers); // Make sure root and meta assigned before proceeding. - assignRootAndMeta(status); + if (!assignRootAndMeta(status)) return; serverShutdownHandlerEnabled = true; this.serverManager.expireDeadNotExpiredServers(); @@ -596,14 +623,30 @@ Server { } /** + * Create a {@link ServerManager} instance. + * @param master + * @param services + * @return An instance of {@link ServerManager} + * @throws ZooKeeperConnectionException + * @throws IOException + */ + ServerManager createServerManager(final Server master, + final MasterServices services) + throws IOException { + // We put this out here in a method so can do a Mockito.spy and stub it out + // w/ a mocked up ServerManager. + return new ServerManager(master, services); + } + + /** * Check -ROOT- and .META. are assigned. If not, * assign them. * @throws InterruptedException * @throws IOException * @throws KeeperException - * @return Count of regions we assigned. + * @return True if root and meta are healthy, assigned */ - int assignRootAndMeta(MonitoredTask status) + boolean assignRootAndMeta(MonitoredTask status) throws InterruptedException, IOException, KeeperException { int assigned = 0; long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000); @@ -617,8 +660,9 @@ Server { currentRootServer = this.catalogTracker.getRootLocation(); splitLogAndExpireIfOnline(currentRootServer); this.assignmentManager.assignRoot(); - this.catalogTracker.waitForRoot(); - //This guarantees that the transition has completed + // Make sure a -ROOT- location is set. + if (!isRootLocation()) return false; + // This guarantees that the transition assigning -ROOT- has completed this.assignmentManager.waitForAssignment(HRegionInfo.ROOT_REGIONINFO); assigned++; } else { @@ -629,6 +673,8 @@ Server { // Enable the ROOT table if on process fail over the RS containing ROOT // was active. enableCatalogTables(Bytes.toString(HConstants.ROOT_TABLE_NAME)); + // Check for stopped, just in case + if (this.stopped) return false; LOG.info("-ROOT- assigned=" + assigned + ", rit=" + rit + ", location=" + catalogTracker.getRootLocation()); @@ -658,7 +704,25 @@ Server { LOG.info(".META. assigned=" + assigned + ", rit=" + rit + ", location=" + catalogTracker.getMetaLocation()); status.setStatus("META and ROOT assigned."); - return assigned; + return true; + } + + /** + * @return True if there a root available + * @throws InterruptedException + */ + private boolean isRootLocation() throws InterruptedException { + // Cycle up here in master rather than down in catalogtracker so we can + // check the master stopped flag every so often. + while (!this.stopped) { + try { + if (this.catalogTracker.waitForRoot(100) != null) break; + } catch (NotAllMetaRegionsOnlineException e) { + // Ignore. I know -ROOT- is not online yet. + } + } + // We got here because we came of above loop. 
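Swapping the indefinite CatalogTracker.waitForRoot() for the timed waitForRoot(long) lets the master own the retry loop and re-check its own stopped flag between short waits, which is what the isRootLocation() loop above does. A generic sketch of that poll-in-slices shape, using made-up names rather than HBase API:

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Poll-with-timeout loop: wait in short slices so the caller's own stop flag
// is consulted between attempts instead of blocking forever on one condition.
class BoundedWaiter {
  private volatile boolean stopped = false;

  /** Returns the value once available, or null if stop() was called first. */
  <T> T waitFor(Supplier<T> source, long sliceMillis) throws InterruptedException {
    while (!stopped) {
      T value = source.get();
      if (value != null) {
        return value;
      }
      // Stands in for a timed wait of sliceMillis on a tracker/ZK node.
      TimeUnit.MILLISECONDS.sleep(sliceMillis);
    }
    // We only get here because the stop flag was set.
    return null;
  }

  void stop() {
    this.stopped = true;
  }

  public static void main(String[] args) throws InterruptedException {
    BoundedWaiter waiter = new BoundedWaiter();
    long readyAt = System.currentTimeMillis() + 300;
    // The "location" becomes available after ~300ms; a later slice picks it up.
    String location = waiter.waitFor(
        () -> System.currentTimeMillis() >= readyAt ? "example-host,60020,1" : null, 100);
    System.out.println("Got: " + location);
  }
}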
+ return !this.stopped; } private void enableCatalogTables(String catalogTableName) { @@ -793,7 +857,7 @@ Server { * as OOMEs; it should be lightly loaded. See what HRegionServer does if * need to install an unexpected exception handler. */ - private void startServiceThreads() throws IOException{ + void startServiceThreads() throws IOException{ // Start the executor service pools this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, @@ -831,10 +895,18 @@ Server { // Start allowing requests to happen. this.rpcServer.openServer(); + this.rpcServerOpen = true; if (LOG.isDebugEnabled()) { LOG.debug("Started service threads"); } + } + /** + * Use this when trying to figure when its ok to send in rpcs. Used by tests. + * @return True if we have successfully run {@link HBaseServer#openServer()} + */ + boolean isRpcServerOpen() { + return this.rpcServerOpen; } private void stopServiceThreads() { @@ -842,6 +914,7 @@ Server { LOG.debug("Stopping service threads"); } if (this.rpcServer != null) this.rpcServer.stop(); + this.rpcServerOpen = false; // Clean up and close up shop if (this.logCleaner!= null) this.logCleaner.interrupt(); if (this.infoServer != null) { @@ -908,7 +981,7 @@ Server { final long serverStartCode, final long serverCurrentTime) throws IOException { // Register with server manager - InetAddress ia = HBaseServer.getRemoteIp(); + InetAddress ia = getRemoteInetAddress(port, serverStartCode); ServerName rs = this.serverManager.regionServerStartup(ia, port, serverStartCode, serverCurrentTime); // Send back some config info @@ -919,6 +992,17 @@ Server { } /** + * @return Get remote side's InetAddress + * @throws UnknownHostException + */ + InetAddress getRemoteInetAddress(final int port, final long serverStartCode) + throws UnknownHostException { + // Do it out here in its own little method so can fake an address when + // mocking up in tests. + return HBaseServer.getRemoteIp(); + } + + /** * @return Subset of configuration to pass initializing regionservers: e.g. * the filesystem to use and root directory to use. 
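The rpcServerOpen field added above is a plain volatile readiness flag: flipped on after openServer(), cleared when the RPC server stops, and polled by tests before they issue requests. The same idea in isolation, with illustrative names only:

import java.util.concurrent.TimeUnit;

// Volatile boolean as a readiness flag: the starting thread flips it once the
// listener is open, and test code polls it before sending its first request.
class RpcLifecycle {
  private volatile boolean open = false;

  void start() {
    // ... bind sockets, start handler threads ...
    this.open = true;
  }

  void stop() {
    this.open = false;
    // ... shut down handler threads ...
  }

  boolean isOpen() {
    return open;
  }

  public static void main(String[] args) throws InterruptedException {
    RpcLifecycle rpc = new RpcLifecycle();
    new Thread(rpc::start).start();
    // A test would spin like this before issuing its first call.
    while (!rpc.isOpen()) {
      TimeUnit.MILLISECONDS.sleep(10);
    }
    System.out.println("server is accepting requests");
    rpc.stop();
  }
}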
*/ @@ -1255,8 +1339,6 @@ Server { LOG.debug("Getting AlterStatus from SchemaChangeTracker for table = " + Bytes.toString(tableName) + " Alter Status = " + alterStatus.toString()); - int numberPending = alterStatus.getNumberOfRegionsToProcess() - - alterStatus.getNumberOfRegionsProcessed(); return new Pair(alterStatus.getNumberOfRegionsProcessed(), alterStatus.getNumberOfRegionsToProcess()); } else { diff --git src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java new file mode 100644 index 0000000..264a7ae --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java @@ -0,0 +1,257 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType; +import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.mockito.Mockito; + +/** + * Compact passed set of files. + */ +public class CompactionTool implements Tool { + private static final Log LOG = LogFactory.getLog(CompactionTool.class); + private Configuration conf; + private CompactionProgress progress; + + CompactionTool() { + super(); + } + + /** + * Do a minor/major compaction on an explicit set of storefiles from a Store. + * + * @param store Store the files belong to + * @param filesToCompact which files to compact + * @param majorCompaction true to major compact (prune all deletes, max versions, etc) + * @param maxId Readers maximum sequence id. + * @return Product of compaction or null if all cells expired or deleted and + * nothing made it through the compaction. + * @throws IOException + */ + StoreFile.Writer compact(final Store store, + final Collection filesToCompact, + final boolean majorCompaction, final long maxId) + throws IOException { + // Calculate maximum key count after compaction (for blooms) + // Also calculate earliest put timestamp if major compaction + int maxKeyCount = 0; + long earliestPutTs = HConstants.LATEST_TIMESTAMP; + for (StoreFile file: filesToCompact) { + StoreFile.Reader r = file.getReader(); + if (r == null) { + LOG.warn("Null reader for " + file.getPath()); + continue; + } + // NOTE: getFilterEntries could cause under-sized blooms if the user + // switches bloom type (e.g. from ROW to ROWCOL) + long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())? 
+ r.getFilterEntries() : r.getEntries(); + maxKeyCount += keyCount; + // For major compactions calculate the earliest put timestamp of all + // involved storefiles. This is used to remove family delete marker during + // compaction. + if (majorCompaction) { + byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS); + if (tmp == null) { + // There's a file with no information, must be an old one + // assume we have very old puts + earliestPutTs = HConstants.OLDEST_TIMESTAMP; + } else { + earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp)); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Compacting " + file + + ", keycount=" + keyCount + + ", bloomtype=" + r.getBloomFilterType().toString() + + ", size=" + StringUtils.humanReadableInt(r.length()) + + ", encoding=" + r.getHFileReader().getEncodingOnDisk() + + (majorCompaction? ", earliestPutTs=" + earliestPutTs: "")); + } + } + + // keep track of compaction progress + this.progress = new CompactionProgress(maxKeyCount); + + // For each file, obtain a scanner: + List scanners = StoreFileScanner + .getScannersForStoreFiles(filesToCompact, false, false, true); + + // Get some configs + int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10); + Compression.Algorithm compression = store.getFamily().getCompression(); + // Avoid overriding compression setting for major compactions if the user + // has not specified it separately + Compression.Algorithm compactionCompression = + (store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE) ? + store.getFamily().getCompactionCompression(): compression; + // Make the instantiation lazy in case compaction produces no product; i.e. + // where all source cells are expired or deleted. + StoreFile.Writer writer = null; + // Find the smallest read point across all the Scanners. + long smallestReadPoint = store.getHRegion().getSmallestReadPoint(); + MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint); + try { + InternalScanner scanner = null; + try { + Scan scan = new Scan(); + scan.setMaxVersions(store.getFamily().getMaxVersions()); + /* Include deletes, unless we are doing a major compaction */ + scanner = new StoreScanner(store, scan, scanners, + majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, + smallestReadPoint, earliestPutTs); + if (store.getHRegion().getCoprocessorHost() != null) { + InternalScanner cpScanner = + store.getHRegion().getCoprocessorHost().preCompact(store, scanner); + // NULL scanner returned from coprocessor hooks means skip normal processing + if (cpScanner == null) { + return null; + } + scanner = cpScanner; + } + + int bytesWritten = 0; + // Since scanner.next() can return 'false' but still be delivering data, + // we have to use a do/while loop. 
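The do/while shape the comment above calls out matters because a batched next(list, limit) can deliver the final batch and report false in the same call; the batch has to be processed before the return value is consulted, or the last rows are silently dropped. A stripped-down illustration of that contract (nothing below is HBase API):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// A batched reader whose next(batch, limit) can return false while still
// delivering rows, mirroring why the compaction loop is do/while.
class BatchedReader {
  private final Iterator<String> rows;

  BatchedReader(List<String> source) {
    this.rows = source.iterator();
  }

  /** Fills 'batch' with up to 'limit' rows; returns true only if more remain after this call. */
  boolean next(List<String> batch, int limit) {
    for (int i = 0; i < limit && rows.hasNext(); i++) {
      batch.add(rows.next());
    }
    return rows.hasNext();
  }

  public static void main(String[] args) {
    BatchedReader reader = new BatchedReader(List.of("a", "b", "c", "d", "e"));
    List<String> batch = new ArrayList<>();
    boolean hasMore;
    do {
      hasMore = reader.next(batch, 2);
      // Process the batch before looking at hasMore, otherwise "e" is lost.
      batch.forEach(System.out::println);
      batch.clear();
    } while (hasMore);
  }
}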
+ ArrayList kvs = new ArrayList(); + // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME + boolean hasMore; + do { + hasMore = scanner.next(kvs, compactionKVMax); + if (writer == null && !kvs.isEmpty()) { + writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true); + } + if (writer != null) { + // output to writer: + for (KeyValue kv : kvs) { + if (kv.getMemstoreTS() <= smallestReadPoint) { + kv.setMemstoreTS(0); + } + writer.append(kv); + // update progress per key + ++progress.currentCompactedKVs; + + // check periodically to see if a system stop is requested + if (Store.closeCheckInterval > 0) { + bytesWritten += kv.getLength(); + if (bytesWritten > Store.closeCheckInterval) { + bytesWritten = 0; + if (!store.getHRegion().areWritesEnabled()) { + writer.close(); + store.getFileSystem().delete(writer.getPath(), false); + throw new InterruptedIOException( + "Aborting compaction of store " + store + + " in region " + store.getHRegion() + + " because user requested stop."); + } + } + } + } + } + kvs.clear(); + } while (hasMore); + } finally { + if (scanner != null) { + scanner.close(); + } + } + } finally { + if (writer != null) { + writer.appendMetadata(maxId, majorCompaction); + writer.close(); + } + } + return writer; + } + + CompactionProgress getProgress() { + return this.progress; + } + + @Override + public Configuration getConf() { + return this.conf; + } + + @Override + public void setConf(Configuration c) { + this.conf = c; + } + + void usage() { + + } + + @Override + public int run(final String[] args) throws Exception { + if (args.length == 0) usage(); + FileSystem fs = FileSystem.get(getConf()); + List files = new ArrayList(); + for (String s: args) { + files.add(fs.getFileStatus(new Path(s))); + } + // Pass anything for hbase.rootdir. We should not be writing anything there. + // We are going to fake out Store on where everything is. + Path hbaseRootdir = new Path(getConf().get("hbase.tmp.dir")); + // TODO: Let this be configurable from command-line setting versions, etc. + HColumnDescriptor hcd = new HColumnDescriptor("f"); + HRegionInfo hri = new HRegionInfo(Bytes.toBytes("t")); + // Mock up an HRegion instance. + HRegion mockedHRegion = Mockito.mock(HRegion.class); + Mockito.when(mockedHRegion.getRegionInfo()).thenReturn(hri); + // Create a Store w/ check of hbase.rootdir blanked out and return our + // list of files instead of have Store search its home dir. + Store s = new Store(hbaseRootdir, mockedHRegion, hcd, fs, getConf()) { + @Override + FileStatus[] getStoreFiles() throws IOException { + return null; + } + + @Override + Path createStoreHomeDir(FileSystem fs, Path homedir) throws IOException { + return homedir; + } + }; + // Now we have a Store, run a compaction of passed files. + try { + CompactSelection selection = + new CompactSelection(getConf(), new ArrayList()); + // TODO: Take major compact or not from command-line. 
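The run() method above drives a Store outside a running regionserver by combining two tricks: Mockito-mock the heavyweight collaborator (HRegion) and override the filesystem hooks (getStoreFiles, createStoreHomeDir) in an anonymous subclass. The same combination in miniature; Region and DataStore below are made-up stand-ins, and only the Mockito calls are real API:

import org.mockito.Mockito;

// "Mocked collaborator + anonymous subclass overriding protected hooks",
// the setup CompactionTool uses to build a standalone Store.
class StandaloneExample {
  interface Region {
    String getName();
  }

  static class DataStore {
    private final Region region;

    DataStore(Region region) {
      this.region = region;
    }

    // Hook the anonymous subclass overrides so no real directory scan happens.
    String[] listFiles() {
      throw new UnsupportedOperationException("needs a filesystem");
    }

    void compact() {
      System.out.println("compacting " + listFiles().length
          + " file(s) for region " + region.getName());
    }
  }

  public static void main(String[] args) {
    Region mockedRegion = Mockito.mock(Region.class);
    Mockito.when(mockedRegion.getName()).thenReturn("t,,1");

    DataStore store = new DataStore(mockedRegion) {
      @Override
      String[] listFiles() {
        // Return the explicit file list instead of scanning a store directory.
        return new String[] {"file1", "file2"};
      }
    };
    store.compact();
  }
}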
+ CompactionRequest cr = + new CompactionRequest(mockedHRegion, s, selection, true, 0); + s.compact(cr); + } finally { + s.close(); + } + return 0; + } + + public static void main(String[] args) throws Exception { + System.exit(ToolRunner.run(HBaseConfiguration.create(), + new CompactionTool(), args)); + } +} \ No newline at end of file diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 4efdc6b..683d955 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5334,11 +5334,11 @@ public class HRegion implements HeapSize { // , Writable{ final HLog log = new HLog(fs, logdir, oldLogDir, c); try { processTable(fs, tableDir, log, c, majorCompact); - } finally { + } finally { log.close(); // TODO: is this still right? BlockCache bc = new CacheConfig(c).getBlockCache(); if (bc != null) bc.shutdown(); - } + } } } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index dcede5a..6fe2891 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -132,9 +131,6 @@ public class Store extends SchemaConfigured implements HeapSize { private volatile long totalUncompressedBytes = 0L; private final Object flushLock = new Object(); final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final String storeNameStr; - private CompactionProgress progress; - private final int compactionKVMax; private final boolean verifyBulkLoads; // not private for testing @@ -152,10 +148,6 @@ public class Store extends SchemaConfigured implements HeapSize { new CopyOnWriteArraySet(); private final int blocksize; - /** Compression algorithm for flush files and minor compaction */ - private final Compression.Algorithm compression; - /** Compression algorithm for major compaction */ - private final Compression.Algorithm compactionCompression; private HFileDataBlockEncoder dataBlockEncoder; /** Checksum configuration */ @@ -165,6 +157,8 @@ public class Store extends SchemaConfigured implements HeapSize { // Comparing KeyValues final KeyValue.KVComparator comparator; + private final CompactionTool compactionTool; + /** * Constructor * @param basedir qualified path under which the region directory lives; @@ -179,52 +173,37 @@ public class Store extends SchemaConfigured implements HeapSize { protected Store(Path basedir, HRegion region, HColumnDescriptor family, FileSystem fs, Configuration conf) throws IOException { - super(conf, region.getTableDesc().getNameAsString(), + super(conf, region.getRegionInfo().getTableNameAsString(), Bytes.toString(family.getName())); - HRegionInfo info = region.regionInfo; + HRegionInfo info = region.getRegionInfo(); this.fs = fs; - this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName()); - if (!this.fs.exists(this.homedir)) { - if (!this.fs.mkdirs(this.homedir)) - throw new IOException("Failed create of: " + this.homedir.toString()); - } + // Assemble the store's home directory. + Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName()); + // Ensure it exists. 
+ this.homedir = createStoreHomeDir(this.fs, p); this.region = region; this.family = family; this.conf = conf; this.blocksize = family.getBlocksize(); - this.compression = family.getCompression(); - // avoid overriding compression setting for major compactions if the user - // has not specified it separately - this.compactionCompression = - (family.getCompactionCompression() != Compression.Algorithm.NONE) ? - family.getCompactionCompression() : this.compression; this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(), family.getDataBlockEncoding()); this.comparator = info.getComparator(); - // getTimeToLive returns ttl in seconds. Convert to milliseconds. - this.ttl = family.getTimeToLive(); - if (ttl == HConstants.FOREVER) { - // default is unlimited ttl. - ttl = Long.MAX_VALUE; - } else if (ttl == -1) { - ttl = Long.MAX_VALUE; - } else { - // second -> ms adjust for user data - this.ttl *= 1000; - } + // Get TTL + this.ttl = getTTL(family); // used by ScanQueryMatcher long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); - LOG.info("time to purge deletes set to " + timeToPurgeDeletes + + LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes + "ms in store " + this); + // Why not just pass a HColumnDescriptor in here altogether? Even if have + // to clone it? scanInfo = new ScanInfo(family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family.getKeepDeletedCells(), timeToPurgeDeletes, this.comparator); this.memstore = new MemStore(conf, this.comparator); - this.storeNameStr = getColumnFamilyName(); // By default, compact if storefile.count >= minFilesToCompact this.minFilesToCompact = Math.max(2, @@ -241,7 +220,6 @@ public class Store extends SchemaConfigured implements HeapSize { this.region.memstoreFlushSize); this.maxCompactSize = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE); - this.compactionKVMax = conf.getInt("hbase.hstore.compaction.kv.max", 10); this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false); @@ -256,6 +234,48 @@ public class Store extends SchemaConfigured implements HeapSize { this.checksumType = getChecksumType(conf); // initilize bytes per checksum this.bytesPerChecksum = getBytesPerChecksum(conf); + // Create a compaction tool instance + this.compactionTool = new CompactionTool(); + this.compactionTool.setConf(this.conf); + } + + /** + * @param family + * @return + */ + long getTTL(final HColumnDescriptor family) { + // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds. + long ttl = family.getTimeToLive(); + if (ttl == HConstants.FOREVER) { + // Default is unlimited ttl. 
+ ttl = Long.MAX_VALUE; + } else if (ttl == -1) { + ttl = Long.MAX_VALUE; + } else { + // Second -> ms adjust for user data + ttl *= 1000; + } + return ttl; + } + + /** + * Create this store's homedir + * @param fs + * @param homedir + * @return Return homedir + * @throws IOException + */ + Path createStoreHomeDir(final FileSystem fs, + final Path homedir) throws IOException { + if (!fs.exists(homedir)) { + if (!fs.mkdirs(homedir)) + throw new IOException("Failed create of: " + homedir.toString()); + } + return homedir; + } + + FileSystem getFileSystem() { + return this.fs; } /** @@ -316,7 +336,7 @@ public class Store extends SchemaConfigured implements HeapSize { * Return the directory in which this store stores its * StoreFiles */ - public Path getHomedir() { + Path getHomedir() { return homedir; } @@ -335,6 +355,10 @@ public class Store extends SchemaConfigured implements HeapSize { this.dataBlockEncoder = blockEncoder; } + FileStatus [] getStoreFiles() throws IOException { + return FSUtils.listStatus(this.fs, this.homedir, null); + } + /** * Creates an unsorted list of StoreFile loaded in parallel * from the given directory. @@ -342,7 +366,7 @@ public class Store extends SchemaConfigured implements HeapSize { */ private List loadStoreFiles() throws IOException { ArrayList results = new ArrayList(); - FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null); + FileStatus files[] = getStoreFiles(); if (files == null || files.length == 0) { return results; @@ -630,7 +654,7 @@ public class Store extends SchemaConfigured implements HeapSize { storeFileCloserThreadPool.shutdownNow(); } } - LOG.debug("closed " + this.storeNameStr); + LOG.info("Closed " + this); return result; } finally { this.lock.writeLock().unlock(); @@ -806,7 +830,7 @@ public class Store extends SchemaConfigured implements HeapSize { */ private StoreFile.Writer createWriterInTmp(int maxKeyCount) throws IOException { - return createWriterInTmp(maxKeyCount, this.compression, false); + return createWriterInTmp(maxKeyCount, this.family.getCompression(), false); } /* @@ -815,7 +839,7 @@ public class Store extends SchemaConfigured implements HeapSize { * @param isCompaction whether we are creating a new file in a compaction * @return Writer for a new StoreFile in the tmp dir. */ - private StoreFile.Writer createWriterInTmp(int maxKeyCount, + StoreFile.Writer createWriterInTmp(int maxKeyCount, Compression.Algorithm compression, boolean isCompaction) throws IOException { final CacheConfig writerCacheConf; @@ -980,15 +1004,15 @@ public class Store extends SchemaConfigured implements HeapSize { // Ready to go. Have list of files to compact. LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in " - + this.storeNameStr + " of " + + this + " of " + this.region.getRegionInfo().getRegionNameAsString() + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize=" + StringUtils.humanReadableInt(cr.getSize())); StoreFile sf = null; try { - StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(), - maxId); + StoreFile.Writer writer = + this.compactionTool.compact(this, filesToCompact, cr.isMajor(), maxId); // Move the compaction into place. sf = completeCompaction(filesToCompact, writer); if (region.getCoprocessorHost() != null) { @@ -1001,7 +1025,7 @@ public class Store extends SchemaConfigured implements HeapSize { } LOG.info("Completed" + (cr.isMajor() ? 
" major " : " ") + "compaction of " - + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of " + + filesToCompact.size() + " file(s) in " + this + " of " + this.region.getRegionInfo().getRegionNameAsString() + " into " + (sf == null ? "none" : sf.getPath().getName()) + @@ -1048,7 +1072,8 @@ public class Store extends SchemaConfigured implements HeapSize { try { // Ready to go. Have list of files to compact. - StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId); + StoreFile.Writer writer = + this.compactionTool.compact(this, filesToCompact, isMajor, maxId); // Move the compaction into place. StoreFile sf = completeCompaction(filesToCompact, writer); if (region.getCoprocessorHost() != null) { @@ -1097,10 +1122,10 @@ public class Store extends SchemaConfigured implements HeapSize { } /** getter for CompactionProgress object - * @return CompactionProgress object + * @return CompactionProgress object; can be null */ public CompactionProgress getCompactionProgress() { - return this.progress; + return this.compactionTool.getProgress(); } /* @@ -1152,19 +1177,19 @@ public class Store extends SchemaConfigured implements HeapSize { if (sf.isMajorCompaction() && (this.ttl == HConstants.FOREVER || oldest < this.ttl)) { if (LOG.isDebugEnabled()) { - LOG.debug("Skipping major compaction of " + this.storeNameStr + + LOG.debug("Skipping major compaction of " + this + " because one (major) compacted file only and oldestTime " + oldest + "ms is < ttl=" + this.ttl); } } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) { - LOG.debug("Major compaction triggered on store " + this.storeNameStr + + LOG.debug("Major compaction triggered on store " + this + ", because keyvalues outdated; time since last major compaction " + (now - lowTimestamp) + "ms"); result = true; } } else { if (LOG.isDebugEnabled()) { - LOG.debug("Major compaction triggered on store " + this.storeNameStr + + LOG.debug("Major compaction triggered on store " + this + "; time since last major compaction " + (now - lowTimestamp) + "ms"); } result = true; @@ -1344,7 +1369,7 @@ public class Store extends SchemaConfigured implements HeapSize { if (compactSelection.getFilesToCompact().isEmpty()) { LOG.debug(this.getHRegionInfo().getEncodedName() + " - " + - this.storeNameStr + ": no store files to compact"); + this + ": no store files to compact"); compactSelection.emptyFileList(); return compactSelection; } @@ -1420,7 +1445,7 @@ public class Store extends SchemaConfigured implements HeapSize { // if we don't have enough files to compact, just wait if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) { if (LOG.isDebugEnabled()) { - LOG.debug("Skipped compaction of " + this.storeNameStr + LOG.debug("Skipped compaction of " + this + ". Only " + (end - start) + " file(s) of size " + StringUtils.humanReadableInt(totalSize) + " have met compaction criteria."); @@ -1440,142 +1465,6 @@ public class Store extends SchemaConfigured implements HeapSize { } /** - * Do a minor/major compaction on an explicit set of storefiles in a Store. - * Uses the scan infrastructure to make it easy. - * - * @param filesToCompact which files to compact - * @param majorCompaction true to major compact (prune all deletes, max versions, etc) - * @param maxId Readers maximum sequence id. - * @return Product of compaction or null if all cells expired or deleted and - * nothing made it through the compaction. 
- * @throws IOException - */ - StoreFile.Writer compactStore(final Collection filesToCompact, - final boolean majorCompaction, final long maxId) - throws IOException { - // calculate maximum key count after compaction (for blooms) - int maxKeyCount = 0; - long earliestPutTs = HConstants.LATEST_TIMESTAMP; - for (StoreFile file : filesToCompact) { - StoreFile.Reader r = file.getReader(); - if (r != null) { - // NOTE: getFilterEntries could cause under-sized blooms if the user - // switches bloom type (e.g. from ROW to ROWCOL) - long keyCount = (r.getBloomFilterType() == family.getBloomFilterType()) - ? r.getFilterEntries() : r.getEntries(); - maxKeyCount += keyCount; - if (LOG.isDebugEnabled()) { - LOG.debug("Compacting " + file + - ", keycount=" + keyCount + - ", bloomtype=" + r.getBloomFilterType().toString() + - ", size=" + StringUtils.humanReadableInt(r.length()) + - ", encoding=" + r.getHFileReader().getEncodingOnDisk()); - } - } - // For major compactions calculate the earliest put timestamp - // of all involved storefiles. This is used to remove - // family delete marker during the compaction. - if (majorCompaction) { - byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS); - if (tmp == null) { - // there's a file with no information, must be an old one - // assume we have very old puts - earliestPutTs = HConstants.OLDEST_TIMESTAMP; - } else { - earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp)); - } - } - } - - // keep track of compaction progress - progress = new CompactionProgress(maxKeyCount); - - // For each file, obtain a scanner: - List scanners = StoreFileScanner - .getScannersForStoreFiles(filesToCompact, false, false, true); - - // Make the instantiation lazy in case compaction produces no product; i.e. - // where all source cells are expired or deleted. - StoreFile.Writer writer = null; - // Find the smallest read point across all the Scanners. - long smallestReadPoint = region.getSmallestReadPoint(); - MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint); - try { - InternalScanner scanner = null; - try { - Scan scan = new Scan(); - scan.setMaxVersions(family.getMaxVersions()); - /* include deletes, unless we are doing a major compaction */ - scanner = new StoreScanner(this, scan, scanners, - majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, - smallestReadPoint, earliestPutTs); - if (region.getCoprocessorHost() != null) { - InternalScanner cpScanner = region.getCoprocessorHost().preCompact( - this, scanner); - // NULL scanner returned from coprocessor hooks means skip normal processing - if (cpScanner == null) { - return null; - } - - scanner = cpScanner; - } - - int bytesWritten = 0; - // since scanner.next() can return 'false' but still be delivering data, - // we have to use a do/while loop. 
- ArrayList kvs = new ArrayList(); - // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME - boolean hasMore; - do { - hasMore = scanner.next(kvs, this.compactionKVMax); - if (writer == null && !kvs.isEmpty()) { - writer = createWriterInTmp(maxKeyCount, this.compactionCompression, - true); - } - if (writer != null) { - // output to writer: - for (KeyValue kv : kvs) { - if (kv.getMemstoreTS() <= smallestReadPoint) { - kv.setMemstoreTS(0); - } - writer.append(kv); - // update progress per key - ++progress.currentCompactedKVs; - - // check periodically to see if a system stop is requested - if (Store.closeCheckInterval > 0) { - bytesWritten += kv.getLength(); - if (bytesWritten > Store.closeCheckInterval) { - bytesWritten = 0; - if (!this.region.areWritesEnabled()) { - writer.close(); - fs.delete(writer.getPath(), false); - throw new InterruptedIOException( - "Aborting compaction of store " + this + - " in region " + this.region + - " because user requested stop."); - } - } - } - } - } - kvs.clear(); - } while (hasMore); - } finally { - if (scanner != null) { - scanner.close(); - } - } - } finally { - if (writer != null) { - writer.appendMetadata(maxId, majorCompaction); - writer.close(); - } - } - return writer; - } - - /** * Validates a store file by opening and closing it. In HFileV2 this should * not be an expensive operation. * @@ -1677,7 +1566,7 @@ public class Store extends SchemaConfigured implements HeapSize { } } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); - LOG.error("Failed replacing compacted files in " + this.storeNameStr + + LOG.error("Failed replacing compacted files in " + this + ". Compacted file is " + (result == null? "none": result.toString()) + ". Files replaced " + compactedFiles.toString() + " some of which may have been already removed", e); @@ -1962,7 +1851,7 @@ public class Store extends SchemaConfigured implements HeapSize { return mk.getRow(); } } catch(IOException e) { - LOG.warn("Failed getting store size for " + this.storeNameStr, e); + LOG.warn("Failed getting store size for " + this, e); } finally { this.lock.readLock().unlock(); } @@ -2008,7 +1897,7 @@ public class Store extends SchemaConfigured implements HeapSize { @Override public String toString() { - return this.storeNameStr; + return getColumnFamilyName(); } /** diff --git src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java index 553eee0..4444d42 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java @@ -52,5 +52,4 @@ public class CompactionProgress { public float getProgressPct() { return currentCompactedKVs / totalCompactingKVs; } - } diff --git src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java index d2329e1..885625b 100644 --- src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java +++ src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java @@ -25,9 +25,6 @@ import java.util.zip.Checksum; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.util.ChecksumFactory; - /** * Checksum types. 
The Checksum type is a one byte number * that stores a representation of the checksum algorithm @@ -70,7 +67,7 @@ public enum ChecksumType { ctor = ChecksumFactory.newConstructor(PURECRC32); LOG.info("Checksum using " + PURECRC32); } catch (Exception e) { - LOG.info(PURECRC32 + " not available."); + LOG.trace(PURECRC32 + " not available."); } try { // The default checksum class name is java.util.zip.CRC32. @@ -80,7 +77,7 @@ public enum ChecksumType { LOG.info("Checksum can use " + JDKCRC); } } catch (Exception e) { - LOG.warn(JDKCRC + " not available. ", e); + LOG.trace(JDKCRC + " not available."); } } @@ -113,7 +110,7 @@ public enum ChecksumType { ctor = ChecksumFactory.newConstructor(PURECRC32C); LOG.info("Checksum can use " + PURECRC32C); } catch (Exception e) { - LOG.info(PURECRC32C + " not available. "); + LOG.trace(PURECRC32C + " not available."); } } diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java index a929e31..33e4e71 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.catalog.RootLocationEditor; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.zookeeper.KeeperException; /** * Tracks the root region server location node in zookeeper. diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 7f97b01..0078ebc 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -523,7 +523,9 @@ public class ZKUtil { logRetrievedMsg(zkw, znode, data, watcherSet); return data; } catch (KeeperException.NoNodeException e) { - LOG.debug(zkw.prefix("Unable to get data of znode " + znode + " " + + // This log can get pretty annoying when we cycle on 100ms waits. + // Enable trace if you really want to see it. 
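The ZKUtil change that follows demotes a message fired on every 100ms retry from DEBUG to TRACE. The usual companion to such a retry-path message is an isTraceEnabled() guard so the string concatenation is also skipped when trace is off; the patch itself only changes the level, but the guarded form would look like this:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// Retry-path logging demoted to TRACE and guarded so the message is neither
// emitted nor even built unless trace is actually enabled.
class RetryLogging {
  private static final Log LOG = LogFactory.getLog(RetryLogging.class);

  void onMissingNode(String znode) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Unable to get data of znode " + znode
          + " because node does not exist (not an error)");
    }
  }

  public static void main(String[] args) {
    new RetryLogging().onMissingNode("/hbase/root-region-server");
  }
}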
+ LOG.trace(zkw.prefix("Unable to get data of znode " + znode + " " + "because node does not exist (not an error)")); return null; } catch (KeeperException e) { diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 79b6604..0f83655 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -22,9 +22,7 @@ package org.apache.hadoop.hbase.zookeeper; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -80,9 +78,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { // negotiation to complete public CountDownLatch saslLatch = new CountDownLatch(1); - // set of unassigned nodes watched - private Set unassignedNodes = new HashSet(); - // node names // base znode for this cluster @@ -179,10 +174,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { } } - private boolean isFinishedRetryingRecoverable(final long finished) { - return System.currentTimeMillis() < finished; - } - @Override public String toString() { return this.identifier; diff --git src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java index 90fa45a..533b2bf 100644 --- src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java +++ src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java @@ -340,7 +340,7 @@ public class TestCatalogTracker { // Now test waiting on root location getting set. Thread t = new WaitOnMetaThread(ct); - startWaitAliveThenWaitItLives(t, 1000); + startWaitAliveThenWaitItLives(t, 1); // Set a root location. hsa = setRootLocation(); // Join the thread... should exit shortly. @@ -511,12 +511,15 @@ public class TestCatalogTracker { } void doWaiting() throws InterruptedException { - this.ct.waitForRoot(); + try { + while (this.ct.waitForRoot(100) == null); + } catch (NotAllMetaRegionsOnlineException e) { + // Ignore. + } } } @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} - +} \ No newline at end of file diff --git src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java new file mode 100644 index 0000000..d2b3060 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -0,0 +1,605 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.MultiAction; +import org.apache.hadoop.hbase.client.MultiResponse; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.Exec; +import org.apache.hadoop.hbase.client.coprocessor.ExecResult; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; +import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary; +import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.ipc.ProtocolSignature; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.regionserver.CompactionRequestor; +import org.apache.hadoop.hbase.regionserver.FlushRequester; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionOpeningState; +import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * A mock RegionServer implementation. + * Use this when you can't bend Mockito to your liking (e.g. return null result + * when 'scanning' until master timesout and then return a coherent meta row + * result thereafter. Have some facility for faking gets and scans. See + * {@link #setGetResult(byte[], byte[], Result)} for how to fill the backing data + * store that the get pulls from. + */ +class MockRegionServer implements HRegionInterface, RegionServerServices { + private final ServerName sn; + private final ZooKeeperWatcher zkw; + private final Configuration conf; + private final Random random = new Random(); + + /** + * Map of regions to map of rows and {@link Results}. Used as data source when + * {@link MockRegionServer#get(byte[], Get)} is called. Because we have a byte + * key, need to use TreeMap and provide a Comparator. Use + * {@link #setGetResult(byte[], byte[], Result)} filling this map. + */ + private final Map> gets = + new TreeMap>(Bytes.BYTES_COMPARATOR); + + /** + * Map of regions to results to return when scanning. 
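The gets map above (and the nexts map that follows) is a TreeMap keyed by byte[] with an explicit comparator because byte arrays rely on identity equals/hashCode and so never match as HashMap keys. A self-contained version of that choice; the comparator below is a plain lexicographic one standing in for Bytes.BYTES_COMPARATOR:

import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.TreeMap;

// byte[] does not override equals/hashCode, so a HashMap keyed by byte[]
// never finds entries put in with a different array instance. A TreeMap with
// an explicit lexicographic comparator compares contents instead.
class ByteArrayKeyedMap {
  static final Comparator<byte[]> LEXICOGRAPHIC = (a, b) -> {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = (a[i] & 0xff) - (b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return a.length - b.length;
  };

  public static void main(String[] args) {
    TreeMap<byte[], String> rows = new TreeMap<>(LEXICOGRAPHIC);
    rows.put("row1".getBytes(StandardCharsets.UTF_8), "value1");
    // A fresh array with the same contents still finds the entry.
    System.out.println(rows.get("row1".getBytes(StandardCharsets.UTF_8)));
  }
}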
+ */ + private final Map nexts = + new TreeMap(Bytes.BYTES_COMPARATOR); + + /** + * Data structure that holds regionname and index used scanning. + */ + class RegionNameAndIndex { + private final byte[] regionName; + private int index = 0; + + RegionNameAndIndex(final byte[] regionName) { + this.regionName = regionName; + } + + byte[] getRegionName() { + return this.regionName; + } + + int getThenIncrement() { + int currentIndex = this.index; + this.index++; + return currentIndex; + } + } + + /** + * Outstanding scanners and their offset into nexts + */ + private final Map scannersAndOffsets = + new HashMap(); + + /** + * @param sn Name of this mock regionserver + * @throws IOException + * @throws ZooKeeperConnectionException + */ + MockRegionServer(final Configuration conf, final ServerName sn) + throws ZooKeeperConnectionException, IOException { + this.sn = sn; + this.conf = conf; + this.zkw = new ZooKeeperWatcher(conf, sn.toString(), this, true); + } + + /** + * Use this method filling the backing data source used by {@link #get(byte[], Get)} + * @param regionName + * @param row + * @param r + */ + void setGetResult(final byte [] regionName, final byte [] row, final Result r) { + Map value = this.gets.get(regionName); + if (value == null) { + // If no value already, create one. Needs to be treemap because we are + // using byte array as key. Not thread safe. + value = new TreeMap(Bytes.BYTES_COMPARATOR); + this.gets.put(regionName, value); + } + value.put(row, r); + } + + /** + * Use this method to set what a scanner will reply as we next through + * @param regionName + * @param rs + */ + void setNextResults(final byte [] regionName, final Result [] rs) { + this.nexts.put(regionName, rs); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean isStopped() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void abort(String why, Throwable e) { + throw new RuntimeException(this.sn + ": " + why, e); + } + + @Override + public boolean isAborted() { + return false; + } + + @Override + public HRegionInfo getRegionInfo(byte[] regionName) { + // Just return this. Calls to getRegionInfo are usually to test connection + // to regionserver does reasonable things so should be safe to return + // anything. 
+ return HRegionInfo.ROOT_REGIONINFO; + } + + @Override + public void flushRegion(byte[] regionName) throws IllegalArgumentException, + IOException { + // TODO Auto-generated method stub + } + + @Override + public void flushRegion(byte[] regionName, long ifOlderThanTS) + throws IllegalArgumentException, IOException { + // TODO Auto-generated method stub + } + + @Override + public long getLastFlushTime(byte[] regionName) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public List getStoreFileList(byte[] regionName, byte[] columnFamily) + throws IllegalArgumentException { + // TODO Auto-generated method stub + return null; + } + + @Override + public List getStoreFileList(byte[] regionName, + byte[][] columnFamilies) throws IllegalArgumentException { + // TODO Auto-generated method stub + return null; + } + + @Override + public List getStoreFileList(byte[] regionName) + throws IllegalArgumentException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Result getClosestRowBefore(byte[] regionName, byte[] row, + byte[] family) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Result get(byte[] regionName, Get get) throws IOException { + Map m = this.gets.get(regionName); + if (m == null) return null; + return m.get(get.getRow()); + } + + @Override + public boolean exists(byte[] regionName, Get get) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void put(byte[] regionName, Put put) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public int put(byte[] regionName, List puts) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void delete(byte[] regionName, Delete delete) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public int delete(byte[] regionName, List deletes) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public boolean checkAndPut(byte[] regionName, byte[] row, byte[] family, + byte[] qualifier, byte[] value, Put put) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean checkAndDelete(byte[] regionName, byte[] row, byte[] family, + byte[] qualifier, byte[] value, Delete delete) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public long incrementColumnValue(byte[] regionName, byte[] row, + byte[] family, byte[] qualifier, long amount, boolean writeToWAL) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Result append(byte[] regionName, Append append) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Result increment(byte[] regionName, Increment increment) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public long openScanner(byte[] regionName, Scan scan) throws IOException { + long scannerId = this.random.nextLong(); + this.scannersAndOffsets.put(scannerId, new RegionNameAndIndex(regionName)); + return scannerId; + } + + @Override + public Result next(long scannerId) throws IOException { + RegionNameAndIndex rnai = this.scannersAndOffsets.get(scannerId); + int index = rnai.getThenIncrement(); + Result [] results = this.nexts.get(rnai.getRegionName()); + if (results == null) return null; + return index < results.length? 
results[index]: null; + } + + @Override + public Result [] next(long scannerId, int numberOfRows) throws IOException { + // Just return one result whatever they ask for. + Result r = next(scannerId); + return r == null? null: new Result [] {r}; + } + + @Override + public void close(final long scannerId) throws IOException { + this.scannersAndOffsets.remove(scannerId); + } + + @Override + public long lockRow(byte[] regionName, byte[] row) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void unlockRow(byte[] regionName, long lockId) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public List getOnlineRegions() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public HServerInfo getHServerInfo() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public MultiResponse multi(MultiAction multi) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean bulkLoadHFiles(List> familyPaths, + byte[] regionName) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public RegionOpeningState openRegion(HRegionInfo region) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public RegionOpeningState openRegion(HRegionInfo region, + int versionOfOfflineNode) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void openRegions(List regions) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public boolean closeRegion(HRegionInfo region) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean closeRegion(HRegionInfo region, int versionOfClosingNode) + throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean closeRegion(HRegionInfo region, boolean zk) + throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean closeRegion(byte[] encodedRegionName, boolean zk) + throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void flushRegion(HRegionInfo regionInfo) + throws NotServingRegionException, IOException { + // TODO Auto-generated method stub + } + + @Override + public void splitRegion(HRegionInfo regionInfo) + throws NotServingRegionException, IOException { + // TODO Auto-generated method stub + } + + @Override + public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint) + throws NotServingRegionException, IOException { + // TODO Auto-generated method stub + } + + @Override + public void compactRegion(HRegionInfo regionInfo, boolean major) + throws NotServingRegionException, IOException { + // TODO Auto-generated method stub + } + + @Override + public void replicateLogEntries(Entry[] entries) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public ExecResult execCoprocessor(byte[] regionName, Exec call) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean checkAndPut(byte[] regionName, byte[] row, byte[] family, + byte[] qualifier, CompareOp compareOp, + WritableByteArrayComparable comparator, Put put) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean checkAndDelete(byte[] regionName, byte[] row, 
byte[] family, + byte[] qualifier, CompareOp compareOp, + WritableByteArrayComparable comparator, Delete delete) + throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public List getBlockCacheColumnFamilySummaries() + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public byte[][] rollHLogWriter() throws IOException, + FailedLogCloseException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void stop(String why) { + this.zkw.close(); + } + + @Override + public void addToOnlineRegions(HRegion r) { + // TODO Auto-generated method stub + } + + @Override + public boolean removeFromOnlineRegions(String encodedRegionName) { + // TODO Auto-generated method stub + return false; + } + + @Override + public HRegion getFromOnlineRegions(String encodedRegionName) { + // TODO Auto-generated method stub + return null; + } + + @Override + public List getOnlineRegions(byte[] tableName) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void refreshRegion(HRegion hRegion) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public Configuration getConfiguration() { + return this.conf; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return this.zkw; + } + + @Override + public CatalogTracker getCatalogTracker() { + // TODO Auto-generated method stub + return null; + } + + @Override + public ServerName getServerName() { + return this.sn; + } + + @Override + public boolean isStopping() { + return false; + } + + @Override + public HLog getWAL() { + // TODO Auto-generated method stub + return null; + } + + @Override + public CompactionRequestor getCompactionRequester() { + // TODO Auto-generated method stub + return null; + } + + @Override + public FlushRequester getFlushRequester() { + // TODO Auto-generated method stub + return null; + } + + @Override + public RegionServerAccounting getRegionServerAccounting() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void postOpenDeployTasks(HRegion r, CatalogTracker ct, boolean daughter) + throws KeeperException, IOException { + // TODO Auto-generated method stub + } + + @Override + public RpcServer getRpcServer() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Map getRegionsInTransitionInRS() { + // TODO Auto-generated method stub + return null; + } + + @Override + public FileSystem getFileSystem() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void mutateRow(byte[] regionName, RowMutations rm) throws IOException { + // TODO Auto-generated method stub + } +} \ No newline at end of file diff --git src/test/java/org/apache/hadoop/hbase/master/Mocking.java src/test/java/org/apache/hadoop/hbase/master/Mocking.java new file mode 100644 index 0000000..676d6bb --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/master/Mocking.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.assertNotSame; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.executor.EventHandler.EventType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * Package scoped mocking utility. + */ +public class Mocking { + /** + * @param sn ServerName to use for the server and startcode columns in meta + * @param hri Region to serialize into the returned Result + * @return A mocked up Result that fakes a Get on a row in the + * .META. table. + * @throws IOException + */ + static Result getMetaTableRowResult(final HRegionInfo hri, + final ServerName sn) + throws IOException { + // TODO: Move to a utilities class. More than one test case can make use + // of this facility. + List kvs = new ArrayList(); + kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, + HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, + Writables.getBytes(hri))); + kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, + HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, + Bytes.toBytes(sn.getHostAndPort()))); + kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, + HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, + Bytes.toBytes(sn.getStartcode()))); + return new Result(kvs); + } + + /** + * Fakes the regionserver-side zk transitions of a region open. + * @param w ZooKeeperWatcher to use. + * @param sn Name of the regionserver doing the 'opening' + * @param hri Region we're 'opening'. + * @throws KeeperException + */ + static void fakeRegionServerRegionOpenInZK(final ZooKeeperWatcher w, + final ServerName sn, final HRegionInfo hri) + throws KeeperException { + // Wait till we see the OFFLINE zk node before we proceed. + while (!ZKAssign.verifyRegionState(w, hri, EventType.M_ZK_REGION_OFFLINE)) { + Threads.sleep(1); + } + // Get the current versionid else the transition from OFFLINE to OPENING below will fail + int versionid = ZKAssign.getVersion(w, hri); + assertNotSame(-1, versionid); + // This ugliness below is what the openregionhandler on the RS side does. I + // looked at exposing the method over in openregionhandler but it's just a + // one-liner and it's deep over in another package, so just repeat it below. + versionid = ZKAssign.transitionNode(w, hri, sn, + EventType.M_ZK_REGION_OFFLINE, EventType.RS_ZK_REGION_OPENING, versionid); + assertNotSame(-1, versionid); + // Move znode from OPENING to OPENED as RS does on successful open. + versionid = ZKAssign.transitionNodeOpened(w, hri, sn, versionid); + assertNotSame(-1, versionid); + // We should be done now.
The master open handler will notice the + // transition and remove this regions znode. + } +} diff --git src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java index 841649a..91dce36 100644 --- src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java +++ src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java @@ -52,7 +52,6 @@ import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -145,7 +144,7 @@ public class TestAssignmentManager { /** * Test a balance going on at same time as a master failover - * + * * @throws IOException * @throws KeeperException * @throws InterruptedException @@ -383,7 +382,7 @@ public class TestAssignmentManager { // Make an RS Interface implementation. Make it so a scanner can go against it. HRegionInterface implementation = Mockito.mock(HRegionInterface.class); // Get a meta row result that has region up on SERVERNAME_A - Result r = getMetaTableRowResult(REGIONINFO, SERVERNAME_A); + Result r = Mocking.getMetaTableRowResult(REGIONINFO, SERVERNAME_A); Mockito.when(implementation.openScanner((byte [])Mockito.any(), (Scan)Mockito.any())). thenReturn(System.currentTimeMillis()); // Return a good result first and then return null to indicate end of scan @@ -420,31 +419,6 @@ public class TestAssignmentManager { } /** - * @param sn ServerName to use making startcode and server in meta - * @param hri Region to serialize into HRegionInfo - * @return A mocked up Result that fakes a Get on a row in the - * .META. table. - * @throws IOException - */ - private Result getMetaTableRowResult(final HRegionInfo hri, - final ServerName sn) - throws IOException { - // TODO: Move to a utilities class. More than one test case can make use - // of this facility. - List kvs = new ArrayList(); - kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, - HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, - Writables.getBytes(hri))); - kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, - HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, - Bytes.toBytes(sn.getHostAndPort()))); - kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, - HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, - Bytes.toBytes(sn.getStartcode()))); - return new Result(kvs); - } - - /** * Create and startup executor pools. Start same set as master does (just * run a few less). * @param name Name to give our executor @@ -507,8 +481,8 @@ public class TestAssignmentManager { * @param region region to be created as offline * @param serverName server event originates from * @return Version of znode created. - * @throws KeeperException - * @throws IOException + * @throws KeeperException + * @throws IOException */ // Copied from SplitTransaction rather than open the method over there in // the regionserver package. @@ -567,7 +541,7 @@ public class TestAssignmentManager { // with an encoded name by doing a Get on .META. 
HRegionInterface ri = Mockito.mock(HRegionInterface.class); // Get a meta row result that has region up on SERVERNAME_A for REGIONINFO - Result r = getMetaTableRowResult(REGIONINFO, SERVERNAME_A); + Result r = Mocking.getMetaTableRowResult(REGIONINFO, SERVERNAME_A); Mockito.when(ri .openScanner((byte[]) Mockito.any(), (Scan) Mockito.any())). thenReturn(System.currentTimeMillis()); // Return good result 'r' first and then return null to indicate end of scan diff --git src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java new file mode 100644 index 0000000..1f7853e --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerLoad; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.catalog.RootLocationEditor; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionTestingUtility; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.regionserver.RegionOpeningState; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Standup the master and fake it to test various aspects of master function. + * Does NOT spin up a mini hbase nor mini dfs cluster testing master (it does + * put up a zk cluster but this is usually pretty fast compared). Also, should + * be possible to inject faults at points difficult to get at in cluster context. + * TODO: Speed up the zk connection by Master. It pauses 5 seconds establishing + * session. 
+ */ +public class TestMasterNoCluster { + private static final HBaseTestingUtility TESTUTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration c = TESTUTIL.getConfiguration(); + // We use local filesystem. Set it so it writes into the testdir. + c.set(HConstants.HBASE_DIR, TESTUTIL.getDataTestDir().toString()); + // Startup a mini zk cluster. + TESTUTIL.startMiniZKCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TESTUTIL.shutdownMiniZKCluster(); + } + + @After + public void tearDown() + throws KeeperException, ZooKeeperConnectionException, IOException { + // Make sure zk is clean before we run the next test. + ZooKeeperWatcher zkw = new ZooKeeperWatcher(TESTUTIL.getConfiguration(), + "@Before", new Abortable() { + @Override + public void abort(String why, Throwable e) { + throw new RuntimeException(why, e); + } + + @Override + public boolean isAborted() { + return false; + } + }); + ZKUtil.deleteNodeRecursively(zkw, zkw.baseZNode); + zkw.close(); + } + + /** + * Test starting the master, then stopping it before it's fully up. + * @throws IOException + * @throws KeeperException + * @throws InterruptedException + */ + @Test + public void testStopDuringStart() + throws IOException, KeeperException, InterruptedException { + HMaster master = new HMaster(TESTUTIL.getConfiguration()); + master.start(); + // Immediately have it stop. We used to hang in assigning root. + master.stopMaster(); + master.join(); + } + + /** + * Test master failover. + * Start up three fake regionservers and a master. + * @throws IOException + * @throws KeeperException + * @throws InterruptedException + */ + @Test + public void testFailover() + throws IOException, KeeperException, InterruptedException { + final long now = System.currentTimeMillis(); + // Names for our three servers. Make the port numbers match the hostnames; + // they come into use down in the mock server when we need to figure out how to respond. + final ServerName sn0 = new ServerName("0.example.org", 0, now); + final ServerName sn1 = new ServerName("1.example.org", 1, now); + final ServerName sn2 = new ServerName("2.example.org", 2, now); + final ServerName [] sns = new ServerName [] {sn0, sn1, sn2}; + // Put up the mock servers + final Configuration conf = TESTUTIL.getConfiguration(); + final MockRegionServer rs0 = new MockRegionServer(conf, sn0); + final MockRegionServer rs1 = new MockRegionServer(conf, sn1); + final MockRegionServer rs2 = new MockRegionServer(conf, sn2); + // Put some data into the servers. Make it look like sn0 has the root + // w/ an entry that points to sn1 as the host of .META. Put data into sn2 + // so it looks like it has a few regions for a table named 't'.
+ RootLocationEditor.setRootLocation(rs0.getZooKeeper(), rs0.getServerName()); + byte [] rootregion = Bytes.toBytes("-ROOT-,,0"); + rs0.setGetResult(rootregion, HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), + Mocking.getMetaTableRowResult(HRegionInfo.FIRST_META_REGIONINFO, + rs1.getServerName())); + final byte [] tableName = Bytes.toBytes("t"); + Result [] results = new Result [] { + Mocking.getMetaTableRowResult( + new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HBaseTestingUtility.KEYS[1]), + rs2.getServerName()), + Mocking.getMetaTableRowResult( + new HRegionInfo(tableName, HBaseTestingUtility.KEYS[1], HBaseTestingUtility.KEYS[2]), + rs2.getServerName()), + Mocking.getMetaTableRowResult(new HRegionInfo(tableName, HBaseTestingUtility.KEYS[2], + HConstants.EMPTY_END_ROW), + rs2.getServerName()) + }; + rs1.setNextResults(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), results); + + // Create master. Subclass to override a few methods so we can insert mocks + // and get notification on transitions. We need to fake out any rpcs the + // master does opening/closing regions. Also need to fake out the address + // of the 'remote' mocked up regionservers. + HMaster master = new HMaster(conf) { + InetAddress getRemoteInetAddress(final int port, final long serverStartCode) + throws UnknownHostException { + // Return different address dependent on port passed. + ServerName sn = sns[port]; + return InetAddress.getByAddress(sn.getHostname(), + new byte [] {10, 0, 0, (byte)sn.getPort()}); + } + + @Override + ServerManager createServerManager(Server master, MasterServices services) + throws IOException { + ServerManager sm = super.createServerManager(master, services); + // Spy on the created servermanager + ServerManager spy = Mockito.spy(sm); + // Fake a successful open. + Mockito.doReturn(RegionOpeningState.OPENED).when(spy). + sendRegionOpen((ServerName)Mockito.any(), (HRegionInfo)Mockito.any(), + Mockito.anyInt()); + return spy; + } + + @Override + CatalogTracker createCatalogTracker(ZooKeeperWatcher zk, + Configuration conf, Abortable abortable, int defaultTimeout) + throws IOException { + // Insert a mock for the connection used by the CatalogTracker. Any + // regionserver should do. Use TESTUTIL.getConfiguration rather than + // the conf from the master; the conf will already have an HConnection + // associate so the below mocking of a connection will fail. + HConnection connection = + HConnectionTestingUtility.getMockedConnectionAndDecorate(TESTUTIL.getConfiguration(), + rs0, rs0.getServerName(), HRegionInfo.ROOT_REGIONINFO); + return new CatalogTracker(zk, conf, connection, abortable, defaultTimeout); + } + }; + master.start(); + + try { + // Wait till master is up ready for RPCs. + while (!master.isRpcServerOpen()) Threads.sleep(10); + // Fake master that there are regionservers out there. Report in. + for (int i = 0; i < sns.length; i++) { + master.regionServerReport(sns[i].getVersionedBytes(), new HServerLoad()); + } + // Master should now come up. + while (!master.isInitialized()) {Threads.sleep(10);} + assertTrue(master.isInitialized()); + } finally { + rs0.stop("Test is done"); + rs1.stop("Test is done"); + rs2.stop("Test is done"); + master.stopMaster(); + master.join(); + } + } + + /** + * Test starting master getting it up post initialized state using mocks. 
+ * @throws IOException + * @throws KeeperException + * @throws InterruptedException + */ + @Test + public void testCatalogDeploys() + throws IOException, KeeperException, InterruptedException { + final Configuration conf = TESTUTIL.getConfiguration(); + final long now = System.currentTimeMillis(); + // Name for our single mocked up regionserver. + final ServerName sn = new ServerName("0.example.org", 0, now); + // Here is our mocked up regionserver. Create it now. Need it setting up + // master next. + final MockRegionServer rs0 = new MockRegionServer(conf, sn); + + // Create master. Subclass to override a few methods so we can insert mocks + // and get notification on transitions. We need to fake out any rpcs the + // master does opening/closing regions. Also need to fake out the address + // of the 'remote' mocked up regionservers. + HMaster master = new HMaster(conf) { + InetAddress getRemoteInetAddress(final int port, final long serverStartCode) + throws UnknownHostException { + // Interject an unchecked, nonsense InetAddress; i.e. no resolve. + return InetAddress.getByAddress(rs0.getServerName().getHostname(), + new byte [] {10, 0, 0, 0}); + } + + @Override + ServerManager createServerManager(Server master, MasterServices services) + throws IOException { + ServerManager sm = super.createServerManager(master, services); + // Spy on the created servermanager + ServerManager spy = Mockito.spy(sm); + // Fake a successful open. + Mockito.doReturn(RegionOpeningState.OPENED).when(spy). + sendRegionOpen((ServerName)Mockito.any(), (HRegionInfo)Mockito.any(), + Mockito.anyInt()); + return spy; + } + + @Override + CatalogTracker createCatalogTracker(ZooKeeperWatcher zk, + Configuration conf, Abortable abortable, int defaultTimeout) + throws IOException { + // Insert a mock for the connection used by the CatalogTracker. Use + // TESTUTIL.getConfiguration rather than the conf from the master; the + // conf will already have an HConnection associate so the below mocking + // of a connection will fail. + HConnection connection = + HConnectionTestingUtility.getMockedConnectionAndDecorate(TESTUTIL.getConfiguration(), + rs0, rs0.getServerName(), HRegionInfo.ROOT_REGIONINFO); + return new CatalogTracker(zk, conf, connection, abortable, defaultTimeout); + } + }; + master.start(); + + try { + // Wait till master is up ready for RPCs. + while (!master.isRpcServerOpen()) Threads.sleep(10); + // Fake master that there is a regionserver out there. Report in. + MapWritable mw = master.regionServerStartup(rs0.getServerName().getPort(), + rs0.getServerName().getStartcode(), now); + // Assert hostname is as expected. + String rshostname = + mw.get(new Text(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)).toString(); + assertEquals(rs0.getServerName().getHostname(), rshostname); + // Now master knows there is at least one regionserver checked in and so + // it'll wait a while to see if more and when none, will assign root and + // meta to this single server. Will do an rpc open but we've + // mocked it above in our master override to return 'success'. As part of + // region open, master will have set an unassigned znode for the region up + // into zk for the regionserver to transition. Lets do that now to + // complete fake of a successful open. + Mocking.fakeRegionServerRegionOpenInZK(rs0.getZooKeeper(), + rs0.getServerName(), HRegionInfo.ROOT_REGIONINFO); + // Need to set root location as r1. 
Usually the regionserver does this + // when its figured it just opened the root region by setting the root + // location up into zk. Since we're mocking regionserver, need to do this + // ourselves. + RootLocationEditor.setRootLocation(rs0.getZooKeeper(), rs0.getServerName()); + // Do same transitions for .META. (presuming master has by now assigned + // .META. to rs1). + Mocking.fakeRegionServerRegionOpenInZK(rs0.getZooKeeper(), + rs0.getServerName(), HRegionInfo.FIRST_META_REGIONINFO); + // Now trigger our mock regionserver to start returning a row when we + // go to get .META. entry in -ROOT-. We do it by setting into + // our MockRegionServer some data to be returned when there is a get on + // -ROOT- table (up to this its been returning null making master think + // nothing assigned, not even .META.). The region for -ROOT- table we + // hardcode below. Its always the same, at least in tests. We need to do + // this because CatalogTracker runs inside in Master initialization to + // confirm .META. has a server. + byte [] rootregion = Bytes.toBytes("-ROOT-,,0"); + rs0.setGetResult(rootregion, HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), + Mocking.getMetaTableRowResult(HRegionInfo.FIRST_META_REGIONINFO, + rs0.getServerName())); + // Master should now come up. + while (!master.isInitialized()) {Threads.sleep(10);} + assertTrue(master.isInitialized()); + } finally { + rs0.stop("Test is done"); + master.stopMaster(); + master.join(); + } + } +} \ No newline at end of file
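Not part of the patch above, but as a reader aid: a minimal sketch of how the canned data in MockRegionServer answers the get/scan calls that the master and CatalogTracker make during these tests. It assumes the sketch sits in the same org.apache.hadoop.hbase.master package (so it can reach the package-private MockRegionServer and Mocking helpers) and starts a mini zk cluster the same way TestMasterNoCluster does; the class name MockRegionServerUsageSketch and its test method are hypothetical.

package org.apache.hadoop.hbase.master;

import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

public class MockRegionServerUsageSketch {
  @Test
  public void cannedGetAndScan() throws Exception {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    testUtil.startMiniZKCluster();
    try {
      Configuration conf = testUtil.getConfiguration();
      ServerName sn = new ServerName("0.example.org", 0, System.currentTimeMillis());
      // The mock opens a real ZooKeeperWatcher, hence the mini zk cluster above.
      MockRegionServer rs = new MockRegionServer(conf, sn);
      try {
        // Seed a -ROOT- row pointing at .META., as testCatalogDeploys does.
        byte [] rootRegion = Bytes.toBytes("-ROOT-,,0");
        Result metaRow =
          Mocking.getMetaTableRowResult(HRegionInfo.FIRST_META_REGIONINFO, sn);
        rs.setGetResult(rootRegion,
          HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), metaRow);
        // A Get on the seeded row hands back exactly the canned Result.
        assertSame(metaRow, rs.get(rootRegion,
          new Get(HRegionInfo.FIRST_META_REGIONINFO.getRegionName())));

        // Seed one .META. row for a user region and walk a scanner over it.
        HRegionInfo hri = new HRegionInfo(Bytes.toBytes("t"),
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
        Result [] rows = new Result [] {Mocking.getMetaTableRowResult(hri, sn)};
        rs.setNextResults(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), rows);
        long scannerId =
          rs.openScanner(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), new Scan());
        assertSame(rows[0], rs.next(scannerId)); // First next() returns the canned row...
        assertNull(rs.next(scannerId));          // ...then the mock signals end of scan.
        rs.close(scannerId);
      } finally {
        rs.stop("sketch done");
      }
    } finally {
      testUtil.shutdownMiniZKCluster();
    }
  }
}

This is the same seeding pattern testFailover and testCatalogDeploys rely on so that CatalogTracker, reading -ROOT- and scanning .META. through the mocked connection, believes the catalog regions are deployed.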