From 65e50640766fc170d11140cb88254d5e99263dc9 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 16 May 2017 17:01:37 +0800 Subject: [PATCH] HBASE-18055 Release HFileBlocks before calling shipped when switching from pread to stream causes rpc return value corruption --- .../hadoop/hbase/regionserver/StoreScanner.java | 76 ++++++++++++---------- .../hbase/regionserver/TestSwitchToStreamRead.java | 2 + 2 files changed, 43 insertions(+), 35 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index e9feb37..953e911 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -146,7 +146,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private volatile boolean flushed = false; // generally we get one file from a flush private final List flushedStoreFiles = new ArrayList<>(1); - // generally we get one memstroe scanner from a flush + // generally we get one memstore scanner from a flush private final List memStoreScannersAfterFlush = new ArrayList<>(1); // The current list of scanners private final List currentScanners = new ArrayList<>(); @@ -539,7 +539,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner if (scannerContext == null) { throw new IllegalArgumentException("Scanner context cannot be null"); } - trySwitchToStreamRead(); if (checkFlushed() && reopenAfterFlush()) { return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); } @@ -954,7 +953,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return heap.reseek(kv); } - private void trySwitchToStreamRead() throws IOException { + private void trySwitchToStreamRead() { if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null || bytesRead < 
preadMaxBytes) { return; @@ -971,44 +970,45 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } List filesToReopen = new ArrayList<>(); List memstoreScanners = new ArrayList<>(); - List fileScanners = null; List scannersToClose = new ArrayList<>(); - boolean succ = false; - try { - for (KeyValueScanner kvs : currentScanners) { - if (!kvs.isFileScanner()) { - memstoreScanners.add(kvs); - } else { - scannersToClose.add(kvs); - if (kvs.peek() == null) { - continue; - } - filesToReopen.add(name2File.get(kvs.getFilePath().getName())); + for (KeyValueScanner kvs : currentScanners) { + if (!kvs.isFileScanner()) { + memstoreScanners.add(kvs); + } else { + scannersToClose.add(kvs); + if (kvs.peek() == null) { + continue; } + filesToReopen.add(name2File.get(kvs.getFilePath().getName())); } - if (filesToReopen.isEmpty()) { - return; - } + } + if (filesToReopen.isEmpty()) { + return; + } + List fileScanners = null; + List newCurrentScanners; + KeyValueHeap newHeap; + try { fileScanners = store.getScanners(filesToReopen, cacheBlocks, false, false, matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), readPt, false); seekScanners(fileScanners, lastTop, false, parallelSeekEnabled); - currentScanners.clear(); - addCurrentScanners(fileScanners); - addCurrentScanners(memstoreScanners); - resetKVHeap(currentScanners, store.getComparator()); - resetQueryMatcher(lastTop); - for (KeyValueScanner kvs : scannersToClose) { - kvs.close(); - } - succ = true; - } finally { - if (!succ && fileScanners != null) { - for (KeyValueScanner scanner : fileScanners) { - scanner.close(); - } + newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size()); + newCurrentScanners.addAll(fileScanners); + newCurrentScanners.addAll(memstoreScanners); + newHeap = new KeyValueHeap(newCurrentScanners, store.getComparator()); + } catch (Exception e) { + LOG.warn("failed to switch to stream read", e); + if (fileScanners != null) { 
+ fileScanners.forEach(KeyValueScanner::close); } return; } + currentScanners.clear(); + addCurrentScanners(newCurrentScanners); + this.heap = newHeap; + resetQueryMatcher(lastTop); + scannersToClose.forEach(KeyValueScanner::close); } protected final boolean checkFlushed() { @@ -1117,12 +1117,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner prevCell = KeyValueUtil.toNewKeyCell(this.prevCell); } matcher.beforeShipped(); - for (KeyValueHeap h : this.heapsForDelayedClose) { - h.close();// There wont be further fetch of Cells from these scanners. Just close. - } + // There won't be further fetch of Cells from these scanners. Just close. + this.heapsForDelayedClose.forEach(KeyValueHeap::close); this.heapsForDelayedClose.clear(); if (this.heap != null) { this.heap.shipped(); + // When switching from pread to stream, we will open a new scanner for each store file, but + // the old scanner may still track the HFileBlocks we have scanned but not sent back to client + // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others + // before we serialize and send it back to client. The HFileBlocks will be released in shipped + // method, so here we will also open new scanners and close old scanners in shipped method. + // See HBASE-18055 for more details. 
+ trySwitchToStreamRead(); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java index fb978b1..767ad2e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java @@ -104,6 +104,7 @@ public class TestSwitchToStreamRead { Result result = Result.create(cells); assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL))); cells.clear(); + scanner.shipped(); } for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) { if (kvs instanceof StoreFileScanner) { @@ -117,6 +118,7 @@ public class TestSwitchToStreamRead { Result result = Result.create(cells); assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL))); cells.clear(); + scanner.shipped(); } } // make sure all scanners are closed. -- 2.7.4