diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3959519..9bee5e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2140,7 +2140,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // to do this for a moment. It is quick. We also set the memstore size to zero here before we // allow updates again so its value will represent the size of the updates received // during flush - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; + // We have to take an update lock during snapshot, or else a write could end up in both snapshot // and memstore (makes it difficult to do atomic rows then) status.setStatus("Obtaining lock to block concurrent updates"); @@ -2171,9 +2171,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes(); long trxId = 0; + MultiVersionConcurrencyControl.WriteEntry writeEntry = null; try { + writeEntry = mvcc.begin(); try { - writeEntry = mvcc.begin(); if (wal != null) { Long earliestUnflushedSequenceIdForTheRegion = wal.startCacheFlush(encodedRegionName, flushedFamilyNames); @@ -2376,8 +2377,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi desc, false, mvcc); } catch (Throwable ex) { LOG.warn(getRegionInfo().getEncodedName() + " : " - + "Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" - + StringUtils.stringifyException(ex)); + + "failed writing ABORT_FLUSH marker to WAL", ex); // ignore this since we will be aborting the RS with DSE. } wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java index d101f7b..0779926 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java @@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; +import org.mortbay.log.Log; /** * Manages the read/write consistency. This provides an interface for readers to determine what @@ -201,10 +202,16 @@ public class MultiVersionConcurrencyControl { */ void waitForRead(WriteEntry e) { boolean interrupted = false; + int count = 0; synchronized (readWaiters) { while (readPoint.get() < e.getWriteNumber()) { + if (count % 100 == 0 && count > 0) { + Log.warn("STUCK: readPoint=" + this.readPoint.get() + ", writeNumber=" + + e.getWriteNumber() + ", queueSize=" + this.writeQueue.size()); + } + count++; try { - readWaiters.wait(0); + readWaiters.wait(10); } catch (InterruptedException ie) { // We were interrupted... finish the loop -- i.e. cleanup --and then // on our way out, reset the interrupt flag. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java index 2718295..8f85fef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -85,8 +85,13 @@ public class WALUtil { TableName tn = TableName.valueOf(f.getTableName().toByteArray()); // we use HLogKey here instead of WALKey directly to support legacy coprocessors. WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn, System.currentTimeMillis(), mvcc); - long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), false); - mvcc.complete(key.getWriteEntry()); + long trx; + try { + trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), false); + } finally { + mvcc.complete(key.getWriteEntry()); + LOG.info("REMOVE writeNumber=" + key.getWriteEntry().getWriteNumber() + ", read=" + mvcc.getReadPoint()); + } if (sync) log.sync(trx); if (LOG.isTraceEnabled()) { LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 437e1b1..e911d34 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -3752,11 +3752,12 @@ public class TestHRegion { private volatile boolean done; private Throwable error = null; + FlushThread() { + super("FlushThread"); + } + public void done() { done = true; - synchronized (this) { - interrupt(); - } } public void checkNoError() { @@ -3780,9 +3781,12 @@ public class TestHRegion { } try { region.flush(true); + } catch (DroppedSnapshotException dse) { + LOG.error("DSE!!!", dse); + error = dse; } catch (IOException e) { if (!done) { - LOG.error("Error while flusing cache", e); + LOG.error("Error while flushing cache", e); error = e; } break; @@ -3795,7 +3799,6 @@ public class TestHRegion { synchronized (this) { notify(); } - } }