Index: core/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java =================================================================== --- core/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java (revision 941979) +++ core/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java (working copy) @@ -20,15 +20,15 @@ package org.apache.hadoop.hbase; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.util.Bytes; +import java.util.ArrayList; import java.util.List; -import java.util.ArrayList; public class TestMultiParallelPut extends MultiRegionTable { private static final byte[] VALUE = Bytes.toBytes("value"); @@ -58,8 +58,15 @@ List keys = new ArrayList(); - public void testMultiPut() throws Exception { + public void testParallelPut() throws Exception { + doATest(false); + } + public void testParallelPutWithRSAbort() throws Exception { + doATest(true); + } + public void doATest(boolean doAbort) throws Exception { + HTable table = new HTable(TEST_TABLE); table.setAutoFlush(false); table.setWriteBufferSize(10 * 1024 * 1024); @@ -73,6 +80,19 @@ table.flushCommits(); + if (doAbort) { + cluster.abortRegionServer(0); + + // try putting more keys after the abort. 
+ for ( byte [] k : keys ) { + Put put = new Put(k); + put.add(BYTES_FAMILY, QUALIFIER, VALUE); + + table.put(put); + } + table.flushCommits(); + } + for (byte [] k : keys ) { Get get = new Get(k); get.addColumn(BYTES_FAMILY, QUALIFIER); @@ -88,10 +108,15 @@ HBaseAdmin admin = new HBaseAdmin(conf); ClusterStatus cs = admin.getClusterStatus(); - assertEquals(2, cs.getServers()); + int expectedServerCount = 2; + if (doAbort) + expectedServerCount = 1; + + assertEquals(expectedServerCount, cs.getServers()); for ( HServerInfo info : cs.getServerInfo()) { System.out.println(info); assertTrue( info.getLoad().getNumberOfRegions() > 10); } } + } Index: core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 941979) +++ core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -231,7 +231,7 @@ // Run HDFS shutdown on exit if this is set. We clear this out when // doing a restart() to prevent closing of HDFS. - private final AtomicBoolean shutdownHDFS = new AtomicBoolean(true); + public final AtomicBoolean shutdownHDFS = new AtomicBoolean(true); private final String machineName; Index: core/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- core/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 941979) +++ core/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -680,7 +680,7 @@ // This block guards against two threads trying to load the meta // region at the same time. The first will load the meta region and // the second will use the value that the first one found. 
- synchronized(regionLockObject) { + synchronized (regionLockObject) { // Check the cache again for a hit in case some other thread made the // same query while we were waiting on the lock. If not supposed to // be using the cache, delete any existing cached location so it won't @@ -1077,15 +1077,19 @@ return null; } - public T getRegionServerForWithoutRetries(ServerCallable callable) + public T getRegionServerWithoutRetries(ServerCallable callable) throws IOException, RuntimeException { try { callable.instantiateServer(false); return callable.call(); } catch (Throwable t) { - t = translateException(t); + Throwable t2 = translateException(t); + if (t2 instanceof IOException) { + throw (IOException)t2; + } else { + throw new RuntimeException(t2); + } } - return null; } @SuppressWarnings({"ConstantConditions"}) @@ -1299,9 +1303,25 @@ } } - @SuppressWarnings({"ConstantConditions"}) + /** + * Process a batch of Puts on the given executor service. + * + * @param list the puts to make - successful puts will be removed. + * @param pool thread pool to execute requests on + * + * In the case of an exception, we take different actions depending on the + * situation: + * - If the exception is a DoNotRetryException, we rethrow it and leave the + * 'list' parameter in an indeterminate state. + * - If the 'list' parameter is a singleton, we directly throw the specific + * exception for that put. + * - Otherwise, we throw a generic exception indicating that an error occurred. + * The 'list' parameter is mutated to contain those puts that did not succeed. 
+ */ public void processBatchOfPuts(List list, final byte[] tableName, ExecutorService pool) throws IOException { + boolean singletonList = list.size() == 1; + Throwable singleRowCause = null; for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) { Collections.sort(list); Map regionPuts = @@ -1367,10 +1387,19 @@ LOG.debug("Failed all from " + request.address, e); failed.addAll(request.allPuts()); } catch (ExecutionException e) { - System.out.println(e); // all go into the failed list. LOG.debug("Failed all from " + request.address, e); failed.addAll(request.allPuts()); + + // Just give up, leaving the batch put list in an untouched/semi-committed state + if (e.getCause() instanceof DoNotRetryIOException) { + throw (DoNotRetryIOException) e.getCause(); + } + + if (singletonList) { + // be richer for reporting in a 1 row case. + singleRowCause = e.getCause(); + } } } list.clear(); @@ -1391,9 +1420,13 @@ } } if (!list.isEmpty()) { + if (singletonList && singleRowCause != null) { + throw new IOException(singleRowCause); + } + // ran out of retries and didn't succeed everything! throw new RetriesExhaustedException("Still had " + list.size() + " puts left after retrying " + - numRetries + " times. 
Should have detail on which Regions failed the most"); + numRetries + " times."); } } @@ -1404,7 +1437,7 @@ final HConnection connection = this; return new Callable() { public MultiPutResponse call() throws IOException { - return getRegionServerWithRetries( + return getRegionServerWithoutRetries( new ServerCallable(connection, tableName, null) { public MultiPutResponse call() throws IOException { MultiPutResponse resp = server.multiPut(puts); Index: core/src/main/java/org/apache/hadoop/hbase/client/HConnection.java =================================================================== --- core/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (revision 941979) +++ core/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (working copy) @@ -192,7 +192,7 @@ * @throws IOException if a remote or network exception occurs * @throws RuntimeException other unspecified error */ - public T getRegionServerForWithoutRetries(ServerCallable callable) + public T getRegionServerWithoutRetries(ServerCallable callable) throws IOException, RuntimeException; Index: core/src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- core/src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 941979) +++ core/src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -542,8 +542,8 @@ } finally { // the write buffer was adjusted by processBatchOfPuts currentWriteBufferSize = 0; - for (Put aWriteBuffer : writeBuffer) { - currentWriteBufferSize += aWriteBuffer.heapSize(); + for (Put aPut : writeBuffer) { + currentWriteBufferSize += aPut.heapSize(); } } }