From 9a5135f3ccce443a8664a200dae3cbb6373b7632 Mon Sep 17 00:00:00 2001 From: eshcar Date: Mon, 19 Dec 2016 16:16:08 +0200 Subject: [PATCH] Scan-memory-first optimization --- .../hadoop/hbase/regionserver/RSRpcServices.java | 85 +++++++++++++++++----- 1 file changed, 66 insertions(+), 19 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index a61a9f2..231635d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -38,6 +38,7 @@ import java.util.NavigableMap; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; @@ -1093,6 +1094,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA); + memoryScansOptimization = rs.conf.getBoolean(MEMORY_SCAN_OPTIMIZATION_KEY, false); + InetSocketAddress address = rpcServer.getListenerAddress(); if (address == null) { throw new IOException("Listener channel is closed"); @@ -2323,6 +2326,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + private static AtomicInteger MEMORY_SCANS = new AtomicInteger(0); + private static AtomicInteger FULL_SCANS = new AtomicInteger(0); + private static String MEMORY_SCAN_OPTIMIZATION_KEY = "regionserver.memory.scan.optimization"; + private boolean memoryScansOptimization = false; private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack, RpcCallContext context) throws IOException { region.prepareGet(get); @@ -2336,31 +2343,53 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } long before = EnvironmentEdgeManager.currentTime(); - Scan scan = new Scan(get); - if (scan.getLoadColumnFamiliesOnDemandValue() == null) { - scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); - } RegionScanner scanner = null; + RegionScanner internalScanner = null; try { - scanner = region.getScanner(scan); - scanner.next(results); + int size = 0; + int memScansCount = 0; + if(memoryScansOptimization) { + InternalScan internalScan = new InternalScan(get); + internalScan.checkOnlyMemStore(); + if (internalScan.getLoadColumnFamiliesOnDemandValue() == null) { + internalScan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); + } + size = results.size(); + internalScanner = region.getScanner(internalScan); + internalScanner.next(results); + memScansCount = MEMORY_SCANS.incrementAndGet(); + } + if(!memoryScansOptimization || + (memoryScansOptimization && results.size() <= size)) { + Scan scan = new Scan(get); + if (scan.getLoadColumnFamiliesOnDemandValue() == null) { + scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); + } + scanner = region.getScanner(scan); + scanner.next(results); + if(memoryScansOptimization) { + int fullScansCount = FULL_SCANS.incrementAndGet(); + if (fullScansCount % 20000 == 0) { + LOG.info("ESHCAR memScansCount=" + + memScansCount + " fullScansCount=" + fullScansCount); + } + } + } } finally { if (scanner != null) { - if (closeCallBack == null) { - // If there is a context then the scanner can be added to the current - // RpcCallContext. The rpc callback will take care of closing the - // scanner, for eg in case - // of get() - assert scanner instanceof org.apache.hadoop.hbase.ipc.RpcCallback; - context.setCallBack((RegionScannerImpl) scanner); - } else { - // The call is from multi() where the results from the get() are - // aggregated and then send out to the - // rpc. The rpccall back will close all such scanners created as part - // of multi(). - closeCallBack.addScanner(scanner); + // Executed a full scan: + // (1) add the full scan to call back context + // (2) close memory scan + addScannerToCallBackContext(closeCallBack, context, scanner); + if (internalScanner != null) { + internalScanner.close(); } } + // Only executed a memory scan: + // add the memory scan to call back context; there it is closed + else if (internalScanner != null) { + addScannerToCallBackContext(closeCallBack, context, internalScanner); + } } // post-get CP hook @@ -2371,6 +2400,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); } + private void addScannerToCallBackContext(RegionScannersCloseCallBack closeCallBack, + RpcCallContext context, RegionScanner scanner) { + if (closeCallBack == null) { + // If there is a context then the scanner can be added to the current + // RpcCallContext. The rpc callback will take care of closing the + // scanner, for eg in case + // of get() + assert scanner instanceof RpcCallback; + context.setCallBack((RegionScannerImpl) scanner); + } else { + // The call is from multi() where the results from the get() are + // aggregated and then send out to the + // rpc. The rpccall back will close all such scanners created as part + // of multi(). + closeCallBack.addScanner(scanner); + } + } + /** * Execute multiple actions on a table: get, mutate, and/or execCoprocessor * -- 2.10.1 (Apple Git-78)