Index: src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (revision 11224) +++ src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (working copy) @@ -31,7 +31,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,12 +42,14 @@ import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.io.Reference.Range; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.PairOfSameType; -import org.apache.hadoop.util.Progressable; import org.apache.zookeeper.KeeperException; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * Executes region split as a "transaction". Call {@link #prepare()} to setup * the transaction, {@link #execute(OnlineRegions)} to run the transaction and @@ -222,7 +223,7 @@ services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName()); } this.journal.add(JournalEntry.OFFLINED_PARENT); - + // TODO: If the below were multithreaded would we complete steps in less // elapsed time? 
St.Ack 20100920 @@ -328,7 +329,7 @@ services.postOpenDeployTasks(r, server.getCatalogTracker(), true); } - static class LoggingProgressable implements Progressable { + static class LoggingProgressable implements CancelableProgressable { private final HRegionInfo hri; private long lastLog = -1; private final long interval; @@ -340,12 +341,13 @@ } @Override - public void progress() { + public boolean progress() { long now = System.currentTimeMillis(); if (now - lastLog > this.interval) { LOG.info("Opening " + this.hri.getRegionNameAsString()); this.lastLog = now; } + return true; } } @@ -590,7 +592,7 @@ * Call this method on initial region deploy. Cleans up any mess * left by previous deploys of passed r region. * @param r - * @throws IOException + * @throws IOException */ static void cleanupAnySplitDetritus(final HRegion r) throws IOException { Path splitdir = getSplitDir(r); Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (revision 11224) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (working copy) @@ -29,10 +29,9 @@ import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.util.Progressable; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; /** * Handles opening of a region on a region server. @@ -95,33 +94,29 @@ // Instantiate the region. This also periodically updates OPENING. 
region = HRegion.openHRegion(regionInfo, this.rsServices.getWAL(), server.getConfiguration(), this.rsServices.getFlushRequester(), - new Progressable() { - public void progress() { + new CancelableProgressable() { + public boolean progress() { try { int vsn = ZKAssign.retransitionNodeOpening( server.getZooKeeper(), regionInfo, server.getServerName(), openingInteger.get()); if (vsn == -1) { - throw KeeperException.create(Code.BADVERSION); + // Unable to retransition node, abort region open + return false; } openingInteger.set(vsn); + return true; } catch (KeeperException e) { server.abort("ZK exception refreshing OPENING node; " + name, e); + return false; } } }); } catch (IOException e) { LOG.error("Failed open of " + regionInfo + - "; resetting state of transition node from OPENING to OFFLINE", e); - try { - // TODO: We should rely on the master timing out OPENING instead of this - // TODO: What if this was a split open? The RS made the OFFLINE - // znode, not the master. - ZKAssign.forceNodeOffline(server.getZooKeeper(), regionInfo, - server.getServerName()); - } catch (KeeperException e1) { - LOG.error("Error forcing node back to OFFLINE from OPENING; " + name); - } + "; stopping open and will allow master to timeout operation", e); + // TODO we should be able to do an atomic transition from OPENING to + // OFFLINE to make it so we don't always have to wait for timeout return; } // Region is now open. Close it if error. 
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 11224) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -55,12 +55,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -324,7 +325,7 @@ * @return What the next sequence (edit) id should be. * @throws IOException e */ - public long initialize(final Progressable reporter) + public long initialize(final CancelableProgressable reporter) throws IOException { // A region can be reopened if failed a split; reset flags this.closing.set(false); @@ -1428,7 +1429,7 @@ lastIndexExclusive++; numReadyToWrite++; } - // Nothing to put -- an exception in the above such as NoSuchColumnFamily? + // Nothing to put -- an exception in the above such as NoSuchColumnFamily? 
if (numReadyToWrite <= 0) return 0L; // We've now grabbed as many puts off the list as we can @@ -1793,7 +1794,7 @@ * @throws IOException */ protected long replayRecoveredEditsIfAny(final Path regiondir, - final long minSeqId, final Progressable reporter) + final long minSeqId, final CancelableProgressable reporter) throws UnsupportedEncodingException, IOException { long seqid = minSeqId; NavigableSet files = HLog.getSplitEditFilesSorted(this.fs, regiondir); @@ -1842,7 +1843,7 @@ * @throws IOException */ private long replayRecoveredEdits(final Path edits, - final long minSeqId, final Progressable reporter) + final long minSeqId, final CancelableProgressable reporter) throws IOException { LOG.info("Replaying edits from " + edits + "; minSequenceid=" + minSeqId); HLog.Reader reader = HLog.getReader(this.fs, edits, conf); @@ -1851,15 +1852,42 @@ long firstSeqIdInLog = -1; long skippedEdits = 0; long editsCount = 0; + long intervalEdits = 0; HLog.Entry entry; Store store = null; try { - // How many edits to apply before we send a progress report. 
- int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000); + // How many edits seen before we check elapsed time + int interval = this.conf.getInt("hbase.hstore.report.interval.edits", + 2000); + // How often to send a progress report (default 1/2 master timeout) + int period = this.conf.getInt("hbase.hstore.report.period", + this.conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", + 30000) / 2); + long lastReport = EnvironmentEdgeManager.currentTimeMillis(); + while ((entry = reader.next()) != null) { HLogKey key = entry.getKey(); WALEdit val = entry.getEdit(); + + if (reporter != null) { + intervalEdits += val.size(); + if (intervalEdits >= interval) { + // Number of edits interval reached + intervalEdits = 0; + long cur = EnvironmentEdgeManager.currentTimeMillis(); + if (lastReport + period <= cur) { + // Timeout reached + if(!reporter.progress()) { + String msg = "Progressable reporter failed, stopping replay"; + LOG.warn(msg); + throw new IOException(msg); + } + lastReport = cur; + } + } + } + if (firstSeqIdInLog == -1) { firstSeqIdInLog = key.getLogSeqNum(); } @@ -1896,12 +1924,6 @@ editsCount++; } if (flush) internalFlushcache(null, currentEditSeqId); - - // Every 'interval' edits, tell the reporter we're making progress. - // Have seen 60k edits taking 3minutes to complete. 
- if (reporter != null && (editsCount % interval) == 0) { - reporter.progress(); - } } } catch (EOFException eof) { Path p = HLog.moveAsideBadEditsFile(fs, edits); @@ -2476,7 +2498,7 @@ */ public static HRegion openHRegion(final HRegionInfo info, final HLog wal, final Configuration conf, final FlushRequester flusher, - final Progressable reporter) + final CancelableProgressable reporter) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Opening region: " + info); @@ -2498,7 +2520,7 @@ * @return Returns this * @throws IOException */ - protected HRegion openHRegion(final Progressable reporter) + protected HRegion openHRegion(final CancelableProgressable reporter) throws IOException { checkCompressionCodecs(); Index: src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java (revision 0) @@ -0,0 +1,37 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +/** + * Similar interface to {@link org.apache.hadoop.util.Progressable} but returns + * a boolean to support canceling the operation. + *

+ * Used for updating the OPENING znode during log replay on region open. + */ +public interface CancelableProgressable { + + /** + * Report progress. Returns true if the operation should continue, false if the + * operation should be canceled and rolled back. + * @return whether to continue (true) or cancel (false) the operation + */ + public boolean progress(); + +}