diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java index 02106d3..2e0c05e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java @@ -678,9 +678,10 @@ public class AggregationClient { final AggregateArgument.Builder requestBuilder = AggregateArgument.newBuilder(); requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName()); - if (ci.columnInterpreterSpecificData() != null) { - requestBuilder.setInterpreterSpecificBytes( - ci.columnInterpreterSpecificData()); + ByteString columnInterpreterSpecificData = null; + if ((columnInterpreterSpecificData = ci.columnInterpreterSpecificData()) + != null) { + requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData); } requestBuilder.setScan(ProtobufUtil.toScan(scan)); return requestBuilder.build(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/RowProcessorClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/RowProcessorClient.java new file mode 100644 index 0000000..de1d853 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/RowProcessorClient.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.client.coprocessor; + +import java.io.IOException; + +import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument; +import org.apache.hadoop.hbase.regionserver.RowProcessor; + +import com.google.protobuf.ByteString; + +public class RowProcessorClient { + public static RowProcessorArgument getRowProcessorPB(RowProcessor r) + throws IOException { + final RowProcessorArgument.Builder requestBuilder = + RowProcessorArgument.newBuilder(); + requestBuilder.setRowProcessorClassName(r.getClass().getName()); + ByteString rowProcessorSpecificData = null; + if ((rowProcessorSpecificData = r.rowProcessorSpecificData()) != null) { + requestBuilder.setRowProcessorSpecificBytes(rowProcessorSpecificData); + } + return requestBuilder.build(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java index 9ee63ff..52f2514 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java @@ -21,18 +21,28 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument; +import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult; +import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RowProcessor; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + /** * This class demonstrates how to implement atomic read-modify-writes * using {@link HRegion#processRowsWithLocks()} and Coprocessor endpoints. */ @InterfaceAudience.Public @InterfaceStability.Evolving -public abstract class BaseRowProcessorEndpoint extends BaseEndpointCoprocessor - implements RowProcessorProtocol { - +public abstract class BaseRowProcessorEndpoint extends RowProcessorService +implements CoprocessorService, Coprocessor { + private RegionCoprocessorEnvironment env; /** * Pass a processor to HRegion to process multiple rows atomically. * @@ -42,16 +52,75 @@ public abstract class BaseRowProcessorEndpoint extends BaseEndpointCoprocessor * * See {@link TestRowProcessorEndpoint} for example. * - * @param processor The object defines the read-modify-write procedure + * The request argument contains the processor. 
The processor object defines + * the read-modify-write procedure * @return The processing result */ @Override - public T process(RowProcessor processor) - throws IOException { - HRegion region = - ((RegionCoprocessorEnvironment) getEnvironment()).getRegion(); - region.processRowsWithLocks(processor); - return processor.getResult(); + public void process(RpcController controller, RowProcessorArgument request, + RpcCallback done) { + RowProcessorResult resultProto = null; + try { + RowProcessor processor = constructRowProcessorFromRequest(request); + HRegion region = env.getRegion(); + region.processRowsWithLocks(processor); + T result = processor.getResult(); + RowProcessorResult.Builder b = RowProcessorResult.newBuilder(); + b.setRowProcessorResult(processor.getProtoForResultType(result)); + resultProto = b.build(); + } catch (IOException e) { + ResponseConverter.setControllerException(controller, e); + } + done.run(resultProto); + } + + @Override + public Service getService() { + return this; + } + + /** + * Stores a reference to the coprocessor environment provided by the + * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this + * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded + * on a table region, so always expects this to be an instance of + * {@link RegionCoprocessorEnvironment}. + * @param env the environment provided by the coprocessor host + * @throws IOException if the provided environment is not an instance of + * {@code RegionCoprocessorEnvironment} + */ + @Override + public void start(CoprocessorEnvironment env) throws IOException { + if (env instanceof RegionCoprocessorEnvironment) { + this.env = (RegionCoprocessorEnvironment)env; + } else { + throw new CoprocessorException("Must be loaded on a table region!"); + } + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + // nothing to do } + RowProcessor constructRowProcessorFromRequest(RowProcessorArgument request) + throws IOException { + String className = request.getRowProcessorClassName(); + Class cls; + try { + cls = Class.forName(className); + @SuppressWarnings("unchecked") + RowProcessor ci = (RowProcessor) cls.newInstance(); + if (request.hasRowProcessorSpecificBytes()) { + ci.initialize(request.getRowProcessorSpecificBytes()); + } + return ci; + } catch (ClassNotFoundException e) { + throw new IOException(e); + } catch (InstantiationException e) { + throw new IOException(e); + } catch (IllegalAccessException e) { + throw new IOException(e); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java index 8db8093..d74929c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java @@ -119,7 +119,7 @@ public interface ColumnInterpreter { /** * This method should return any additional data that is needed on the * server side to construct the ColumnInterpreter. The server - * will pass this to the {@link #initialize(org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.ColumnInterpreter)} + * will pass this to the {@link #initialize(ByteString)} * method. If there is no ColumnInterpreter specific data (for e.g., * {@link LongColumnInterpreter}) then null should be returned. 
* @return the PB message @@ -161,4 +161,4 @@ public interface ColumnInterpreter { * @return cast */ T castToCellType(S response); -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessorProtocol.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessorProtocol.java deleted file mode 100644 index c670c39..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessorProtocol.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.coprocessor; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.RowProcessor; - -/** - * Defines a protocol to perform multi row transactions. - * See {@link BaseRowProcessorEndpoint} for the implementation. - * See {@link HRegion#processRowsWithLocks()} for detials. 
- */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public interface RowProcessorProtocol extends CoprocessorProtocol { - - /** - * @param processor The processor defines how to process the row - */ - T process(RowProcessor processor) throws IOException; -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 66b0ecf..34c3b21 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -26,6 +26,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.Method; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -948,6 +949,20 @@ public final class ProtobufUtil { builder.addAllKeyValueBytes(protos); return builder.build(); } + /** + * Get bytes from ByteString + */ + public static byte[] getBytesFromByteString(ByteString bs) { + ByteBuffer bb = bs.asReadOnlyByteBuffer(); + bb.rewind(); + byte[] bytes; + if (bb.hasArray()) { + bytes = bb.array(); + } else { + bytes = bs.toByteArray(); + } + return bytes; + } /** * Convert a protocol buffer Result to a client Result diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RowProcessorProtos.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RowProcessorProtos.java new file mode 100644 index 0000000..a2fc230 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RowProcessorProtos.java @@ -0,0 +1,1158 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: RowProcessor.proto + +package org.apache.hadoop.hbase.protobuf.generated; + +public final class RowProcessorProtos { + private RowProcessorProtos() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public interface RowProcessorArgumentOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required string rowProcessorClassName = 1; + boolean hasRowProcessorClassName(); + String getRowProcessorClassName(); + + // optional bytes rowProcessorSpecificBytes = 2; + boolean hasRowProcessorSpecificBytes(); + com.google.protobuf.ByteString getRowProcessorSpecificBytes(); + } + public static final class RowProcessorArgument extends + com.google.protobuf.GeneratedMessage + implements RowProcessorArgumentOrBuilder { + // Use RowProcessorArgument.newBuilder() to construct. 
+ private RowProcessorArgument(Builder builder) { + super(builder); + } + private RowProcessorArgument(boolean noInit) {} + + private static final RowProcessorArgument defaultInstance; + public static RowProcessorArgument getDefaultInstance() { + return defaultInstance; + } + + public RowProcessorArgument getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.internal_static_RowProcessorArgument_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.internal_static_RowProcessorArgument_fieldAccessorTable; + } + + private int bitField0_; + // required string rowProcessorClassName = 1; + public static final int ROWPROCESSORCLASSNAME_FIELD_NUMBER = 1; + private java.lang.Object rowProcessorClassName_; + public boolean hasRowProcessorClassName() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public String getRowProcessorClassName() { + java.lang.Object ref = rowProcessorClassName_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + rowProcessorClassName_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getRowProcessorClassNameBytes() { + java.lang.Object ref = rowProcessorClassName_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + rowProcessorClassName_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional bytes rowProcessorSpecificBytes = 2; + public static final int ROWPROCESSORSPECIFICBYTES_FIELD_NUMBER = 2; + private com.google.protobuf.ByteString rowProcessorSpecificBytes_; + public boolean hasRowProcessorSpecificBytes() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public com.google.protobuf.ByteString getRowProcessorSpecificBytes() { + return rowProcessorSpecificBytes_; + } + + private void initFields() { + rowProcessorClassName_ = ""; + rowProcessorSpecificBytes_ = com.google.protobuf.ByteString.EMPTY; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasRowProcessorClassName()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getRowProcessorClassNameBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, rowProcessorSpecificBytes_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, getRowProcessorClassNameBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, 
rowProcessorSpecificBytes_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument other = (org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument) obj; + + boolean result = true; + result = result && (hasRowProcessorClassName() == other.hasRowProcessorClassName()); + if (hasRowProcessorClassName()) { + result = result && getRowProcessorClassName() + .equals(other.getRowProcessorClassName()); + } + result = result && (hasRowProcessorSpecificBytes() == other.hasRowProcessorSpecificBytes()); + if (hasRowProcessorSpecificBytes()) { + result = result && getRowProcessorSpecificBytes() + .equals(other.getRowProcessorSpecificBytes()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + @java.lang.Override + public int hashCode() { + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasRowProcessorClassName()) { + hash = (37 * hash) + ROWPROCESSORCLASSNAME_FIELD_NUMBER; + hash = (53 * hash) + getRowProcessorClassName().hashCode(); + } + if (hasRowProcessorSpecificBytes()) { + hash = (37 * hash) + ROWPROCESSORSPECIFICBYTES_FIELD_NUMBER; + hash = (53 * hash) + getRowProcessorSpecificBytes().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return 
newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgumentOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.internal_static_RowProcessorArgument_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.internal_static_RowProcessorArgument_fieldAccessorTable; + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + rowProcessorClassName_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + rowProcessorSpecificBytes_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return 
org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument.getDescriptor(); + } + + public org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument build() { + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument result = new org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.rowProcessorClassName_ = rowProcessorClassName_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.rowProcessorSpecificBytes_ = rowProcessorSpecificBytes_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument.getDefaultInstance()) return this; + if (other.hasRowProcessorClassName()) { + setRowProcessorClassName(other.getRowProcessorClassName()); + } + if (other.hasRowProcessorSpecificBytes()) { + setRowProcessorSpecificBytes(other.getRowProcessorSpecificBytes()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasRowProcessorClassName()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + 
rowProcessorClassName_ = input.readBytes(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + rowProcessorSpecificBytes_ = input.readBytes(); + break; + } + } + } + } + + private int bitField0_; + + // required string rowProcessorClassName = 1; + private java.lang.Object rowProcessorClassName_ = ""; + public boolean hasRowProcessorClassName() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public String getRowProcessorClassName() { + java.lang.Object ref = rowProcessorClassName_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + rowProcessorClassName_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setRowProcessorClassName(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + rowProcessorClassName_ = value; + onChanged(); + return this; + } + public Builder clearRowProcessorClassName() { + bitField0_ = (bitField0_ & ~0x00000001); + rowProcessorClassName_ = getDefaultInstance().getRowProcessorClassName(); + onChanged(); + return this; + } + void setRowProcessorClassName(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000001; + rowProcessorClassName_ = value; + onChanged(); + } + + // optional bytes rowProcessorSpecificBytes = 2; + private com.google.protobuf.ByteString rowProcessorSpecificBytes_ = com.google.protobuf.ByteString.EMPTY; + public boolean hasRowProcessorSpecificBytes() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public com.google.protobuf.ByteString getRowProcessorSpecificBytes() { + return rowProcessorSpecificBytes_; + } + public Builder setRowProcessorSpecificBytes(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + rowProcessorSpecificBytes_ = value; + onChanged(); + return this; + } + public Builder clearRowProcessorSpecificBytes() { + bitField0_ = (bitField0_ & ~0x00000002); + rowProcessorSpecificBytes_ = getDefaultInstance().getRowProcessorSpecificBytes(); + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:RowProcessorArgument) + } + + static { + defaultInstance = new RowProcessorArgument(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:RowProcessorArgument) + } + + public interface RowProcessorResultOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required bytes rowProcessorResult = 1; + boolean hasRowProcessorResult(); + com.google.protobuf.ByteString getRowProcessorResult(); + } + public static final class RowProcessorResult extends + com.google.protobuf.GeneratedMessage + implements RowProcessorResultOrBuilder { + // Use RowProcessorResult.newBuilder() to construct. 
+ private RowProcessorResult(Builder builder) { + super(builder); + } + private RowProcessorResult(boolean noInit) {} + + private static final RowProcessorResult defaultInstance; + public static RowProcessorResult getDefaultInstance() { + return defaultInstance; + } + + public RowProcessorResult getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.internal_static_RowProcessorResult_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.internal_static_RowProcessorResult_fieldAccessorTable; + } + + private int bitField0_; + // required bytes rowProcessorResult = 1; + public static final int ROWPROCESSORRESULT_FIELD_NUMBER = 1; + private com.google.protobuf.ByteString rowProcessorResult_; + public boolean hasRowProcessorResult() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public com.google.protobuf.ByteString getRowProcessorResult() { + return rowProcessorResult_; + } + + private void initFields() { + rowProcessorResult_ = com.google.protobuf.ByteString.EMPTY; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasRowProcessorResult()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, rowProcessorResult_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, rowProcessorResult_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult other = (org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult) obj; + + boolean result = true; + result = result && (hasRowProcessorResult() == other.hasRowProcessorResult()); + if (hasRowProcessorResult()) { + result = result && getRowProcessorResult() + .equals(other.getRowProcessorResult()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + @java.lang.Override + public int hashCode() { + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasRowProcessorResult()) { + hash = (37 * hash) + ROWPROCESSORRESULT_FIELD_NUMBER; + hash = (53 * hash) + getRowProcessorResult().hashCode(); + } + hash = (29 * hash) + 
getUnknownFields().hashCode(); + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + 
Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResultOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.internal_static_RowProcessorResult_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.internal_static_RowProcessorResult_fieldAccessorTable; + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + rowProcessorResult_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult.getDescriptor(); + } + + public org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult build() { + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult result = new org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.rowProcessorResult_ = rowProcessorResult_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder 
mergeFrom(org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult.getDefaultInstance()) return this; + if (other.hasRowProcessorResult()) { + setRowProcessorResult(other.getRowProcessorResult()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasRowProcessorResult()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + rowProcessorResult_ = input.readBytes(); + break; + } + } + } + } + + private int bitField0_; + + // required bytes rowProcessorResult = 1; + private com.google.protobuf.ByteString rowProcessorResult_ = com.google.protobuf.ByteString.EMPTY; + public boolean hasRowProcessorResult() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public com.google.protobuf.ByteString getRowProcessorResult() { + return rowProcessorResult_; + } + public Builder setRowProcessorResult(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + rowProcessorResult_ = value; + onChanged(); + return this; + } + public Builder clearRowProcessorResult() { + bitField0_ = (bitField0_ & ~0x00000001); + rowProcessorResult_ = getDefaultInstance().getRowProcessorResult(); + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:RowProcessorResult) + } + + static { + defaultInstance = new RowProcessorResult(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:RowProcessorResult) + } + + public static abstract class RowProcessorService + implements com.google.protobuf.Service { + protected RowProcessorService() {} + + public interface Interface { + public abstract void process( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument request, + com.google.protobuf.RpcCallback done); + + } + + public static com.google.protobuf.Service newReflectiveService( + final Interface impl) { + return new RowProcessorService() { + @java.lang.Override + public void process( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument request, + com.google.protobuf.RpcCallback done) { + impl.process(controller, request, done); + } + + }; + } + + public static com.google.protobuf.BlockingService + newReflectiveBlockingService(final BlockingInterface impl) { + return new com.google.protobuf.BlockingService() { + public final com.google.protobuf.Descriptors.ServiceDescriptor + getDescriptorForType() { + return getDescriptor(); + } + + public final com.google.protobuf.Message callBlockingMethod( + com.google.protobuf.Descriptors.MethodDescriptor method, + 
com.google.protobuf.RpcController controller, + com.google.protobuf.Message request) + throws com.google.protobuf.ServiceException { + if (method.getService() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "Service.callBlockingMethod() given method descriptor for " + + "wrong service type."); + } + switch(method.getIndex()) { + case 0: + return impl.process(controller, (org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument)request); + default: + throw new java.lang.AssertionError("Can't get here."); + } + } + + public final com.google.protobuf.Message + getRequestPrototype( + com.google.protobuf.Descriptors.MethodDescriptor method) { + if (method.getService() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "Service.getRequestPrototype() given method " + + "descriptor for wrong service type."); + } + switch(method.getIndex()) { + case 0: + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument.getDefaultInstance(); + default: + throw new java.lang.AssertionError("Can't get here."); + } + } + + public final com.google.protobuf.Message + getResponsePrototype( + com.google.protobuf.Descriptors.MethodDescriptor method) { + if (method.getService() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "Service.getResponsePrototype() given method " + + "descriptor for wrong service type."); + } + switch(method.getIndex()) { + case 0: + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult.getDefaultInstance(); + default: + throw new java.lang.AssertionError("Can't get here."); + } + } + + }; + } + + public abstract void process( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument request, + com.google.protobuf.RpcCallback done); + + public static final + com.google.protobuf.Descriptors.ServiceDescriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.getDescriptor().getServices().get(0); + } + public final com.google.protobuf.Descriptors.ServiceDescriptor + getDescriptorForType() { + return getDescriptor(); + } + + public final void callMethod( + com.google.protobuf.Descriptors.MethodDescriptor method, + com.google.protobuf.RpcController controller, + com.google.protobuf.Message request, + com.google.protobuf.RpcCallback< + com.google.protobuf.Message> done) { + if (method.getService() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "Service.callMethod() given method descriptor for wrong " + + "service type."); + } + switch(method.getIndex()) { + case 0: + this.process(controller, (org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument)request, + com.google.protobuf.RpcUtil.specializeCallback( + done)); + return; + default: + throw new java.lang.AssertionError("Can't get here."); + } + } + + public final com.google.protobuf.Message + getRequestPrototype( + com.google.protobuf.Descriptors.MethodDescriptor method) { + if (method.getService() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "Service.getRequestPrototype() given method " + + "descriptor for wrong service type."); + } + switch(method.getIndex()) { + case 0: + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument.getDefaultInstance(); + default: + throw new java.lang.AssertionError("Can't get here."); + } + } + + public final 
com.google.protobuf.Message + getResponsePrototype( + com.google.protobuf.Descriptors.MethodDescriptor method) { + if (method.getService() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "Service.getResponsePrototype() given method " + + "descriptor for wrong service type."); + } + switch(method.getIndex()) { + case 0: + return org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult.getDefaultInstance(); + default: + throw new java.lang.AssertionError("Can't get here."); + } + } + + public static Stub newStub( + com.google.protobuf.RpcChannel channel) { + return new Stub(channel); + } + + public static final class Stub extends org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService implements Interface { + private Stub(com.google.protobuf.RpcChannel channel) { + this.channel = channel; + } + + private final com.google.protobuf.RpcChannel channel; + + public com.google.protobuf.RpcChannel getChannel() { + return channel; + } + + public void process( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument request, + com.google.protobuf.RpcCallback done) { + channel.callMethod( + getDescriptor().getMethods().get(0), + controller, + request, + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult.getDefaultInstance(), + com.google.protobuf.RpcUtil.generalizeCallback( + done, + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult.class, + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult.getDefaultInstance())); + } + } + + public static BlockingInterface newBlockingStub( + com.google.protobuf.BlockingRpcChannel channel) { + return new BlockingStub(channel); + } + + public interface BlockingInterface { + public org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult process( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument request) + throws com.google.protobuf.ServiceException; + } + + private static final class BlockingStub implements BlockingInterface { + private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { + this.channel = channel; + } + + private final com.google.protobuf.BlockingRpcChannel channel; + + public org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult process( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument request) + throws com.google.protobuf.ServiceException { + return (org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult) channel.callBlockingMethod( + getDescriptor().getMethods().get(0), + controller, + request, + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult.getDefaultInstance()); + } + + } + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_RowProcessorArgument_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_RowProcessorArgument_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_RowProcessorResult_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_RowProcessorResult_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + 
getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\022RowProcessor.proto\"X\n\024RowProcessorArgu" + + "ment\022\035\n\025rowProcessorClassName\030\001 \002(\t\022!\n\031r" + + "owProcessorSpecificBytes\030\002 \001(\014\"0\n\022RowPro" + + "cessorResult\022\032\n\022rowProcessorResult\030\001 \002(\014" + + "2L\n\023RowProcessorService\0225\n\007process\022\025.Row" + + "ProcessorArgument\032\023.RowProcessorResultBH" + + "\n*org.apache.hadoop.hbase.protobuf.gener" + + "atedB\022RowProcessorProtosH\001\210\001\001\240\001\001" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_RowProcessorArgument_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_RowProcessorArgument_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_RowProcessorArgument_descriptor, + new java.lang.String[] { "RowProcessorClassName", "RowProcessorSpecificBytes", }, + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument.class, + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument.Builder.class); + internal_static_RowProcessorResult_descriptor = + getDescriptor().getMessageTypes().get(1); + internal_static_RowProcessorResult_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_RowProcessorResult_descriptor, + new java.lang.String[] { "RowProcessorResult", }, + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult.class, + org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult.Builder.class); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java index c962bef..b15e5a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java @@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import com.google.protobuf.ByteString; + /** * A MultiRowProcessor that performs multiple puts and deletes. 
*/ @@ -123,4 +125,22 @@ } } + @Override + public ByteString rowProcessorSpecificData() { + return null; + } + + @Override + public void initialize(ByteString bytes) { + } + + @Override + public ByteString getProtoForResultType(Void t) { + return null; + } + + @Override + public Void parseResponseAsResultType(byte[] response) throws IOException { + return null; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java index 4be0cd3..3bb7d66 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java @@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import com.google.protobuf.ByteString; + @InterfaceAudience.Public @InterfaceStability.Evolving @@ -108,4 +110,37 @@ public interface RowProcessor { * @return The name of the processor */ String getName(); -} + + /** + * This method should return any additional data that is needed on the + * server side to construct the RowProcessor. The server will pass this to + * the {@link #initialize(ByteString)} method. If there is no RowProcessor + * specific data then null should be returned. + * @return the PB message + * @throws IOException + */ + ByteString rowProcessorSpecificData() throws IOException; + + /** + * This method should initialize any field(s) of the RowProcessor with + * a parsing of the passed message bytes (used on the server side). + * @param bytes + * @throws IOException + */ + void initialize(ByteString bytes) throws IOException; + + /** + * This method should return the PB bytes for the result type instance + * @param t the result instance + * @throws IOException + */ + ByteString getProtoForResultType(T t) throws IOException; + + /** + * Converts the bytes in the server's response to the expected type T + * @param response + * @return response of type T constructed from the message + * @throws IOException + */ + T parseResponseAsResultType(byte[] response) throws IOException; +} \ No newline at end of file diff --git a/hbase-server/src/main/protobuf/RowProcessor.proto b/hbase-server/src/main/protobuf/RowProcessor.proto new file mode 100644 index 0000000..4f78b94 --- /dev/null +++ b/hbase-server/src/main/protobuf/RowProcessor.proto @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines a protocol to perform multi row transactions. + * See BaseRowProcessorEndpoint for the implementation.
+ * See HRegion#processRowsWithLocks() for details.
+ */
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "RowProcessorProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message RowProcessorArgument {
+  required string rowProcessorClassName = 1;
+  optional bytes rowProcessorSpecificBytes = 2;
+}
+
+message RowProcessorResult {
+  required bytes rowProcessorResult = 1;
+}
+
+service RowProcessorService {
+  rpc process (RowProcessorArgument) returns (RowProcessorResult);
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
index 426a586..c4f5e83 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
@@ -21,9 +21,11 @@ package org.apache.hadoop.hbase.coprocessor;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.DataInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -38,27 +40,32 @@ import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
 import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.protobuf.ByteString;
 import com.sun.org.apache.commons.logging.Log;
 import com.sun.org.apache.commons.logging.LogFactory;
@@ -100,7 +107,7 @@ public class TestRowProcessorEndpoint {
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
     Configuration conf = util.getConfiguration();
-    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
         RowProcessorEndpoint.class.getName());
     conf.setInt("hbase.client.retries.number", 1);
     conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
@@ -138,11 +145,16 @@
   @Test
   public void testDoubleScan() throws Throwable {
     prepareTestData();
-    RowProcessorProtocol protocol =
-        table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
     RowProcessorEndpoint.FriendsOfFriendsProcessor processor =
         new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
-    Set<String> result = protocol.process(processor);
+    RowProcessorService.BlockingInterface service =
+        RowProcessorService.newBlockingStub(channel);
+    RowProcessorArgument request = RowProcessorClient.getRowProcessorPB(processor);
+    RowProcessorResult protoResult = service.process(null, request);
+    Set<String> result = processor.parseResponseAsResultType(
+        ProtobufUtil.getBytesFromByteString(protoResult.getRowProcessorResult()));
 
     Set<String> expected =
         new HashSet<String>(Arrays.asList(new String[]{"d", "e", "f", "g"}));
@@ -176,12 +188,16 @@
   }
 
   private int incrementCounter(HTable table) throws Throwable {
-    RowProcessorProtocol protocol =
-        table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
     RowProcessorEndpoint.IncrementCounterProcessor processor =
         new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
-    int counterValue = protocol.process(processor);
-    return counterValue;
+    RowProcessorService.BlockingInterface service =
+        RowProcessorService.newBlockingStub(channel);
+    RowProcessorArgument request = RowProcessorClient.getRowProcessorPB(processor);
+    RowProcessorResult protoResult = service.process(null, request);
+    Integer result = processor.parseResponseAsResultType(
+        ProtobufUtil.getBytesFromByteString(protoResult.getRowProcessorResult()));
+    return result;
   }
 
   private void concurrentExec(
@@ -234,23 +250,27 @@
   }
 
   private void swapRows(HTable table) throws Throwable {
-    RowProcessorProtocol protocol =
-        table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
     RowProcessorEndpoint.RowSwapProcessor processor =
         new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2);
-    protocol.process(processor);
+    RowProcessorService.BlockingInterface service =
+        RowProcessorService.newBlockingStub(channel);
+    RowProcessorArgument request = RowProcessorClient.getRowProcessorPB(processor);
+    service.process(null, request);
   }
 
   @Test
   public void testTimeout() throws Throwable {
     prepareTestData();
-    RowProcessorProtocol protocol =
-        table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
     RowProcessorEndpoint.TimeoutProcessor processor =
         new RowProcessorEndpoint.TimeoutProcessor(ROW);
+    RowProcessorService.BlockingInterface service =
+        RowProcessorService.newBlockingStub(channel);
+    RowProcessorArgument request = RowProcessorClient.getRowProcessorPB(processor);
     boolean exceptionCaught = false;
     try {
-      protocol.process(processor);
+      service.process(null, request);
     } catch (Exception e) {
      exceptionCaught = true;
     }
@@ -264,11 +284,10 @@
    * We define the RowProcessors as the inner class of the endpoint.
    * So they can be loaded with the endpoint on the coprocessor.
    */
-  public static class RowProcessorEndpoint extends BaseRowProcessorEndpoint
-  implements RowProcessorProtocol {
-
+  public static class RowProcessorEndpoint extends BaseRowProcessorEndpoint
+  implements CoprocessorService {
     public static class IncrementCounterProcessor extends
-        BaseRowProcessor<Integer> implements Writable {
+        BaseRowProcessor<Integer> {
       int counter = 0;
       byte[] row = new byte[0];
@@ -330,21 +349,39 @@
       }
 
       @Override
-      public void readFields(DataInput in) throws IOException {
+      public ByteString rowProcessorSpecificData() throws IOException {
+        DataOutputBuffer out = new DataOutputBuffer();
+        Bytes.writeByteArray(out, row);
+        out.writeInt(counter);
+        return ByteString.copyFrom(out.getData());
+      }
+
+      @Override
+      public void initialize(ByteString bytes) throws IOException {
+        DataInput in = new DataInputStream(bytes.newInput());
         this.row = Bytes.readByteArray(in);
         this.counter = in.readInt();
       }
 
       @Override
-      public void write(DataOutput out) throws IOException {
-        Bytes.writeByteArray(out, row);
-        out.writeInt(counter);
+      public ByteString getProtoForResultType(Integer t) {
+        ByteBuffer bb = ByteBuffer.allocate(4).putInt(t);
+        bb.rewind();
+        return ByteString.copyFrom(bb);
+      }
+
+      @Override
+      public Integer parseResponseAsResultType(byte[] response) {
+        ByteBuffer bb = ByteBuffer.allocate(response.length);
+        bb.put(response);
+        bb.rewind();
+        return bb.getInt();
       }
     }
 
     public static class FriendsOfFriendsProcessor extends
-        BaseRowProcessor<Set<String>> implements Writable {
+        BaseRowProcessor<Set<String>> {
       byte[] row = null;
       byte[] person = null;
       final Set<String> result = new HashSet<String>();
@@ -405,7 +442,20 @@
       }
 
       @Override
-      public void readFields(DataInput in) throws IOException {
+      public ByteString rowProcessorSpecificData() throws IOException {
+        DataOutputBuffer out = new DataOutputBuffer();
+        Bytes.writeByteArray(out, person);
+        Bytes.writeByteArray(out, row);
+        out.writeInt(result.size());
+        for (String s : result) {
+          Text.writeString(out, s);
+        }
+        return ByteString.copyFrom(out.getData());
+      }
+
+      @Override
+      public void initialize(ByteString bytes) throws IOException {
+        DataInput in = new DataInputStream(bytes.newInput());
         this.person = Bytes.readByteArray(in);
         this.row = Bytes.readByteArray(in);
         int size = in.readInt();
@@ -416,18 +466,29 @@
       }
 
       @Override
-      public void write(DataOutput out) throws IOException {
-        Bytes.writeByteArray(out, person);
-        Bytes.writeByteArray(out, row);
-        out.writeInt(result.size());
-        for (String s : result) {
-          Text.writeString(out, s);
+      public ByteString getProtoForResultType(Set<String> t) throws IOException {
+        DataOutputBuffer d = new DataOutputBuffer();
+        d.writeInt(t.size());
+        for (String str : t) {
+          Bytes.writeByteArray(d, str.getBytes());
+        }
+        return ByteString.copyFrom(d.getData());
+      }
+
+      @Override
+      public Set<String> parseResponseAsResultType(byte[] response) throws IOException {
+        DataInput in = new DataInputStream(new ByteArrayInputStream(response));
+        int cardinality = in.readInt();
+        Set<String> set = new HashSet<String>();
+        for (int i = 0; i < cardinality; i++) {
+          set.add(new String(Bytes.readByteArray(in)));
         }
+        return set;
       }
     }
 
     public static class RowSwapProcessor extends
-        BaseRowProcessor<Set<String>> implements Writable {
+        BaseRowProcessor<Set<String>> {
       byte[] row1 = new byte[0];
       byte[] row2 = new byte[0];
@@ -502,25 +563,50 @@
       }
 
      @Override
-      public void readFields(DataInput in) throws IOException {
-        this.row1 = Bytes.readByteArray(in);
-        this.row2 = Bytes.readByteArray(in);
+      public String getName() {
+        return "swap";
       }
 
       @Override
-      public void write(DataOutput out) throws IOException {
+      public ByteString rowProcessorSpecificData() throws IOException {
+        DataOutputBuffer out = new DataOutputBuffer();
         Bytes.writeByteArray(out, row1);
         Bytes.writeByteArray(out, row2);
+        return ByteString.copyFrom(out.getData());
       }
 
       @Override
-      public String getName() {
-        return "swap";
+      public void initialize(ByteString bytes) throws IOException {
+        DataInput in = new DataInputStream(bytes.newInput());
+        this.row1 = Bytes.readByteArray(in);
+        this.row2 = Bytes.readByteArray(in);
+      }
+
+      @Override
+      public ByteString getProtoForResultType(Set<String> t) throws IOException {
+        DataOutputBuffer d = new DataOutputBuffer();
+        d.writeInt(t.size());
+        for (String str : t) {
+          Bytes.writeByteArray(d, str.getBytes());
+        }
+        return ByteString.copyFrom(d.getData());
+      }
+
+      @Override
+      public Set<String> parseResponseAsResultType(byte[] response)
+          throws IOException {
+        DataInput in = new DataInputStream(new ByteArrayInputStream(response));
+        int cardinality = in.readInt();
+        Set<String> set = new HashSet<String>();
+        for (int i = 0; i < cardinality; i++) {
+          set.add(new String(Bytes.readByteArray(in)));
+        }
+        return set;
       }
     }
 
     public static class TimeoutProcessor extends
-        BaseRowProcessor<Void> implements Writable {
+        BaseRowProcessor<Void> {
       byte[] row = new byte[0];
@@ -555,18 +641,31 @@
       }
 
       @Override
-      public void readFields(DataInput in) throws IOException {
-        this.row = Bytes.readByteArray(in);
+      public String getName() {
+        return "timeout";
       }
 
       @Override
-      public void write(DataOutput out) throws IOException {
+      public ByteString rowProcessorSpecificData() throws IOException {
+        DataOutputBuffer out = new DataOutputBuffer();
         Bytes.writeByteArray(out, row);
+        return ByteString.copyFrom(out.getData());
       }
 
       @Override
-      public String getName() {
-        return "timeout";
+      public void initialize(ByteString bytes) throws IOException {
+        DataInput in = new DataInputStream(bytes.newInput());
+        this.row = Bytes.readByteArray(in);
+      }
+
+      @Override
+      public ByteString getProtoForResultType(Void t) {
+        return null;
+      }
+
+      @Override
+      public Void parseResponseAsResultType(byte[] response) {
+        return null;
      }
    }
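
For reference, the client-side calling pattern exercised by the converted tests above boils down to the following sketch. It is distilled from TestRowProcessorEndpoint as patched here and is not part of the patch itself; `table`, `row`, `MyProcessor`, and `MyResult` are hypothetical placeholders standing in for an HTable handle, a target row key, a concrete RowProcessor implementation deployed with the endpoint, and its result type.

// Sketch only: MyProcessor/MyResult, table and row are placeholders, not patch content.
import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorArgument;
import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult;
import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;

MyProcessor processor = new MyProcessor(row);   // some RowProcessor<MyResult> implementation

// Open an RPC channel to the region owning 'row' and build the generated PB stub.
CoprocessorRpcChannel channel = table.coprocessorService(row);
RowProcessorService.BlockingInterface service =
    RowProcessorService.newBlockingStub(channel);

// Serialize the processor into a RowProcessorArgument and invoke the endpoint.
RowProcessorArgument request = RowProcessorClient.getRowProcessorPB(processor);
RowProcessorResult protoResult = service.process(null, request);

// Decode the opaque result bytes with the processor's own parser.
MyResult result = processor.parseResponseAsResultType(
    ProtobufUtil.getBytesFromByteString(protoResult.getRowProcessorResult()));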