diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 7dbad6c..93eac88 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -305,7 +305,8 @@ public class HFile { try { ostream.setDropBehind(shouldDropBehind && cacheConf.shouldDropBehindCompaction()); } catch (UnsupportedOperationException uoe) { - LOG.debug("Unable to set drop behind on " + path, uoe); + if (LOG.isTraceEnabled()) LOG.trace("Unable to set drop behind on " + path, uoe); + else if (LOG.isDebugEnabled()) LOG.debug("Unable to set drop behind on " + path); } } return createWriter(fs, path, ostream, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 2293311..0edaac8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -204,12 +204,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY = "hbase.hregion.scan.loadColumnFamiliesOnDemand"; - // in milliseconds - private static final String MAX_WAIT_FOR_SEQ_ID_KEY = - "hbase.hregion.max.wait.for.seq.id"; - - private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 60000; - /** * This is the global default value for durability. All tables/mutations not * defining a durability or using USE_DEFAULT will default to this value. @@ -241,7 +235,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1. * Its default value is -1L. This default is used as a marker to indicate * that the region hasn't opened yet. Once it is opened, it is set to the derived - * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region. + * #openSeqNum, the largest sequence id of all hfiles opened under this Region. * *

Control of this sequence is handed off to the WAL implementation. It is responsible * for tagging edits with the correct sequence id since it is responsible for getting the @@ -340,7 +334,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ private boolean isLoadingCfsOnDemandDefault = false; - private int maxWaitForSeqId; private final AtomicInteger majorInProgress = new AtomicInteger(0); private final AtomicInteger minorInProgress = new AtomicInteger(0); @@ -673,7 +666,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION); - maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID); this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true); this.htableDescriptor = htd; this.rsServices = rsServices; @@ -2137,7 +2129,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // to do this for a moment. It is quick. We also set the memstore size to zero here before we // allow updates again so its value will represent the size of the updates received // during flush - MultiVersionConcurrencyControl.WriteEntry w = null; + MultiVersionConcurrencyControl.WriteEntry writeEntry = null; // We have to take an update lock during snapshot, or else a write could end up in both snapshot // and memstore (makes it difficult to do atomic rows then) status.setStatus("Obtaining lock to block concurrent updates"); @@ -2168,7 +2160,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long trxId = 0; try { try { - w = mvcc.beginMemstoreInsert(); + writeEntry = mvcc.beginMemstoreInsert(); if (wal != null) { Long earliestUnflushedSequenceIdForTheRegion = wal.startCacheFlush(encodedRegionName, flushedFamilyNames); @@ -2250,14 +2242,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // uncommitted transactions from being written into HFiles. // We have to block before we start the flush, otherwise keys that // were removed via a rollbackMemstore could be written to Hfiles. 
- w.setWriteNumber(flushOpSeqId); - mvcc.waitForPreviousTransactionsComplete(w); + writeEntry.setWriteNumber(flushOpSeqId); + mvcc.waitForPreviousTransactionsComplete(writeEntry); - // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block - w = null; + // set writeEntry to null to prevent mvcc.advanceMemstore from being called again inside finally block + writeEntry = null; } finally { - if (w != null) { - // in case of failure just mark current w as complete - mvcc.advanceMemstore(w); + if (writeEntry != null) { + // in case of failure just mark current writeEntry as complete + mvcc.advanceMemstore(writeEntry); } } return new PrepareFlushResult(storeFlushCtxs, committedFiles, startTime, flushOpSeqId, @@ -2434,7 +2426,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @VisibleForTesting protected long getNextSequenceId(final WAL wal) throws IOException { WALKey key = this.appendEmptyEdit(wal, null); - return key.getSequenceId(maxWaitForSeqId); + return key.getSequenceId(); } ////////////////////////////////////////////////////////////////////////////// @@ -2853,7 +2845,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE; WALEdit walEdit = new WALEdit(isInReplay); - MultiVersionConcurrencyControl.WriteEntry w = null; + MultiVersionConcurrencyControl.WriteEntry writeEntry = null; long txid = 0; boolean doRollBackMemstore = false; boolean locked = false; @@ -3006,7 +2998,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // ------------------------------------ // Acquire the latest mvcc number // ---------------------------------- - w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); + writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); // calling the pre CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { @@ -3121,7 +3113,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, getSequenceId(), true, memstoreCells); } - if(walKey == null){ + if (walKey == null) { // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned walKey = this.appendEmptyEdit(this.wal, memstoreCells); } @@ -3155,9 +3147,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // ------------------------------------------------------------------ // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
// ------------------------------------------------------------------ - if (w != null) { - mvcc.completeMemstoreInsertWithSeqNum(w, walKey); - w = null; + if (writeEntry != null) { + mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey); + writeEntry = null; } // ------------------------------------ @@ -3186,9 +3178,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { rollbackMemstore(memstoreCells); - } - if (w != null) { - mvcc.completeMemstoreInsertWithSeqNum(w, walKey); + if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry); + } else if (writeEntry != null) { + mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey); } if (locked) { @@ -6722,6 +6714,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi processor.postBatchMutate(this); } } finally { + // TODO: Make this method look like all other methods that are doing append/sync and + // memstore rollback such as append and doMiniBatchMutation. Currently it is a little + // different. Make them all share the same code! if (!mutations.isEmpty() && !walSyncSuccessful) { LOG.warn("Wal sync failed. Roll back " + mutations.size() + " memstore keyvalues for row(s):" + StringUtils.byteToHexString( @@ -6732,6 +6727,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi getStore(cell).rollback(cell); } } + if (writeEntry != null) { + mvcc.cancelMemstoreInsert(writeEntry); + writeEntry = null; + } } // 13. Roll mvcc forward if (writeEntry != null) { @@ -6833,7 +6832,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi startRegionOperation(Operation.APPEND); this.writeRequestsCount.increment(); long mvccNum = 0; - WriteEntry w = null; + WriteEntry writeEntry = null; WALKey walKey = null; RowLock rowLock = null; List<Cell> memstoreCells = new ArrayList<Cell>(); @@ -6854,7 +6853,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // now start my own transaction mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId); - w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); + writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); long now = EnvironmentEdgeManager.currentTime(); // Process each family for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) { @@ -7036,10 +7035,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { rollbackMemstore(memstoreCells); + if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry); + } else if (writeEntry != null) { + mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey); } - if (w != null) { - mvcc.completeMemstoreInsertWithSeqNum(w, walKey); - } + closeRegionOperation(Operation.APPEND); } @@ -7086,7 +7086,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi startRegionOperation(Operation.INCREMENT); this.writeRequestsCount.increment(); RowLock rowLock = null; - WriteEntry w = null; + WriteEntry writeEntry = null; WALKey walKey = null; long mvccNum = 0; List<Cell> memstoreCells = new ArrayList<Cell>(); @@ -7107,7 +7107,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // now start my own transaction mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId); - w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); +
writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); long now = EnvironmentEdgeManager.currentTime(); // Process each family for (Map.Entry<byte[], List<Cell>> family: @@ -7277,9 +7277,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { rollbackMemstore(memstoreCells); - } - if (w != null) { - mvcc.completeMemstoreInsertWithSeqNum(w, walKey); + if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry); + } else if (writeEntry != null) { + mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey); } closeRegionOperation(Operation.INCREMENT); if (this.metricsRegion != null) { @@ -7310,7 +7310,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + + 44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 5 * Bytes.SIZEOF_BOOLEAN); @@ -7970,13 +7970,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @throws IOException */ private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException { + return appendEmptyEdit(wal, getTableDesc(), getRegionInfo(), this.sequenceId, cells); + } + + @VisibleForTesting + public static WALKey appendEmptyEdit(final WAL wal, final HTableDescriptor htd, + final HRegionInfo hri, final AtomicLong sequenceId, final List<Cell> cells) + throws IOException { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), + WALKey key = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE); - // Call append but with an empty WALEdit. The returned seqeunce id will not be associated + // Call append but with an empty WALEdit. The returned sequence id will not be associated // with any edit and we can be sure it went in after all outstanding appends.
- wal.append(getTableDesc(), getRegionInfo(), key, - WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells); + wal.append(htd, hri, key, + WALEdit.EMPTY_WALEDIT, sequenceId, false, cells); return key; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 7649ac9..3bda21a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -109,7 +109,9 @@ public class LogRoller extends HasThread { if (!periodic) { synchronized (rollLog) { try { - if (!rollLog.get()) rollLog.wait(this.threadWakeFrequency); + if (!rollLog.get()) { + rollLog.wait(this.threadWakeFrequency); + } } catch (InterruptedException e) { // Fall through } @@ -180,5 +182,4 @@ public class LogRoller extends HasThread { requester); } } - -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java index 028d81a..2d65387 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java @@ -79,10 +79,11 @@ public class MultiVersionConcurrencyControl { // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers // because each handler could increment sequence num twice and max concurrent in-flight // transactions is the number of RPC handlers. - // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple - // changes touch same row key + // We can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple + // changes touch the same row key. // If for any reason, the bumped value isn't reset due to failure situations, we'll reset - // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all + // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all. + // St.Ack 20150901 Where is the reset to NO_WRITE_NUMBER done? return sequenceId.incrementAndGet() + 1000000000; } @@ -128,6 +129,23 @@ public class MultiVersionConcurrencyControl { } /** + * Cancel a write insert that failed. + * Removes the write entry without advancing the read point and without interfering with write + * entries queued behind us. It is like #advanceMemstore(WriteEntry) except that this method + * will not move the read point to the sequence id in the WriteEntry, which may be a ridiculous + * pre-assigned value (see the trick in HRegion where we call + * {@link #getPreAssignedWriteNumber(AtomicLong)} just to mark it as for special handling). + * @param writeEntry Failed attempt at write. Does cleanup. + */ + public void cancelMemstoreInsert(WriteEntry writeEntry) { + // I'm not clear on how this voodoo all works but setting write number to -1 does NOT advance + // readpoint and gets my little writeEntry completed and removed from queue of outstanding + // events which seems right. St.Ack 20150901. + writeEntry.setWriteNumber(NO_WRITE_NUMBER); + advanceMemstore(writeEntry); + } + + /** * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the * end of this call, the global read point is at least as large as the write point of the passed * in WriteEntry.
Thus, the write is visible to MVCC readers. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index fa69d63..3ec99cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -758,6 +758,19 @@ public class FSHLog implements WAL { } /** + * Used to manufacture a race condition reliably. For testing only. + * @see #beforeWaitOnSafePoint() + */ + @VisibleForTesting + protected void afterCreatingZigZagLatch() {} + + /** + * @see #afterCreatingZigZagLatch() + */ + @VisibleForTesting + protected void beforeWaitOnSafePoint() {} + + /** * Cleans up current writer closing it and then puts in place the passed in * nextWriter. * @@ -786,6 +799,7 @@ public class FSHLog implements WAL { SyncFuture syncFuture = null; SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null)? null: this.ringBufferEventHandler.attainSafePoint(); + afterCreatingZigZagLatch(); TraceScope scope = Trace.startSpan("FSHFile.replaceWriter"); try { // Wait on the safe point to be achieved. Send in a sync in case nothing has hit the @@ -799,6 +813,7 @@ public class FSHLog implements WAL { syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer()); } } catch (FailedSyncBeforeLogCloseException e) { + // If unflushed/unsynced entries on close, it is reason to abort. if (isUnflushedEntries()) throw e; - // Else, let is pass through to the close. + // Else, let it pass through to the close. LOG.warn("Failed last sync but no outstanding unsync edits so falling through to close; " + @@ -1030,6 +1045,7 @@ public class FSHLog implements WAL { * @param clusterIds that have consumed the change * @return New log key. */ + @SuppressWarnings("deprecation") protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum, long now, List<UUID> clusterIds, long nonceGroup, long nonce) { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. @@ -1082,6 +1098,7 @@ public class FSHLog implements WAL { */ private class SyncRunner extends HasThread { private volatile long sequence; + // Keep around last exception thrown. Clear on successful sync. private final BlockingQueue<SyncFuture> syncFutures; /** @@ -1200,28 +1217,27 @@ public class FSHLog implements WAL { // while we run. TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan()); long start = System.nanoTime(); - Throwable t = null; + Throwable lastException = null; try { Trace.addTimelineAnnotation("syncing writer"); writer.sync(); Trace.addTimelineAnnotation("writer synced"); currentSequence = updateHighestSyncedSequence(currentSequence); } catch (IOException e) { - LOG.error("Error syncing, request close of wal ", e); - t = e; + LOG.error("Error syncing, request close of WAL", e); + lastException = e; } catch (Exception e) { LOG.warn("UNEXPECTED", e); - t = e; + lastException = e; } finally { // reattach the span to the future before releasing. takeSyncFuture.setSpan(scope.detach()); // First release what we 'took' from the queue. - syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, t); + syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException); // Can we release other syncs?
- syncCount += releaseSyncFutures(currentSequence, t); - if (t != null) { - requestLogRoll(); - } else checkLogRoll(); + syncCount += releaseSyncFutures(currentSequence, lastException); + if (lastException != null) requestLogRoll(); + else checkLogRoll(); } postSync(System.nanoTime() - start, syncCount); } catch (InterruptedException e) { @@ -1677,6 +1693,11 @@ public class FSHLog implements WAL { private volatile int syncFuturesCount = 0; private volatile SafePointZigZagLatch zigzagLatch; /** + * Set if we get an exception appending or syncing so that all subsequent appends and syncs + * on this WAL fail until the WAL is replaced. + */ + private Exception exception = null; + /** * Object to block on while waiting on safe point. */ private final Object safePointWaiter = new Object(); @@ -1696,17 +1717,30 @@ public class FSHLog implements WAL { } private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) { + // There could be handler-count syncFutures outstanding. for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e); this.syncFuturesCount = 0; } + /** + * @return True if there are still outstanding sync futures. + */ + private boolean isOutstandingSyncs() { + for (int i = 0; i < this.syncFuturesCount; i++) { + if (!this.syncFutures[i].isDone()) return true; + } + return false; + } + @Override // We can set endOfBatch in the below method if at end of our this.syncFutures array public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch) throws Exception { // Appends and syncs are coming in order off the ringbuffer. We depend on this fact. We'll // add appends to dfsclient as they come in. Batching appends doesn't give any significant - // benefit on measurement. Handler sync calls we will batch up. + // benefit on measurement. Handler sync calls we will batch up. If we get an exception + // appending an edit, we fail all subsequent appends and syncs with the same exception until + // the WAL is reset. try { if (truck.hasSyncFuturePayload()) { @@ -1716,12 +1750,17 @@ public class FSHLog implements WAL { } else if (truck.hasFSWALEntryPayload()) { TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload()); try { - append(truck.unloadFSWALEntryPayload()); + FSWALEntry entry = truck.unloadFSWALEntryPayload(); + // If already an exception, do not try to append. Throw. + if (this.exception != null) throw this.exception; + append(entry); } catch (Exception e) { + // Failed append. Record the exception. Throw it from here on out until a new WAL is in place. + this.exception = e; // If append fails, presume any pending syncs will fail too; let all waiting handlers // know of the exception. cleanupOutstandingSyncsOnException(sequence, e); - // Return to keep processing. + // Return to keep processing events coming off the ringbuffer return; } finally { assert scope == NullScope.INSTANCE || !scope.isDetached(); @@ -1748,13 +1787,20 @@ public class FSHLog implements WAL { } // Below expects that the offer 'transfers' responsibility for the outstanding syncs to the - // syncRunner. We should never get an exception in here. HBASE-11145 was because queue - // was sized exactly to the count of user handlers but we could have more if we factor in - // meta handlers doing opens and closes. + // syncRunner. We should never get an exception in here.
int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length; try { - this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount); + if (this.exception != null) { + // Do not try to sync. If this.exception is set, then we failed an append. Do not try to + // sync a failed append. Fall through to the attainSafePoint below. It is part of the + // means by which we put in place a new WAL. A new WAL is how we clean up. + // Don't throw because then we'll not get to attainSafePoint. + cleanupOutstandingSyncsOnException(sequence, this.exception); + } else { + this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount); + } } catch (Exception e) { + // Should NEVER get here. cleanupOutstandingSyncsOnException(sequence, e); throw e; } @@ -1777,16 +1823,24 @@ public class FSHLog implements WAL { private void attainSafePoint(final long currentSequence) { if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return; // If here, another thread is waiting on us to get to safe point. Don't leave it hanging. + beforeWaitOnSafePoint(); try { // Wait on outstanding syncers; wait for them to finish syncing (unless we've been - // shutdown or unless our latch has been thrown because we have been aborted). + // shutdown or unless our latch has been thrown because we have been aborted or unless + // this WAL is broken and we can't get a sync/append to complete). while (!this.shutdown && this.zigzagLatch.isCocked() && - highestSyncedSequence.get() < currentSequence) { + highestSyncedSequence.get() < currentSequence && + // We could be in here and all syncs are failing or failed. Check for this. Otherwise + // we'll just be stuck here forever. In other words, ensure there are syncs running. + isOutstandingSyncs()) { synchronized (this.safePointWaiter) { this.safePointWaiter.wait(0, 1); } } - // Tell waiting thread we've attained safe point + // Tell waiting thread we've attained safe point. Can clear this.exception if set here + // because we know that the next event through the ringbuffer will be going to a new WAL + // after we do the zigzaglatch dance. + this.exception = null; this.zigzagLatch.safePointAttained(); } catch (InterruptedException e) { LOG.warn("Interrupted ", e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java index 62ab458..7de8367 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -42,9 +42,9 @@ import org.apache.htrace.Span; * SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync * call every time a Handler asks for it. *

- * SyncFutures are immutable but recycled. Call {@link #reset(long, Span)} before use even - * if it the first time, start the sync, then park the 'hitched' thread on a call to - * {@link #get()} + * SyncFutures are immutable but recycled. Call #reset(long, Span) before use even + * if it is the first time; start the sync, then park the 'hitched' thread on a call to + * #get(). */ @InterfaceAudience.Private class SyncFuture { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index 69c2aec..e8056e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -30,7 +30,6 @@ import java.util.NavigableMap; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.util.ByteStringer; @@ -302,24 +301,8 @@ public class WALKey implements SequenceId, Comparable<WALKey> { */ @Override public long getSequenceId() throws IOException { - return getSequenceId(-1); - } - - /** - * Wait for sequence number is assigned & return the assigned value - * @param maxWaitForSeqId maximum duration, in milliseconds, to wait for seq number to be assigned - * @return long the new assigned sequence number - * @throws IOException - */ - public long getSequenceId(int maxWaitForSeqId) throws IOException { try { - if (maxWaitForSeqId < 0) { - this.seqNumAssignedLatch.await(); - } else { - if (!this.seqNumAssignedLatch.await(maxWaitForSeqId, TimeUnit.MILLISECONDS)) { - throw new IOException("Timed out waiting for seq number to be assigned"); - } - } + this.seqNumAssignedLatch.await(); } catch (InterruptedException ie) { LOG.warn("Thread interrupted waiting for next log sequence number"); InterruptedIOException iie = new InterruptedIOException(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 826c9b3..34413fc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -57,6 +57,7 @@ import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -71,6 +72,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.DroppedSnapshotException; @@ -90,6 +92,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionTooBusyException; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; @@ -131,6 +134,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.Region.RowLock; import
org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem; import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource; @@ -155,6 +159,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.wal.WALSplitter; import org.junit.After; import org.junit.Assert; @@ -166,6 +171,7 @@ import org.junit.rules.TestName; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; +import org.mockito.exceptions.verification.WantedButNotInvoked; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -233,6 +239,307 @@ public class TestHRegion { } /** + * Reproduce locking up that happens when we get exceptions appending and syncing. + * See HBASE-14317. + * First I need to set up some mocks for Server and RegionServerServices. I also need to + * set up a dodgy WAL that will throw an exception when we go to append to it. + */ + @Test (timeout=300000) + public void testLockupAroundBadAssignSync() throws IOException { + // Dodgy WAL. Will throw exceptions when flags set. + class DodgyFSLog extends FSHLog { + volatile boolean throwSyncException = false; + volatile boolean throwAppendException = false; + + public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf) + throws IOException { + super(fs, root, logDir, conf); + } + + @Override + protected Writer createWriterInstance(Path path) throws IOException { + final Writer w = super.createWriterInstance(path); + return new Writer() { + @Override + public void close() throws IOException { + w.close(); + } + + @Override + public void sync() throws IOException { + if (throwSyncException) { + throw new IOException("FAKE! Failed to replace a bad datanode..."); + } + w.sync(); + } + + @Override + public void append(Entry entry) throws IOException { + if (throwAppendException) { + throw new IOException("FAKE! Failed to replace a bad datanode..."); + } + w.append(entry); + } + + @Override + public long getLength() throws IOException { + return w.getLength(); + } + }; + } + } + + // Make up mocked server and services. + Server server = mock(Server.class); + when(server.getConfiguration()).thenReturn(CONF); + when(server.isStopped()).thenReturn(false); + when(server.isAborted()).thenReturn(false); + RegionServerServices services = mock(RegionServerServices.class); + // OK. Now I have my mocked up Server and RegionServerServices and my dodgy WAL, go ahead with + // the test. + FileSystem fs = FileSystem.get(CONF); + Path rootDir = new Path(dir + getName()); + DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF); + LogRoller logRoller = new LogRoller(server, services); + logRoller.addWAL(dodgyWAL); + logRoller.start(); + + boolean threwOnSync = false; + boolean threwOnAppend = false; + boolean threwOnBoth = false; + + HRegion region = initHRegion(tableName, null, null, name.getMethodName(), + CONF, false, Durability.SYNC_WAL, dodgyWAL, COLUMN_FAMILY_BYTES); + try { + // Get some random bytes.
+ byte[] value = Bytes.toBytes(getName()); + try { + // First get something into memstore + Put put = new Put(value); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), value); + region.put(put); + } catch (IOException ioe) { + fail(); + } + + try { + dodgyWAL.throwAppendException = true; + dodgyWAL.throwSyncException = false; + Put put = new Put(value); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("3"), value); + region.put(put); + } catch (IOException ioe) { + threwOnAppend = true; + } + // When we get to here, we should be ok. A new WAL has been put in place. There were no + // appends to sync. We should be able to continue. + + try { + dodgyWAL.throwAppendException = true; + dodgyWAL.throwSyncException = true; + Put put = new Put(value); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("4"), value); + region.put(put); + } catch (IOException ioe) { + threwOnBoth = true; + } + // Again, all should be good. New WAL and no outstanding unsync'd edits so we should be able + // to just continue. + + // So, should be no abort at this stage. Verify. + Mockito.verify(server, Mockito.atLeast(0)). + abort(Mockito.anyString(), (Throwable)Mockito.anyObject()); + try { + dodgyWAL.throwAppendException = false; + dodgyWAL.throwSyncException = true; + Put put = new Put(value); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("2"), value); + region.put(put); + } catch (IOException ioe) { + threwOnSync = true; + } + // An append that got into the WAL but whose sync then failed is a server abort condition. + // That is our current semantic. Verify. It takes a while for abort to be called. Just hang + // here till it happens. If it doesn't, we'll time out the whole test. That is fine. + while (true) { + try { + Mockito.verify(server, Mockito.atLeast(1)). + abort(Mockito.anyString(), (Throwable)Mockito.anyObject()); + break; + } catch (WantedButNotInvoked t) { + Threads.sleep(1); + } + } + } finally { + assertTrue("The regionserver should have thrown an exception", threwOnBoth); + assertTrue("The regionserver should have thrown an exception", threwOnAppend); + assertTrue("The regionserver should have thrown an exception", threwOnSync); + } + } + + /** + * Reproduce locking up that happens when we get an inopportune sync during setup for + * zigzaglatch wait. See HBASE-14317. If below is broken, we will see this test timeout because + * it is locked up. + *

First I need to set up some mocks for Server and RegionServerServices. I also need to + * set up a dodgy WAL that will throw an exception when we go to append to it. + */ + @Test (timeout=30000) + public void testLockupWhenSyncInMiddleOfZigZagSetup() throws IOException { + // A WAL that we can have throw exceptions when a flag is set. + class DodgyFSLog extends FSHLog { + // Set this when want the WAL to start throwing exceptions. + volatile boolean throwException = false; + + // Latch to hold up processing until after another operation has had time to run. + CountDownLatch latch = new CountDownLatch(1); + + public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf) + throws IOException { + super(fs, root, logDir, conf); + } + + @Override + protected void afterCreatingZigZagLatch() { + // If throwException set, then append will throw an exception causing the WAL to be + // rolled. We'll come in here. Hold up processing until a sync can get in before + // the zigzag has time to complete its setup and get its own sync in. This is what causes + // the lock up we've seen in production. + if (throwException) { + try { + LOG.info("LATCHED"); + this.latch.await(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + + @Override + protected void beforeWaitOnSafePoint() { + if (throwException) { + LOG.info("COUNTDOWN"); + this.latch.countDown(); + } + } + + @Override + protected Writer createWriterInstance(Path path) throws IOException { + final Writer w = super.createWriterInstance(path); + return new Writer() { + @Override + public void close() throws IOException { + w.close(); + } + + @Override + public void sync() throws IOException { + if (throwException) { + throw new IOException("FAKE! Failed to replace a bad datanode...SYNC"); + } + w.sync(); + } + + @Override + public void append(Entry entry) throws IOException { + if (throwException) { + throw new IOException("FAKE! Failed to replace a bad datanode...APPEND"); + } + w.append(entry); + } + + @Override + public long getLength() throws IOException { + return w.getLength(); + } + }; + } + } + + // Mocked up server and regionserver services. Needed below. + Server server = Mockito.mock(Server.class); + Mockito.when(server.getConfiguration()).thenReturn(CONF); + Mockito.when(server.isStopped()).thenReturn(false); + Mockito.when(server.isAborted()).thenReturn(false); + RegionServerServices services = Mockito.mock(RegionServerServices.class); + + // OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL, go ahead with test. + FileSystem fs = FileSystem.get(CONF); + Path rootDir = new Path(dir + getName()); + DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF); + Path originalWAL = dodgyWAL.getCurrentFileName(); + // I need a log roller running. + LogRoller logRoller = new LogRoller(server, services); + logRoller.addWAL(dodgyWAL); + // There is no 'stop' once a logRoller is running.. it just dies. + logRoller.start(); + // Now get a region and start adding in edits. + HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME); + final HRegion region = initHRegion(tableName, null, null, getName(), + CONF, false, Durability.SYNC_WAL, dodgyWAL, COLUMN_FAMILY_BYTES); + byte [] bytes = Bytes.toBytes(getName()); + try { + // First get something into memstore. Make a Put and then pull the Cell out of it. Will + // manage append and sync carefully in below to manufacture hang. We keep adding same + // edit. WAL subsystem doesn't care. 
+ Put put = new Put(bytes); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); + WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName()); + WALEdit edit = new WALEdit(); + List<Cell> cells = new ArrayList<Cell>(); + for (CellScanner cs = put.cellScanner(); cs.advance();) { + edit.add(cs.current()); + cells.add(cs.current()); + } + // Put something in memstore and out in the WAL. Do a big number of appends so we push + // out other side of the ringbuffer. If small numbers, stuff doesn't make it to WAL + for (int i = 0; i < 1000; i++) { + dodgyWAL.append(htd, region.getRegionInfo(), key, edit, region.getSequenceId(), true, + cells); + } + // Set it so we start throwing exceptions. + dodgyWAL.throwException = true; + // This append provokes a WAL roll. + dodgyWAL.append(htd, region.getRegionInfo(), key, edit, region.getSequenceId(), true, cells); + boolean exception = false; + Mockito.verify(server, Mockito.atLeast(0)). + abort(Mockito.anyString(), (Throwable)Mockito.anyObject()); + try { + dodgyWAL.sync(); + } catch (Exception e) { + exception = true; + } + assertTrue("Did not get sync exception", exception); + // Get a memstore flush going too so we have same hung profile as up in the issue over + // in HBASE-14317. Flush hangs trying to get sequenceid because the ringbuffer is held up + // by the zigzaglatch waiting on syncs to come home. + Thread t = new Thread("flusher") { + public void run() { + try { + region.flush(false); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }; + }; + t.setDaemon(true); + t.start(); + // Wait till it gets into flushing. It will get stuck on getSequenceId. Then proceed. + while (!region.writestate.flushing) Threads.sleep(1); + // Now assert I got a new WAL file put in place even though loads of errors above. + assertTrue(originalWAL != dodgyWAL.getCurrentFileName()); + // Can I append to it? + dodgyWAL.throwException = false; + region.put(put); + } finally { + if (region != null) region.close(); + if (dodgyWAL != null) dodgyWAL.close(); + } + } + + /** * Test that I can use the max flushed sequence id after the close. * @throws IOException */ @@ -293,6 +600,8 @@ public class TestHRegion { HBaseTestingUtility.closeRegionAndWAL(region); } + + /* * This test is for verifying memstore snapshot size is correctly updated in case of rollback * See HBASE-10845 @@ -916,7 +1225,7 @@ public class TestHRegion { // now verify that the flush markers are written wal.shutdown(); - WAL.Reader reader = wals.createReader(fs, DefaultWALProvider.getCurrentFileName(wal), + WAL.Reader reader = WALFactory.createReader(fs, DefaultWALProvider.getCurrentFileName(wal), TEST_UTIL.getConfiguration()); try { List<FlushDescriptor> flushDescriptors = new ArrayList<FlushDescriptor>(); @@ -1032,7 +1341,7 @@ public class TestHRegion { } } - @Test + @Test (timeout=60000) @SuppressWarnings("unchecked") public void testFlushMarkersWALFail() throws Exception { // test the cases where the WAL append for flush markers fail.
@@ -5702,7 +6011,6 @@ public class TestHRegion { putData(startRow, numRows, qualifier, families); int splitRow = startRow + numRows; putData(splitRow, numRows, qualifier, families); - int endRow = splitRow + numRows; region.flush(true); HRegion [] regions = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java index 7b6e7b3..c811cda 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java @@ -129,7 +129,5 @@ public class TestMultiVersionConcurrencyControl extends TestCase { for (int i = 0; i < n; ++i) { assertTrue(statuses[i].get()); } - } - -} +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControlBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControlBasic.java new file mode 100644 index 0000000..eceb924 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControlBasic.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Very basic tests. + * @see TestMultiVersionConcurrencyControl for more. 
+ */ +@Category({RegionServerTests.class, SmallTests.class}) +public class TestMultiVersionConcurrencyControlBasic { + @Test + public void testSimpleMvccOps() { + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + long readPoint = mvcc.memstoreReadPoint(); + MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.beginMemstoreInsert(); + mvcc.completeMemstoreInsert(writeEntry); + long readPoint2 = mvcc.memstoreReadPoint(); + assertEquals(readPoint, readPoint2); + long seqid = 238; + writeEntry = mvcc.beginMemstoreInsertWithSeqNum(seqid); + mvcc.completeMemstoreInsert(writeEntry); + assertEquals(seqid, mvcc.memstoreReadPoint()); + writeEntry = mvcc.beginMemstoreInsertWithSeqNum(seqid + 1); + assertTrue(mvcc.advanceMemstore(writeEntry)); + assertEquals(seqid + 1, mvcc.memstoreReadPoint()); + } + + @Test + public void testCancel() { + long seqid = 238; + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + MultiVersionConcurrencyControl.WriteEntry writeEntry = + mvcc.beginMemstoreInsertWithSeqNum(seqid); + assertTrue(mvcc.advanceMemstore(writeEntry)); + assertEquals(seqid, mvcc.memstoreReadPoint()); + writeEntry = mvcc.beginMemstoreInsertWithSeqNum(seqid + 1); + mvcc.cancelMemstoreInsert(writeEntry); + assertEquals(seqid, mvcc.memstoreReadPoint()); + } +}
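Editor's note on the new MVCC cancel path: the write paths touched above (doMiniBatchMutation, append, increment) now share one begin/cancel/complete pattern. The sketch below distills that contract for review. It is illustrative only and not part of the patch; applyToMemstore and appendAndSyncWal are hypothetical stand-ins for the real memstore-apply and WAL append/sync steps, while the mvcc calls are the ones this patch adds or uses.

    MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
    WALKey walKey = null;
    boolean doRollBackMemstore = false;
    try {
      // Pre-assign a deliberately huge write number; it marks the entry for special handling.
      long mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
      writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
      applyToMemstore(memstoreCells);           // hypothetical: edits tagged with mvccNum, not yet visible
      walKey = appendAndSyncWal(memstoreCells); // hypothetical: may throw; a failed sync forces rollback
    } catch (IOException ioe) {
      doRollBackMemstore = true;
      throw ioe;
    } finally {
      if (doRollBackMemstore) {
        rollbackMemstore(memstoreCells);
        // New in this patch: dequeue the failed entry WITHOUT advancing the read point and
        // without blocking write entries queued behind it.
        if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
      } else if (writeEntry != null) {
        // Success: advance the read point to the WAL-assigned sequence id, making the edits
        // visible to scanners and getters.
        mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
      }
    }

The invariant to check when reviewing: cancelMemstoreInsert sets the entry's write number to NO_WRITE_NUMBER before calling advanceMemstore, so the read point can never advance to the bogus pre-assigned number of a failed write. testCancel above exercises exactly this.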