diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java index 9d05cbb..2fad7c1 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java @@ -138,7 +138,7 @@ public class TestPayloadCarryingRpcController { } @Override - public long getMvccVersion() { + public long getSeqNum() { // TODO Auto-generated method stub return 0; } diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java index 662eef3..e3dd76b 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java @@ -137,16 +137,15 @@ public interface Cell { byte getTypeByte(); - //6) MvccVersion + //6) Sequence Number /** - * Internal use only. A region-specific sequence ID given to each operation. It always exists for - * cells in the memstore but is not retained forever. It may survive several flushes, but - * generally becomes irrelevant after the cell's row is no longer involved in any operations that - * require strict consistency. - * @return mvccVersion (always >= 0 if exists), or 0 if it no longer exists + * Internal use only. A region-specific sequence number given to each operation. This sequence number + * is guaranteed to be always increasing for a given row. There is no ordering guarantee between + * sequence numbers for different rows. + * @return seqNum (always >= 0 if exists), or 0 if it no longer exists */ - long getMvccVersion(); + long getSeqNum(); //7) Value @@ -167,7 +166,7 @@ public interface Cell { * @return Number of value bytes. Must be < valueArray.length - offset. */ int getValueLength(); - + /** * @return the tags byte array */ @@ -177,7 +176,7 @@ public interface Cell { * @return the first offset where the tags start in the Cell */ int getTagsOffset(); - + /** * @return the total length of the tags in the Cell. 
*/ diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java index 0c7adb1..3742dbe 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java @@ -90,7 +90,7 @@ public class CellComparator implements Comparator, Serializable{ if (c != 0) return c; //mvccVersion: later sorts first - return -Longs.compare(a.getMvccVersion(), b.getMvccVersion()); + return -Longs.compare(a.getSeqNum(), b.getSeqNum()); } @@ -154,7 +154,7 @@ public class CellComparator implements Comparator, Serializable{ hash = 31 * hash + qualifierHash; hash = 31 * hash + (int)cell.getTimestamp(); hash = 31 * hash + cell.getTypeByte(); - hash = 31 * hash + (int)cell.getMvccVersion(); + hash = 31 * hash + (int)cell.getSeqNum(); return hash; } diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index a589cce..b28c939 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -37,7 +35,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -261,16 +258,29 @@ public class KeyValue implements Cell, HeapSize, Cloneable { return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode(); } - /** Here be dragons **/ - - // used to achieve atomic operations in the memstore. 
- @Override + /** + * Use {@link #getSeqNum()} + */ + @Deprecated public long getMvccVersion() { - return memstoreTS; + return seqNum; } + /** + * Use {@link #setSeqNum(long)} + */ + @Deprecated public void setMvccVersion(long mvccVersion){ - this.memstoreTS = mvccVersion; + this.seqNum = mvccVersion; + } + + @Override + public long getSeqNum() { + return seqNum; + } + + public void setSeqNum(long seqNum) { + this.seqNum = seqNum; } @Deprecated @@ -284,7 +294,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { } // default value is 0, aka DNC - private long memstoreTS = 0; + private long seqNum = 0; /** Dragon time over, return to normal business */ @@ -887,7 +897,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { // Important to clone the memstoreTS as well - otherwise memstore's // update-in-place methods (eg increment) will end up creating // new entries - ret.setMemstoreTS(memstoreTS); + ret.setMemstoreTS(seqNum); return ret; } @@ -898,7 +908,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { */ public KeyValue shallowCopy() { KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length); - shallowCopy.setMemstoreTS(this.memstoreTS); + shallowCopy.setMemstoreTS(this.seqNum); return shallowCopy; } @@ -908,12 +918,13 @@ public class KeyValue implements Cell, HeapSize, Cloneable { // //--------------------------------------------------------------------------- + @Override public String toString() { if (this.bytes == null || this.bytes.length == 0) { return "empty"; } return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) + - "/vlen=" + getValueLength() + "/mvcc=" + memstoreTS; + "/vlen=" + getValueLength() + "/mvcc=" + seqNum; } /** @@ -1780,6 +1791,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { public static class RootComparator extends MetaComparator { private final KeyComparator rawcomparator = new RootKeyComparator(); + @Override public KeyComparator getRawComparator() { return this.rawcomparator; } @@ -1797,6 +1809,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { public static class MetaComparator extends KVComparator { private final KeyComparator rawcomparator = new MetaKeyComparator(); + @Override public KeyComparator getRawComparator() { return this.rawcomparator; } @@ -1824,6 +1837,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { return this.rawcomparator; } + @Override public int compare(final KeyValue left, final KeyValue right) { int ret = getRawComparator().compare(left.getBuffer(), left.getOffset() + ROW_OFFSET, left.getKeyLength(), @@ -2389,6 +2403,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { * table. */ public static class RootKeyComparator extends MetaKeyComparator { + @Override public int compareRows(byte [] left, int loffset, int llength, byte [] right, int roffset, int rlength) { // Rows look like this: .META.,ROW_FROM_META,RID @@ -2438,6 +2453,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { this.comparator = c; } + @Override public int compare(KeyValue left, KeyValue right) { return comparator.compareRows(left, right); } @@ -2448,6 +2464,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { * table. 
*/ public static class MetaKeyComparator extends KeyComparator { + @Override public int compareRows(byte [] left, int loffset, int llength, byte [] right, int roffset, int rlength) { // LOG.info("META " + Bytes.toString(left, loffset, llength) + @@ -2527,6 +2544,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { volatile boolean ignoreTimestamp = false; volatile boolean ignoreType = false; + @Override public int compare(byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength) { // Compare row @@ -2686,6 +2704,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { return 0; } + @Override public int compare(byte[] left, byte[] right) { return compare(left, 0, left.length, right, 0, right.length); } diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java index 10db743..90906d4 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java @@ -67,7 +67,7 @@ public class KeyValueUtil { public static KeyValue copyToNewKeyValue(final Cell cell) { KeyValue kvCell = new KeyValue(copyToNewByteArray(cell)); - kvCell.setMvccVersion(cell.getMvccVersion()); + kvCell.setSeqNum(cell.getSeqNum()); return kvCell; } diff --git hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeArrayScanner.java hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeArrayScanner.java index 6cb670f..2c66c0f 100644 --- hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeArrayScanner.java +++ hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeArrayScanner.java @@ -108,7 +108,7 @@ public class PrefixTreeArrayScanner extends PrefixTreeCell implements CellScanne this.qualifierReader.initOnBlock(blockMeta, block); this.timestampDecoder.initOnBlock(blockMeta, block); this.mvccVersionDecoder.initOnBlock(blockMeta, block); - this.includeMvccVersion = includeMvccVersion; + this.includeSeqNum = includeMvccVersion; resetToBeforeFirstEntry(); } @@ -456,11 +456,11 @@ public class PrefixTreeArrayScanner extends PrefixTreeCell implements CellScanne protected void populateMvccVersion() { if (blockMeta.isAllSameMvccVersion()) { - mvccVersion = blockMeta.getMinMvccVersion(); + seqNum = blockMeta.getMinMvccVersion(); } else { int mvccVersionIndex = currentRowNode.getMvccVersionIndex(currentCellIndex, blockMeta); - mvccVersion = mvccVersionDecoder.getMvccVersion(mvccVersionIndex); + seqNum = mvccVersionDecoder.getMvccVersion(mvccVersionIndex); } } diff --git hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java index 2ac472c..f95a966 100644 --- hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java +++ hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java @@ -47,8 +47,8 @@ public class PrefixTreeCell implements Cell, Comparable { /******************** fields ************************/ protected byte[] block; - //we could also avoid setting the mvccVersion in the scanner/searcher, but this is simpler - protected boolean includeMvccVersion; + //we could also avoid setting the seqNum in the scanner/searcher, but this is simpler + protected 
boolean includeSeqNum; protected byte[] rowBuffer; protected int rowLength; @@ -62,7 +62,7 @@ public class PrefixTreeCell implements Cell, Comparable { protected int qualifierLength; protected Long timestamp; - protected Long mvccVersion; + protected Long seqNum; protected KeyValue.Type type; @@ -111,11 +111,11 @@ public class PrefixTreeCell implements Cell, Comparable { } @Override - public long getMvccVersion() { - if (!includeMvccVersion) { + public long getSeqNum() { + if (!includeSeqNum) { return 0L; } - return mvccVersion; + return seqNum; } @Override diff --git hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/PrefixTreeEncoder.java hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/PrefixTreeEncoder.java index 7817c38..24bf060 100644 --- hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/PrefixTreeEncoder.java +++ hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/PrefixTreeEncoder.java @@ -237,7 +237,7 @@ public class PrefixTreeEncoder implements CellOutputStream { */ /** - * Add a Cell to the output stream but repeat the previous row. + * Add a Cell to the output stream but repeat the previous row. */ //@Override public void writeWithRepeatRow(Cell cell) { @@ -269,11 +269,11 @@ public class PrefixTreeEncoder implements CellOutputStream { timestamps[totalCells] = cell.getTimestamp(); timestampEncoder.add(cell.getTimestamp()); - // memstore timestamps + // sequence numbers if (includeMvccVersion) { - mvccVersions[totalCells] = cell.getMvccVersion(); - mvccVersionEncoder.add(cell.getMvccVersion()); - totalUnencodedBytes += WritableUtils.getVIntSize(cell.getMvccVersion()); + mvccVersions[totalCells] = cell.getSeqNum(); + mvccVersionEncoder.add(cell.getSeqNum()); + totalUnencodedBytes += WritableUtils.getVIntSize(cell.getSeqNum()); }else{ //must overwrite in case there was a previous version in this array slot mvccVersions[totalCells] = 0L; diff --git hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/TestRowEncoder.java hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/TestRowEncoder.java index 2bbba8b..46f3572 100644 --- hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/TestRowEncoder.java +++ hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/TestRowEncoder.java @@ -175,7 +175,7 @@ public class TestRowEncoder { // assert keys are equal (doesn't compare values) Assert.assertEquals(expected, actual); if (includeMemstoreTS) { - Assert.assertEquals(expected.getMvccVersion(), actual.getMvccVersion()); + Assert.assertEquals(expected.getSeqNum(), actual.getSeqNum()); } // assert values equal Assert.assertTrue(Bytes.equals(expected.getValueArray(), expected.getValueOffset(), diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 7f21e3a..ed0fde7 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -109,15 +109,15 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcCallContext; +import 
org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; -import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; +import org.apache.hadoop.hbase.regionserver.TransactionManager.TrxStatus; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; @@ -651,6 +651,7 @@ public class HRegion implements HeapSize { // , Writable{ for (final HColumnDescriptor family : htableDescriptor.getFamilies()) { status.setStatus("Instantiating store for column family " + family); completionService.submit(new Callable() { + @Override public HStore call() throws IOException { return instantiateHStore(family); } @@ -684,10 +685,12 @@ public class HRegion implements HeapSize { // , Writable{ storeOpenerThreadPool.shutdownNow(); } } - mvcc.initialize(maxMemstoreTS + 1); // Recover any edits if available. maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny( this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); + mvcc.initialize(maxSeqId); + // TODO: maxMemstoreTS is not needed anymore, remove + // TODO: old maxMemstoreTS's might be mixed with new ones. We should migrate away. return maxSeqId; } @@ -810,7 +813,7 @@ public class HRegion implements HeapSize { // , Writable{ public void setRecovering(boolean newState) { this.getRegionInfo().setRecovering(newState); } - + /** * @return True if current region is in recovering */ @@ -880,7 +883,7 @@ public class HRegion implements HeapSize { // , Writable{ private final Object closeLock = new Object(); /** Conf key for the periodic flush interval */ - public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = + public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = "hbase.regionserver.optionalcacheflushinterval"; /** Default interval for the memstore flush */ public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000; @@ -976,6 +979,7 @@ public class HRegion implements HeapSize { // , Writable{ for (final Store store : stores.values()) { completionService .submit(new Callable>>() { + @Override public Pair> call() throws IOException { return new Pair>( store.getFamily().getName(), store.close()); @@ -1013,6 +1017,7 @@ public class HRegion implements HeapSize { // , Writable{ if ( this.metricsRegionWrapper != null) { Closeables.closeQuietly(this.metricsRegionWrapper); } + this.mvcc.reset(); status.markComplete("Closed"); LOG.info("Closed " + this); return result; @@ -1066,6 +1071,7 @@ public class HRegion implements HeapSize { // , Writable{ new ThreadFactory() { private int count = 1; + @Override public Thread newThread(Runnable r) { return new Thread(r, threadNamePrefix + "-" + count++); } @@ -1473,7 +1479,7 @@ public class HRegion implements HeapSize { // , Writable{ // We also set the memstore size to zero here before we allow updates // again so its value will represent the size of the updates received // during the flush - MultiVersionConsistencyControl.WriteEntry w = null; + MultiVersionConsistencyControl.TrxStatus w = null; // We have to 
take a write lock during snapshot, or else a write could // end up in both snapshot and memstore (makes it difficult to do atomic @@ -1486,10 +1492,6 @@ public class HRegion implements HeapSize { // , Writable{ List storeFlushCtxs = new ArrayList(stores.size()); long flushSeqId = -1L; try { - // Record the mvcc for all transactions in progress. - w = mvcc.beginMemstoreInsert(); - mvcc.advanceMemstore(w); - if (wal != null) { Long startSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); if (startSeqId == null) { @@ -1529,7 +1531,7 @@ public class HRegion implements HeapSize { // , Writable{ // uncommitted transactions from being written into HFiles. // We have to block before we start the flush, otherwise keys that // were removed via a rollbackMemstore could be written to Hfiles. - mvcc.waitForRead(w); + mvcc.waitUntilComplete(); s = "Flushing stores of " + this; status.setStatus(s); @@ -1905,7 +1907,7 @@ public class HRegion implements HeapSize { // , Writable{ Pair[] mutationsAndLocks) throws IOException { return batchMutate(mutationsAndLocks, false); } - + /** * Perform a batch of mutations. * It supports only Put and Delete mutations and will ignore other types passed. @@ -1954,7 +1956,7 @@ public class HRegion implements HeapSize { // , Writable{ } return batchOp.retCodeDetails; } - + private void doPreMutationHook(BatchOperationInProgress> batchOp) throws IOException { @@ -2005,8 +2007,7 @@ public class HRegion implements HeapSize { // , Writable{ Set deletesCfSet = null; WALEdit walEdit = new WALEdit(isInReplay); - MultiVersionConsistencyControl.WriteEntry w = null; - long txid = 0; + MultiVersionConsistencyControl.TrxStatus w = null; boolean walSyncSuccessful = false; boolean locked = false; @@ -2109,7 +2110,7 @@ public class HRegion implements HeapSize { // , Writable{ } } } - + // we should record the timestamp only after we have acquired the rowLock, // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp now = EnvironmentEdgeManager.currentTimeMillis(); @@ -2141,40 +2142,17 @@ public class HRegion implements HeapSize { // , Writable{ lock(this.updatesLock.readLock(), numReadyToWrite); locked = true; - // - // ------------------------------------ - // Acquire the latest mvcc number - // ---------------------------------- - w = mvcc.beginMemstoreInsert(); // calling the pre CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { - MiniBatchOperationInProgress> miniBatchOp = - new MiniBatchOperationInProgress>(batchOp.operations, + MiniBatchOperationInProgress> miniBatchOp = + new MiniBatchOperationInProgress>(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; } // ------------------------------------ - // STEP 3. Write back to memstore - // Write to memstore. It is ok to write to memstore - // first without updating the HLog because we do not roll - // forward the memstore MVCC. The MVCC will be moved up when - // the complete operation is done. These changes are not yet - // visible to scanners till we update the MVCC. The MVCC is - // moved only when the sync is complete. 
- // ---------------------------------- - long addedSize = 0; - for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { - continue; - } - addedSize += applyFamilyMapToMemstore(familyMaps[i], w); - } - - // ------------------------------------ - // STEP 4. Build WAL edit + // STEP 3. Build WAL edit // ---------------------------------- Durability durability = Durability.USE_DEFAULT; for (int i = firstIndex; i < lastIndexExclusive; i++) { @@ -2183,7 +2161,6 @@ public class HRegion implements HeapSize { // , Writable{ != OperationStatusCode.NOT_RUN) { continue; } - batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; Mutation m = batchOp.operations[i].getFirst(); Durability tmpDur = m.getDurability(); @@ -2206,11 +2183,32 @@ public class HRegion implements HeapSize { // , Writable{ } // ------------------------- - // STEP 5. Append the edit to WAL. Do not sync wal. + // STEP 4. Append the edit to WAL. Do not sync wal. // ------------------------- Mutation first = batchOp.operations[firstIndex].getFirst(); - txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), - walEdit, first.getClusterId(), now, this.htableDescriptor); + w = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), + walEdit, first.getClusterId(), now, this.htableDescriptor, mvcc); + + // TODO: after this, somebody else might already sync the wal. So make sure, if exception + // happens in memstore insert, we abort the region server + + // ------------------------------------ + // STEP 5. Write back to memstore + // Write to memstore. The MVCC will be moved up when + // the complete operation is done. These changes are not yet + // visible to scanners till we update the MVCC. The MVCC is + // moved only when the sync is complete. + // ---------------------------------- + + long addedSize = 0; + for (int i = firstIndex; i < lastIndexExclusive; i++) { + if (batchOp.retCodeDetails[i].getOperationStatusCode() + != OperationStatusCode.NOT_RUN) { + continue; + } + batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; + addedSize += applyFamilyMapToMemstore(familyMaps[i], w); + } // ------------------------------- // STEP 6. Release row locks, etc. @@ -2229,13 +2227,13 @@ public class HRegion implements HeapSize { // , Writable{ // STEP 7. Sync wal. // ------------------------- if (walEdit.size() > 0) { - syncOrDefer(txid, durability); + syncOrDefer(w.getSeqNum(), durability); } walSyncSuccessful = true; // calling the post CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { - MiniBatchOperationInProgress> miniBatchOp = - new MiniBatchOperationInProgress>(batchOp.operations, + MiniBatchOperationInProgress> miniBatchOp = + new MiniBatchOperationInProgress>(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutate(miniBatchOp); } @@ -2244,7 +2242,7 @@ public class HRegion implements HeapSize { // , Writable{ // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. 
// ------------------------------------------------------------------ if (w != null) { - mvcc.completeMemstoreInsert(w); + mvcc.completeTransaction(w); w = null; } @@ -2276,7 +2274,7 @@ public class HRegion implements HeapSize { // , Writable{ if (!walSyncSuccessful) { rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive); } - if (w != null) mvcc.completeMemstoreInsert(w); + if (w != null) mvcc.completeTransaction(w); if (locked) { this.updatesLock.readLock().unlock(); @@ -2358,7 +2356,7 @@ public class HRegion implements HeapSize { // , Writable{ // Lock row Integer lid = getLock(null, get.getRow(), true); // wait for all previous transactions to complete (with lock held) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + mvcc.completeTransaction(mvcc.startTransaction(log.getSequenceNumber())); List result = null; try { result = get(get, false); @@ -2614,30 +2612,18 @@ public class HRegion implements HeapSize { // , Writable{ * new entries. */ private long applyFamilyMapToMemstore(Map> familyMap, - MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) { + MultiVersionConsistencyControl.TrxStatus localizedWriteEntry) { long size = 0; - boolean freemvcc = false; - - try { - if (localizedWriteEntry == null) { - localizedWriteEntry = mvcc.beginMemstoreInsert(); - freemvcc = true; - } - for (Map.Entry> e : familyMap.entrySet()) { - byte[] family = e.getKey(); - List cells = e.getValue(); + for (Map.Entry> e : familyMap.entrySet()) { + byte[] family = e.getKey(); + List cells = e.getValue(); - Store store = getStore(family); - for (Cell cell: cells) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - kv.setMemstoreTS(localizedWriteEntry.getWriteNumber()); - size += store.add(kv); - } - } - } finally { - if (freemvcc) { - mvcc.completeMemstoreInsert(localizedWriteEntry); + Store store = getStore(family); + for (Cell cell: cells) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + kv.setSeqNum(localizedWriteEntry.getSeqNum()); + size += store.add(kv); } } @@ -3466,6 +3452,7 @@ public class HRegion implements HeapSize { // , Writable{ private long maxResultSize; private HRegion region; + @Override public HRegionInfo getRegionInfo() { return region.getRegionInfo(); } @@ -3658,6 +3645,7 @@ public class HRegion implements HeapSize { // , Writable{ /* * @return True if a filter rules the scanner is over, done. */ + @Override public synchronized boolean isFilterDone() throws IOException { return this.filter != null && this.filter.filterAllRemaining(); } @@ -4555,7 +4543,7 @@ public class HRegion implements HeapSize { // , Writable{ return; } - MultiVersionConsistencyControl.WriteEntry writeEntry = null; + MultiVersionConsistencyControl.TrxStatus writeEntry = null; boolean locked = false; boolean walSyncSuccessful = false; List acquiredLocks = null; @@ -4586,39 +4574,36 @@ public class HRegion implements HeapSize { // , Writable{ processor, now, this, mutations, walEdit, timeout); if (!mutations.isEmpty()) { - // 5. Get a mvcc write number - writeEntry = mvcc.beginMemstoreInsert(); + // 5. Append no sync + if (!walEdit.isEmpty()) { + writeEntry = this.log.appendNoSync(this.getRegionInfo(), + this.htableDescriptor.getName(), walEdit, + processor.getClusterId(), now, this.htableDescriptor, mvcc); + } + // 6. Apply to memstore for (KeyValue kv : mutations) { - kv.setMemstoreTS(writeEntry.getWriteNumber()); + kv.setSeqNum(writeEntry.getSeqNum()); byte[] family = kv.getFamily(); checkFamily(family); addedSize += stores.get(family).add(kv); } - - long txid = 0; - // 7. 
Append no sync - if (!walEdit.isEmpty()) { - txid = this.log.appendNoSync(this.getRegionInfo(), - this.htableDescriptor.getName(), walEdit, - processor.getClusterId(), now, this.htableDescriptor); - } - // 8. Release region lock + // 7. Release region lock if (locked) { this.updatesLock.readLock().unlock(); locked = false; } - // 9. Release row lock(s) + // 8. Release row lock(s) if (acquiredLocks != null) { for (Integer lid : acquiredLocks) { releaseRowLock(lid); } acquiredLocks = null; } - // 10. Sync edit log - if (txid != 0) { - syncOrDefer(txid, processor.useDurability()); + // 9. Sync edit log + if (writeEntry.getSeqNum() != 0) { + syncOrDefer(writeEntry.getSeqNum(), processor.useDurability()); } walSyncSuccessful = true; } @@ -4631,9 +4616,9 @@ public class HRegion implements HeapSize { // , Writable{ stores.get(kv.getFamily()).rollback(kv); } } - // 11. Roll mvcc forward + // 10. Roll mvcc forward if (writeEntry != null) { - mvcc.completeMemstoreInsert(writeEntry); + mvcc.completeTransaction(writeEntry); writeEntry = null; } if (locked) { @@ -4648,7 +4633,7 @@ public class HRegion implements HeapSize { // , Writable{ } - // 12. Run post-process hook + // 11. Run post-process hook processor.postProcess(this, walEdit); } catch (IOException e) { @@ -4727,26 +4712,26 @@ public class HRegion implements HeapSize { // , Writable{ checkRow(row, "append"); boolean flush = false; boolean writeToWAL = append.getDurability() != Durability.SKIP_WAL; - WALEdit walEdits = null; + WALEdit walEdits = new WALEdit(); + List allKVs = new ArrayList(append.size()); Map> tempMemstore = new HashMap>(); long size = 0; - long txid = 0; checkReadOnly(); // Lock row startRegionOperation(Operation.APPEND); this.writeRequestsCount.increment(); - WriteEntry w = null; + TrxStatus w = null; try { Integer lid = getLock(null, row, true); lock(this.updatesLock.readLock()); // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + mvcc.waitUntilComplete(); // now start my own transaction - w = mvcc.beginMemstoreInsert(); + // w = mvcc.beginMemstoreInsert(); TODO: Enis try { long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family @@ -4810,14 +4795,10 @@ public class HRegion implements HeapSize { // , Writable{ newKV.getBuffer(), newKV.getQualifierOffset(), kv.getQualifierLength()); - newKV.setMemstoreTS(w.getWriteNumber()); kvs.add(newKV); // Append update to WAL if (writeToWAL) { - if (walEdits == null) { - walEdits = new WALEdit(); - } walEdits.add(newKV); } } @@ -4827,20 +4808,23 @@ public class HRegion implements HeapSize { // , Writable{ } // Actually write to WAL now - if (writeToWAL) { - // Using default cluster id, as this can only happen in the orginating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. - txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), + + // Using default cluster id, as this can only happen in the orginating + // cluster. A slave cluster receives the final value (not the delta) + // as a Put. 
+ w = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(), - this.htableDescriptor); - } else { + this.htableDescriptor, mvcc); + if (!writeToWAL) { recordMutationWithoutWal(append.getFamilyMap()); } //Actually write to Memstore now for (Map.Entry> entry : tempMemstore.entrySet()) { Store store = entry.getKey(); + for (KeyValue kv : entry.getValue()) { + kv.setSeqNum(w.getSeqNum()); + } if (store.getFamily().getMaxVersions() == 1) { // upsert if VERSIONS for this CF == 1 size += store.upsert(entry.getValue(), getSmallestReadPoint()); @@ -4861,11 +4845,11 @@ public class HRegion implements HeapSize { // , Writable{ } if (writeToWAL) { // sync the transaction log outside the rowlock - syncOrDefer(txid, append.getDurability()); + syncOrDefer(w.getSeqNum(), append.getDurability()); } } finally { if (w != null) { - mvcc.completeMemstoreInsert(w); + mvcc.completeTransaction(w); } closeRegionOperation(); } @@ -4896,26 +4880,25 @@ public class HRegion implements HeapSize { // , Writable{ TimeRange tr = increment.getTimeRange(); boolean flush = false; boolean writeToWAL = increment.getDurability() != Durability.SKIP_WAL; - WALEdit walEdits = null; + WALEdit walEdits = new WALEdit(); List allKVs = new ArrayList(increment.size()); Map> tempMemstore = new HashMap>(); long size = 0; - long txid = 0; checkReadOnly(); // Lock row startRegionOperation(Operation.INCREMENT); this.writeRequestsCount.increment(); - WriteEntry w = null; + TrxStatus w = null; try { Integer lid = getLock(null, row, true); lock(this.updatesLock.readLock()); // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + mvcc.waitUntilComplete(); // now start my own transaction - w = mvcc.beginMemstoreInsert(); + try { long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family @@ -4956,14 +4939,10 @@ public class HRegion implements HeapSize { // , Writable{ // Append new incremented KeyValue to list KeyValue newKV = new KeyValue(row, family.getKey(), qualifier, now, Bytes.toBytes(amount)); - newKV.setMemstoreTS(w.getWriteNumber()); kvs.add(newKV); // Prepare WAL updates if (writeToWAL) { - if (walEdits == null) { - walEdits = new WALEdit(); - } walEdits.add(newKV); } } @@ -4973,19 +4952,23 @@ public class HRegion implements HeapSize { // , Writable{ } // Actually write to WAL now - if (writeToWAL) { - // Using default cluster id, as this can only happen in the orginating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. - txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), + + // Using default cluster id, as this can only happen in the orginating + // cluster. A slave cluster receives the final value (not the delta) + // as a Put. 
+ w = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(), - this.htableDescriptor); - } else { + this.htableDescriptor, mvcc); + if (!writeToWAL) { recordMutationWithoutWal(increment.getFamilyMap()); } + //Actually write to Memstore now for (Map.Entry> entry : tempMemstore.entrySet()) { Store store = entry.getKey(); + for (KeyValue kv : entry.getValue()) { + kv.setSeqNum(w.getSeqNum()); + } if (store.getFamily().getMaxVersions() == 1) { // upsert if VERSIONS for this CF == 1 size += store.upsert(entry.getValue(), getSmallestReadPoint()); @@ -5006,11 +4989,11 @@ public class HRegion implements HeapSize { // , Writable{ } if (writeToWAL) { // sync the transaction log outside the rowlock - syncOrDefer(txid, increment.getDurability()); + syncOrDefer(w.getSeqNum(), increment.getDurability()); } } finally { if (w != null) { - mvcc.completeMemstoreInsert(w); + mvcc.completeTransaction(w); } closeRegionOperation(); if (this.metricsRegion != null) { @@ -5346,7 +5329,7 @@ public class HRegion implements HeapSize { // , Writable{ case DELETE: case BATCH_MUTATE: // when a region is in recovering state, no read, split or merge is allowed - if (this.isRecovering() && (this.disallowWritesInRecovering || + if (this.isRecovering() && (this.disallowWritesInRecovering || (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) { throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering"); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index 2b7d1f3..86dd551 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -24,9 +24,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.Log; - /** * Manages the read/write consistency within memstore. This provides * an interface for readers to determine what entries to ignore, and @@ -34,15 +31,16 @@ import org.apache.commons.logging.Log; * the new writes for readers to read (thus forming atomic transactions). */ @InterfaceAudience.Private -public class MultiVersionConsistencyControl { - private volatile long memstoreRead = 0; - private volatile long memstoreWrite = 0; - - private final Object readWaiters = new Object(); +public class MultiVersionConsistencyControl implements TransactionManager { + private volatile long readSeqNum; + private boolean acceptedWrites; // This is the pending queue of writes. - private final LinkedList writeQueue = - new LinkedList(); + private final LinkedList writeQueue = + new LinkedList(); + + + private final Object readWaiters = new Object(); private static final ThreadLocal perThreadReadPoint = new ThreadLocal() { @@ -57,7 +55,12 @@ public class MultiVersionConsistencyControl { * Default constructor. Initializes the memstoreRead/Write points to 0. 
*/ public MultiVersionConsistencyControl() { - this.memstoreRead = this.memstoreWrite = 0; + reset(); + } + + public void reset() { + this.readSeqNum = 0; + this.acceptedWrites = false; } /** @@ -66,11 +69,12 @@ public class MultiVersionConsistencyControl { */ public void initialize(long startPoint) { synchronized (writeQueue) { - if (this.memstoreWrite != this.memstoreRead) { + if (this.acceptedWrites) { + // TODO: not sure whether this is needed throw new RuntimeException("Already used this mvcc. Too late to initialize"); } - this.memstoreRead = this.memstoreWrite = startPoint; + this.readSeqNum = startPoint; } } @@ -109,32 +113,35 @@ public class MultiVersionConsistencyControl { } /** - * Generate and return a {@link WriteEntry} with a new write number. + * Generate and return a {@link TrxStatus} with the write seq number. * To complete the WriteEntry and wait for it to be visible, - * call {@link #completeMemstoreInsert(WriteEntry)}. + * call {@link #completeTransaction(TrxStatus)}. */ - public WriteEntry beginMemstoreInsert() { + @Override + public TrxStatus startTransaction(long seqNum) { synchronized (writeQueue) { - long nextWriteNumber = ++memstoreWrite; - WriteEntry e = new WriteEntry(nextWriteNumber); + this.acceptedWrites = true; + TrxStatus e = new TrxStatus(seqNum); writeQueue.add(e); return e; } } /** - * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. + * Complete a {@link TrxStatus} that was created by {@link #startTransaction(long)}. * * At the end of this call, the global read point is at least as large as the write point * of the passed in WriteEntry. Thus, the write is visible to MVCC readers. */ - public void completeMemstoreInsert(WriteEntry e) { - advanceMemstore(e); - waitForRead(e); + @Override + public void completeTransaction(TrxStatus e) { + if (!advanceMemstore(e)) { + waitUntilComplete(e.getSeqNum()); + } } /** - * Mark the {@link WriteEntry} as complete and advance the read point as + * Mark the {@link TrxStatus} as complete and advance the read point as * much as possible. * * How much is the read point advanced? 
@@ -144,56 +151,59 @@ public class MultiVersionConsistencyControl { * @param e * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber) */ - boolean advanceMemstore(WriteEntry e) { + boolean advanceMemstore(TrxStatus e) { synchronized (writeQueue) { e.markCompleted(); long nextReadValue = -1; - boolean ranOnce=false; + if (writeQueue.isEmpty()) { + throw new RuntimeException( + "WriteQueue was empty while it should have contained " + e.getSeqNum()); + } + while (!writeQueue.isEmpty()) { - ranOnce=true; - WriteEntry queueFirst = writeQueue.getFirst(); - - if (nextReadValue > 0) { - if (nextReadValue+1 != queueFirst.getWriteNumber()) { - throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: " - + nextReadValue + " next: " + queueFirst.getWriteNumber()); - } - } + TrxStatus queueFirst = writeQueue.getFirst(); if (queueFirst.isCompleted()) { - nextReadValue = queueFirst.getWriteNumber(); + nextReadValue = queueFirst.getSeqNum(); writeQueue.removeFirst(); } else { break; } } - if (!ranOnce) { - throw new RuntimeException("never was a first"); - } - if (nextReadValue > 0) { synchronized (readWaiters) { - memstoreRead = nextReadValue; + readSeqNum = nextReadValue; readWaiters.notifyAll(); } } - if (memstoreRead >= e.getWriteNumber()) { + if (readSeqNum >= e.getSeqNum()) { return true; } return false; } } + @Override + public void waitUntilComplete() { + synchronized (writeQueue) { + if (writeQueue.isEmpty()) { + return; + } else { + waitUntilComplete(writeQueue.getLast().getSeqNum()); + } + } + } + /** * Wait for the global readPoint to advance upto * the specified transaction number. */ - public void waitForRead(WriteEntry e) { + private void waitUntilComplete(long seqNum) { boolean interrupted = false; synchronized (readWaiters) { - while (memstoreRead < e.getWriteNumber()) { + while (readSeqNum < seqNum) { try { readWaiters.wait(0); } catch (InterruptedException ie) { @@ -207,30 +217,12 @@ public class MultiVersionConsistencyControl { } public long memstoreReadPoint() { - return memstoreRead; - } - - - public static class WriteEntry { - private long writeNumber; - private boolean completed = false; - WriteEntry(long writeNumber) { - this.writeNumber = writeNumber; - } - void markCompleted() { - this.completed = true; - } - boolean isCompleted() { - return this.completed; - } - long getWriteNumber() { - return this.writeNumber; - } + return readSeqNum; } public static final long FIXED_SIZE = ClassSize.align( ClassSize.OBJECT + - 2 * Bytes.SIZEOF_LONG + + 1 * Bytes.SIZEOF_LONG + + 1 * Bytes.SIZEOF_BOOLEAN + 2 * ClassSize.REFERENCE); - } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TransactionManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TransactionManager.java new file mode 100644 index 0000000..1564040 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TransactionManager.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * TransactionManager tracks transactions using seqNums obtained from the log. Transactions + * have to be started in order of increasing seqNums, but they can be marked complete in any + * order. Implementations keep track of the transactions that are still outstanding. + */ +@InterfaceAudience.Private +public interface TransactionManager { + + public static class TrxStatus { + private long seqNum; + private boolean completed = false; + public TrxStatus(long writeSeqNum) { + this.seqNum = writeSeqNum; + } + void markCompleted() { + this.completed = true; + } + boolean isCompleted() { + return this.completed; + } + long getSeqNum() { + return this.seqNum; + } + } + + /** + * Start a transaction with the given sequence number. Sequence numbers are guaranteed to be + * passed in strictly increasing order, and the caller holds the lock under which the seqNum + * was assigned until this method finishes. This ensures that the TransactionManager can keep + * transactions in sorted order. + * @param seqNum the transaction sequence number + * @return a {@link TrxStatus} to track the status of the transaction + */ + public TrxStatus startTransaction(long seqNum); + + public void completeTransaction(TrxStatus e); + + /** + * Waits until all transactions that are currently outstanding have completed. + */ + public void waitUntilComplete(); + +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 31440a7..91b0014 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -55,6 +55,8 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.exceptions.FailedLogCloseException; +import org.apache.hadoop.hbase.regionserver.TransactionManager; +import org.apache.hadoop.hbase.regionserver.TransactionManager.TrxStatus; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.DrainBarrier; @@ -117,9 +119,9 @@ class FSHLog implements HLog, Syncable { private final long optionalFlushInterval; private final long blocksize; private final String prefix; - private final AtomicLong unflushedEntries = new AtomicLong(0); + private final AtomicLong lastUnflushedSeqNum = new AtomicLong(0); private volatile long syncedTillHere = 0; - private long lastDeferredTxid; + private long lastDeferredSeqNum; private final Path oldLogDir; private volatile boolean logRollRunning; @@ -442,6 +444,10 @@ class FSHLog implements HLog, Syncable { // every increment of sequence number.
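// ---------------------------------------------------------------------------
// Editorial aside, not part of the patch: a minimal sketch of the
// TransactionManager contract introduced above, assuming the patched
// MultiVersionConsistencyControl as the implementation and hypothetical literal
// seqNums. In the real write path the seqNums are assigned by FSHLog under its
// updateLock; the sketch is assumed to live in the same package
// (org.apache.hadoop.hbase.regionserver), so no imports are needed.
class TransactionManagerContractSketch {
  void writerSide() {
    TransactionManager trxManager = new MultiVersionConsistencyControl();
    // seqNums must be handed to startTransaction in strictly increasing order.
    TransactionManager.TrxStatus t1 = trxManager.startTransaction(10);
    TransactionManager.TrxStatus t2 = trxManager.startTransaction(11);
    // Edits tagged with seqNum 10 and 11 would be written to the memstore here;
    // they remain invisible to readers until the read point advances past them.
    trxManager.completeTransaction(t1); // read point advances to 10
    trxManager.completeTransaction(t2); // read point advances to 11
    // completeTransaction blocks until its own write is visible, so a transaction
    // completed out of order (on another thread) simply waits for the earlier
    // ones: the read point only ever moves over a completed prefix of the queue.
    trxManager.waitUntilComplete(); // returns immediately: nothing is outstanding
  }
}
// ---------------------------------------------------------------------------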
LOG.debug("Changed sequenceid from " + id + " to " + newvalue); } + + // TODO: Enis: Should we sync() the log since this might cause a jump in seqNum and the + // other numbers, syncedTillHere, etc should catch up without relying on the optional log + // syncer thread } @Override @@ -654,10 +660,10 @@ class FSHLog implements HLog, Syncable { try { // Wait till all current transactions are written to the hlog. // No new transactions can occur because we have the updatelock. - if (this.unflushedEntries.get() != this.syncedTillHere) { + if (this.lastUnflushedSeqNum.get() > this.syncedTillHere) { LOG.debug("cleanupCurrentWriter " + " waiting for transactions to get synced " + - " total " + this.unflushedEntries.get() + + " seqNum " + this.logSeqNum.get() + " synced till here " + syncedTillHere); sync(); } @@ -829,15 +835,17 @@ class FSHLog implements HLog, Syncable { @Override public void append(HRegionInfo info, byte [] tableName, WALEdit edits, - final long now, HTableDescriptor htd) + final long now, HTableDescriptor htd, TransactionManager trxManager) throws IOException { - append(info, tableName, edits, now, htd, true); + append(info, tableName, edits, now, htd, true, trxManager); } @Override public void append(HRegionInfo info, byte [] tableName, WALEdit edits, - final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException { - append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, isInMemstore); + final long now, HTableDescriptor htd, boolean isInMemstore, TransactionManager trxManager) + throws IOException { + append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, + isInMemstore, trxManager); } /** @@ -866,16 +874,22 @@ class FSHLog implements HLog, Syncable { * @return txid of this transaction * @throws IOException */ - private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId, - final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore) + private TrxStatus append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId, + final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore, + TransactionManager trxManager) throws IOException { - if (edits.isEmpty()) return this.unflushedEntries.get(); if (this.closed) { throw new IOException("Cannot append; log is closed"); } - long txid = 0; + long seqNum; + TrxStatus w; synchronized (this.updateLock) { - long seqNum = obtainSeqNum(); + seqNum = obtainSeqNum(); + if (edits.isEmpty()) { + // if mutations skip WAL, we still need to start a trx for them with a new seqNum + return trxManager.startTransaction(seqNum); + } + // The 'lastSeqWritten' map holds the sequence number of the oldest // write for each region (i.e. the first edit added to the particular // memstore). . When the cache is flushed, the entry for the @@ -888,27 +902,36 @@ class FSHLog implements HLog, Syncable { HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId); doWrite(info, logKey, edits, htd); this.numEntries.incrementAndGet(); - txid = this.unflushedEntries.incrementAndGet(); if (htd.isDeferredLogFlush()) { - lastDeferredTxid = txid; + lastDeferredSeqNum = seqNum; + } + + this.lastUnflushedSeqNum.set(seqNum); //not all seqNums are appended to log. 
+ if (isInMemstore) { + w = trxManager.startTransaction(seqNum); //have to do this in updateLock + } else { + // if the transaction is not in the memstore, still return something, but this transaction + // should not be sent to the TransactionManager + w = new TrxStatus(seqNum); + } } + System.out.println("appendNoSync:" + seqNum); // Sync if catalog region, and if not then check if that table supports // deferred log flushing if (doSync && (info.isMetaRegion() || !htd.isDeferredLogFlush())) { // sync txn to file system - this.sync(txid); + this.sync(seqNum); } - return txid; + return w; } @Override - public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, - UUID clusterId, final long now, HTableDescriptor htd) + public TrxStatus appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, + UUID clusterId, final long now, HTableDescriptor htd, TransactionManager trxManager) throws IOException { - return append(info, tableName, edits, clusterId, now, htd, false, true); + return append(info, tableName, edits, clusterId, now, htd, false, true, trxManager); } /** @@ -945,7 +968,7 @@ class FSHLog implements HLog, Syncable { while(!this.isInterrupted() && !closeLogSyncer.get()) { try { - if (unflushedEntries.get() <= syncedTillHere) { + if (lastUnflushedSeqNum.get() <= syncedTillHere) { synchronized (closeLogSyncer) { closeLogSyncer.wait(this.optionalFlushInterval); } @@ -1002,14 +1025,14 @@ class FSHLog implements HLog, Syncable { // sync all known transactions private void syncer() throws IOException { - syncer(this.unflushedEntries.get()); // sync all pending items + syncer(this.lastUnflushedSeqNum.get()); // sync all pending items } - // sync all transactions upto the specified txid - private void syncer(long txid) throws IOException { + // sync all transactions up to the specified seqNum + private void syncer(long seqNum) throws IOException { // if the transaction that we are interested in is already // synced, then return immediately. - if (txid <= this.syncedTillHere) { + if (seqNum <= this.syncedTillHere) { return; } Writer tempWriter; @@ -1022,7 +1045,7 @@ class FSHLog implements HLog, Syncable { tempWriter = this.writer; } try { - long doneUpto; + long doneUpto = 0; long now = EnvironmentEdgeManager.currentTimeMillis(); // First flush all the pending writes to HDFS. Then // issue the sync to HDFS. If sync is successful, then update @@ -1031,16 +1054,17 @@ class FSHLog implements HLog, Syncable { IOException ioe = null; List pending = null; synchronized (flushLock) { - if (txid <= this.syncedTillHere) { + if (seqNum <= this.syncedTillHere) { return; } - doneUpto = this.unflushedEntries.get(); + doneUpto = lastUnflushedSeqNum.get(); pending = logSyncer.getPendingWrites(); + try { logSyncer.hlogFlush(tempWriter, pending); } catch(IOException io) { ioe = io; - LOG.error("syncer encountered error, will retry. txid=" + txid, ioe); + LOG.error("syncer encountered error, will retry. 
seqNum=" + seqNum, ioe); } } if (ioe != null && pending != null) { @@ -1053,7 +1077,7 @@ class FSHLog implements HLog, Syncable { } } // another thread might have sync'ed avoid double-sync'ing - if (txid <= this.syncedTillHere) { + if (seqNum <= this.syncedTillHere) { return; } try { @@ -1068,6 +1092,7 @@ class FSHLog implements HLog, Syncable { } } this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto); + System.out.println("sync:" + syncedTillHere + "->" + seqNum); this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now); // TODO: preserving the old behavior for now, but this check is strange. It's not @@ -1267,6 +1292,7 @@ class FSHLog implements HLog, Syncable { LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: [" + Bytes.toString(encodedRegionName) + "]"); } + return obtainSeqNum(); } @@ -1360,7 +1386,7 @@ class FSHLog implements HLog, Syncable { /** Provide access to currently deferred sequence num for tests */ boolean hasDeferredEntries() { - return lastDeferredTxid > syncedTillHere; + return lastDeferredSeqNum > syncedTillHere; } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 97413b3..3fbfe04 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.exceptions.FailedLogCloseException; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; +import org.apache.hadoop.hbase.regionserver.TransactionManager; +import org.apache.hadoop.hbase.regionserver.TransactionManager.TrxStatus; import org.apache.hadoop.io.Writable; @@ -268,7 +270,7 @@ public interface HLog { * except it causes a sync on the log */ public void append(HRegionInfo info, byte[] tableName, WALEdit edits, - final long now, HTableDescriptor htd) throws IOException; + final long now, HTableDescriptor htd, TransactionManager trxManager) throws IOException; /** * Append a set of edits to the log. Log edits are keyed by (encoded) @@ -282,7 +284,8 @@ public interface HLog { * @param isInMemstore Whether the record is in memstore. False for system records. */ public void append(HRegionInfo info, byte[] tableName, WALEdit edits, - final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException; + final long now, HTableDescriptor htd, boolean isInMemstore, TransactionManager trxManager) + throws IOException; /** * Append a set of edits to the log. 
Log edits are keyed by (encoded) @@ -299,8 +302,9 @@ public interface HLog { * @return txid of this transaction * @throws IOException */ - public long appendNoSync(HRegionInfo info, byte[] tableName, WALEdit edits, - UUID clusterId, final long now, HTableDescriptor htd) throws IOException; + public TrxStatus appendNoSync(HRegionInfo info, byte[] tableName, WALEdit edits, + UUID clusterId, final long now, HTableDescriptor htd, TransactionManager trxManager) + throws IOException; public void hsync() throws IOException; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java index f7b1de1..144c73a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java @@ -266,7 +266,7 @@ public class HLogUtil { final CompactionDescriptor c) throws IOException { WALEdit e = WALEdit.createCompaction(c); log.append(info, c.getTableName().toByteArray(), e, - EnvironmentEdgeManager.currentTimeMillis(), htd, false); + EnvironmentEdgeManager.currentTimeMillis(), htd, false, null); if (LOG.isTraceEnabled()) { LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index d2bd30c..6e491b1 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -19,14 +19,35 @@ package org.apache.hadoop.hbase.coprocessor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.TransactionManager; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; @@ -34,9 +55,9 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdge; import 
org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -44,14 +65,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.*; - /** * Tests invocation of the * {@link org.apache.hadoop.hbase.coprocessor.MasterObserver} interface hooks at @@ -78,6 +91,7 @@ public class TestWALObserver { private String logName; private Path oldLogDir; private Path logDir; + private TransactionManager trxManager; @BeforeClass public static void setupBeforeClass() throws Exception { @@ -116,6 +130,7 @@ public class TestWALObserver { if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); } + trxManager = org.mockito.Mockito.mock(TransactionManager.class); } @After @@ -186,7 +201,7 @@ public class TestWALObserver { // it's where WAL write cp should occur. long now = EnvironmentEdgeManager.currentTimeMillis(); - log.append(hri, hri.getTableName(), edit, now, htd); + log.append(hri, hri.getTableName(), edit, now, htd, trxManager); // the edit shall have been change now by the coprocessor. foundFamily0 = false; @@ -249,13 +264,14 @@ public class TestWALObserver { addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily, EnvironmentEdgeManager.getDelegate(), wal, htd); } - wal.append(hri, tableName, edit, now, htd); + wal.append(hri, tableName, edit, now, htd, trxManager); // sync to fs. wal.sync(); User user = HBaseTestingUtility.getDifferentUser(newConf, ".replay.wal.secondtime"); user.runAs(new PrivilegedExceptionAction() { + @Override public Object run() throws Exception { Path p = runWALSplit(newConf); LOG.info("WALSplit path == " + p); @@ -302,7 +318,7 @@ public class TestWALObserver { /* * Creates an HRI around an HTD that has tableName and three * column families named. - * + * * @param tableName Name of table to use when we create HTableDescriptor. */ private HRegionInfo createBasic3FamilyHRegionInfo(final String tableName) { @@ -336,7 +352,7 @@ public class TestWALObserver { /** * Copied from HRegion. 
- * + * * @param familyMap * map of family->edits * @param walEdit @@ -380,7 +396,7 @@ public class TestWALObserver { WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, qualifierBytes, ee .currentTimeMillis(), columnBytes)); - wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd); + wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd, trxManager); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java index 485b22c..6988d4d 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java @@ -35,12 +35,12 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogRecordReader; +import org.apache.hadoop.hbase.regionserver.TransactionManager; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.MapReduceTestUtil; import org.junit.AfterClass; import org.junit.Before; @@ -67,6 +67,8 @@ public class TestHLogRecordReader { private static Path logDir; private static String logName; + private TransactionManager trxManager; + private static String getName() { return "TestHLogRecordReader"; } @@ -77,7 +79,7 @@ public class TestHLogRecordReader { for (FileStatus dir : entries) { fs.delete(dir.getPath(), true); } - + trxManager = org.mockito.Mockito.mock(TransactionManager.class); } @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -91,7 +93,7 @@ public class TestHLogRecordReader { fs = TEST_UTIL.getDFSCluster().getFileSystem(); hbaseDir = TEST_UTIL.createRootDir(); - + logName = HConstants.HREGION_LOGDIR_NAME; logDir = new Path(hbaseDir, logName); @@ -114,10 +116,10 @@ public class TestHLogRecordReader { long ts = System.currentTimeMillis(); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value)); - log.append(info, tableName, edit, ts, htd); + log.append(info, tableName, edit, ts, htd, trxManager); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value)); - log.append(info, tableName, edit, ts+1, htd); + log.append(info, tableName, edit, ts+1, htd, trxManager); log.rollWriter(); Thread.sleep(1); @@ -125,10 +127,10 @@ public class TestHLogRecordReader { edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value)); - log.append(info, tableName, edit, ts1+1, htd); + log.append(info, tableName, edit, ts1+1, htd, trxManager); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value)); - log.append(info, tableName, edit, ts1+2, htd); + log.append(info, tableName, edit, ts1+2, htd, trxManager); log.close(); HLogInputFormat input = new HLogInputFormat(); @@ -164,7 +166,7 @@ public class TestHLogRecordReader { edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value)); log.append(info, tableName, edit, - System.currentTimeMillis(), htd); + System.currentTimeMillis(), htd, trxManager); 
Thread.sleep(1); // make sure 2nd log gets a later timestamp long secondTs = System.currentTimeMillis(); @@ -174,7 +176,7 @@ public class TestHLogRecordReader { edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value)); log.append(info, tableName, edit, - System.currentTimeMillis(), htd); + System.currentTimeMillis(), htd, trxManager); log.close(); long thirdTs = System.currentTimeMillis(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 53bb9d1..e33f6a9 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.TransactionManager; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; @@ -84,7 +85,6 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKTable; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.log4j.Level; @@ -102,7 +102,7 @@ public class TestDistributedLogSplitting { Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); // test ThreeRSAbort fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on. this - // turns it off for this test. TODO: Figure out why scr breaks recovery. + // turns it off for this test. TODO: Figure out why scr breaks recovery. 
System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); } @@ -415,9 +415,9 @@ public class TestDistributedLogSplitting { Thread.sleep(2000); LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size()); - + startMasterAndWaitUntilLogSplit(cluster); - + // wait for abort completes TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate() { @Override @@ -433,7 +433,7 @@ public class TestDistributedLogSplitting { ht.close(); } - + @Test(timeout = 300000) public void testMasterStartsUpWithLogReplayWork() throws Exception { LOG.info("testMasterStartsUpWithLogReplayWork"); @@ -498,9 +498,9 @@ public class TestDistributedLogSplitting { Thread.sleep(2000); LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size()); - + startMasterAndWaitUntilLogSplit(cluster); - + // wait for all regions are fully recovered TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate() { @Override @@ -518,8 +518,8 @@ public class TestDistributedLogSplitting { ht.close(); } - - + + @Test(timeout = 300000) public void testLogReplayTwoSequentialRSDown() throws Exception { LOG.info("testRecoveredEditsReplayTwoSequentialRSDown"); @@ -682,7 +682,7 @@ public class TestDistributedLogSplitting { this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1")); String originalCheckSum = TEST_UTIL.checksumRows(ht); - + // abort RA and trigger replay abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE); @@ -737,10 +737,10 @@ public class TestDistributedLogSplitting { } makeHLog(hrs.getWAL(), regions, "disableTable", "family", NUM_LOG_LINES, 100, false); makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100); - + LOG.info("Disabling table\n"); TEST_UTIL.getHBaseAdmin().disableTable(Bytes.toBytes("disableTable")); - + // abort RS LOG.info("Aborting region server: " + hrs.getServerName()); hrs.abort("testing"); @@ -795,7 +795,7 @@ public class TestDistributedLogSplitting { assertEquals(NUM_LOG_LINES, count); LOG.info("Verify replayed edits"); assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht)); - + // clean up for (HRegionInfo hri : regions) { @SuppressWarnings("deprecation") @@ -852,11 +852,11 @@ public class TestDistributedLogSplitting { } } makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100); - + // abort RS LOG.info("Aborting region server: " + hrs.getServerName()); hrs.abort("testing"); - + // wait for abort completes TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate() { @Override @@ -864,7 +864,7 @@ public class TestDistributedLogSplitting { return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1)); } }); - + // wait for regions come online TEST_UTIL.waitFor(180000, 100, new Waiter.Predicate() { @Override @@ -922,6 +922,7 @@ public class TestDistributedLogSplitting { 100); new Thread() { + @Override public void run() { waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); for (RegionServerThread rst : rsts) { @@ -1128,7 +1129,7 @@ public class TestDistributedLogSplitting { assertTrue(isMetaRegionInRecovery); master.getMasterFileSystem().splitMetaLog(hrs.getServerName()); - + isMetaRegionInRecovery = false; recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false); @@ -1146,7 +1147,7 @@ public class TestDistributedLogSplitting { return installTable(zkw, tname, fname, nrs, 0); } - HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs, + HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs, int existingRegions) throws Exception { // Create a table with regions byte 
[] table = Bytes.toBytes(tname); @@ -1216,6 +1217,8 @@ public class TestDistributedLogSplitting { HTableDescriptor htd = new HTableDescriptor(tname); byte[] value = new byte[edit_size]; + TransactionManager trxManager = org.mockito.Mockito.mock(TransactionManager.class); + List hris = new ArrayList(); for (HRegionInfo region : regions) { if (!region.getTableNameAsString().equalsIgnoreCase(tname)) { @@ -1243,7 +1246,7 @@ public class TestDistributedLogSplitting { byte[] family = Bytes.toBytes(fname); byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value)); - log.append(curRegionInfo, table, e, System.currentTimeMillis(), htd); + log.append(curRegionInfo, table, e, System.currentTimeMillis(), htd, trxManager); counts[i % n] += 1; } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java index 86a30c8..05925d0 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import junit.framework.TestCase; @@ -33,17 +34,23 @@ import junit.framework.TestCase; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueTestUtil; +import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.experimental.categories.Category; import com.google.common.base.Joiner; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import org.junit.experimental.categories.Category; /** memstore test case */ @Category(MediumTests.class) @@ -167,7 +174,7 @@ public class TestMemStore extends TestCase { /** * A simple test which verifies the 3 possible states when scanning across snapshot. 
* @throws IOException - * @throws CloneNotSupportedException + * @throws CloneNotSupportedException */ public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException { // we are going to the scanning across snapshot with two kvs @@ -235,33 +242,34 @@ public class TestMemStore extends TestCase { final byte[] q2 = Bytes.toBytes("q2"); final byte[] v = Bytes.toBytes("value"); - MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + long seqNum = 1; + MultiVersionConsistencyControl.TrxStatus w = + mvcc.startTransaction(seqNum++); KeyValue kv1 = new KeyValue(row, f, q1, v); - kv1.setMemstoreTS(w.getWriteNumber()); + kv1.setSeqNum(w.getSeqNum()); memstore.add(kv1); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{}); - mvcc.completeMemstoreInsert(w); + mvcc.completeTransaction(w); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv1}); - w = mvcc.beginMemstoreInsert(); + w = mvcc.startTransaction(seqNum++); KeyValue kv2 = new KeyValue(row, f, q2, v); - kv2.setMemstoreTS(w.getWriteNumber()); + kv2.setSeqNum(w.getSeqNum()); memstore.add(kv2); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv1}); - mvcc.completeMemstoreInsert(w); + mvcc.completeTransaction(w); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); @@ -282,18 +290,19 @@ public class TestMemStore extends TestCase { final byte[] v1 = Bytes.toBytes("value1"); final byte[] v2 = Bytes.toBytes("value2"); + long seqNum = 1; // INSERT 1: Write both columns val1 - MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.TrxStatus w = + mvcc.startTransaction(seqNum++); KeyValue kv11 = new KeyValue(row, f, q1, v1); - kv11.setMemstoreTS(w.getWriteNumber()); + kv11.setSeqNum(w.getSeqNum()); memstore.add(kv11); KeyValue kv12 = new KeyValue(row, f, q2, v1); - kv12.setMemstoreTS(w.getWriteNumber()); + kv12.setSeqNum(w.getSeqNum()); memstore.add(kv12); - mvcc.completeMemstoreInsert(w); + mvcc.completeTransaction(w); // BEFORE STARTING INSERT 2, SEE FIRST KVS MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); @@ -301,13 +310,13 @@ public class TestMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START INSERT 2: Write both columns val2 - w = mvcc.beginMemstoreInsert(); + w = mvcc.startTransaction(seqNum++); KeyValue kv21 = new KeyValue(row, f, q1, v2); - kv21.setMemstoreTS(w.getWriteNumber()); + kv21.setSeqNum(w.getSeqNum()); memstore.add(kv21); KeyValue kv22 = new KeyValue(row, f, q2, v2); - kv22.setMemstoreTS(w.getWriteNumber()); + kv22.setSeqNum(w.getSeqNum()); memstore.add(kv22); // BEFORE COMPLETING INSERT 2, SEE FIRST KVS @@ -316,7 +325,7 @@ public class TestMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // COMPLETE INSERT 2 - mvcc.completeMemstoreInsert(w); + mvcc.completeTransaction(w); // NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS. 
// See HBASE-1485 for discussion about what we should do with @@ -337,18 +346,19 @@ public class TestMemStore extends TestCase { final byte[] q1 = Bytes.toBytes("q1"); final byte[] q2 = Bytes.toBytes("q2"); final byte[] v1 = Bytes.toBytes("value1"); + long seqNum = 1; // INSERT 1: Write both columns val1 - MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.TrxStatus w = + mvcc.startTransaction(seqNum++); KeyValue kv11 = new KeyValue(row, f, q1, v1); - kv11.setMemstoreTS(w.getWriteNumber()); + kv11.setSeqNum(w.getSeqNum()); memstore.add(kv11); KeyValue kv12 = new KeyValue(row, f, q2, v1); - kv12.setMemstoreTS(w.getWriteNumber()); + kv12.setSeqNum(w.getSeqNum()); memstore.add(kv12); - mvcc.completeMemstoreInsert(w); + mvcc.completeTransaction(w); // BEFORE STARTING INSERT 2, SEE FIRST KVS MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); @@ -356,10 +366,10 @@ public class TestMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START DELETE: Insert delete for one of the columns - w = mvcc.beginMemstoreInsert(); + w = mvcc.startTransaction(seqNum++); KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(), KeyValue.Type.DeleteColumn); - kvDel.setMemstoreTS(w.getWriteNumber()); + kvDel.setSeqNum(w.getSeqNum()); memstore.add(kvDel); // BEFORE COMPLETING DELETE, SEE FIRST KVS @@ -368,7 +378,7 @@ public class TestMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // COMPLETE DELETE - mvcc.completeMemstoreInsert(w); + mvcc.completeTransaction(w); // NOW WE SHOULD SEE DELETE MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); @@ -387,21 +397,20 @@ public class TestMemStore extends TestCase { final MultiVersionConsistencyControl mvcc; final MemStore memstore; + final AtomicLong seqNumGenerator; AtomicReference caughtException; - - public ReadOwnWritesTester(int id, - MemStore memstore, - MultiVersionConsistencyControl mvcc, - AtomicReference caughtException) - { + public ReadOwnWritesTester(int id, MemStore memstore, MultiVersionConsistencyControl mvcc, + AtomicReference caughtException, AtomicLong seqNumGenerator) { this.mvcc = mvcc; this.memstore = memstore; this.caughtException = caughtException; row = Bytes.toBytes(id); + this.seqNumGenerator = seqNumGenerator; } + @Override public void run() { try { internalRun(); @@ -412,26 +421,29 @@ public class TestMemStore extends TestCase { private void internalRun() throws IOException { for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { - MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.TrxStatus w; + synchronized (seqNumGenerator) { + w = mvcc.startTransaction(seqNumGenerator.incrementAndGet()); + } // Insert the sequence value (i) byte[] v = Bytes.toBytes(i); KeyValue kv = new KeyValue(row, f, q1, i, v); - kv.setMemstoreTS(w.getWriteNumber()); + kv.setSeqNum(w.getSeqNum()); memstore.add(kv); - mvcc.completeMemstoreInsert(w); + mvcc.completeTransaction(w); // Assert that we can read back MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); + long readPoint = MultiVersionConsistencyControl.getThreadReadPoint(); KeyValueScanner s = this.memstore.getScanners().get(0); s.seek(kv); KeyValue ret = s.next(); assertNotNull("Didnt find own write at all", ret); - assertEquals("Didnt read own writes", + assertEquals("Didnt read own writes. 
seqnum:" + w.getSeqNum() + ", readPoint:" + readPoint, kv.getTimestamp(), ret.getTimestamp()); } } @@ -444,8 +456,9 @@ public class TestMemStore extends TestCase { ReadOwnWritesTester threads[] = new ReadOwnWritesTester[NUM_THREADS]; AtomicReference caught = new AtomicReference(); + AtomicLong seqNumGenerator = new AtomicLong(1); for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught); + threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught, seqNumGenerator); threads[i].start(); } @@ -865,7 +878,7 @@ public class TestMemStore extends TestCase { KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v"); KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v"); - kv1.setMvccVersion(1); kv2.setMvccVersion(1);kv3.setMvccVersion(1); + kv1.setSeqNum(1); kv2.setSeqNum(1);kv3.setSeqNum(1); l.add(kv1); l.add(kv2); l.add(kv3); this.memstore.upsert(l, 2);// readpoint is 2 @@ -873,7 +886,7 @@ public class TestMemStore extends TestCase { assert(newSize > oldSize); KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); - kv4.setMvccVersion(1); + kv4.setSeqNum(1); l.clear(); l.add(kv4); this.memstore.upsert(l, 3); assertEquals(newSize, this.memstore.size.get()); @@ -881,12 +894,12 @@ public class TestMemStore extends TestCase { } //////////////////////////////////// - // Test for periodic memstore flushes + // Test for periodic memstore flushes // based on time of oldest edit //////////////////////////////////// /** - * Tests that the timeOfOldestEdit is updated correctly for the + * Tests that the timeOfOldestEdit is updated correctly for the * various edit operations in memstore. * @throws Exception */ @@ -902,7 +915,7 @@ public class TestMemStore extends TestCase { memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v")); t = memstore.timeOfOldestEdit(); assertTrue(t == 1234); - // snapshot() will reset timeOfOldestEdit. The method will also assert the + // snapshot() will reset timeOfOldestEdit. The method will also assert the // value is reset to Long.MAX_VALUE t = runSnapshot(memstore); @@ -915,7 +928,7 @@ public class TestMemStore extends TestCase { // test the case that the timeOfOldestEdit is updated after a KV upsert List l = new ArrayList(); KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); - kv1.setMvccVersion(100); + kv1.setSeqNum(100); l.add(kv1); memstore.upsert(l, 1000); t = memstore.timeOfOldestEdit(); @@ -929,7 +942,7 @@ public class TestMemStore extends TestCase { * Tests the HRegion.shouldFlush method - adds an edit in the memstore * and checks that shouldFlush returns true, and another where it disables * the periodic flush functionality and tests whether shouldFlush returns - * false. + * false. 
* @throws Exception */ public void testShouldFlush() throws Exception { @@ -967,7 +980,7 @@ public class TestMemStore extends TestCase { long t = 1234; @Override public long currentTimeMillis() { - return t; + return t; } public void setCurrentTimeMillis(long t) { this.t = t; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java index 40fafd9..1e9b129 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java @@ -17,14 +17,15 @@ */ package org.apache.hadoop.hbase.regionserver; -import junit.framework.TestCase; -import org.apache.hadoop.hbase.SmallTests; -import org.junit.experimental.categories.Category; - import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import junit.framework.TestCase; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.experimental.categories.Category; + /** * This is a hammer test that verifies MultiVersionConsistencyControl in a * multiple writer single reader scenario. @@ -35,19 +36,27 @@ public class TestMultiVersionConsistencyControl extends TestCase { final AtomicBoolean finished; final MultiVersionConsistencyControl mvcc; final AtomicBoolean status; + final AtomicLong seqNumGenerator; - Writer(AtomicBoolean finished, MultiVersionConsistencyControl mvcc, AtomicBoolean status) { + Writer(AtomicBoolean finished, MultiVersionConsistencyControl mvcc, AtomicBoolean status, + AtomicLong seqNumGenerator) { this.finished = finished; this.mvcc = mvcc; this.status = status; + this.seqNumGenerator = seqNumGenerator; } private Random rnd = new Random(); public boolean failed = false; + @Override public void run() { while (!finished.get()) { - MultiVersionConsistencyControl.WriteEntry e = mvcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.TrxStatus e; + synchronized (seqNumGenerator) { + e = mvcc.startTransaction(seqNumGenerator.incrementAndGet()); + } + // System.out.println("Begin write: " + e.getWriteNumber()); // 10 usec - 500usec (including 0) int sleepTime = rnd.nextInt(500); @@ -58,7 +67,7 @@ public class TestMultiVersionConsistencyControl extends TestCase { } catch (InterruptedException e1) { } try { - mvcc.completeMemstoreInsert(e); + mvcc.completeTransaction(e); } catch (RuntimeException ex) { // got failure System.out.println(ex.toString()); @@ -80,6 +89,7 @@ public class TestMultiVersionConsistencyControl extends TestCase { final AtomicBoolean readerFailed = new AtomicBoolean(false); final AtomicLong failedAt = new AtomicLong(); Runnable reader = new Runnable() { + @Override public void run() { long prev = mvcc.memstoreReadPoint(); while (!finished.get()) { @@ -97,6 +107,7 @@ public class TestMultiVersionConsistencyControl extends TestCase { }; // writer thread parallelism. 
+ final AtomicLong seqNumGenerator = new AtomicLong(); int n = 20; Thread[] writers = new Thread[n]; AtomicBoolean[] statuses = new AtomicBoolean[n]; @@ -104,7 +115,7 @@ public class TestMultiVersionConsistencyControl extends TestCase { for (int i = 0; i < n; ++i) { statuses[i] = new AtomicBoolean(true); - writers[i] = new Thread(new Writer(finished, mvcc, statuses[i])); + writers[i] = new Thread(new Writer(finished, mvcc, statuses[i], seqNumGenerator)); writers[i].start(); } readThread.start(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java index 579d249..8c1c1df 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.TransactionManager; import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.Tool; @@ -86,11 +87,13 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool this.htd = htd; } + @Override public void run() { byte[] key = new byte[keySize]; byte[] value = new byte[valueSize]; Random rand = new Random(Thread.currentThread().getId()); HLog hlog = region.getLog(); + TransactionManager trxManager = org.mockito.Mockito.mock(TransactionManager.class); try { long startTime = System.currentTimeMillis(); @@ -102,9 +105,9 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool HRegionInfo hri = region.getRegionInfo(); if (this.noSync) { hlog.appendNoSync(hri, hri.getTableName(), walEdit, - HConstants.DEFAULT_CLUSTER_ID, now, htd); + HConstants.DEFAULT_CLUSTER_ID, now, htd, trxManager); } else { - hlog.append(hri, hri.getTableName(), walEdit, now, htd); + hlog.append(hri, hri.getTableName(), walEdit, now, htd, trxManager); } } long totalTime = (System.currentTimeMillis() - startTime); @@ -178,6 +181,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool final long whenToRoll = roll; HLog hlog = new FSHLog(fs, rootRegionDir, "wals", getConf()) { int appends = 0; + @Override protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit, HTableDescriptor htd) throws IOException { @@ -241,7 +245,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool * @throws IOException */ private long verify(final Path wal, final boolean verbose) throws IOException { - HLog.Reader reader = HLogFactory.createReader(wal.getFileSystem(getConf()), + HLog.Reader reader = HLogFactory.createReader(wal.getFileSystem(getConf()), wal, getConf()); long previousSeqid = -1; long count = 0; @@ -353,7 +357,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool * Call this method to avoid the {@link #main(String[])} System.exit. 
* @param args * @return errCode - * @throws Exception + * @throws Exception */ static int innerMain(final String [] args) throws Exception { return ToolRunner.run(HBaseConfiguration.create(), new HLogPerformanceEvaluation(), args); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index c24dfc7..ab01ce7 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -18,13 +18,19 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.lang.reflect.Method; -import java.util.TreeMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,13 +41,21 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; +import org.apache.hadoop.hbase.regionserver.TransactionManager; +import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -77,7 +91,9 @@ public class TestHLog { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static Path hbaseDir; private static Path oldLogDir; - + + private TransactionManager trxManager; + @Before public void setUp() throws Exception { @@ -85,7 +101,7 @@ public class TestHLog { for (FileStatus dir : entries) { fs.delete(dir.getPath(), true); } - + trxManager = org.mockito.Mockito.mock(TransactionManager.class); } @After @@ -158,7 +174,7 @@ public class TestHLog { final byte [] tableName = Bytes.toBytes(getName()); final byte [] rowName = tableName; Path logdir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME); - HLog log = HLogFactory.createHLog(fs, hbaseDir, + HLog log = HLogFactory.createHLog(fs, hbaseDir, HConstants.HREGION_LOGDIR_NAME, conf); final int howmany = 3; HRegionInfo[] infos = new HRegionInfo[3]; @@ -187,7 +203,7 @@ public class TestHLog { System.currentTimeMillis(), column)); LOG.info("Region " + i + ": " + edit); 
log.append(infos[i], tableName, edit, - System.currentTimeMillis(), htd); + System.currentTimeMillis(), htd, trxManager); } } log.rollWriter(); @@ -238,7 +254,7 @@ public class TestHLog { in.close(); HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir", conf); - + final int total = 20; HLog.Reader reader = null; @@ -251,7 +267,7 @@ public class TestHLog { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes)); - wal.append(info, bytes, kvs, System.currentTimeMillis(), htd); + wal.append(info, bytes, kvs, System.currentTimeMillis(), htd, trxManager); } // Now call sync and try reading. Opening a Reader before you sync just // gives you EOFE. @@ -269,7 +285,7 @@ public class TestHLog { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes)); - wal.append(info, bytes, kvs, System.currentTimeMillis(), htd); + wal.append(info, bytes, kvs, System.currentTimeMillis(), htd, trxManager); } reader = HLogFactory.createReader(fs, walPath, conf); count = 0; @@ -288,7 +304,7 @@ public class TestHLog { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), bytes, value)); - wal.append(info, bytes, kvs, System.currentTimeMillis(), htd); + wal.append(info, bytes, kvs, System.currentTimeMillis(), htd, trxManager); } // Now I should have written out lots of blocks. Sync then read. wal.sync(); @@ -368,10 +384,10 @@ public class TestHLog { } } } - + /* * We pass different values to recoverFileLease() so that different code paths are covered - * + * * For this test to pass, requires: * 1. HDFS-200 (append support) * 2. HDFS-988 (SafeMode should freeze file operations @@ -394,7 +410,7 @@ public class TestHLog { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName, tableName)); - wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd); + wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, trxManager); } // Now call sync to send the data to HDFS datanodes wal.sync(); @@ -456,6 +472,7 @@ public class TestHLog { class RecoverLogThread extends Thread { public Exception exception = null; + @Override public void run() { try { FSUtils.getInstance(fs, rlConf) @@ -507,7 +524,7 @@ public class TestHLog { HLog log = null; try { log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf); - + // Write columns named 1, 2, 3, etc. and then values of single byte // 1, 2, 3... 
long timestamp = System.currentTimeMillis(); @@ -522,7 +539,7 @@ public class TestHLog { HTableDescriptor htd = new HTableDescriptor(); htd.addFamily(new HColumnDescriptor("column")); - log.append(info, tableName, cols, System.currentTimeMillis(), htd); + log.append(info, tableName, cols, System.currentTimeMillis(), htd, trxManager); log.startCacheFlush(info.getEncodedNameAsBytes()); log.completeCacheFlush(info.getEncodedNameAsBytes()); log.close(); @@ -578,7 +595,7 @@ public class TestHLog { HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); HTableDescriptor htd = new HTableDescriptor(); htd.addFamily(new HColumnDescriptor("column")); - log.append(hri, tableName, cols, System.currentTimeMillis(), htd); + log.append(hri, tableName, cols, System.currentTimeMillis(), htd, trxManager); log.startCacheFlush(hri.getEncodedNameAsBytes()); log.completeCacheFlush(hri.getEncodedNameAsBytes()); log.close(); @@ -632,7 +649,7 @@ public class TestHLog { cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)), timestamp, new byte[]{(byte) (i + '0')})); - log.append(hri, tableName, cols, System.currentTimeMillis(), htd); + log.append(hri, tableName, cols, System.currentTimeMillis(), htd, trxManager); } assertEquals(COL_COUNT, visitor.increments); log.unregisterWALActionsListener(visitor); @@ -640,7 +657,7 @@ public class TestHLog { cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(11)), timestamp, new byte[]{(byte) (11 + '0')})); - log.append(hri, tableName, cols, System.currentTimeMillis(), htd); + log.append(hri, tableName, cols, System.currentTimeMillis(), htd, trxManager); assertEquals(COL_COUNT, visitor.increments); } finally { if (log != null) log.closeAndDelete(); @@ -653,7 +670,7 @@ public class TestHLog { final byte [] tableName = Bytes.toBytes("testLogCleaning"); final byte [] tableName2 = Bytes.toBytes("testLogCleaning2"); - HLog log = HLogFactory.createHLog(fs, hbaseDir, + HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf); try { HRegionInfo hri = new HRegionInfo(tableName, @@ -730,7 +747,7 @@ public class TestHLog { @Test public void testWALCoprocessorLoaded() throws Exception { // test to see whether the coprocessor is loaded or not. 
- HLog log = HLogFactory.createHLog(fs, hbaseDir, + HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf); try { WALCoprocessorHost host = log.getCoprocessorHost(); @@ -751,10 +768,10 @@ public class TestHLog { long timestamp = System.currentTimeMillis(); WALEdit cols = new WALEdit(); cols.add(new KeyValue(row, row, row, timestamp, row)); - log.append(hri, tableName, cols, timestamp, htd); + log.append(hri, tableName, cols, timestamp, htd, trxManager); } } - + /** * @throws IOException @@ -947,7 +964,7 @@ public class TestHLog { @Override public void logRollRequested() { // TODO Auto-generated method stub - + } @Override diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java index 3ac0739..c99f6e2 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java @@ -42,11 +42,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; -import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException; -import org.apache.log4j.Level; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.hdfs.server.namenode.LeaseManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -62,19 +57,25 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.TransactionManager; import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.CorruptedLogFileException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; +import org.apache.hadoop.hdfs.server.namenode.LeaseManager; import org.apache.hadoop.ipc.RemoteException; +import org.apache.log4j.Level; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -128,6 +129,8 @@ public class TestHLogSplit { private static String ZOMBIE; private static String [] GROUP = new String [] {"supergroup"}; + private TransactionManager trxManager; + static enum Corruptions { INSERT_GARBAGE_ON_FIRST_LINE, INSERT_GARBAGE_IN_THE_MIDDLE, @@ -142,7 +145,7 @@ public class TestHLogSplit { TEST_UTIL.getConfiguration().setClass("hbase.regionserver.hlog.writer.impl", InstrumentedSequenceFileLogWriter.class, HLog.Writer.class); 
TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true); - TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true); + TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true); // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config. System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); // Create fake maping user to group and set it to the conf. @@ -177,6 +180,7 @@ public class TestHLogSplit { REGIONS.clear(); Collections.addAll(REGIONS, "bbb", "ccc"); InstrumentedSequenceFileLogWriter.activateFailure = false; + trxManager = org.mockito.Mockito.mock(TransactionManager.class); } @After @@ -873,6 +877,7 @@ public class TestHLogSplit { // Set up a splitter that will throw an IOE on the output side HLogSplitter logSplitter = new HLogSplitter( conf, HBASEDIR, HLOGDIR, OLDLOGDIR, fs, null) { + @Override protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException { HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class); @@ -937,6 +942,7 @@ public class TestHLogSplit { "Blocklist for " + OLDLOGDIR + " has changed"}; private int count = 0; + @Override public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { if (count < 3) { throw new IOException(errors[count++]); @@ -964,7 +970,7 @@ public class TestHLogSplit { fs.initialize(fs.getUri(), conf); final AtomicInteger count = new AtomicInteger(); - + CancelableProgressable localReporter = new CancelableProgressable() { @Override @@ -976,6 +982,7 @@ public class TestHLogSplit { FileSystem spiedFs = Mockito.spy(fs); Mockito.doAnswer(new Answer() { + @Override public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { Thread.sleep(1500); // Sleep a while and wait report status invoked return (FSDataInputStream)invocation.callRealMethod(); @@ -1046,6 +1053,7 @@ public class TestHLogSplit { localConf, HBASEDIR, HLOGDIR, OLDLOGDIR, fs, null) { /* Produce a mock writer that doesn't write anywhere */ + @Override protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException { HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class); @@ -1078,6 +1086,7 @@ public class TestHLogSplit { /* Produce a mock reader that generates fake entries */ + @Override protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf, CancelableProgressable reporter) throws IOException { Reader mockReader = Mockito.mock(Reader.class); @@ -1144,7 +1153,7 @@ public class TestHLogSplit { kvs.add(new KeyValue(Bytes.toBytes(i), tableName, tableName)); HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor("column")); - log.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd); + log.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, trxManager); } // Send the data to HDFS datanodes and close the HDFS writer log.sync(); @@ -1364,6 +1373,7 @@ public class TestHLogSplit { HLogSplitter logSplitter = new HLogSplitter( conf, HBASEDIR, HLOGDIR, OLDLOGDIR, fs, null) { + @Override protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException { HLog.Writer writer = HLogFactory.createWriter(fs, logfile, conf); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java index eba84a1..e250769 100644 --- 
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.regionserver.TransactionManager; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -55,7 +56,7 @@ public class TestLogRollingNoCluster { Path dir = TEST_UTIL.getDataTestDir(); HLog wal = HLogFactory.createHLog(fs, dir, "logs", TEST_UTIL.getConfiguration()); - + Appender [] appenders = null; final int count = THREAD_COUNT; @@ -88,12 +89,14 @@ public class TestLogRollingNoCluster { private final HLog wal; private final int count; private Exception e = null; + private TransactionManager trxManager; Appender(final HLog wal, final int index, final int count) { super("" + index); this.wal = wal; this.count = count; this.log = LogFactory.getLog("Appender:" + getName()); + trxManager = org.mockito.Mockito.mock(TransactionManager.class); } /** @@ -123,7 +126,7 @@ public class TestLogRollingNoCluster { this.wal.append(HRegionInfo.FIRST_META_REGIONINFO, HTableDescriptor.META_TABLEDESC.getName(), - edit, now, HTableDescriptor.META_TABLEDESC); + edit, now, HTableDescriptor.META_TABLEDESC, trxManager); } String msg = getName() + " finished"; if (isException()) diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index 86047ad..535696a 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.junit.Assert.assertEquals; + import java.util.ArrayList; import java.util.List; @@ -26,7 +28,14 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.regionserver.TransactionManager; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; @@ -34,8 +43,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.*; - /** * Test that the actions are called while playing with an HLog */ @@ -53,6 +60,8 @@ public class TestWALActionsListener { private static String logName; private static Configuration conf; + private TransactionManager trxManager; + @BeforeClass public static void setUpBeforeClass() throws Exception { conf = TEST_UTIL.getConfiguration(); @@ -69,6 +78,7 @@ public class TestWALActionsListener { public void setUp() throws Exception { fs.delete(logDir, true); fs.delete(oldLogDir, true); + trxManager = 
org.mockito.Mockito.mock(TransactionManager.class); } @After @@ -100,7 +110,7 @@ public class TestWALActionsListener { HTableDescriptor htd = new HTableDescriptor(); htd.addFamily(new HColumnDescriptor(b)); - hlog.append(hri, b, edit, 0, htd); + hlog.append(hri, b, edit, 0, htd, trxManager); if (i == 10) { hlog.registerWALActionsListener(laterobserver); } @@ -165,6 +175,7 @@ public class TestWALActionsListener { closedCount++; } + @Override public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) { //To change body of implemented methods use File | Settings | File Templates. } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 0ffd132..ac5de13 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -21,13 +21,11 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.doReturn; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.SortedSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -70,11 +68,12 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; +import org.apache.hadoop.hbase.regionserver.TransactionManager; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.junit.After; import org.junit.AfterClass; @@ -98,6 +97,7 @@ public class TestWALReplay { private Path logDir; private FileSystem fs; private Configuration conf; + private TransactionManager trxManager; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -128,6 +128,7 @@ public class TestWALReplay { if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); } + trxManager = org.mockito.Mockito.mock(TransactionManager.class); } @After @@ -147,7 +148,7 @@ public class TestWALReplay { } /** - * + * * @throws Exception */ @Test @@ -343,6 +344,7 @@ public class TestWALReplay { User user = HBaseTestingUtility.getDifferentUser(newConf, tableNameStr); user.runAs(new PrivilegedExceptionAction() { + @Override public Object run() throws Exception { runWALSplit(newConf); HLog wal2 = createWAL(newConf); @@ -437,6 +439,7 @@ public class TestWALReplay { User user = HBaseTestingUtility.getDifferentUser(newConf, tableNameStr); user.runAs(new PrivilegedExceptionAction() { + @Override public Object run() throws Exception { runWALSplit(newConf); FileSystem newFS = FileSystem.get(newConf); @@ -715,14 +718,14 @@ public class TestWALReplay { long now = ee.currentTimeMillis(); edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), 
rowName, now, rowName)); - wal.append(hri, tableName, edit, now, htd); + wal.append(hri, tableName, edit, now, htd, trxManager); // Delete the c family to verify deletes make it over. edit = new WALEdit(); now = ee.currentTimeMillis(); edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); - wal.append(hri, tableName, edit, now, htd); + wal.append(hri, tableName, edit, now, htd, trxManager); // Sync. wal.sync(); @@ -735,6 +738,7 @@ public class TestWALReplay { User user = HBaseTestingUtility.getDifferentUser(newConf, ".replay.wal.secondtime"); user.runAs(new PrivilegedExceptionAction() { + @Override public Object run() throws Exception { runWALSplit(newConf); FileSystem newFS = FileSystem.get(newConf); @@ -746,6 +750,7 @@ public class TestWALReplay { try { final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) { + @Override protected boolean internalFlushcache( final HLog wal, final long myseqid, MonitoredTask status) throws IOException { @@ -822,7 +827,7 @@ public class TestWALReplay { for (FileStatus fileStatus : listStatus1) { editCount = Integer.parseInt(fileStatus.getPath().getName()); } - // The sequence number should be same + // The sequence number should be same assertEquals( "The sequence number of the recoverd.edits and the current edit seq should be same", lastestSeqNumber, editCount); @@ -850,7 +855,7 @@ public class TestWALReplay { htd.addFamily(a); return htd; } - + private MockHLog createMockWAL(Configuration conf) throws IOException { MockHLog wal = new MockHLog(FileSystem.get(conf), hbaseRootDir, logName, conf); // Set down maximum recovery so we dfsclient doesn't linger retrying something @@ -876,7 +881,7 @@ public class TestWALReplay { @Override public void requestDelayedFlush(HRegion region, long when) { // TODO Auto-generated method stub - + } } @@ -891,7 +896,7 @@ public class TestWALReplay { WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTimeMillis(), columnBytes)); - wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd); + wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd, trxManager); } } @@ -941,7 +946,7 @@ public class TestWALReplay { * @throws IOException */ private HLog createWAL(final Configuration c) throws IOException { - HLog wal = HLogFactory.createHLog(FileSystem.get(c), + HLog wal = HLogFactory.createHLog(FileSystem.get(c), hbaseRootDir, logName, c); // Set down maximum recovery so we dfsclient doesn't linger retrying something // long gone. 
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 2d334e6..327e52d 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.regionserver.TransactionManager; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; @@ -102,11 +103,13 @@ public class TestReplicationSourceManager { private static Path oldLogDir; private static Path logDir; - + private static CountDownLatch latch; private static List files = new ArrayList(); + private TransactionManager trxManager; + @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -161,6 +164,7 @@ public class TestReplicationSourceManager { public void setUp() throws Exception { fs.delete(logDir, true); fs.delete(oldLogDir, true); + trxManager = org.mockito.Mockito.mock(TransactionManager.class); } @After @@ -193,7 +197,7 @@ public class TestReplicationSourceManager { LOG.info(i); HLogKey key = new HLogKey(hri.getRegionName(), test, seq++, System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID); - hlog.append(hri, test, edit, System.currentTimeMillis(), htd); + hlog.append(hri, test, edit, System.currentTimeMillis(), htd, trxManager); } // Simulate a rapid insert that's followed @@ -204,7 +208,7 @@ public class TestReplicationSourceManager { LOG.info(baseline + " and " + time); for (int i = 0; i < 3; i++) { - hlog.append(hri, test, edit, System.currentTimeMillis(), htd); + hlog.append(hri, test, edit, System.currentTimeMillis(), htd, trxManager); } assertEquals(6, manager.getHLogs().get(slaveId).size()); @@ -214,14 +218,14 @@ public class TestReplicationSourceManager { manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1", 0, false, false); - hlog.append(hri, test, edit, System.currentTimeMillis(), htd); + hlog.append(hri, test, edit, System.currentTimeMillis(), htd, trxManager); assertEquals(1, manager.getHLogs().size()); // TODO Need a case with only 2 HLogs and we only want to delete the first one } - + @Test public void testClaimQueues() throws Exception { LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti"); @@ -303,13 +307,13 @@ public class TestReplicationSourceManager { assertTrue(result.contains(server.getServerName().getServerName())); assertTrue(result.contains(s1.getServerName().getServerName())); assertTrue(result.contains(s2.getServerName().getServerName())); - + // close out the resources. rz.close(); server.abort("", null); } - - + + static class DummyNodeFailoverWorker extends Thread { private SortedMap> logZnodesMap; Server server; @@ -356,7 +360,7 @@ public class TestReplicationSourceManager { return 0; } } - + static class DummyServer implements Server { String hostname;