commit 1a82092c7f2c9ee49824064ddbc1d4577d13f10c Author: Todd Lipcon Date: Sat Apr 30 22:16:29 2011 -0700 HBASE-3836. Add TaskMonitor facility diff --git src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java index efa4ac7..7a19c0e 100644 --- src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java +++ src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; @@ -119,17 +120,20 @@ class ActiveMasterManager extends ZooKeeperListener { * * This also makes sure that we are watching the master znode so will be * notified if another master dies. 
+ * @param startupStatus * @return True if no issue becoming active master else false if another * master was running or if some other problem (zookeeper, stop flag has been * set on this Master) */ - boolean blockUntilBecomingActiveMaster() { + boolean blockUntilBecomingActiveMaster(MonitoredTask startupStatus) { + startupStatus.setStatus("Trying to register in ZK as active master"); boolean cleanSetOfActiveMaster = true; // Try to become the active master, watch if there is another master try { if (ZKUtil.createEphemeralNodeAndWatch(this.watcher, this.watcher.masterAddressZNode, Bytes.toBytes(this.sn.toString()))) { // We are the master, return + startupStatus.setStatus("Successfully registered as active master."); this.clusterHasActiveMaster.set(true); LOG.info("Master=" + this.sn); return cleanSetOfActiveMaster; @@ -143,13 +147,17 @@ class ActiveMasterManager extends ZooKeeperListener { ZKUtil.getDataAndWatch(this.watcher, this.watcher.masterAddressZNode); ServerName currentMaster = new ServerName(Bytes.toString(bytes)); if (ServerName.isSameHostnameAndPort(currentMaster, this.sn)) { - LOG.info("Current master has this master's address, " + currentMaster + + String msg = ("Current master has this master's address, " + currentMaster + "; master was restarted? Waiting on znode to expire..."); + LOG.info(msg); + startupStatus.setStatus(msg); // Hurry along the expiration of the znode. 
ZKUtil.deleteNode(this.watcher, this.watcher.masterAddressZNode); } else { - LOG.info("Another master is the active master, " + currentMaster + - "; waiting to become the next active master"); + String msg = "Another master is the active master, " + currentMaster + + "; waiting to become the next active master"; + LOG.info(msg); + startupStatus.setStatus(msg); } } catch (KeeperException ke) { master.abort("Received an unexpected KeeperException, aborting", ke); @@ -168,7 +176,7 @@ class ActiveMasterManager extends ZooKeeperListener { return cleanSetOfActiveMaster; } // Try to become active master again now that there is no active master - blockUntilBecomingActiveMaster(); + blockUntilBecomingActiveMaster(startupStatus); } return cleanSetOfActiveMaster; } diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java index f526411..a8c9eb4 100644 --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -72,6 +72,8 @@ import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler; import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler; import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler; import org.apache.hadoop.hbase.master.metrics.MasterMetrics; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.security.User; @@ -274,6 +276,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { */ @Override public void run() { + MonitoredTask startupStatus = + TaskMonitor.get().createStatus("Master startup"); + startupStatus.setDescription("Master startup"); try { /* * Block on becoming the active master. 
@@ -285,16 +290,18 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { * now wait until it dies to try and become the next active master. If we * do not succeed on our first attempt, this is no longer a cluster startup. */ - becomeActiveMaster(); + becomeActiveMaster(startupStatus); // We are either the active master or we were asked to shutdown if (!this.stopped) { - finishInitialization(); + finishInitialization(startupStatus); loop(); } } catch (Throwable t) { abort("Unhandled exception. Starting shutdown.", t); } finally { + startupStatus.cleanup(); + stopChores(); // Wait for all the remaining region servers to report in IFF we were // running a cluster shutdown AND we were NOT aborting. @@ -317,17 +324,19 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { /** * Try becoming active master. + * @param startupStatus * @return True if we could successfully become the active master. * @throws InterruptedException */ - private boolean becomeActiveMaster() throws InterruptedException { + private boolean becomeActiveMaster(MonitoredTask startupStatus) + throws InterruptedException { // TODO: This is wrong!!!! Should have new servername if we restart ourselves, // if we come back to life. 
this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this); this.zooKeeper.registerListener(activeMasterManager); stallIfBackupMaster(this.conf, this.activeMasterManager); - return this.activeMasterManager.blockUntilBecomingActiveMaster(); + return this.activeMasterManager.blockUntilBecomingActiveMaster(startupStatus); } /** @@ -390,7 +399,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { * @throws InterruptedException * @throws KeeperException */ - private void finishInitialization() + private void finishInitialization(MonitoredTask status) throws IOException, InterruptedException, KeeperException { isActiveMaster = true; @@ -401,9 +410,12 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { * below after we determine if cluster startup or failover. */ + status.setStatus("Initializing Master file system"); // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring. this.fileSystemManager = new MasterFileSystem(this, metrics); + // publish cluster ID + status.setStatus("Publishing Cluster ID in ZooKeeper"); ClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId()); @@ -412,16 +424,19 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.serverManager = new ServerManager(this, this); + status.setStatus("Initializing ZK system trackers"); initializeZKBasedSystemTrackers(); // initialize master side coprocessors before we start handling requests + status.setStatus("Initializing master coprocessors"); this.cpHost = new MasterCoprocessorHost(this, this.conf); // start up all service threads. + status.setStatus("Initializing master service threads"); startServiceThreads(); // Wait for region servers to report in. 
- this.serverManager.waitForRegionServers(); + this.serverManager.waitForRegionServers(status); // Check zk for regionservers that are up but didn't register for (ServerName sn: this.regionServerTracker.getOnlineServers()) { if (!this.serverManager.isServerOnline(sn)) { @@ -432,20 +447,25 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { } // TODO: Should do this in background rather than block master startup + status.setStatus("Splitting logs after master startup"); this.fileSystemManager. splitLogAfterStartup(this.serverManager.getOnlineServers().keySet()); // Make sure root and meta assigned before proceeding. - assignRootAndMeta(); + assignRootAndMeta(status); + // Fixup assignment manager status + status.setStatus("Starting assignment manager"); this.assignmentManager.joinCluster(); // Start balancer and meta catalog janitor after meta and regions have // been assigned. + status.setStatus("Starting balancer and catalog janitor"); this.balancerChore = getAndStartBalancerChore(this); this.catalogJanitorChore = Threads.setDaemonThreadRunning(new CatalogJanitor(this, this)); + status.markComplete("Initialization successful"); LOG.info("Master has completed initialization"); initialized = true; } @@ -458,12 +478,13 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { * @throws KeeperException * @return Count of regions we assigned. */ - int assignRootAndMeta() + int assignRootAndMeta(MonitoredTask status) throws InterruptedException, IOException, KeeperException { int assigned = 0; long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000); // Work on ROOT region. Is it in zk in transition? + status.setStatus("Assigning ROOT region"); boolean rit = this.assignmentManager. 
processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.ROOT_REGIONINFO); if (!catalogTracker.verifyRootRegionLocation(timeout)) { @@ -479,6 +500,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { ", location=" + catalogTracker.getRootLocation()); // Work on meta region + status.setStatus("Assigning META region"); rit = this.assignmentManager. processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO); if (!this.catalogTracker.verifyMetaRegionLocation(timeout)) { @@ -495,6 +517,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { } LOG.info(".META. assigned=" + assigned + ", rit=" + rit + ", location=" + catalogTracker.getMetaLocation()); + status.setStatus("META and ROOT assigned."); return assigned; } @@ -1106,15 +1129,21 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + this.serverName.getPort(), this); - if (!becomeActiveMaster()) { - return false; + MonitoredTask status = + TaskMonitor.get().createStatus("Recovering expired ZK session"); + try { + if (!becomeActiveMaster(status)) { + return false; + } + initializeZKBasedSystemTrackers(); + // Update in-memory structures to reflect our earlier Root/Meta assignment. + assignRootAndMeta(status); + // process RIT if any + this.assignmentManager.processRegionsInTransition(); + return true; + } finally { + status.cleanup(); } - initializeZKBasedSystemTrackers(); - // Update in-memory structures to reflect our earlier Root/Meta assignment. 
- assignRootAndMeta(); - // process RIT if any - this.assignmentManager.processRegionsInTransition(); - return true; } /** diff --git src/main/java/org/apache/hadoop/hbase/master/ServerManager.java src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 834c456..8a42f85 100644 --- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler; import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; /** * The ServerManager class manages info about region servers. @@ -458,7 +459,7 @@ public class ServerManager { * Waits for the regionservers to report in. * @throws InterruptedException */ - public void waitForRegionServers() + public void waitForRegionServers(MonitoredTask status) throws InterruptedException { long interval = this.master.getConfiguration(). 
getLong("hbase.master.wait.on.regionservers.interval", 3000); @@ -469,11 +470,15 @@ public class ServerManager { Thread.sleep(interval); count = countOfRegionServers(); if (count == oldcount && count > 0) break; + + String msg; if (count == 0) { - LOG.info("Waiting on regionserver(s) to checkin"); + msg = "Waiting on regionserver(s) to checkin"; } else { - LOG.info("Waiting on regionserver(s) count to settle; currently=" + count); + msg = "Waiting on regionserver(s) count to settle; currently=" + count; } + LOG.info(msg); + status.setStatus(msg); oldcount = count; } } diff --git src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 7c3eb2a..e46c223 100644 --- src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -35,6 +35,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.regionserver.SplitLogWorker; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException; @@ -183,15 +185,22 @@ public class SplitLogManager extends ZooKeeperListener { LOG.warn(logDir + " doesn't exist. Nothing to do!"); return 0; } + + MonitoredTask status = TaskMonitor.get().createStatus( + "Doing distributed log split in " + logDir); + + status.setStatus("Checking directory contents..."); FileStatus[] logfiles = fs.listStatus(logDir); // TODO filter filenames? 
if (logfiles == null || logfiles.length == 0) { LOG.info(logDir + " is empty dir, no logs to split"); return 0; } + + status.setStatus("Scheduling batch of logs to split"); tot_mgr_log_split_batch_start.incrementAndGet(); LOG.info("started splitting logs in " + logDir); long t = EnvironmentEdgeManager.currentTimeMillis(); - long totalSize = 0; + long totalSize = 0; TaskBatch batch = new TaskBatch(); for (FileStatus lf : logfiles) { // TODO If the log file is still being written to - which is most likely @@ -205,7 +214,7 @@ public class SplitLogManager extends ZooKeeperListener { + lf.getPath()); } } - waitTasks(batch); + waitTasks(batch, status); if (batch.done != batch.installed) { stopTrackingTasks(batch); tot_mgr_log_split_batch_err.incrementAndGet(); @@ -214,6 +223,8 @@ public class SplitLogManager extends ZooKeeperListener { throw new IOException("error or interrupt while splitting logs in " + logDir + " Task = " + batch); } + + status.setStatus("Checking for orphaned logs in log directory..."); if (anyNewLogFiles(logDir, logfiles)) { tot_mgr_new_unexpected_hlogs.incrementAndGet(); LOG.warn("new hlogs were produced while logs in " + logDir + @@ -221,12 +232,18 @@ public class SplitLogManager extends ZooKeeperListener { throw new OrphanHLogAfterSplitException(); } tot_mgr_log_split_batch_success.incrementAndGet(); + + status.setStatus("Cleaning up log directory..."); if (!fs.delete(logDir, true)) { throw new IOException("Unable to delete src dir: " + logDir); } - LOG.info("finished splitting (more than or equal to) " + totalSize + + + String msg = "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed + " log files in " + logDir + " in " + - (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms"); + (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms"; + status.markComplete(msg); + LOG.info(msg); + return totalSize; } @@ -244,10 +261,14 @@ public class SplitLogManager extends ZooKeeperListener { return false; } - private 
void waitTasks(TaskBatch batch) { + private void waitTasks(TaskBatch batch, MonitoredTask status) { synchronized (batch) { while ((batch.done + batch.error) != batch.installed) { try { + status.setStatus("Waiting for distributed tasks to finish. " + + " scheduled=" + batch.installed + + " done=" + batch.done + + " error=" + batch.error); batch.wait(100); if (stopper.isStopped()) { LOG.warn("Stopped while waiting for log splits to be completed"); diff --git src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java new file mode 100644 index 0000000..ecbe4c2 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java @@ -0,0 +1,53 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.monitoring; + +public interface MonitoredTask { + enum State { + RUNNING, + COMPLETE, + ABORTED; + } + + public abstract long getStartTime(); + + public abstract String getDescription(); + + public abstract String getStatus(); + + public abstract State getState(); + + public abstract long getCompletionTimestamp(); + + public abstract void markComplete(String msg); + public abstract void abort(String msg); + + public abstract void setStatus(String status); + + public abstract void setDescription(String description); + + /** + * Explicitly mark this status as able to be cleaned up, + * even though it might not be complete. + */ + public abstract void cleanup(); + + +} \ No newline at end of file diff --git src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java new file mode 100644 index 0000000..b0edfdc --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java @@ -0,0 +1,102 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.monitoring; + +import com.google.common.annotations.VisibleForTesting; + +class MonitoredTaskImpl implements MonitoredTask { + private long startTime; + private long completionTimestamp = -1; + + private String status; + private String description; + + private State state = State.RUNNING; + + public MonitoredTaskImpl() { + startTime = System.currentTimeMillis(); + } + + @Override + public long getStartTime() { + return startTime; + } + + @Override + public String getDescription() { + return description; + } + + @Override + public String getStatus() { + return status; + } + + @Override + public State getState() { + return state; + } + + @Override + public long getCompletionTimestamp() { + return completionTimestamp; + } + + @Override + public void markComplete(String status) { + state = State.COMPLETE; + setStatus(status); + completionTimestamp = System.currentTimeMillis(); + } + + @Override + public void abort(String msg) { + setStatus(msg); + state = State.ABORTED; + completionTimestamp = System.currentTimeMillis(); + } + + @Override + public void setStatus(String status) { + this.status = status; + } + + @Override + public void setDescription(String description) { + this.description = description; + } + + @Override + public void cleanup() { + if (state == State.RUNNING) { + state = State.ABORTED; + completionTimestamp = System.currentTimeMillis(); + } + } + + /** + * Force the completion timestamp backwards so that + * it expires now. 
+ */ + @VisibleForTesting + void expireNow() { + completionTimestamp -= 180 * 1000; + } +} diff --git src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java new file mode 100644 index 0000000..7eab85f --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java @@ -0,0 +1,176 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.monitoring; + +import java.lang.ref.WeakReference; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; + +/** + * Singleton which keeps track of tasks going on in this VM. 
+ * A Task here is anything which takes more than a few seconds + * and the user might want to inquire about the status + */ +public class TaskMonitor { + private static final Log LOG = LogFactory.getLog(TaskMonitor.class); + + // Don't keep around any tasks that have completed more than + // 60 seconds ago + private static final long EXPIRATION_TIME = 60*1000; + + @VisibleForTesting + static final int MAX_TASKS = 1000; + + private static TaskMonitor instance; + private List tasks = + Lists.newArrayList(); + + /** + * Get singleton instance. + * TODO this would be better off scoped to a single daemon + */ + public static synchronized TaskMonitor get() { + if (instance == null) { + instance = new TaskMonitor(); + } + return instance; + } + + public MonitoredTask createStatus(String description) { + MonitoredTask stat = new MonitoredTaskImpl(); + stat.setDescription(description); + MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance( + stat.getClass().getClassLoader(), + new Class[] { MonitoredTask.class }, + new PassthroughInvocationHandler(stat)); + + TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy); + tasks.add(pair); + return proxy; + } + + private synchronized void purgeExpiredTasks() { + int size = 0; + + for (Iterator it = tasks.iterator(); + it.hasNext();) { + TaskAndWeakRefPair pair = it.next(); + MonitoredTask stat = pair.get(); + + if (pair.isDead()) { + // The class who constructed this leaked it. So we can + // assume it's done. + if (stat.getState() == MonitoredTaskImpl.State.RUNNING) { + LOG.warn("Status " + stat + " appears to have been leaked"); + stat.cleanup(); + } + } + + if (canPurge(stat)) { + it.remove(); + } else { + size++; + } + } + + if (size > MAX_TASKS) { + LOG.warn("Too many actions in action monitor! 
Purging some."); + tasks = tasks.subList(size - MAX_TASKS, size); + } + } + + public synchronized List getTasks() { + purgeExpiredTasks(); + ArrayList ret = Lists.newArrayListWithCapacity(tasks.size()); + for (TaskAndWeakRefPair pair : tasks) { + ret.add(pair.get()); + } + return ret; + } + + private boolean canPurge(MonitoredTask stat) { + long cts = stat.getCompletionTimestamp(); + return (cts > 0 && System.currentTimeMillis() - cts > EXPIRATION_TIME); + } + + /** + * This class encapsulates an object as well as a weak reference to a proxy + * that passes through calls to that object. In art form: + * + * Proxy <------------------ + * | \ + * v \ + * PassthroughInvocationHandler | weak reference + * | / + * MonitoredTaskImpl / + * | / + * StatAndWeakRefProxy ------/ + * + * Since we only return the Proxy to the creator of the MonitorableStatus, + * this means that they can leak that object, and we'll detect it + * since our weak reference will go null. But, we still have the actual + * object, so we can log it and display it as a leaked (incomplete) action. + */ + private static class TaskAndWeakRefPair { + private MonitoredTask impl; + private WeakReference weakProxy; + + public TaskAndWeakRefPair(MonitoredTask stat, + MonitoredTask proxy) { + this.impl = stat; + this.weakProxy = new WeakReference(proxy); + } + + public MonitoredTask get() { + return impl; + } + + public boolean isDead() { + return weakProxy.get() == null; + } + } + + /** + * An InvocationHandler that simply passes through calls to the original object. 
+ */ + private static class PassthroughInvocationHandler implements InvocationHandler { + private T delegatee; + + public PassthroughInvocationHandler(T delegatee) { + this.delegatee = delegatee; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + return method.invoke(delegatee, args); + } + } +} diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 0525cd3..80def69 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -83,6 +83,8 @@ import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.HBaseRPC; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -349,7 +351,12 @@ public class HRegion implements HeapSize { // , Writable{ */ public long initialize(final CancelableProgressable reporter) throws IOException { + + MonitoredTask status = TaskMonitor.get().createStatus( + "Initializing region " + this); + if (coprocessorHost != null) { + status.setStatus("Running coprocessor pre-open hook"); coprocessorHost.preOpen(); } // A region can be reopened if failed a split; reset flags @@ -357,14 +364,17 @@ public class HRegion implements HeapSize { // , Writable{ this.closed.set(false); // Write HRI to a file in case we need to recover .META. 
+ status.setStatus("Writing region info on filesystem"); checkRegioninfoOnFilesystem(); // Remove temporary data left over from old regions + status.setStatus("Cleaning up temporary data from old regions"); cleanupTmpDir(); // Load in all the HStores. Get maximum seqid. long maxSeqId = -1; for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) { + status.setStatus("Instantiating store for column family " + c); Store store = instantiateHStore(this.tableDir, c); this.stores.put(c.getName(), store); long storeSeqId = store.getMaxSequenceId(); @@ -373,8 +383,10 @@ public class HRegion implements HeapSize { // , Writable{ } } // Recover any edits if available. - maxSeqId = replayRecoveredEditsIfAny(this.regiondir, maxSeqId, reporter); + maxSeqId = replayRecoveredEditsIfAny( + this.regiondir, maxSeqId, reporter, status); + status.setStatus("Cleaning up detritus from prior splits"); // Get rid of any splits or merges that were lost in-progress. Clean out // these directories here on open. We may be opening a region that was // being split but we crashed in the middle of it all. @@ -390,9 +402,12 @@ public class HRegion implements HeapSize { // , Writable{ long nextSeqid = maxSeqId + 1; LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid); + if (coprocessorHost != null) { + status.setStatus("Running coprocessor post-open hooks"); coprocessorHost.postOpen(); } + status.markComplete("Region opened successfully"); return nextSeqid; } @@ -556,12 +571,22 @@ public class HRegion implements HeapSize { // , Writable{ public List close(final boolean abort) throws IOException { // Only allow one thread to close at a time. Serialize them so dual // threads attempting to close will run up against each other. - synchronized (closeLock) { - return doClose(abort); + MonitoredTask status = TaskMonitor.get().createStatus( + "Closing region " + this + + (abort ? 
" due to abort" : "")); + + status.setStatus("Waiting for close lock"); + try { + synchronized (closeLock) { + return doClose(abort, status); + } + } finally { + status.cleanup(); } } - private List doClose(final boolean abort) + private List doClose( + final boolean abort, MonitoredTask status) throws IOException { if (isClosed()) { LOG.warn("Region " + this + " already closed"); @@ -569,9 +594,11 @@ public class HRegion implements HeapSize { // , Writable{ } if (coprocessorHost != null) { + status.setStatus("Running coprocessor pre-close hooks"); this.coprocessorHost.preClose(abort); } + status.setStatus("Disabling compacts and flushes for region"); boolean wasFlushing = false; synchronized (writestate) { // Disable compacting and flushing by background threads for this @@ -596,20 +623,24 @@ public class HRegion implements HeapSize { // , Writable{ // that will clear out of the bulk of the memstore before we put up // the close flag? if (!abort && !wasFlushing && worthPreFlushing()) { + status.setStatus("Pre-flushing region before close"); LOG.info("Running close preflush of " + this.getRegionNameAsString()); - internalFlushcache(); + internalFlushcache(status); } + this.closing.set(true); + status.setStatus("Disabling writes for close"); lock.writeLock().lock(); try { if (this.isClosed()) { + status.abort("Already got closed by another process"); // SplitTransaction handles the null return null; } LOG.debug("Updates disabled for region " + this); // Don't flush the cache if we are aborting if (!abort) { - internalFlushcache(); + internalFlushcache(status); } List result = new ArrayList(); @@ -619,8 +650,10 @@ public class HRegion implements HeapSize { // , Writable{ this.closed.set(true); if (coprocessorHost != null) { + status.setStatus("Running coprocessor post-close hooks"); this.coprocessorHost.postClose(abort); } + status.markComplete("Closed"); LOG.info("Closed " + this); return result; } finally { @@ -824,6 +857,8 @@ public class HRegion implements 
HeapSize { // , Writable{ lock.readLock().lock(); this.lastCompactInfo = null; byte [] splitRow = null; + MonitoredTask status = TaskMonitor.get().createStatus( + "Compacting stores in " + this); try { if (this.closed.get()) { LOG.debug("Skipping compaction on " + this + " because closed"); @@ -833,6 +868,7 @@ public class HRegion implements HeapSize { // , Writable{ return splitRow; } if (coprocessorHost != null) { + status.setStatus("Running coprocessor preCompact hooks"); coprocessorHost.preCompact(false); } try { @@ -840,10 +876,12 @@ public class HRegion implements HeapSize { // , Writable{ if (!writestate.compacting && writestate.writesEnabled) { writestate.compacting = true; } else { - LOG.info("NOT compacting region " + this + + String msg = "NOT compacting region " + this + ": compacting=" + writestate.compacting + ", writesEnabled=" + - writestate.writesEnabled); - return splitRow; + writestate.writesEnabled; + LOG.info(msg); + status.abort(msg); + return splitRow; } } LOG.info("Starting compaction on region " + this); @@ -852,6 +890,7 @@ public class HRegion implements HeapSize { // , Writable{ long lastCompactSize = 0; boolean completed = false; try { + status.setStatus("Compacting store " + store); final Store.StoreSize ss = store.compact(); lastCompactSize += store.getLastCompactSize(); if (ss != null) { @@ -868,6 +907,9 @@ public class HRegion implements HeapSize { // , Writable{ if (completed) { this.lastCompactInfo = new Pair((now - startTime) / 1000, lastCompactSize); + status.setStatus("Compaction complete: " + + StringUtils.humanReadableInt(lastCompactSize) + " in " + + (now - startTime) + "ms"); } } } finally { @@ -877,9 +919,13 @@ public class HRegion implements HeapSize { // , Writable{ } } if (coprocessorHost != null) { + status.setStatus("Running coprocessor post-compact hooks"); coprocessorHost.postCompact(splitRow != null); } + + status.markComplete("Compaction complete"); } finally { + status.cleanup(); lock.readLock().unlock(); } if 
(splitRow != null) { @@ -915,13 +961,17 @@ public class HRegion implements HeapSize { // , Writable{ LOG.debug("Skipping flush on " + this + " because closing"); return false; } + MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this); + status.setStatus("Acquiring readlock on region"); lock.readLock().lock(); try { if (this.closed.get()) { LOG.debug("Skipping flush on " + this + " because closed"); + status.abort("Skipped: closed"); return false; } if (coprocessorHost != null) { + status.setStatus("Running coprocessor pre-flush hooks"); coprocessorHost.preFlush(); } try { @@ -935,13 +985,19 @@ public class HRegion implements HeapSize { // , Writable{ writestate.flushing + ", writesEnabled=" + writestate.writesEnabled); } + status.abort("Not flushing since " + + (writestate.flushing ? "already flushing" : "writes not enabled")); return false; } } - boolean result = internalFlushcache(); + boolean result = internalFlushcache(status); + if (coprocessorHost != null) { + status.setStatus("Running post-flush coprocessor hooks"); coprocessorHost.postFlush(); } + + status.markComplete("Flush successful"); return result; } finally { synchronized (writestate) { @@ -952,6 +1008,7 @@ public class HRegion implements HeapSize { // , Writable{ } } finally { lock.readLock().unlock(); + status.cleanup(); } } @@ -982,6 +1039,7 @@ public class HRegion implements HeapSize { // , Writable{ * routes. * *

This method may block for some time. + * @param status * * @return true if the region needs compacting * @@ -989,19 +1047,21 @@ public class HRegion implements HeapSize { // , Writable{ * @throws DroppedSnapshotException Thrown when replay of hlog is required * because a Snapshot was not properly persisted. */ - protected boolean internalFlushcache() throws IOException { - return internalFlushcache(this.log, -1); + protected boolean internalFlushcache(MonitoredTask status) throws IOException { + return internalFlushcache(this.log, -1, status); } /** * @param wal Null if we're NOT to go via hlog/wal. * @param myseqid The seqid to use if wal is null writing out * flush file. + * @param status * @return true if the region needs compacting * @throws IOException * @see #internalFlushcache() */ - protected boolean internalFlushcache(final HLog wal, final long myseqid) + protected boolean internalFlushcache( + final HLog wal, final long myseqid, MonitoredTask status) throws IOException { final long startTime = EnvironmentEdgeManager.currentTimeMillis(); // Clear flush flag. @@ -1031,7 +1091,9 @@ public class HRegion implements HeapSize { // , Writable{ // We have to take a write lock during snapshot, or else a write could // end up in both snapshot and memstore (makes it difficult to do atomic // rows then) + status.setStatus("Obtaining lock to block concurrent updates"); this.updatesLock.writeLock().lock(); + status.setStatus("Preparing to flush by snapshotting stores"); final long currentMemStoreSize = this.memstoreSize.get(); List storeFlushers = new ArrayList(stores.size()); try { @@ -1049,6 +1111,7 @@ public class HRegion implements HeapSize { // , Writable{ } finally { this.updatesLock.writeLock().unlock(); } + status.setStatus("Flushing stores"); LOG.debug("Finished snapshotting, commencing flushing stores"); @@ -1063,7 +1126,7 @@ public class HRegion implements HeapSize { // , Writable{ // just-made new flush store file. 
for (StoreFlusher flusher : storeFlushers) { - flusher.flushCache(); + flusher.flushCache(status); } // Switch snapshot (in memstore) -> new hfile (thus causing // all the store scanners to reset/reseek). @@ -1088,6 +1151,7 @@ public class HRegion implements HeapSize { // , Writable{ DroppedSnapshotException dse = new DroppedSnapshotException("region: " + Bytes.toStringBinary(getRegionName())); dse.initCause(t); + status.abort("Flush failed: " + StringUtils.stringifyException(t)); throw dse; } @@ -1111,13 +1175,13 @@ public class HRegion implements HeapSize { // , Writable{ } long time = EnvironmentEdgeManager.currentTimeMillis() - startTime; - if (LOG.isDebugEnabled()) { - LOG.info("Finished memstore flush of ~" + + String msg = "Finished memstore flush of ~" + StringUtils.humanReadableInt(currentMemStoreSize) + " for region " + this + " in " + time + "ms, sequenceid=" + sequenceId + ", compaction requested=" + compactionRequested + - ((wal == null)? "; wal=null": "")); - } + ((wal == null)? "; wal=null": ""); + LOG.info(msg); + status.setStatus(msg); this.recentFlushes.add(new Pair(time/1000,currentMemStoreSize)); return compactionRequested; @@ -2020,7 +2084,8 @@ public class HRegion implements HeapSize { // , Writable{ * @throws IOException */ protected long replayRecoveredEditsIfAny(final Path regiondir, - final long minSeqId, final CancelableProgressable reporter) + final long minSeqId, final CancelableProgressable reporter, + final MonitoredTask status) throws UnsupportedEncodingException, IOException { long seqid = minSeqId; NavigableSet files = HLog.getSplitEditFilesSorted(this.fs, regiondir); @@ -2046,7 +2111,7 @@ public class HRegion implements HeapSize { // , Writable{ } if (seqid > minSeqId) { // Then we added some edits to memory. Flush and cleanup split edit files. - internalFlushcache(null, seqid); + internalFlushcache(null, seqid, status); } // Now delete the content of recovered edits. We're done w/ them. 
for (Path file: files) { @@ -2071,7 +2136,11 @@ public class HRegion implements HeapSize { // , Writable{ private long replayRecoveredEdits(final Path edits, final long minSeqId, final CancelableProgressable reporter) throws IOException { - LOG.info("Replaying edits from " + edits + "; minSequenceid=" + minSeqId); + String msg = "Replaying edits from " + edits + "; minSequenceid=" + minSeqId; + LOG.info(msg); + MonitoredTask status = TaskMonitor.get().createStatus(msg); + + status.setStatus("Opening logs"); HLog.Reader reader = HLog.getReader(this.fs, edits, conf); try { long currentEditSeqId = minSeqId; @@ -2103,10 +2172,14 @@ public class HRegion implements HeapSize { // , Writable{ intervalEdits = 0; long cur = EnvironmentEdgeManager.currentTimeMillis(); if (lastReport + period <= cur) { + status.setStatus("Replaying edits..." + + " skipped=" + skippedEdits + + " edits=" + editsCount); // Timeout reached if(!reporter.progress()) { - String msg = "Progressable reporter failed, stopping replay"; + msg = "Progressable reporter failed, stopping replay"; LOG.warn(msg); + status.abort(msg); throw new IOException(msg); } lastReport = cur; @@ -2117,6 +2190,7 @@ public class HRegion implements HeapSize { // , Writable{ // Start coprocessor replay here. The coprocessor is for each WALEdit // instead of a KeyValue. if (coprocessorHost != null) { + status.setStatus("Running pre-WAL-restore hook in coprocessors"); if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) { // if bypass this log entry, ignore it ... 
continue; @@ -2158,7 +2232,7 @@ public class HRegion implements HeapSize { // , Writable{ flush = restoreEdit(store, kv); editsCount++; } - if (flush) internalFlushcache(null, currentEditSeqId); + if (flush) internalFlushcache(null, currentEditSeqId, status); if (coprocessorHost != null) { coprocessorHost.postWALRestore(this.getRegionInfo(), key, val); @@ -2166,30 +2240,39 @@ public class HRegion implements HeapSize { // , Writable{ } } catch (EOFException eof) { Path p = HLog.moveAsideBadEditsFile(fs, edits); - LOG.warn("Encountered EOF. Most likely due to Master failure during " + + msg = "Encountered EOF. Most likely due to Master failure during " + "log spliting, so we have this data in another edit. " + - "Continuing, but renaming " + edits + " as " + p, eof); + "Continuing, but renaming " + edits + " as " + p; + LOG.warn(msg, eof); + status.abort(msg); } catch (IOException ioe) { // If the IOE resulted from bad file format, // then this problem is idempotent and retrying won't help if (ioe.getCause() instanceof ParseException) { Path p = HLog.moveAsideBadEditsFile(fs, edits); - LOG.warn("File corruption encountered! " + - "Continuing, but renaming " + edits + " as " + p, ioe); + msg = "File corruption encountered! " + + "Continuing, but renaming " + edits + " as " + p; + LOG.warn(msg, ioe); + status.setStatus(msg); } else { + status.abort(StringUtils.stringifyException(ioe)); // other IO errors may be transient (bad network connection, // checksum exception on one datanode, etc). 
throw & retry throw ioe; } } + + msg = "Applied " + editsCount + ", skipped " + skippedEdits + + ", firstSequenceidInLog=" + firstSeqIdInLog + + ", maxSequenceidInLog=" + currentEditSeqId; + status.markComplete(msg); if (LOG.isDebugEnabled()) { - LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits + - ", firstSequenceidInLog=" + firstSeqIdInLog + - ", maxSequenceidInLog=" + currentEditSeqId); + LOG.debug(msg); } return currentEditSeqId; } finally { reader.close(); + status.cleanup(); } } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 47c5781..e2295c2 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -448,11 +449,13 @@ public class Store implements HeapSize { */ private StoreFile flushCache(final long logCacheFlushId, SortedSet snapshot, - TimeRangeTracker snapshotTimeRangeTracker) throws IOException { + TimeRangeTracker snapshotTimeRangeTracker, + MonitoredTask status) throws IOException { // If an exception happens flushing, we let it out without clearing // the memstore snapshot. The old snapshot will be returned when we say // 'snapshot', the next time flush comes around. 
- return internalFlushCache(snapshot, logCacheFlushId, snapshotTimeRangeTracker); + return internalFlushCache( + snapshot, logCacheFlushId, snapshotTimeRangeTracker, status); } /* @@ -463,7 +466,8 @@ public class Store implements HeapSize { */ private StoreFile internalFlushCache(final SortedSet set, final long logCacheFlushId, - TimeRangeTracker snapshotTimeRangeTracker) + TimeRangeTracker snapshotTimeRangeTracker, + MonitoredTask status) throws IOException { StoreFile.Writer writer = null; long flushed = 0; @@ -476,6 +480,7 @@ public class Store implements HeapSize { // flush to list of store files. Add cleanup of anything put on filesystem // if we fail. synchronized (flushLock) { + status.setStatus("Flushing " + this + ": creating writer"); // A. Write the map out to the disk writer = createWriterInTmp(set.size()); writer.setTimeRangeTracker(snapshotTimeRangeTracker); @@ -491,18 +496,23 @@ public class Store implements HeapSize { } finally { // Write out the log sequence number that corresponds to this output // hfile. The hfile is current up to and including logCacheFlushId. 
+ status.setStatus("Flushing " + this + ": appending metadata"); writer.appendMetadata(logCacheFlushId, false); + status.setStatus("Flushing " + this + ": closing flushed file"); writer.close(); } } // Write-out finished successfully, move into the right spot Path dstPath = StoreFile.getUniqueFile(fs, homedir); - LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath); + String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath; + LOG.info(msg); + status.setStatus("Flushing " + this + ": " + msg); if (!fs.rename(writer.getPath(), dstPath)) { LOG.warn("Unable to rename " + writer.getPath() + " to " + dstPath); } + status.setStatus("Flushing " + this + ": reopening flushed file"); StoreFile sf = new StoreFile(this.fs, dstPath, blockcache, this.conf, this.family.getBloomFilterType(), this.inMemory); StoreFile.Reader r = sf.createReader(); @@ -1593,8 +1603,9 @@ public class Store implements HeapSize { } @Override - public void flushCache() throws IOException { - storeFile = Store.this.flushCache(cacheFlushId, snapshot, snapshotTimeRangeTracker); + public void flushCache(MonitoredTask status) throws IOException { + storeFile = Store.this.flushCache( + cacheFlushId, snapshot, snapshotTimeRangeTracker, status); } @Override diff --git src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index 8706e65..c84ba05 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; + /** * A package protected interface for a store flushing. 
* A store flusher carries the state required to prepare/flush/commit the @@ -45,7 +47,7 @@ interface StoreFlusher { * * @throws IOException in case the flush fails */ - void flushCache() throws IOException; + void flushCache(MonitoredTask status) throws IOException; /** * Commit the flush - add the store file to the store and clear the diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 6fe2a03..db29e56 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -46,6 +46,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.RemoteExceptionHandler; @@ -70,7 +72,6 @@ import com.google.common.collect.Lists; * region to replay on startup. Delete the old log files when finished. 
*/ public class HLogSplitter { - private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl"; /** @@ -106,6 +107,8 @@ public class HLogSplitter { // Wait/notify for when data has been produced by the reader thread, // consumed by the reader thread, or an exception occurred Object dataAvailable = new Object(); + + private MonitoredTask status; /** @@ -179,10 +182,16 @@ public class HLogSplitter { "An HLogSplitter instance may only be used once"); hasSplit = true; + status = TaskMonitor.get().createStatus( + "Splitting logs in " + srcDir); + long startTime = EnvironmentEdgeManager.currentTimeMillis(); + + status.setStatus("Determining files to split..."); List splits = null; if (!fs.exists(srcDir)) { // Nothing to do + status.markComplete("No log directory existed to split."); return splits; } FileStatus[] logfiles = fs.listStatus(srcDir); @@ -190,15 +199,20 @@ public class HLogSplitter { // Nothing to do return splits; } - LOG.info("Splitting " + logfiles.length + " hlog(s) in " - + srcDir.toString()); + logAndReport("Splitting " + logfiles.length + " hlog(s) in " + + srcDir.toString()); splits = splitLog(logfiles); splitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime; - LOG.info("hlog file splitting completed in " + splitTime + + logAndReport("hlog file splitting completed in " + splitTime + " ms for " + srcDir.toString()); return splits; } + + private void logAndReport(String msg) { + status.setStatus(msg); + LOG.info(msg); + } /** * @return time that this split took @@ -252,6 +266,7 @@ public class HLogSplitter { boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true); + long totalBytesToSplit = countTotalBytes(logfiles); splitSize = 0; outputSink.startWriterThreads(entryBuffers); @@ -262,7 +277,7 @@ public class HLogSplitter { Path logPath = log.getPath(); long logLength = log.getLen(); splitSize += logLength; - LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length + logAndReport("Splitting hlog " + 
(i++ + 1) + " of " + logfiles.length + ": " + logPath + ", length=" + logLength); Reader in; try { @@ -284,20 +299,36 @@ public class HLogSplitter { continue; } } + status.setStatus("Log splits complete. Checking for orphaned logs."); + if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) { throw new OrphanHLogAfterSplitException( "Discovered orphan hlog after split. Maybe the " + "HRegionServer was not dead when we started"); } + + status.setStatus("Archiving logs after completed split"); archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf); } finally { + status.setStatus("Finishing writing output logs and closing down."); splits = outputSink.finishWritingAndClose(); } return splits; } /** + * @return the total size of the passed list of files. + */ + private static long countTotalBytes(FileStatus[] logfiles) { + long ret = 0; + for (FileStatus stat : logfiles) { + ret += stat.getLen(); + } + return ret; + } + + /** * Splits a HLog file into a temporary staging area. tmpname is used to build * the name of the staging area where the recovered-edits will be separated * out by region and stored. @@ -328,6 +359,11 @@ public class HLogSplitter { final Map logWriters = Collections. 
synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR)); boolean isCorrupted = false; + + Preconditions.checkState(status == null); + status = TaskMonitor.get().createStatus( + "Splitting log file " + logfile.getPath() + + " into a temporary staging area."); Object BAD_WRITER = new Object(); @@ -342,6 +378,7 @@ public class HLogSplitter { Path logPath = logfile.getPath(); long logLength = logfile.getLen(); LOG.info("Splitting hlog: " + logPath + ", length=" + logLength); + status.setStatus("Opening log file"); Reader in = null; try { in = getReader(fs, logfile, conf, skipErrors); @@ -351,12 +388,14 @@ public class HLogSplitter { isCorrupted = true; } if (in == null) { + status.markComplete("Was nothing to split in log file"); LOG.warn("Nothing to split in log file " + logPath); return true; } long t = EnvironmentEdgeManager.currentTimeMillis(); long last_report_at = t; if (reporter != null && reporter.progress() == false) { + status.abort("Failed: reporter.progress asked us to terminate"); return false; } int editsCount = 0; @@ -380,10 +419,12 @@ public class HLogSplitter { wap.w.append(entry); editsCount++; if (editsCount % interval == 0) { + status.setStatus("Split " + editsCount + " edits"); long t1 = EnvironmentEdgeManager.currentTimeMillis(); if ((t1 - last_report_at) > period) { last_report_at = t; if (reporter != null && reporter.progress() == false) { + status.abort("Failed: reporter.progress asked us to terminate"); progress_failed = true; return false; } @@ -416,10 +457,12 @@ public class HLogSplitter { wap.w.close(); LOG.debug("Closed " + wap.p); } - LOG.info("processed " + editsCount + " edits across " + n + " regions" + + String msg = ("processed " + editsCount + " edits across " + n + " regions" + " threw away edits for " + (logWriters.size() - n) + " regions" + " log file = " + logPath + " is corrupted = " + isCorrupted); + LOG.info(msg); + status.markComplete(msg); } return true; } diff --git
src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java index 75397f7..7f19c72 100644 --- src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java +++ src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; @@ -41,6 +42,7 @@ import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; /** * Test the {@link ActiveMasterManager}. @@ -77,7 +79,8 @@ public class TestActiveMasterManager { assertFalse(activeMasterManager.clusterHasActiveMaster.get()); // First test becoming the active master uninterrupted - activeMasterManager.blockUntilBecomingActiveMaster(); + MonitoredTask status = Mockito.mock(MonitoredTask.class); + activeMasterManager.blockUntilBecomingActiveMaster(status); assertTrue(activeMasterManager.clusterHasActiveMaster.get()); assertMaster(zk, master); @@ -87,7 +90,7 @@ public class TestActiveMasterManager { master, secondDummyMaster); zk.registerListener(secondActiveMasterManager); assertFalse(secondActiveMasterManager.clusterHasActiveMaster.get()); - activeMasterManager.blockUntilBecomingActiveMaster(); + activeMasterManager.blockUntilBecomingActiveMaster(status); assertTrue(activeMasterManager.clusterHasActiveMaster.get()); assertMaster(zk, master); } @@ -120,7 +123,8 @@ public class TestActiveMasterManager { assertFalse(activeMasterManager.clusterHasActiveMaster.get()); // First test becoming the active master uninterrupted - 
activeMasterManager.blockUntilBecomingActiveMaster(); + activeMasterManager.blockUntilBecomingActiveMaster( + Mockito.mock(MonitoredTask.class)); assertTrue(activeMasterManager.clusterHasActiveMaster.get()); assertMaster(zk, firstMasterAddress); @@ -201,7 +205,8 @@ public class TestActiveMasterManager { @Override public void run() { - manager.blockUntilBecomingActiveMaster(); + manager.blockUntilBecomingActiveMaster( + Mockito.mock(MonitoredTask.class)); LOG.info("Second master has become the active master!"); isActiveMaster = true; } diff --git src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java new file mode 100644 index 0000000..d0d767d --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java @@ -0,0 +1,101 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.monitoring; + +import static org.junit.Assert.*; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; + +public class TestTaskMonitor { + + @Test + public void testTaskMonitorBasics() { + TaskMonitor tm = new TaskMonitor(); + assertTrue("Task monitor should start empty", + tm.getTasks().isEmpty()); + + // Make a task and fetch it back out + MonitoredTask task = tm.createStatus("Test task"); + MonitoredTask taskFromTm = tm.getTasks().get(0); + + // Make sure the state is reasonable. + assertEquals(task.getDescription(), taskFromTm.getDescription()); + assertEquals(-1, taskFromTm.getCompletionTimestamp()); + assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState()); + + // Mark it as finished + task.markComplete("Finished!"); + assertEquals(MonitoredTask.State.COMPLETE, taskFromTm.getState()); + + // It should still show up in the TaskMonitor list + assertEquals(1, tm.getTasks().size()); + + // If we mark its completion time back a few minutes, it should get gced + ((MonitoredTaskImpl)taskFromTm).expireNow(); + assertEquals(0, tm.getTasks().size()); + } + + @Test + public void testTasksGetAbortedOnLeak() throws InterruptedException { + final TaskMonitor tm = new TaskMonitor(); + assertTrue("Task monitor should start empty", + tm.getTasks().isEmpty()); + + final AtomicBoolean threadSuccess = new AtomicBoolean(false); + // Make a task in some other thread and leak it + Thread t = new Thread() { + @Override + public void run() { + MonitoredTask task = tm.createStatus("Test task"); + assertEquals(MonitoredTask.State.RUNNING, task.getState()); + threadSuccess.set(true); + } + }; + t.start(); + t.join(); + // Make sure the thread saw the correct state + assertTrue(threadSuccess.get()); + + // Make sure the leaked reference gets cleared + System.gc(); + System.gc(); + System.gc(); + + // Now it should be aborted + MonitoredTask taskFromTm = tm.getTasks().get(0); + 
assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState()); + } + + @Test + public void testTaskLimit() throws Exception { + TaskMonitor tm = new TaskMonitor(); + for (int i = 0; i < TaskMonitor.MAX_TASKS + 10; i++) { + tm.createStatus("task " + i); + } + // Make sure it was limited correctly + assertEquals(TaskMonitor.MAX_TASKS, tm.getTasks().size()); + // Make sure we culled the earlier tasks, not later + // (i.e. tasks 0 through 9 should have been deleted) + assertEquals("task 10", tm.getTasks().get(0).getDescription()); + } + +} diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 7e46989..adfe1f8 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; @@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.mockito.Mockito; import com.google.common.base.Joiner; @@ -600,7 +602,7 @@ public class TestStore extends TestCase { private static void flushStore(Store store, long id) throws IOException { StoreFlusher storeFlusher = store.getStoreFlusher(id); storeFlusher.prepare(); - storeFlusher.flushCache(); + storeFlusher.flushCache(Mockito.mock(MonitoredTask.class)); storeFlusher.commit(); } diff --git 
src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index d10ab13..3ae770b 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.Store; @@ -55,6 +56,7 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; /** * Test replay of edits out of a WAL split. @@ -394,7 +396,8 @@ public class TestWALReplay { null) { - protected boolean internalFlushcache(HLog wal, long myseqid) + @Override protected boolean internalFlushcache(HLog wal, long myseqid, + MonitoredTask status) throws IOException { - boolean b = super.internalFlushcache(wal, myseqid); + boolean b = super.internalFlushcache(wal, myseqid, status); flushcount.incrementAndGet(); return b; };