From f98688472100aaf48840a8278e257c47cb8e8f82 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Wed, 24 Sep 2014 11:26:51 -0700 Subject: [PATCH] HBASE-11796 Add client support for atomic checkAndMutate (Srikanth Srungarapu) Amending-Author: Andrew Purtell --- .../org/apache/hadoop/hbase/client/HTable.java | 30 ++ .../hadoop/hbase/client/HTableInterface.java | 18 + .../org/apache/hadoop/hbase/client/HTablePool.java | 8 + .../hadoop/hbase/protobuf/RequestConverter.java | 47 +++ .../hbase/protobuf/generated/ClientProtos.java | 370 ++++++++++++++++++++- hbase-protocol/src/main/protobuf/Client.proto | 3 + .../hadoop/hbase/coprocessor/CoprocessorHost.java | 9 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 83 +++++ .../hadoop/hbase/rest/client/RemoteHTable.java | 7 + .../hadoop/hbase/client/TestCheckAndMutate.java | 103 ++++++ .../hadoop/hbase/regionserver/TestHRegion.java | 2 +- 11 files changed, 663 insertions(+), 17 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index e12c7a3..e06a273 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -37,6 +37,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import com.google.protobuf.InvalidProtocolBufferException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -56,6 +57,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; @@ -1205,6 +1207,34 @@ public class HTable implements HTableInterface { * {@inheritDoc} */ @Override + public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, + final CompareOp compareOp, final byte [] value, final RowMutations rm) + throws IOException { + RegionServerCallable callable = + new RegionServerCallable(connection, getName(), row) { + @Override + public Boolean call() throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + try { + CompareType compareType = CompareType.valueOf(compareOp.name()); + MultiRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), compareType, rm); + ClientProtos.MultiResponse response = getStub().multi(controller, request); + return Boolean.valueOf(response.getProcessed()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; + return rpcCallerFactory. 
newCaller().callWithRetries(callable, this.operationTimeout); + } + + /** + * {@inheritDoc} + */ + @Override public boolean exists(final Get get) throws IOException { get.setCheckExistenceOnly(true); Result r = get(get); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java index eed5368..43a3c85 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import java.io.Closeable; @@ -647,4 +648,21 @@ public interface HTableInterface extends Closeable { void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback callback) throws ServiceException, Throwable; + + /** + * Atomically checks if a row/family/qualifier value matches the expected value. + * If it does, it performs the row mutations. If the passed value is null, the check + * is for the lack of column (ie: non-existence) + * + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp the comparison operator + * @param value the expected value + * @param mutation mutations to perform if check succeeds + * @throws IOException e + * @return true if the new put was executed, false otherwise + */ + boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, + byte[] value, RowMutations mutation) throws IOException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java index 429b5f2..ad57bf2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.PoolMap; @@ -643,5 +644,12 @@ public class HTablePool implements Closeable { checkState(); table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback); } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, + byte[] value, RowMutations mutation) throws IOException { + checkState(); + return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation); + } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index 7ce9254..756bc43 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -100,6 +100,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanReq import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; +import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Triple; @@ -262,6 +263,52 @@ public final class RequestConverter { } /** + * Create a protocol buffer MutateRequest for conditioned row mutations + * + * @param regionName + * @param row + * @param family + * @param qualifier + * @param comparator + * @param compareType + * @param rowMutations + * @return a mutate request + * @throws IOException + */ + public static ClientProtos.MultiRequest buildMutateRequest( + final byte[] regionName, final byte[] row, final byte[] family, + final byte [] qualifier, final ByteArrayComparable comparator, + final CompareType compareType, final RowMutations rowMutations) throws IOException { + RegionAction.Builder builder = + getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName); + builder.setAtomic(true); + ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); + MutationProto.Builder mutationBuilder = MutationProto.newBuilder(); + Condition condition = buildCondition( + row, family, qualifier, comparator, compareType); + for (Mutation mutation: rowMutations.getMutations()) { + MutationType mutateType = null; + if (mutation instanceof Put) { + mutateType = MutationType.PUT; + } else if (mutation instanceof Delete) { + mutateType = MutationType.DELETE; + } else { + throw new DoNotRetryIOException("RowMutations supports only put and delete, not " + + mutation.getClass().getName()); + } + mutationBuilder.clear(); + MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder); + actionBuilder.clear(); + actionBuilder.setMutation(mp); + builder.addAction(actionBuilder.build()); + } + ClientProtos.MultiRequest request = + ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build()) + .setCondition(condition).build(); + return request; + } + + /** * Create a protocol buffer MutateRequest for a put * * @param regionName diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index e70958c..ab6a68c 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -27932,6 +27932,20 @@ public final class ClientProtos { * optional uint64 nonceGroup = 2; */ long getNonceGroup(); + + // optional .Condition condition = 3; + /** + * optional .Condition condition = 3; + */ + boolean hasCondition(); + /** + * optional .Condition condition = 3; + */ + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition getCondition(); + /** + * optional .Condition condition = 3; + */ + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder getConditionOrBuilder(); } /** * Protobuf type {@code MultiRequest} @@ -28006,6 +28020,19 @@ public final class ClientProtos { nonceGroup_ = 
input.readUInt64(); break; } + case 26: { + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder subBuilder = null; + if (((bitField0_ & 0x00000002) == 0x00000002)) { + subBuilder = condition_.toBuilder(); + } + condition_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(condition_); + condition_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000002; + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -28101,9 +28128,32 @@ public final class ClientProtos { return nonceGroup_; } + // optional .Condition condition = 3; + public static final int CONDITION_FIELD_NUMBER = 3; + private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition condition_; + /** + * optional .Condition condition = 3; + */ + public boolean hasCondition() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional .Condition condition = 3; + */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition getCondition() { + return condition_; + } + /** + * optional .Condition condition = 3; + */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder getConditionOrBuilder() { + return condition_; + } + private void initFields() { regionAction_ = java.util.Collections.emptyList(); nonceGroup_ = 0L; + condition_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -28116,6 +28166,12 @@ public final class ClientProtos { return false; } } + if (hasCondition()) { + if (!getCondition().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -28129,6 +28185,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000001) == 0x00000001)) { output.writeUInt64(2, nonceGroup_); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeMessage(3, condition_); + } getUnknownFields().writeTo(output); } @@ -28146,6 +28205,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeUInt64Size(2, nonceGroup_); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(3, condition_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -28176,6 +28239,11 @@ public final class ClientProtos { result = result && (getNonceGroup() == other.getNonceGroup()); } + result = result && (hasCondition() == other.hasCondition()); + if (hasCondition()) { + result = result && getCondition() + .equals(other.getCondition()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -28197,6 +28265,10 @@ public final class ClientProtos { hash = (37 * hash) + NONCEGROUP_FIELD_NUMBER; hash = (53 * hash) + hashLong(getNonceGroup()); } + if (hasCondition()) { + hash = (37 * hash) + CONDITION_FIELD_NUMBER; + hash = (53 * hash) + getCondition().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -28308,6 +28380,7 @@ public final class ClientProtos { private void maybeForceBuilderInitialization() { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { getRegionActionFieldBuilder(); + getConditionFieldBuilder(); } } private static Builder create() { @@ -28324,6 +28397,12 @@ public 
final class ClientProtos { } nonceGroup_ = 0L; bitField0_ = (bitField0_ & ~0x00000002); + if (conditionBuilder_ == null) { + condition_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.getDefaultInstance(); + } else { + conditionBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -28365,6 +28444,14 @@ public final class ClientProtos { to_bitField0_ |= 0x00000001; } result.nonceGroup_ = nonceGroup_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000002; + } + if (conditionBuilder_ == null) { + result.condition_ = condition_; + } else { + result.condition_ = conditionBuilder_.build(); + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -28410,6 +28497,9 @@ public final class ClientProtos { if (other.hasNonceGroup()) { setNonceGroup(other.getNonceGroup()); } + if (other.hasCondition()) { + mergeCondition(other.getCondition()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -28421,6 +28511,12 @@ public final class ClientProtos { return false; } } + if (hasCondition()) { + if (!getCondition().isInitialized()) { + + return false; + } + } return true; } @@ -28716,6 +28812,123 @@ public final class ClientProtos { return this; } + // optional .Condition condition = 3; + private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition condition_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder> conditionBuilder_; + /** + * optional .Condition condition = 3; + */ + public boolean hasCondition() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional .Condition condition = 3; + */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition getCondition() { + if (conditionBuilder_ == null) { + return condition_; + } else { + return conditionBuilder_.getMessage(); + } + } + /** + * optional .Condition condition = 3; + */ + public Builder setCondition(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition value) { + if (conditionBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + condition_ = value; + onChanged(); + } else { + conditionBuilder_.setMessage(value); + } + bitField0_ |= 0x00000004; + return this; + } + /** + * optional .Condition condition = 3; + */ + public Builder setCondition( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder builderForValue) { + if (conditionBuilder_ == null) { + condition_ = builderForValue.build(); + onChanged(); + } else { + conditionBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000004; + return this; + } + /** + * optional .Condition condition = 3; + */ + public Builder mergeCondition(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition value) { + if (conditionBuilder_ == null) { + if (((bitField0_ & 0x00000004) == 0x00000004) && + condition_ != org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.getDefaultInstance()) { + condition_ = + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.newBuilder(condition_).mergeFrom(value).buildPartial(); + } else { + condition_ = value; + } + onChanged(); + } else { + conditionBuilder_.mergeFrom(value); + } + bitField0_ |= 
0x00000004; + return this; + } + /** + * optional .Condition condition = 3; + */ + public Builder clearCondition() { + if (conditionBuilder_ == null) { + condition_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.getDefaultInstance(); + onChanged(); + } else { + conditionBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000004); + return this; + } + /** + * optional .Condition condition = 3; + */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder getConditionBuilder() { + bitField0_ |= 0x00000004; + onChanged(); + return getConditionFieldBuilder().getBuilder(); + } + /** + * optional .Condition condition = 3; + */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder getConditionOrBuilder() { + if (conditionBuilder_ != null) { + return conditionBuilder_.getMessageOrBuilder(); + } else { + return condition_; + } + } + /** + * optional .Condition condition = 3; + */ + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder> + getConditionFieldBuilder() { + if (conditionBuilder_ == null) { + conditionBuilder_ = new com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder>( + condition_, + getParentForChildren(), + isClean()); + condition_ = null; + } + return conditionBuilder_; + } + // @@protoc_insertion_point(builder_scope:MultiRequest) } @@ -28754,6 +28967,24 @@ public final class ClientProtos { */ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResultOrBuilder getRegionActionResultOrBuilder( int index); + + // optional bool processed = 2; + /** + * optional bool processed = 2; + * + *
+     * used for mutate to indicate processed only
+     * 
+ */ + boolean hasProcessed(); + /** + * optional bool processed = 2; + * + *
+     * used for mutate to indicate processed only
+     * 
+ */ + boolean getProcessed(); } /** * Protobuf type {@code MultiResponse} @@ -28814,6 +29045,11 @@ public final class ClientProtos { regionActionResult_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult.PARSER, extensionRegistry)); break; } + case 16: { + bitField0_ |= 0x00000001; + processed_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -28856,6 +29092,7 @@ public final class ClientProtos { return PARSER; } + private int bitField0_; // repeated .RegionActionResult regionActionResult = 1; public static final int REGIONACTIONRESULT_FIELD_NUMBER = 1; private java.util.List regionActionResult_; @@ -28892,8 +29129,33 @@ public final class ClientProtos { return regionActionResult_.get(index); } + // optional bool processed = 2; + public static final int PROCESSED_FIELD_NUMBER = 2; + private boolean processed_; + /** + * optional bool processed = 2; + * + *
+     * used for mutate to indicate processed only
+     * 
+ */ + public boolean hasProcessed() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional bool processed = 2; + * + *
+     * used for mutate to indicate processed only
+     * 
+ */ + public boolean getProcessed() { + return processed_; + } + private void initFields() { regionActionResult_ = java.util.Collections.emptyList(); + processed_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -28916,6 +29178,9 @@ public final class ClientProtos { for (int i = 0; i < regionActionResult_.size(); i++) { output.writeMessage(1, regionActionResult_.get(i)); } + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBool(2, processed_); + } getUnknownFields().writeTo(output); } @@ -28929,6 +29194,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(1, regionActionResult_.get(i)); } + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(2, processed_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -28954,6 +29223,11 @@ public final class ClientProtos { boolean result = true; result = result && getRegionActionResultList() .equals(other.getRegionActionResultList()); + result = result && (hasProcessed() == other.hasProcessed()); + if (hasProcessed()) { + result = result && (getProcessed() + == other.getProcessed()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -28971,6 +29245,10 @@ public final class ClientProtos { hash = (37 * hash) + REGIONACTIONRESULT_FIELD_NUMBER; hash = (53 * hash) + getRegionActionResultList().hashCode(); } + if (hasProcessed()) { + hash = (37 * hash) + PROCESSED_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getProcessed()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -29087,6 +29365,8 @@ public final class ClientProtos { } else { regionActionResultBuilder_.clear(); } + processed_ = false; + bitField0_ = (bitField0_ & ~0x00000002); return this; } @@ -29114,6 +29394,7 @@ public final class ClientProtos { public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse buildPartial() { org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse result = new org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse(this); int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; if (regionActionResultBuilder_ == null) { if (((bitField0_ & 0x00000001) == 0x00000001)) { regionActionResult_ = java.util.Collections.unmodifiableList(regionActionResult_); @@ -29123,6 +29404,11 @@ public final class ClientProtos { } else { result.regionActionResult_ = regionActionResultBuilder_.build(); } + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000001; + } + result.processed_ = processed_; + result.bitField0_ = to_bitField0_; onBuilt(); return result; } @@ -29164,6 +29450,9 @@ public final class ClientProtos { } } } + if (other.hasProcessed()) { + setProcessed(other.getProcessed()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -29437,6 +29726,55 @@ public final class ClientProtos { return regionActionResultBuilder_; } + // optional bool processed = 2; + private boolean processed_ ; + /** + * optional bool processed = 2; + * + *
+       * used for mutate to indicate processed only
+       * 
+ */ + public boolean hasProcessed() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional bool processed = 2; + * + *
+       * used for mutate to indicate processed only
+       * 
+ */ + public boolean getProcessed() { + return processed_; + } + /** + * optional bool processed = 2; + * + *
+       * used for mutate to indicate processed only
+       * 
+ */ + public Builder setProcessed(boolean value) { + bitField0_ |= 0x00000002; + processed_ = value; + onChanged(); + return this; + } + /** + * optional bool processed = 2; + * + *
+       * used for mutate to indicate processed only
+       * 
+ */ + public Builder clearProcessed() { + bitField0_ = (bitField0_ & ~0x00000002); + processed_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:MultiResponse) } @@ -30277,20 +30615,22 @@ public final class ClientProtos { "air\0221\n\016service_result\030\004 \001(\0132\031.Coprocesso" + "rServiceResult\"f\n\022RegionActionResult\022-\n\021" + "resultOrException\030\001 \003(\0132\022.ResultOrExcept" + - "ion\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"G" + + "ion\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"f" + "\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\0132\r.R", - "egionAction\022\022\n\nnonceGroup\030\002 \001(\004\"@\n\rMulti" + - "Response\022/\n\022regionActionResult\030\001 \003(\0132\023.R" + - "egionActionResult2\261\002\n\rClientService\022 \n\003G" + - "et\022\013.GetRequest\032\014.GetResponse\022)\n\006Mutate\022" + - "\016.MutateRequest\032\017.MutateResponse\022#\n\004Scan" + - "\022\014.ScanRequest\032\r.ScanResponse\022>\n\rBulkLoa" + - "dHFile\022\025.BulkLoadHFileRequest\032\026.BulkLoad" + - "HFileResponse\022F\n\013ExecService\022\032.Coprocess" + - "orServiceRequest\032\033.CoprocessorServiceRes" + - "ponse\022&\n\005Multi\022\r.MultiRequest\032\016.MultiRes", - "ponseBB\n*org.apache.hadoop.hbase.protobu" + - "f.generatedB\014ClientProtosH\001\210\001\001\240\001\001" + "egionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcondi" + + "tion\030\003 \001(\0132\n.Condition\"S\n\rMultiResponse\022" + + "/\n\022regionActionResult\030\001 \003(\0132\023.RegionActi" + + "onResult\022\021\n\tprocessed\030\002 \001(\0102\261\002\n\rClientSe" + + "rvice\022 \n\003Get\022\013.GetRequest\032\014.GetResponse\022" + + ")\n\006Mutate\022\016.MutateRequest\032\017.MutateRespon" + + "se\022#\n\004Scan\022\014.ScanRequest\032\r.ScanResponse\022" + + ">\n\rBulkLoadHFile\022\025.BulkLoadHFileRequest\032" + + "\026.BulkLoadHFileResponse\022F\n\013ExecService\022\032" + + ".CoprocessorServiceRequest\032\033.Coprocessor", + "ServiceResponse\022&\n\005Multi\022\r.MultiRequest\032" + + "\016.MultiResponseBB\n*org.apache.hadoop.hba" + + "se.protobuf.generatedB\014ClientProtosH\001\210\001\001" + + "\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -30464,13 +30804,13 @@ public final class ClientProtos { internal_static_MultiRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MultiRequest_descriptor, - new java.lang.String[] { "RegionAction", "NonceGroup", }); + new java.lang.String[] { "RegionAction", "NonceGroup", "Condition", }); internal_static_MultiResponse_descriptor = getDescriptor().getMessageTypes().get(25); internal_static_MultiResponse_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MultiResponse_descriptor, - new java.lang.String[] { "RegionActionResult", }); + new java.lang.String[] { "RegionActionResult", "Processed", }); return null; } }; diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 9f8affd..537c1d4 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -372,10 +372,13 @@ message RegionActionResult { message MultiRequest { repeated RegionAction regionAction = 1; optional uint64 nonceGroup = 2; + optional Condition condition = 3; } 
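// Editor's note, not part of the patch: RequestConverter.buildMutateRequest
// (above) pairs `condition` with a single RegionAction marked atomic. The
// server applies that action's mutations only when the condition evaluates
// to true, and reports the outcome via MultiResponse.processed below.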
message MultiResponse { repeated RegionActionResult regionActionResult = 1; + // used for mutate to indicate processed only + optional bool processed = 2; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java index 80ef9d5..e23a75a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java @@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import com.google.protobuf.Descriptors; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CoprocessorClassLoader; @@ -71,7 +73,6 @@ import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.io.MultipleIOException; -import com.google.protobuf.Descriptors.ServiceDescriptor; import com.google.protobuf.Message; import com.google.protobuf.Service; import com.google.protobuf.ServiceException; @@ -633,6 +634,12 @@ public abstract class CoprocessorHost { table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback); } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException { + return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation); + } } /** The coprocessor */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 8b3954b..01dda60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2786,6 +2786,89 @@ public class HRegion implements HeapSize { // , Writable{ } } + //TODO, Think that gets/puts and deletes should be refactored a bit so that + //the getting of the lock happens before, so that you would just pass it into + //the methods. 
So in the case of checkAndMutate you could just do lockRow, + //get, put, unlockRow or something + /** + * + * @throws IOException + * @return true if the new put was executed, false otherwise + */ + public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, + CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm, + boolean writeToWAL) + throws IOException{ + checkReadOnly(); + //TODO, add check for value length or maybe even better move this to the + //client if this becomes a global setting + checkResources(); + + startRegionOperation(); + try { + Get get = new Get(row); + checkFamily(family); + get.addColumn(family, qualifier); + + // Lock row - note that doBatchMutate will relock this row if called + RowLock rowLock = getRowLock(get.getRow()); + // wait for all previous transactions to complete (with lock held) + mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + try { + List result = get(get, false); + + boolean valueIsNull = comparator.getValue() == null || + comparator.getValue().length == 0; + boolean matches = false; + if (result.size() == 0 && valueIsNull) { + matches = true; + } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && + valueIsNull) { + matches = true; + } else if (result.size() == 1 && !valueIsNull) { + Cell kv = result.get(0); + int compareResult = comparator.compareTo(kv.getValueArray(), + kv.getValueOffset(), kv.getValueLength()); + switch (compareOp) { + case LESS: + matches = compareResult < 0; + break; + case LESS_OR_EQUAL: + matches = compareResult <= 0; + break; + case EQUAL: + matches = compareResult == 0; + break; + case NOT_EQUAL: + matches = compareResult != 0; + break; + case GREATER_OR_EQUAL: + matches = compareResult >= 0; + break; + case GREATER: + matches = compareResult > 0; + break; + default: + throw new RuntimeException("Unknown Compare op " + compareOp.name()); + } + } + //If matches put the new put or delete the new delete + if (matches) { + // All edits for the given row (across all column families) must + // happen atomically. + mutateRow(rm); + this.checkAndMutateChecksPassed.increment(); + return true; + } + this.checkAndMutateChecksFailed.increment(); + return false; + } finally { + rowLock.release(); + } + } finally { + closeRegionOperation(); + } + } private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException { // Currently this is only called for puts and deletes, so no nonces. 
OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation }, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index 764529c..fbede44 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.rest.Constants; @@ -815,4 +816,10 @@ public class RemoteHTable implements HTableInterface { throws ServiceException, Throwable { throw new UnsupportedOperationException("batchCoprocessorService not implemented"); } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, + byte[] value, RowMutations mutation) throws IOException { + throw new UnsupportedOperationException("checkAndMutate not implemented"); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java new file mode 100644 index 0000000..1203e1f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java @@ -0,0 +1,103 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertTrue; + +@Category(MediumTests.class) +public class TestCheckAndMutate { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testCheckAndMutate() throws Exception { + final TableName tableName = TableName.valueOf("TestPutWithDelete"); + final byte[] rowKey = Bytes.toBytes("12345"); + final byte[] family = Bytes.toBytes("cf"); + HTable table = TEST_UTIL.createTable(tableName, family); + TEST_UTIL.waitTableAvailable(tableName.getName(), 5000); + try { + // put one row + Put put = new Put(rowKey); + put.add(family, Bytes.toBytes("A"), Bytes.toBytes("a")); + put.add(family, Bytes.toBytes("B"), Bytes.toBytes("b")); + put.add(family, Bytes.toBytes("C"), Bytes.toBytes("c")); + table.put(put); + // get row back and assert the values + Get get = new Get(rowKey); + Result result = table.get(get); + assertTrue("Column A value should be a", + Bytes.toString(result.getValue(family, Bytes.toBytes("A"))).equals("a")); + assertTrue("Column B value should be b", + Bytes.toString(result.getValue(family, Bytes.toBytes("B"))).equals("b")); + assertTrue("Column C value should be c", + Bytes.toString(result.getValue(family, Bytes.toBytes("C"))).equals("c")); + + // put the same row again with C column deleted + RowMutations rm = new RowMutations(rowKey); + put = new Put(rowKey); + put.add(family, Bytes.toBytes("A"), Bytes.toBytes("a")); + put.add(family, Bytes.toBytes("B"), Bytes.toBytes("b")); + rm.add(put); + Delete del = new Delete(rowKey); + del.deleteColumn(family, Bytes.toBytes("C")); + rm.add(del); + boolean res = table.checkAndMutate(rowKey, family, Bytes.toBytes("A"), CompareFilter.CompareOp.EQUAL, + Bytes.toBytes("a"), rm); + assertTrue(res); + + // get row back and assert the values + get = new Get(rowKey); + result = table.get(get); + assertTrue("Column A value should be a", + Bytes.toString(result.getValue(family, Bytes.toBytes("A"))).equals("a")); + assertTrue("Column B value should be b", + Bytes.toString(result.getValue(family, Bytes.toBytes("B"))).equals("b")); + assertTrue("Column C should not exist", + result.getValue(family, Bytes.toBytes("C")) == null); + } finally { + table.close(); + } + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index c4b9f33..a26aeb5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1222,7 +1222,7 @@ public class TestHRegion { Delete delete = new Delete(row1); delete.deleteFamily(fam1); res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new 
BinaryComparator(val2), - delete, true); + put, true); assertEquals(false, res); } finally { HRegion.closeHRegion(this.region); -- 1.7.12.4 (Apple Git-37)
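
For reference, a minimal client-side sketch of the API this patch adds. It is not part of the patch; the family and qualifier names ("cf", "state", "pending") and values are hypothetical, and it assumes a cluster running with this change applied (compare the usage in TestCheckAndMutate above).

import java.io.IOException;

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckAndMutateExample {
  // Atomically flips cf:state from "init" to "done" and drops cf:pending.
  // Returns true only if the guard matched and both mutations were applied.
  public static boolean completeRow(HTableInterface table, byte[] row) throws IOException {
    byte[] cf = Bytes.toBytes("cf");

    RowMutations mutations = new RowMutations(row);

    Put put = new Put(row);
    put.add(cf, Bytes.toBytes("state"), Bytes.toBytes("done"));
    mutations.add(put);

    Delete delete = new Delete(row);
    delete.deleteColumn(cf, Bytes.toBytes("pending"));
    mutations.add(delete);

    // The Put and Delete are applied as one atomic unit, gated on cf:state == "init".
    return table.checkAndMutate(row, cf, Bytes.toBytes("state"),
        CompareOp.EQUAL, Bytes.toBytes("init"), mutations);
  }
}

Because HRegion.checkAndRowMutate holds the row lock across both the check and the mutations, a false return means the guard did not match and the row was left untouched.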