diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a8ffa8d..907cdc5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -972,7 +972,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
       RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
       getRegionServerServices().getServerName(), storeFiles);
-    WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc);
+    WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc, mvcc);
   }
 
   private void writeRegionCloseMarker(WAL wal) throws IOException {
@@ -988,7 +988,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
       RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
       getRegionServerServices().getServerName(), storeFiles);
-    WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc);
+    WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc, mvcc);
 
     // Store SeqId in HDFS when a region closes
     // checking region folder exists is due to many tests which delete the table folder while a
@@ -1446,11 +1446,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       for (final Store store : stores.values()) {
         long flushableSize = store.getFlushableSize();
         if (!(abort || flushableSize == 0 || writestate.readOnly)) {
-          getRegionServerServices().abort("Assertion failed while closing store "
+          if (getRegionServerServices() != null) {
+            getRegionServerServices().abort("Assertion failed while closing store "
             + getRegionInfo().getRegionNameAsString() + " " + store
             + ". flushableSize expected=0, actual= " + flushableSize
             + ". Current memstoreSize=" + getMemstoreSize() + ". Maybe a coprocessor "
            + "operation failed and left the memstore in a partially updated state.", null);
+          }
         }
         completionService
             .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
@@ -2138,7 +2140,7 @@
     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
     // allow updates again so its value will represent the size of the updates received
     // during flush
-    MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
+
     // We have to take an update lock during snapshot, or else a write could end up in both snapshot
     // and memstore (makes it difficult to do atomic rows then)
     status.setStatus("Obtaining lock to block concurrent updates");
@@ -2169,9 +2171,9 @@
     byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
     long trxId = 0;
+    MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin();
     try {
       try {
-        writeEntry = mvcc.begin();
         if (wal != null) {
           Long earliestUnflushedSequenceIdForTheRegion =
               wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
@@ -2374,8 +2376,7 @@
               desc, false, mvcc);
           } catch (Throwable ex) {
             LOG.warn(getRegionInfo().getEncodedName() + " : "
-                + "Received unexpected exception trying to write ABORT_FLUSH marker to WAL:"
-                + StringUtils.stringifyException(ex));
+                + "failed writing ABORT_FLUSH marker to WAL", ex);
             // ignore this since we will be aborting the RS with DSE.
           }
           wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
@@ -5355,7 +5356,7 @@
           this.getRegionInfo().getTable(),
           ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
       WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
-          loadDescriptor);
+          loadDescriptor, mvcc);
     } catch (IOException ioe) {
       if (this.rsServices != null) {
         // Have to abort region server because some hfiles has been loaded but we can't write
@@ -7809,7 +7810,7 @@
   }
 
   /**
-   * Update counters for numer of puts without wal and the size of possible data loss.
+   * Update counters for number of puts without wal and the size of possible data loss.
    * These information are exposed by the region server metrics.
    */
   private void recordMutationWithoutWal(final Map<byte[], List<Cell>> familyMap) {
@@ -8031,9 +8032,14 @@
         getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null,
         HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC());
 
-    // Call append but with an empty WALEdit.  The returned seqeunce id will not be associated
+    // Call append but with an empty WALEdit.  The returned sequence id will not be associated
     // with any edit and we can be sure it went in after all outstanding appends.
-    wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, false);
+    try {
+      wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, false);
+    } catch (Throwable t) {
+      // If exception, our mvcc won't get cleaned up by client, so do it here.
+      getMVCC().complete(key.getWriteEntry());
+    }
     return key;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
index d101f7b..a3541e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.mortbay.log.Log;
 
 /**
  * Manages the read/write consistency. This provides an interface for readers to determine what
@@ -201,10 +202,15 @@
    */
   void waitForRead(WriteEntry e) {
     boolean interrupted = false;
+    int count = 0;
     synchronized (readWaiters) {
       while (readPoint.get() < e.getWriteNumber()) {
+        if (count % 100 == 0 && count > 0) {
+          Log.warn("STUCK: " + this);
+        }
+        count++;
         try {
-          readWaiters.wait(0);
+          readWaiters.wait(10);
         } catch (InterruptedException ie) {
           // We were interrupted... finish the loop -- i.e. cleanup --and then
           // on our way out, reset the interrupt flag.
@@ -217,6 +223,23 @@
     }
   }
 
+  @VisibleForTesting
+  public String toString() {
+    StringBuffer sb = new StringBuffer(256);
+    sb.append("readPoint=");
+    sb.append(this.readPoint.get());
+    sb.append(", writePoint=");
+    sb.append(this.writePoint);
+    synchronized (this.writeQueue) {
+      for (WriteEntry we: this.writeQueue) {
+        sb.append(", [");
+        sb.append(we);
+        sb.append("]");
+      }
+    }
+    return sb.toString();
+  }
+
   public long getReadPoint() {
     return readPoint.get();
   }
@@ -250,6 +273,11 @@
     public long getWriteNumber() {
       return this.writeNumber;
     }
+
+    @Override
+    public String toString() {
+      return this.writeNumber + ", " + this.completed;
+    }
   }
 
   public static final long FIXED_SIZE = ClassSize.align(
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 0e4a585..b955931 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1876,12 +1876,6 @@
         // here inside this single appending/writing thread.  Events are ordered on the ringbuffer
         // so region sequenceids will also be in order.
         regionSequenceId = entry.stampRegionSequenceId();
-        // Edits are empty, there is nothing to append.  Maybe empty when we are looking for a
-        // region sequence id only, a region edit/sequence id that is not associated with an actual
-        // edit.  It has to go through all the rigmarole to be sure we have the right ordering.
-        if (entry.getEdit().isEmpty()) {
-          return;
-        }
 
         // Coprocessor hook.
         if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(),
@@ -1898,8 +1892,12 @@
               entry.getEdit());
           }
         }
-
-        writer.append(entry);
+        // Edits are empty, there is nothing to append.  Maybe empty when we are looking for a
+        // region sequence id only, a region edit/sequence id that is not associated with an actual
+        // edit.  It has to go through all the rigmarole to be sure we have the right ordering.
+        if (!entry.getEdit().isEmpty()) {
+          writer.append(entry);
+        }
         assert highestUnsyncedSequence < entry.getSequence();
         highestUnsyncedSequence = entry.getSequence();
         sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 2718295..c09708a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -65,9 +65,13 @@
     TableName tn = TableName.valueOf(c.getTableName().toByteArray());
     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
     WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn, System.currentTimeMillis(), mvcc);
-    log.append(htd, info, key, WALEdit.createCompaction(info, c), false);
-    mvcc.complete(key.getWriteEntry());
-    log.sync();
+    long trx;
+    try {
+      trx = log.append(htd, info, key, WALEdit.createCompaction(info, c), false);
+      log.sync(trx);
+    } finally {
+      mvcc.complete(key.getWriteEntry());
+    }
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
     }
@@ -85,9 +89,13 @@
     TableName tn = TableName.valueOf(f.getTableName().toByteArray());
     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
     WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn, System.currentTimeMillis(), mvcc);
-    long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), false);
-    mvcc.complete(key.getWriteEntry());
-    if (sync) log.sync(trx);
+    long trx = -1;
+    try {
+      trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), false);
+      if (sync) log.sync(trx);
+    } finally {
+      mvcc.complete(key.getWriteEntry());
+    }
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
     }
@@ -98,12 +106,18 @@
    * Write a region open marker indicating that the region is opened
    */
   public static long writeRegionEventMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
-      final RegionEventDescriptor r) throws IOException {
+      final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc)
+  throws IOException {
     TableName tn = TableName.valueOf(r.getTableName().toByteArray());
     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
     WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
-    long trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r), false);
-    log.sync(trx);
+    long trx = -1;
+    try {
+      trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r), false);
+      log.sync(trx);
+    } finally {
+      mvcc.complete(key.getWriteEntry());
+    }
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
     }
@@ -123,17 +137,23 @@
   public static long writeBulkLoadMarkerAndSync(final WAL wal,
                                                 final HTableDescriptor htd,
                                                 final HRegionInfo info,
-                                                final WALProtos.BulkLoadDescriptor descriptor)
+                                                final WALProtos.BulkLoadDescriptor descriptor,
+                                                final MultiVersionConcurrencyControl mvcc)
       throws IOException {
     TableName tn = info.getTable();
     WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
 
     // Add it to the log but the false specifies that we don't need to add it to the memstore
-    long trx = wal.append(htd,
+    long trx;
+    try {
+      trx = wal.append(htd,
         info, key, WALEdit.createBulkLoadEvent(info, descriptor), false);
-    wal.sync(trx);
+      wal.sync(trx);
+    } finally {
+      mvcc.complete(key.getWriteEntry());
+    }
 
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(descriptor));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 48ede4c..2859dfe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -97,6 +97,12 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     try {
       this.seqNumAssignedLatch.await();
     } catch (InterruptedException ie) {
+      // If interrupted... clear out our entry else we can block up mvcc.
+      MultiVersionConcurrencyControl mvcc = getMvcc();
+      if (mvcc != null) {
+        MultiVersionConcurrencyControl.WriteEntry we = this.writeEntry;
+        if (we != null) mvcc.complete(we);
+      }
       InterruptedIOException iie = new InterruptedIOException();
       iie.initCause(ie);
       throw iie;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index cb95d6f..5792f29 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -3762,6 +3762,10 @@
     private volatile boolean done;
     private Throwable error = null;
 
+    FlushThread() {
+      super("FlushThread");
+    }
+
     public void done() {
       done = true;
       synchronized (this) {
@@ -3792,20 +3796,21 @@
           region.flush(true);
         } catch (IOException e) {
           if (!done) {
-            LOG.error("Error while flusing cache", e);
+            LOG.error("Error while flushing cache", e);
             error = e;
           }
           break;
+        } catch (Throwable t) {
+          LOG.error("Uncaught exception", t);
+          throw t;
         }
       }
-
     }
 
     public void flush() {
       synchronized (this) {
         notify();
       }
-
     }
   }
@@ -3908,6 +3913,7 @@
     private byte[][] qualifiers;
 
     private PutThread(int numRows, byte[][] families, byte[][] qualifiers) {
+      super("PutThread");
      this.numRows = numRows;
       this.families = families;
       this.qualifiers = qualifiers;
@@ -3963,8 +3969,9 @@
           }
         } catch (InterruptedIOException e) {
           // This is fine. It means we are done, or didn't get the lock on time
+          LOG.info("Interrupted", e);
         } catch (IOException e) {
-          LOG.error("error while putting records", e);
+          LOG.error("Error while putting records", e);
           error = e;
           break;
         }
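
Note on the MVCC changes above: the invariant this patch enforces is that every write entry handed out by MultiVersionConcurrencyControl.begin() is complete()d exactly once, even when the WAL append or sync throws. An abandoned entry leaves the read point stuck behind the write point, and waitForRead() then spins forever (hence the STUCK warning added above). The sketch below is a minimal, self-contained illustration of that begin/complete-in-finally protocol; it is not HBase code, and MvccSketch, its fields, and the simulated WAL failure are all illustrative stand-ins.

// MvccSketch.java -- illustrative only, not HBase code.
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;

public class MvccSketch {
  static final class WriteEntry {
    final long writeNumber;
    boolean completed = false;
    WriteEntry(long writeNumber) { this.writeNumber = writeNumber; }
  }

  private final Queue<WriteEntry> writeQueue = new ArrayDeque<WriteEntry>();
  private long writePoint = 0;
  private long readPoint = 0;

  // Hand out the next write number; the entry is outstanding until complete() is called.
  synchronized WriteEntry begin() {
    WriteEntry e = new WriteEntry(++writePoint);
    writeQueue.add(e);
    return e;
  }

  // Mark the entry done and advance the read point over the completed prefix of the queue.
  // An entry that is never completed pins the read point forever -- the failure mode the
  // patch guards against by moving complete() into finally blocks.
  synchronized void complete(WriteEntry e) {
    e.completed = true;
    while (!writeQueue.isEmpty() && writeQueue.peek().completed) {
      readPoint = writeQueue.poll().writeNumber;
    }
    notifyAll();
  }

  // Block until the read point has caught up with this entry, as readers must.
  synchronized void waitForRead(WriteEntry e) throws InterruptedException {
    while (readPoint < e.writeNumber) {
      wait(10); // bounded wait, echoing the readWaiters.wait(10) change above
    }
  }

  public static void main(String[] args) throws Exception {
    MvccSketch mvcc = new MvccSketch();
    WriteEntry we = mvcc.begin();
    try {
      // Stand-in for log.append(...) followed by log.sync(trx), either of which may throw.
      throw new IOException("simulated WAL failure");
    } catch (IOException ioe) {
      System.out.println("append failed: " + ioe.getMessage());
    } finally {
      mvcc.complete(we); // without this line, waitForRead() below would never return
    }
    mvcc.waitForRead(we);
    System.out.println("read point advanced to " + we.writeNumber);
  }
}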