diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 42c63eb..c325097 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -97,12 +98,21 @@ class MultiServerCallable extends PayloadCarryingServerCallable cells = null; - // The multi object is a list of Actions by region. Iterate by region. + + // Pre-size. Presume at least a KV per Action. There are likely more. + List cells = + (this.cellBlock ? new ArrayList(countOfActions) : null); + long nonceGroup = multiAction.getNonceGroup(); if (nonceGroup != HConstants.NO_NONCE) { multiRequestBuilder.setNonceGroup(nonceGroup); } + // Index to track RegionAction within the MultiRequest + int regionActionIndex = -1; + // Map from a created RegionAction for a RowMutations to the original index within + // its original list of actions + Map rowMutationsIndexMap = new HashMap<>(); + // The multi object is a list of Actions by region. Iterate by region. for (Map.Entry>> e: this.multiAction.actions.entrySet()) { final byte [] regionName = e.getKey(); final List> actions = e.getValue(); @@ -110,19 +120,46 @@ class MultiServerCallable extends PayloadCarryingServerCallable action : actions) { + Row row = action.getAction(); + // Row Mutations are a set of Puts and/or Deletes all to be applied atomically + // on the one row. We do separate RegionAction for each RowMutations. + // We maintain a map to keep track of this RegionAction and the original Action index. + if (row instanceof RowMutations) { + RowMutations rms = (RowMutations)row; + if (this.cellBlock) { + // Build a multi request absent its Cell payload. Send data in cellblocks. + regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms, cells, + regionActionBuilder, actionBuilder, mutationBuilder); + } else { + regionActionBuilder = RequestConverter.buildRegionAction(regionName, rms); + } + regionActionBuilder.setAtomic(true); + multiRequestBuilder.addRegionAction(regionActionBuilder.build()); + regionActionIndex++; + rowMutationsIndexMap.put(regionActionIndex, action.getOriginalIndex()); + rowMutations++; + + regionActionBuilder.clear(); + regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier( + HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName) ); + } + } - if (this.cellBlock) { - // Presize. Presume at least a KV per Action. There are likely more. - if (cells == null) cells = new ArrayList(countOfActions); - // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations. - // They have already been handled above. Guess at count of cells - regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells, - regionActionBuilder, actionBuilder, mutationBuilder); - } else { - regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions, - regionActionBuilder, actionBuilder, mutationBuilder); + if (actions.size() > rowMutations) { + if (this.cellBlock) { + // Send data in cellblocks. The call to buildNoDataRegionAction will skip RowMutations. + // They have already been handled above. Guess at count of cells + regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells, + regionActionBuilder, actionBuilder, mutationBuilder); + } else { + regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions, + regionActionBuilder, actionBuilder, mutationBuilder); + } + multiRequestBuilder.addRegionAction(regionActionBuilder.build()); + regionActionIndex++; } - multiRequestBuilder.addRegionAction(regionActionBuilder.build()); } // Controller optionally carries cell data over the proxy/service boundary and also @@ -140,7 +177,8 @@ class MultiServerCallable extends PayloadCarryingServerCallable gets) throws IOException; /** - * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. + * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends, RowMutations * The ordering of execution of the actions is not defined. Meaning if you do a Put and a * Get in the same {@link #batch} call, you will not necessarily be * guaranteed that the Get returns what the Put had put. * - * @param actions list of Get, Put, Delete, Increment, Append objects + * @param actions list of Get, Put, Delete, Increment, Append, RowMutations * @param results Empty Object[], same size as actions. Provides access to partial * results, in case an exception is thrown. A null in the result array means that * the call for that action failed, even after retries @@ -123,7 +123,7 @@ public interface Table extends Closeable { * Same as {@link #batch(List, Object[])}, but returns an array of * results instead of using a results parameter reference. * - * @param actions list of Get, Put, Delete, Increment, Append objects + * @param actions list of Get, Put, Delete, Increment, Append, RowMutations * @return the results from the actions. A null in the return array means that * the call for that action failed, even after retries * @throws IOException diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index 8163130..32bdcb9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -621,7 +621,8 @@ public final class RequestConverter { .setMethodName(exec.getMethod().getName()) .setRequest(exec.getRequest().toByteString()))); } else if (row instanceof RowMutations) { - throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow"); + // Skip RowMutations, which has been separately converted to RegionAction + continue; } else { throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName()); } @@ -699,7 +700,8 @@ public final class RequestConverter { .setMethodName(exec.getMethod().getName()) .setRequest(exec.getRequest().toByteString()))); } else if (row instanceof RowMutations) { - throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow"); + // Skip RowMutations, which has been separately converted to RegionAction + continue; } else { throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName()); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java index ba7041e..5b8498d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java @@ -79,7 +79,8 @@ public final class ResponseConverter { /** * Get the results from a protocol buffer MultiResponse * - * @param request the protocol buffer MultiResponse to convert + * @param request the original protocol buffer MultiRequest + * @param response the protocol buffer MultiResponse to convert * @param cells Cells to go with the passed in proto. Can be null. * @return the results that were in the MultiResponse (a Result or an Exception). * @throws IOException @@ -87,6 +88,22 @@ public final class ResponseConverter { public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request, final MultiResponse response, final CellScanner cells) throws IOException { + return getResults(request, null, response, cells); + } + + /** + * Get the results from a protocol buffer MultiResponse + * + * @param request the original protocol buffer MultiRequest + * @param rowMutationsIndexMap + * @param response the protocol buffer MultiResponse to convert + * @param cells Cells to go with the passed in proto. Can be null. + * @return the results that were in the MultiResponse (a Result or an Exception). + * @throws IOException + */ + public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request, + Map rowMutationsIndexMap, final MultiResponse response, final CellScanner cells) + throws IOException { int requestRegionActionCount = request.getRegionActionCount(); int responseRegionActionResultCount = response.getRegionActionResultCount(); if (requestRegionActionCount != responseRegionActionResultCount) { @@ -120,8 +137,22 @@ public final class ResponseConverter { actionResult.getResultOrExceptionCount() + " for region " + actions.getRegion()); } + Object responseValue; + + Integer rowMutationsIndex = + (rowMutationsIndexMap == null ? null : rowMutationsIndexMap.get(i)); + if (rowMutationsIndex != null) { + // This RegionAction is from a RowMutations in a batch. + // If there is an exception from the server, the exception is set at + // the RegionActionResult level, which has been handled above. + responseValue = response.getProcessed() ? + ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE : + ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE; + results.add(regionName, rowMutationsIndex, responseValue); + continue; + } + for (ResultOrException roe : actionResult.getResultOrExceptionList()) { - Object responseValue; if (roe.hasException()) { responseValue = ProtobufUtil.toException(roe.getException()); } else if (roe.hasResult()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 08ccc42..5d7c853 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -314,6 +314,52 @@ public class TestFromClientSide3 { } @Test + public void testBatchWithRowMutation() throws Exception { + LOG.info("Starting testBatchWithRowMutation"); + final TableName TABLENAME = TableName.valueOf("testBatchWithRowMutation"); + try (Table t = TEST_UTIL.createTable(TABLENAME, FAMILY)) { + byte [][] QUALIFIERS = new byte [][] { + Bytes.toBytes("a"), Bytes.toBytes("b") + }; + RowMutations arm = new RowMutations(ROW); + Put p = new Put(ROW); + p.addColumn(FAMILY, QUALIFIERS[0], VALUE); + arm.add(p); + Object[] batchResult = new Object[1]; + t.batch(Arrays.asList(arm), batchResult); + + Get g = new Get(ROW); + Result r = t.get(g); + assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0]))); + + arm = new RowMutations(ROW); + p = new Put(ROW); + p.addColumn(FAMILY, QUALIFIERS[1], VALUE); + arm.add(p); + Delete d = new Delete(ROW); + d.addColumns(FAMILY, QUALIFIERS[0]); + arm.add(d); + t.batch(Arrays.asList(arm), batchResult); + r = t.get(g); + assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1]))); + assertNull(r.getValue(FAMILY, QUALIFIERS[0])); + + // Test that we get the correct remote exception for RowMutations from batch() + try { + arm = new RowMutations(ROW); + p = new Put(ROW); + p.addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE); + arm.add(p); + t.batch(Arrays.asList(arm), batchResult); + fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException"); + } catch (RetriesExhaustedWithDetailsException e) { + String msg = e.getMessage(); + assertTrue(msg.contains("NoSuchColumnFamilyException")); + } + } + } + + @Test public void testHTableExistsMethodSingleRegionSingleGet() throws Exception { // Test with a single region table. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 484bc0e..d18f560 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -576,10 +576,12 @@ public class TestMultiParallel { @Test(timeout=300000) public void testBatchWithMixedActions() throws Exception { LOG.info("test=testBatchWithMixedActions"); - Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); + Table table = UTIL.getConnection().getTable(TEST_TABLE); // Load some data to start - Object[] results = table.batch(constructPutRequests()); + List puts = constructPutRequests(); + Object[] results = new Object[puts.size()]; + table.batch(puts, results); validateSizeAndEmpty(results, KEYS.length); // Batch: get, get, put(new col), delete, get, get of put, get of deleted, @@ -601,12 +603,12 @@ public class TestMultiParallel { // 2 put of new column Put put = new Put(KEYS[10]); - put.add(BYTES_FAMILY, qual2, val2); + put.addColumn(BYTES_FAMILY, qual2, val2); actions.add(put); // 3 delete Delete delete = new Delete(KEYS[20]); - delete.deleteFamily(BYTES_FAMILY); + delete.addFamily(BYTES_FAMILY); actions.add(delete); // 4 get @@ -620,19 +622,38 @@ public class TestMultiParallel { // 5 put of new column put = new Put(KEYS[40]); - put.add(BYTES_FAMILY, qual2, val2); + put.addColumn(BYTES_FAMILY, qual2, val2); actions.add(put); - results = table.batch(actions); + // 6 RowMutations + RowMutations rm = new RowMutations(KEYS[50]); + put = new Put(KEYS[50]); + put.addColumn(BYTES_FAMILY, qual2, val2); + rm.add(put); + byte[] qual3 = Bytes.toBytes("qual3"); + byte[] val3 = Bytes.toBytes("putvalue3"); + put = new Put(KEYS[50]); + put.addColumn(BYTES_FAMILY, qual3, val3); + rm.add(put); + actions.add(rm); + + // 7 Add another Get to the mixed sequence after RowMutations + get = new Get(KEYS[10]); + get.addColumn(BYTES_FAMILY, QUALIFIER); + actions.add(get); + + results = new Object[actions.size()]; + table.batch(actions, results); // Validation validateResult(results[0]); validateResult(results[1]); - validateEmpty(results[2]); validateEmpty(results[3]); validateResult(results[4]); validateEmpty(results[5]); + validateEmpty(results[6]); + validateResult(results[7]); // validate last put, externally from the batch get = new Get(KEYS[40]); @@ -640,6 +661,17 @@ public class TestMultiParallel { Result r = table.get(get); validateResult(r, qual2, val2); + // validate last RowMutations, externally from the batch + get = new Get(KEYS[50]); + get.addColumn(BYTES_FAMILY, qual2); + r = table.get(get); + validateResult(r, qual2, val2); + + get = new Get(KEYS[50]); + get.addColumn(BYTES_FAMILY, qual3); + r = table.get(get); + validateResult(r, qual3, val3); + table.close(); } @@ -716,8 +748,7 @@ public class TestMultiParallel { private void validateEmpty(Object r1) { Result result = (Result)r1; Assert.assertTrue(result != null); - Assert.assertTrue(result.getRow() == null); - Assert.assertEquals(0, result.rawCells().length); + Assert.assertTrue(result.isEmpty()); } private void validateSizeAndEmpty(Object[] results, int expectedSize) {