diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
index 9b7107b..d760aa2 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
@@ -68,19 +68,6 @@ public class CellComparator implements Comparator<Cell>, Serializable {
 
     if (!ignoreSequenceid) {
       // Negate following comparisons so later edits show up first
-
-      // compare log replay tag value if there is any
-      // when either keyvalue tagged with log replay sequence number, we need to compare them:
-      // 1) when both keyvalues have the tag, then use the tag values for comparison
-      // 2) when one has and the other doesn't have, the one without the log
-      // replay tag wins because
-      // it means the edit isn't from recovery but new one coming from clients during recovery
-      // 3) when both doesn't have, then skip to the next mvcc comparison
-      long leftChangeSeqNum = getReplaySeqNum(a);
-      long RightChangeSeqNum = getReplaySeqNum(b);
-      if (leftChangeSeqNum != Long.MAX_VALUE || RightChangeSeqNum != Long.MAX_VALUE) {
-        return Longs.compare(RightChangeSeqNum, leftChangeSeqNum);
-      }
       // mvccVersion: later sorts first
       return Longs.compare(b.getMvccVersion(), a.getMvccVersion());
     } else {
@@ -88,22 +75,6 @@ public class CellComparator implements Comparator<Cell>, Serializable {
     }
   }
 
-  /**
-   * Return replay log sequence number for the cell
-   *
-   * @param c
-   * @return Long.MAX_VALUE if there is no LOG_REPLAY_TAG
-   */
-  private static long getReplaySeqNum(final Cell c) {
-    Tag tag = Tag.getTag(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength(),
-        TagType.LOG_REPLAY_TAG_TYPE);
-
-    if (tag != null) {
-      return Bytes.toLong(tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
-    }
-    return Long.MAX_VALUE;
-  }
-
   public static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
     return findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
         - rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 516fd81..9920626 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -746,6 +746,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
         c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
         c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
         c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+    this.seqId = c.getSequenceId();
   }
 
   /**
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index 45c8476..9415d71 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -26,6 +26,5 @@ public final class TagType {
   // Please declare new Tag Types here to avoid step on pre-existing tag types.
   public static final byte ACL_TAG_TYPE = (byte) 1;
   public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
-  public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
   public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
index b67a1c4..3128880 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
@@ -867,12 +867,10 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
     boolean dlr =
         conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
           HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
-    int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
+      LOG.debug("Distributed log replay=" + dlr);
     }
-    // For distributed log replay, hfile version must be 3 at least; we need tag support.
-    return dlr && (version >= 3);
+    return dlr;
   }
 
   private boolean resubmit(ServerName serverName, String path, int version) {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 4a4d004..91de182 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2684,7 +2684,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           continue;
         }
         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
-        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
+        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay);
       }
 
       // ------------------------------------
@@ -3189,12 +3189,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
    *        If null, then this method internally creates a mvcc transaction.
    * @param output newly added KVs into memstore
+   * @param isInReplay true when adding replayed KVs into memstore
    * @return the additional memory usage of the memstore caused by the
    * new entries.
    * @throws IOException
    */
   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
-    long mvccNum, List<Cell> memstoreCells) throws IOException {
+    long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
     long size = 0;
 
     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
@@ -3209,6 +3210,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         Pair<Long, Cell> ret = store.add(cell);
         size += ret.getFirst();
         memstoreCells.add(ret.getSecond());
+        if(isInReplay) {
+          // set memstore newly added cells with replay mvcc number
+          CellUtil.setSequenceId(ret.getSecond(), mvccNum);
+        }
       }
     }
 
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 41f1a99..9abcef6 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1432,8 +1432,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             ? region.getCoprocessorHost()
             : null; // do not invoke coprocessors if this is a secondary region replica
       List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
-      // when tag is enabled, we need tag replay edits with log sequence number
-      boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
 
       // Skip adding the edits to WAL if this is a secondary region replica
       boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
@@ -1454,7 +1452,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
           new Pair<WALKey, WALEdit>();
         List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
-          cells, walEntry, needAddReplayTag, durability);
+          cells, walEntry, durability);
         if (coprocessorHost != null) {
           // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
           // KeyValue.
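
Side note (illustrative, not part of the patch): with the LOG_REPLAY_TAG path removed, replayed edits rely only on the sequence id each Cell already carries — KeyValue(Cell) now preserves it, and HRegion stamps replayed cells through CellUtil.setSequenceId. A minimal sketch of the behaviour this depends on; the class name and literal values below are made up for illustration:

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SeqIdPreservationSketch {
      public static void main(String[] args) throws Exception {
        // A cell as it might look when handed back by distributed log replay.
        KeyValue replayed = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), 1L, Bytes.toBytes("value"));
        // KeyValue implements SettableSequenceId, so replay code can stamp the
        // original WAL sequence number straight onto the cell.
        CellUtil.setSequenceId(replayed, 42L);
        // With the KeyValue(Cell) change above, a copy keeps that sequence id
        // instead of silently resetting it to 0.
        KeyValue copy = new KeyValue((Cell) replayed);
        System.out.println(copy.getSequenceId()); // 42
      }
    }
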
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index cc03e09..df76073 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -92,8 +92,9 @@ public class DefaultCompactor extends Compactor {
         smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
         cleanSeqId = true;
       }
+
       writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
-          fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
+          true, fd.maxTagsLength > 0);
       boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
       if (!finished) {
         writer.close();
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 20af77d..4416df1 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -115,13 +115,13 @@ public class StripeCompactor extends Compactor {
       smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
       cleanSeqId = true;
     }
-    final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint;
+
     final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
     StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
       @Override
       public Writer createWriter() throws IOException {
         return store.createWriterInTmp(
-            fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
+            fd.maxKeyCount, compression, true, true, fd.maxTagsLength > 0);
       }
     };
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index ef7b6ff..3f381cc 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -2135,34 +2135,6 @@ public class WALSplitter {
     public final long nonce;
   }
 
-  /**
-   * Tag original sequence number for each edit to be replayed
-   * @param seqId
-   * @param cell
-   */
-  private static Cell tagReplayLogSequenceNumber(long seqId, Cell cell) {
-    // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
-    boolean needAddRecoveryTag = true;
-    if (cell.getTagsLength() > 0) {
-      Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
-        TagType.LOG_REPLAY_TAG_TYPE);
-      if (tmpTag != null) {
-        // found an existing log replay tag so reuse it
-        needAddRecoveryTag = false;
-      }
-    }
-    if (needAddRecoveryTag) {
-      List<Tag> newTags = new ArrayList<Tag>();
-      Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(seqId));
-      newTags.add(replayTag);
-      if (cell.getTagsLength() > 0) {
-        newTags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()));
-      }
-      return new TagRewriteCell(cell, Tag.fromList(newTags));
-    }
-    return cell;
-  }
-
   /**
    * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey &
    * WALEdit from the passed in WALEntry
@@ -2170,12 +2142,11 @@ public class WALSplitter {
    * @param cells
    * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
    *          extracted from the passed in WALEntry.
-   * @param addLogReplayTag
    * @return list of Pair to be replayed
    * @throws IOException
    */
   public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
-      Pair<WALKey, WALEdit> logEntry, boolean addLogReplayTag, Durability durability)
+      Pair<WALKey, WALEdit> logEntry, Durability durability)
       throws IOException {
 
     if (entry == null) {
@@ -2223,11 +2194,7 @@ public class WALSplitter {
         if (CellUtil.isDelete(cell)) {
           ((Delete) m).addDeleteMarker(cell);
         } else {
-          Cell tmpNewCell = cell;
-          if (addLogReplayTag) {
-            tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell);
-          }
-          ((Put) m).add(tmpNewCell);
+          ((Put) m).add(cell);
         }
         m.setDurability(durability);
         previousCell = cell;
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index 19050d5..f37c1eb 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -1188,7 +1188,6 @@ public class TestDistributedLogSplitting {
     LOG.info("testSameVersionUpdatesRecovery");
     conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    conf.setInt("hfile.format.version", 3);
     startCluster(NUM_RS);
     final AtomicLong sequenceId = new AtomicLong(100);
     final int NUM_REGIONS_TO_CREATE = 40;
@@ -1282,11 +1281,10 @@
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
     conf.setInt("hbase.hstore.compactionThreshold", 3);
-    conf.setInt("hfile.format.version", 3);
     startCluster(NUM_RS);
     final AtomicLong sequenceId = new AtomicLong(100);
     final int NUM_REGIONS_TO_CREATE = 40;
-    final int NUM_LOG_LINES = 1000;
+    final int NUM_LOG_LINES = 2000;
     // turn off load balancing to prevent regions from moving around otherwise
     // they will consume recovered.edits
     master.balanceSwitch(false);