diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 2909f47..1dcdb21 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1736,6 +1736,7 @@ public class HRegion implements HeapSize { // , Writable{ Bytes.BYTES_COMPARATOR); long flushSeqId = -1L; + long trxId = 0; try { try { w = mvcc.beginMemstoreInsert(); @@ -1762,17 +1763,36 @@ public class HRegion implements HeapSize { // , Writable{ committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL } + // write the snapshot start to WAL + if (wal != null) { + FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, + getRegionInfo(), flushSeqId, committedFiles); + trxId = HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), + desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock + } + // Prepare flush (take a snapshot) for (StoreFlushContext flush : storeFlushCtxs) { flush.prepare(); } - - // write the snapshot start to WAL - FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, - getRegionInfo(), flushSeqId, committedFiles); - HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, sequenceId); - + } catch (IOException ex) { + if (wal != null) { + if (trxId > 0) { // check whether we have already written START_FLUSH to WAL + try { + FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, + getRegionInfo(), flushSeqId, committedFiles); + HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), + desc, sequenceId, false); + } catch (Throwable t) { + LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + + StringUtils.stringifyException(t)); + // ignore this since we will be aborting the RS with DSE. + } + } + // we have called wal.startCacheFlush(), now we have to abort it + wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); + throw ex; // let upper layers deal with it. + } } finally { this.updatesLock.writeLock().unlock(); } @@ -1780,9 +1800,16 @@ public class HRegion implements HeapSize { // , Writable{ ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize; status.setStatus(s); if (LOG.isTraceEnabled()) LOG.trace(s); - // sync unflushed WAL changes when deferred log sync is enabled + // sync unflushed WAL changes // see HBASE-8208 for details - if (wal != null && !shouldSyncLog()) wal.sync(); + if (wal != null) { + try { + wal.sync(trxId); // ensure that flush marker is sync'ed + } catch (IOException ioe) { + LOG.warn("Unexpected exception while log.sync(), ignoring. Exception: " + + StringUtils.stringifyException(ioe)); + } + } // wait for all in-progress transactions to commit to HLog before // we can start the flush. This prevents @@ -1832,6 +1859,14 @@ public class HRegion implements HeapSize { // , Writable{ // Set down the memstore size by amount of flush. this.addAndGetGlobalMemstoreSize(-totalFlushableSize); + + if (wal != null) { + // write flush marker to WAL. If fail, we should throw DroppedSnapshotException + FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMPLETE_FLUSH, + getRegionInfo(), flushSeqId, committedFiles); + HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), + desc, sequenceId, true); + } } catch (Throwable t) { // An exception here means that the snapshot was not persisted. // The hlog needs to be replayed so its content is restored to memstore. @@ -1840,10 +1875,16 @@ public class HRegion implements HeapSize { // , Writable{ // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch // all and sundry. if (wal != null) { - FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, - getRegionInfo(), flushSeqId, committedFiles); - HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, sequenceId); + try { + FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, + getRegionInfo(), flushSeqId, committedFiles); + HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), + desc, sequenceId, false); + } catch (Throwable ex) { + LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + + StringUtils.stringifyException(ex)); + // ignore this since we will be aborting the RS with DSE. + } wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); } DroppedSnapshotException dse = new DroppedSnapshotException("region: " + @@ -1855,10 +1896,6 @@ public class HRegion implements HeapSize { // , Writable{ // If we get to here, the HStores have been written. if (wal != null) { - FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMPLETE_FLUSH, - getRegionInfo(), flushSeqId, committedFiles); - HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, sequenceId); wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java index 271ebb7..2c4652b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java @@ -273,14 +273,15 @@ public class HLogUtil { /** * Write a flush marker indicating a start / abort or a complete of a region flush */ - public static void writeFlushMarker(HLog log, HTableDescriptor htd, HRegionInfo info, - final FlushDescriptor f, AtomicLong sequenceId) throws IOException { + public static long writeFlushMarker(HLog log, HTableDescriptor htd, HRegionInfo info, + final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException { TableName tn = TableName.valueOf(f.getTableName().toByteArray()); HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); - log.appendNoSync(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false, null); - log.sync(); + long trx = log.appendNoSync(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false, null); + if (sync) log.sync(trx); if (LOG.isTraceEnabled()) { LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f)); } + return trx; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index e7ff570..606ea5b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -116,7 +116,6 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; -import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.HRegion.RowLock; import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem; @@ -899,8 +898,11 @@ public class TestHRegion { } } - class IsStartFlushWALMarker extends ArgumentMatcher { - FlushAction action = FlushAction.START_FLUSH; + class IsFlushWALMarker extends ArgumentMatcher { + volatile FlushAction[] actions; + public IsFlushWALMarker(FlushAction... actions) { + this.actions = actions; + } @Override public boolean matches(Object edit) { List kvs = ((WALEdit)edit).getKeyValues(); @@ -916,22 +918,18 @@ public class TestHRegion { return false; } if (desc != null) { - return desc.getAction() == action; + for (FlushAction action : actions) { + if (desc.getAction() == action) { + return true; + } + } } } return false; } - } - - class IsCompleteFlushWALMarker extends IsStartFlushWALMarker { - public IsCompleteFlushWALMarker() { - action = FlushAction.START_FLUSH; - } - } - - class IsAbortFlushWALMarker extends IsStartFlushWALMarker { - public IsAbortFlushWALMarker() { - action = FlushAction.ABORT_FLUSH; + public IsFlushWALMarker set(FlushAction... actions) { + this.actions = actions; + return this; } } @@ -948,11 +946,6 @@ public class TestHRegion { HLog hlog = spy(HLogFactory.createHLog(FILESYSTEM, logDir, UUID.randomUUID().toString(), TEST_UTIL.getConfiguration())); - // throw exceptions if the WalEdit is a start flush action - when(hlog.appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(), - (WALEdit)argThat(new IsStartFlushWALMarker()), (AtomicLong)any(), Mockito.anyBoolean(), - (List)any())).thenThrow(new IOException("Fail to append flush marker")); - this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, hlog, family); try { @@ -962,24 +955,62 @@ public class TestHRegion { put.add(family, Bytes.toBytes(i), Bytes.toBytes(i)); region.put(put); - // start cache flush will throw exception - FlushResult result = region.flushcache(); - assertFalse(result.isFlushSucceeded()); + // 1. Test case where START_FLUSH throws exception + IsFlushWALMarker isFlushWALMarker = new IsFlushWALMarker(FlushAction.START_FLUSH); - // start flush will not fail, but complete flush will fail + // throw exceptions if the WalEdit is a start flush action when(hlog.appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(), - (WALEdit)argThat(new IsStartFlushWALMarker()), (AtomicLong)any(), Mockito.anyBoolean(), - (List)any())).thenCallRealMethod(); - when(hlog.appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(), - (WALEdit)argThat(new IsCompleteFlushWALMarker()), (AtomicLong)any(), Mockito.anyBoolean(), - (List)any())).thenThrow(new IOException("Fail to append flush marker")); + (WALEdit)argThat(isFlushWALMarker), (AtomicLong)any(), Mockito.anyBoolean(), + (List)any())) + .thenThrow(new IOException("Fail to append flush marker")); - result = region.flushcache(); - assertFalse(result.isFlushSucceeded()); + // start cache flush will throw exception + try { + region.flushcache(); + fail("This should have thrown exception"); + } catch (DroppedSnapshotException unexpected) { + // this should not be a dropped snapshot exception. Meaning that RS will not abort + throw unexpected; + } catch (IOException expected) { + // expected + } + + // 2. Test case where START_FLUSH succeeds but COMPLETE_FLUSH will throw exception + isFlushWALMarker.set(FlushAction.COMPLETE_FLUSH); + + try { + region.flushcache(); + fail("This should have thrown exception"); + } catch (DroppedSnapshotException expected) { + // we expect this exception, since we were able to write the snapshot, but failed to + // write the flush marker to WAL + } catch (IOException unexpected) { + throw unexpected; + } + + region.close(); + this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, hlog, family); + region.put(put); + + // 3. Test case where ABORT_FLUSH will throw exception. + // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with + // DroppedSnapshotException. Below COMPLETE_FLUSH will cause flush to abort + isFlushWALMarker.set(FlushAction.COMPLETE_FLUSH, FlushAction.ABORT_FLUSH); + + try { + region.flushcache(); + fail("This should have thrown exception"); + } catch (DroppedSnapshotException expected) { + // we expect this exception, since we were able to write the snapshot, but failed to + // write the flush marker to WAL + } catch (IOException unexpected) { + throw unexpected; + } } finally { - //HRegion.closeHRegion(this.region); - //this.region = null; + HRegion.closeHRegion(this.region); + this.region = null; } }