From 4a94635a2929c2bdc957f35784b12de6628b4afd Mon Sep 17 00:00:00 2001
From: Elliott Clark
Date: Tue, 7 Apr 2015 02:51:00 -0700
Subject: [PATCH] HBASE-12751 Allow RowLock to be reader writer

Summary:
Right now every write operation grabs a row lock. This is to prevent values
from changing during a read-modify-write operation (increment or
check-and-put). However, it limits parallelism in several scenarios. If there
are several puts to the same row but to different columns or stores, the
exclusive lock is very limiting. If there are puts to the same column, the
mvcc number should ensure a consistent ordering, so locking is not needed
there either. Locking is still needed for check-and-put and increment, where
the read and the write must happen atomically.

Test Plan:
- all tests in org.apache.hadoop.hbase.regionserver either pass or were
  already failing without this change
- needs to be tested with loadtesttool and/or shadow data to see whether
  there is actually any performance improvement

Reviewers: nkedel

Differential Revision: https://reviews.facebook.net/D32079
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 456 +++++++++++----------
 .../apache/hadoop/hbase/regionserver/HStore.java   |   4 +-
 .../MultiVersionConsistencyControl.java            | 217 ++++------
 .../hadoop/hbase/regionserver/wal/FSHLog.java      |  40 +-
 .../hadoop/hbase/regionserver/wal/FSWALEntry.java  |  38 +-
 .../hadoop/hbase/regionserver/wal/HLogKey.java     |  14 +-
 .../hbase/regionserver/wal/ReplayHLogKey.java      |   9 +-
 .../hadoop/hbase/regionserver/wal/WALUtil.java     |  28 +-
 .../hadoop/hbase/wal/DisabledWALProvider.java      |   2 +-
 .../main/java/org/apache/hadoop/hbase/wal/WAL.java |   4 +-
 .../java/org/apache/hadoop/hbase/wal/WALKey.java   |  61 ++-
 .../org/apache/hadoop/hbase/wal/WALSplitter.java   |   2 +-
 .../org/apache/hadoop/hbase/TestIOFencing.java     |   2 +-
 .../hadoop/hbase/coprocessor/TestWALObserver.java  |  19 +-
 .../hbase/mapreduce/TestWALRecordReader.java       |  17 +-
 .../hbase/master/TestDistributedLogSplitting.java  |   6 +-
 .../hadoop/hbase/regionserver/TestBulkLoad.java    |   5 +-
 .../hbase/regionserver/TestDefaultMemStore.java    |  14 +-
 .../hadoop/hbase/regionserver/TestHRegion.java     |  17 +-
 .../regionserver/TestHRegionReplayEvents.java      |  10 +-
 .../hbase/regionserver/TestHeapMemoryManager.java  |   6 +-
 .../hadoop/hbase/regionserver/TestKeepDeletes.java |   9 +
 .../TestMultiVersionConsistencyControl.java        |   2 +-
 .../regionserver/TestStoreFileRefresherChore.java  |   2 +-
 .../hadoop/hbase/regionserver/wal/TestFSHLog.java  |  28 +-
 .../hbase/regionserver/wal/TestLogRollAbort.java   |   2 +-
 .../regionserver/wal/TestLogRollingNoCluster.java  |   2 +-
 .../regionserver/wal/TestWALActionsListener.java   |   3 +-
 .../hbase/regionserver/wal/TestWALReplay.java      |  10 +-
 .../regionserver/TestReplicationSourceManager.java |   6 +-
 .../TestReplicationWALReaderManager.java           |   2 +-
 .../org/apache/hadoop/hbase/wal/FaultyFSLog.java   |   4 +-
 .../hadoop/hbase/wal/TestDefaultWALProvider.java   |  36 +-
 .../org/apache/hadoop/hbase/wal/TestSecureWAL.java |   3 +-
 .../apache/hadoop/hbase/wal/TestWALFactory.java    |  21 +-
 .../hadoop/hbase/wal/TestWALReaderOnSecureWAL.java |   2 +-
 .../hadoop/hbase/wal/WALPerformanceEvaluation.java |   2 +-
 37 files changed, 528 insertions(+), 577 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index cf48619..10a6ae3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -224,19 +224,6 @@ public class
HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Record the sequence id of last flush operation. */ private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM; - /** - * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL - * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1. - * Its default value is -1L. This default is used as a marker to indicate - * that the region hasn't opened yet. Once it is opened, it is set to the derived - * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region. - * - *

Control of this sequence is handed off to the WAL implementation. It is responsible - * for tagging edits with the correct sequence id since it is responsible for getting the - * edits into the WAL files. It controls updating the sequence id value. DO NOT UPDATE IT - * OUTSIDE OF THE WAL. The value you get will not be what you think it is. - */ - private final AtomicLong sequenceId = new AtomicLong(-1L); /** * The sequence id of the last replayed open region event from the primary region. This is used @@ -952,8 +939,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor( RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId, getRegionServerServices().getServerName(), storeFiles); - WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc, - getSequenceId()); + WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc); } private void writeRegionCloseMarker(WAL wal) throws IOException { @@ -967,17 +953,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor( - RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(), + RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.memstoreRead.get(), getRegionServerServices().getServerName(), storeFiles); - WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc, - getSequenceId()); + WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc); // Store SeqId in HDFS when a region closes // checking region folder exists is due to many tests which delete the table folder while a // table is still online if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) { WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(), - getSequenceId().get(), 0); + mvcc.memstoreReadPoint(), 0); } } @@ -1909,11 +1894,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long maxFlushedSeqId = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store .getFamily().getName()) - 1; - if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < sequenceId.get()) { + if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < mvcc.memstoreReadPoint()) { if (LOG.isDebugEnabled()) { LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this + " will be flushed because its max flushed seqId(" + maxFlushedSeqId - + ") is far away from current(" + sequenceId.get() + "), max allowed is " + + ") is far away from current(" + mvcc.memstoreReadPoint() + "), max allowed is " + flushPerChanges); } return true; @@ -1939,7 +1924,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean shouldFlush() { // This is a rough measure. if (this.maxFlushedSeqId > 0 - && (this.maxFlushedSeqId + this.flushPerChanges < this.sequenceId.get())) { + && (this.maxFlushedSeqId + this.flushPerChanges < this.mvcc.memstoreReadPoint())) { return true; } long modifiedFlushCheckInterval = flushCheckInterval; @@ -2053,12 +2038,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // wal can be null replaying edits. 
if (wal != null) { w = mvcc.beginMemstoreInsert(); - long flushOpSeqId = getNextSequenceId(wal); + long flushOpSeqId = w.getWriteNumber(); FlushResult flushResult = new FlushResultImpl( FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId, "Nothing to flush", writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker)); - w.setWriteNumber(flushOpSeqId); - mvcc.waitForPreviousTransactionsComplete(w); + mvcc.completeMemstoreInsert(w); w = null; return new PrepareFlushResult(flushResult, myseqid); } else { @@ -2135,7 +2119,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false), myseqid); } - flushOpSeqId = getNextSequenceId(wal); + flushOpSeqId = w.getWriteNumber(); long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName); // no oldestUnflushedSeqId means we flushed all stores. // or the unflushed stores are all empty. @@ -2158,7 +2142,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi getRegionInfo(), flushOpSeqId, committedFiles); // no sync. Sync is below where we do not hold the updates lock trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, sequenceId, false); + desc, false); } // Prepare flush (take a snapshot) @@ -2172,7 +2156,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, sequenceId, false); + desc,false); } catch (Throwable t) { LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + StringUtils.stringifyException(t)); @@ -2206,8 +2190,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // uncommitted transactions from being written into HFiles. // We have to block before we start the flush, otherwise keys that // were removed via a rollbackMemstore could be written to Hfiles. - w.setWriteNumber(flushOpSeqId); - mvcc.waitForPreviousTransactionsComplete(w); + mvcc.completeMemstoreInsert(w); // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block w = null; } finally { @@ -2232,7 +2215,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi getRegionInfo(), -1, new TreeMap>()); try { WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, sequenceId, true); + desc, true); return true; } catch (IOException e) { LOG.warn(getRegionInfo().getEncodedName() + " : " @@ -2295,7 +2278,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, sequenceId, true); + desc, true); } } catch (Throwable t) { // An exception here means that the snapshot was not persisted. 
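The mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()) idiom that replaces waitForPreviousTransactionsComplete() throughout these hunks enqueues a fresh write number and completes it; the read point can only advance past that number once every earlier entry in the queue has completed, so the call effectively drains all prior transactions. A stripped-down model of that queue protocol (a hypothetical MiniMvcc that omits the blocking waitForRead step; a sketch, not the patch's actual class):

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;

class MiniMvcc {
  static final class Entry {
    final long writeNumber;
    boolean completed;
    Entry(long writeNumber) { this.writeNumber = writeNumber; }
  }

  private final AtomicLong readPoint = new AtomicLong(0);
  private final AtomicLong writePoint = new AtomicLong(0);
  private final LinkedList<Entry> queue = new LinkedList<>();

  // Reserve the next write number; readers do not see it yet.
  synchronized Entry begin() {
    Entry e = new Entry(writePoint.incrementAndGet());
    queue.add(e);
    return e;
  }

  // Mark the entry finished and advance the read point over the completed
  // prefix of the queue, so edits become visible strictly in order.
  synchronized void complete(Entry e) {
    e.completed = true;
    while (!queue.isEmpty() && queue.getFirst().completed) {
      readPoint.set(queue.removeFirst().writeNumber);
    }
  }

  long readPoint() { return readPoint.get(); }

  public static void main(String[] args) {
    MiniMvcc mvcc = new MiniMvcc();
    Entry a = mvcc.begin();   // write number 1
    Entry b = mvcc.begin();   // write number 2
    mvcc.complete(b);         // b finished first, but a still blocks the prefix
    assert mvcc.readPoint() == 0;
    mvcc.complete(a);         // prefix complete: both become visible
    assert mvcc.readPoint() == 2;
  }
}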
@@ -2309,7 +2292,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, sequenceId, false); + desc, false); } catch (Throwable ex) { LOG.warn(getRegionInfo().getEncodedName() + " : " + "Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" @@ -2372,7 +2355,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ @VisibleForTesting protected long getNextSequenceId(final WAL wal) throws IOException { - WALKey key = this.appendEmptyEdit(wal, null); + WALKey key = this.appendEmptyEdit(wal); + mvcc.completeMemstoreInsert(key.getWriteEntry()); return key.getSequenceId(); } @@ -2828,7 +2812,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); // reference family maps directly so coprocessors can mutate them if desired Map>[] familyMaps = new Map[batchOp.operations.length]; - List memstoreCells = new ArrayList(); // We try to set up a batch in the range [firstIndex,lastIndexExclusive) int firstIndex = batchOp.nextIndexToProcess; int lastIndexExclusive = firstIndex; @@ -2896,7 +2879,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean shouldBlock = numReadyToWrite == 0; RowLock rowLock = null; try { - rowLock = getRowLockInternal(mutation.getRow(), shouldBlock); + // request a read lock for a put or delete; this is safe if we're already + // within a write lock context (e.g. from checkAndMutate) as it will just + // reuse the existing context. + rowLock = getRowLockInternal(mutation.getRow(), shouldBlock, false); } catch (IOException ioe) { LOG.warn("Failed getting lock in batch put, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); @@ -2963,16 +2949,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi lock(this.updatesLock.readLock(), numReadyToWrite); locked = true; - if(isInReplay) { - mvccNum = batchOp.getReplaySequenceId(); - } else { - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); - } - // - // ------------------------------------ - // Acquire the latest mvcc number - // ---------------------------------- - w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); + // calling the pre CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { @@ -2983,26 +2960,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // ------------------------------------ - // STEP 3. Write back to memstore - // Write to memstore. It is ok to write to memstore - // first without updating the WAL because we do not roll - // forward the memstore MVCC. The MVCC will be moved up when - // the complete operation is done. These changes are not yet - // visible to scanners till we update the MVCC. The MVCC is - // moved only when the sync is complete. 
- // ---------------------------------- - long addedSize = 0; - for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { - continue; - } - doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote - addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay); - } - - // ------------------------------------ - // STEP 4. Build WAL edit + // STEP 3. Build WAL edit // ---------------------------------- Durability durability = Durability.USE_DEFAULT; for (int i = firstIndex; i < lastIndexExclusive; i++) { @@ -3011,7 +2969,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi != OperationStatusCode.NOT_RUN) { continue; } - batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; Mutation m = batchOp.getMutation(i); Durability tmpDur = getEffectiveDurability(m.getDurability()); @@ -3037,9 +2994,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // we use HLogKey here instead of WALKey directly to support legacy coprocessors. walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), now, m.getClusterIds(), - currentNonceGroup, currentNonce); + currentNonceGroup, currentNonce, mvcc); txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, - walEdit, getSequenceId(), true, null); + walEdit, true); walEdit = new WALEdit(isInReplay); walKey = null; } @@ -3058,22 +3015,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // ------------------------- - // STEP 5. Append the final edit to WAL. Do not sync wal. + // STEP 4. Append the final edit to WAL. Do not sync wal. // ------------------------- Mutation mutation = batchOp.getMutation(firstIndex); if (isInReplay) { // use wal key from the original walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce); + mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); long replaySeqId = batchOp.getReplaySequenceId(); walKey.setOrigLogSeqNum(replaySeqId); // ensure that the sequence id of the region is at least as big as orig log seq id while (true) { - long seqId = getSequenceId().get(); + long seqId = mvcc.memstoreWrite.get(); if (seqId >= replaySeqId) break; - if (getSequenceId().compareAndSet(seqId, replaySeqId)) break; + if (mvcc.initialize(/* newSeqId = */ replaySeqId, /* expected = */ seqId)) break; } } if (walEdit.size() > 0) { @@ -3081,15 +3038,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce); + mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); } + txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); + } - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, - getSequenceId(), true, memstoreCells); + // ------------------------------------ + // Acquire the latest mvcc number + // ---------------------------------- + if (walKey == null) { + // If this is a skip wal operation just get the read point from mvcc + walKey = this.appendEmptyEdit(this.wal); + } + if (!isInReplay) { + w = walKey.getWriteEntry(); + mvccNum = w.getWriteNumber(); + } else { + mvccNum = batchOp.getReplaySequenceId(); } - if(walKey == null){ - // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned - walKey = this.appendEmptyEdit(this.wal, memstoreCells); + + // ------------------------------------ + // STEP 5. Write back to memstore + // Write to memstore. It is ok to write to memstore + // first without syncing the WAL because we do not roll + // forward the memstore MVCC. The MVCC will be moved up when + // the complete operation is done. These changes are not yet + // visible to scanners till we update the MVCC. The MVCC is + // moved only when the sync is complete. + // ---------------------------------- + long addedSize = 0; + for (int i = firstIndex; i < lastIndexExclusive; i++) { + if (batchOp.retCodeDetails[i].getOperationStatusCode() + != OperationStatusCode.NOT_RUN) { + continue; + } + doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote + addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay); } // ------------------------------- @@ -3117,15 +3101,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi coprocessorHost.postBatchMutate(miniBatchOp); } - // ------------------------------------------------------------------ // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. // ------------------------------------------------------------------ if (w != null) { - mvcc.completeMemstoreInsertWithSeqNum(w, walKey); + mvcc.completeMemstoreInsert(w); w = null; } + for (int i = firstIndex; i < lastIndexExclusive; i ++) { + if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { + batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; + } + } + // ------------------------------------ // STEP 9. Run coprocessor post hooks. This should be done after the wal is // synced so that the coprocessor contract is adhered to. 
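The reshuffled steps above are the heart of the change: the WAL append now happens before the memstore write, and the mvcc write number is taken from the WALKey's WriteEntry instead of a pre-assigned bump. A condensed sketch of that post-patch ordering, using hypothetical stand-in types rather than the real WAL/HStore/MVCC collaborators:

// Hypothetical stand-ins; a sketch of the new ordering, not HRegion's code.
interface Wal { long append(String edit); }        // assigns the sequence id
interface MemStore { void add(String edit, long seqId); void rollback(String edit); }
interface Mvcc { void complete(long seqId); }

class WritePathSketch {
  static void write(String edit, Wal wal, MemStore memstore, Mvcc mvcc) {
    // 1. Append to the WAL first; the WAL assigns the sequence id, which
    //    now doubles as the mvcc write number (late binding, no fake bump).
    long seqId = wal.append(edit);
    boolean applied = false;
    try {
      // 2. Write to the memstore, stamping cells with seqId. The edit is
      //    not yet visible because the mvcc read point has not advanced.
      memstore.add(edit, seqId);
      applied = true;
      // 3. Sync the WAL (elided here), then advance mvcc: the edit becomes
      //    visible to scanners only after the sync succeeded.
      mvcc.complete(seqId);
    } catch (RuntimeException failure) {
      // Failed sync: roll the memstore back so uncommitted cells never
      // reach scanners or HFiles.
      if (applied) memstore.rollback(edit);
      throw failure;
    }
  }
}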
@@ -3151,10 +3140,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     } finally {
       // if the wal sync was unsuccessful, remove keys from memstore
       if (doRollBackMemstore) {
-        rollbackMemstore(memstoreCells);
+        for (int j = 0; j < familyMaps.length; j++) {
+          for (List<Cell> cells : familyMaps[j].values()) {
+            rollbackMemstore(cells);
+          }
+        }
       }
       if (w != null) {
-        mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
+        mvcc.advanceMemstore(w);
       }
 
       if (locked) {
@@ -3241,7 +3234,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // Lock row - note that doBatchMutate will relock this row if called
     RowLock rowLock = getRowLock(get.getRow());
     // wait for all previous transactions to complete (with lock held)
-    mvcc.waitForPreviousTransactionsComplete();
+    mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
     try {
       if (this.getCoprocessorHost() != null) {
         Boolean processed = null;
@@ -3334,7 +3327,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // Lock row - note that doBatchMutate will relock this row if called
     RowLock rowLock = getRowLock(get.getRow());
     // wait for all previous transactions to complete (with lock held)
-    mvcc.waitForPreviousTransactionsComplete();
+    mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
     try {
       List<Cell> result = get(get, false);
@@ -3393,7 +3386,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   private void doBatchMutate(Mutation mutation) throws IOException {
     // Currently this is only called for puts and deletes, so no nonces.
-    OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation });
+    OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation});
     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
@@ -3566,7 +3559,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * new entries.
    */
   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
-    long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
+    long mvccNum, boolean isInReplay) throws IOException {
     long size = 0;
 
     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
@@ -3577,10 +3570,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       int listSize = cells.size();
       for (int i=0; i < listSize; i++) {
         Cell cell = cells.get(i);
-        CellUtil.setSequenceId(cell, mvccNum);
+        if (cell.getSequenceId() == 0) {
+          CellUtil.setSequenceId(cell, mvccNum);
+        }
         Pair<Long, Cell> ret = store.add(cell);
         size += ret.getFirst();
-        memstoreCells.add(ret.getSecond());
         if(isInReplay) {
           // set memstore newly added cells with replay mvcc number
           CellUtil.setSequenceId(ret.getSecond(), mvccNum);
@@ -4338,12 +4332,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       this.maxFlushedSeqId = flush.getFlushSequenceNumber();
 
       // advance the mvcc read point so that the new flushed file is visible.
-      // there may be some in-flight transactions, but they won't be made visible since they are
-      // either greater than flush seq number or they were already dropped via flush.
-      // TODO: If we are using FlushAllStoresPolicy, then this can make edits visible from other
-      // stores while they are still in flight because the flush commit marker will not contain
-      // flushes from ALL stores.
-      getMVCC().advanceMemstoreReadPointIfNeeded(flush.getFlushSequenceNumber());
+      mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
 
     } catch (FileNotFoundException ex) {
       LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -4417,8 +4406,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     long totalFreedSize = 0;
     this.updatesLock.writeLock().lock();
     try {
-      mvcc.waitForPreviousTransactionsComplete();
-      long currentSeqId = getSequenceId().get();
+      mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+
+      long currentSeqId = mvcc.memstoreRead.get();
       if (seqId >= currentSeqId) {
         // then we can drop the memstore contents since everything is below this seqId
         LOG.info(getRegionInfo().getEncodedName() + " : "
@@ -4581,9 +4571,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       dropPrepareFlushIfPossible();
 
       // advance the mvcc read point so that the new flushed file is visible.
-      // there may be some in-flight transactions, but they won't be made visible since they are
-      // either greater than flush seq number or they were already dropped via flush.
-      getMVCC().advanceMemstoreReadPointIfNeeded(this.maxFlushedSeqId);
+      mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
 
       // If we were waiting for observing a flush or region opening event for not showing partial
       // data after a secondary region crash, we can allow reads now.
@@ -4674,7 +4662,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
       if (bulkLoadEvent.getBulkloadSeqNum() > 0) {
-        getMVCC().advanceMemstoreReadPointIfNeeded(bulkLoadEvent.getBulkloadSeqNum());
+        mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+        mvcc.initialize(bulkLoadEvent.getBulkloadSeqNum());
       }
     } finally {
       closeBulkRegionOperation();
@@ -4769,11 +4758,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       dropPrepareFlushIfPossible();
 
       // advance the mvcc read point so that the new flushed files are visible.
-      // there may be some in-flight transactions, but they won't be made visible since they are
-      // either greater than flush seq number or they were already picked up via flush.
-      for (Store s : getStores()) {
-        getMVCC().advanceMemstoreReadPointIfNeeded(s.getMaxMemstoreTS());
-      }
+      // there may be some in-flight transactions, but they won't be made visible since they are
+      // either greater than flush seq number or they were already picked up via flush.
+      for (Store s : getStores()) {
+        getMVCC().initialize(s.getMaxMemstoreTS());
+      }
+
       // smallestSeqIdInStores is the seqId that we have a corresponding hfile for. We can safely
       // skip all edits that are to be replayed in the future with that has a smaller seqId
@@ -4935,19 +4924,45 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /**
    * A version of getRowLock(byte[], boolean) to use when a region operation has already been
    * started (the calling thread has already acquired the region-close-guard lock).
+   *
+   * This assumes a default to a fully-exclusive (writer) lock, and is intended
+   * for read-modify-write operations and other cases where we can't be sure of
+   * safe concurrent writes. Safe concurrent write operations should use
+   * getRowLockInternal(row, waitForLock, true) (readLock = true) for lower lock latency.
   */
   protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
+    return getRowLockInternal(row, waitForLock, false);
+  }
+
+  /**
+   * A version of getRowLock(byte[], boolean) to use when a region operation
+   * has already been started (the calling thread has already acquired the
+   * region-close-guard lock).
+   *
+   * @param readLock false indicates this should be a fully-exclusive
+   *          (writer) lock, and is intended for read-modify-write operations
+   *          and other cases where we can't be sure of safe concurrent writes.
+   *          Use true for simple puts and other safe concurrent operations
+   *          for lower lock latency.
+   */
+  protected RowLock getRowLockInternal(byte[] row, boolean waitForLock, boolean readLock)
+      throws IOException {
+    checkRow(row, "row lock");
     HashedBytes rowKey = new HashedBytes(row);
-    RowLockContext rowLockContext = new RowLockContext(rowKey);
-
+    RowLockContext rowLockContext = new RowLockContext(rowKey, readLock);
     // loop until we acquire the row lock (unless !waitForLock)
     while (true) {
       RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
       if (existingContext == null) {
         // Row is not already locked by any thread, use newly created context.
         break;
-      } else if (existingContext.ownedByCurrentThread()) {
-        // Row is already locked by current thread, reuse existing context instead.
+      } else if (readLock && existingContext.isReadLock()) {
+        // Row is already locked by a read lock, and we're asking for a read lock;
+        // reuse the current context.
+        rowLockContext = existingContext;
+        break;
+      } else if (existingContext.ownedByCurrentThread() && !existingContext.isReadLock()) {
+        // Row is already locked for write by current thread, reuse existing context instead.
         rowLockContext = existingContext;
         break;
       } else {
@@ -5148,7 +5163,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           this.getRegionInfo().getTable(),
           ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
       WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
-          loadDescriptor, sequenceId);
+          loadDescriptor);
     } catch (IOException ioe) {
       if (this.rsServices != null) {
         // Have to abort region server because some hfiles has been loaded but we can't write
@@ -5827,7 +5842,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     if (initialize) {
       // If initializing, set the sequenceId. It is also required by WALPerformanceEvaluation when
       // verifying the WALEdits.
- region.setSequenceId(region.initialize(null)); + region.getMVCC().initialize(region.initialize(null)); } return region; } @@ -6041,7 +6056,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Refuse to open the region if a required class cannot be loaded checkClassLoading(); this.openSeqNum = initialize(reporter); - this.setSequenceId(openSeqNum); + this.mvcc.initialize(openSeqNum); if (wal != null && getRegionServerServices() != null && !writestate.readOnly && !recovering) { // Only write the region open event marker to WAL if (1) we are not read-only @@ -6463,7 +6478,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi List acquiredRowLocks; long addedSize = 0; List mutations = new ArrayList(); - List memstoreCells = new ArrayList(); Collection rowsToLock = processor.getRowsToLock(); long mvccNum = 0; WALKey walKey = null; @@ -6472,14 +6486,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi acquiredRowLocks = new ArrayList(rowsToLock.size()); for (byte[] row : rowsToLock) { // Attempt to lock all involved rows, throw if any lock times out + // use a writer lock for mixed reads and writes acquiredRowLocks.add(getRowLock(row)); } // 3. Region lock lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size()); locked = true; - // Get a mvcc write number - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); - long now = EnvironmentEdgeManager.currentTime(); try { // 4. Let the processor scan the rows, generate mutations and add @@ -6488,11 +6500,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi processor, now, this, mutations, walEdit, timeout); if (!mutations.isEmpty()) { - // 5. Start mvcc transaction - writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); - // 6. Call the preBatchMutate hook + + // 5. Call the preBatchMutate hook processor.preBatchMutate(this, walEdit); - // 7. Apply to memstore + + long txid = 0; + // 6. Append no sync + if (!walEdit.isEmpty()) { + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, + processor.getClusterIds(), nonceGroup, nonce, mvcc); + txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), + walKey, walEdit, false); + } + if(walKey == null){ + // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit + // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId + walKey = this.appendEmptyEdit(this.wal); + } + + // 7. Start mvcc transaction + writeEntry = walKey.getWriteEntry(); + mvccNum = walKey.getSequenceId(); + + + + // 8. Apply to memstore for (Mutation m : mutations) { // Handle any tag based cell features rewriteCellTags(m.getFamilyCellMap(), m); @@ -6507,25 +6541,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } Pair ret = store.add(cell); addedSize += ret.getFirst(); - memstoreCells.add(ret.getSecond()); } } - long txid = 0; - // 8. Append no sync - if (!walEdit.isEmpty()) { - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
- walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - processor.getClusterIds(), nonceGroup, nonce); - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), - walKey, walEdit, getSequenceId(), true, memstoreCells); - } - if(walKey == null){ - // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit - // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId - walKey = this.appendEmptyEdit(this.wal, memstoreCells); - } // 9. Release region lock if (locked) { this.updatesLock.readLock().unlock(); @@ -6557,7 +6575,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // 13. Roll mvcc forward if (writeEntry != null) { - mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey); + mvcc.completeMemstoreInsert(writeEntry); } if (locked) { this.updatesLock.readLock().unlock(); @@ -6658,25 +6676,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WriteEntry w = null; WALKey walKey = null; RowLock rowLock = null; - List memstoreCells = new ArrayList(); boolean doRollBackMemstore = false; try { rowLock = getRowLock(row); + assert rowLock != null; try { lock(this.updatesLock.readLock()); try { // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.waitForPreviousTransactionsComplete(); + mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); if (this.coprocessorHost != null) { Result r = this.coprocessorHost.preAppendAfterRowLock(append); if(r!= null) { return r; } } - // now start my own transaction - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); - w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); + long now = EnvironmentEdgeManager.currentTime(); // Process each family for (Map.Entry> family : append.getFamilyCellMap().entrySet()) { @@ -6790,7 +6806,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - CellUtil.setSequenceId(newCell, mvccNum); // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND, @@ -6811,42 +6826,45 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi tempMemstore.put(store, kvs); } + // Actually write to WAL now + if (writeToWAL) { + // Using default cluster id, as this can only happen in the originating + // cluster. A slave cluster receives the final value (not the delta) + // as a Put. + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
+ walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce, mvcc); + txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true); + } else { + recordMutationWithoutWal(append.getFamilyCellMap()); + } + if (walKey == null) { + // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned + walKey = this.appendEmptyEdit(this.wal); + } + // now start my own transaction + w = walKey.getWriteEntry(); + mvccNum = walKey.getSequenceId(); + + //Actually write to Memstore now for (Map.Entry> entry : tempMemstore.entrySet()) { Store store = entry.getKey(); if (store.getFamily().getMaxVersions() == 1) { // upsert if VERSIONS for this CF == 1 size += store.upsert(entry.getValue(), getSmallestReadPoint()); - memstoreCells.addAll(entry.getValue()); } else { // otherwise keep older versions around for (Cell cell: entry.getValue()) { + CellUtil.setSequenceId(cell, mvccNum); Pair ret = store.add(cell); size += ret.getFirst(); - memstoreCells.add(ret.getSecond()); doRollBackMemstore = true; } } allKVs.addAll(entry.getValue()); } - // Actually write to WAL now - if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce); - txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, - this.sequenceId, true, memstoreCells); - } else { - recordMutationWithoutWal(append.getFamilyCellMap()); - } - if (walKey == null) { - // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned - walKey = this.appendEmptyEdit(this.wal, memstoreCells); - } size = this.addAndGetGlobalMemstoreSize(size); flush = isFlushSize(size); } finally { @@ -6867,10 +6885,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { - rollbackMemstore(memstoreCells); + for(Listcells:tempMemstore.values()) { + rollbackMemstore(cells); + } } if (w != null) { - mvcc.completeMemstoreInsertWithSeqNum(w, walKey); + mvcc.completeMemstoreInsert(w); } closeRegionOperation(Operation.APPEND); } @@ -6920,26 +6940,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi RowLock rowLock = null; WriteEntry w = null; WALKey walKey = null; - long mvccNum = 0; - List memstoreCells = new ArrayList(); + long seqId = 0; boolean doRollBackMemstore = false; try { - rowLock = getRowLock(row); + rowLock = getRowLockInternal(row, true, false); try { lock(this.updatesLock.readLock()); try { // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.waitForPreviousTransactionsComplete(); + mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); if (this.coprocessorHost != null) { Result r = this.coprocessorHost.preIncrementAfterRowLock(increment); if (r != null) { return r; } } - // now start my own transaction - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); - w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); + long now = EnvironmentEdgeManager.currentTime(); // Process each family for (Map.Entry> 
family: @@ -7021,8 +7038,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi val, 0, val.length, newTags); - CellUtil.setSequenceId(newKV, mvccNum); - // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { newKV = coprocessorHost.postMutationBeforeWAL( @@ -7049,6 +7064,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + + // Actually write to WAL now + if (walEdits != null && !walEdits.isEmpty()) { + if (writeToWAL) { + // Using default cluster id, as this can only happen in the originating + // cluster. A slave cluster receives the final value (not the delta) + // as a Put. + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce, mvcc); + txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), + walKey, walEdits, true); + } else { + recordMutationWithoutWal(increment.getFamilyCellMap()); + } + } + if (walKey == null) { + // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned + walKey = this.appendEmptyEdit(this.wal); + } + + + // now start my own transaction + w = walKey.getWriteEntry(); + seqId = walKey.getSequenceId(); + //Actually write to Memstore now if (!tempMemstore.isEmpty()) { for (Map.Entry> entry : tempMemstore.entrySet()) { @@ -7056,13 +7097,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (store.getFamily().getMaxVersions() == 1) { // upsert if VERSIONS for this CF == 1 size += store.upsert(entry.getValue(), getSmallestReadPoint()); - memstoreCells.addAll(entry.getValue()); } else { // otherwise keep older versions around for (Cell cell : entry.getValue()) { + CellUtil.setSequenceId(cell, seqId); Pair ret = store.add(cell); size += ret.getFirst(); - memstoreCells.add(ret.getSecond()); doRollBackMemstore = true; } } @@ -7070,26 +7110,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi size = this.addAndGetGlobalMemstoreSize(size); flush = isFlushSize(size); } - - // Actually write to WAL now - if (walEdits != null && !walEdits.isEmpty()) { - if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
- walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce); - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), - walKey, walEdits, getSequenceId(), true, memstoreCells); - } else { - recordMutationWithoutWal(increment.getFamilyCellMap()); - } - } - if(walKey == null){ - // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned - walKey = this.appendEmptyEdit(this.wal, memstoreCells); - } } finally { this.updatesLock.readLock().unlock(); } @@ -7108,10 +7128,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { - rollbackMemstore(memstoreCells); + for(List cells:tempMemstore.values()) { + rollbackMemstore(cells); + } } if (w != null) { - mvcc.completeMemstoreInsertWithSeqNum(w, walKey); + mvcc.completeMemstoreInsert(w); } closeRegionOperation(Operation.INCREMENT); if (this.metricsRegion != null) { @@ -7142,7 +7164,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + 43 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 5 * Bytes.SIZEOF_BOOLEAN); @@ -7282,7 +7304,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throw new IOException("Not a known catalog table: " + p.toString()); } try { - region.initialize(null); + region.mvcc.initialize(region.initialize(null)); if (majorCompact) { region.compact(true); } else { @@ -7702,29 +7724,34 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return sequenceId */ @VisibleForTesting - public AtomicLong getSequenceId() { - return this.sequenceId; - } - - /** - * sets this region's sequenceId. - * @param value new value - */ - private void setSequenceId(long value) { - this.sequenceId.set(value); + public long getSequenceId() { + return this.mvcc.memstoreReadPoint(); } @VisibleForTesting class RowLockContext { private final HashedBytes row; private final CountDownLatch latch = new CountDownLatch(1); + private final boolean readLock; private final Thread thread; private int lockCount = 0; + // defaults readLock to false (fully exclusive) RowLockContext(HashedBytes row) { this.row = row; + this.readLock = false; this.thread = Thread.currentThread(); } + RowLockContext(HashedBytes row, boolean readLock) { + this.row = row; + this.readLock = readLock; + this.thread = Thread.currentThread(); + } + + boolean isReadLock() { + return readLock; + } + boolean ownedByCurrentThread() { return thread == Thread.currentThread(); } @@ -7744,7 +7771,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } void releaseLock() { - if (!ownedByCurrentThread()) { + // todo: is this safe, or do we need to keep a set of owning threads on read locks? + if (!ownedByCurrentThread() && !readLock) { throw new IllegalArgumentException("Lock held by thread: " + thread + " cannot be released by different thread: " + Thread.currentThread()); } @@ -7793,14 +7821,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return Return the key used appending with no sync and no append. 
* @throws IOException */ - private WALKey appendEmptyEdit(final WAL wal, List cells) throws IOException { + private WALKey appendEmptyEdit(final WAL wal) throws IOException { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), - WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE); + WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc); // Call append but with an empty WALEdit. The returned seqeunce id will not be associated // with any edit and we can be sure it went in after all outstanding appends. wal.append(getTableDesc(), getRegionInfo(), key, - WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells); + WALEdit.EMPTY_WALEDIT, false); return key; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index d4157f9..ceafbae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -641,7 +641,7 @@ public class HStore implements Store { // readers might pick it up. This assumes that the store is not getting any writes (otherwise // in-flight transactions might be made visible) if (!toBeAddedFiles.isEmpty()) { - region.getMVCC().advanceMemstoreReadPointIfNeeded(this.getMaxSequenceId()); + region.getMVCC().initialize(this.getMaxSequenceId()); } // notify scanners, close file readers, and recompute store size @@ -1288,7 +1288,7 @@ public class HStore implements Store { CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info, family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString())); WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(), - this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId()); + this.region.getRegionInfo(), compactionDescriptor); } @VisibleForTesting diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index 96af2c3..fe8b886 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; import java.util.LinkedList; import java.util.concurrent.atomic.AtomicLong; @@ -34,8 +33,9 @@ import org.apache.hadoop.hbase.util.ClassSize; */ @InterfaceAudience.Private public class MultiVersionConsistencyControl { - private static final long NO_WRITE_NUMBER = 0; - private volatile long memstoreRead = 0; + final AtomicLong memstoreRead = new AtomicLong(0); + final AtomicLong memstoreWrite = new AtomicLong(0); + private final Object readWaiters = new Object(); // This is the pending queue of writes. @@ -52,88 +52,54 @@ public class MultiVersionConsistencyControl { * Initializes the memstoreRead/Write points appropriately. 
* @param startPoint */ - public void initialize(long startPoint) { + public boolean initialize(long startPoint) { + return initialize(startPoint, -1); + } + + public boolean initialize(long startPoint, long expected) { synchronized (writeQueue) { - writeQueue.clear(); - memstoreRead = startPoint; + long currentRead = this.memstoreRead.get(); + long currentWrite = this.memstoreWrite.get(); + if (currentRead != currentWrite) { + throw new RuntimeException("Already used this mvcc. Too late to initialize"); + } + if (expected != -1 && expected != currentRead) { + return false; + } + + if (startPoint < currentRead) { + return false; + } + + memstoreRead.set(startPoint); + memstoreWrite.set(startPoint); } + return true; } /** - * - * @param initVal The value we used initially and expected it'll be reset later - * @return WriteEntry instance. + * Generate and return a {@link WriteEntry} with a new write number. + * To complete the WriteEntry and wait for it to be visible, + * call {@link #completeMemstoreInsert(WriteEntry)}. */ - WriteEntry beginMemstoreInsert() { - return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER); - } - - /** - * Get a mvcc write number before an actual one(its log sequence Id) being assigned - * @param sequenceId - * @return long a faked write number which is bigger enough not to be seen by others before a real - * one is assigned - */ - public static long getPreAssignedWriteNumber(AtomicLong sequenceId) { - // the 1 billion is just an arbitrary big number to guard no scanner will reach it before - // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers - // because each handler could increment sequence num twice and max concurrent in-flight - // transactions is the number of RPC handlers. - // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple - // changes touch same row key - // If for any reason, the bumped value isn't reset due to failure situations, we'll reset - // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all - return sequenceId.incrementAndGet() + 1000000000; - } - - /** - * This function starts a MVCC transaction with current region's log change sequence number. Since - * we set change sequence number when flushing current change to WAL(late binding), the flush - * order may differ from the order to start a MVCC transaction. For example, a change begins a - * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we - * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent - * transactions will reuse the number till current MVCC completes(success or fail). The "faked" - * big number is safe because we only need it to prevent current change being seen and the number - * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order - * for MVCC to align with flush sequence. - * @param curSeqNum - * @return WriteEntry a WriteEntry instance with the passed in curSeqNum - */ - public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) { - WriteEntry e = new WriteEntry(curSeqNum); + public WriteEntry beginMemstoreInsert() { synchronized (writeQueue) { + long nextWriteNumber = memstoreWrite.incrementAndGet(); + WriteEntry e = new WriteEntry(nextWriteNumber); writeQueue.add(e); return e; } } /** - * Complete a {@link WriteEntry} that was created by - * {@link #beginMemstoreInsertWithSeqNum(long)}. 
At the end of this call, the global read
-   * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
-   * visible to MVCC readers.
-   * @throws IOException
-   */
-  public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
-      throws IOException {
-    if(e == null) return;
-    if (seqId != null) {
-      e.setWriteNumber(seqId.getSequenceId());
-    } else {
-      // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
-      // function beginMemstoreInsertWithSeqNum in case of failures
-      e.setWriteNumber(NO_WRITE_NUMBER);
-    }
-    waitForPreviousTransactionsComplete(e);
-  }
-
-  /**
-   * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
-   * end of this call, the global read point is at least as large as the write point of the passed
-   * in WriteEntry. Thus, the write is visible to MVCC readers.
+   * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}.
+   *
+   * At the end of this call, the global read point is at least as large as the write point
+   * of the passed in WriteEntry. Thus, the write is visible to MVCC readers.
    */
   public void completeMemstoreInsert(WriteEntry e) {
-    waitForPreviousTransactionsComplete(e);
+    advanceMemstore(e);
+    waitForRead(e);
   }
 
   /**
@@ -148,106 +114,75 @@ public class MultiVersionConsistencyControl {
    * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
    */
   boolean advanceMemstore(WriteEntry e) {
-    long nextReadValue = -1;
     synchronized (writeQueue) {
       e.markCompleted();
+      long nextReadValue = -1;
+      boolean ranOnce = false;
       while (!writeQueue.isEmpty()) {
+        ranOnce = true;
        WriteEntry queueFirst = writeQueue.getFirst();
+
+        if (nextReadValue > 0) {
+          if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
+            throw new RuntimeException("invariant in advanceMemstore violated, prev: "
+                + nextReadValue + " next: " + queueFirst.getWriteNumber());
+          }
+        }
+
         if (queueFirst.isCompleted()) {
-          // Using Max because Edit complete in WAL sync order not arriving order
-          nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
+          nextReadValue = queueFirst.getWriteNumber();
           writeQueue.removeFirst();
         } else {
           break;
         }
       }
-      if (nextReadValue > memstoreRead) {
-        memstoreRead = nextReadValue;
+      if (!ranOnce) {
+        throw new RuntimeException("writeQueue was empty; advanceMemstore should always find its own entry");
       }
-      // notify waiters on writeQueue before return
-      writeQueue.notifyAll();
-    }
-
-    if (nextReadValue > 0) {
-      synchronized (readWaiters) {
-        readWaiters.notifyAll();
+      if (nextReadValue > 0) {
+        synchronized (readWaiters) {
+          memstoreRead.set(nextReadValue);
+          readWaiters.notifyAll();
+        }
       }
-    }
-
-    if (memstoreRead >= e.getWriteNumber()) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Advances the current read point to be given seqNum if it is smaller than
-   * that.
-   */
-  void advanceMemstoreReadPointIfNeeded(long seqNum) {
-    synchronized (writeQueue) {
-      if (this.memstoreRead < seqNum) {
-        memstoreRead = seqNum;
+      if (memstoreRead.get() >= e.getWriteNumber()) {
+        return true;
       }
+      return false;
     }
   }
 
   /**
-   * Wait for all previous MVCC transactions complete
+   * Wait for the global readPoint to advance up to
+   * the specified transaction number.
*/ - public void waitForPreviousTransactionsComplete() { - WriteEntry w = beginMemstoreInsert(); - waitForPreviousTransactionsComplete(w); - } - - public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) { + public void waitForRead(WriteEntry e) { boolean interrupted = false; - WriteEntry w = waitedEntry; - - try { - WriteEntry firstEntry = null; - do { - synchronized (writeQueue) { - // writeQueue won't be empty at this point, the following is just a safety check - if (writeQueue.isEmpty()) { - break; - } - firstEntry = writeQueue.getFirst(); - if (firstEntry == w) { - // all previous in-flight transactions are done - break; - } - try { - writeQueue.wait(0); - } catch (InterruptedException ie) { - // We were interrupted... finish the loop -- i.e. cleanup --and then - // on our way out, reset the interrupt flag. - interrupted = true; - break; - } + synchronized (readWaiters) { + while (memstoreRead.get() < e.getWriteNumber()) { + try { + readWaiters.wait(0); + } catch (InterruptedException ie) { + // We were interrupted... finish the loop -- i.e. cleanup --and then + // on our way out, reset the interrupt flag. + interrupted = true; } - } while (firstEntry != null); - } finally { - if (w != null) { - advanceMemstore(w); } } - if (interrupted) { - Thread.currentThread().interrupt(); - } + if (interrupted) Thread.currentThread().interrupt(); } public long memstoreReadPoint() { - return memstoreRead; + return memstoreRead.get(); } + public static class WriteEntry { private long writeNumber; - private volatile boolean completed = false; - + private boolean completed = false; WriteEntry(long writeNumber) { this.writeNumber = writeNumber; } @@ -257,12 +192,10 @@ public class MultiVersionConsistencyControl { boolean isCompleted() { return this.completed; } - long getWriteNumber() { + @InterfaceAudience.Private + public long getWriteNumber() { return this.writeNumber; } - void setWriteNumber(long val){ - this.writeNumber = val; - } } public static final long FIXED_SIZE = ClassSize.align( diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 549f0ce..9b715ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1154,27 +1154,12 @@ public class FSHLog implements WAL { } } } - - /** - * @param now - * @param encodedRegionName Encoded name of the region as returned by - * HRegionInfo#getEncodedNameAsBytes(). - * @param tableName - * @param clusterIds that have consumed the change - * @return New log key. - */ - protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum, - long now, List clusterIds, long nonceGroup, long nonce) { - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
- return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce); - } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION", justification="Will never be null") @Override public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key, - final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, - final List memstoreCells) throws IOException { + final WALEdit edits, final boolean inMemstore) throws IOException { if (this.closed) throw new IOException("Cannot append; log is closed"); // Make a trace scope for the append. It is closed on other side of the ring buffer by the // single consuming thread. Don't have to worry about it. @@ -1188,9 +1173,9 @@ public class FSHLog implements WAL { try { RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the - // edit with its edit/sequence id. The below entry.getRegionSequenceId will wait on the - // latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather create per append. - entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells); + // edit with its edit/sequence id. + // TODO: reuse FSWALEntry as we do SyncFuture rather create per append. + entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore); truck.loadPayload(entry, scope.detach()); } finally { this.disruptor.getRingBuffer().publish(sequence); @@ -1216,9 +1201,9 @@ public class FSHLog implements WAL { private class SyncRunner extends HasThread { private volatile long sequence; private final BlockingQueue syncFutures; - + /** - * UPDATE! + * UPDATE! * @param syncs the batch of calls to sync that arrived as this thread was starting; when done, * we will put the result of the actual hdfs sync call as the result. * @param sequence The sequence number on the ring buffer when this thread was set running. @@ -1266,7 +1251,7 @@ public class FSHLog implements WAL { // This function releases one sync future only. return 1; } - + /** * Release all SyncFutures whose sequence is <= currentSequence. * @param currentSequence @@ -1785,7 +1770,7 @@ public class FSHLog implements WAL { * 'safe point' while the orchestrating thread does some work that requires the first thread * paused: e.g. holding the WAL writer while its WAL is swapped out from under it by another * thread. - * + * *

Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A wait until * Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A. * Thread B then holds at the 'safe point'. Thread A on notification that Thread B is paused, @@ -1793,7 +1778,7 @@ public class FSHLog implements WAL { * it flags B and then Thread A and Thread B continue along on their merry way. Pause and * signalling 'zigzags' between the two participating threads. We use two latches -- one the * inverse of the other -- pausing and signaling when states are achieved. - * + * *
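The handshake described here can be reconstructed with two one-shot latches; the following is an illustrative sketch under assumed names, not the exact FSHLog internals:

  import java.util.concurrent.CountDownLatch;

  class ZigZagSketch {
    private final CountDownLatch attained = new CountDownLatch(1);
    private final CountDownLatch released = new CountDownLatch(1);

    void waitSafePoint() throws InterruptedException {
      attained.await();       // Thread A: block until B parks at the safe point
    }

    void safePointAttained() throws InterruptedException {
      attained.countDown();   // Thread B: tell A the safe point is reached...
      released.await();       // ...and hold here until A is finished
    }

    void releaseSafePoint() {
      released.countDown();   // Thread A: exclusive work done; let B continue
    }
  }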

To start up the drama, Thread A creates an instance of this class each time it would do * this zigzag dance and passes it to Thread B (these classes use Latches so it is one shot * only). Thread B notices the new instance (via reading a volatile reference or how ever) and it @@ -1815,7 +1800,7 @@ public class FSHLog implements WAL { * Latch to wait on. Will be released when we can proceed. */ private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1); - + /** * For Thread A to call when it is ready to wait on the 'safe point' to be attained. * Thread A will be held in here until Thread B calls {@link #safePointAttained()} @@ -1824,7 +1809,7 @@ public class FSHLog implements WAL { * @param syncFuture We need this as barometer on outstanding syncs. If it comes home with * an exception, then something is up w/ our syncing. * @return The passed syncFuture - * @throws FailedSyncBeforeLogCloseException + * @throws FailedSyncBeforeLogCloseException */ SyncFuture waitSafePoint(final SyncFuture syncFuture) throws InterruptedException, FailedSyncBeforeLogCloseException { @@ -1836,7 +1821,7 @@ public class FSHLog implements WAL { } return syncFuture; } - + /** * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals * Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} @@ -2039,7 +2024,6 @@ public class FSHLog implements WAL { // here inside this single appending/writing thread. Events are ordered on the ringbuffer // so region sequenceids will also be in order. regionSequenceId = entry.stampRegionSequenceId(); - // Edits are empty, there is nothing to append. Maybe empty when we are looking for a // region sequence id only, a region edit/sequence id that is not associated with an actual // edit. It has to go through all the rigmarole to be sure we have the right ordering. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 1ea9d4f..9b8c240 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CollectionUtils; @@ -51,23 +52,18 @@ class FSWALEntry extends Entry { // The below data members are denoted 'transient' just to highlight these are not persisted; // they are only in memory and held here while passing over the ring buffer. 
private final transient long sequence; - private final transient AtomicLong regionSequenceIdReference; private final transient boolean inMemstore; private final transient HTableDescriptor htd; private final transient HRegionInfo hri; - private final transient List memstoreCells; private final Set familyNames; FSWALEntry(final long sequence, final WALKey key, final WALEdit edit, - final AtomicLong referenceToRegionSequenceId, final boolean inMemstore, - final HTableDescriptor htd, final HRegionInfo hri, List memstoreCells) { + final HTableDescriptor htd, final HRegionInfo hri, final boolean inMemstore) { super(key, edit); - this.regionSequenceIdReference = referenceToRegionSequenceId; this.inMemstore = inMemstore; this.htd = htd; this.hri = hri; this.sequence = sequence; - this.memstoreCells = memstoreCells; if (inMemstore) { // construct familyNames here to reduce the work of log sinker. ArrayList cells = this.getEdit().getCells(); @@ -110,25 +106,27 @@ class FSWALEntry extends Entry { return this.sequence; } - /** - * Stamp this edit with a region edit/sequence id. - * Call when safe to do so: i.e. the context is such that the increment on the passed in - * {@link #regionSequenceIdReference} is guaranteed aligned w/ how appends are going into the - * WAL. This method works with {@link #getRegionSequenceId()}. It will block waiting on this - * method to be called. - * @return The region edit/sequence id we set for this edit. - * @throws IOException - * @see #getRegionSequenceId() - */ long stampRegionSequenceId() throws IOException { - long regionSequenceId = this.regionSequenceIdReference.incrementAndGet(); - if (!this.getEdit().isReplay() && !CollectionUtils.isEmpty(memstoreCells)) { - for (Cell cell : this.memstoreCells) { - CellUtil.setSequenceId(cell, regionSequenceId); + long regionSequenceId = WALKey.NO_SEQUENCE_ID; + MultiVersionConsistencyControl mvcc = getKey().getMvcc(); + MultiVersionConsistencyControl.WriteEntry we = null; + + if (mvcc != null) { + we = mvcc.beginMemstoreInsert(); + regionSequenceId = we.getWriteNumber(); + } + + if (!this.getEdit().isReplay() && inMemstore) { + for (Cell c : getEdit().getCells()) { + CellUtil.setSequenceId(c, regionSequenceId); } } + + // This has to stay in this order: setLogSeqNum must run before setWriteEntry, + // since setWriteEntry releases the latch that getWriteEntry() callers block on. + WALKey key = getKey(); key.setLogSeqNum(regionSequenceId); + key.setWriteEntry(we); return regionSequenceId; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java index 8caf8df..a50c427 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java @@ -28,10 +28,12 @@ import java.util.UUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.io.Writable; @@ -87,8 +89,8 @@ public class HLogKey extends WALKey implements Writable { * @param clusterIds the clusters that have consumed the change(used in Replication) */ public HLogKey(final byte [] encodedRegionName, final TableName
tablename, - long logSeqNum, final long now, List clusterIds, long nonceGroup, long nonce) { - super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce); + long logSeqNum, final long now, List clusterIds, long nonceGroup, long nonce, MultiVersionConsistencyControl mvcc) { + super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc); } /** @@ -105,8 +107,8 @@ public class HLogKey extends WALKey implements Writable { * @param nonce */ public HLogKey(final byte [] encodedRegionName, final TableName tablename, - final long now, List clusterIds, long nonceGroup, long nonce) { - super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce); + final long now, List clusterIds, long nonceGroup, long nonce, final MultiVersionConsistencyControl mvcc) { + super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce, mvcc); } /** @@ -122,8 +124,8 @@ public class HLogKey extends WALKey implements Writable { * @param nonce */ public HLogKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum, - long nonceGroup, long nonce) { - super(encodedRegionName, tablename, logSeqNum, nonceGroup, nonce); + long nonceGroup, long nonce, MultiVersionConsistencyControl mvcc) { + super(encodedRegionName, tablename, logSeqNum, nonceGroup, nonce, mvcc); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java index 55c057b..ad5f64c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java @@ -24,6 +24,7 @@ import java.util.UUID; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl; /** * An HLogKey specific to WalEdits coming from replay. 
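Taken together, these constructor changes thread the region's MVCC instance through every key handed to the WAL. A hedged sketch of the new call shape, where region, htd, hri, wal, and edits are placeholders:

  // Sketch: build a key carrying the region's mvcc, then use the new
  // five-argument WAL.append introduced by this patch.
  List<UUID> clusterIds = new ArrayList<UUID>();
  WALKey key = new HLogKey(hri.getEncodedNameAsBytes(), htd.getTableName(),
      System.currentTimeMillis(), clusterIds, HConstants.NO_NONCE, HConstants.NO_NONCE,
      region.getMVCC());
  long txid = wal.append(htd, hri, key, edits, true);
  wal.sync(txid);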
@@ -32,13 +33,13 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; public class ReplayHLogKey extends HLogKey { public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename, - final long now, List clusterIds, long nonceGroup, long nonce) { - super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce); + final long now, List clusterIds, long nonceGroup, long nonce, MultiVersionConsistencyControl mvcc) { + super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce, mvcc); } public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename, - long logSeqNum, final long now, List clusterIds, long nonceGroup, long nonce) { - super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce); + long logSeqNum, final long now, List clusterIds, long nonceGroup, long nonce, MultiVersionConsistencyControl mvcc) { + super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java index 399623f..5293aeb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; @@ -54,13 +55,16 @@ public class WALUtil { * this regionserver dies in the middle (This part is not yet implemented). It also prevents * the compaction from finishing if this regionserver has already lost its lease on the log. - * @param sequenceId Used by WAL to get sequence Id for the waledit. */ - public static void writeCompactionMarker(WAL log, HTableDescriptor htd, HRegionInfo info, - final CompactionDescriptor c, AtomicLong sequenceId) throws IOException { + public static void writeCompactionMarker(WAL log, + HTableDescriptor htd, + HRegionInfo info, + final CompactionDescriptor c) throws IOException { TableName tn = TableName.valueOf(c.getTableName().toByteArray()); // we use HLogKey here instead of WALKey directly to support legacy coprocessors. WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); - log.append(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null); + log.append(htd, info, key, WALEdit.createCompaction(info, c), false); log.sync(); if (LOG.isTraceEnabled()) { LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); @@ -71,12 +75,11 @@ public class WALUtil { * Write a flush marker indicating a start / abort or a complete of a region flush */ public static long writeFlushMarker(WAL log, HTableDescriptor htd, HRegionInfo info, - final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException { + final FlushDescriptor f, boolean sync) throws IOException { TableName tn = TableName.valueOf(f.getTableName().toByteArray()); // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); - long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false, - null); + long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), false); if (sync) log.sync(trx); if (LOG.isTraceEnabled()) { LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f)); @@ -88,12 +91,11 @@ public class WALUtil { * Write a region open marker indicating that the region is opened */ public static long writeRegionEventMarker(WAL log, HTableDescriptor htd, HRegionInfo info, - final RegionEventDescriptor r, AtomicLong sequenceId) throws IOException { + final RegionEventDescriptor r) throws IOException { TableName tn = TableName.valueOf(r.getTableName().toByteArray()); // we use HLogKey here instead of WALKey directly to support legacy coprocessors. WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); - long trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r), - sequenceId, false, null); + long trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r), false); log.sync(trx); if (LOG.isTraceEnabled()) { LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r)); @@ -116,8 +118,7 @@ public class WALUtil { public static long writeBulkLoadMarkerAndSync(final WAL wal, final HTableDescriptor htd, final HRegionInfo info, - final WALProtos.BulkLoadDescriptor descriptor, - final AtomicLong sequenceId) throws IOException { + final WALProtos.BulkLoadDescriptor descriptor) throws IOException { TableName tn = info.getTable(); WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); @@ -125,10 +126,7 @@ public class WALUtil { long trx = wal.append(htd, info, key, - WALEdit.createBulkLoadEvent(info, descriptor), - sequenceId, - false, - new ArrayList()); + WALEdit.createBulkLoadEvent(info, descriptor), false); wal.sync(trx); if (LOG.isTraceEnabled()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 5bffea5..4e111bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -156,7 +156,7 @@ class DisabledWALProvider implements WALProvider { @Override public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, - AtomicLong sequenceId, boolean inMemstore, List memstoreKVs) { + boolean inMemstore) { if (!this.listeners.isEmpty()) { final long start = System.nanoTime(); long len = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 5a2b08d..605acba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -118,12 +118,10 @@ public interface WAL { * @param inMemstore Always true except for case where we are writing a compaction completion * record into the WAL; in this case the entry is just so we can finish an unfinished compaction * -- it is not an edit for memstore. - * @param memstoreKVs list of KVs added into memstore * @return Returns a 'transaction id' and key will have the region edit/sequence id * in it. 
*/ - long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, - AtomicLong sequenceId, boolean inMemstore, List memstoreKVs) + long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index 621c200..4078d0b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -32,6 +32,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -72,6 +73,27 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; public class WALKey implements SequenceId, Comparable { private static final Log LOG = LogFactory.getLog(WALKey.class); + public MultiVersionConsistencyControl getMvcc() { + return mvcc; + } + + public MultiVersionConsistencyControl.WriteEntry getWriteEntry() throws InterruptedIOException { + try { + this.seqNumAssignedLatch.await(); + } catch (InterruptedException ie) { + LOG.warn("Thread interrupted waiting for write entry"); + InterruptedIOException iie = new InterruptedIOException(); + iie.initCause(ie); + throw iie; + } + return writeEntry; + } + + public void setWriteEntry(MultiVersionConsistencyControl.WriteEntry writeEntry) { + this.writeEntry = writeEntry; + this.seqNumAssignedLatch.countDown(); + } + // should be < 0 (@see HLogKey#readFields(DataInput)) // version 2 supports WAL compression // public members here are only public because of HLogKey @@ -148,7 +170,9 @@ public class WALKey implements SequenceId, Comparable { private long nonceGroup = HConstants.NO_NONCE; private long nonce = HConstants.NO_NONCE; - static final List EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList()); + private MultiVersionConsistencyControl mvcc; + private MultiVersionConsistencyControl.WriteEntry writeEntry; + public static final List EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList()); // visible for deprecated HLogKey @InterfaceAudience.Private @@ -156,16 +180,17 @@ public class WALKey implements SequenceId, Comparable { public WALKey() { init(null, null, 0L, HConstants.LATEST_TIMESTAMP, - new ArrayList(), HConstants.NO_NONCE, HConstants.NO_NONCE); + new ArrayList(), HConstants.NO_NONCE, HConstants.NO_NONCE, null); } @VisibleForTesting - public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum, + public WALKey(final byte[] encodedRegionName, final TableName tablename, + long logSeqNum, final long now, UUID clusterId) { List clusterIds = new ArrayList(); clusterIds.add(clusterId); init(encodedRegionName, tablename, logSeqNum, now, clusterIds, - HConstants.NO_NONCE, HConstants.NO_NONCE); + HConstants.NO_NONCE, HConstants.NO_NONCE, null); } public WALKey(final byte[] encodedRegionName, final TableName tablename) { @@ -174,7 +199,7 @@ public class WALKey implements SequenceId, Comparable { public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now) { init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, - EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE); + EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, null); } /** @@ -191,8 +216,8 @@ public class WALKey 
implements SequenceId, Comparable { * @param clusterIds the clusters that have consumed the change(used in Replication) */ public WALKey(final byte [] encodedRegionName, final TableName tablename, - long logSeqNum, final long now, List clusterIds, long nonceGroup, long nonce) { - init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce); + long logSeqNum, final long now, List clusterIds, long nonceGroup, long nonce, MultiVersionConsistencyControl mvcc) { + init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc); } /** @@ -209,9 +234,10 @@ public class WALKey implements SequenceId, Comparable { * @param nonce */ public WALKey(final byte [] encodedRegionName, final TableName tablename, - final long now, List clusterIds, long nonceGroup, long nonce) { + final long now, List clusterIds, long nonceGroup, + final long nonce, final MultiVersionConsistencyControl mvcc) { init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, - nonceGroup, nonce); + nonceGroup, nonce, mvcc); } /** @@ -227,14 +253,14 @@ public class WALKey implements SequenceId, Comparable { * @param nonce */ public WALKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum, - long nonceGroup, long nonce) { + long nonceGroup, long nonce, final MultiVersionConsistencyControl mvcc) { init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTime(), - EMPTY_UUIDS, nonceGroup, nonce); + EMPTY_UUIDS, nonceGroup, nonce, mvcc); } @InterfaceAudience.Private protected void init(final byte [] encodedRegionName, final TableName tablename, - long logSeqNum, final long now, List clusterIds, long nonceGroup, long nonce) { + long logSeqNum, final long now, List clusterIds, long nonceGroup, long nonce, MultiVersionConsistencyControl mvcc) { this.logSeqNum = logSeqNum; this.writeTime = now; this.clusterIds = clusterIds; @@ -242,6 +268,7 @@ public class WALKey implements SequenceId, Comparable { this.tablename = tablename; this.nonceGroup = nonceGroup; this.nonce = nonce; + this.mvcc = mvcc; } /** @@ -275,7 +302,7 @@ public class WALKey implements SequenceId, Comparable { @InterfaceAudience.Private public void setLogSeqNum(final long sequence) { this.logSeqNum = sequence; - this.seqNumAssignedLatch.countDown(); + } /** @@ -301,14 +328,6 @@ public class WALKey implements SequenceId, Comparable { */ @Override public long getSequenceId() throws IOException { - try { - this.seqNumAssignedLatch.await(); - } catch (InterruptedException ie) { - LOG.warn("Thread interrupted waiting for next log sequence number"); - InterruptedIOException iie = new InterruptedIOException(); - iie.initCause(ie); - throw iie; - } return this.logSeqNum; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 2c68a68..fbfd5bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -2278,7 +2278,7 @@ public class WALSplitter { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf( walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(), - clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce()); + clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null); logEntry.setFirst(key); logEntry.setSecond(val); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index 7d644bd..720091b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -285,7 +285,7 @@ public class TestIOFencing { FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")), new Path("store_dir")); WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(), - oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100)); + oldHri, compactionDescriptor); // Wait till flush has happened, otherwise there won't be multiple store files long startWaitTime = System.currentTimeMillis(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index cdcdeed..633e544 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -187,7 +187,6 @@ public class TestWALObserver { Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE)); deleteDir(basedir); fs.mkdirs(new Path(basedir, hri.getEncodedName())); - final AtomicLong sequenceId = new AtomicLong(0); // TEST_FAMILY[0] shall be removed from WALEdit. // TEST_FAMILY[1] value shall be changed. @@ -236,7 +235,7 @@ public class TestWALObserver { long now = EnvironmentEdgeManager.currentTime(); // we use HLogKey here instead of WALKey directly to support legacy coprocessors. long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now), - edit, sequenceId, true, null); + edit, true); log.sync(txid); // the edit shall have been change now by the coprocessor. 
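Stepping back to the WALKey change above: setLogSeqNum no longer counts down the latch, setWriteEntry does, so getWriteEntry() is now the blocking hand-off from the ring buffer consumer (which calls stampRegionSequenceId) to the appending thread. A sketch of the consuming side, assuming the HRegion write path uses roughly this shape:

  // Sketch, assuming this patch's API: the appender blocks in getWriteEntry()
  // until the ring-buffer consumer has stamped the key via setWriteEntry(we).
  long txid = wal.append(htd, hri, walKey, edits, true);
  MultiVersionConsistencyControl.WriteEntry we = walKey.getWriteEntry(); // waits on latch
  wal.sync(txid);
  mvcc.completeMemstoreInsert(we); // make the edit visible to MVCC readers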
@@ -272,7 +271,6 @@ public class TestWALObserver { final HTableDescriptor htd = createBasic3FamilyHTD(Bytes .toString(TEST_TABLE)); final HRegionInfo hri = new HRegionInfo(tableName, null, null); - final AtomicLong sequenceId = new AtomicLong(0); fs.mkdirs(new Path(FSUtils.getTableDir(hbaseRootDir, tableName), hri.getEncodedName())); @@ -299,7 +297,7 @@ public class TestWALObserver { final int countPerFamily = 5; for (HColumnDescriptor hcd : htd.getFamilies()) { addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily, - EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId); + EnvironmentEdgeManager.getDelegate(), wal, htd); } LOG.debug("Verify that only the non-legacy CP saw edits."); @@ -323,7 +321,7 @@ public class TestWALObserver { final WALEdit edit = new WALEdit(); final byte[] nonce = Bytes.toBytes("1772"); edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce)); - final long txid = wal.append(htd, hri, legacyKey, edit, sequenceId, true, null); + final long txid = wal.append(htd, hri, legacyKey, edit, true); wal.sync(txid); LOG.debug("Make sure legacy cps can see supported edits after having been skipped."); @@ -361,7 +359,7 @@ public class TestWALObserver { final long now = EnvironmentEdgeManager.currentTime(); long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now), - new WALEdit(), sequenceId, true, null); + new WALEdit(), true); log.sync(txid); assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled()); @@ -404,10 +402,9 @@ public class TestWALObserver { // for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) { for (HColumnDescriptor hcd : htd.getFamilies()) { addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily, - EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId); + EnvironmentEdgeManager.getDelegate(), wal, htd); } - wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId, - true, null); + wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, true); // sync to fs. wal.sync(); @@ -527,7 +524,7 @@ public class TestWALObserver { private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, - final HTableDescriptor htd, final AtomicLong sequenceId) throws IOException { + final HTableDescriptor htd) throws IOException { String familyStr = Bytes.toString(family); long txid = -1; for (int j = 0; j < count; j++) { @@ -538,7 +535,7 @@ public class TestWALObserver { // uses WALKey instead of HLogKey on purpose. 
will only work for tests where we don't care // about legacy coprocessors txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, - ee.currentTime()), edit, sequenceId, true, null); + ee.currentTime()), edit, true); } if (-1 != txid) { wal.sync(txid); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java index 013053e..e2180f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -126,11 +126,10 @@ public class TestWALRecordReader { WALEdit edit = new WALEdit(); final AtomicLong sequenceId = new AtomicLong(0); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value)); - log.append(htd, info, getWalKey(ts), edit, sequenceId, true, null); + log.append(htd, info, getWalKey(ts), edit, true); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value)); - log.append(htd, info, getWalKey(ts+1), edit, sequenceId, - true, null); + log.append(htd, info, getWalKey(ts+1), edit, true); log.sync(); LOG.info("Before 1st WAL roll " + log.toString()); log.rollWriter(); @@ -141,12 +140,10 @@ public class TestWALRecordReader { edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value)); - log.append(htd, info, getWalKey(ts1+1), edit, sequenceId, - true, null); + log.append(htd, info, getWalKey(ts1+1), edit, true); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value)); - log.append(htd, info, getWalKey(ts1+2), edit, sequenceId, - true, null); + log.append(htd, info, getWalKey(ts1+2), edit, true); log.sync(); log.shutdown(); walfactory.shutdown(); @@ -188,8 +185,7 @@ public class TestWALRecordReader { WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value)); - long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true, - null); + long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true); log.sync(txid); Thread.sleep(1); // make sure 2nd log gets a later timestamp @@ -199,8 +195,7 @@ public class TestWALRecordReader { edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value)); - txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true, - null); + txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true); log.sync(txid); log.shutdown(); walfactory.shutdown(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index e4a9f71..6b2ce94 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -1268,7 +1268,7 @@ public class TestDistributedLogSplitting { e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value))); wal.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis()), - e, sequenceId, true, null); + e, true); } wal.sync(); wal.shutdown(); @@ -1360,7 +1360,7 @@ public class 
TestDistributedLogSplitting { value++; e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value))); wal.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), - tableName, System.currentTimeMillis()), e, sequenceId, true, null); + tableName, System.currentTimeMillis()), e, true); } wal.sync(); wal.shutdown(); @@ -1565,7 +1565,7 @@ public class TestDistributedLogSplitting { byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value)); log.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), fullTName, - System.currentTimeMillis()), e, sequenceId, true, null); + System.currentTimeMillis()), e, true); if (0 == i % syncEvery) { log.sync(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java index aa57e22..a56a186 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java @@ -96,7 +96,7 @@ public class TestBulkLoad { { oneOf(log).append(with(any(HTableDescriptor.class)), with(any(HRegionInfo.class)), with(any(WALKey.class)), with(bulkLogWalEditType(WALEdit.BULK_LOAD)), - with(any(AtomicLong.class)), with(any(boolean.class)), with(any(List.class))); + with(any(boolean.class))); will(returnValue(0l)); oneOf(log).sync(with(any(long.class))); } @@ -122,8 +122,7 @@ public class TestBulkLoad { Expectations expection = new Expectations() { { oneOf(log).append(with(any(HTableDescriptor.class)), with(any(HRegionInfo.class)), - with(any(WALKey.class)), with(bulkEventMatcher), - with(any(AtomicLong.class)), with(any(boolean.class)), with(any(List.class))); + with(any(WALKey.class)), with(bulkEventMatcher), with(any(boolean.class))); will(returnValue(0l)); oneOf(log).sync(with(any(long.class))); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 18e3d22..bff0cb4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -247,7 +247,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] v = Bytes.toBytes("value"); MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); + mvcc.beginMemstoreInsert(); KeyValue kv1 = new KeyValue(row, f, q1, v); kv1.setSequenceId(w.getWriteNumber()); @@ -261,7 +261,7 @@ public class TestDefaultMemStore extends TestCase { s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv1}); - w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); + w = mvcc.beginMemstoreInsert(); KeyValue kv2 = new KeyValue(row, f, q2, v); kv2.setSequenceId(w.getWriteNumber()); memstore.add(kv2); @@ -291,7 +291,7 @@ public class TestDefaultMemStore extends TestCase { // INSERT 1: Write both columns val1 MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); + mvcc.beginMemstoreInsert(); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setSequenceId(w.getWriteNumber()); @@ -307,7 +307,7 @@ public class 
TestDefaultMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START INSERT 2: Write both columns val2 - w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); + w = mvcc.beginMemstoreInsert(); KeyValue kv21 = new KeyValue(row, f, q1, v2); kv21.setSequenceId(w.getWriteNumber()); memstore.add(kv21); @@ -343,7 +343,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] v1 = Bytes.toBytes("value1"); // INSERT 1: Write both columns val1 MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); + mvcc.beginMemstoreInsert(); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setSequenceId(w.getWriteNumber()); @@ -359,7 +359,7 @@ public class TestDefaultMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START DELETE: Insert delete for one of the columns - w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); + w = mvcc.beginMemstoreInsert(); KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(), KeyValue.Type.DeleteColumn); kvDel.setSequenceId(w.getWriteNumber()); @@ -417,7 +417,7 @@ public class TestDefaultMemStore extends TestCase { private void internalRun() throws IOException { for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); + mvcc.beginMemstoreInsert(); // Insert the sequence value (i) byte[] v = Bytes.toBytes(i); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index d46c2e3..4849c40 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -804,7 +804,7 @@ public class TestHRegion { .getRegionFileSystem().getStoreDir(Bytes.toString(family))); WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(), - this.region.getRegionInfo(), compactionDescriptor, new AtomicLong(1)); + this.region.getRegionInfo(), compactionDescriptor); Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); @@ -1030,9 +1030,8 @@ public class TestHRegion { IsFlushWALMarker isFlushWALMarker = new IsFlushWALMarker(FlushAction.START_FLUSH); // throw exceptions if the WalEdit is a start flush action - when(wal.append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(), - (WALEdit)argThat(isFlushWALMarker), (AtomicLong)any(), Mockito.anyBoolean(), - (List)any())) + when(wal.append((HTableDescriptor) any(), (HRegionInfo) any(), (WALKey) any(), + (WALEdit) argThat(isFlushWALMarker), Mockito.anyBoolean())) .thenThrow(new IOException("Fail to append flush marker")); // start cache flush will throw exception @@ -4629,7 +4628,7 @@ public class TestHRegion { //verify append called or not verify(wal, expectAppend ? 
times(1) : never()) .append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(), - (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean(), (List)any()); + (WALEdit)any(), Mockito.anyBoolean()); // verify sync called or not if (expectSync || expectSyncFromLogSyncer) { @@ -5773,7 +5772,7 @@ public class TestHRegion { TEST_UTIL.getConfiguration(), rss, null); verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any() - , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List)any()); + , editCaptor.capture(), anyBoolean()); WALEdit edit = editCaptor.getValue(); assertNotNull(edit); @@ -5868,7 +5867,7 @@ public class TestHRegion { // verify that we have not appended region open event to WAL because this region is still // recovering verify(wal, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any() - , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List)any()); + , editCaptor.capture(), anyBoolean()); // not put the region out of recovering state new FinishRegionRecoveringHandler(rss, region.getRegionInfo().getEncodedName(), "/foo") @@ -5876,7 +5875,7 @@ public class TestHRegion { // now we should have put the entry verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any() - , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List)any()); + , editCaptor.capture(), anyBoolean()); WALEdit edit = editCaptor.getValue(); assertNotNull(edit); @@ -5940,7 +5939,7 @@ public class TestHRegion { // 2 times, one for region open, the other close region verify(wal, times(2)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(), - editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List)any()); + editCaptor.capture(), anyBoolean()); WALEdit edit = editCaptor.getAllValues().get(1); assertNotNull(edit); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 70ae657..db17b83 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -759,7 +759,7 @@ public class TestHRegionReplayEvents { // ensure all files are visible in secondary for (Store store : secondaryRegion.getStores()) { - assertTrue(store.getMaxSequenceId() <= secondaryRegion.getSequenceId().get()); + assertTrue(store.getMaxSequenceId() <= secondaryRegion.getSequenceId()); } LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); @@ -1085,7 +1085,7 @@ public class TestHRegionReplayEvents { assertGet(region, family, row); // region seqId should have advanced at least to this seqId - assertEquals(origSeqId, region.getSequenceId().get()); + assertEquals(origSeqId, region.getSequenceId()); // replay an entry that is smaller than current read point // caution: adding an entry below current read point might cause partial dirty reads. 
Normal @@ -1114,7 +1114,7 @@ public class TestHRegionReplayEvents { // test for region open and close secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null); verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), - (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List) any()); + (WALKey)any(), (WALEdit)any(), anyBoolean()); // test for replay prepare flush putDataByReplay(secondaryRegion, 0, 10, cq, families); @@ -1128,11 +1128,11 @@ public class TestHRegionReplayEvents { .build()); verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), - (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List) any()); + (WALKey)any(), (WALEdit)any(), anyBoolean()); secondaryRegion.close(); verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), - (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List) any()); + (WALKey)any(), (WALEdit)any(), anyBoolean()); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index 2965071..a0830d4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -311,11 +311,11 @@ public class TestHeapMemoryManager { } private void assertHeapSpaceDelta(float expectedDeltaPercent, long oldHeapSpace, long newHeapSpace) { - long expctedMinDelta = (long) (this.maxHeapSize * expectedDeltaPercent); + long expectedMinDelta = (long) (this.maxHeapSize * expectedDeltaPercent); if (expectedDeltaPercent > 0) { - assertTrue(expctedMinDelta <= (newHeapSpace - oldHeapSpace)); + assertTrue(expectedMinDelta <= (newHeapSpace - oldHeapSpace)); } else { - assertTrue(expctedMinDelta <= (oldHeapSpace - newHeapSpace)); + assertTrue(expectedMinDelta <= (oldHeapSpace - newHeapSpace)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java index 9286e0d..d19d709 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -246,6 +247,14 @@ public class TestKeepDeletes { Put p = new Put(T1, ts); p.add(c0, c0, T1); region.put(p); + + Get gOne = new Get(T1); + gOne.setMaxVersions(); + gOne.setTimeRange(0L, ts + 1); + Result rOne = region.get(gOne); + assertFalse(rOne.isEmpty()); + + Delete d = new Delete(T1, ts+2); d.deleteColumn(c0, c0, ts); region.delete(d); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java index 09b2226..9fbf908 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java @@ -50,7 +50,7 @@ public class TestMultiVersionConsistencyControl extends TestCase { AtomicLong startPoint = new AtomicLong(); while (!finished.get()) { MultiVersionConsistencyControl.WriteEntry e = - mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet()); + mvcc.beginMemstoreInsert(); // System.out.println("Begin write: " + e.getWriteNumber()); // 10 usec - 500usec (including 0) int sleepTime = rnd.nextInt(500); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java index ed0ac25..ad085f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java @@ -156,7 +156,7 @@ public class TestStoreFileRefresherChore { } } - @Test (timeout = 60000) + @Test public void testIsStale() throws IOException { int period = 0; byte[][] families = new byte[][] {Bytes.toBytes("cf")}; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index 7d76117..b29d1ca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -155,15 +155,15 @@ public class TestFSHLog { } } - protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times, - AtomicLong sequenceId) throws IOException { + protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times) + throws IOException { final byte[] row = Bytes.toBytes("row"); for (int i = 0; i < times; i++) { long timestamp = System.currentTimeMillis(); WALEdit cols = new WALEdit(); cols.add(new KeyValue(row, row, row, timestamp, row)); log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp), - cols, sequenceId, true, null); + cols, true); } log.sync(); } @@ -261,10 +261,10 @@ public class TestFSHLog { final AtomicLong sequenceId2 = new AtomicLong(1); // add edits and roll the wal try { - addEdits(wal, hri1, t1, 2, sequenceId1); + addEdits(wal, hri1, t1, 2); wal.rollWriter(); // add some more edits and roll the wal. This would reach the log number threshold - addEdits(wal, hri1, t1, 2, sequenceId1); + addEdits(wal, hri1, t1, 2); wal.rollWriter(); // with above rollWriter call, the max logs limit is reached. assertTrue(wal.getNumRolledLogFiles() == 2); @@ -275,7 +275,7 @@ public class TestFSHLog { assertEquals(1, regionsToFlush.length); assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]); // insert edits in second region - addEdits(wal, hri2, t2, 2, sequenceId2); + addEdits(wal, hri2, t2, 2); // get the regions to flush, it should still read region1. regionsToFlush = wal.findRegionsToForceFlush(); assertEquals(regionsToFlush.length, 1); @@ -292,12 +292,12 @@ public class TestFSHLog { // no wal should remain now. assertEquals(0, wal.getNumRolledLogFiles()); // add edits both to region 1 and region 2, and roll. - addEdits(wal, hri1, t1, 2, sequenceId1); - addEdits(wal, hri2, t2, 2, sequenceId2); + addEdits(wal, hri1, t1, 2); + addEdits(wal, hri2, t2, 2); wal.rollWriter(); // add edits and roll the writer, to reach the max logs limit. 
assertEquals(1, wal.getNumRolledLogFiles()); - addEdits(wal, hri1, t1, 2, sequenceId1); + addEdits(wal, hri1, t1, 2); wal.rollWriter(); // it should return two regions to flush, as the oldest wal file has entries // for both regions. @@ -309,7 +309,7 @@ public class TestFSHLog { wal.rollWriter(true); assertEquals(0, wal.getNumRolledLogFiles()); // Add an edit to region1, and roll the wal. - addEdits(wal, hri1, t1, 2, sequenceId1); + addEdits(wal, hri1, t1, 2); // tests partial flush: roll on a partial flush, and ensure that wal is not archived. wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys()); wal.rollWriter(); @@ -449,18 +449,18 @@ public class TestFSHLog { for (int i = 0; i < countPerFamily; i++) { final HRegionInfo info = region.getRegionInfo(); final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), clusterIds, -1, -1); - wal.append(htd, info, logkey, edits, region.getSequenceId(), true, null); + System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC()); + wal.append(htd, info, logkey, edits, true); } region.flush(true); // FlushResult.flushSequenceId is not visible here so go get the current sequence id. - long currentSequenceId = region.getSequenceId().get(); + long currentSequenceId = region.getSequenceId(); // Now release the appends goslow.setValue(false); synchronized (goslow) { goslow.notifyAll(); } - assertTrue(currentSequenceId >= region.getSequenceId().get()); + assertTrue(currentSequenceId >= region.getSequenceId()); } finally { region.close(true); wal.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index 4e07040..ef944f5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -201,7 +201,7 @@ public class TestLogRollAbort { HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor("column")); log.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), kvs, sequenceId, true, null); + System.currentTimeMillis()), kvs, true); } // Send the data to HDFS datanodes and close the HDFS writer log.sync(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java index 41e05ae..bbf372e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -137,7 +137,7 @@ public class TestLogRollingNoCluster { final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO; final HTableDescriptor htd = TEST_UTIL.getMetaTableDescriptor(); final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), - TableName.META_TABLE_NAME, now), edit, sequenceId, true, null); + TableName.META_TABLE_NAME, now), edit, true); wal.sync(txid); } String msg = getName() + " finished"; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index 69482d1..0351430 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -88,7 +88,6 @@ public class TestWALActionsListener { list.add(observer); final WALFactory wals = new WALFactory(conf, list, "testActionListener"); DummyWALActionsListener laterobserver = new DummyWALActionsListener(); - final AtomicLong sequenceId = new AtomicLong(1); HRegionInfo hri = new HRegionInfo(TableName.valueOf(SOME_BYTES), SOME_BYTES, SOME_BYTES, false); final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes()); @@ -102,7 +101,7 @@ public class TestWALActionsListener { htd.addFamily(new HColumnDescriptor(b)); final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), - TableName.valueOf(b), 0), edit, sequenceId, true, null); + TableName.valueOf(b), 0), edit, true); wal.sync(txid); if (i == 10) { wal.registerWALActionsListener(laterobserver); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 64e81fa..5fa7eb0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -799,16 +799,14 @@ public class TestWALReplay { long now = ee.currentTime(); edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName)); - wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId, - true, null); + wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, true); // Delete the c family to verify deletes make it over. edit = new WALEdit(); now = ee.currentTime(); edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); - wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId, - true, null); + wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, true); // Sync. 
     wal.sync();
@@ -887,7 +885,7 @@ public class TestWALReplay {
     for (HColumnDescriptor hcd : htd.getFamilies()) {
       addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
     }
-    long lastestSeqNumber = region.getSequenceId().get();
+    long lastestSeqNumber = region.getSequenceId();
     // get the current seq no
     wal.doCompleteCacheFlush = true;
     // allow complete cache flush with the previous seq number got after first
@@ -1000,7 +998,7 @@ public class TestWALReplay {
       edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(),
         columnBytes));
       wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, ee.currentTime()),
-        edit, sequenceId, true, null);
+        edit, true);
     }
     wal.sync();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 9315f62..6835689 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -211,7 +211,7 @@ public class TestReplicationSourceManager {
       }
       LOG.info(i);
       final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test,
-          System.currentTimeMillis()), edit, sequenceId, true ,null);
+          System.currentTimeMillis()), edit, true);
       wal.sync(txid);
     }

@@ -224,7 +224,7 @@ public class TestReplicationSourceManager {

     for (int i = 0; i < 3; i++) {
       wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test,
-          System.currentTimeMillis()), edit, sequenceId, true, null);
+          System.currentTimeMillis()), edit, true);
     }
     wal.sync();

@@ -236,7 +236,7 @@ public class TestReplicationSourceManager {
         "1", 0, false, false);

     wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test,
-        System.currentTimeMillis()), edit, sequenceId, true, null);
+        System.currentTimeMillis()), edit, true);
     wal.sync();

     assertEquals(1, manager.getWALs().size());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
index 577f0ba..a70794a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
@@ -196,7 +196,7 @@ public class TestReplicationWALReaderManager {

   private void appendToLogPlus(int count) throws IOException {
     final long txid = log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
-        System.currentTimeMillis()), getWALEdits(count), sequenceId, true, null);
+        System.currentTimeMillis()), getWALEdits(count), true);
     log.sync(txid);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java
index 3212822..2954096 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java
@@ -66,11 +66,11 @@ public class FaultyFSLog extends FSHLog {

   @Override
   public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
-      AtomicLong sequenceId, boolean isInMemstore, List<Cell> cells) throws IOException {
+      boolean inMemstore) throws IOException {
     if (this.ft == FailureType.APPEND) {
       throw new IOException("append");
     }
-    return super.append(htd, info, key, edits, sequenceId, isInMemstore, cells);
+    return super.append(htd, info, key, edits, inMemstore);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
index cf0488a..3f51884 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
@@ -148,14 +148,14 @@ public class TestDefaultWALProvider {

   protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd,
-                          int times, AtomicLong sequenceId) throws IOException {
+                          int times) throws IOException {
     final byte[] row = Bytes.toBytes("row");
     for (int i = 0; i < times; i++) {
       long timestamp = System.currentTimeMillis();
       WALEdit cols = new WALEdit();
       cols.add(new KeyValue(row, row, row, timestamp, row));
       log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp),
-          cols, sequenceId, true, null);
+          cols, true);
     }
     log.sync();
   }
@@ -202,26 +202,26 @@ public class TestDefaultWALProvider {
     // Add a single edit and make sure that rolling won't remove the file
     // Before HBASE-3198 it used to delete it
-    addEdits(log, hri, htd, 1, sequenceId);
+    addEdits(log, hri, htd, 1);
     log.rollWriter();
     assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(log));

     // See if there's anything wrong with more than 1 edit
-    addEdits(log, hri, htd, 2, sequenceId);
+    addEdits(log, hri, htd, 2);
     log.rollWriter();
     assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));

     // Now mix edits from 2 regions, still no flushing
-    addEdits(log, hri, htd, 1, sequenceId);
-    addEdits(log, hri2, htd2, 1, sequenceId);
-    addEdits(log, hri, htd, 1, sequenceId);
-    addEdits(log, hri2, htd2, 1, sequenceId);
+    addEdits(log, hri, htd, 1);
+    addEdits(log, hri2, htd2, 1);
+    addEdits(log, hri, htd, 1);
+    addEdits(log, hri2, htd2, 1);
     log.rollWriter();
     assertEquals(3, DefaultWALProvider.getNumRolledLogFiles(log));

     // Flush the first region, we expect to see the first two files getting
     // archived. We need to append something or writer won't be rolled.
-    addEdits(log, hri2, htd2, 1, sequenceId);
+    addEdits(log, hri2, htd2, 1);
     log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
     log.completeCacheFlush(hri.getEncodedNameAsBytes());
     log.rollWriter();
@@ -230,7 +230,7 @@ public class TestDefaultWALProvider {
     // Flush the second region, which removes all the remaining output files
     // since the oldest was completely flushed and the two others only contain
     // flush information
-    addEdits(log, hri2, htd2, 1, sequenceId);
+    addEdits(log, hri2, htd2, 1);
     log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getFamiliesKeys());
     log.completeCacheFlush(hri2.getEncodedNameAsBytes());
     log.rollWriter();
@@ -277,34 +277,32 @@ public class TestDefaultWALProvider {
     hri1.setSplit(false);
     hri2.setSplit(false);
     // variables to mock region sequenceIds.
-    final AtomicLong sequenceId1 = new AtomicLong(1);
-    final AtomicLong sequenceId2 = new AtomicLong(1);
     // start with the testing logic: insert a waledit, and roll writer
-    addEdits(wal, hri1, table1, 1, sequenceId1);
+    addEdits(wal, hri1, table1, 1);
     wal.rollWriter();
     // assert that the wal is rolled
     assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal));

     // add edits in the second wal file, and roll writer.
-    addEdits(wal, hri1, table1, 1, sequenceId1);
+    addEdits(wal, hri1, table1, 1);
     wal.rollWriter();
     // assert that the wal is rolled
     assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));

     // add a waledit to table1, and flush the region.
-    addEdits(wal, hri1, table1, 3, sequenceId1);
+    addEdits(wal, hri1, table1, 3);
     flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys());
     // roll log; all old logs should be archived.
     wal.rollWriter();
     assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));

     // add an edit to table2, and roll writer
-    addEdits(wal, hri2, table2, 1, sequenceId2);
+    addEdits(wal, hri2, table2, 1);
     wal.rollWriter();
     assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal));

     // add edits for table1, and roll writer
-    addEdits(wal, hri1, table1, 2, sequenceId1);
+    addEdits(wal, hri1, table1, 2);
     wal.rollWriter();
     assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));

     // add edits for table2, and flush hri1.
-    addEdits(wal, hri2, table2, 2, sequenceId2);
+    addEdits(wal, hri2, table2, 2);
     flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getFamiliesKeys());
     // the log : region-sequenceId map is
     // log1: region2 (unflushed)
@@ -314,7 +312,7 @@ public class TestDefaultWALProvider {
     wal.rollWriter();
     assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
     // flush region2, and all logs should be archived.
-    addEdits(wal, hri2, table2, 2, sequenceId2);
+    addEdits(wal, hri2, table2, 2);
     flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getFamiliesKeys());
     wal.rollWriter();
     assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
index 6ae8ee1..2417d31 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
@@ -91,7 +91,6 @@ public class TestSecureWAL {
     final byte[] value = Bytes.toBytes("Test value");
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     final WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "TestSecureWAL");
-    final AtomicLong sequenceId = new AtomicLong(1);

     // Write the WAL
     final WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes());
@@ -100,7 +99,7 @@ public class TestSecureWAL {
       WALEdit kvs = new WALEdit();
       kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
       wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis()), kvs, sequenceId, true, null);
+          System.currentTimeMillis()), kvs, true);
     }
     wal.sync();
     final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 968c5c7..f7c1fab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -183,7 +183,6 @@ public class TestWALFactory {
     htd.addFamily(new HColumnDescriptor("column"));

     // Add edits for three regions.
-    final AtomicLong sequenceId = new AtomicLong(1);
     for (int ii = 0; ii < howmany; ii++) {
       for (int i = 0; i < howmany; i++) {
         final WAL log = wals.getWAL(infos[i].getEncodedNameAsBytes());
@@ -196,7 +195,7 @@ public class TestWALFactory {
             System.currentTimeMillis(), column));
         LOG.info("Region " + i + ": " + edit);
         log.append(htd, infos[i], new WALKey(infos[i].getEncodedNameAsBytes(), tableName,
-            System.currentTimeMillis()), edit, sequenceId, true, null);
+            System.currentTimeMillis()), edit, true);
       }
       log.sync();
       log.rollWriter();
@@ -238,7 +237,6 @@ public class TestWALFactory {
     out.close();
     in.close();

-    final AtomicLong sequenceId = new AtomicLong(1);
     final int total = 20;

     WAL.Reader reader = null;
@@ -253,7 +251,7 @@ public class TestWALFactory {
       WALEdit kvs = new WALEdit();
       kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
       wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis()), kvs, sequenceId, true, null);
+          System.currentTimeMillis()), kvs, true);
     }
     // Now call sync and try reading. Opening a Reader before you sync just
     // gives you EOFE.
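[Note] The two comment lines ending the hunk above state the WAL's visibility contract: appended entries are not readable until sync() has pushed them to the filesystem. A minimal sketch of the safe ordering, assuming the wals, fs, and walPath handles set up earlier in this test (the helper class and method names are illustrative only, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.wal.WAL;
    import org.apache.hadoop.hbase.wal.WALFactory;

    public class SyncThenReadSketch {
      // Open a reader only after syncing the append's txid; a reader opened
      // earlier sees a file without the tail entries and hits EOF.
      static WAL.Reader openAfterSync(WALFactory wals, FileSystem fs, Path walPath,
          WAL wal, long txid) throws IOException {
        wal.sync(txid);                        // make entries up to txid durable
        return wals.createReader(fs, walPath); // complete entries now visible
      }
    }

Syncing on the append's txid, as TestReplicationSourceManager does above, blocks only until that specific edit is durable; the no-argument sync() waits for all outstanding appends.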
@@ -272,7 +270,7 @@ public class TestWALFactory {
       WALEdit kvs = new WALEdit();
       kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
       wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis()), kvs, sequenceId, true, null);
+          System.currentTimeMillis()), kvs, true);
     }
     wal.sync();
     reader = wals.createReader(fs, walPath);
@@ -294,7 +292,7 @@ public class TestWALFactory {
       WALEdit kvs = new WALEdit();
       kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
       wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis()), kvs, sequenceId, true, null);
+          System.currentTimeMillis()), kvs, true);
     }
     // Now I should have written out lots of blocks. Sync then read.
     wal.sync();
@@ -363,7 +361,6 @@ public class TestWALFactory {
         HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
     final WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes());

-    final AtomicLong sequenceId = new AtomicLong(1);
     final int total = 20;

     HTableDescriptor htd = new HTableDescriptor(tableName);
@@ -373,7 +370,7 @@ public class TestWALFactory {
       WALEdit kvs = new WALEdit();
       kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
       wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis()), kvs, sequenceId, true, null);
+          System.currentTimeMillis()), kvs, true);
     }
     // Now call sync to send the data to HDFS datanodes
     wal.sync();
@@ -503,7 +500,7 @@ public class TestWALFactory {
     final long txid = log.append(htd, info,
       new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
-      cols, sequenceId, true, null);
+      cols, true);
     log.sync(txid);
     log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys());
     log.completeCacheFlush(info.getEncodedNameAsBytes());
@@ -559,7 +556,7 @@ public class TestWALFactory {
     final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
     final long txid = log.append(htd, hri,
       new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
-      cols, sequenceId, true, null);
+      cols, true);
     log.sync(txid);
     log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
     log.completeCacheFlush(hri.getEncodedNameAsBytes());
@@ -612,7 +609,7 @@ public class TestWALFactory {
         Bytes.toBytes(Integer.toString(i)),
         timestamp, new byte[]{(byte) (i + '0')}));
       log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis()), cols, sequenceId, true, null);
+          System.currentTimeMillis()), cols, true);
     }
     log.sync();
     assertEquals(COL_COUNT, visitor.increments);
@@ -622,7 +619,7 @@ public class TestWALFactory {
       Bytes.toBytes(Integer.toString(11)),
       timestamp, new byte[]{(byte) (11 + '0')}));
     log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
-        System.currentTimeMillis()), cols, sequenceId, true, null);
+        System.currentTimeMillis()), cols, true);
     log.sync();
     assertEquals(COL_COUNT, visitor.increments);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
index 52e28eb..d94e27e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
@@ -114,7 +114,7 @@ public class TestWALReaderOnSecureWAL {
       WALEdit kvs = new WALEdit();
       kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
       wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis()), kvs, sequenceId, true, null);
+          System.currentTimeMillis()), kvs, true);
     }
     wal.sync();
     final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
index 7d4a0f8..c8f0761 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
@@ -180,7 +180,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
         addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
         HRegionInfo hri = region.getRegionInfo();
         final WALKey logkey = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now);
-        wal.append(htd, hri, logkey, walEdit, region.getSequenceId(), true, null);
+        wal.append(htd, hri, logkey, walEdit, true);
         if (!this.noSync) {
          if (++lastSync >= this.syncInterval) {
             wal.sync();
-- 
2.3.0
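[Note] For WAL callers outside the files touched here, the append migration is mechanical: drop the AtomicLong and the trailing memstore cell list, and take the transaction id the WAL hands back. A minimal sketch against the post-patch interface (the helper class and variable names are illustrative, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.hbase.wal.WAL;
    import org.apache.hadoop.hbase.wal.WALKey;

    public class AppendMigrationSketch {
      // Pre-patch call shape, shown for contrast (these parameters no longer
      // exist on WAL.append):
      //   wal.append(htd, hri, key, edit, sequenceId, true, null);
      static long appendAndSync(WAL wal, HTableDescriptor htd, HRegionInfo hri,
          WALEdit edit) throws IOException {
        // The WAL now assigns the edit's sequence id internally; the caller
        // only states whether the edit is destined for the memstore.
        WALKey key = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(),
            System.currentTimeMillis());
        long txid = wal.append(htd, hri, key, edit, true);
        wal.sync(txid); // block until this edit is durable
        return txid;
      }
    }

The returned txid feeds sync(long) exactly as the tests above use it; callers that previously read the shared AtomicLong to learn the region's last sequence id now ask the region directly, as the TestWALReplay hunk does with region.getSequenceId().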