diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 302489f..08111d3 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -17,15 +17,22 @@ */ package org.apache.hadoop.hbase.protobuf; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.RpcChannel; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; +import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableSet; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; @@ -107,22 +114,15 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; 
-import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NavigableSet; - -import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; /** * Protobufs utility. @@ -1021,37 +1021,43 @@ public final class ProtobufUtil { */ public static MultiResponse multi(final ClientProtocol client, final MultiAction multi) throws IOException { - try { - MultiResponse response = new MultiResponse(); - for (Map.Entry>> e: multi.actions.entrySet()) { - byte[] regionName = e.getKey(); - int rowMutations = 0; - List> actions = e.getValue(); - for (Action action: actions) { - Row row = action.getAction(); - if (row instanceof RowMutations) { + MultiResponse response = new MultiResponse(); + for (Map.Entry>> e: multi.actions.entrySet()) { + byte[] regionName = e.getKey(); + int rowMutations = 0; + List> actions = e.getValue(); + for (Action action: actions) { + Row row = action.getAction(); + if (row instanceof RowMutations) { + try { MultiRequest request = - RequestConverter.buildMultiRequest(regionName, (RowMutations)row); + RequestConverter.buildMultiRequest(regionName, (RowMutations)row); client.multi(null, request); response.add(regionName, action.getOriginalIndex(), new Result()); - rowMutations++; + } catch (ServiceException se) { + response.add(regionName, action.getOriginalIndex(), getRemoteException(se)); } + rowMutations++; } - if (actions.size() > rowMutations) { + } + if (actions.size() > rowMutations) { + try { MultiRequest request = - RequestConverter.buildMultiRequest(regionName, 
actions); + RequestConverter.buildMultiRequest(regionName, actions); ClientProtos.MultiResponse proto = client.multi(null, request); List results = ResponseConverter.getResults(proto); for (int i = 0, n = results.size(); i < n; i++) { int originalIndex = actions.get(i).getOriginalIndex(); response.add(regionName, originalIndex, results.get(i)); } + } catch (ServiceException se) { + for (int i = 0, n = actions.size(); i < n; i++) { + response.add(regionName, actions.get(i).getOriginalIndex(), getRemoteException(se)); + } } } - return response; - } catch (ServiceException se) { - throw getRemoteException(se); } + return response; } /** diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index e80f208..0d7f91e 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -18,10 +18,10 @@ */ package org.apache.hadoop.hbase.client; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; import java.lang.reflect.Field; import java.lang.reflect.Modifier; @@ -38,11 +38,17 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation; import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey; 
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException; import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; @@ -52,6 +58,8 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import com.google.common.collect.Lists; + /** * This class is for testing HCM features */ @@ -64,6 +72,7 @@ public class TestHCM { private static final byte[] TABLE_NAME2 = Bytes.toBytes("test2"); private static final byte[] FAM_NAM = Bytes.toBytes("f"); private static final byte[] ROW = Bytes.toBytes("bbb"); + private static final byte[] ROW_X = Bytes.toBytes("xxx"); @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -106,14 +115,14 @@ public class TestHCM { private static int getHConnectionManagerCacheSize(){ return HConnectionTestingUtility.getConnectionCount(); } - + @Test public void abortingHConnectionRemovesItselfFromHCM() throws Exception { // Save off current HConnections - Map oldHBaseInstances = + Map oldHBaseInstances = new HashMap(); oldHBaseInstances.putAll(HConnectionManager.HBASE_INSTANCES); - + HConnectionManager.HBASE_INSTANCES.clear(); try { @@ -536,6 +545,118 @@ public class TestHCM { conn.close(); } + @Test + public void testMulti() throws Exception { + HTable table = TEST_UTIL.createTable(TABLE_NAME, FAM_NAM); + TEST_UTIL.createMultiRegions(table, FAM_NAM); + HConnectionManager.HConnectionImplementation conn = + (HConnectionManager.HConnectionImplementation)table.getConnection(); + + // We're now going to move the region and check that it works for the client + // First a new put to add the location in the cache + conn.clearRegionCache(TABLE_NAME); + Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME)); + + TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false); 
+ HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); + + // We can wait for all regions to be online, that makes log reading easier when debugging + while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) { + Thread.sleep(1); + } + + Put put = new Put(ROW_X); + put.add(FAM_NAM, ROW_X, ROW_X); + table.put(put); + + // Now moving the region to the second server + HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW_X); + byte[] regionName = toMove.getRegionInfo().getRegionName(); + byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes(); + + // Choose the other server. + int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName); + int destServerId = (curServerId == 0 ? 1 : 0); + + HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId); + HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId); + + ServerName destServerName = destServer.getServerName(); + + //find another row in the cur server that is less than ROW_X + List regions = curServer.getOnlineRegions(TABLE_NAME); + byte[] otherRow = null; + for (HRegion region : regions) { + if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName()) + && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) { + otherRow = region.getRegionInfo().getStartKey(); + break; + } + } + assertNotNull(otherRow); + Put put2 = new Put(otherRow); + put2.add(FAM_NAM, otherRow, otherRow); + table.put(put2); //cache put2's location + + // Check that we are in the expected state + Assert.assertTrue(curServer != destServer); + Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName())); + Assert.assertFalse( toMove.getPort() == destServerName.getPort()); + Assert.assertNotNull(curServer.getOnlineRegion(regionName)); + Assert.assertNull(destServer.getOnlineRegion(regionName)); + 
Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster(). + getAssignmentManager().getRegionStates().isRegionsInTransition()); + + // Moving. It's possible that we don't have all the regions online at this point, so + // the test must depend only on the region we're looking at. + LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString()); + TEST_UTIL.getHBaseAdmin().move( + toMove.getRegionInfo().getEncodedNameAsBytes(), + destServerName.getServerName().getBytes() + ); + + while (destServer.getOnlineRegion(regionName) == null || + destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || + curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || + master.getAssignmentManager().getRegionStates().isRegionsInTransition()) { + // wait for the move to be finished + Thread.sleep(1); + } + + LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString()); + + // Check our new state. + Assert.assertNull(curServer.getOnlineRegion(regionName)); + Assert.assertNotNull(destServer.getOnlineRegion(regionName)); + Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); + Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); + + + // Cache was NOT updated and points to the wrong server + Assert.assertFalse( + conn.getCachedLocation(TABLE_NAME, ROW_X).getPort() == destServerName.getPort()); + + // Hijack the number of retries to fail after 2 tries + Field numRetries = conn.getClass().getDeclaredField("numRetries"); + numRetries.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(numRetries, numRetries.getModifiers() & ~Modifier.FINAL); + final int prevNumRetriesVal = (Integer)numRetries.get(conn); + numRetries.set(conn, 2); + + Put put3 = new Put(ROW_X); + put3.add(FAM_NAM, ROW_X, ROW_X); + Put put4 = new
Put(otherRow); + put4.add(FAM_NAM, otherRow, otherRow); + + // do multi + table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row, + // second we get RegionMovedException. + + numRetries.set(conn, prevNumRetriesVal); + table.close(); + } }