diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index b2c012d..8367d53 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; @@ -716,19 +717,7 @@ public class HTable implements Table { final byte [] family, final byte [] qualifier, final byte [] value, final Put put) throws IOException { - ClientServiceCallable callable = new ClientServiceCallable(this.connection, getName(), row, - this.rpcControllerFactory.newController()) { - @Override - protected Boolean rpcCall() throws Exception { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), CompareType.EQUAL, put); - MutateResponse response = doMutate(request); - return Boolean.valueOf(response.getProcessed()); - } - }; - return rpcCallerFactory. newCaller(this.writeRpcTimeout). - callWithRetries(callable, this.operationTimeout); + return checkAndPut(row, family, qualifier, CompareOp.EQUAL, value, put); } /** @@ -739,21 +728,7 @@ public class HTable implements Table { final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Put put) throws IOException { - ClientServiceCallable callable = - new ClientServiceCallable(this.connection, getName(), row, - this.rpcControllerFactory.newController()) { - @Override - protected Boolean rpcCall() throws Exception { - CompareType compareType = CompareType.valueOf(compareOp.name()); - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, put); - MutateResponse response = doMutate(request); - return Boolean.valueOf(response.getProcessed()); - } - }; - return rpcCallerFactory. newCaller(this.writeRpcTimeout). - callWithRetries(callable, this.operationTimeout); + return checkAndMutate(row, family, qualifier, compareOp, value, MutationType.PUT, put); } /** @@ -773,6 +748,18 @@ public class HTable implements Table { final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Delete delete) throws IOException { + return checkAndMutate(row, family, qualifier, compareOp, value, MutationType.DELETE, delete); + } + + @VisibleForTesting + boolean checkAndMutate(final byte [] row, final byte [] family, + final byte [] qualifier, final CompareOp compareOp, final byte [] value, + final MutationType type, final Mutation mutation) + throws IOException { + if (type != MutationType.PUT && type != MutationType.DELETE) { + throw new IllegalArgumentException("checkAndMutate doesn't support " + + type); + } CancellableRegionServerCallable callable = new CancellableRegionServerCallable( this.connection, getName(), row, this.rpcControllerFactory.newController(), @@ -782,15 +769,14 @@ public class HTable implements Table { CompareType compareType = CompareType.valueOf(compareOp.name()); MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, delete); + new BinaryComparator(value), compareType, type, mutation); MutateResponse response = doMutate(request); return ResponseConverter.getResult(request, response, getRpcControllerCellScanner()); } }; - List rows = Collections.singletonList(delete); Object[] results = new Object[1]; - AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows, - null, results, callable, -1); + AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, + Collections.singletonList(mutation), null, results, callable, -1); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index abd1563..96d8e87 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -198,15 +198,8 @@ public final class RequestConverter { final byte[] regionName, final byte[] row, final byte[] family, final byte [] qualifier, final ByteArrayComparable comparator, final CompareType compareType, final Put put) throws IOException { - MutateRequest.Builder builder = MutateRequest.newBuilder(); - RegionSpecifier region = buildRegionSpecifier( - RegionSpecifierType.REGION_NAME, regionName); - builder.setRegion(region); - Condition condition = buildCondition( - row, family, qualifier, comparator, compareType); - builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put, MutationProto.newBuilder())); - builder.setCondition(condition); - return builder.build(); + return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, + MutationType.PUT, put); } /** @@ -226,14 +219,35 @@ public final class RequestConverter { final byte[] regionName, final byte[] row, final byte[] family, final byte [] qualifier, final ByteArrayComparable comparator, final CompareType compareType, final Delete delete) throws IOException { + return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, + MutationType.DELETE, delete); + } + + /** + * Create a protocol buffer MutateRequest for a conditioned mutation. + * + * @param regionName + * @param row + * @param family + * @param qualifier + * @param comparator + * @param compareType + * @param type + * @param mutation + * @return a mutate request + * @throws IOException + */ + public static MutateRequest buildMutateRequest( + final byte[] regionName, final byte[] row, final byte[] family, + final byte [] qualifier, final ByteArrayComparable comparator, + final CompareType compareType, final MutationType type, final Mutation mutation) throws IOException { MutateRequest.Builder builder = MutateRequest.newBuilder(); RegionSpecifier region = buildRegionSpecifier( RegionSpecifierType.REGION_NAME, regionName); builder.setRegion(region); Condition condition = buildCondition( row, family, qualifier, comparator, compareType); - builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete, - MutationProto.newBuilder())); + builder.setMutation(ProtobufUtil.toMutation(type, mutation, MutationProto.newBuilder())); builder.setCondition(condition); return builder.build(); }