diff --git src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java index 53402e7..1701397 100644 --- src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java +++ src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java @@ -116,6 +116,7 @@ public class LocalHBaseCluster implements HConstants { try { server = regionServerClass.getConstructor(HBaseConfiguration.class). newInstance(conf); + server.shutdownHDFS.set(false); // no, bad. } catch (Exception e) { IOException ioe = new IOException(); ioe.initCause(e); diff --git src/java/org/apache/hadoop/hbase/client/HConnectionManager.java src/java/org/apache/hadoop/hbase/client/HConnectionManager.java index d698344..217ebc5 100644 --- src/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ src/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -1102,11 +1102,12 @@ public class HConnectionManager implements HConstants { t = t.getCause(); } if (t instanceof RemoteException) { - t = RemoteExceptionHandler.decodeRemoteException((RemoteException) t); + throw RemoteExceptionHandler.decodeRemoteException((RemoteException) t); } if (t instanceof DoNotRetryIOException) { throw (DoNotRetryIOException) t; } + LOG.debug("Exception in getRegionServerForWithoutRetries", t); } return null; } @@ -1367,22 +1368,29 @@ public class HConnectionManager implements HConstants { Future future = futures.get(i); MultiPut request = multiPuts.get(i); try { + // response might be null? MultiPutResponse resp = future.get(); - // For each region - for (Map.Entry> e : request.puts.entrySet()) { - Integer result = resp.getAnswer(e.getKey()); - if (result == null) { - // failed - LOG.debug("Failed all for region: " + - Bytes.toStringBinary(e.getKey()) + ", removing from cache"); - failed.addAll(e.getValue()); - } else if (result >= 0) { - // some failures - List lst = e.getValue(); - failed.addAll(lst.subList(result, lst.size())); - LOG.debug("Failed past " + result + " for region: " + - Bytes.toStringBinary(e.getKey()) + ", removing from cache"); + if (resp == null) { + failed.addAll(request.allPuts()); + } else { + + + // For each region + for (Map.Entry> e : request.puts.entrySet()) { + Integer result = resp.getAnswer(e.getKey()); + if (result == null) { + // failed + LOG.debug("Failed all for region: " + + Bytes.toStringBinary(e.getKey()) + ", removing from cache"); + failed.addAll(e.getValue()); + } else if (result >= 0) { + // some failures + List lst = e.getValue(); + failed.addAll(lst.subList(result, lst.size())); + LOG.debug("Failed past " + result + " for region: " + + Bytes.toStringBinary(e.getKey()) + ", removing from cache"); + } } } } catch (InterruptedException e) { @@ -1392,7 +1400,7 @@ public class HConnectionManager implements HConstants { } catch (ExecutionException e) { System.out.println(e); // all go into the failed list. - LOG.debug("Failed all from " + request.address, e); + LOG.debug("Failed all from (2) " + request.address, e); failed.addAll(request.allPuts()); } } @@ -1428,19 +1436,22 @@ public class HConnectionManager implements HConstants { final HConnection connection = this; return new Callable() { public MultiPutResponse call() throws IOException { - return getRegionServerWithRetries( + return getRegionServerForWithoutRetries( + new ServerCallable(connection, tableName, null) { + public MultiPutResponse call() throws IOException { MultiPutResponse resp = server.multiPut(puts); resp.request = puts; return resp; } + @Override public void instantiateServer(boolean reload) throws IOException { server = connection.getHRegionConnection(address); } } - ); + ); // end getRegionServerForWithoutRetries } }; } diff --git src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index b17b3e8..2f804c7 100644 --- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -232,7 +232,7 @@ public class HRegionServer implements HConstants, HRegionInterface, // Run HDFS shutdown thread on exit if this is set. We clear this out when // doing a restart() to prevent closing of HDFS. - private final AtomicBoolean shutdownHDFS = new AtomicBoolean(true); + public final AtomicBoolean shutdownHDFS = new AtomicBoolean(true); private final String machineName; diff --git src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java index 89cc869..43924e8 100644 --- src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java +++ src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java @@ -20,15 +20,15 @@ package org.apache.hadoop.hbase; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.util.Bytes; -import java.util.List; import java.util.ArrayList; +import java.util.List; public class TestMultiParallelPut extends MultiRegionTable { private static final byte[] VALUE = Bytes.toBytes("value"); @@ -58,7 +58,14 @@ public class TestMultiParallelPut extends MultiRegionTable { List keys = new ArrayList(); - public void testMultiPut() throws Exception { + public void testParallelPut() throws Exception { + doATest(false); + } + public void testParallelPutWithRSAbort() throws Exception { + doATest(true); + } + + public void doATest(boolean doAbort) throws Exception { HTable table = new HTable(TEST_TABLE); table.setAutoFlush(false); @@ -73,6 +80,19 @@ public class TestMultiParallelPut extends MultiRegionTable { table.flushCommits(); + if (doAbort) { + cluster.abortRegionServer(0); + + // try putting more keys after the abort. + for ( byte [] k : keys ) { + Put put = new Put(k); + put.add(BYTES_FAMILY, QUALIFIER, VALUE); + + table.put(put); + } + table.flushCommits(); + } + for (byte [] k : keys ) { Get get = new Get(k); get.addColumn(BYTES_FAMILY, QUALIFIER); @@ -88,10 +108,15 @@ public class TestMultiParallelPut extends MultiRegionTable { HBaseAdmin admin = new HBaseAdmin(conf); ClusterStatus cs = admin.getClusterStatus(); - assertEquals(2, cs.getServers()); + int expectedServerCount = 2; + if (doAbort) + expectedServerCount = 1; + + assertEquals(expectedServerCount, cs.getServers()); for ( HServerInfo info : cs.getServerInfo()) { System.out.println(info); assertTrue( info.getLoad().getNumberOfRegions() > 10); } } + }