From 56fc8acb71e7d9c419d51d6efebf461ca61329af Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Tue, 7 Apr 2015 02:51:00 -0700 Subject: [PATCH] HBASE-12751 Allow RowLock to be reader writer Summary: Right now every write operation grabs a row lock. This is to prevent values from changing during a read modify write operation (increment or check and put). However it limits parallelism in several different scenarios. If there are several puts to the same row but different columns or stores then this is very limiting. If there are puts to the same column then mvcc number should ensure a consistent ordering. So locking is not needed. However locking for check and put or increment is still needed. Test Plan: - all tests in org.apache.hadoop.hbase.regionserver either pass or are failing even without this change - needs to be tested with loadtesttool and/or shadow data to see if there's actually any performance improvement Reviewers: nkedel Differential Revision: https://reviews.facebook.net/D32079 --- .../apache/hadoop/hbase/regionserver/HRegion.java | 282 ++++++++++++--------- .../MultiVersionConsistencyControl.java | 7 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 22 +- .../hadoop/hbase/regionserver/wal/FSWALEntry.java | 10 +- .../hadoop/hbase/regionserver/wal/WALUtil.java | 11 +- .../hadoop/hbase/wal/DisabledWALProvider.java | 2 +- .../main/java/org/apache/hadoop/hbase/wal/WAL.java | 3 +- .../hadoop/hbase/coprocessor/TestWALObserver.java | 10 +- .../hbase/mapreduce/TestWALRecordReader.java | 17 +- .../hbase/master/TestDistributedLogSplitting.java | 6 +- .../hbase/regionserver/TestAtomicOperation.java | 5 +- .../hadoop/hbase/regionserver/TestBulkLoad.java | 4 +- .../hbase/regionserver/TestDefaultMemStore.java | 14 +- .../hadoop/hbase/regionserver/TestHRegion.java | 15 +- .../regionserver/TestHRegionReplayEvents.java | 6 +- .../hbase/regionserver/TestHeapMemoryManager.java | 6 +- .../TestMultiVersionConsistencyControl.java | 2 +- 
.../hadoop/hbase/regionserver/wal/TestFSHLog.java | 4 +- .../hbase/regionserver/wal/TestLogRollAbort.java | 2 +- .../regionserver/wal/TestLogRollingNoCluster.java | 2 +- .../regionserver/wal/TestWALActionsListener.java | 2 +- .../hbase/regionserver/wal/TestWALReplay.java | 6 +- .../regionserver/TestReplicationSourceManager.java | 6 +- .../TestReplicationWALReaderManager.java | 2 +- .../org/apache/hadoop/hbase/wal/FaultyFSLog.java | 4 +- .../hadoop/hbase/wal/TestDefaultWALProvider.java | 2 +- .../org/apache/hadoop/hbase/wal/TestSecureWAL.java | 2 +- .../apache/hadoop/hbase/wal/TestWALFactory.java | 18 +- .../hadoop/hbase/wal/TestWALReaderOnSecureWAL.java | 2 +- .../hadoop/hbase/wal/WALPerformanceEvaluation.java | 2 +- 30 files changed, 255 insertions(+), 221 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index b8e3e52..aff83a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2379,7 +2379,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ @VisibleForTesting protected long getNextSequenceId(final WAL wal) throws IOException { - WALKey key = this.appendEmptyEdit(wal, null); + WALKey key = this.appendEmptyEdit(wal); return key.getSequenceId(); } @@ -2835,7 +2835,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); // reference family maps directly so coprocessors can mutate them if desired Map>[] familyMaps = new Map[batchOp.operations.length]; - List memstoreCells = new ArrayList(); // We try to set up a batch in the range [firstIndex,lastIndexExclusive) int firstIndex = batchOp.nextIndexToProcess; int lastIndexExclusive = firstIndex; @@ -2896,7 +2895,10 @@ 
public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean shouldBlock = numReadyToWrite == 0; RowLock rowLock = null; try { - rowLock = getRowLockInternal(mutation.getRow(), shouldBlock); + // request a read lock for a put or delete; this is safe if we're already + // within a write lock context (e.g. from checkAndMutate) as it will just + // reuse the existing context. + rowLock = getRowLockInternal(mutation.getRow(), shouldBlock, true); } catch (IOException ioe) { LOG.warn("Failed getting lock in batch put, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); @@ -2965,14 +2967,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi locked = true; if(isInReplay) { mvccNum = batchOp.getReplaySequenceId(); - } else { - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); } // // ------------------------------------ // Acquire the latest mvcc number // ---------------------------------- - w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); + w = mvcc.beginMemstoreInsertWithSeqNum(); // calling the pre CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { @@ -2983,26 +2983,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // ------------------------------------ - // STEP 3. Write back to memstore - // Write to memstore. It is ok to write to memstore - // first without updating the WAL because we do not roll - // forward the memstore MVCC. The MVCC will be moved up when - // the complete operation is done. These changes are not yet - // visible to scanners till we update the MVCC. The MVCC is - // moved only when the sync is complete. 
- // ---------------------------------- - long addedSize = 0; - for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { - continue; - } - doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote - addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay); - } - - // ------------------------------------ - // STEP 4. Build WAL edit + // STEP 3. Build WAL edit // ---------------------------------- Durability durability = Durability.USE_DEFAULT; for (int i = firstIndex; i < lastIndexExclusive; i++) { @@ -3011,7 +2992,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi != OperationStatusCode.NOT_RUN) { continue; } - batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; Mutation m = batchOp.getMutation(i); Durability tmpDur = getEffectiveDurability(m.getDurability()); @@ -3039,7 +3019,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.htableDescriptor.getTableName(), now, m.getClusterIds(), currentNonceGroup, currentNonce); txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, - walEdit, getSequenceId(), true, null); + walEdit, getSequenceId(), true); walEdit = new WALEdit(isInReplay); walKey = null; } @@ -3058,7 +3038,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // ------------------------- - // STEP 5. Append the final edit to WAL. Do not sync wal. + // STEP 4. Append the final edit to WAL. Do not sync wal. 
// ------------------------- Mutation mutation = batchOp.getMutation(firstIndex); if (isInReplay) { @@ -3085,11 +3065,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, - getSequenceId(), true, memstoreCells); + getSequenceId(), true); } if(walKey == null){ // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned - walKey = this.appendEmptyEdit(this.wal, memstoreCells); + walKey = this.appendEmptyEdit(this.wal); + } + + mvccNum = walKey.getSequenceId(); + + // ------------------------------------ + // STEP 5. Write back to memstore + // Write to memstore. It is ok to write to memstore + // first without syncing the WAL because we do not roll + // forward the memstore MVCC. The MVCC will be moved up when + // the complete operation is done. These changes are not yet + // visible to scanners till we update the MVCC. The MVCC is + // moved only when the sync is complete. + // ---------------------------------- + long addedSize = 0; + for (int i = firstIndex; i < lastIndexExclusive; i++) { + if (batchOp.retCodeDetails[i].getOperationStatusCode() + != OperationStatusCode.NOT_RUN) { + continue; + } + doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote + addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay); } // ------------------------------- @@ -3126,6 +3127,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi w = null; } + for (int i = firstIndex; i < lastIndexExclusive; i ++) { + batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; + } + // ------------------------------------ // STEP 9. Run coprocessor post hooks. This should be done after the wal is // synced so that the coprocessor contract is adhered to. 
@@ -3151,7 +3156,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } finally { // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { - rollbackMemstore(memstoreCells); + for (int j=0; j cells:familyMaps[j].values()) { + rollbackMemstore(cells); + } + + } } if (w != null) { mvcc.completeMemstoreInsertWithSeqNum(w, walKey); @@ -3393,7 +3403,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private void doBatchMutate(Mutation mutation) throws IOException { // Currently this is only called for puts and deletes, so no nonces. - OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation }); + OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation}); if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) { throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg()); } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) { @@ -3566,7 +3576,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * new entries. 
 */ private long applyFamilyMapToMemstore(Map> familyMap, - long mvccNum, List memstoreCells, boolean isInReplay) throws IOException { + long mvccNum, boolean isInReplay) throws IOException { long size = 0; for (Map.Entry> e : familyMap.entrySet()) { @@ -3577,10 +3587,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int listSize = cells.size(); for (int i=0; i < listSize; i++) { Cell cell = cells.get(i); - CellUtil.setSequenceId(cell, mvccNum); + if (cell.getSequenceId() == 0) { + CellUtil.setSequenceId(cell, mvccNum); + } Pair ret = store.add(cell); size += ret.getFirst(); - memstoreCells.add(ret.getSecond()); if(isInReplay) { // set memstore newly added cells with replay mvcc number CellUtil.setSequenceId(ret.getSecond(), mvccNum); @@ -4916,20 +4927,45 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * A version of getRowLock(byte[], boolean) to use when a region operation has already been * started (the calling thread has already acquired the region-close-guard lock). + * + * This assumes a default to a fully-exclusive (writer) lock, and is intended + * for read-modify-write operations and other cases where we can't be sure of + * safe concurrent writes. Safe concurrent write operations should use + * getRowLockInternal(row, waitForLock, /readLock/ true) for lower lock latency. */ protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException { + return getRowLockInternal(row, waitForLock, false); + } + + /** + * A version of getRowLock(byte[], boolean) to use when a region operation + * has already been started (the calling thread has already acquired the + * region-close-guard lock). + * + * @param readLock false indicates this should be a fully-exclusive + * (writer) lock, and is intended for read-modify-write operations + * and other cases where we can't be sure of safe concurrent writes. 
+ * Use true for simple puts and other safe concurrent operations + * for lower lock latency. + */ + protected RowLock getRowLockInternal(byte[] row, boolean waitForLock, boolean readLock) + throws IOException { checkRow(row, "row lock"); HashedBytes rowKey = new HashedBytes(row); - RowLockContext rowLockContext = new RowLockContext(rowKey); - + RowLockContext rowLockContext = new RowLockContext(rowKey, readLock); // loop until we acquire the row lock (unless !waitForLock) while (true) { RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); if (existingContext == null) { // Row is not already locked by any thread, use newly created context. break; - } else if (existingContext.ownedByCurrentThread()) { - // Row is already locked by current thread, reuse existing context instead. + } else if (readLock && existingContext.isReadLock()) { + // Row is already locked by a read lock, and we're asking for a read lock; + // reuse the current context. + rowLockContext = existingContext; + break; + } else if (existingContext.ownedByCurrentThread() && !existingContext.isReadLock()) { + // Row is already locked for write by current thread, reuse existing context instead. rowLockContext = existingContext; break; } else { @@ -6437,7 +6473,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi List acquiredRowLocks; long addedSize = 0; List mutations = new ArrayList(); - List memstoreCells = new ArrayList(); Collection rowsToLock = processor.getRowsToLock(); long mvccNum = 0; WALKey walKey = null; @@ -6446,14 +6481,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi acquiredRowLocks = new ArrayList(rowsToLock.size()); for (byte[] row : rowsToLock) { // Attempt to lock all involved rows, throw if any lock times out + // use a writer lock for mixed reads and writes acquiredRowLocks.add(getRowLock(row)); } // 3. Region lock lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 
1 : acquiredRowLocks.size()); locked = true; - // Get a mvcc write number - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); - long now = EnvironmentEdgeManager.currentTime(); try { // 4. Let the processor scan the rows, generate mutations and add @@ -6463,10 +6496,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!mutations.isEmpty()) { // 5. Start mvcc transaction - writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); + writeEntry = mvcc.beginMemstoreInsertWithSeqNum(); // 6. Call the preBatchMutate hook processor.preBatchMutate(this, walEdit); - // 7. Apply to memstore + + long txid = 0; + // 7. Append no sync + if (!walEdit.isEmpty()) { + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, + processor.getClusterIds(), nonceGroup, nonce); + txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), + walKey, walEdit, getSequenceId(), false); + } + if(walKey == null){ + // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit + // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId + walKey = this.appendEmptyEdit(this.wal); + } + mvccNum = walKey.getSequenceId(); + + + // 6. Apply to memstore for (Mutation m : mutations) { // Handle any tag based cell features rewriteCellTags(m.getFamilyCellMap(), m); @@ -6481,25 +6533,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } Pair ret = store.add(cell); addedSize += ret.getFirst(); - memstoreCells.add(ret.getSecond()); } } - long txid = 0; - // 8. Append no sync - if (!walEdit.isEmpty()) { - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
- walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - processor.getClusterIds(), nonceGroup, nonce); - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), - walKey, walEdit, getSequenceId(), true, memstoreCells); - } - if(walKey == null){ - // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit - // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId - walKey = this.appendEmptyEdit(this.wal, memstoreCells); - } // 9. Release region lock if (locked) { this.updatesLock.readLock().unlock(); @@ -6632,7 +6668,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WriteEntry w = null; WALKey walKey = null; RowLock rowLock = null; - List memstoreCells = new ArrayList(); boolean doRollBackMemstore = false; try { rowLock = getRowLock(row); @@ -6649,8 +6684,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } // now start my own transaction - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); - w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); + w = mvcc.beginMemstoreInsertWithSeqNum(); long now = EnvironmentEdgeManager.currentTime(); // Process each family for (Map.Entry> family : append.getFamilyCellMap().entrySet()) { @@ -6764,7 +6798,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - CellUtil.setSequenceId(newCell, mvccNum); // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND, @@ -6785,42 +6818,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi tempMemstore.put(store, kvs); } + // Actually write to WAL now + if (writeToWAL) { + // Using default cluster id, as this can only happen in the originating + // cluster. 
A slave cluster receives the final value (not the delta) + // as a Put. + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce); + txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, + this.sequenceId, true); + } else { + recordMutationWithoutWal(append.getFamilyCellMap()); + } + if (walKey == null) { + // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned + walKey = this.appendEmptyEdit(this.wal); + } + mvccNum = walKey.getSequenceId(); + //Actually write to Memstore now for (Map.Entry> entry : tempMemstore.entrySet()) { Store store = entry.getKey(); if (store.getFamily().getMaxVersions() == 1) { // upsert if VERSIONS for this CF == 1 size += store.upsert(entry.getValue(), getSmallestReadPoint()); - memstoreCells.addAll(entry.getValue()); } else { // otherwise keep older versions around for (Cell cell: entry.getValue()) { Pair ret = store.add(cell); size += ret.getFirst(); - memstoreCells.add(ret.getSecond()); doRollBackMemstore = true; } } allKVs.addAll(entry.getValue()); } - // Actually write to WAL now - if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
- walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce); - txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, - this.sequenceId, true, memstoreCells); - } else { - recordMutationWithoutWal(append.getFamilyCellMap()); - } - if (walKey == null) { - // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned - walKey = this.appendEmptyEdit(this.wal, memstoreCells); - } size = this.addAndGetGlobalMemstoreSize(size); flush = isFlushSize(size); } finally { @@ -6841,7 +6874,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { - rollbackMemstore(memstoreCells); + for(Listcells:tempMemstore.values()) { + rollbackMemstore(cells); + } } if (w != null) { mvcc.completeMemstoreInsertWithSeqNum(w, walKey); @@ -6895,7 +6930,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WriteEntry w = null; WALKey walKey = null; long mvccNum = 0; - List memstoreCells = new ArrayList(); boolean doRollBackMemstore = false; try { rowLock = getRowLock(row); @@ -6912,8 +6946,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } // now start my own transaction - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); - w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); + w = mvcc.beginMemstoreInsertWithSeqNum(); long now = EnvironmentEdgeManager.currentTime(); // Process each family for (Map.Entry> family: @@ -6995,8 +7028,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi val, 0, val.length, newTags); - CellUtil.setSequenceId(newKV, mvccNum); - // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { newKV = coprocessorHost.postMutationBeforeWAL( @@ -7023,6 +7054,29 @@ public class 
HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + mvccNum = walKey.getSequenceId(); + + // Actually write to WAL now + if (walEdits != null && !walEdits.isEmpty()) { + if (writeToWAL) { + // Using default cluster id, as this can only happen in the originating + // cluster. A slave cluster receives the final value (not the delta) + // as a Put. + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce); + txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), + walKey, walEdits, getSequenceId(),true); + } else { + recordMutationWithoutWal(increment.getFamilyCellMap()); + } + } + if(walKey == null){ + // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned + walKey = this.appendEmptyEdit(this.wal); + } + + //Actually write to Memstore now if (!tempMemstore.isEmpty()) { for (Map.Entry> entry : tempMemstore.entrySet()) { @@ -7030,13 +7084,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (store.getFamily().getMaxVersions() == 1) { // upsert if VERSIONS for this CF == 1 size += store.upsert(entry.getValue(), getSmallestReadPoint()); - memstoreCells.addAll(entry.getValue()); } else { // otherwise keep older versions around for (Cell cell : entry.getValue()) { Pair ret = store.add(cell); size += ret.getFirst(); - memstoreCells.add(ret.getSecond()); doRollBackMemstore = true; } } @@ -7044,26 +7096,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi size = this.addAndGetGlobalMemstoreSize(size); flush = isFlushSize(size); } - - // Actually write to WAL now - if (walEdits != null && !walEdits.isEmpty()) { - if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. 
A slave cluster receives the final value (not the delta) - // as a Put. - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce); - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), - walKey, walEdits, getSequenceId(), true, memstoreCells); - } else { - recordMutationWithoutWal(increment.getFamilyCellMap()); - } - } - if(walKey == null){ - // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned - walKey = this.appendEmptyEdit(this.wal, memstoreCells); - } } finally { this.updatesLock.readLock().unlock(); } @@ -7082,7 +7114,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { - rollbackMemstore(memstoreCells); + for(List cells:tempMemstore.values()) { + rollbackMemstore(cells); + } } if (w != null) { mvcc.completeMemstoreInsertWithSeqNum(w, walKey); @@ -7692,14 +7726,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @VisibleForTesting class RowLockContext { private final HashedBytes row; private final CountDownLatch latch = new CountDownLatch(1); + private final boolean readLock; private final Thread thread; private int lockCount = 0; + // defaults readLock to false (fully exclusive) RowLockContext(HashedBytes row) { this.row = row; + this.readLock = false; this.thread = Thread.currentThread(); } + RowLockContext(HashedBytes row, boolean readLock) { + this.row = row; + this.readLock = readLock; + this.thread = Thread.currentThread(); + } + + boolean isReadLock() { + return readLock; + } + boolean ownedByCurrentThread() { return thread == Thread.currentThread(); } @@ -7719,7 +7766,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } void releaseLock() { - if 
(!ownedByCurrentThread()) { + // todo: is this safe, or do we need to keep a set of owning threads on read locks? + if (!ownedByCurrentThread() && !readLock) { throw new IllegalArgumentException("Lock held by thread: " + thread + " cannot be released by different thread: " + Thread.currentThread()); } @@ -7768,14 +7816,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return Return the key used appending with no sync and no append. * @throws IOException */ - private WALKey appendEmptyEdit(final WAL wal, List cells) throws IOException { + private WALKey appendEmptyEdit(final WAL wal) throws IOException { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE); // Call append but with an empty WALEdit. The returned seqeunce id will not be associated // with any edit and we can be sure it went in after all outstanding appends. wal.append(getTableDesc(), getRegionInfo(), key, - WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells); + WALEdit.EMPTY_WALEDIT, this.sequenceId, false); return key; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index 96af2c3..d5f7f1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -65,7 +65,7 @@ public class MultiVersionConsistencyControl { * @return WriteEntry instance. 
*/ WriteEntry beginMemstoreInsert() { - return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER); + return beginMemstoreInsertWithSeqNum(); } /** @@ -96,11 +96,10 @@ public class MultiVersionConsistencyControl { * big number is safe because we only need it to prevent current change being seen and the number * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order * for MVCC to align with flush sequence. - * @param curSeqNum * @return WriteEntry a WriteEntry instance with the passed in curSeqNum */ - public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) { - WriteEntry e = new WriteEntry(curSeqNum); + public WriteEntry beginMemstoreInsertWithSeqNum() { + WriteEntry e = new WriteEntry(0); synchronized (writeQueue) { writeQueue.add(e); return e; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 443134d..5b5dfc2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1171,8 +1171,7 @@ public class FSHLog implements WAL { justification="Will never be null") @Override public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key, - final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, - final List memstoreCells) throws IOException { + final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore) throws IOException { if (this.closed) throw new IOException("Cannot append; log is closed"); // Make a trace scope for the append. It is closed on other side of the ring buffer by the // single consuming thread. Don't have to worry about it. @@ -1188,7 +1187,7 @@ public class FSHLog implements WAL { // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the // edit with its edit/sequence id. 
The below entry.getRegionSequenceId will wait on the // latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather create per append. - entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells); + entry = new FSWALEntry(sequence, key, edits, sequenceId, htd, hri, inMemstore); truck.loadPayload(entry, scope.detach()); } finally { this.disruptor.getRingBuffer().publish(sequence); @@ -1214,9 +1213,9 @@ public class FSHLog implements WAL { private class SyncRunner extends HasThread { private volatile long sequence; private final BlockingQueue syncFutures; - + /** - * UPDATE! + * UPDATE! * @param syncs the batch of calls to sync that arrived as this thread was starting; when done, * we will put the result of the actual hdfs sync call as the result. * @param sequence The sequence number on the ring buffer when this thread was set running. @@ -1262,7 +1261,7 @@ public class FSHLog implements WAL { // This function releases one sync future only. return 1; } - + /** * Release all SyncFutures whose sequence is <= currentSequence. * @param currentSequence @@ -1781,7 +1780,7 @@ public class FSHLog implements WAL { * 'safe point' while the orchestrating thread does some work that requires the first thread * paused: e.g. holding the WAL writer while its WAL is swapped out from under it by another * thread. - * + * *

Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A wait until * Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A. * Thread B then holds at the 'safe point'. Thread A on notification that Thread B is paused, @@ -1789,7 +1788,7 @@ public class FSHLog implements WAL { * it flags B and then Thread A and Thread B continue along on their merry way. Pause and * signalling 'zigzags' between the two participating threads. We use two latches -- one the * inverse of the other -- pausing and signaling when states are achieved. - * + * *

To start up the drama, Thread A creates an instance of this class each time it would do * this zigzag dance and passes it to Thread B (these classes use Latches so it is one shot * only). Thread B notices the new instance (via reading a volatile reference or how ever) and it @@ -1811,7 +1810,7 @@ public class FSHLog implements WAL { * Latch to wait on. Will be released when we can proceed. */ private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1); - + /** * For Thread A to call when it is ready to wait on the 'safe point' to be attained. * Thread A will be held in here until Thread B calls {@link #safePointAttained()} @@ -1820,7 +1819,7 @@ public class FSHLog implements WAL { * @param syncFuture We need this as barometer on outstanding syncs. If it comes home with * an exception, then something is up w/ our syncing. * @return The passed syncFuture - * @throws FailedSyncBeforeLogCloseException + * @throws FailedSyncBeforeLogCloseException */ SyncFuture waitSafePoint(final SyncFuture syncFuture) throws InterruptedException, FailedSyncBeforeLogCloseException { @@ -1832,7 +1831,7 @@ public class FSHLog implements WAL { } return syncFuture; } - + /** * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals * Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} @@ -2035,7 +2034,6 @@ public class FSHLog implements WAL { // here inside this single appending/writing thread. Events are ordered on the ringbuffer // so region sequenceids will also be in order. regionSequenceId = entry.stampRegionSequenceId(); - // Edits are empty, there is nothing to append. Maybe empty when we are looking for a // region sequence id only, a region edit/sequence id that is not associated with an actual // edit. It has to go through all the rigmarole to be sure we have the right ordering. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 1ea9d4f..90f7b8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -55,19 +55,17 @@ class FSWALEntry extends Entry { private final transient boolean inMemstore; private final transient HTableDescriptor htd; private final transient HRegionInfo hri; - private final transient List memstoreCells; private final Set familyNames; FSWALEntry(final long sequence, final WALKey key, final WALEdit edit, - final AtomicLong referenceToRegionSequenceId, final boolean inMemstore, - final HTableDescriptor htd, final HRegionInfo hri, List memstoreCells) { + final AtomicLong referenceToRegionSequenceId, + final HTableDescriptor htd, final HRegionInfo hri, final boolean inMemstore) { super(key, edit); this.regionSequenceIdReference = referenceToRegionSequenceId; this.inMemstore = inMemstore; this.htd = htd; this.hri = hri; this.sequence = sequence; - this.memstoreCells = memstoreCells; if (inMemstore) { // construct familyNames here to reduce the work of log sinker. 
ArrayList cells = this.getEdit().getCells(); @@ -122,8 +120,8 @@ class FSWALEntry extends Entry { */ long stampRegionSequenceId() throws IOException { long regionSequenceId = this.regionSequenceIdReference.incrementAndGet(); - if (!this.getEdit().isReplay() && !CollectionUtils.isEmpty(memstoreCells)) { - for (Cell cell : this.memstoreCells) { + if (!this.getEdit().isReplay() && !CollectionUtils.isEmpty(getEdit().getCells()) && inMemstore) { + for (Cell cell : this.getEdit().getCells()) { CellUtil.setSequenceId(cell, regionSequenceId); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java index 94ef072..6403486 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -60,7 +60,7 @@ public class WALUtil { TableName tn = TableName.valueOf(c.getTableName().toByteArray()); // we use HLogKey here instead of WALKey directly to support legacy coprocessors. WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); - log.append(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null); + log.append(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false); log.sync(); if (LOG.isTraceEnabled()) { LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); @@ -75,8 +75,7 @@ public class WALUtil { TableName tn = TableName.valueOf(f.getTableName().toByteArray()); // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); - long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false, - null); + long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false); if (sync) log.sync(trx); if (LOG.isTraceEnabled()) { LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f)); @@ -93,7 +92,7 @@ public class WALUtil { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); long trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r), - sequenceId, false, null); + sequenceId, false); log.sync(trx); if (LOG.isTraceEnabled()) { LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r)); @@ -126,9 +125,7 @@ public class WALUtil { info, key, WALEdit.createBulkLoadEvent(info, descriptor), - sequenceId, - false, - new ArrayList()); + sequenceId, false); wal.sync(trx); if (LOG.isTraceEnabled()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 5bffea5..28d5ebf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -156,7 +156,7 @@ class DisabledWALProvider implements WALProvider { @Override public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, - AtomicLong sequenceId, boolean inMemstore, List memstoreKVs) { + AtomicLong sequenceId, boolean inMemstore) { if (!this.listeners.isEmpty()) { final long start = System.nanoTime(); long len = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 5a2b08d..f575c49 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -118,12 +118,11 @@ public interface WAL { * @param inMemstore Always true except for case where we are writing a compaction completion * record into the WAL; in this case the entry is just so we can finish an unfinished compaction * -- it is not an edit for memstore. - * @param memstoreKVs list of KVs added into memstore * @return Returns a 'transaction id' and key will have the region edit/sequence id * in it. */ long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, - AtomicLong sequenceId, boolean inMemstore, List memstoreKVs) + AtomicLong sequenceId, boolean inMemstore) throws IOException; /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index cdcdeed..1b7adec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -236,7 +236,7 @@ public class TestWALObserver { long now = EnvironmentEdgeManager.currentTime(); // we use HLogKey here instead of WALKey directly to support legacy coprocessors. long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now), - edit, sequenceId, true, null); + edit, sequenceId, true); log.sync(txid); // the edit shall have been change now by the coprocessor. 
@@ -323,7 +323,7 @@ public class TestWALObserver { final WALEdit edit = new WALEdit(); final byte[] nonce = Bytes.toBytes("1772"); edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce)); - final long txid = wal.append(htd, hri, legacyKey, edit, sequenceId, true, null); + final long txid = wal.append(htd, hri, legacyKey, edit, sequenceId, true); wal.sync(txid); LOG.debug("Make sure legacy cps can see supported edits after having been skipped."); @@ -361,7 +361,7 @@ public class TestWALObserver { final long now = EnvironmentEdgeManager.currentTime(); long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now), - new WALEdit(), sequenceId, true, null); + new WALEdit(), sequenceId, true); log.sync(txid); assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled()); @@ -407,7 +407,7 @@ public class TestWALObserver { EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId); } wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId, - true, null); + true); // sync to fs. wal.sync(); @@ -538,7 +538,7 @@ public class TestWALObserver { // uses WALKey instead of HLogKey on purpose. 
will only work for tests where we don't care // about legacy coprocessors txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, - ee.currentTime()), edit, sequenceId, true, null); + ee.currentTime()), edit, sequenceId, true); } if (-1 != txid) { wal.sync(txid); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java index d9fe0d0..4e19bd5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -126,11 +126,10 @@ public class TestWALRecordReader { WALEdit edit = new WALEdit(); final AtomicLong sequenceId = new AtomicLong(0); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value)); - log.append(htd, info, getWalKey(ts), edit, sequenceId, true, null); + log.append(htd, info, getWalKey(ts), edit, sequenceId, true); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value)); - log.append(htd, info, getWalKey(ts+1), edit, sequenceId, - true, null); + log.append(htd, info, getWalKey(ts+1), edit, sequenceId, true); log.sync(); LOG.info("Before 1st WAL roll " + log.toString()); log.rollWriter(); @@ -141,12 +140,10 @@ public class TestWALRecordReader { edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value)); - log.append(htd, info, getWalKey(ts1+1), edit, sequenceId, - true, null); + log.append(htd, info, getWalKey(ts1+1), edit, sequenceId, true); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value)); - log.append(htd, info, getWalKey(ts1+2), edit, sequenceId, - true, null); + log.append(htd, info, getWalKey(ts1+2), edit, sequenceId, true); log.sync(); log.shutdown(); walfactory.shutdown(); @@ -188,8 +185,7 @@ public class TestWALRecordReader { WALEdit 
edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value)); - long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true, - null); + long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true); log.sync(txid); Thread.sleep(1); // make sure 2nd log gets a later timestamp @@ -199,8 +195,7 @@ public class TestWALRecordReader { edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value)); - txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true, - null); + txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true); log.sync(txid); log.shutdown(); walfactory.shutdown(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 8a16c0d..72358fc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -1267,7 +1267,7 @@ public class TestDistributedLogSplitting { e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value))); wal.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis()), - e, sequenceId, true, null); + e, sequenceId, true); } wal.sync(); wal.shutdown(); @@ -1359,7 +1359,7 @@ public class TestDistributedLogSplitting { value++; e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value))); wal.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), - tableName, System.currentTimeMillis()), e, sequenceId, true, null); + tableName, System.currentTimeMillis()), e, sequenceId, true); } wal.sync(); wal.shutdown(); @@ 
-1564,7 +1564,7 @@ public class TestDistributedLogSplitting { byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value)); log.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), fullTName, - System.currentTimeMillis()), e, sequenceId, true, null); + System.currentTimeMillis()), e, sequenceId, true); if (0 == i % syncEvery) { log.sync(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index 478e239..8916fad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -615,11 +615,12 @@ public class TestAtomicOperation { } @Override - public RowLock getRowLockInternal(final byte[] row, boolean waitForLock) throws IOException { + public RowLock getRowLockInternal(final byte[] row, boolean waitForLock, boolean readLock) + throws IOException { if (testStep == TestStep.CHECKANDPUT_STARTED) { latch.countDown(); } - return new WrappedRowLock(super.getRowLockInternal(row, waitForLock)); + return new WrappedRowLock(super.getRowLockInternal(row, waitForLock, readLock)); } public class WrappedRowLock extends RowLockImpl { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java index aa57e22..e170447 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java @@ -96,7 +96,7 @@ public class TestBulkLoad { { oneOf(log).append(with(any(HTableDescriptor.class)), with(any(HRegionInfo.class)), with(any(WALKey.class)), 
with(bulkLogWalEditType(WALEdit.BULK_LOAD)), - with(any(AtomicLong.class)), with(any(boolean.class)), with(any(List.class))); + with(any(AtomicLong.class)), with(any(boolean.class))); will(returnValue(0l)); oneOf(log).sync(with(any(long.class))); } @@ -123,7 +123,7 @@ public class TestBulkLoad { { oneOf(log).append(with(any(HTableDescriptor.class)), with(any(HRegionInfo.class)), with(any(WALKey.class)), with(bulkEventMatcher), - with(any(AtomicLong.class)), with(any(boolean.class)), with(any(List.class))); + with(any(AtomicLong.class)), with(any(boolean.class))); will(returnValue(0l)); oneOf(log).sync(with(any(long.class))); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index e1e5b89..2cc4e28 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -247,7 +247,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] v = Bytes.toBytes("value"); MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); + mvcc.beginMemstoreInsertWithSeqNum(); KeyValue kv1 = new KeyValue(row, f, q1, v); kv1.setSequenceId(w.getWriteNumber()); @@ -261,7 +261,7 @@ public class TestDefaultMemStore extends TestCase { s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv1}); - w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); + w = mvcc.beginMemstoreInsertWithSeqNum(); KeyValue kv2 = new KeyValue(row, f, q2, v); kv2.setSequenceId(w.getWriteNumber()); memstore.add(kv2); @@ -291,7 +291,7 @@ public class TestDefaultMemStore extends TestCase { // INSERT 1: Write both columns val1 MultiVersionConsistencyControl.WriteEntry w = - 
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); + mvcc.beginMemstoreInsertWithSeqNum(); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setSequenceId(w.getWriteNumber()); @@ -307,7 +307,7 @@ public class TestDefaultMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START INSERT 2: Write both columns val2 - w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); + w = mvcc.beginMemstoreInsertWithSeqNum(); KeyValue kv21 = new KeyValue(row, f, q1, v2); kv21.setSequenceId(w.getWriteNumber()); memstore.add(kv21); @@ -343,7 +343,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] v1 = Bytes.toBytes("value1"); // INSERT 1: Write both columns val1 MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); + mvcc.beginMemstoreInsertWithSeqNum(); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setSequenceId(w.getWriteNumber()); @@ -359,7 +359,7 @@ public class TestDefaultMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START DELETE: Insert delete for one of the columns - w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); + w = mvcc.beginMemstoreInsertWithSeqNum(); KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(), KeyValue.Type.DeleteColumn); kvDel.setSequenceId(w.getWriteNumber()); @@ -417,7 +417,7 @@ public class TestDefaultMemStore extends TestCase { private void internalRun() throws IOException { for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); + mvcc.beginMemstoreInsertWithSeqNum(); // Insert the sequence value (i) byte[] v = Bytes.toBytes(i); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 6a5e844..ef51dff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1031,9 +1031,8 @@ public class TestHRegion { IsFlushWALMarker isFlushWALMarker = new IsFlushWALMarker(FlushAction.START_FLUSH); // throw exceptions if the WalEdit is a start flush action - when(wal.append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(), - (WALEdit)argThat(isFlushWALMarker), (AtomicLong)any(), Mockito.anyBoolean(), - (List)any())) + when(wal.append((HTableDescriptor) any(), (HRegionInfo) any(), (WALKey) any(), + (WALEdit) argThat(isFlushWALMarker), (AtomicLong) any(), Mockito.anyBoolean())) .thenThrow(new IOException("Fail to append flush marker")); // start cache flush will throw exception @@ -4629,7 +4628,7 @@ public class TestHRegion { //verify append called or not verify(wal, expectAppend ? 
times(1) : never()) .append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(), - (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean(), (List)any()); + (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean()); // verify sync called or not if (expectSync || expectSyncFromLogSyncer) { @@ -5773,7 +5772,7 @@ public class TestHRegion { TEST_UTIL.getConfiguration(), rss, null); verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any() - , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List)any()); + , editCaptor.capture(), (AtomicLong)any(), anyBoolean()); WALEdit edit = editCaptor.getValue(); assertNotNull(edit); @@ -5868,7 +5867,7 @@ public class TestHRegion { // verify that we have not appended region open event to WAL because this region is still // recovering verify(wal, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any() - , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List)any()); + , editCaptor.capture(), (AtomicLong)any(), anyBoolean()); // not put the region out of recovering state new FinishRegionRecoveringHandler(rss, region.getRegionInfo().getEncodedName(), "/foo") @@ -5876,7 +5875,7 @@ public class TestHRegion { // now we should have put the entry verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any() - , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List)any()); + , editCaptor.capture(), (AtomicLong)any(), anyBoolean()); WALEdit edit = editCaptor.getValue(); assertNotNull(edit); @@ -5940,7 +5939,7 @@ public class TestHRegion { // 2 times, one for region open, the other close region verify(wal, times(2)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(), - editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List)any()); + editCaptor.capture(), (AtomicLong)any(), anyBoolean()); WALEdit edit = editCaptor.getAllValues().get(1); assertNotNull(edit); diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 1ced627..f41f49a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -1115,7 +1115,7 @@ public class TestHRegionReplayEvents { // test for region open and close secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null); verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), - (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List) any()); + (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean()); // test for replay prepare flush putDataByReplay(secondaryRegion, 0, 10, cq, families); @@ -1129,11 +1129,11 @@ public class TestHRegionReplayEvents { .build()); verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), - (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List) any()); + (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean()); secondaryRegion.close(); verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), - (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List) any()); + (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean()); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index 2965071..a0830d4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -311,11 +311,11 @@ public class TestHeapMemoryManager { } private void 
assertHeapSpaceDelta(float expectedDeltaPercent, long oldHeapSpace, long newHeapSpace) { - long expctedMinDelta = (long) (this.maxHeapSize * expectedDeltaPercent); + long expectedMinDelta = (long) (this.maxHeapSize * expectedDeltaPercent); if (expectedDeltaPercent > 0) { - assertTrue(expctedMinDelta <= (newHeapSpace - oldHeapSpace)); + assertTrue(expectedMinDelta <= (newHeapSpace - oldHeapSpace)); } else { - assertTrue(expctedMinDelta <= (oldHeapSpace - newHeapSpace)); + assertTrue(expectedMinDelta <= (oldHeapSpace - newHeapSpace)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java index 09b2226..e6a77f6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java @@ -50,7 +50,7 @@ public class TestMultiVersionConsistencyControl extends TestCase { AtomicLong startPoint = new AtomicLong(); while (!finished.get()) { MultiVersionConsistencyControl.WriteEntry e = - mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet()); + mvcc.beginMemstoreInsertWithSeqNum(); // System.out.println("Begin write: " + e.getWriteNumber()); // 10 usec - 500usec (including 0) int sleepTime = rnd.nextInt(500); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index 77071ce..b492cdd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -163,7 +163,7 @@ public class TestFSHLog { WALEdit cols = new WALEdit(); cols.add(new KeyValue(row, row, row, timestamp, row)); log.append(htd, hri, 
new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp), - cols, sequenceId, true, null); + cols, sequenceId, true); } log.sync(); } @@ -450,7 +450,7 @@ public class TestFSHLog { final HRegionInfo info = region.getRegionInfo(); final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), clusterIds, -1, -1); - wal.append(htd, info, logkey, edits, region.getSequenceId(), true, null); + wal.append(htd, info, logkey, edits, region.getSequenceId(), true); } region.flush(true); // FlushResult.flushSequenceId is not visible here so go get the current sequence id. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index 4e07040..c0c3b30 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -201,7 +201,7 @@ public class TestLogRollAbort { HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor("column")); log.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), kvs, sequenceId, true, null); + System.currentTimeMillis()), kvs, sequenceId, true); } // Send the data to HDFS datanodes and close the HDFS writer log.sync(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java index 41e05ae..6a7be45 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -137,7 +137,7 @@ public class TestLogRollingNoCluster { final HRegionInfo hri = 
HRegionInfo.FIRST_META_REGIONINFO; final HTableDescriptor htd = TEST_UTIL.getMetaTableDescriptor(); final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), - TableName.META_TABLE_NAME, now), edit, sequenceId, true, null); + TableName.META_TABLE_NAME, now), edit, sequenceId, true); wal.sync(txid); } String msg = getName() + " finished"; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index 25c83a8..db869cc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -102,7 +102,7 @@ public class TestWALActionsListener { htd.addFamily(new HColumnDescriptor(b)); final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), - TableName.valueOf(b), 0), edit, sequenceId, true, null); + TableName.valueOf(b), 0), edit, sequenceId, true); wal.sync(txid); if (i == 10) { wal.registerWALActionsListener(laterobserver); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index afdcdc7..ea1c138 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -801,7 +801,7 @@ public class TestWALReplay { edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName)); wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId, - true, null); + true); // Delete the c family to verify deletes make it over. 
edit = new WALEdit(); @@ -809,7 +809,7 @@ public class TestWALReplay { edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId, - true, null); + true); // Sync. wal.sync(); @@ -1001,7 +1001,7 @@ public class TestWALReplay { edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, ee.currentTime()), - edit, sequenceId, true, null); + edit, sequenceId, true); } wal.sync(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 854d4c0..d8b8a9e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -211,7 +211,7 @@ public class TestReplicationSourceManager { } LOG.info(i); final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test, - System.currentTimeMillis()), edit, sequenceId, true ,null); + System.currentTimeMillis()), edit, sequenceId, true); wal.sync(txid); } @@ -224,7 +224,7 @@ public class TestReplicationSourceManager { for (int i = 0; i < 3; i++) { wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test, - System.currentTimeMillis()), edit, sequenceId, true, null); + System.currentTimeMillis()), edit, sequenceId, true); } wal.sync(); @@ -236,7 +236,7 @@ public class TestReplicationSourceManager { "1", 0, false, false); wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test, - System.currentTimeMillis()), edit, sequenceId, true, null); + System.currentTimeMillis()), edit, sequenceId, true); wal.sync(); assertEquals(1, 
manager.getWALs().size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java index 577f0ba..51c71fb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java @@ -196,7 +196,7 @@ public class TestReplicationWALReaderManager { private void appendToLogPlus(int count) throws IOException { final long txid = log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), getWALEdits(count), sequenceId, true, null); + System.currentTimeMillis()), getWALEdits(count), sequenceId, true); log.sync(txid); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java index 3212822..9b63d49 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java @@ -66,11 +66,11 @@ public class FaultyFSLog extends FSHLog { @Override public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, - AtomicLong sequenceId, boolean isInMemstore, List cells) throws IOException { + AtomicLong sequenceId, boolean inMemstore) throws IOException { if (this.ft == FailureType.APPEND) { throw new IOException("append"); } - return super.append(htd, info, key, edits, sequenceId, isInMemstore, cells); + return super.append(htd, info, key, edits, sequenceId, inMemstore); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java index df8ceaf..7060b1a 
100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java @@ -155,7 +155,7 @@ public class TestDefaultWALProvider { WALEdit cols = new WALEdit(); cols.add(new KeyValue(row, row, row, timestamp, row)); log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp), - cols, sequenceId, true, null); + cols, sequenceId, true); } log.sync(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java index 6f05839..0192ab1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java @@ -100,7 +100,7 @@ public class TestSecureWAL { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value)); wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), kvs, sequenceId, true, null); + System.currentTimeMillis()), kvs, sequenceId, true); } wal.sync(); final Path walPath = DefaultWALProvider.getCurrentFileName(wal); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index bbe4018..1ae0932 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -196,7 +196,7 @@ public class TestWALFactory { System.currentTimeMillis(), column)); LOG.info("Region " + i + ": " + edit); log.append(htd, infos[i], new WALKey(infos[i].getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), edit, sequenceId, true, null); + System.currentTimeMillis()), edit, sequenceId, true); } log.sync(); log.rollWriter(); @@ -253,7 
+253,7 @@ public class TestWALFactory { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), kvs, sequenceId, true, null); + System.currentTimeMillis()), kvs, sequenceId, true); } // Now call sync and try reading. Opening a Reader before you sync just // gives you EOFE. @@ -272,7 +272,7 @@ public class TestWALFactory { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), kvs, sequenceId, true, null); + System.currentTimeMillis()), kvs, sequenceId, true); } wal.sync(); reader = wals.createReader(fs, walPath); @@ -294,7 +294,7 @@ public class TestWALFactory { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value)); wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), kvs, sequenceId, true, null); + System.currentTimeMillis()), kvs, sequenceId, true); } // Now I should have written out lots of blocks. Sync then read. 
wal.sync(); @@ -373,7 +373,7 @@ public class TestWALFactory { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), kvs, sequenceId, true, null); + System.currentTimeMillis()), kvs, sequenceId, true); } // Now call sync to send the data to HDFS datanodes wal.sync(); @@ -503,7 +503,7 @@ public class TestWALFactory { final long txid = log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()), - cols, sequenceId, true, null); + cols, sequenceId, true); log.sync(txid); log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys()); log.completeCacheFlush(info.getEncodedNameAsBytes()); @@ -559,7 +559,7 @@ public class TestWALFactory { final WAL log = wals.getWAL(hri.getEncodedNameAsBytes()); final long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()), - cols, sequenceId, true, null); + cols, sequenceId, true); log.sync(txid); log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys()); log.completeCacheFlush(hri.getEncodedNameAsBytes()); @@ -612,7 +612,7 @@ public class TestWALFactory { Bytes.toBytes(Integer.toString(i)), timestamp, new byte[]{(byte) (i + '0')})); log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), cols, sequenceId, true, null); + System.currentTimeMillis()), cols, sequenceId, true); } log.sync(); assertEquals(COL_COUNT, visitor.increments); @@ -622,7 +622,7 @@ public class TestWALFactory { Bytes.toBytes(Integer.toString(11)), timestamp, new byte[]{(byte) (11 + '0')})); log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), cols, sequenceId, true, null); + System.currentTimeMillis()), cols, sequenceId, true); log.sync(); assertEquals(COL_COUNT, 
visitor.increments); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java index deaef50..e26fab7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java @@ -114,7 +114,7 @@ public class TestWALReaderOnSecureWAL { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value)); wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis()), kvs, sequenceId, true, null); + System.currentTimeMillis()), kvs, sequenceId, true); } wal.sync(); final Path walPath = DefaultWALProvider.getCurrentFileName(wal); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java index 05d5e51..f72f9fa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java @@ -180,7 +180,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit); HRegionInfo hri = region.getRegionInfo(); final WALKey logkey = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now); - wal.append(htd, hri, logkey, walEdit, region.getSequenceId(), true, null); + wal.append(htd, hri, logkey, walEdit, region.getSequenceId(), true); if (!this.noSync) { if (++lastSync >= this.syncInterval) { wal.sync(); -- 2.3.0