diff --git src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
index 53402e7..1701397 100644
--- src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
+++ src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
@@ -116,6 +116,7 @@ public class LocalHBaseCluster implements HConstants {
     try {
       server = regionServerClass.getConstructor(HBaseConfiguration.class).
         newInstance(conf);
+      server.shutdownHDFS.set(false); // no, bad.
     } catch (Exception e) {
       IOException ioe = new IOException();
       ioe.initCause(e);
diff --git src/java/org/apache/hadoop/hbase/client/HConnection.java src/java/org/apache/hadoop/hbase/client/HConnection.java
index 6693582..c072b16 100644
--- src/java/org/apache/hadoop/hbase/client/HConnection.java
+++ src/java/org/apache/hadoop/hbase/client/HConnection.java
@@ -192,7 +192,7 @@ public interface HConnection {
    * @throws IOException
    * @throws RuntimeException
    */
-  public <T> T getRegionServerForWithoutRetries(ServerCallable<T> callable)
+  public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
   throws IOException, RuntimeException;
 
@@ -219,5 +219,5 @@ public interface HConnection {
   public void processBatchOfPuts(List<Put> list,
       final byte[] tableName, ExecutorService pool)
   throws IOException;
-  
+
 }
diff --git src/java/org/apache/hadoop/hbase/client/HConnectionManager.java src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
index b311391..c5e66bb 100644
--- src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
+++ src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
@@ -681,7 +681,7 @@ public class HConnectionManager implements HConstants {
       // This block guards against two threads trying to load the meta
       // region at the same time. The first will load the meta region and
       // the second will use the value that the first one found.
-      synchronized(regionLockObject) {
+      synchronized (regionLockObject) {
        // Check the cache again for a hit in case some other thread made the
        // same query while we were waiting on the lock. If not supposed to
        // be using the cache, delete any existing cached location so it won't
@@ -1082,15 +1082,19 @@ public class HConnectionManager implements HConstants {
       return null;
     }
 
-    public <T> T getRegionServerForWithoutRetries(ServerCallable<T> callable)
+    public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
     throws IOException, RuntimeException {
       try {
         callable.instantiateServer(false);
         return callable.call();
       } catch (Throwable t) {
-        t = translateException(t);
+        Throwable t2 = translateException(t);
+        if (t2 instanceof IOException) {
+          throw (IOException)t2;
+        } else {
+          throw new RuntimeException(t2);
+        }
       }
-      return null;
     }
 
     private HRegionLocation
@@ -1306,7 +1310,12 @@
     }
 
     public void processBatchOfPuts(List<Put> list,
-        final byte[] tableName, ExecutorService pool) throws IOException {
+        final byte[] tableName,
+        ExecutorService pool) throws IOException {
+      boolean singletonList = list.size() == 1;
+      Throwable singleRowCause = null;
+      List<Put> permFails = new ArrayList<Put>();
+
       for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
         Collections.sort(list);
         Map<HServerAddress, MultiPut> regionPuts =
@@ -1372,10 +1381,20 @@
           LOG.debug("Failed all from " + request.address, e);
           failed.addAll(request.allPuts());
         } catch (ExecutionException e) {
-          System.out.println(e);
           // all go into the failed list.
           LOG.debug("Failed all from " + request.address, e);
           failed.addAll(request.allPuts());
+
+          // Just give up, leaving the batch put list in an untouched/semi-committed state
+          if (e.getCause() instanceof DoNotRetryIOException) {
+            throw (DoNotRetryIOException) e.getCause();
+          }
+
+          if (singletonList) {
+            // be richer for reporting in a 1 row case.
+            singleRowCause = e.getCause();
+          }
+
         }
       }
       list.clear();
@@ -1391,15 +1410,20 @@
              " ms!");
          try {
            Thread.sleep(sleepTime);
-          } catch (InterruptedException e) {
-
+          } catch (InterruptedException ignored) {
          }
        }
      }
+
+      if (singletonList) {
+        if (singleRowCause != null)
+          throw new IOException(singleRowCause);
+      }
+
      if (!list.isEmpty()) {
        // ran out of retries and didnt succeed everything!
        throw new RetriesExhaustedException("Still had " + list.size() +
            " puts left after retrying " +
-            numRetries + " times. Should have detail on which Regions failed the most");
+            numRetries + " times.");
      }
    }
@@ -1410,7 +1434,7 @@
       final HConnection connection = this;
       return new Callable<MultiPutResponse>() {
         public MultiPutResponse call() throws IOException {
-          return getRegionServerWithRetries(
+          return getRegionServerWithoutRetries(
               new ServerCallable<MultiPutResponse>(connection, tableName, null) {
                 public MultiPutResponse call() throws IOException {
                   MultiPutResponse resp = server.multiPut(puts);
diff --git src/java/org/apache/hadoop/hbase/client/HTable.java src/java/org/apache/hadoop/hbase/client/HTable.java
index 3b7b3c0..cbfcb7e 100644
--- src/java/org/apache/hadoop/hbase/client/HTable.java
+++ src/java/org/apache/hadoop/hbase/client/HTable.java
@@ -645,10 +645,10 @@ public class HTable {
       connection.processBatchOfPuts(writeBuffer,
           tableName, pool);
     } finally {
-      // the write buffer was adjsuted by processBatchOfPuts
+      // the write buffer was adjusted by processBatchOfPuts
       currentWriteBufferSize = 0;
-      for (int i = 0; i < writeBuffer.size(); i++) {
-        currentWriteBufferSize += writeBuffer.get(i).heapSize();
+      for (Put aPut : writeBuffer) {
+        currentWriteBufferSize += aPut.heapSize();
       }
     }
   }
diff --git src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 578d310..e797597 100644
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -232,7 +232,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
 
   // Run HDFS shutdown thread on exit if this is set. We clear this out when
   // doing a restart() to prevent closing of HDFS.
-  private final AtomicBoolean shutdownHDFS = new AtomicBoolean(true);
+  public final AtomicBoolean shutdownHDFS = new AtomicBoolean(true);
 
   private final String machineName;
 
diff --git src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java
index 89cc869..43924e8 100644
--- src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java
+++ src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java
@@ -20,15 +20,15 @@
 
 package org.apache.hadoop.hbase;
 
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import java.util.List;
 import java.util.ArrayList;
+import java.util.List;
 
 public class TestMultiParallelPut extends MultiRegionTable {
   private static final byte[] VALUE = Bytes.toBytes("value");
@@ -58,7 +58,14 @@
 
   List<byte[]> keys = new ArrayList<byte[]>();
 
-  public void testMultiPut() throws Exception {
+  public void testParallelPut() throws Exception {
+    doATest(false);
+  }
+  public void testParallelPutWithRSAbort() throws Exception {
+    doATest(true);
+  }
+
+  public void doATest(boolean doAbort) throws Exception {
     HTable table = new HTable(TEST_TABLE);
     table.setAutoFlush(false);
 
@@ -73,6 +80,19 @@
 
     table.flushCommits();
 
+    if (doAbort) {
+      cluster.abortRegionServer(0);
+
+      // try putting more keys after the abort.
+      for ( byte [] k : keys ) {
+        Put put = new Put(k);
+        put.add(BYTES_FAMILY, QUALIFIER, VALUE);
+
+        table.put(put);
+      }
+      table.flushCommits();
+    }
+
     for (byte [] k : keys ) {
       Get get = new Get(k);
       get.addColumn(BYTES_FAMILY, QUALIFIER);
@@ -88,10 +108,15 @@
 
     HBaseAdmin admin = new HBaseAdmin(conf);
     ClusterStatus cs = admin.getClusterStatus();
-    assertEquals(2, cs.getServers());
+    int expectedServerCount = 2;
+    if (doAbort)
+      expectedServerCount = 1;
+
+    assertEquals(expectedServerCount, cs.getServers());
     for ( HServerInfo info : cs.getServerInfo()) {
       System.out.println(info);
       assertTrue( info.getLoad().getNumberOfRegions() > 10);
     }
   }
+
 }
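Not part of the patch above: a minimal, illustrative sketch of what the reworked processBatchOfPuts error handling looks like from the client side, assuming the 0.20-era HTable API. A DoNotRetryIOException from the server is now rethrown immediately, a single-row flush that keeps failing surfaces its underlying cause wrapped in an IOException, and a multi-row flush that exhausts its retries still ends in RetriesExhaustedException. The table, row, and column names here are made up for illustration.

import java.io.IOException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.util.Bytes;

public class PutFailureExample {
  public static void main(String[] args) throws IOException {
    // Hypothetical table/family/qualifier names, for illustration only.
    HTable table = new HTable("myTable");
    table.setAutoFlush(false);

    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    table.put(put);

    try {
      // flushCommits() drives processBatchOfPuts under the covers.
      table.flushCommits();
    } catch (DoNotRetryIOException e) {
      // Server rejected the put outright; no further retries were attempted.
      throw e;
    } catch (RetriesExhaustedException e) {
      // A multi-row batch still had puts left after numRetries attempts.
      throw e;
    } catch (IOException e) {
      // Single-row case: the cause carries the original server-side exception.
      System.err.println("single put failed: " + e.getCause());
    }
  }
}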