Index: src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (revision 1068239) +++ src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (working copy) @@ -31,7 +31,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,12 +42,14 @@ import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.io.Reference.Range; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.PairOfSameType; -import org.apache.hadoop.util.Progressable; import org.apache.zookeeper.KeeperException; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * Executes region split as a "transaction". Call {@link #prepare()} to setup * the transaction, {@link #execute(OnlineRegions)} to run the transaction and @@ -338,7 +339,7 @@ services.postOpenDeployTasks(r, server.getCatalogTracker(), true); } - static class LoggingProgressable implements Progressable { + static class LoggingProgressable implements CancelableProgressable { private final HRegionInfo hri; private long lastLog = -1; private final long interval; @@ -350,12 +351,13 @@ } @Override - public void progress() { + public boolean progress() { long now = System.currentTimeMillis(); if (now - lastLog > this.interval) { LOG.info("Opening " + this.hri.getRegionNameAsString()); this.lastLog = now; } + return true; } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (revision 1068239) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (working copy) @@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.util.Progressable; import org.apache.zookeeper.KeeperException; /** @@ -261,12 +261,12 @@ // state so master doesn't timeout this region in transition. region = HRegion.openHRegion(this.regionInfo, this.rsServices.getWAL(), this.server.getConfiguration(), this.rsServices, - new Progressable() { - public void progress() { + new CancelableProgressable() { + public boolean progress() { // We may lose the znode ownership during the open. Currently its // too hard interrupting ongoing region open. Just let it complete // and check we still have the znode after region open. - tickleOpening("open_region_progress"); + return tickleOpening("open_region_progress"); } }); } catch (IOException e) { @@ -325,6 +325,7 @@ } catch (KeeperException e) { server.abort("Exception refreshing OPENING; region=" + encodedName + ", context=" + context, e); + this.version = -1; } boolean b = isGoodVersion(); if (!b) { Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1068239) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -58,12 +58,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -340,7 +341,7 @@ * @return What the next sequence (edit) id should be. * @throws IOException e */ - public long initialize(final Progressable reporter) + public long initialize(final CancelableProgressable reporter) throws IOException { if (coprocessorHost != null) { coprocessorHost.preOpen(); @@ -1537,7 +1538,7 @@ lastIndexExclusive++; numReadyToWrite++; } - // Nothing to put -- an exception in the above such as NoSuchColumnFamily? + // Nothing to put -- an exception in the above such as NoSuchColumnFamily? if (numReadyToWrite <= 0) return 0L; // We've now grabbed as many puts off the list as we can @@ -1942,7 +1943,7 @@ * @throws IOException */ protected long replayRecoveredEditsIfAny(final Path regiondir, - final long minSeqId, final Progressable reporter) + final long minSeqId, final CancelableProgressable reporter) throws UnsupportedEncodingException, IOException { long seqid = minSeqId; NavigableSet files = HLog.getSplitEditFilesSorted(this.fs, regiondir); @@ -1991,7 +1992,7 @@ * @throws IOException */ private long replayRecoveredEdits(final Path edits, - final long minSeqId, final Progressable reporter) + final long minSeqId, final CancelableProgressable reporter) throws IOException { LOG.info("Replaying edits from " + edits + "; minSequenceid=" + minSeqId); HLog.Reader reader = HLog.getReader(this.fs, edits, conf); @@ -2000,16 +2001,42 @@ long firstSeqIdInLog = -1; long skippedEdits = 0; long editsCount = 0; + long intervalEdits = 0; HLog.Entry entry; Store store = null; try { - // How many edits to apply before we send a progress report. - int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000); + // How many edits seen before we check elapsed time + int interval = this.conf.getInt("hbase.hstore.report.interval.edits", + 2000); + // How often to send a progress report (default 1/2 master timeout) + int period = this.conf.getInt("hbase.hstore.report.period", + this.conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", + 30000) / 2); + long lastReport = EnvironmentEdgeManager.currentTimeMillis(); + while ((entry = reader.next()) != null) { HLogKey key = entry.getKey(); WALEdit val = entry.getEdit(); + if (reporter != null) { + intervalEdits += val.size(); + if (intervalEdits >= interval) { + // Number of edits interval reached + intervalEdits = 0; + long cur = EnvironmentEdgeManager.currentTimeMillis(); + if (lastReport + period <= cur) { + // Timeout reached + if(!reporter.progress()) { + String msg = "Progressable reporter failed, stopping replay"; + LOG.warn(msg); + throw new IOException(msg); + } + lastReport = cur; + } + } + } + // Start coprocessor replay here. The coprocessor is for each WALEdit // instead of a KeyValue. if (coprocessorHost != null) { @@ -2059,12 +2086,6 @@ if (coprocessorHost != null) { coprocessorHost.postWALRestore(this.getRegionInfo(), key, val); } - - // Every 'interval' edits, tell the reporter we're making progress. - // Have seen 60k edits taking 3minutes to complete. - if (reporter != null && (editsCount % interval) == 0) { - reporter.progress(); - } } } catch (EOFException eof) { Path p = HLog.moveAsideBadEditsFile(fs, edits); @@ -2662,7 +2683,7 @@ */ public static HRegion openHRegion(final HRegionInfo info, final HLog wal, final Configuration conf, final RegionServerServices rsServices, - final Progressable reporter) + final CancelableProgressable reporter) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Opening region: " + info); @@ -2684,7 +2705,7 @@ * @return Returns this * @throws IOException */ - protected HRegion openHRegion(final Progressable reporter) + protected HRegion openHRegion(final CancelableProgressable reporter) throws IOException { checkCompressionCodecs(); Index: src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/util/CancelableProgressable.java (revision 0) @@ -0,0 +1,37 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +/** + * Similar interface as {@link org.apache.hadoop.util.Progressable} but returns + * a boolean to support canceling the operation. + *

+ * Used for doing updating of OPENING znode during log replay on region open. + */ +public interface CancelableProgressable { + + /** + * Report progress. Returns true if operations should continue, false if the + * operation should be canceled and rolled back. + * @return whether to continue (true) or cancel (false) the operation + */ + public boolean progress(); + +} \ No newline at end of file