From bc0c6ad74eb244527667999f5dec34a80207d6dd Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Tue, 6 Jun 2017 03:44:12 +0800 Subject: [PATCH] HBASE-18145 The flush may cause the corrupt data for reading --- .../hadoop/hbase/regionserver/StoreScanner.java | 27 ++-- .../hadoop/hbase/regionserver/TestStore.java | 139 ++++++++++++++++++++- 2 files changed, 150 insertions(+), 16 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index c7bf78d486..f320eb6cff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -95,9 +95,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private final long maxRowSize; private final long cellsPerHeartbeatCheck; - // Collects all the KVHeap that are eagerly getting closed during the - // course of a scan - private final List heapsForDelayedClose = new ArrayList<>(); + // 1) Collects all the KVHeap that are eagerly getting closed during the + // course of a scan + // 2) Collects the unused memstore scanners. If we close the memstore scanners + // before sending data to client, the chunk may be reclaimed by other + // updates and the data will be corrupt. + private final List scannersForDelayedClose = new ArrayList<>(); /** * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not @@ -485,23 +488,20 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner close(true); } - private void close(boolean withHeapClose) { + private void close(boolean withDelayedScannersClose) { if (this.closing) { return; } - if (withHeapClose) { + if (withDelayedScannersClose) { this.closing = true; } // Under test, we dont have a this.store if (this.store != null) { this.store.deleteChangedReaderObserver(this); } - if (withHeapClose) { + if (withDelayedScannersClose) { + clearAndClose(scannersForDelayedClose); clearAndClose(memStoreScannersAfterFlush); - for (KeyValueHeap h : this.heapsForDelayedClose) { - h.close(); - } - this.heapsForDelayedClose.clear(); if (this.heap != null) { this.heap.close(); this.currentScanners.clear(); @@ -509,7 +509,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } } else { if (this.heap != null) { - this.heapsForDelayedClose.add(this.heap); + this.scannersForDelayedClose.add(this.heap); this.currentScanners.clear(); this.heap = null; } @@ -879,7 +879,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // remove the older memstore scanner for (int i = currentScanners.size() - 1; i >=0; i--) { if (!currentScanners.get(i).isFileScanner()) { - currentScanners.remove(i).close(); + scannersForDelayedClose.add(currentScanners.remove(i)); } else { // we add the memstore scanner to the end of currentScanners break; @@ -1121,8 +1121,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } matcher.beforeShipped(); // There wont be further fetch of Cells from these scanners. Just close. - this.heapsForDelayedClose.forEach(KeyValueHeap::close); - this.heapsForDelayedClose.clear(); + clearAndClose(scannersForDelayedClose); if (this.heap != null) { this.heap.shipped(); // When switching from pread to stream, we will open a new scanner for each store file, but diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index ff213b8fe9..6d11f9bd2b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -37,6 +37,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import java.util.NavigableSet; import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListSet; @@ -45,7 +46,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; +import java.util.function.Consumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -192,7 +193,7 @@ public class TestStore { } else { htd.addFamily(hcd); } - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null); HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, basedir); @@ -1154,6 +1155,62 @@ public class TestStore { } } + @Test + public void testReclaimChunkWhenScaning() throws IOException { + init("testReclaimChunkWhenScaning"); + long ts = EnvironmentEdgeManager.currentTime(); + long seqId = 100; + byte[] value = Bytes.toBytes("value"); + // older data whihc shouldn't be "seen" by client + store.add(createCell(qf1, ts, seqId, value), null); + store.add(createCell(qf2, ts, seqId, value), null); + store.add(createCell(qf3, ts, seqId, value), null); + TreeSet quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); + quals.add(qf1); + quals.add(qf2); + quals.add(qf3); + try (InternalScanner scanner = (InternalScanner) store.getScanner( + new Scan(new Get(row)), quals, seqId)) { + List results = new MyList<>(size -> { + switch (size) { + // 1) we get the first cell (qf1) + // 2) flush the data to have StoreScanner update inner scanners + // 3) the chunk will be reclaimed after updaing + case 1: + try { + flushStore(store, id++); + } catch (IOException e) { + throw new RuntimeException(e); + } + break; + // 1) we get the second cell (qf2) + // 2) add some cell to fill some byte into the chunk (we have only one chunk) + case 2: + try { + byte[] newValue = Bytes.toBytes("newValue"); + // older data whihc shouldn't be "seen" by client + store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null); + store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null); + store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null); + } catch (IOException e) { + throw new RuntimeException(e); + } + break; + default: + break; + } + }); + scanner.next(results); + assertEquals(3, results.size()); + for (Cell c : results) { + byte[] actualValue = CellUtil.cloneValue(c); + assertTrue("expected:" + Bytes.toStringBinary(value) + + ", actual:" + Bytes.toStringBinary(actualValue) + , Bytes.equals(actualValue, value)); + } + } + } + private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException { HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); HColumnDescriptor hcd = new HColumnDescriptor(family); @@ -1221,4 +1278,82 @@ public class TestStore { } } } + private static class MyList implements List { + private final List delegatee = new ArrayList<>(); + private final Consumer hookAtAdd; + MyList(final Consumer hookAtAdd) { + this.hookAtAdd = hookAtAdd; + } + @Override + public int size() {return delegatee.size();} + + @Override + public boolean isEmpty() {return delegatee.isEmpty();} + + @Override + public boolean contains(Object o) {return delegatee.contains(o);} + + @Override + public Iterator iterator() {return delegatee.iterator();} + + @Override + public Object[] toArray() {return delegatee.toArray();} + + @Override + public T[] toArray(T[] a) {return delegatee.toArray(a);} + + @Override + public boolean add(T e) { + hookAtAdd.accept(size()); + return delegatee.add(e); + } + + @Override + public boolean remove(Object o) {return delegatee.remove(o);} + + @Override + public boolean containsAll(Collection c) {return delegatee.containsAll(c);} + + @Override + public boolean addAll(Collection c) {return delegatee.addAll(c);} + + @Override + public boolean addAll(int index, Collection c) {return delegatee.addAll(index, c);} + + @Override + public boolean removeAll(Collection c) {return delegatee.removeAll(c);} + + @Override + public boolean retainAll(Collection c) {return delegatee.retainAll(c);} + + @Override + public void clear() {delegatee.clear();} + + @Override + public T get(int index) {return delegatee.get(index);} + + @Override + public T set(int index, T element) {return delegatee.set(index, element);} + + @Override + public void add(int index, T element) {delegatee.add(index, element);} + + @Override + public T remove(int index) {return delegatee.remove(index);} + + @Override + public int indexOf(Object o) {return delegatee.indexOf(o);} + + @Override + public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);} + + @Override + public ListIterator listIterator() {return delegatee.listIterator();} + + @Override + public ListIterator listIterator(int index) {return delegatee.listIterator(index);} + + @Override + public List subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);} + } } \ No newline at end of file -- 2.13.0.windows.1