diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index c7bf78d486..066f7f266e 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -100,6 +100,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   private final List<KeyValueHeap> heapsForDelayedClose = new ArrayList<>();
 
   /**
+   * If we close the memstore scanners before sending data to the client, the chunk may be reclaimed
+   * by other updates and the data will be corrupt.
+   */
+  private final List<KeyValueScanner> scannerForDelayedClose = new ArrayList<>();
+  /**
    * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
    * KVs skipped via seeking to next row/column. TODO: estimate them?
    */
@@ -492,6 +497,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     if (withHeapClose) {
       this.closing = true;
     }
+    clearAndClose(scannerForDelayedClose);
     // Under test, we dont have a this.store
     if (this.store != null) {
       this.store.deleteChangedReaderObserver(this);
@@ -879,7 +885,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       // remove the older memstore scanner
       for (int i = currentScanners.size() - 1; i >=0; i--) {
         if (!currentScanners.get(i).isFileScanner()) {
-          currentScanners.remove(i).close();
+          scannerForDelayedClose.add(currentScanners.remove(i));
         } else {
           // we add the memstore scanner to the end of currentScanners
           break;
@@ -1111,6 +1117,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public void shipped() throws IOException {
+    clearAndClose(scannerForDelayedClose);
     if (prevCell != null) {
       // Do the copy here so that in case the prevCell ref is pointing to the previous
       // blocks we can safely release those blocks.
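For context, the fix above parks superseded memstore scanners in scannerForDelayedClose instead of closing them on the spot, and only releases them from shipped() or close(), once the cells that still reference their MSLAB chunks have been handed off to the client. The following is a minimal, self-contained sketch of that delayed-close pattern; the names DelayedCloseExample, retire and releaseRetired are hypothetical stand-ins for the StoreScanner internals and are not part of the HBase API.

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: resources whose backing memory may be reused once closed are
// parked in a list and released at a point where no in-flight data references them.
final class DelayedCloseExample {
  private final List<Closeable> delayedClose = new ArrayList<>();

  // Called when an underlying resource is superseded (e.g. after a flush):
  // do NOT close it yet, data still referencing its memory may be in flight.
  void retire(Closeable scanner) {
    delayedClose.add(scanner);
  }

  // Called once the in-flight data has been shipped (analogous to shipped()/close()):
  // now it is safe to release the retired resources.
  void releaseRetired() throws IOException {
    for (Closeable c : delayedClose) {
      c.close();
    }
    delayedClose.clear();
  }

  public static void main(String[] args) throws IOException {
    DelayedCloseExample example = new DelayedCloseExample();
    example.retire(() -> System.out.println("scanner closed")); // nothing printed yet
    example.releaseRetired();                                    // prints: scanner closed
  }
}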
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index ff213b8fe9..6d11f9bd2b 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -37,6 +37,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.NavigableSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -45,7 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
+import java.util.function.Consumer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -192,7 +193,7 @@
     } else {
       htd.addFamily(hcd);
     }
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
     final Configuration walConf = new Configuration(conf);
     FSUtils.setRootDir(walConf, basedir);
@@ -1154,6 +1155,62 @@ public class TestStore {
     }
   }
 
+  @Test
+  public void testReclaimChunkWhenScaning() throws IOException {
+    init("testReclaimChunkWhenScaning");
+    long ts = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    byte[] value = Bytes.toBytes("value");
+    // data which should be "seen" by the client
+    store.add(createCell(qf1, ts, seqId, value), null);
+    store.add(createCell(qf2, ts, seqId, value), null);
+    store.add(createCell(qf3, ts, seqId, value), null);
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+    quals.add(qf3);
+    try (InternalScanner scanner = (InternalScanner) store.getScanner(
+        new Scan(new Get(row)), quals, seqId)) {
+      List<Cell> results = new MyList<>(size -> {
+        switch (size) {
+          // 1) we get the first cell (qf1)
+          // 2) flush the data to have StoreScanner update inner scanners
+          // 3) the chunk will be reclaimed after updating
+          case 1:
+            try {
+              flushStore(store, id++);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+            break;
+          // 1) we get the second cell (qf2)
+          // 2) add some cells to fill some bytes into the chunk (we have only one chunk)
+          case 2:
+            try {
+              byte[] newValue = Bytes.toBytes("newValue");
+              // newer data which shouldn't be "seen" by the client
+              store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
+              store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
+              store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+            break;
+          default:
+            break;
+        }
+      });
+      scanner.next(results);
+      assertEquals(3, results.size());
+      for (Cell c : results) {
+        byte[] actualValue = CellUtil.cloneValue(c);
+        assertTrue("expected:" + Bytes.toStringBinary(value)
+          + ", actual:" + Bytes.toStringBinary(actualValue)
+          , Bytes.equals(actualValue, value));
+      }
+    }
+  }
+
   private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
     HColumnDescriptor hcd = new HColumnDescriptor(family);
@@ -1221,4 +1278,82 @@ public class TestStore {
       }
     }
   }
+  private static class MyList<T> implements List<T> {
+    private final List<T> delegatee = new ArrayList<>();
+    private final Consumer<Integer> hookAtAdd;
+    MyList(final Consumer<Integer> hookAtAdd) {
+      this.hookAtAdd = hookAtAdd;
+    }
+    @Override
+    public int size() {return delegatee.size();}
+
+    @Override
+    public boolean isEmpty() {return delegatee.isEmpty();}
+
+    @Override
+    public boolean contains(Object o) {return delegatee.contains(o);}
+
+    @Override
+    public Iterator<T> iterator() {return delegatee.iterator();}
+
+    @Override
+    public Object[] toArray() {return delegatee.toArray();}
+
+    @Override
+    public <T> T[] toArray(T[] a) {return delegatee.toArray(a);}
+
+    @Override
+    public boolean add(T e) {
+      hookAtAdd.accept(size());
+      return delegatee.add(e);
+    }
+
+    @Override
+    public boolean remove(Object o) {return delegatee.remove(o);}
+
+    @Override
+    public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);}
+
+    @Override
+    public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);}
+
+    @Override
+    public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);}
+
+    @Override
+    public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);}
+
+    @Override
+    public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);}
+
+    @Override
+    public void clear() {delegatee.clear();}
+
+    @Override
+    public T get(int index) {return delegatee.get(index);}
+
+    @Override
+    public T set(int index, T element) {return delegatee.set(index, element);}
+
+    @Override
+    public void add(int index, T element) {delegatee.add(index, element);}
+
+    @Override
+    public T remove(int index) {return delegatee.remove(index);}
+
+    @Override
+    public int indexOf(Object o) {return delegatee.indexOf(o);}
+
+    @Override
+    public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);}
+
+    @Override
+    public ListIterator<T> listIterator() {return delegatee.listIterator();}
+
+    @Override
+    public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);}
+
+    @Override
+    public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
+  }
 }
\ No newline at end of file
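The MyList helper in the test is a plain delegating List whose add() first passes the current size to a Consumer<Integer> hook; that is how the test triggers a flush after the first cell and extra writes after the second cell at exact points inside StoreScanner's result collection. Below is a stripped-down sketch of the same hook technique; the names HookedList and beforeAdd are hypothetical and it only illustrates the idea, it is not code from the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal delegating-list-with-hook illustration: the hook sees the size
// *before* the element is added, so size 0 means "first add", 1 means "second add", etc.
final class HookedList<T> extends ArrayList<T> {
  private final Consumer<Integer> beforeAdd;

  HookedList(Consumer<Integer> beforeAdd) {
    this.beforeAdd = beforeAdd;
  }

  @Override
  public boolean add(T element) {
    beforeAdd.accept(size());
    return super.add(element);
  }

  public static void main(String[] args) {
    List<String> list = new HookedList<>(size -> System.out.println("about to add element #" + size));
    list.add("a"); // prints: about to add element #0
    list.add("b"); // prints: about to add element #1
  }
}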