diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 2ae68c4..8234386 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.shaded.io.netty.util.HashedWheelTimer; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; @@ -232,27 +233,19 @@ class AsyncBatchRpcRetryingCaller { } private ClientProtos.MultiRequest buildReq(Map actionsByRegion, - List cells) throws IOException { + List cells, Map rowMutationsIndexMap) throws IOException { ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder(); ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder(); ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder(); for (Map.Entry entry : actionsByRegion.entrySet()) { - // TODO: remove the extra for loop as we will iterate it in mutationBuilder. - if (!multiRequestBuilder.hasNonceGroup()) { - for (Action action : entry.getValue().actions) { - if (action.hasNonce()) { - multiRequestBuilder.setNonceGroup(conn.getNonceGenerator().getNonceGroup()); - break; - } - } - } - regionActionBuilder.clear(); - regionActionBuilder.setRegion( - RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey())); - regionActionBuilder = RequestConverter.buildNoDataRegionAction(entry.getKey(), - entry.getValue().actions, cells, regionActionBuilder, actionBuilder, mutationBuilder); - multiRequestBuilder.addRegionAction(regionActionBuilder.build()); + long nonceGroup = conn.getNonceGenerator().getNonceGroup(); + // multiRequestBuilder will be populated with region actions. + // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the + // action list. + RequestConverter.buildNoDataRegionAction(entry.getKey(), + entry.getValue().actions, cells, multiRequestBuilder, regionActionBuilder, actionBuilder, + mutationBuilder, nonceGroup, rowMutationsIndexMap); } return multiRequestBuilder.build(); } @@ -337,8 +330,12 @@ class AsyncBatchRpcRetryingCaller { } ClientProtos.MultiRequest req; List cells = new ArrayList<>(); + // Map from a created RegionAction to the original index for a RowMutations within + // the original list of actions. This will be used to process the results when there + // is RowMutations in the action list. + Map rowMutationsIndexMap = new HashMap<>(); try { - req = buildReq(serverReq.actionsByRegion, cells); + req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap); } catch (IOException e) { onError(serverReq.actionsByRegion, tries, e, sn); return; @@ -353,8 +350,8 @@ class AsyncBatchRpcRetryingCaller { onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn); } else { try { - onComplete(serverReq.actionsByRegion, tries, sn, - ResponseConverter.getResults(req, resp, controller.cellScanner())); + onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req, + rowMutationsIndexMap, resp, controller.cellScanner())); } catch (Exception e) { onError(serverReq.actionsByRegion, tries, e, sn); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index b3ccb15..fd08aa3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -431,11 +431,11 @@ public interface AsyncTable { } /** - * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of - * execution of the actions is not defined. Meaning if you do a Put and a Get in the same - * {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put - * had put. - * @param actions list of Get, Put, Delete, Increment, Append objects + * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations. The + * ordering of execution of the actions is not defined. Meaning if you do a Put and a Get in the + * same {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the + * Put had put. + * @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects * @return A list of {@link CompletableFuture}s that represent the result for each action. */ List> batch(List actions); @@ -443,7 +443,7 @@ public interface AsyncTable { /** * A simple version of batch. It will fail if there are any failures and you will get the whole * result list at once if the operation is succeeded. - * @param actions list of Get, Put, Delete, Increment, Append objects + * @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}. */ default CompletableFuture> batchAll(List actions) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index ed7e718..ca391cb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -100,57 +100,29 @@ class MultiServerCallable extends CancellableRegionServerCallable (this.cellBlock ? new ArrayList(countOfActions) : null); long nonceGroup = multiAction.getNonceGroup(); - if (nonceGroup != HConstants.NO_NONCE) { - multiRequestBuilder.setNonceGroup(nonceGroup); - } - // Index to track RegionAction within the MultiRequest - int regionActionIndex = -1; + // Map from a created RegionAction to the original index for a RowMutations within - // its original list of actions + // the original list of actions. This will be used to process the results when there + // is RowMutations in the action list. Map rowMutationsIndexMap = new HashMap<>(); // The multi object is a list of Actions by region. Iterate by region. for (Map.Entry> e: this.multiAction.actions.entrySet()) { final byte [] regionName = e.getKey(); final List actions = e.getValue(); - regionActionBuilder.clear(); - regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier( - HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName)); - - int rowMutations = 0; - for (Action action : actions) { - Row row = action.getAction(); - // Row Mutations are a set of Puts and/or Deletes all to be applied atomically - // on the one row. We do separate RegionAction for each RowMutations. - // We maintain a map to keep track of this RegionAction and the original Action index. - if (row instanceof RowMutations) { - RowMutations rms = (RowMutations)row; - if (this.cellBlock) { - // Build a multi request absent its Cell payload. Send data in cellblocks. - regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms, cells, - regionActionBuilder, actionBuilder, mutationBuilder); - } else { - regionActionBuilder = RequestConverter.buildRegionAction(regionName, rms); - } - regionActionBuilder.setAtomic(true); - multiRequestBuilder.addRegionAction(regionActionBuilder.build()); - regionActionIndex++; - rowMutationsIndexMap.put(regionActionIndex, action.getOriginalIndex()); - rowMutations++; - } + if (this.cellBlock) { + // Send data in cellblocks. + // multiRequestBuilder will be populated with region actions. + // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the + // action list. + RequestConverter.buildNoDataRegionAction(regionName, actions, cells, multiRequestBuilder, + regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap); } - - if (actions.size() > rowMutations) { - if (this.cellBlock) { - // Send data in cellblocks. The call to buildNoDataRegionAction will skip RowMutations. - // They have already been handled above. Guess at count of cells - regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells, - regionActionBuilder, actionBuilder, mutationBuilder); - } else { - regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions, - regionActionBuilder, actionBuilder, mutationBuilder); - } - multiRequestBuilder.addRegionAction(regionActionBuilder.build()); - regionActionIndex++; + else { + // multiRequestBuilder will be populated with region actions. + // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the + // action list. + RequestConverter.buildRegionAction(regionName, actions, multiRequestBuilder, + regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 4fdc87d..fcf00f9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.regex.Pattern; @@ -79,6 +80,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue; @@ -625,17 +627,31 @@ public final class RequestConverter { /** * Create a protocol buffer multi request for a list of actions. * Propagates Actions original index. + * The passed in multiRequestBuilder will be populated with region actions. * * @param regionName * @param actions - * @return a multi request + * @param multiRequestBuilder + * @param regionActionBuilder + * @param actionBuilder + * @param mutationBuilder + * @param nonceGroup + * @param rowMutationsIndexMap * @throws IOException */ - public static RegionAction.Builder buildRegionAction(final byte[] regionName, - final List actions, final RegionAction.Builder regionActionBuilder, + public static void buildRegionAction(final byte[] regionName, + final List actions, final MultiRequest.Builder multiRequestBuilder, + final RegionAction.Builder regionActionBuilder, final ClientProtos.Action.Builder actionBuilder, - final MutationProto.Builder mutationBuilder) throws IOException { + final MutationProto.Builder mutationBuilder, + long nonceGroup, final Map rowMutationsIndexMap) throws IOException { + regionActionBuilder.clear(); + RegionAction.Builder builder = getRegionActionBuilderWithRegion( + regionActionBuilder, regionName); ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null; + boolean hasNonce = false; + List rowMutationsList = new ArrayList<>(); + for (Action action: actions) { Row row = action.getAction(); actionBuilder.clear(); @@ -643,19 +659,21 @@ public final class RequestConverter { mutationBuilder.clear(); if (row instanceof Get) { Get g = (Get)row; - regionActionBuilder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g))); + builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g))); } else if (row instanceof Put) { - regionActionBuilder.addAction(actionBuilder. + builder.addAction(actionBuilder. setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row, mutationBuilder))); } else if (row instanceof Delete) { - regionActionBuilder.addAction(actionBuilder. + builder.addAction(actionBuilder. setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row, mutationBuilder))); } else if (row instanceof Append) { - regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation( + builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation( MutationType.APPEND, (Append)row, mutationBuilder, action.getNonce()))); + hasNonce = true; } else if (row instanceof Increment) { - regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation( + builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation( MutationType.INCREMENT, (Increment)row, mutationBuilder, action.getNonce()))); + hasNonce = true; } else if (row instanceof RegionCoprocessorServiceExec) { RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row; // DUMB COPY!!! FIX!!! Done to copy from c.g.p.ByteString to shaded ByteString. @@ -667,19 +685,39 @@ public final class RequestConverter { } else { cpBuilder.clear(); } - regionActionBuilder.addAction(actionBuilder.setServiceCall( + builder.addAction(actionBuilder.setServiceCall( cpBuilder.setRow(UnsafeByteOperations.unsafeWrap(exec.getRow())) .setServiceName(exec.getMethod().getService().getFullName()) .setMethodName(exec.getMethod().getName()) .setRequest(value))); } else if (row instanceof RowMutations) { - // Skip RowMutations, which has been separately converted to RegionAction - continue; + rowMutationsList.add(action); } else { throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName()); } } - return regionActionBuilder; + if (!multiRequestBuilder.hasNonceGroup() && hasNonce) { + multiRequestBuilder.setNonceGroup(nonceGroup); + } + multiRequestBuilder.addRegionAction(builder.build()); + + // Process RowMutations here. We can not process it in the big loop above because + // it will corrupt the sequence order maintained in cells. + // RowMutations is a set of Puts and/or Deletes all to be applied atomically + // on the one row. We do separate RegionAction for each RowMutations. + // We maintain a map to keep track of this RegionAction and the original Action index. + for (Action action : rowMutationsList) { + RowMutations rms = (RowMutations) action.getAction(); + RegionAction.Builder rowMutationsRegionActionBuilder = + RequestConverter.buildRegionAction(regionName, rms); + rowMutationsRegionActionBuilder.setAtomic(true); + // Put it in the multiRequestBuilder + multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build()); + // This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1) + // in the overall multiRequest. + rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1, + action.getOriginalIndex()); + } } /** @@ -689,23 +727,36 @@ public final class RequestConverter { * coming along otherwise. Note that Get is different. It does not contain 'data' and is always * carried by protobuf. We return references to the data by adding them to the passed in * data param. + * The passed in multiRequestBuilder will be populated with region actions. * *

Propagates Actions original index. * * @param regionName * @param actions * @param cells Place to stuff references to actual data. - * @return a multi request that does not carry any data. + * @param multiRequestBuilder + * @param regionActionBuilder + * @param actionBuilder + * @param mutationBuilder + * @param nonceGroup + * @param rowMutationsIndexMap * @throws IOException */ - public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName, + public static void buildNoDataRegionAction(final byte[] regionName, final Iterable actions, final List cells, + final MultiRequest.Builder multiRequestBuilder, final RegionAction.Builder regionActionBuilder, final ClientProtos.Action.Builder actionBuilder, - final MutationProto.Builder mutationBuilder) throws IOException { + final MutationProto.Builder mutationBuilder, + long nonceGroup, final Map rowMutationsIndexMap) throws IOException { + regionActionBuilder.clear(); RegionAction.Builder builder = getRegionActionBuilderWithRegion( regionActionBuilder, regionName); ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null; + RegionAction.Builder rowMutationsRegionActionBuilder = null; + boolean hasNonce = false; + List rowMutationsList = new ArrayList<>(); + for (Action action: actions) { Row row = action.getAction(); actionBuilder.clear(); @@ -740,11 +791,13 @@ public final class RequestConverter { cells.add(a); builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData( MutationType.APPEND, a, mutationBuilder, action.getNonce()))); + hasNonce = true; } else if (row instanceof Increment) { Increment i = (Increment)row; cells.add(i); builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData( MutationType.INCREMENT, i, mutationBuilder, action.getNonce()))); + hasNonce = true; } else if (row instanceof RegionCoprocessorServiceExec) { RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row; // DUMB COPY!!! FIX!!! Done to copy from c.g.p.ByteString to shaded ByteString. @@ -762,13 +815,40 @@ public final class RequestConverter { .setMethodName(exec.getMethod().getName()) .setRequest(value))); } else if (row instanceof RowMutations) { - // Skip RowMutations, which has been separately converted to RegionAction - continue; + rowMutationsList.add(action); } else { throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName()); } } - return builder; + if (!multiRequestBuilder.hasNonceGroup() && hasNonce) { + multiRequestBuilder.setNonceGroup(nonceGroup); + } + multiRequestBuilder.addRegionAction(builder.build()); + + // Process RowMutations here. We can not process it in the big loop above because + // it will corrupt the sequence order maintained in cells. + // RowMutations is a set of Puts and/or Deletes all to be applied atomically + // on the one row. We do separate RegionAction for each RowMutations. + // We maintain a map to keep track of this RegionAction and the original Action index. + for (Action action : rowMutationsList) { + RowMutations rms = (RowMutations) action.getAction(); + if (rowMutationsRegionActionBuilder == null) { + rowMutationsRegionActionBuilder = ClientProtos.RegionAction.newBuilder(); + } else { + rowMutationsRegionActionBuilder.clear(); + } + rowMutationsRegionActionBuilder.setRegion( + RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); + rowMutationsRegionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms, + cells, rowMutationsRegionActionBuilder, actionBuilder, mutationBuilder); + rowMutationsRegionActionBuilder.setAtomic(true); + // Put it in the multiRequestBuilder + multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build()); + // This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1) + // in the overall multiRequest. + rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1, + action.getOriginalIndex()); + } } // End utilities for Client diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java index c80b27b..489ad1d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java @@ -71,6 +71,7 @@ public class TestAsyncTableBatch { private static byte[] FAMILY = Bytes.toBytes("cf"); private static byte[] CQ = Bytes.toBytes("cq"); + private static byte[] CQ1 = Bytes.toBytes("cq1"); private static int COUNT = 1000; @@ -178,9 +179,9 @@ public class TestAsyncTableBatch { } @Test - public void testMixed() throws InterruptedException, ExecutionException { + public void testMixed() throws InterruptedException, ExecutionException, IOException { AsyncTable table = tableGetter.apply(TABLE_NAME); - table.putAll(IntStream.range(0, 5) + table.putAll(IntStream.range(0, 7) .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i))) .collect(Collectors.toList())).get(); List actions = new ArrayList<>(); @@ -189,8 +190,14 @@ public class TestAsyncTableBatch { actions.add(new Delete(Bytes.toBytes(2))); actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1)); actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4))); + RowMutations rm = new RowMutations(Bytes.toBytes(5)); + rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes((long) 100))); + rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes((long) 200))); + actions.add(rm); + actions.add(new Get(Bytes.toBytes(6))); + List results = table.batchAll(actions).get(); - assertEquals(5, results.size()); + assertEquals(7, results.size()); Result getResult = (Result) results.get(0); assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ))); assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ))); @@ -202,6 +209,12 @@ public class TestAsyncTableBatch { assertEquals(12, appendValue.length); assertEquals(4, Bytes.toLong(appendValue)); assertEquals(4, Bytes.toInt(appendValue, 8)); + assertEquals(100, + Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ))); + assertEquals(200, + Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1))); + getResult = (Result) results.get(6); + assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ))); } public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver {