commit b268c236415cfd872cda697a7f490fa43627a4c0 Author: stack Date: Tue May 19 15:44:15 2015 -0700 test diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java index 5e76fff..9d45651 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -140,14 +140,18 @@ public class Threads { /** * If interrupted, just prints out the interrupt on STDOUT, resets interrupt and returns * @param millis How long to sleep for in milliseconds. + * @return True if the thread was interrupted */ - public static void sleep(long millis) { + public static boolean sleep(long millis) { + boolean interrupted = false; try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); + interrupted = true; } + return interrupted; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java index acdcf60..19a807b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java @@ -68,8 +68,7 @@ import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; /** - * ZooKeeper based implementation of - * {@link org.apache.hadoop.hbase.master.SplitLogManagerCoordination} + * ZooKeeper based implementation of {@link SplitLogManagerCoordination} */ @InterfaceAudience.Private public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements @@ -684,7 +683,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements /** * ZooKeeper implementation of - * {@link org.apache.hadoop.hbase.master. 
+ * {@link org.apache.hadoop.hbase.coordination. * SplitLogManagerCoordination#removeStaleRecoveringRegions(Set)} */ @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java index 637920b..f0a03e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java @@ -104,7 +104,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements @Override public void nodeChildrenChanged(String path) { if (path.equals(watcher.splitLogZNode)) { - LOG.debug("tasks arrived or departed"); + LOG.debug("Task arrived or departed; path=" + path); synchronized (taskReadyLock) { taskReadySeq++; taskReadyLock.notify(); @@ -456,10 +456,13 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region); try { if (ZKUtil.checkExists(watcher, nodePath) == -1) { + if (LOG.isDebugEnabled()) { + LOG.debug("Running region recovery though no znode at " + nodePath); + } server.getExecutorService().submit( new FinishRegionRecoveringHandler(server, region, nodePath)); } else { - // current check is a defensive(or redundant) mechanism to prevent us from + // Current check is a defensive(or redundant) mechanism to prevent us from // having stale recovering regions in our internal RS memory state while // zookeeper(source of truth) says differently. 
We stop at the first good one // because we should not have a single instance such as this in normal case so diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 808a53f..257e191 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -480,9 +480,9 @@ public class AssignmentManager extends ZooKeeperListener { // TODO: Regions that have a null location and are not in regionsInTransitions // need to be handled. - // Scan hbase:meta to build list of existing regions, servers, and assignment + // Scan hbase:meta to build list of existing regions, servers, and assignment. // Returns servers who have not checked in (assumed dead) that some regions - // were assigned to (according to the meta) + // were assigned to (according to the meta). Set deadServers = rebuildUserRegions(); // This method will assign all user regions if a clean server startup or diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 6a5181d..cab3047 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -29,7 +29,6 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -56,6 +55,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import 
org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import com.google.common.annotations.VisibleForTesting; /** * This class abstracts a bunch of operations the HMaster needs to interact with @@ -130,6 +132,11 @@ public class MasterFileSystem { this.distributedLogReplay = this.splitLogManager.isLogReplaying(); } + @VisibleForTesting + SplitLogManager getSplitLogManager() { + return this.splitLogManager; + } + /** * Create initial layout in filesystem. *
    diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 3fc95cc..cd3cdee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -45,6 +46,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Server; @@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.R import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WALFactory; @@ -251,9 +254,8 @@ public class SplitLogManager { logDirs + " for serverName=" + serverNames); FileStatus[] logfiles = getFileList(logDirs, filter); status.setStatus("Checking directory contents..."); - LOG.debug("Scheduling batch of logs to split"); SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet(); - LOG.info("started splitting " + logfiles.length + " logs in " + logDirs + + LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs + " for " + serverNames); long t = EnvironmentEdgeManager.currentTime(); long totalSize = 0; @@ 
-278,6 +280,13 @@ public class SplitLogManager { // meta or user regions but won't for both( we could have mixed situations in tests) isMetaRecovery = true; } + // For testing. Make it so we can hold the processing here if the flag is set. + while (this.doNotRemoveRecoveringRegions.get()) { + this.arrivedAtRemoveRecoveringRegions.set(true); + if (Threads.sleep(100) || this.server.isAborted()) { + throw new HBaseIOException("TEST code! Aborted!! Short-circuiting split log handling"); + } + } removeRecoveringRegions(serverNames, isMetaRecovery); if (batch.done != batch.installed) { @@ -317,6 +326,22 @@ public class SplitLogManager { } /** + * Set this flag to hold just before removeRecoveringRegions. + * These invasive flags are for tests only. We have them in here because it is too hard + * otherwise influencing behaviors in code this deep in hierarchy. + */ + @VisibleForTesting + AtomicBoolean doNotRemoveRecoveringRegions = new AtomicBoolean(false); + + /** + * Set if the above is set and we are blocked on the above doNotRemoveRecoveringRegions boolean. + * These invasive flags are for tests only. We have them in here because it is too hard + * otherwise influencing behaviors in code this deep in hierarchy. + */ + @VisibleForTesting + AtomicBoolean arrivedAtRemoveRecoveringRegions = new AtomicBoolean(false); + + /** * Add a task entry to coordination if it is not already there. 
* @param taskname the path of the log to be split * @param batch the batch this task belongs to @@ -546,7 +571,7 @@ public class SplitLogManager { } deadWorkers.add(workerName); } - LOG.info("dead splitlog worker " + workerName); + LOG.info("Dead splitlog worker " + workerName); } void handleDeadWorkers(Set serverNames) { @@ -556,7 +581,7 @@ public class SplitLogManager { } deadWorkers.addAll(serverNames); } - LOG.info("dead splitlog workers " + serverNames); + LOG.info("Dead splitlog worker(s) " + serverNames); } /** @@ -798,7 +823,7 @@ public class SplitLogManager { .getSplitLogManagerCoordination().getLastRecoveryTime(); if (!failedRecoveringRegionDeletions.isEmpty() || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) { - // inside the function there have more checks before GC anything + // Inside the function there have more checks before GC anything if (!failedRecoveringRegionDeletions.isEmpty()) { List, Boolean>> previouslyFailedDeletions = new ArrayList, Boolean>>(failedRecoveringRegionDeletions); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 586ca5d..c41b9eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1154,7 +1154,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ public void setRecovering(boolean newState) { boolean wasRecovering = this.recovering; - // before we flip the recovering switch (enabling reads) we should write the region open + // Before we flip the recovering switch (enabling reads) we should write the region open // event to WAL if needed if (wal != null && getRegionServerServices() != null && !writestate.readOnly && wasRecovering && !newState) { @@ -1811,6 +1811,14 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi return flushcache(force, false); } + private FlushResult createCannotFlushResult(final String msg) { + if (LOG.isDebugEnabled()) LOG.debug(msg); + return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); + } + + @VisibleForTesting + public static final String HBASE_REGION_FLUSH_KEY = "hbase.region.flush"; + /** * Flush the cache. * @@ -1836,9 +1844,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throws IOException { // fail-fast instead of waiting on the lock if (this.closing.get()) { - String msg = "Skipping flush on " + this + " because closing"; - LOG.debug(msg); - return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); + return createCannotFlushResult("Skipping flush on " + this + " because closing"); + } + // For testing to disable flushing. + if (!this.conf.getBoolean(HBASE_REGION_FLUSH_KEY, true)) { + return createCannotFlushResult("Skipping flush on " + this + + " because " + HBASE_REGION_FLUSH_KEY + "=false"); + } MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this); status.setStatus("Acquiring readlock on region"); @@ -1847,9 +1858,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { if (this.closed.get()) { String msg = "Skipping flush on " + this + " because closed"; - LOG.debug(msg); status.abort(msg); - return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); + return createCannotFlushResult(msg); } if (coprocessorHost != null) { status.setStatus("Running coprocessor pre-flush hooks"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/FinishRegionRecoveringHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/FinishRegionRecoveringHandler.java index 19838d3..fcecf92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/FinishRegionRecoveringHandler.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/FinishRegionRecoveringHandler.java @@ -49,8 +49,7 @@ public class FinishRegionRecoveringHandler extends EventHandler { Region region = this.rss.getRecoveringRegions().remove(regionName); if (region != null) { ((HRegion)region).setRecovering(false); - LOG.info(path + " deleted; " + regionName + " recovered."); + LOG.info("Set " + regionName + " as recovered"); } } - -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index c55280b..46b4ae1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -286,8 +286,8 @@ public class WALSplitter { Reader in = null; try { long logLength = logfile.getLen(); - LOG.info("Splitting wal: " + logPath + ", length=" + logLength); - LOG.info("DistributedLogReplay = " + this.distributedLogReplay); + LOG.info("Splitting WAL=" + logPath + ", length=" + logLength + ", DistributedLogReplay=" + + this.distributedLogReplay); status.setStatus("Opening log file"); if (reporter != null && !reporter.progress()) { progress_failed = true; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java index 5452de1..d102209 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java @@ -43,17 +43,17 @@ public class TestDeadServer { DeadServer ds = new DeadServer(); ds.add(hostname123); assertTrue(ds.areDeadServersInProgress()); - ds.finish(hostname123); + ds.finish(hostname123, "test 1"); assertFalse(ds.areDeadServersInProgress()); ds.add(hostname1234); assertTrue(ds.areDeadServersInProgress()); - 
ds.finish(hostname1234); + ds.finish(hostname1234, "test 2"); assertFalse(ds.areDeadServersInProgress()); ds.add(hostname12345); assertTrue(ds.areDeadServersInProgress()); - ds.finish(hostname12345); + ds.finish(hostname12345, "test 3"); assertFalse(ds.areDeadServersInProgress()); // Already dead = 127.0.0.1,9090,112321 diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogReplay.java new file mode 100644 index 0000000..3d452bf --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogReplay.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.junit.Test; +import org.mortbay.log.Log; + +public class TestDistributedLogReplay { + private final static TableName TABLE_NAME = TableName.valueOf("t"); + private final static byte[] FAMILY = Bytes.toBytes("f"); + + /** + * Ugly test that holds the processing of a dead server just after it has finished log splitting. + * We then kill the master so it cannot complete cleaning up the zk nodes that stop region reads. 
+ * The test will fail by timing out with regions reporting: + * org.apache.hadoop.hbase.exceptions.RegionInRecoveryException: + * hbase:meta,,1.1588230740 is recovering; cannot take reads + * @throws Exception + */ + @Test (timeout=300000) + public void testKillMasterAfterSplitLogsBeforeRemoveRegionNoReadZKNode() throws Exception { + final HBaseTestingUtility htu = new HBaseTestingUtility(); + try { + // Make a cluster that rolls its logs frequently so I have many of them and make it so the + // regionserver cannot flush so the WALs stick around. Also set no UI, maximum logs up from + // default, disable the balancer and turn off the running of the background split log + // cleanup timeout monitor. + htu.getConfiguration().setBoolean(HRegion.HBASE_REGION_FLUSH_KEY, false); + htu.getConfiguration().setLong("hbase.regionserver.hlog.blocksize", 1024 * 64); + htu.getConfiguration().setInt("hbase.regionserver.maxlogs", 1024); + htu.getConfiguration().setInt(HConstants.REGIONSERVER_INFO_PORT, -1); + htu.getConfiguration().setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); + htu.getConfiguration(). + setInt("hbase.splitlog.manager.timeoutmonitor.period", Integer.MAX_VALUE); + // Start one regionserver only. + htu.startMiniCluster(1); + HRegionServer hrs = htu.getHBaseCluster().getRegionServer(0); + // Verify one WAL for the currently running RS. + assertTrue(getCountOfServerWALS(htu.getConfiguration(), hrs.getServerName()) >= 1); + try (HTable t = htu.createMultiRegionTable(TABLE_NAME, FAMILY)) { + htu.loadTable(t, FAMILY); + } + // The above loading writes more than 7 logs, usually 9. + int walCount = getCountOfServerWALS(htu.getConfiguration(), hrs.getServerName()); + assertTrue(walCount > 7); + // 17576 is a weird number but its number of rows that get loaded by the above loadTable + final int ROW_COUNT = 17576; + int count = count(htu); + assertEquals(ROW_COUNT, count); + // Now kill the regionserver, wait till dead. 
+ htu.getHBaseCluster().killRegionServer(hrs.getServerName()); + while (hrs.isAlive()) Threads.sleep(10); + + // Here is some ugly stuff. Stop the processing of server failure after logs have been + // split and then kill the master so that the region zk recovering-region znodes don't get + // cleaned up. + HMaster master = htu.getHBaseCluster().getMaster(); + SplitLogManager slm = master.getMasterFileSystem().getSplitLogManager(); + slm.doNotRemoveRecoveringRegions.set(true); + + // Start a new regionserver to get the regions and to run the log splitting. + htu.getHBaseCluster().startRegionServer(); + + // Now wait until we arrive at the post log split location but before removal of the + // recovering state from zk. + while (!slm.arrivedAtRemoveRecoveringRegions.get()) Threads.sleep(100); + // Kill Master and bring up new one. Do we recover? + master.abort("Killed by test"); + while(master.isAlive()) Threads.sleep(1); + htu.getHBaseCluster().startMaster(); + assertEquals(ROW_COUNT, count(htu)); + } finally { + htu.shutdownMiniCluster(); + } + } + + private int count(final HBaseTestingUtility htu) throws IOException { + int count = 0; + try (Table t = htu.getConnection().getTable(TABLE_NAME)) { + try (ResultScanner scanner = t.getScanner(FAMILY)) { + while (scanner.next() != null) count++; + } + } + return count; + } + + private int getCountOfServerWALS(final Configuration conf, final ServerName sn) + throws IOException { + int count = 0; + for (RemoteIterator iterator = getServerWALs(conf, sn); + iterator != null && iterator.hasNext();) { + LocatedFileStatus lfs = iterator.next(); + Log.info(lfs.toString()); + count++; + } + return count; + } + + private RemoteIterator getServerWALs(final Configuration conf, + final ServerName sn) + throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + Path logDir = new Path(rootDir, DefaultWALProvider.getWALDirectoryName(sn.toString())); + FileSystem fs = FileSystem.get(conf); + return fs.listFiles(logDir, false); + 
} +} \ No newline at end of file