Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1229984) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -802,7 +802,9 @@ private HRegionLocation locateRegion(final byte [] tableName, final byte [] row, boolean useCache) throws IOException { - if (this.closed) throw new IOException(toString() + " closed"); + if (this.closed) { + throw new IOException(toString() + " closed"); + } if (tableName == null || tableName.length == 0) { throw new IllegalArgumentException( "table name cannot be null or zero length"); @@ -1677,7 +1679,7 @@ if (t != null) LOG.fatal(msg, t); else LOG.fatal(msg); this.aborted = true; - this.closed = true; + HConnectionManager.deleteStaleConnection(this); } @Override Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1229984) +++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.coprocessor.Batch; @@ -104,7 +105,7 @@ private HConnection connection; private final byte [] tableName; private volatile Configuration configuration; - private final ArrayList writeBuffer = new ArrayList(); + private final List writeBuffer = new ArrayList(); private long writeBufferSize; private boolean clearBufferOnFail; private boolean autoFlush; @@ -114,6 +115,8 @@ private ExecutorService pool; 
// For Multi private boolean closed; private int operationTimeout; + private long pause; + private int numRetries; private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts. private final boolean cleanupOnClose; // close the connection in close() @@ -220,6 +223,11 @@ this.maxKeyValueSize = this.configuration.getInt( "hbase.client.keyvalue.maxsize", -1); + this.pause = this.configuration.getLong(HConstants.HBASE_CLIENT_PAUSE, + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + this.numRetries = this.configuration.getInt( + HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.closed = false; } @@ -649,7 +657,7 @@ @Override public synchronized void batch(final List actions, final Object[] results) throws InterruptedException, IOException { - connection.processBatch(actions, tableName, pool, results); + processBatch(actions, tableName, pool, results); } /** @@ -658,7 +666,7 @@ @Override public synchronized Object[] batch(final List actions) throws InterruptedException, IOException { Object[] results = new Object[actions.size()]; - connection.processBatch(actions, tableName, pool, results); + processBatch(actions, tableName, pool, results); return results; } @@ -668,6 +676,7 @@ @Override public void delete(final Delete delete) throws IOException { + checkConnection(); new ServerCallable(connection, tableName, delete.getRow(), operationTimeout) { public Boolean call() throws IOException { server.delete(location.getRegionInfo().getRegionName(), delete); @@ -684,7 +693,7 @@ throws IOException { Object[] results = new Object[deletes.size()]; try { - connection.processBatch((List) deletes, tableName, pool, results); + processBatch((List) deletes, tableName, pool, results); } catch (InterruptedException e) { throw new IOException(e); } finally { @@ -743,6 +752,7 @@ throw new IOException( "Invalid arguments to append, no columns specified"); } + checkConnection(); return new ServerCallable(connection, 
tableName, append.getRow(), operationTimeout) { public Result call() throws IOException { return server.append( @@ -760,6 +770,7 @@ throw new IOException( "Invalid arguments to increment, no columns specified"); } + checkConnection(); return new ServerCallable(connection, tableName, increment.getRow(), operationTimeout) { public Result call() throws IOException { return server.increment( @@ -795,6 +806,7 @@ throw new IOException( "Invalid arguments to incrementColumnValue", npe); } + checkConnection(); return new ServerCallable(connection, tableName, row, operationTimeout) { public Long call() throws IOException { return server.incrementColumnValue( @@ -812,6 +824,7 @@ final byte [] family, final byte [] qualifier, final byte [] value, final Put put) throws IOException { + checkConnection(); return new ServerCallable(connection, tableName, row, operationTimeout) { public Boolean call() throws IOException { return server.checkAndPut(location.getRegionInfo().getRegionName(), @@ -829,6 +842,7 @@ final byte [] family, final byte [] qualifier, final byte [] value, final Delete delete) throws IOException { + checkConnection(); return new ServerCallable(connection, tableName, row, operationTimeout) { public Boolean call() throws IOException { return server.checkAndDelete( @@ -844,6 +858,7 @@ */ @Override public boolean exists(final Get get) throws IOException { + checkConnection(); return new ServerCallable(connection, tableName, get.getRow(), operationTimeout) { public Boolean call() throws IOException { return server. 
@@ -860,7 +875,7 @@ try { Object[] results = new Object[writeBuffer.size()]; try { - this.connection.processBatch(writeBuffer, tableName, pool, results); + processBatch(writeBuffer, tableName, pool, results); } catch (InterruptedException e) { throw new IOException(e); } finally { @@ -929,6 +944,7 @@ @Override public RowLock lockRow(final byte [] row) throws IOException { + checkConnection(); return new ServerCallable(connection, tableName, row, operationTimeout) { public RowLock call() throws IOException { long lockId = @@ -944,6 +960,7 @@ @Override public void unlockRow(final RowLock rl) throws IOException { + checkConnection(); new ServerCallable(connection, tableName, rl.getRow(), operationTimeout) { public Boolean call() throws IOException { server.unlockRow(location.getRegionInfo().getRegionName(), @@ -1033,7 +1050,7 @@ * Returns the write buffer. * @return The current write buffer. */ - public ArrayList getWriteBuffer() { + public List getWriteBuffer() { return writeBuffer; } @@ -1202,7 +1219,7 @@ Class protocol, byte[] startKey, byte[] endKey, Batch.Call callable, Batch.Callback callback) throws IOException, Throwable { - + checkConnection(); // get regions covered by the row range List keys = getStartKeysInRange(startKey, endKey); connection.processExecs(protocol, keys, tableName, pool, callable, @@ -1248,4 +1265,65 @@ return operationTimeout; } + /** + * A wrapper method for HConnection#processBatch. + * + * @param actions The collection of actions. + * @param tableName Name of the hbase table + * @param pool Thread pool for parallel execution + * @param results An empty array, same size as list. If an exception is thrown, you + * can test here for partial results, and to determine which actions + * processed successfully. + * @throws IOException if there are problems talking to META. Per-item exceptions are + * stored in the results array. 
+ */ + private void processBatch(List actions, + final byte[] tableName, ExecutorService pool, Object[] results) + throws IOException, InterruptedException { + try { + this.connection.processBatch(actions, tableName, pool, results); + } catch (IOException ioe) { + checkConnection(); + throw ioe; + } + } + + /** + * Check the connection. If it has been closed, we should recreate the + * connection. See HBASE-5153. + * + * Intended to be called after an HConnection method throws an IOE. + * @throws RetriesExhaustedException if the connection cannot be + * recreated after the configured number of retries; no other + * checked exception is thrown by this method. + */ + private void checkConnection() throws RetriesExhaustedException { + if (!this.connection.isClosed()) { + return; + } + // If the connection has been closed, then recreate it. + for (int tries = 0; !this.closed; tries++) { + if (tries >= numRetries) { + LOG.warn("Connection can't be recreated. Please verify your environment."); + throw new RetriesExhaustedException( + "Retry to recreate connection in HTable, but failed after " + tries + + " times."); + } + try { + this.connection = HConnectionManager.getConnection(this + .getConfiguration()); + LOG.info("Connection recreated successfully."); + break; + } catch (ZooKeeperConnectionException e) { + LOG.info("Retrying to recreate connection. 
Already tried " + tries + + " time(s)."); + try { + Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } + } + } + } + } Index: src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java (revision 1229984) +++ src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java (working copy) @@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -127,7 +129,40 @@ ht.close(); } + /** + * Test HConnection can be re-created in HTable after this connection has been + * closed. + * + * @throws IOException + */ + @Test + public void testHTableConnectionRecovery() throws IOException { + final byte[] COLUMN_FAMILY = Bytes.toBytes("columnfam"); + final byte[] COLUMN = Bytes.toBytes("col"); + HTable table = TEST_UTIL.createTable( + Bytes.toBytes("testConnectionRecover"), new byte[][] { COLUMN_FAMILY }); + Put put01 = new Put(Bytes.toBytes("testrow1")); + put01.add(COLUMN_FAMILY, COLUMN, Bytes.toBytes("testValue")); + table.put(put01); + // At this time, abort the connection. + HConnection conn1 = table.getConnection(); + conn1.abort("Test Connection Abort", new IOException("TestIOE")); + // This put will fail, but trigger to create a new connection. + try { + Put put02 = new Put(Bytes.toBytes("testrow1")); + put02.add(COLUMN_FAMILY, COLUMN, Bytes.toBytes("testValue")); + table.put(put02); + } catch (IOException ioe) { + LOG.info( + "It's an expected IOE, for the original connection has been closed.", + ioe); + } + // Get the connection again, and check whether it has changed. 
+ HConnection conn2 = table.getConnection(); + assertTrue(conn1 != conn2); + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();