From 45708244687d7ee00bd8acd862f46740654efc4b Mon Sep 17 00:00:00 2001 From: eyjian Date: Tue, 6 Dec 2016 20:02:31 +0800 Subject: [PATCH] HBASE-17182 Remove timeout scanner from scannerMap to release memory --- .../hbase/thrift2/ThriftHBaseServiceHandler.java | 101 +++++++++++++++++++-- .../thrift2/TestThriftHBaseServiceHandler.java | 76 +++++++++++----- 2 files changed, 147 insertions(+), 30 deletions(-) diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java index a69a7df..f085da5 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java @@ -40,6 +40,7 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -70,6 +71,7 @@ import org.apache.hadoop.hbase.thrift2.generated.TRowMutations; import org.apache.hadoop.hbase.thrift2.generated.TScan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConnectionCache; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.thrift.TException; /** @@ -86,9 +88,9 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface { // nextScannerId and scannerMap are used to manage scanner state // TODO: Cleanup thread for Scanners, Scanner id wrap private final AtomicInteger nextScannerId = new AtomicInteger(0); - private final Map scannerMap = - new ConcurrentHashMap(); - + private final Map scannerMap = + new ConcurrentHashMap(); + private final CleanScannerThread cleanScannerThread; private final ConnectionCache connectionCache; static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval"; @@ -136,6 +138,10 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface { int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000); connectionCache = new ConnectionCache( conf, userProvider, cleanInterval, maxIdleTime); + + // Start clean scanner thread + cleanScannerThread = new CleanScannerThread(); + cleanScannerThread.start(); } private Table getTable(ByteBuffer tableName) { @@ -175,7 +181,8 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface { */ private int addScanner(ResultScanner scanner) { int id = nextScannerId.getAndIncrement(); - scannerMap.put(id, scanner); + ScannerInfo scannerInfo = new ScannerInfo(scanner); + scannerMap.put(id, scannerInfo); return id; } @@ -185,7 +192,12 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface { * @return a Scanner, or null if the Id is invalid */ private ResultScanner getScanner(int id) { - return scannerMap.get(id); + ScannerInfo scannerInfo = scannerMap.get(id); + if (scannerInfo == null) { + return null; + } + scannerInfo.updateAccessTime(); + return scannerInfo.getScanner(); } void setEffectiveUser(String effectiveUser) { @@ -198,7 +210,11 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface { * @return the removed Scanner, or null if the Id is invalid */ protected ResultScanner removeScanner(int id) { - return scannerMap.remove(id); + ScannerInfo scannerInfo = scannerMap.remove(id); + if (scannerInfo == null) { + return null; + } + return scannerInfo.getScanner(); } @Override @@ -484,4 +500,77 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface { } } } + + public void stopClean() { + cleanScannerThread.stopClean(); + } + + public boolean scannerExists(Integer scannerId) { + ResultScanner scanner = getScanner(scannerId); + return scanner != null; + } + + static class ScannerInfo { + private ResultScanner resultScanner; + private long lastAccessTime; + + ScannerInfo(ResultScanner scanner) { + lastAccessTime = EnvironmentEdgeManager.currentTime(); + resultScanner = scanner; + } + + ResultScanner getScanner() { + return resultScanner; + } + + synchronized void updateAccessTime() { + lastAccessTime = EnvironmentEdgeManager.currentTime(); + } + + synchronized boolean timedOut() { + final long maxIdleTime = 60000; // 1minutes + long timeoutTime = lastAccessTime + maxIdleTime; + if (EnvironmentEdgeManager.currentTime() > timeoutTime) { + return true; + } + return false; + } + } + + class CleanScannerThread extends Thread { + private volatile boolean toStopClean; + + public void stopClean() { + toStopClean = true; + } + + @Override + public void run() { + toStopClean = false; + + while (!toStopClean) { + try { + Thread.sleep(1000); + + Iterator> iter = scannerMap.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + ScannerInfo scannerInfo = entry.getValue(); + if (scannerInfo.timedOut()) { + Integer scannerId = entry.getKey(); + String str = String.format("Remove timeout scanner(%d).", scannerId); + LOG.info(str); + // ResultScanner scanner = scannerInfo.getScanner(); + // scanner.close(); // Close will throw UnknownScannerException + iter.remove(); + } + } + } catch (InterruptedException e) { + // LOG.error(e); + } + } + + LOG.info("Clean timeout scanner thread exit."); + } + } } diff --git a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java index 38e3780..745ebcc 100644 --- a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java +++ b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java @@ -18,6 +18,29 @@ */ package org.apache.hadoop.hbase.thrift2; +import static java.nio.ByteBuffer.wrap; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift; +import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -27,12 +50,12 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.filter.ParseFilter; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.test.MetricsAssertHelper; @@ -46,17 +69,17 @@ import org.apache.hadoop.hbase.thrift2.generated.TColumnValue; import org.apache.hadoop.hbase.thrift2.generated.TCompareOp; import org.apache.hadoop.hbase.thrift2.generated.TDelete; import org.apache.hadoop.hbase.thrift2.generated.TDeleteType; +import org.apache.hadoop.hbase.thrift2.generated.TDurability; import org.apache.hadoop.hbase.thrift2.generated.TGet; import org.apache.hadoop.hbase.thrift2.generated.THBaseService; import org.apache.hadoop.hbase.thrift2.generated.TIOError; import org.apache.hadoop.hbase.thrift2.generated.TIllegalArgument; import org.apache.hadoop.hbase.thrift2.generated.TIncrement; +import org.apache.hadoop.hbase.thrift2.generated.TMutation; import org.apache.hadoop.hbase.thrift2.generated.TPut; import org.apache.hadoop.hbase.thrift2.generated.TResult; -import org.apache.hadoop.hbase.thrift2.generated.TScan; -import org.apache.hadoop.hbase.thrift2.generated.TMutation; import org.apache.hadoop.hbase.thrift2.generated.TRowMutations; -import org.apache.hadoop.hbase.thrift2.generated.TDurability; +import org.apache.hadoop.hbase.thrift2.generated.TScan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.thrift.TException; import org.junit.AfterClass; @@ -65,24 +88,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.HashMap; - -import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift; -import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.putFromThrift; -import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.scanFromThrift; -import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift; -import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift; -import static org.junit.Assert.*; -import static java.nio.ByteBuffer.wrap; - /** * Unit testing for ThriftServer.HBaseHandler, a part of the org.apache.hadoop.hbase.thrift2 * package. @@ -1195,5 +1200,28 @@ public class TestThriftHBaseServiceHandler { assertTColumnValueEqual(columnValueA, result.getColumnValues().get(0)); assertTColumnValueEqual(columnValueB, result.getColumnValues().get(1)); } + + @Test + public void testCleanScannerThread() throws Exception { + ThriftHBaseServiceHandler handler = createHandler(); + ByteBuffer table = wrap(tableAname); + + // create scan instance + TScan scan = new TScan(); + List columns = new ArrayList(); + TColumn column = new TColumn(); + column.setFamily(familyAname); + column.setQualifier(qualifierAname); + columns.add(column); + scan.setColumns(columns); + scan.setStartRow("testScan".getBytes()); + scan.setStopRow("testScan\uffff".getBytes()); + + int scannerId = handler.openScanner(table, scan); + assertTrue(handler.scannerExists(scannerId)); + Thread.sleep(60000 + 5000); + assertFalse(handler.scannerExists(scannerId)); + handler.stopClean(); + } } -- 1.9.5.msysgit.1