Index: src/main/java/org/apache/hadoop/hbase/client/ConnectionClosedException.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ConnectionClosedException.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/ConnectionClosedException.java (revision 0) @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +/** + * Thrown when HConnection has been closed. + */ +public class ConnectionClosedException extends IOException { + private static final long serialVersionUID = 8792360655678089586L; + + public ConnectionClosedException() { + super(); + } + + public ConnectionClosedException(String s) { + super(s); + } +} Property changes on: src\main\java\org\apache\hadoop\hbase\client\ConnectionClosedException.java ___________________________________________________________________ Added: svn:needs-lock + * Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1229032) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -29,8 +29,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.NoSuchElementException; -import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; import java.util.TreeSet; @@ -787,7 +787,9 @@ private HRegionLocation locateRegion(final byte [] tableName, final byte [] row, boolean useCache) throws IOException { - if (this.closed) throw new IOException(toString() + " closed"); + if (this.closed) { + throw new ConnectionClosedException(toString() + " closed"); + } if (tableName == null || tableName.length == 0) { throw new IllegalArgumentException( "table name cannot be null or zero length"); @@ -1231,6 +1233,9 @@ public T getRegionServerWithRetries(ServerCallable callable) throws IOException, RuntimeException { + if (this.closed) { + throw new ConnectionClosedException(toString() + " closed"); + } List exceptions = new ArrayList(); for(int tries = 0; tries < numRetries; tries++) { try { @@ -1256,6 +1261,9 @@ public T getRegionServerWithoutRetries(ServerCallable callable) throws IOException, RuntimeException { + if (this.closed) { + throw new ConnectionClosedException(toString() + " closed"); + } try { callable.instantiateServer(false); return callable.call(); @@ -1295,7 +1303,6 @@ final byte[] tableName, ExecutorService pool, Object[] results) throws IOException, InterruptedException { - // results must be the same size as list if (results.length != list.size()) { throw new IllegalArgumentException("argument results must be the same size as argument list"); Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1229032) +++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -115,8 +115,10 @@ private ExecutorService pool; // For Multi private long maxScannerResultSize; private boolean closed; + private long pause; + private int numRetries; private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts. - + /** * Creates an object to access a HBase table. * Internally it creates a new instance of {@link Configuration} and a new @@ -215,6 +217,10 @@ new DaemonThreadFactory()); ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true); this.closed = false; + this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); } /** @@ -296,7 +302,7 @@ */ public HRegionLocation getRegionLocation(final String row) throws IOException { - return connection.getRegionLocation(tableName, Bytes.toBytes(row), false); + return getRegionLocation(tableName, Bytes.toBytes(row), false); } /** @@ -307,7 +313,7 @@ */ public HRegionLocation getRegionLocation(final byte [] row) throws IOException { - return connection.getRegionLocation(tableName, row, false); + return getRegionLocation(tableName, row, false); } @Override @@ -544,7 +550,7 @@ @Override public Result getRowOrBefore(final byte[] row, final byte[] family) throws IOException { - return connection.getRegionServerWithRetries( + return getRegionServerWithRetries( new ServerCallable(connection, tableName, row) { public Result call() throws IOException { return server.getClosestRowBefore(location.getRegionInfo().getRegionName(), @@ -576,7 +582,7 @@ } public Result get(final Get get) throws IOException { - return connection.getRegionServerWithRetries( + return getRegionServerWithRetries( new ServerCallable(connection, tableName, get.getRow()) { public Result call() throws IOException { return server.get(location.getRegionInfo().getRegionName(), get); @@ -619,7 +625,7 @@ @Override public synchronized void batch(final List actions, final Object[] results) throws InterruptedException, IOException { - connection.processBatch(actions, tableName, pool, results); + processBatch(actions, tableName, pool, results); } /** @@ -633,7 +639,7 @@ @Override public synchronized Object[] batch(final List actions) throws InterruptedException, IOException { Object[] results = new Object[actions.size()]; - connection.processBatch(actions, tableName, pool, results); + processBatch(actions, tableName, pool, results); return results; } @@ -647,7 +653,7 @@ @Override public void delete(final Delete delete) throws IOException { - connection.getRegionServerWithRetries( + getRegionServerWithRetries( new ServerCallable(connection, tableName, delete.getRow()) { public Boolean call() throws IOException { server.delete(location.getRegionInfo().getRegionName(), delete); @@ -672,7 +678,7 @@ throws IOException { Object[] results = new Object[deletes.size()]; try { - connection.processBatch((List) deletes, tableName, pool, results); + processBatch((List) deletes, tableName, pool, results); } catch (InterruptedException e) { throw new IOException(e); } finally { @@ -722,7 +728,7 @@ throw new IOException( "Invalid arguments to increment, no columns specified"); } - return connection.getRegionServerWithRetries( + return getRegionServerWithRetries( new ServerCallable(connection, tableName, increment.getRow()) { public Result call() throws IOException { return server.increment( @@ -753,7 +759,7 @@ throw new IOException( "Invalid arguments to incrementColumnValue", npe); } - return connection.getRegionServerWithRetries( + return getRegionServerWithRetries( new ServerCallable(connection, tableName, row) { public Long call() throws IOException { return server.incrementColumnValue( @@ -782,7 +788,7 @@ final byte [] family, final byte [] qualifier, final byte [] value, final Put put) throws IOException { - return connection.getRegionServerWithRetries( + return getRegionServerWithRetries( new ServerCallable(connection, tableName, row) { public Boolean call() throws IOException { return server.checkAndPut(location.getRegionInfo().getRegionName(), @@ -810,7 +816,7 @@ final byte [] family, final byte [] qualifier, final byte [] value, final Delete delete) throws IOException { - return connection.getRegionServerWithRetries( + return getRegionServerWithRetries( new ServerCallable(connection, tableName, row) { public Boolean call() throws IOException { return server.checkAndDelete( @@ -835,7 +841,7 @@ */ @Override public boolean exists(final Get get) throws IOException { - return connection.getRegionServerWithRetries( + return getRegionServerWithRetries( new ServerCallable(connection, tableName, get.getRow()) { public Boolean call() throws IOException { return server. @@ -857,6 +863,9 @@ public void flushCommits() throws IOException { try { connection.processBatchOfPuts(writeBuffer, tableName, pool); + } catch (IOException ioe) { + handleConnectionClosedException(ioe); + throw ioe; } finally { if (clearBufferOnFail) { writeBuffer.clear(); @@ -903,7 +912,7 @@ @Override public RowLock lockRow(final byte [] row) throws IOException { - return connection.getRegionServerWithRetries( + return getRegionServerWithRetries( new ServerCallable(connection, tableName, row) { public RowLock call() throws IOException { long lockId = @@ -917,7 +926,7 @@ @Override public void unlockRow(final RowLock rl) throws IOException { - connection.getRegionServerWithRetries( + getRegionServerWithRetries( new ServerCallable(connection, tableName, rl.getRow()) { public Boolean call() throws IOException { server.unlockRow(location.getRegionInfo().getRegionName(), @@ -1103,7 +1112,7 @@ // Close the previous scanner if it's open if (this.callable != null) { this.callable.setClose(); - getConnection().getRegionServerWithRetries(callable); + getRegionServerWithRetries(callable); this.callable = null; } @@ -1139,7 +1148,7 @@ callable = getScannerCallable(localStartKey, nbRows); // Open a scanner on the region server starting at the // beginning of the region - getConnection().getRegionServerWithRetries(callable); + getRegionServerWithRetries(callable); this.currentRegion = callable.getHRegionInfo(); } catch (IOException e) { close(); @@ -1179,14 +1188,14 @@ // Skip only the first row (which was the last row of the last // already-processed batch). callable.setCaching(1); - values = getConnection().getRegionServerWithRetries(callable); + values = getRegionServerWithRetries(callable); callable.setCaching(this.caching); skipFirst = false; } // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just // exhausted current region. - values = getConnection().getRegionServerWithRetries(callable); + values = getRegionServerWithRetries(callable); } catch (DoNotRetryIOException e) { if (e instanceof UnknownScannerException) { long timeout = lastNext + scannerTimeout; @@ -1267,7 +1276,7 @@ if (callable != null) { callable.setClose(); try { - getConnection().getRegionServerWithRetries(callable); + getRegionServerWithRetries(callable); } catch (IOException e) { // We used to catch this error, interpret, and rethrow. However, we // have since decided that it's not nice for a scanner's close to @@ -1436,4 +1445,123 @@ } }); } + + /** + * A wrapper method for HConnection#getRegionServerWithRetries. + * + * @param + * the type of the return value + * @param callable + * callable to run + * @return an object of type T + * @throws IOException + * if a remote or network exception occurs + * @throws RuntimeException + * other unspecified error + */ + public T getRegionServerWithRetries(ServerCallable callable) + throws IOException, RuntimeException { + try { + return this.connection.getRegionServerWithRetries(callable); + } catch (IOException ioe) { + handleConnectionClosedException(ioe); + throw ioe; + } + } + + /** + * A wrapper method for HConnection#processBatch. + * + * @param actions + * The collection of actions. + * @param tableName + * Name of the hbase table + * @param pool + * thread pool for parallel execution + * @param results + * An empty array, same size as list. If an exception is thrown, you + * can test here for partial results, and to determine which actions + * processed successfully. + * @throws IOException + * if there are problems talking to META. Per-item exceptions are + * stored in the results array. + */ + public void processBatch(List actions, final byte[] tableName, + ExecutorService pool, Object[] results) throws IOException, + InterruptedException { + try { + this.connection.processBatch(actions, tableName, pool, results); + } catch (IOException ioe) { + handleConnectionClosedException(ioe); + throw ioe; + } + } + + /** + * A wrapper method for HConnection#getRegionLocation. + * + * @param tableName + * table name + * @param row + * Row to find. + * @param reload + * If true do not use cache, otherwise bypass. + * @return Location of row. + * @throws IOException + * if a remote or network exception occurs + */ + public HRegionLocation getRegionLocation(byte[] tableName, byte[] row, + boolean reload) throws IOException { + try { + return this.connection.getRegionLocation(tableName, row, reload); + } catch (IOException ioe) { + handleConnectionClosedException(ioe); + throw ioe; + } + } + + /** + * If a ConnectionClosedException occur, we should recreate the Connection. + * See HBASE-5153. + * + * @param ioe + * The IOE throwed by the HConnection methods. + * @throws RetriesExhaustedException + * @throws IOException + * if the IOE is not a ConnectionClosedException, then throws it. + */ + private void handleConnectionClosedException(IOException ioe) + throws RetriesExhaustedException { + if (ioe instanceof ConnectionClosedException) { + for (int tries = 0; !this.closed; tries++) { + if (tries >= numRetries) { + LOG.warn("Connection can't be recreated. Please verify your environment."); + throw new RetriesExhaustedException( + "Retry to recreate connection in HTable, but failed."); + } + try { + this.connection = HConnectionManager.getConnection(this + .getConfiguration()); + LOG.info("Connection recreated successfully."); + break; + } catch (ZooKeeperConnectionException e) { + LOG.info("Retrying to recreate connection. Already tried " + tries + + " time(s)."); + try { + Thread.sleep(getPauseTime(tries)); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } + } + } + } + } + + private long getPauseTime(int tries) { + int ntries = tries; + if (ntries >= HConstants.RETRY_BACKOFF.length) { + ntries = HConstants.RETRY_BACKOFF.length - 1; + } + return this.pause * HConstants.RETRY_BACKOFF[ntries]; + } } Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1229032) +++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.KeyOnlyFilter; @@ -63,7 +64,6 @@ import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.filter.WhileMatchFilter; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.AfterClass; @@ -4032,6 +4032,38 @@ queue.put(new Object()); } + /** + * Test HConnection can be re-created in HTable after this connection has been + * closed. + * @throws IOException + */ + @Test + public void testHTableConnectionRecovery() throws IOException { + final byte[] COLUMN_FAMILY = Bytes.toBytes("columnfam"); + final byte[] COLUMN = Bytes.toBytes("col"); + HTable table = TEST_UTIL.createTable( + Bytes.toBytes("testConnectionRecover"), new byte[][] { COLUMN_FAMILY }); + Put put01 = new Put(Bytes.toBytes("testrow1")); + put01.add(COLUMN_FAMILY, COLUMN, Bytes.toBytes("testValue")); + table.put(put01); + // At this time, abort the connection. + HConnection conn1 = table.getConnection(); + conn1.abort("Test Connection Abort", new IOException("TestIOE")); + // This put will fail, but trigger to create a new connection. + try { + Put put02 = new Put(Bytes.toBytes("testrow1")); + put02.add(COLUMN_FAMILY, COLUMN, Bytes.toBytes("testValue")); + table.put(put02); + } catch (IOException ioe) { + LOG.info( + "It's an expected IOE, for the original connection has been closed.", + ioe); + } + // Get the connection again, and check whether it has changed. + HConnection conn2 = table.getConnection(); + assertTrue(conn1 != conn2); + } + }