diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 2ae68c4..3f83011 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.shaded.io.netty.util.HashedWheelTimer; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; @@ -232,27 +233,53 @@ class AsyncBatchRpcRetryingCaller { } private ClientProtos.MultiRequest buildReq(Map actionsByRegion, - List cells) throws IOException { + List cells, Map rowMutationsIndexMap) throws IOException { ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder(); ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder(); ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder(); + + // Index to track RegionAction within the MultiRequest + int regionActionIndex = -1; for (Map.Entry entry : actionsByRegion.entrySet()) { - // TODO: remove the extra for loop as we will iterate it in mutationBuilder. - if (!multiRequestBuilder.hasNonceGroup()) { - for (Action action : entry.getValue().actions) { - if (action.hasNonce()) { - multiRequestBuilder.setNonceGroup(conn.getNonceGenerator().getNonceGroup()); - break; - } - } - } regionActionBuilder.clear(); regionActionBuilder.setRegion( RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey())); - regionActionBuilder = RequestConverter.buildNoDataRegionAction(entry.getKey(), - entry.getValue().actions, cells, regionActionBuilder, actionBuilder, mutationBuilder); - multiRequestBuilder.addRegionAction(regionActionBuilder.build()); + + int rowMutations = 0; + for (Action action : entry.getValue().actions) { + if (!multiRequestBuilder.hasNonceGroup() && action.hasNonce()) { + multiRequestBuilder.setNonceGroup(conn.getNonceGenerator().getNonceGroup()); + } + + Row row = action.getAction(); + // Row Mutations are a set of Puts and/or Deletes all to be applied atomically + // on the one row. We do separate RegionAction for each RowMutations. + // We maintain a map to keep track of this RegionAction and the original Action index. + if (row instanceof RowMutations) { + RowMutations rms = (RowMutations) row; + regionActionBuilder = RequestConverter.buildNoDataRegionAction(entry.getKey(), rms, cells, + regionActionBuilder, actionBuilder, mutationBuilder); + regionActionBuilder.setAtomic(true); + multiRequestBuilder.addRegionAction(regionActionBuilder.build()); + regionActionIndex++; + rowMutationsIndexMap.put(regionActionIndex, action.getOriginalIndex()); + rowMutations++; + + regionActionBuilder.clear(); + regionActionBuilder.setRegion( + RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey())); + } + } + + if (entry.getValue().actions.size() > rowMutations) { + // The call to buildNoDataRegionAction will skip RowMutations. + // They have already been handled above. + regionActionBuilder = RequestConverter.buildNoDataRegionAction(entry.getKey(), + entry.getValue().actions, cells, regionActionBuilder, actionBuilder, mutationBuilder); + multiRequestBuilder.addRegionAction(regionActionBuilder.build()); + regionActionIndex++; + } } return multiRequestBuilder.build(); } @@ -337,8 +364,11 @@ class AsyncBatchRpcRetryingCaller { } ClientProtos.MultiRequest req; List cells = new ArrayList<>(); + // Map from a created RegionAction to the original index for a RowMutations within + // its original list of actions + Map rowMutationsIndexMap = new HashMap<>(); try { - req = buildReq(serverReq.actionsByRegion, cells); + req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap); } catch (IOException e) { onError(serverReq.actionsByRegion, tries, e, sn); return; @@ -353,8 +383,8 @@ class AsyncBatchRpcRetryingCaller { onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn); } else { try { - onComplete(serverReq.actionsByRegion, tries, sn, - ResponseConverter.getResults(req, resp, controller.cellScanner())); + onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req, + rowMutationsIndexMap, resp, controller.cellScanner())); } catch (Exception e) { onError(serverReq.actionsByRegion, tries, e, sn); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java index 7d24c4f..44d884d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java @@ -393,11 +393,11 @@ public interface AsyncTableBase { } /** - * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of - * execution of the actions is not defined. Meaning if you do a Put and a Get in the same - * {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put - * had put. - * @param actions list of Get, Put, Delete, Increment, Append objects + * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations. The + * ordering of execution of the actions is not defined. Meaning if you do a Put and a Get in the + * same {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the + * Put had put. + * @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects * @return A list of {@link CompletableFuture}s that represent the result for each action. */ List> batch(List actions); @@ -405,7 +405,7 @@ public interface AsyncTableBase { /** * A simple version of batch. It will fail if there are any failures and you will get the whole * result list at once if the operation is succeeded. - * @param actions list of Get, Put, Delete, Increment, Append objects + * @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}. */ default CompletableFuture> batchAll(List actions) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java index fce9041..4e400a4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java @@ -69,6 +69,7 @@ public class TestAsyncTableBatch { private static byte[] FAMILY = Bytes.toBytes("cf"); private static byte[] CQ = Bytes.toBytes("cq"); + private static byte[] CQ1 = Bytes.toBytes("cq1"); private static int COUNT = 1000; @@ -178,9 +179,9 @@ public class TestAsyncTableBatch { } @Test - public void testMixed() throws InterruptedException, ExecutionException { + public void testMixed() throws InterruptedException, ExecutionException, IOException { AsyncTableBase table = tableGetter.apply(TABLE_NAME); - table.putAll(IntStream.range(0, 5) + table.putAll(IntStream.range(0, 7) .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i))) .collect(Collectors.toList())).get(); List actions = new ArrayList<>(); @@ -189,8 +190,14 @@ public class TestAsyncTableBatch { actions.add(new Delete(Bytes.toBytes(2))); actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1)); actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4))); + RowMutations rm = new RowMutations(Bytes.toBytes(5)); + rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes((long) 100))); + rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes((long) 200))); + actions.add(rm); + actions.add(new Get(Bytes.toBytes(6))); + List results = table.batchAll(actions).get(); - assertEquals(5, results.size()); + assertEquals(7, results.size()); Result getResult = (Result) results.get(0); assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ))); assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ))); @@ -202,6 +209,12 @@ public class TestAsyncTableBatch { assertEquals(12, appendValue.length); assertEquals(4, Bytes.toLong(appendValue)); assertEquals(4, Bytes.toInt(appendValue, 8)); + assertEquals(100, + Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ))); + assertEquals(200, + Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1))); + getResult = (Result) results.get(6); + assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ))); } public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver {