Index: src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java (revision 1480092) +++ src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java (working copy) @@ -33,17 +33,20 @@ import java.util.Map; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -51,6 +54,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -252,6 +256,7 @@ } public void shutdown() { + STOPPABLE.stop("Shutting down"); if (tserver != null) { tserver.stop(); tserver = null; @@ -392,7 +397,7 @@ // nextScannerId and scannerMap are used to manage scanner state protected int nextScannerId = 0; - protected HashMap scannerMap = null; + protected Map scannerMap = null; private ThriftMetrics metrics = null; private static ThreadLocal> threadLocalTables = @@ -452,30 +457,20 @@ * @param scanner * @return integer scanner id */ - protected synchronized int addScanner(ResultScanner scanner) { + protected synchronized int addScanner(Scanner scanner) { int id = nextScannerId++; scannerMap.put(id, scanner); return id; } /** - * Returns the scanner associated with the specified ID. - * - * @param id - * @return a Scanner, or null if ID was invalid. - */ - protected synchronized ResultScanner getScanner(int id) { - return scannerMap.get(id); - } - - /** * Removes the scanner associated with the specified ID from the internal * id->scanner hash-map. * * @param id * @return a Scanner, or null if ID was invalid. */ - protected synchronized ResultScanner removeScanner(int id) { + protected synchronized Scanner removeScanner(int id) { return scannerMap.remove(id); } @@ -491,8 +486,14 @@ protected HBaseHandler(final Configuration c) throws IOException { this.conf = c; admin = new HBaseAdmin(conf); - scannerMap = new HashMap(); + scannerMap = new ConcurrentHashMap(); this.coalescer = new IncrementCoalescer(this); + + int scannerTimeout = + this.conf.getInt("hbase.thrift.scanner.cleaner.timeout", 1000 * 60 * 10); + ScannerCleaner cleaner = + new ScannerCleaner("Scanner cleaner", scannerTimeout, STOPPABLE); + cleaner.start(); } @Override @@ -1086,21 +1087,20 @@ public void scannerClose(int id) throws IOError, IllegalArgument { LOG.debug("scannerClose: id=" + id); - ResultScanner scanner = getScanner(id); + Scanner scanner = removeScanner(id); if (scanner == null) { String message = "scanner ID is invalid"; LOG.warn(message); throw new IllegalArgument("scanner ID is invalid"); } - scanner.close(); - removeScanner(id); + scanner.scanner.close(); } @Override public List scannerGetList(int id,int nbRows) throws IllegalArgument, IOError { LOG.debug("scannerGetList: id=" + id); - ResultScanner scanner = getScanner(id); + Scanner scanner = removeScanner(id); if (null == scanner) { String message = "scanner ID is invalid"; LOG.warn(message); @@ -1109,14 +1109,17 @@ Result [] results = null; try { - results = scanner.next(nbRows); + results = scanner.scanner.next(nbRows); if (null == results) { + scanner.scanner.close(); return new ArrayList(); } } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(e.getMessage()); } + scannerMap.put(id, new Scanner( + scanner.scanner, System.currentTimeMillis(), scanner.table)); return ThriftUtilities.rowResultFromHBase(results); } @@ -1159,7 +1162,9 @@ scan.setFilter( parseFilter.parseFilterString(tScan.getFilterString())); } - return addScanner(table.getScanner(scan)); + Scanner scanner = new Scanner(table.getScanner(scan), + System.currentTimeMillis(), Bytes.toString(table.getTableName())); + return addScanner(scanner); } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(e.getMessage()); @@ -1184,7 +1189,9 @@ } } } - return addScanner(table.getScanner(scan)); + Scanner scanner = new Scanner(table.getScanner(scan), + System.currentTimeMillis(), Bytes.toString(table.getTableName())); + return addScanner(scanner); } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(e.getMessage()); @@ -1210,7 +1217,9 @@ } } } - return addScanner(table.getScanner(scan)); + Scanner scanner = new Scanner(table.getScanner(scan), + System.currentTimeMillis(), Bytes.toString(table.getTableName())); + return addScanner(scanner); } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(e.getMessage()); @@ -1240,7 +1249,9 @@ } } } - return addScanner(table.getScanner(scan)); + Scanner scanner = new Scanner(table.getScanner(scan), + System.currentTimeMillis(), Bytes.toString(table.getTableName())); + return addScanner(scanner); } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(e.getMessage()); @@ -1266,7 +1277,9 @@ } } } - return addScanner(table.getScanner(scan)); + Scanner scanner = new Scanner(table.getScanner(scan), + System.currentTimeMillis(), Bytes.toString(table.getTableName())); + return addScanner(scanner); } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(e.getMessage()); @@ -1294,7 +1307,9 @@ } } scan.setTimeRange(Long.MIN_VALUE, timestamp); - return addScanner(table.getScanner(scan)); + Scanner scanner = new Scanner(table.getScanner(scan), + System.currentTimeMillis(), Bytes.toString(table.getTableName())); + return addScanner(scanner); } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(e.getMessage()); @@ -1416,10 +1431,67 @@ increment(tinc); } } - } + /** + * Class that's used to clean the scanners that aren't closed after the + * defined period of time. Only works for the new scanners + */ + class ScannerCleaner extends Chore { + final int timeout; + /** + * + * @param name + * @param p + * @param stopper + */ + public ScannerCleaner(String name, final int p, + final Stoppable stopper) { + super(name, p, stopper); + this.timeout = p; + } + + @Override + protected void chore() { + + // Iterate over all the current scanners + for (java.util.Map.Entry scanner : + scannerMap.entrySet()) { + Scanner scan = scanner.getValue(); + + // if over the timeout, clean + if ((System.currentTimeMillis() - scan.lastTimeUsed) > this.timeout) { + Integer key = scanner.getKey(); + LOG.info("ATTENTION we are closing a scanner that has been running for a" + + " long time on the table " + scan.table); + scannerMap.remove(key); + // This is normally expired + scan.scanner.close(); + } + } + } + } + + /** + * This class is just a wrapper around ResultScanner to carry more info about it. + * It is meant to be recreated with the same ResultScanner over and over as the + * scanner progresses. We are also including the table name to give more information + * about the scanners that timeout. + */ + class Scanner { + final ResultScanner scanner; + final long lastTimeUsed; + final String table; + + Scanner(ResultScanner scanner, long lastTimeUsed, String table) { + this.lastTimeUsed = lastTimeUsed; + this.scanner = scanner; + this.table = table; + } + } + } + /** * Adds all the attributes into the Operation object */ @@ -1448,4 +1520,23 @@ } } } + static Stoppable STOPPABLE = new Stoppable() { + final AtomicBoolean stop = new AtomicBoolean(false); + + @Override + public boolean isStopped() { + return this.stop.get(); + } + + @Override + public void stop(String why) { + LOG.info("STOPPING BECAUSE: " + why); + this.stop.set(true); + } + + }; } + + + +