From 25334fde2d6cd3b8fd84a3a51671a2f6b10b88f6 Mon Sep 17 00:00:00 2001 From: Joseph Hwang Date: Fri, 22 Jul 2016 10:38:43 -0700 Subject: [PATCH] HBASE-16221 Have ThriftScanner update the ConnectionCache's last used time overtime getScannerRow() to keep the connection alive for long lived scanners --- .../apache/hadoop/hbase/util/ConnectionCache.java | 14 ++++++ .../hbase/thrift2/ThriftHBaseServiceHandler.java | 2 +- .../thrift2/TestThriftHBaseServiceHandler.java | 51 ++++++++++++++++++++++ 3 files changed, 66 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java index a860f20..0659a0d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java @@ -189,6 +189,20 @@ public class ConnectionCache { return connInfo; } + /** + * Updates the access time for the current connection. Used to keep Connections alive for + * long-lived scanners. + * @return whether we successfully updated the last access time + */ + public boolean updateConnectionAccessTime() { + String userName = getEffectiveUser(); + ConnectionInfo connInfo = connections.get(userName); + if (connInfo != null) { + return connInfo.updateAccessTime(); + } + return false; + } + class ConnectionInfo { final Connection connection; final String userName; diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java index 9dea9a5..a69a7df 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java @@ -383,8 +383,8 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface { ex.setMessage("Invalid scanner Id"); throw ex; } - try { + connectionCache.updateConnectionAccessTime(); return resultsFromHBase(scanner.next(numRows)); } catch (IOException e) { throw getTIOError(e); diff --git a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java index 6fca282..38e3780 100644 --- a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java +++ b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java @@ -589,6 +589,57 @@ public class TestThriftHBaseServiceHandler { } } + /** + * Tests keeping a HBase scanner alive for long periods of time. Each call to getScannerRow() + * should reset the ConnectionCache timeout for the scanner's connection + * @throws Exception + */ + @Test + public void testLongLivedScan() throws Exception { + int numTrials = 6; + int trialPause = 1000; + int cleanUpInterval = 100; + Configuration conf = new Configuration(UTIL.getConfiguration()); + // Set the ConnectionCache timeout to trigger halfway through the trials + conf.setInt(ThriftHBaseServiceHandler.MAX_IDLETIME, (numTrials / 2) * trialPause); + conf.setInt(ThriftHBaseServiceHandler.CLEANUP_INTERVAL, cleanUpInterval); + ThriftHBaseServiceHandler handler = new ThriftHBaseServiceHandler(conf, + UserProvider.instantiate(conf)); + + ByteBuffer table = wrap(tableAname); + // insert data + TColumnValue columnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname), + wrap(valueAname)); + List columnValues = new ArrayList(); + columnValues.add(columnValue); + for (int i = 0; i < numTrials; i++) { + TPut put = new TPut(wrap(("testScan" + i).getBytes()), columnValues); + handler.put(table, put); + } + + // create scan instance + TScan scan = new TScan(); + List columns = new ArrayList(); + TColumn column = new TColumn(); + column.setFamily(familyAname); + column.setQualifier(qualifierAname); + columns.add(column); + scan.setColumns(columns); + scan.setStartRow("testScan".getBytes()); + scan.setStopRow("testScan\uffff".getBytes()); + // Prevent the scanner from caching results + scan.setCaching(1); + + // get scanner and rows + int scanId = handler.openScanner(table, scan); + for (int i = 0; i < numTrials; i++) { + // Make sure that the Scanner doesn't throw an exception after the ConnectionCache timeout + List results = handler.getScannerRows(scanId, 1); + assertArrayEquals(("testScan" + i).getBytes(), results.get(0).getRow()); + Thread.sleep(trialPause); + } + } + @Test public void testReverseScan() throws Exception { ThriftHBaseServiceHandler handler = createHandler(); -- 2.8.0-rc2