diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 76466d7..d5a8c66 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3257,8 +3257,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { continue; } - addedSize += applyFamilyMapToMemstore(familyMaps[i], replay, + // We need to update the sequence id for following reasons. + // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id. + // 2) If no WAL, FSWALEntry won't be used + boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL; + if (updateSeqId) { + this.updateSequenceId(familyMaps[i].values(), replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); + } + addedSize += applyFamilyMapToMemstore(familyMaps[i]); } // STEP 6. Complete mvcc. @@ -3674,6 +3681,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + private void updateSequenceId(final Iterable> cellItr, final long sequenceId) + throws IOException { + for (List cells: cellItr) { + if (cells == null) return; + for (Cell cell : cells) { + CellUtil.setSequenceId(cell, sequenceId); + } + } + } + @Override public void updateCellTimestamps(final Iterable> cellItr, final byte[] now) throws IOException { @@ -3784,15 +3801,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param familyMap Map of Cells by family * @return the additional memory usage of the memstore caused by the new entries. */ - private long applyFamilyMapToMemstore(Map> familyMap, boolean replay, - long sequenceId) + private long applyFamilyMapToMemstore(Map> familyMap) throws IOException { long size = 0; for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); List cells = e.getValue(); assert cells instanceof RandomAccess; - size += applyToMemstore(getStore(family), cells, false, replay, sequenceId); + size += applyToMemstore(getStore(family), cells, false); } return size; } @@ -3804,23 +3820,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return Memstore change in size on insert of these Cells. * @see #applyToMemstore(Store, Cell, long) */ - private long applyToMemstore(final Store store, final List cells, - final boolean delta, boolean replay, long sequenceId) + private long applyToMemstore(final Store store, final List cells, final boolean delta) throws IOException { // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!! long size = 0; boolean upsert = delta && store.getFamily().getMaxVersions() == 1; - int count = cells.size(); if (upsert) { size += store.upsert(cells, getSmallestReadPoint()); } else { + int count = cells.size(); for (int i = 0; i < count; i++) { Cell cell = cells.get(i); - // TODO: This looks wrong.. checking for sequenceid of zero is expensive!!!!! St.Ack - // When is it zero anyways? When replay? Then just rely on that flag. - if (cell.getSequenceId() == 0 || replay) { - CellUtil.setSequenceId(cell, sequenceId); - } size += store.add(cell); } } @@ -3831,7 +3841,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return Memstore change in size on insert of these Cells. * @see #applyToMemstore(Store, List, boolean, boolean, long) */ - private long applyToMemstore(final Store store, final Cell cell, long sequenceId) + private long applyToMemstore(final Store store, final Cell cell) throws IOException { // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!! if (store == null) { @@ -7046,7 +7056,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi CellUtil.setSequenceId(cell, sequenceId); } Store store = getStore(cell); - addedSize += applyToMemstore(store, cell, sequenceId); + addedSize += applyToMemstore(store, cell); } } // STEP 8. Complete mvcc. @@ -7232,12 +7242,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // transaction. recordMutationWithoutWal(mutation.getFamilyCellMap()); writeEntry = mvcc.begin(); + updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber()); } // Now write to MemStore. Do it a column family at a time. - long sequenceId = writeEntry.getWriteNumber(); for (Map.Entry> e: forMemStore.entrySet()) { - accumulatedResultSize += - applyToMemstore(e.getKey(), e.getValue(), true, false, sequenceId); + accumulatedResultSize += applyToMemstore(e.getKey(), e.getValue(), true); } mvcc.completeAndWait(writeEntry); if (rsServices != null && rsServices.getNonceManager() != null) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index f10cce3a..fbbf56e 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -33,6 +33,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -4499,6 +4500,66 @@ public class TestFromClientSide { assertEquals(r.getColumnLatestCell(FAMILY, QUALIFIERS[0]).getTimestamp(), r.getColumnLatestCell(FAMILY, QUALIFIERS[2]).getTimestamp()); } + private List doAppend(final boolean walUsed) throws IOException { + LOG.info("Starting testAppend, walUsed is " + walUsed); + final TableName TABLENAME = TableName.valueOf(walUsed ? "testAppendWithWAL" : "testAppendWithoutWAL"); + Table t = TEST_UTIL.createTable(TABLENAME, FAMILY); + final byte[] row1 = Bytes.toBytes("c"); + final byte[] row2 = Bytes.toBytes("b"); + final byte[] row3 = Bytes.toBytes("a"); + final byte[] qual = Bytes.toBytes("qual"); + Put put_0 = new Put(row2); + put_0.addColumn(FAMILY, qual, Bytes.toBytes("put")); + Put put_1 = new Put(row3); + put_1.addColumn(FAMILY, qual, Bytes.toBytes("put")); + Append append_0 = new Append(row1); + append_0.add(FAMILY, qual, Bytes.toBytes("i")); + Append append_1 = new Append(row1); + append_1.add(FAMILY, qual, Bytes.toBytes("k")); + Append append_2 = new Append(row1); + append_2.add(FAMILY, qual, Bytes.toBytes("e")); + if (!walUsed) { + append_2.setDurability(Durability.SKIP_WAL); + } + Append append_3 = new Append(row1); + append_3.add(FAMILY, qual, Bytes.toBytes("a")); + Scan s = new Scan(); + s.setCaching(1); + t.append(append_0); + t.put(put_0); + t.put(put_1); + List results = new LinkedList<>(); + try (ResultScanner scanner = t.getScanner(s)) { + t.append(append_1); + t.append(append_2); + t.append(append_3); + for (Result r : scanner) { + results.add(r); + } + } + TEST_UTIL.deleteTable(TABLENAME); + return results; + } + + @Test + public void testAppendWithoutWAL() throws Exception { + List resultsWithWal = doAppend(true); + List resultsWithoutWal = doAppend(false); + assertEquals(resultsWithWal.size(), resultsWithoutWal.size()); + for (int i = 0; i != resultsWithWal.size(); ++i) { + Result resultWithWal = resultsWithWal.get(i); + Result resultWithoutWal = resultsWithoutWal.get(i); + assertEquals(resultWithWal.rawCells().length, resultWithoutWal.rawCells().length); + for (int j = 0; j != resultWithWal.rawCells().length; ++j) { + Cell cellWithWal = resultWithWal.rawCells()[j]; + Cell cellWithoutWal = resultWithoutWal.rawCells()[j]; + assertTrue(Bytes.equals(CellUtil.cloneRow(cellWithWal), CellUtil.cloneRow(cellWithoutWal))); + assertTrue(Bytes.equals(CellUtil.cloneFamily(cellWithWal), CellUtil.cloneFamily(cellWithoutWal))); + assertTrue(Bytes.equals(CellUtil.cloneQualifier(cellWithWal), CellUtil.cloneQualifier(cellWithoutWal))); + assertTrue(Bytes.equals(CellUtil.cloneValue(cellWithWal), CellUtil.cloneValue(cellWithoutWal))); + } + } + } @Test public void testClientPoolRoundRobin() throws IOException {