diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 94ae510..8412d6e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -235,6 +235,15 @@ public class DefaultMemStore implements MemStore { } @Override + public long add(Iterable<Cell> cells) { + long size = 0; + for (Cell cell : cells) { + size += add(cell); + } + return size; + } + + @Override public long timeOfOldestEdit() { return timeOfOldestEdit; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index ed32536..c399f4a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3324,8 +3324,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi != OperationStatusCode.NOT_RUN) { continue; } + // We need to update the sequence id for the following reasons. + // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id. 
+ // 2) If no WAL, FSWALEntry won't be used + boolean updateSeqId = isInReplay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL; + if (updateSeqId) { + updateSequenceId(familyMaps[i].values(), mvccNum); + } doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote - addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay); + addedSize += applyFamilyMapToMemstore(familyMaps[i]); } // ------------------------------- @@ -3722,6 +3729,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi manifest.addRegion(this); } + private void updateSequenceId(final Iterable<List<Cell>> cellItr, final long sequenceId) + throws IOException { + for (List<Cell> cells : cellItr) { + if (cells == null) { + continue; // skip a family without cells but keep stamping the remaining ones + } + for (Cell cell : cells) { + CellUtil.setSequenceId(cell, sequenceId); + } + } + } + @Override public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now) throws IOException { @@ -3846,8 +3865,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * new entries. 
* @throws IOException */ - private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap, - long mvccNum, boolean isInReplay) throws IOException { + private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap) throws IOException { long size = 0; for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) { @@ -3855,14 +3873,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi List<Cell> cells = e.getValue(); assert cells instanceof RandomAccess; Store store = getStore(family); - int listSize = cells.size(); - for (int i=0; i < listSize; i++) { - Cell cell = cells.get(i); - if (cell.getSequenceId() == 0 || isInReplay) { - CellUtil.setSequenceId(cell, mvccNum); - } - size += store.add(cell); - } + size += store.add(cells); } return size; @@ -7520,9 +7531,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi recordMutationWithoutWal(mutate.getFamilyCellMap()); } } + boolean updateSeqId = false; if (walKey == null) { // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned walKey = this.appendEmptyEdit(this.wal); + // If no WAL, FSWALEntry won't be used, so we must update the sequence id ourselves + updateSeqId = true; } // Do a get on the write entry... this will block until sequenceid is assigned... w/o it, // TestAtomicOperation fails. @@ -7533,21 +7547,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi writeEntry.getWriteNumber()); } + if (updateSeqId) { + updateSequenceId(tempMemstore.values(), writeEntry.getWriteNumber()); + } + // Actually write to Memstore now if (!tempMemstore.isEmpty()) { for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) { Store store = entry.getKey(); if (store.getFamily().getMaxVersions() == 1) { // upsert if VERSIONS for this CF == 1 - // Is this right? It immediately becomes visible? 
St.Ack 20150907 size += store.upsert(entry.getValue(), getSmallestReadPoint()); } else { // otherwise keep older versions around - for (Cell cell: entry.getValue()) { - // This stamping of sequenceid seems redundant; it is happening down in - // FSHLog when we consume edits off the ring buffer. - CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber()); - size += store.add(cell); + size += store.add(entry.getValue()); + if (!entry.getValue().isEmpty()) { doRollBackMemstore = true; } } @@ -7746,6 +7760,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } } + boolean updateSeqId = false; // Actually write to WAL now. If walEdits is non-empty, we write the WAL. if (walEdits != null && !walEdits.isEmpty()) { // Using default cluster id, as this can only happen in the originating cluster. @@ -7759,6 +7774,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } else { // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned walKey = this.appendEmptyEdit(this.wal); + // If no WAL, FSWALEntry won't be used, so we must update the sequence id ourselves + updateSeqId = true; } // Get WriteEntry. Will wait on assign of the sequence id. WriteEntry writeEntry = walKey.getWriteEntry(); @@ -7768,6 +7785,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi writeEntry.getWriteNumber()); } + if (updateSeqId) { + updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber()); + } + // Now write to memstore, a family at a time. for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) { Store store = entry.getKey(); @@ -7778,10 +7799,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // TODO: St.Ack 20151222 Why no rollback in this case? } else { // Otherwise keep older versions around - for (Cell cell: results) { - // Why we need this? 
- CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber()); - accumulatedResultSize += store.add(cell); + accumulatedResultSize += store.add(entry.getValue()); + if (!entry.getValue().isEmpty()) { doRollBackMemstore = true; } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index b598f09..6ee6bb5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -666,6 +666,16 @@ public class HStore implements Store { } @Override + public long add(Iterable<Cell> cells) { + lock.readLock().lock(); + try { + return this.memstore.add(cells); + } finally { + lock.readLock().unlock(); + } + } + + @Override public long timeOfOldestEdit() { return memstore.timeOfOldestEdit(); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 4901b2a..a885d79 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -73,6 +73,13 @@ public interface MemStore extends HeapSize { long add(final Cell cell); /** + * Write the given cells to the memstore + * @param cells the cells to add + * @return approximate size of the passed cells. 
+ */ + long add(Iterable<Cell> cells); + + /** + * @return Oldest timestamp of all the Cells in the MemStore */ long timeOfOldestEdit(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 1b41eb0..c78e0e9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -151,6 +151,13 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf long add(Cell cell); /** + * Adds the specified cells to the memstore + * @param cells the cells to add + * @return memstore size delta + */ + long add(Iterable<Cell> cells); + + /** * When was the last edit done in the memstore */ long timeOfOldestEdit(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 039c781..69fdf6a 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -34,6 +34,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -4120,6 +4121,67 @@ public class TestFromClientSide { assertEquals(true, ok); } + private List<Result> doAppend(final boolean walUsed) throws IOException { + LOG.info("Starting testAppend, walUsed is " + walUsed); + final TableName TABLENAME = TableName.valueOf(walUsed ? 
"testAppendWithWAL" : "testAppendWithoutWAL"); + Table t = TEST_UTIL.createTable(TABLENAME, FAMILY); + final byte[] row1 = Bytes.toBytes("c"); + final byte[] row2 = Bytes.toBytes("b"); + final byte[] row3 = Bytes.toBytes("a"); + final byte[] qual = Bytes.toBytes("qual"); + Put put_0 = new Put(row2); + put_0.addColumn(FAMILY, qual, Bytes.toBytes("put")); + Put put_1 = new Put(row3); + put_1.addColumn(FAMILY, qual, Bytes.toBytes("put")); + Append append_0 = new Append(row1); + append_0.add(FAMILY, qual, Bytes.toBytes("i")); + Append append_1 = new Append(row1); + append_1.add(FAMILY, qual, Bytes.toBytes("k")); + Append append_2 = new Append(row1); + append_2.add(FAMILY, qual, Bytes.toBytes("e")); + if (!walUsed) { + append_2.setDurability(Durability.SKIP_WAL); + } + Append append_3 = new Append(row1); + append_3.add(FAMILY, qual, Bytes.toBytes("a")); + Scan s = new Scan(); + s.setCaching(1); + t.append(append_0); + t.put(put_0); + t.put(put_1); + List<Result> results = new LinkedList<>(); + try (ResultScanner scanner = t.getScanner(s)) { + t.append(append_1); + t.append(append_2); + t.append(append_3); + for (Result r : scanner) { + results.add(r); + } + } + TEST_UTIL.deleteTable(TABLENAME); + return results; + } + + @Test + public void testAppendWithoutWAL() throws Exception { + List<Result> resultsWithWal = doAppend(true); + List<Result> resultsWithoutWal = doAppend(false); + assertEquals(resultsWithWal.size(), resultsWithoutWal.size()); + for (int i = 0; i != resultsWithWal.size(); ++i) { + Result resultWithWal = resultsWithWal.get(i); + Result resultWithoutWal = resultsWithoutWal.get(i); + assertEquals(resultWithWal.rawCells().length, resultWithoutWal.rawCells().length); + for (int j = 0; j != resultWithWal.rawCells().length; ++j) { + Cell cellWithWal = resultWithWal.rawCells()[j]; + Cell cellWithoutWal = resultWithoutWal.rawCells()[j]; + assertTrue(Bytes.equals(CellUtil.cloneRow(cellWithWal), CellUtil.cloneRow(cellWithoutWal))); + 
assertTrue(Bytes.equals(CellUtil.cloneFamily(cellWithWal), CellUtil.cloneFamily(cellWithoutWal))); + assertTrue(Bytes.equals(CellUtil.cloneQualifier(cellWithWal), CellUtil.cloneQualifier(cellWithoutWal))); + assertTrue(Bytes.equals(CellUtil.cloneValue(cellWithWal), CellUtil.cloneValue(cellWithoutWal))); + } + } + } + /** * test for HBASE-737 * @throws IOException