Index: hbase-server/src/test/protobuf/test_delayed_rpc.proto =================================================================== --- hbase-server/src/test/protobuf/test_delayed_rpc.proto (revision 0) +++ hbase-server/src/test/protobuf/test_delayed_rpc.proto (revision 0) @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated"; +option java_outer_classname = "TestDelayedRpcProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + + +message TestArg { + required bool delay = 1; +} + +message TestResponse { + required int32 response = 1; +} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java (revision 0) @@ -0,0 +1,98 @@ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.*; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Get; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; +import org.apache.hadoop.hbase.regionserver.HRegionServer.QosFunction; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import com.google.protobuf.ByteString; +/** + * Tests that verify certain RPCs get a higher QoS. 
+ */ +@Category(SmallTests.class) +public class TestPriorityRpc { + + @Test + public void testScanHighQos() throws IOException { + HRegionServer regionServer = + HRegionServer.constructRegionServer(HRegionServer.class, new Configuration()); + QosFunction qosFunc = regionServer.getQosFunction(); + RpcRequestBody.Builder rpcRequestBuilder = RpcRequestBody.newBuilder(); + rpcRequestBuilder.setMethodName("scan"); + + //build an empty scan request + ScanRequest.Builder scanBuilder = ScanRequest.newBuilder(); + ByteString requestBody = scanBuilder.build().toByteString(); + rpcRequestBuilder.setRequest(requestBody); + RpcRequestBody rpcRequest = rpcRequestBuilder.build(); + assertTrue (qosFunc.apply(rpcRequest) == HRegionServer.NORMAL_QOS); + + //build a scan request with scannerID + scanBuilder.setScannerId(12345); + requestBody = scanBuilder.build().toByteString(); + rpcRequestBuilder.setRequest(requestBody); + rpcRequestBuilder.setRequestClassName(ScanRequest.class.getCanonicalName()); + rpcRequest = rpcRequestBuilder.build(); + //mock out a high priority type handling and see the QoS returned + HRegionServer mockRS = Mockito.mock(HRegionServer.class); + RegionScanner mockRegionScanner = Mockito.mock(RegionScanner.class); + HRegionInfo mockRegionInfo = Mockito.mock(HRegionInfo.class); + Mockito.when(mockRS.getScanner(12345)).thenReturn(mockRegionScanner); + Mockito.when(mockRegionScanner.getRegionInfo()).thenReturn(mockRegionInfo); + Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(true); + + qosFunc.setRegionServer(mockRS); + + assertTrue (qosFunc.apply(rpcRequest) == HRegionServer.HIGH_QOS); + + //the same as above but with non-meta region + Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(false); + assertTrue (qosFunc.apply(rpcRequest) == HRegionServer.NORMAL_QOS); + + //create a rpc request that has references to META region and also + //uses one of the known argument classes (known argument classes are + //listed in 
HRegionServer.QosFunction.knownArgumentClasses) + rpcRequestBuilder = RpcRequestBody.newBuilder(); + rpcRequestBuilder.setMethodName("foo"); + rpcRequestBuilder.setRequestClassName(GetRequest.class.getCanonicalName()); + GetRequest.Builder getRequestBuilder = GetRequest.newBuilder(); + RegionSpecifier.Builder regionSpecifierBuilder = RegionSpecifier.newBuilder(); + regionSpecifierBuilder.setType(RegionSpecifierType.REGION_NAME); + ByteString name = + ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); + regionSpecifierBuilder.setValue(name); + RegionSpecifier regionSpecifier = regionSpecifierBuilder.build(); + getRequestBuilder.setRegion(regionSpecifier); + Get.Builder getBuilder = Get.newBuilder(); + getBuilder.setRow(ByteString.copyFrom("somerow".getBytes())); + getRequestBuilder.setGet(getBuilder.build()); + rpcRequestBuilder.setRequest(getRequestBuilder.build().toByteString()); + rpcRequest = rpcRequestBuilder.build(); + HRegion mockRegion = Mockito.mock(HRegion.class); + Mockito.when(mockRS.getRegion(regionSpecifier)).thenReturn(mockRegion); + Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); + assertTrue (qosFunc.apply(rpcRequest) == HRegionServer.HIGH_QOS); + + //the same as above except that the request is not using any of the + //known argument classes (it uses an arbitrary request class instead) + //(known argument classes are listed in + //HRegionServer.QosFunction.knownArgumentClasses) + rpcRequestBuilder.setRequestClassName(GetOnlineRegionRequest.class.getCanonicalName()); + rpcRequest = rpcRequestBuilder.build(); + assertTrue (qosFunc.apply(rpcRequest) == HRegionServer.NORMAL_QOS); + } +} \ No newline at end of file Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java (revision 1371001) +++ 
hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java (working copy) @@ -23,18 +23,20 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.MasterMonitorProtocol; -import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest; import org.junit.Test; import org.junit.experimental.categories.Category; +import com.google.protobuf.ServiceException; + @Category(MediumTests.class) public class TestHMasterRPCException { @@ -52,9 +54,11 @@ try { MasterMonitorProtocol inf = (MasterMonitorProtocol) HBaseRPC.getProxy( MasterMonitorProtocol.class, MasterMonitorProtocol.VERSION, isa, conf, 100); + inf.isMasterRunning(null, IsMasterRunningRequest.getDefaultInstance()); fail(); - } catch (ServerNotRunningYetException ex) { - assertTrue(ex.getMessage().startsWith( + } catch (ServiceException ex) { + IOException ie = ProtobufUtil.getRemoteException(ex); + assertTrue(ie.getMessage().startsWith( "org.apache.hadoop.hbase.ipc.ServerNotRunningYetException: Server is not running yet")); } catch (Throwable t) { fail("Unexpected throwable: " + t); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestDelayedRpcProtos.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestDelayedRpcProtos.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestDelayedRpcProtos.java (revision 0) @@ -0,0 +1,825 @@ 
+// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: test_delayed_rpc.proto + +package org.apache.hadoop.hbase.ipc.protobuf.generated; + +public final class TestDelayedRpcProtos { + private TestDelayedRpcProtos() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public interface TestArgOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required bool delay = 1; + boolean hasDelay(); + boolean getDelay(); + } + public static final class TestArg extends + com.google.protobuf.GeneratedMessage + implements TestArgOrBuilder { + // Use TestArg.newBuilder() to construct. + private TestArg(Builder builder) { + super(builder); + } + private TestArg(boolean noInit) {} + + private static final TestArg defaultInstance; + public static TestArg getDefaultInstance() { + return defaultInstance; + } + + public TestArg getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestArg_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestArg_fieldAccessorTable; + } + + private int bitField0_; + // required bool delay = 1; + public static final int DELAY_FIELD_NUMBER = 1; + private boolean delay_; + public boolean hasDelay() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public boolean getDelay() { + return delay_; + } + + private void initFields() { + delay_ = false; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasDelay()) { + memoizedIsInitialized = 0; + return false; + } + 
memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBool(1, delay_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(1, delay_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg other = (org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg) obj; + + boolean result = true; + result = result && (hasDelay() == other.hasDelay()); + if (hasDelay()) { + result = result && (getDelay() + == other.getDelay()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + @java.lang.Override + public int hashCode() { + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasDelay()) { + hash = (37 * hash) + DELAY_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getDelay()); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + return hash; + } + + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom( + 
com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg 
parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArgOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestArg_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestArg_fieldAccessorTable; + } + + // Construct using org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + delay_ = false; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg.getDescriptor(); + } + + public org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg getDefaultInstanceForType() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg build() { + org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg buildPartial() { + org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg result = new org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.delay_ = delay_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg) { + return mergeFrom((org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg other) { + if (other == org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg.getDefaultInstance()) return this; + if (other.hasDelay()) { + setDelay(other.getDelay()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasDelay()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 8: { + bitField0_ |= 
0x00000001; + delay_ = input.readBool(); + break; + } + } + } + } + + private int bitField0_; + + // required bool delay = 1; + private boolean delay_ ; + public boolean hasDelay() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public boolean getDelay() { + return delay_; + } + public Builder setDelay(boolean value) { + bitField0_ |= 0x00000001; + delay_ = value; + onChanged(); + return this; + } + public Builder clearDelay() { + bitField0_ = (bitField0_ & ~0x00000001); + delay_ = false; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:TestArg) + } + + static { + defaultInstance = new TestArg(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:TestArg) + } + + public interface TestResponseOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required int32 response = 1; + boolean hasResponse(); + int getResponse(); + } + public static final class TestResponse extends + com.google.protobuf.GeneratedMessage + implements TestResponseOrBuilder { + // Use TestResponse.newBuilder() to construct. 
+ private TestResponse(Builder builder) { + super(builder); + } + private TestResponse(boolean noInit) {} + + private static final TestResponse defaultInstance; + public static TestResponse getDefaultInstance() { + return defaultInstance; + } + + public TestResponse getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestResponse_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestResponse_fieldAccessorTable; + } + + private int bitField0_; + // required int32 response = 1; + public static final int RESPONSE_FIELD_NUMBER = 1; + private int response_; + public boolean hasResponse() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public int getResponse() { + return response_; + } + + private void initFields() { + response_ = 0; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasResponse()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeInt32(1, response_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(1, response_); + } + size += 
getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse other = (org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse) obj; + + boolean result = true; + result = result && (hasResponse() == other.hasResponse()); + if (hasResponse()) { + result = result && (getResponse() + == other.getResponse()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + @java.lang.Override + public int hashCode() { + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasResponse()) { + hash = (37 * hash) + RESPONSE_FIELD_NUMBER; + hash = (53 * hash) + getResponse(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + return hash; + } + + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return 
newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponseOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestResponse_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestResponse_fieldAccessorTable; + } + + // Construct using org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static 
Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + response_ = 0; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse.getDescriptor(); + } + + public org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse getDefaultInstanceForType() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse build() { + org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse buildPartial() { + org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse result = new org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.response_ = response_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder 
mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse) { + return mergeFrom((org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse other) { + if (other == org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse.getDefaultInstance()) return this; + if (other.hasResponse()) { + setResponse(other.getResponse()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasResponse()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + response_ = input.readInt32(); + break; + } + } + } + } + + private int bitField0_; + + // required int32 response = 1; + private int response_ ; + public boolean hasResponse() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public int getResponse() { + return response_; + } + public Builder setResponse(int value) { + bitField0_ |= 0x00000001; + response_ = value; + onChanged(); + return this; + } + public Builder clearResponse() { + bitField0_ = (bitField0_ & ~0x00000001); 
+ response_ = 0; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:TestResponse) + } + + static { + defaultInstance = new TestResponse(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:TestResponse) + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_TestArg_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_TestArg_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_TestResponse_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_TestResponse_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\026test_delayed_rpc.proto\"\030\n\007TestArg\022\r\n\005d" + + "elay\030\001 \002(\010\" \n\014TestResponse\022\020\n\010response\030\001" + + " \002(\005BL\n.org.apache.hadoop.hbase.ipc.prot" + + "obuf.generatedB\024TestDelayedRpcProtos\210\001\001\240" + + "\001\001" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_TestArg_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_TestArg_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_TestArg_descriptor, + new java.lang.String[] { "Delay", }, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg.class, + 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg.Builder.class); + internal_static_TestResponse_descriptor = + getDescriptor().getMessageTypes().get(1); + internal_static_TestResponse_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_TestResponse_descriptor, + new java.lang.String[] { "Response", }, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse.class, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse.Builder.class); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (revision 1371001) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (working copy) @@ -34,12 +34,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.ipc.VersionedProtocol; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse; import org.apache.hadoop.hbase.MediumTests; import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mortbay.log.Log; /** * Test that delayed RPCs work. 
Fire up three calls, the first of which should @@ -163,7 +166,7 @@ public interface TestRpc extends VersionedProtocol { public static final long VERSION = 1L; - int test(boolean delay); + TestResponse test(TestArg delay); } private static class TestRpcImpl implements TestRpc { @@ -183,9 +186,12 @@ } @Override - public int test(final boolean delay) { + public TestResponse test(final TestArg testArg) { + boolean delay = testArg.getDelay(); + TestResponse.Builder responseBuilder = TestResponse.newBuilder(); if (!delay) { - return UNDELAYED; + responseBuilder.setResponse(UNDELAYED); + return responseBuilder.build(); } final Delayable call = HBaseServer.getCurrentCall(); call.startDelay(delayReturnValue); @@ -193,7 +199,9 @@ public void run() { try { Thread.sleep(500); - call.endDelay(delayReturnValue ? DELAYED : null); + TestResponse.Builder responseBuilder = TestResponse.newBuilder(); + call.endDelay(delayReturnValue ? + responseBuilder.setResponse(DELAYED).build() : null); } catch (Exception e) { e.printStackTrace(); } @@ -201,7 +209,8 @@ }.start(); // This value should go back to client only if the response is set // immediately at delay time. 
- return 0xDEADBEEF; + responseBuilder.setResponse(0xDEADBEEF); + return responseBuilder.build(); } @Override @@ -235,7 +244,9 @@ @Override public void run() { try { - Integer result = new Integer(server.test(delay)); + Integer result = + new Integer(server.test(TestArg.newBuilder() + .setDelay(delay).build()).getResponse()); if (results != null) { synchronized (results) { results.add(result); @@ -263,7 +274,7 @@ int result = 0xDEADBEEF; try { - result = client.test(false); + result = client.test(TestArg.newBuilder().setDelay(false).build()).getResponse(); } catch (Exception e) { fail("No exception should have been thrown."); } @@ -271,12 +282,13 @@ boolean caughtException = false; try { - result = client.test(true); + result = client.test(TestArg.newBuilder().setDelay(true).build()).getResponse(); } catch(Exception e) { // Exception thrown by server is enclosed in a RemoteException. - if (e.getCause().getMessage().startsWith( + if (e.getCause().getMessage().contains( "java.lang.Exception: Something went wrong")) caughtException = true; + Log.warn(e); } assertTrue(caughtException); } @@ -286,9 +298,9 @@ */ private static class FaultyTestRpc implements TestRpc { @Override - public int test(boolean delay) { - if (!delay) - return UNDELAYED; + public TestResponse test(TestArg arg) { + if (!arg.getDelay()) + return TestResponse.newBuilder().setResponse(UNDELAYED).build(); Delayable call = HBaseServer.getCurrentCall(); call.startDelay(true); try { @@ -297,7 +309,7 @@ e.printStackTrace(); } // Client will receive the Exception, not this value. 
- return DELAYED; + return TestResponse.newBuilder().setResponse(DELAYED).build(); } @Override Index: hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java (revision 1371001) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java (working copy) @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.ipc; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.SmallTests; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.junit.Test; - -import com.google.protobuf.DescriptorProtos; -import com.google.protobuf.DescriptorProtos.EnumDescriptorProto; - -import org.junit.experimental.categories.Category; - -/** Unit tests to test PB-based types on WritableRpcEngine. 
*/ -@Category(SmallTests.class) -public class TestPBOnWritableRpc { - - private static Configuration conf = new Configuration(); - - public interface TestProtocol extends VersionedProtocol { - public static final long VERSION = 1L; - - String echo(String value) throws IOException; - Writable echo(Writable value) throws IOException; - - DescriptorProtos.EnumDescriptorProto exchangeProto( - DescriptorProtos.EnumDescriptorProto arg); - } - - public static class TestImpl implements TestProtocol { - public long getProtocolVersion(String protocol, long clientVersion) { - return TestProtocol.VERSION; - } - - public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, - int hashcode) { - return new ProtocolSignature(TestProtocol.VERSION, null); - } - - @Override - public String echo(String value) throws IOException { return value; } - - @Override - public Writable echo(Writable writable) { - return writable; - } - - @Override - public EnumDescriptorProto exchangeProto(EnumDescriptorProto arg) { - return arg; - } - } - - @Test(timeout=10000) - public void testCalls() throws Exception { - testCallsInternal(conf); - } - - private void testCallsInternal(Configuration conf) throws Exception { - RpcServer rpcServer = HBaseRPC.getServer(new TestImpl(), - new Class[] {TestProtocol.class}, - "localhost", // BindAddress is IP we got for this server. 
- 9999, // port number - 2, // number of handlers - 0, // we dont use high priority handlers in master - conf.getBoolean("hbase.rpc.verbose", false), conf, - 0); - TestProtocol proxy = null; - try { - rpcServer.start(); - - InetSocketAddress isa = - new InetSocketAddress("localhost", 9999); - proxy = (TestProtocol) HBaseRPC.waitForProxy( - TestProtocol.class, TestProtocol.VERSION, - isa, conf, -1, 8000, 8000); - - String stringResult = proxy.echo("foo"); - assertEquals(stringResult, "foo"); - - stringResult = proxy.echo((String)null); - assertEquals(stringResult, null); - - Text utf8Result = (Text)proxy.echo(new Text("hello world")); - assertEquals(utf8Result, new Text("hello world")); - - utf8Result = (Text)proxy.echo((Text)null); - assertEquals(utf8Result, null); - - // Test protobufs - EnumDescriptorProto sendProto = - EnumDescriptorProto.newBuilder().setName("test").build(); - EnumDescriptorProto retProto = proxy.exchangeProto(sendProto); - assertEquals(sendProto, retProto); - assertNotSame(sendProto, retProto); - } finally { - rpcServer.stop(); - if(proxy != null) { - HBaseRPC.stopProxy(proxy); - } - } - } - - public static void main(String[] args) throws Exception { - new TestPBOnWritableRpc().testCallsInternal(conf); - } - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} Index: hbase-server/src/main/protobuf/RPC.proto =================================================================== --- hbase-server/src/main/protobuf/RPC.proto (revision 1371001) +++ hbase-server/src/main/protobuf/RPC.proto (working copy) @@ -86,6 +86,10 @@ /** Bytes corresponding to the client protobuf request */ optional bytes request = 3; + + /** Some metainfo about the request. 
Helps us to treat RPCs with + different priorities */ + optional string requestClassName = 4; } /** @@ -122,4 +126,4 @@ /** Exception stack trace from the server side */ optional string stackTrace = 2; -} \ No newline at end of file +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java (revision 1371001) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java (working copy) @@ -1492,6 +1492,10 @@ // optional bytes request = 3; boolean hasRequest(); com.google.protobuf.ByteString getRequest(); + + // optional string requestClassName = 4; + boolean hasRequestClassName(); + String getRequestClassName(); } public static final class RpcRequestBody extends com.google.protobuf.GeneratedMessage @@ -1574,10 +1578,43 @@ return request_; } + // optional string requestClassName = 4; + public static final int REQUESTCLASSNAME_FIELD_NUMBER = 4; + private java.lang.Object requestClassName_; + public boolean hasRequestClassName() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public String getRequestClassName() { + java.lang.Object ref = requestClassName_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + requestClassName_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getRequestClassNameBytes() { + java.lang.Object ref = requestClassName_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + requestClassName_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { methodName_ = ""; clientProtocolVersion_ = 
0L; request_ = com.google.protobuf.ByteString.EMPTY; + requestClassName_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1604,6 +1641,9 @@ if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBytes(3, request_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeBytes(4, getRequestClassNameBytes()); + } getUnknownFields().writeTo(output); } @@ -1625,6 +1665,10 @@ size += com.google.protobuf.CodedOutputStream .computeBytesSize(3, request_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(4, getRequestClassNameBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1663,6 +1707,11 @@ result = result && getRequest() .equals(other.getRequest()); } + result = result && (hasRequestClassName() == other.hasRequestClassName()); + if (hasRequestClassName()) { + result = result && getRequestClassName() + .equals(other.getRequestClassName()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -1684,6 +1733,10 @@ hash = (37 * hash) + REQUEST_FIELD_NUMBER; hash = (53 * hash) + getRequest().hashCode(); } + if (hasRequestClassName()) { + hash = (37 * hash) + REQUESTCLASSNAME_FIELD_NUMBER; + hash = (53 * hash) + getRequestClassName().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -1806,6 +1859,8 @@ bitField0_ = (bitField0_ & ~0x00000002); request_ = com.google.protobuf.ByteString.EMPTY; bitField0_ = (bitField0_ & ~0x00000004); + requestClassName_ = ""; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -1856,6 +1911,10 @@ to_bitField0_ |= 0x00000004; } result.request_ = request_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.requestClassName_ = requestClassName_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1881,6 +1940,9 @@ if 
(other.hasRequest()) { setRequest(other.getRequest()); } + if (other.hasRequestClassName()) { + setRequestClassName(other.getRequestClassName()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1931,6 +1993,11 @@ request_ = input.readBytes(); break; } + case 34: { + bitField0_ |= 0x00000008; + requestClassName_ = input.readBytes(); + break; + } } } } @@ -2018,6 +2085,42 @@ return this; } + // optional string requestClassName = 4; + private java.lang.Object requestClassName_ = ""; + public boolean hasRequestClassName() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public String getRequestClassName() { + java.lang.Object ref = requestClassName_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + requestClassName_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setRequestClassName(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000008; + requestClassName_ = value; + onChanged(); + return this; + } + public Builder clearRequestClassName() { + bitField0_ = (bitField0_ & ~0x00000008); + requestClassName_ = getDefaultInstance().getRequestClassName(); + onChanged(); + return this; + } + void setRequestClassName(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000008; + requestClassName_ = value; + onChanged(); + } + // @@protoc_insertion_point(builder_scope:RpcRequestBody) } @@ -3505,16 +3608,17 @@ "ctionHeader\022\"\n\010userInfo\030\001 \001(\0132\020.UserInfo" + "rmation\022?\n\010protocol\030\002 \001(\t:-org.apache.ha" + "doop.hbase.client.ClientProtocol\"\"\n\020RpcR" + - "equestHeader\022\016\n\006callId\030\001 \002(\r\"T\n\016RpcReque" + + "equestHeader\022\016\n\006callId\030\001 \002(\r\"n\n\016RpcReque" + "stBody\022\022\n\nmethodName\030\001 \002(\t\022\035\n\025clientProt" + - "ocolVersion\030\002 \001(\004\022\017\n\007request\030\003 \001(\014\"{\n\021Rp" + - 
"cResponseHeader\022\016\n\006callId\030\001 \002(\005\022)\n\006statu" + - "s\030\002 \002(\0162\031.RpcResponseHeader.Status\"+\n\006St", - "atus\022\013\n\007SUCCESS\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002\"" + - "#\n\017RpcResponseBody\022\020\n\010response\030\001 \001(\014\"9\n\014" + - "RpcException\022\025\n\rexceptionName\030\001 \002(\t\022\022\n\ns" + - "tackTrace\030\002 \001(\tB<\n*org.apache.hadoop.hba" + - "se.protobuf.generatedB\tRPCProtosH\001\240\001\001" + "ocolVersion\030\002 \001(\004\022\017\n\007request\030\003 \001(\014\022\030\n\020re" + + "questClassName\030\004 \001(\t\"{\n\021RpcResponseHeade" + + "r\022\016\n\006callId\030\001 \002(\005\022)\n\006status\030\002 \002(\0162\031.RpcR", + "esponseHeader.Status\"+\n\006Status\022\013\n\007SUCCES" + + "S\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002\"#\n\017RpcResponse" + + "Body\022\020\n\010response\030\001 \001(\014\"9\n\014RpcException\022\025" + + "\n\rexceptionName\030\001 \002(\t\022\022\n\nstackTrace\030\002 \001(" + + "\tB<\n*org.apache.hadoop.hbase.protobuf.ge" + + "neratedB\tRPCProtosH\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -3550,7 +3654,7 @@ internal_static_RpcRequestBody_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RpcRequestBody_descriptor, - new java.lang.String[] { "MethodName", "ClientProtocolVersion", "Request", }, + new java.lang.String[] { "MethodName", "ClientProtocolVersion", "Request", "RequestClassName", }, org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody.class, org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody.Builder.class); internal_static_RpcResponseHeader_descriptor = Index: hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java 
=================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java (revision 1371001) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java (working copy) @@ -20,8 +20,7 @@ package org.apache.hadoop.hbase.monitoring; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Writable; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; /** * A MonitoredTask implementation optimized for use with RPC Handlers @@ -41,6 +40,6 @@ public abstract void setRPC(String methodName, Object [] params, long queueTime); - public abstract void setRPCPacket(Writable param); + public abstract void setRPCPacket(RpcRequestBody param); public abstract void setConnection(String clientAddress, int remotePort); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (revision 1371001) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (working copy) @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.io.WritableWithSize; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; @@ -46,7 +47,7 @@ private long rpcStartTime; private String methodName = ""; private Object [] params = {}; - private Writable packet; + private RpcRequestBody packet; public MonitoredRPCHandlerImpl() { super(); @@ -141,11 +142,7 @@ // no RPC is currently running, or we don't have an RPC's packet info return -1L; } - if (!(packet instanceof 
WritableWithSize)) { - // the packet passed to us doesn't expose size information - return -1L; - } - return ((WritableWithSize) packet).getWritableSize(); + return packet.getSerializedSize(); } /** @@ -201,11 +198,11 @@ } /** - * Gives this instance a reference to the Writable received by the RPC, so + * Gives this instance a reference to the protobuf received by the RPC, so * that it can later compute its size if asked for it. - * @param param The Writable received by the RPC for this call + * @param param The protobuf received by the RPC for this call */ - public void setRPCPacket(Writable param) { + public void setRPCPacket(RpcRequestBody param) { this.packet = param; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1371001) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -27,10 +27,13 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.BindException; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -40,11 +43,13 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.SortedSet; import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -168,6 +173,7 @@ import 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -224,6 +230,8 @@ import com.google.common.base.Function; import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; import com.google.protobuf.RpcController; /** @@ -436,7 +444,12 @@ */ private final int scannerLeaseTimeoutPeriod; + /** + * The reference to the QosFunction + */ + private final QosFunction qosFunction; + /** * Starts a HRegionServer at the default location * @@ -513,7 +526,7 @@ this.isa = this.rpcServer.getListenerAddress(); this.rpcServer.setErrorHandler(this); - this.rpcServer.setQosFunction(new QosFunction()); + this.rpcServer.setQosFunction((qosFunction = new QosFunction())); this.startcode = System.currentTimeMillis(); // login the server principal (if using secure Hadoop) @@ -545,13 +558,56 @@ int priority() default 0; } + QosFunction getQosFunction() { + return qosFunction; + } + + RegionScanner getScanner(long scannerId) { + String scannerIdString = Long.toString(scannerId); + return scanners.get(scannerIdString); + } + /** * Utility used ensuring higher quality of service for priority rpcs; e.g. * rpcs to .META. and -ROOT-, etc. */ - class QosFunction implements Function { + class QosFunction implements Function { private final Map annotatedQos; + private HRegionServer hRegionServer = HRegionServer.this; + //The logic for figuring out high priority RPCs is as follows: + //1. 
if the method is annotated with a QosPriority of QOS_HIGH, + // that is honored + //2. parse out the protobuf message and see if the request is for meta + // region, and if so, treat it as a high priority RPC + //Some optimizations for (2) are done here - + //Clients send the argument classname as part of making the RPC. The server + //decides whether to deserialize the proto argument message based on the + //pre-established set of argument classes (knownArgumentClasses below). + //This prevents the server from having to deserialize all proto argument + //messages prematurely. + //All the argument classes declare a 'getRegion' method that returns a + //RegionSpecifier object. Methods can be invoked on the returned object + //to figure out whether it is a meta region or not. + @SuppressWarnings("unchecked") + private final Class[] knownArgumentClasses = new Class[]{ + GetRegionInfoRequest.class, + GetStoreFileRequest.class, + CloseRegionRequest.class, + FlushRegionRequest.class, + SplitRegionRequest.class, + CompactRegionRequest.class, + GetRequest.class, + MutateRequest.class, + ScanRequest.class, + LockRowRequest.class, + UnlockRowRequest.class, + MultiRequest.class + }; + + private final Map> argumentToClassMap = + new HashMap>(); + public QosFunction() { Map qosMap = new HashMap(); for (Method m : HRegionServer.class.getMethods()) { @@ -562,12 +618,19 @@ } annotatedQos = qosMap; + for (Class cls : knownArgumentClasses) { + argumentToClassMap.put(cls.getCanonicalName(), cls); + } } + + public void setRegionServer(HRegionServer server) { + this.hRegionServer = server; + } public boolean isMetaRegion(byte[] regionName) { HRegion region; try { - region = getRegion(regionName); + region = hRegionServer.getRegion(regionName); } catch (NotServingRegionException ignored) { return false; } @@ -575,64 +638,68 @@ } @Override - public Integer apply(Writable from) { - if (!(from instanceof Invocation)) return NORMAL_QOS; + public Integer apply(RpcRequestBody from) { + String 
methodName = from.getMethodName(); + Class rpcArgClass = null; + if (from.hasRequestClassName()) { + String cls = from.getRequestClassName(); + rpcArgClass = argumentToClassMap.get(cls); + } - Invocation inv = (Invocation) from; - String methodName = inv.getMethodName(); - Integer priorityByAnnotation = annotatedQos.get(methodName); if (priorityByAnnotation != null) { return priorityByAnnotation; } - // scanner methods... - if (methodName.equals("next") || methodName.equals("close")) { - // translate! - Long scannerId; + if (rpcArgClass == null || from.getRequest().isEmpty()) { + return NORMAL_QOS; + } + + if (isMetaRegionOperation(rpcArgClass, from.getRequest())) { + return HIGH_QOS; + } else if (methodName.equals("scan")) { // scanner methods... + ScanRequest request; try { - scannerId = (Long) inv.getParameters()[0]; - } catch (ClassCastException ignored) { - // LOG.debug("Low priority: " + from); - return NORMAL_QOS; // doh. + request = ScanRequest.parseFrom(from.getRequest()); + } catch (InvalidProtocolBufferException e) { + return NORMAL_QOS; } - String scannerIdString = Long.toString(scannerId); - RegionScanner scanner = scanners.get(scannerIdString); + if (!request.hasScannerId()) { + return NORMAL_QOS; + } + RegionScanner scanner = hRegionServer.getScanner(request.getScannerId()); if (scanner != null && scanner.getRegionInfo().isMetaRegion()) { - // LOG.debug("High priority scanner request: " + scannerId); + if (LOG.isDebugEnabled()) { + LOG.debug("High priority scanner request: " + request.getScannerId()); + } return HIGH_QOS; } - } else if (inv.getParameterClasses().length == 0) { - // Just let it through. This is getOnlineRegions, etc. 
-      } else if (inv.getParameterClasses()[0] == byte[].class) {
-        // first arg is byte array, so assume this is a regionname:
-        if (isMetaRegion((byte[]) inv.getParameters()[0])) {
-          // LOG.debug("High priority with method: " + methodName +
-          // " and region: "
-          // + Bytes.toString((byte[]) inv.getParameters()[0]));
-          return HIGH_QOS;
-        }
-      } else if (inv.getParameterClasses()[0] == MultiAction.class) {
-        MultiAction ma = (MultiAction) inv.getParameters()[0];
-        Set regions = ma.getRegions();
-        // ok this sucks, but if any single of the actions touches a meta, the
-        // whole
-        // thing gets pingged high priority. This is a dangerous hack because
-        // people
-        // can get their multi action tagged high QOS by tossing a Get(.META.)
-        // AND this
-        // regionserver hosts META/-ROOT-
-        for (byte[] region : regions) {
-          if (isMetaRegion(region)) {
-            // LOG.debug("High priority multi with region: " +
-            // Bytes.toString(region));
-            return HIGH_QOS; // short circuit for the win.
-          }
-        }
       }
-      // LOG.debug("Low priority: " + from.toString());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Low priority: " + from.toString());
+      }
       return NORMAL_QOS;
     }
+
+    private boolean isMetaRegionOperation(Class argumentClass, // true only when the request's region is a meta region
+        ByteString argument) {
+      try {
+        Method parseFrom = argumentClass.getDeclaredMethod("parseFrom",
+            ByteString.class);
+        Object deserializedRequestObj = parseFrom.invoke(null, argument); // null receiver: parseFrom is invoked as a static method
+        Method getRegion = argumentClass.getDeclaredMethod("getRegion");
+        RegionSpecifier regionSpecifier =
+            (RegionSpecifier)getRegion.invoke(deserializedRequestObj,
+                (Object[])null);
+        HRegion region = hRegionServer.getRegion(regionSpecifier);
+        if (region.getRegionInfo().isMetaRegion()) {
+          return true;
+        }
+      } catch (Exception ex) {
+        return false; // best effort: any reflection, parse or region-lookup failure means "not a meta operation"
+      }
+      return false; // region resolved but is not meta; returning true here would promote every parsed request to HIGH_QOS
+    }
   }
 
   /**
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
===================================================================
---
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java (revision 1371001) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java (working copy) @@ -54,14 +54,10 @@ */ protected synchronized HBaseClient getClient(Configuration conf, SocketFactory factory) { - return getClient(conf, factory, HbaseObjectWritable.class); - } - protected synchronized HBaseClient getClient(Configuration conf, - SocketFactory factory, Class valueClass) { HBaseClient client = clients.get(factory); if (client == null) { // Make an hbase client instead of hadoop Client. - client = new HBaseClient(valueClass, conf, factory); + client = new HBaseClient(conf, factory); clients.put(factory, client); } else { client.incCount(); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (revision 1371001) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (working copy) @@ -1,468 +0,0 @@ -/* - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.ipc; - -import java.lang.reflect.Proxy; -import java.lang.reflect.Method; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.UndeclaredThrowableException; - -import java.net.InetSocketAddress; -import java.io.*; -import java.util.Map; -import java.util.HashMap; - -import javax.net.SocketFactory; - -import org.apache.commons.logging.*; - -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.client.Operation; -import org.apache.hadoop.hbase.io.HbaseObjectWritable; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Objects; -import org.apache.hadoop.io.*; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; -import org.apache.hadoop.hbase.security.HBasePolicyProvider; -import org.apache.hadoop.hbase.ipc.VersionedProtocol; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.*; - -import org.codehaus.jackson.map.ObjectMapper; - -import com.google.protobuf.ServiceException; - -/** An RpcEngine implementation for Writable data. */ -@InterfaceAudience.Private -class WritableRpcEngine implements RpcEngine { - // LOG is NOT in hbase subpackage intentionally so that the default HBase - // DEBUG log level does NOT emit RPC-level logging. 
- private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine"); - - protected final static ClientCache CLIENTS = new ClientCache(); - - private static class Invoker implements InvocationHandler { - private Class protocol; - private InetSocketAddress address; - private User ticket; - private HBaseClient client; - private boolean isClosed = false; - final private int rpcTimeout; - - public Invoker(Class protocol, - InetSocketAddress address, User ticket, - Configuration conf, SocketFactory factory, int rpcTimeout) { - this.protocol = protocol; - this.address = address; - this.ticket = ticket; - this.client = CLIENTS.getClient(conf, factory); - this.rpcTimeout = rpcTimeout; - } - - public Object invoke(Object proxy, Method method, Object[] args) - throws Throwable { - final boolean logDebug = LOG.isDebugEnabled(); - long startTime = 0; - if (logDebug) { - startTime = System.currentTimeMillis(); - } - - try { - HbaseObjectWritable value = (HbaseObjectWritable) - client.call(new Invocation(method, args), address, protocol, ticket, - rpcTimeout); - if (logDebug) { - // FIGURE HOW TO TURN THIS OFF! - long callTime = System.currentTimeMillis() - startTime; - LOG.debug("Call: " + method.getName() + " " + callTime); - } - return value.get(); - } catch (Throwable t) { - // For protobuf protocols, ServiceException is expected - if (Invocation.PROTOBUF_PROTOCOLS.contains(protocol)) { - if (t instanceof RemoteException) { - Throwable cause = ((RemoteException)t).unwrapRemoteException(); - throw new ServiceException(cause); - } - throw new ServiceException(t); - } - throw t; - } - } - - /* close the IPC client that's responsible for this invoker's RPCs */ - synchronized protected void close() { - if (!isClosed) { - isClosed = true; - CLIENTS.stopClient(client); - } - } - } - - /** Construct a client-side proxy object that implements the named protocol, - * talking to a server at the named address. 
*/ - public VersionedProtocol getProxy( - Class protocol, long clientVersion, - InetSocketAddress addr, User ticket, - Configuration conf, SocketFactory factory, int rpcTimeout) - throws IOException { - - VersionedProtocol proxy = - (VersionedProtocol) Proxy.newProxyInstance( - protocol.getClassLoader(), new Class[] { protocol }, - new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); - try { - long serverVersion = ((VersionedProtocol)proxy) - .getProtocolVersion(protocol.getName(), clientVersion); - if (serverVersion != clientVersion) { - throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion, - serverVersion); - } - } catch (Throwable t) { - if (t instanceof UndeclaredThrowableException) { - t = t.getCause(); - } - if (t instanceof ServiceException) { - throw ProtobufUtil.getRemoteException((ServiceException)t); - } - if (!(t instanceof IOException)) { - LOG.error("Unexpected throwable object ", t); - throw new IOException(t); - } - throw (IOException)t; - } - return proxy; - } - - /** - * Stop this proxy and release its invoker's resource - * @param proxy the proxy to be stopped - */ - public void stopProxy(VersionedProtocol proxy) { - if (proxy!=null) { - ((Invoker)Proxy.getInvocationHandler(proxy)).close(); - } - } - - /** Construct a server for a protocol implementation instance listening on a - * port and address. */ - public Server getServer(Class protocol, - Object instance, - Class[] ifaces, - String bindAddress, int port, - int numHandlers, - int metaHandlerCount, boolean verbose, - Configuration conf, int highPriorityLevel) - throws IOException { - return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, - metaHandlerCount, verbose, highPriorityLevel); - } - - /** An RPC Server. 
*/ - public static class Server extends HBaseServer { - private Object instance; - private Class implementation; - private Class[] ifaces; - private boolean verbose; - - private static final String WARN_RESPONSE_TIME = - "hbase.ipc.warn.response.time"; - private static final String WARN_RESPONSE_SIZE = - "hbase.ipc.warn.response.size"; - - /** Default value for above params */ - private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds - private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; - - /** Names for suffixed metrics */ - private static final String ABOVE_ONE_SEC_METRIC = ".aboveOneSec."; - - private final int warnResponseTime; - private final int warnResponseSize; - - private static String classNameBase(String className) { - String[] names = className.split("\\.", -1); - if (names == null || names.length == 0) { - return className; - } - return names[names.length-1]; - } - - /** Construct an RPC server. - * @param instance the instance whose methods will be called - * @param ifaces the interfaces the server supports - * @param paramClass an instance of this class is used to read the RPC requests - * @param conf the configuration to use - * @param bindAddress the address to bind on to listen for connection - * @param port the port to listen for connections on - * @param numHandlers the number of method handler threads to run - * @param metaHandlerCount the number of meta handlers desired - * @param verbose whether each call should be logged - * @param highPriorityLevel the priority level this server treats as high priority RPCs - * @throws IOException e - */ - public Server(Object instance, final Class[] ifaces, - Class paramClass, - Configuration conf, String bindAddress, int port, - int numHandlers, int metaHandlerCount, boolean verbose, - int highPriorityLevel) throws IOException { - super(bindAddress, port, paramClass, numHandlers, metaHandlerCount, - conf, classNameBase(instance.getClass().getName()), - 
highPriorityLevel); - this.instance = instance; - this.implementation = instance.getClass(); - this.verbose = verbose; - - this.ifaces = ifaces; - - // create metrics for the advertised interfaces this server implements. - String [] metricSuffixes = new String [] {ABOVE_ONE_SEC_METRIC}; - this.rpcMetrics.createMetrics(this.ifaces, false, metricSuffixes); - - this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, - DEFAULT_WARN_RESPONSE_TIME); - this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, - DEFAULT_WARN_RESPONSE_SIZE); - } - - public Server(Object instance, final Class[] ifaces, - Configuration conf, String bindAddress, int port, - int numHandlers, int metaHandlerCount, boolean verbose, - int highPriorityLevel) throws IOException { - this(instance, ifaces, Invocation.class, conf, bindAddress, port, - numHandlers, metaHandlerCount, verbose, highPriorityLevel); - } - - public AuthenticationTokenSecretManager createSecretManager(){ - if (!User.isSecurityEnabled() || - !(instance instanceof org.apache.hadoop.hbase.Server)) { - return null; - } - org.apache.hadoop.hbase.Server server = - (org.apache.hadoop.hbase.Server)instance; - Configuration conf = server.getConfiguration(); - long keyUpdateInterval = - conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000); - long maxAge = - conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000); - return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(), - server.getServerName().toString(), keyUpdateInterval, maxAge); - } - - @Override - public void startThreads() { - AuthenticationTokenSecretManager mgr = createSecretManager(); - if (mgr != null) { - setSecretManager(mgr); - mgr.start(); - } - this.authManager = new ServiceAuthorizationManager(); - HBasePolicyProvider.init(conf, authManager); - - // continue with base startup - super.startThreads(); - } - - @Override - public Writable call(Class protocol, - Writable param, long receivedTime, MonitoredRPCHandler status) - throws IOException 
{ - try { - Invocation call = (Invocation)param; - if(call.getMethodName() == null) { - throw new IOException("Could not find requested method, the usual " + - "cause is a version mismatch between client and server."); - } - if (verbose) log("Call: " + call, LOG); - status.setRPC(call.getMethodName(), call.getParameters(), receivedTime); - status.setRPCPacket(param); - status.resume("Servicing call"); - - Method method = - protocol.getMethod(call.getMethodName(), - call.getParameterClasses()); - method.setAccessible(true); - - //Verify protocol version. - //Bypass the version check for VersionedProtocol - if (!method.getDeclaringClass().equals(VersionedProtocol.class)) { - long clientVersion = call.getProtocolVersion(); - ProtocolSignature serverInfo = ((VersionedProtocol) instance) - .getProtocolSignature(protocol.getCanonicalName(), call - .getProtocolVersion(), call.getClientMethodsHash()); - long serverVersion = serverInfo.getVersion(); - if (serverVersion != clientVersion) { - LOG.warn("Version mismatch: client version=" + clientVersion - + ", server version=" + serverVersion); - throw new RPC.VersionMismatch(protocol.getName(), clientVersion, - serverVersion); - } - } - Object impl = null; - if (protocol.isAssignableFrom(this.implementation)) { - impl = this.instance; - } - else { - throw new HBaseRPC.UnknownProtocolException(protocol); - } - - long startTime = System.currentTimeMillis(); - Object[] params = call.getParameters(); - Object value = method.invoke(impl, params); - int processingTime = (int) (System.currentTimeMillis() - startTime); - int qTime = (int) (startTime-receivedTime); - if (TRACELOG.isDebugEnabled()) { - TRACELOG.debug("Call #" + CurCall.get().id + - "; Served: " + protocol.getSimpleName()+"#"+call.getMethodName() + - " queueTime=" + qTime + - " processingTime=" + processingTime + - " contents=" + Objects.describeQuantity(params)); - } - rpcMetrics.rpcQueueTime.inc(qTime); - rpcMetrics.rpcProcessingTime.inc(processingTime); - 
rpcMetrics.inc(call.getMethodName(), processingTime); - if (verbose) log("Return: "+value, LOG); - - HbaseObjectWritable retVal = - new HbaseObjectWritable(method.getReturnType(), value); - long responseSize = retVal.getWritableSize(); - // log any RPC responses that are slower than the configured warn - // response time or larger than configured warning size - boolean tooSlow = (processingTime > warnResponseTime - && warnResponseTime > -1); - boolean tooLarge = (responseSize > warnResponseSize - && warnResponseSize > -1); - if (tooSlow || tooLarge) { - // when tagging, we let TooLarge trump TooSmall to keep output simple - // note that large responses will often also be slow. - logResponse(call.getParameters(), call.getMethodName(), - call.toString(), (tooLarge ? "TooLarge" : "TooSlow"), - status.getClient(), startTime, processingTime, qTime, - responseSize); - // provides a count of log-reported slow responses - if (tooSlow) { - rpcMetrics.rpcSlowResponseTime.inc(processingTime); - } - } - if (processingTime > 1000) { - // we use a hard-coded one second period so that we can clearly - // indicate the time period we're warning about in the name of the - // metric itself - rpcMetrics.inc(call.getMethodName() + ABOVE_ONE_SEC_METRIC, - processingTime); - } - - return retVal; - } catch (InvocationTargetException e) { - Throwable target = e.getTargetException(); - if (target instanceof IOException) { - throw (IOException)target; - } - if (target instanceof ServiceException) { - throw ProtobufUtil.getRemoteException((ServiceException)target); - } - IOException ioe = new IOException(target.toString()); - ioe.setStackTrace(target.getStackTrace()); - throw ioe; - } catch (Throwable e) { - if (!(e instanceof IOException)) { - LOG.error("Unexpected throwable object ", e); - } - IOException ioe = new IOException(e.toString()); - ioe.setStackTrace(e.getStackTrace()); - throw ioe; - } - } - - /** - * Logs an RPC response to the LOG file, producing valid JSON objects for - * 
client Operations. - * @param params The parameters received in the call. - * @param methodName The name of the method invoked - * @param call The string representation of the call - * @param tag The tag that will be used to indicate this event in the log. - * @param client The address of the client who made this call. - * @param startTime The time that the call was initiated, in ms. - * @param processingTime The duration that the call took to run, in ms. - * @param qTime The duration that the call spent on the queue - * prior to being initiated, in ms. - * @param responseSize The size in bytes of the response buffer. - */ - void logResponse(Object[] params, String methodName, String call, String tag, - String clientAddress, long startTime, int processingTime, int qTime, - long responseSize) - throws IOException { - // for JSON encoding - ObjectMapper mapper = new ObjectMapper(); - // base information that is reported regardless of type of call - Map responseInfo = new HashMap(); - responseInfo.put("starttimems", startTime); - responseInfo.put("processingtimems", processingTime); - responseInfo.put("queuetimems", qTime); - responseInfo.put("responsesize", responseSize); - responseInfo.put("client", clientAddress); - responseInfo.put("class", instance.getClass().getSimpleName()); - responseInfo.put("method", methodName); - if (params.length == 2 && instance instanceof HRegionServer && - params[0] instanceof byte[] && - params[1] instanceof Operation) { - // if the slow process is a query, we want to log its table as well - // as its own fingerprint - byte [] tableName = - HRegionInfo.parseRegionName((byte[]) params[0])[0]; - responseInfo.put("table", Bytes.toStringBinary(tableName)); - // annotate the response map with operation details - responseInfo.putAll(((Operation) params[1]).toMap()); - // report to the log file - LOG.warn("(operation" + tag + "): " + - mapper.writeValueAsString(responseInfo)); - } else if (params.length == 1 && instance instanceof 
HRegionServer && - params[0] instanceof Operation) { - // annotate the response map with operation details - responseInfo.putAll(((Operation) params[0]).toMap()); - // report to the log file - LOG.warn("(operation" + tag + "): " + - mapper.writeValueAsString(responseInfo)); - } else { - // can't get JSON details, so just report call.toString() along with - // a more generic tag. - responseInfo.put("call", call); - LOG.warn("(response" + tag + "): " + - mapper.writeValueAsString(responseInfo)); - } - } - } - - protected static void log(String value, Log LOG) { - String v = value; - if (v != null && v.length() > 55) - v = v.substring(0, 55)+"..."; - LOG.info(v); - } -} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1371001) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -68,12 +68,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.io.DataOutputOutputStream; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.io.HbaseObjectWritable; -import org.apache.hadoop.hbase.io.WritableWithSize; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; @@ -87,9 +85,7 @@ import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler; import 
org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus; import org.apache.hadoop.hbase.util.ByteBufferOutputStream; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; @@ -104,17 +100,16 @@ import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.SecretManager.InvalidToken; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Function; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.protobuf.ByteString; +import com.google.protobuf.Message; import org.cliffc.high_scale_lib.Counter; -/** An abstract IPC service. IPC calls take a single {@link Writable} as a - * parameter, and return a {@link Writable} as their value. A service runs on +/** An abstract IPC service. IPC calls take a single Protobuf message as a + * parameter, and return a single Protobuf message as their value. A service runs on * a port and is defined by a parameter class and a value class. * * @@ -193,8 +188,8 @@ } /** Returns the server instance called under or null. May be called under - * {@link #call(Class, Writable, long, MonitoredRPCHandler)} implementations, - * and under {@link Writable} methods of paramters and return values. + * {@link #call(Class, RpcRequestBody, long, MonitoredRPCHandler)} implementations, + * and under protobuf methods of parameters and return values. * Permits applications to access the server context.
* @return HBaseServer */ @@ -235,7 +230,6 @@ private int handlerCount; // number of handler threads private int priorityHandlerCount; private int readThreads; // number of read threads - protected Class paramClass; // class of call parameters protected int maxIdleTime; // the maximum idle time after // which a client may be // disconnected @@ -312,7 +306,7 @@ /** A call queued for handling. */ protected class Call implements RpcCallContext { protected int id; // the client's call id - protected Writable param; // the parameter passed + protected RpcRequestBody param; // the parameter passed protected Connection connection; // connection to client protected long timestamp; // the time received when response is null // the time served when response is not null @@ -324,7 +318,7 @@ protected long size; // size of current call protected boolean isError; - public Call(int id, Writable param, Connection connection, + public Call(int id, RpcRequestBody param, Connection connection, Responder responder, long size) { this.id = id; this.param = param; @@ -353,34 +347,13 @@ if (errorClass != null) { this.isError = true; } - Writable result = null; - if (value instanceof Writable) { - result = (Writable) value; + + ByteBufferOutputStream buf = null; + if (value != null) { + buf = new ByteBufferOutputStream(((Message)value).getSerializedSize()); } else { - /* We might have a null value and errors. Avoid creating a - * HbaseObjectWritable, because the constructor fails on null. */ - if (value != null) { - result = new HbaseObjectWritable(value); - } + buf = new ByteBufferOutputStream(BUFFER_INITIAL_SIZE); } - - int size = BUFFER_INITIAL_SIZE; - if (result instanceof WritableWithSize) { - // get the size hint. - WritableWithSize ohint = (WritableWithSize) result; - long hint = ohint.getWritableSize() + 2*Bytes.SIZEOF_INT; - if (hint > Integer.MAX_VALUE) { - // oops, new problem. 
- IOException ioe = - new IOException("Result buffer size too large: " + hint); - errorClass = ioe.getClass().getName(); - error = StringUtils.stringifyException(ioe); - } else { - size = (int)hint; - } - } - - ByteBufferOutputStream buf = new ByteBufferOutputStream(size); DataOutputStream out = new DataOutputStream(buf); try { RpcResponseHeader.Builder builder = RpcResponseHeader.newBuilder(); @@ -394,7 +367,9 @@ b.setStackTrace(error); b.build().writeDelimitedTo(out); } else { - result.write(out); + if (value != null) { + ((Message)value).writeDelimitedTo(out); + } } if (connection.useWrap) { wrapWithSasl(buf); @@ -709,7 +684,7 @@ closeCurrentConnection(key, e); cleanupConnections(true); try { Thread.sleep(60000); } catch (Exception ignored) {} - } + } } catch (Exception e) { closeCurrentConnection(key, e); } @@ -1418,7 +1393,7 @@ AccessControlException ae = new AccessControlException( "Authentication is required"); setupResponse(authFailedResponse, authFailedCall, Status.FATAL, - null, ae.getClass().getName(), ae.getMessage()); + ae.getClass().getName(), ae.getMessage()); responder.doRespond(authFailedCall); throw ae; } @@ -1506,7 +1481,7 @@ // Versions 3 and greater can interpret this exception // response in the same manner setupResponse(buffer, fakeCall, Status.FATAL, - null, VersionMismatch.class.getName(), errMsg); + VersionMismatch.class.getName(), errMsg); responder.doRespond(fakeCall); } @@ -1623,23 +1598,21 @@ if (LOG.isDebugEnabled()) { LOG.debug(" got call #" + id + ", " + callSize + " bytes"); } - // Enforcing the call queue size, this triggers a retry in the client if ((callSize + callQueueSize.get()) > maxQueueSize) { final Call callTooBig = new Call(id, null, this, responder, callSize); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); - setupResponse(responseBuffer, callTooBig, Status.FATAL, null, + setupResponse(responseBuffer, callTooBig, Status.FATAL, IOException.class.getName(), "Call queue is full, is 
ipc.server.max.callqueue.size too small?"); responder.doRespond(callTooBig); return; } - Writable param; + RpcRequestBody param; try { - param = ReflectionUtils.newInstance(paramClass, conf);//read param - param.readFields(dis); + param = RpcRequestBody.parseDelimitedFrom(dis); } catch (Throwable t) { LOG.warn("Unable to read call parameters for client " + getHostAddress(), t); @@ -1647,7 +1620,7 @@ new Call(id, null, this, responder, callSize); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); - setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null, + setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, t.getClass().getName(), "IPC server unable to read call parameters: " + t.getMessage()); responder.doRespond(readParamsFailedCall); @@ -1683,7 +1656,7 @@ } catch (AuthorizationException ae) { LOG.debug("Connection authorization failed: "+ae.getMessage(), ae); rpcMetrics.authorizationFailures.inc(); - setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null, + setupResponse(authFailedResponse, authFailedCall, Status.FATAL, ae.getClass().getName(), ae.getMessage()); responder.doRespond(authFailedCall); return false; @@ -1785,7 +1758,7 @@ String errorClass = null; String error = null; - Writable value = null; + Message value = null; CurCall.set(call); try { @@ -1855,7 +1828,7 @@ } - private Function qosFunction = null; + private Function qosFunction = null; /** * Gets the QOS level for this call. 
If it is higher than the highPriorityLevel and there @@ -1864,11 +1837,11 @@ * @param newFunc */ @Override - public void setQosFunction(Function newFunc) { + public void setQosFunction(Function newFunc) { qosFunction = newFunc; } - protected int getQosLevel(Writable param) { + protected int getQosLevel(RpcRequestBody param) { if (qosFunction == null) { return 0; } @@ -1886,14 +1859,13 @@ * */ protected HBaseServer(String bindAddress, int port, - Class paramClass, int handlerCount, + int handlerCount, int priorityHandlerCount, Configuration conf, String serverName, int highPriorityLevel) throws IOException { this.bindAddress = bindAddress; this.conf = conf; this.port = port; - this.paramClass = paramClass; this.handlerCount = handlerCount; this.priorityHandlerCount = priorityHandlerCount; this.socketSendBufferSize = 0; @@ -1963,26 +1935,10 @@ */ private void setupResponse(ByteArrayOutputStream response, Call call, Status status, - Writable rv, String errorClass, String error) + String errorClass, String error) throws IOException { response.reset(); - DataOutputStream out = new DataOutputStream(response); - - if (status == Status.SUCCESS) { - try { - rv.write(out); - call.setResponse(rv, status, null, null); - } catch (Throwable t) { - LOG.warn("Error serializing call response for call " + call, t); - // Call back to same function - this is OK since the - // buffer is reset at the top, and since status is changed - // to ERROR it won't infinite loop. 
- call.setResponse(null, status.ERROR, t.getClass().getName(), - StringUtils.stringifyException(t)); - } - } else { - call.setResponse(rv, status, errorClass, error); - } + call.setResponse(null, status, errorClass, error); } protected void closeConnection(Connection connection) { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (revision 1371001) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (working copy) @@ -21,10 +21,12 @@ package org.apache.hadoop.hbase.ipc; import com.google.common.base.Function; -import org.apache.hadoop.io.Writable; +import com.google.protobuf.Message; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.VersionedProtocol; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; import java.io.IOException; import java.net.InetSocketAddress; @@ -47,16 +49,16 @@ /** Called for each call. 
* @param param writable parameter * @param receiveTime time - * @return Writable + * @return Message * @throws java.io.IOException e */ - Writable call(Class protocol, - Writable param, long receiveTime, MonitoredRPCHandler status) + Message call(Class protocol, + RpcRequestBody param, long receiveTime, MonitoredRPCHandler status) throws IOException; void setErrorHandler(HBaseRPCErrorHandler handler); - void setQosFunction(Function newFunc); + void setQosFunction(Function newFunc); void openServer(); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (revision 1371001) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (working copy) @@ -55,6 +55,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseBody; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; @@ -69,10 +71,8 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.PoolMap.PoolType; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; @@ -80,11 +80,14 @@ import org.apache.hadoop.security.token.Token; import 
org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenSelector; -import org.apache.hadoop.util.ReflectionUtils; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.Message; +import com.google.protobuf.Message.Builder; -/** A client for an IPC service. IPC calls take a single {@link Writable} as a - * parameter, and return a {@link Writable} as their value. A service runs on + +/** A client for an IPC service. IPC calls take a single Protobuf message as a + * parameter, and return a single Protobuf message as their value. A service runs on * a port and is defined by a parameter class and a value class. * *

This is the org.apache.hadoop.ipc.Client renamed as HBaseClient and @@ -99,7 +102,6 @@ .getLog("org.apache.hadoop.ipc.HBaseClient"); protected final PoolMap connections; - protected final Class valueClass; // class of call values protected int counter; // counter for call ids protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs final protected Configuration conf; @@ -187,13 +189,13 @@ /** A call waiting for a value. */ protected class Call { final int id; // call id - final Writable param; // parameter - Writable value; // value, null if error + final RpcRequestBody param; // rpc request object + Message value; // value, null if error IOException error; // exception, null if value boolean done; // true when call is done long startTime; - protected Call(Writable param) { + protected Call(RpcRequestBody param) { this.param = param; this.startTime = System.currentTimeMillis(); synchronized (HBaseClient.this) { @@ -223,7 +225,7 @@ * * @param value return value of the call. 
*/ - public synchronized void setValue(Writable value) { + public synchronized void setValue(Message value) { this.value = value; callComplete(); } @@ -825,15 +827,18 @@ try { if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); - RpcRequestHeader.Builder builder = RPCProtos.RpcRequestHeader.newBuilder(); - builder.setCallId(call.id); - DataOutputBuffer d = new DataOutputBuffer(); - builder.build().writeDelimitedTo(d); - call.param.write(d); + RpcRequestHeader.Builder headerBuilder = RPCProtos.RpcRequestHeader.newBuilder(); + headerBuilder.setCallId(call.id); //noinspection SynchronizeOnNonFinalField synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC - this.out.writeInt(d.getLength()); - this.out.write(d.getData(), 0, d.getLength()); + int serializedHeaderSize = headerBuilder.build().getSerializedSize(); + int requestSerializedSize = call.param.getSerializedSize(); + this.out.writeInt(serializedHeaderSize + + CodedOutputStream.computeRawVarint32Size(serializedHeaderSize) + + requestSerializedSize + + CodedOutputStream.computeRawVarint32Size(requestSerializedSize)); + headerBuilder.build().writeDelimitedTo(this.out); + call.param.writeDelimitedTo(this.out); this.out.flush(); } } catch(IOException e) { @@ -870,8 +875,17 @@ Status status = response.getStatus(); if (status == Status.SUCCESS) { - Writable value = ReflectionUtils.newInstance(valueClass, conf); - value.readFields(in); // read value + Message rpcResponseType; + try { + rpcResponseType = ProtobufRpcEngine.Invoker.getReturnProtoType( + ProtobufRpcEngine.Server.getMethod(remoteId.getProtocol(), + call.param.getMethodName())); + } catch (Exception e) { + throw new RuntimeException(e); //local exception + } + Builder builder = rpcResponseType.newBuilderForType(); + builder.mergeDelimitedFrom(in); + Message value = builder.build(); // it's possible that this call may have been cleaned up due to a RPC // timeout, so check if it still exists before setting the value. 
if (call != null) { @@ -983,7 +997,7 @@ private final ParallelResults results; protected final int index; - public ParallelCall(Writable param, ParallelResults results, int index) { + public ParallelCall(RpcRequestBody param, ParallelResults results, int index) { super(param); this.results = results; this.index = index; @@ -998,12 +1012,12 @@ /** Result collector for parallel calls. */ protected static class ParallelResults { - protected final Writable[] values; + protected final Message[] values; protected int size; protected int count; public ParallelResults(int size) { - this.values = new Writable[size]; + this.values = new RpcResponseBody[size]; this.size = size; } @@ -1020,15 +1034,13 @@ } /** - * Construct an IPC client whose values are of the given {@link Writable} + * Construct an IPC client whose values are of the {@link Message} * class. * @param valueClass value class * @param conf configuration * @param factory socket factory */ - public HBaseClient(Class valueClass, Configuration conf, - SocketFactory factory) { - this.valueClass = valueClass; + public HBaseClient(Configuration conf, SocketFactory factory) { this.maxIdleTime = conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); @@ -1051,8 +1063,8 @@ * @param valueClass value class * @param conf configuration */ - public HBaseClient(Class valueClass, Configuration conf) { - this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf)); + public HBaseClient(Configuration conf) { + this(conf, NetUtils.getDefaultSocketFactory(conf)); } /** @@ -1124,17 +1136,17 @@ /** Make a call, passing param, to the IPC server running at * address, returning the value. Throws exceptions if there are * network problems or if the remote code threw an exception. 
- * @param param writable parameter + * @param param RpcRequestBody parameter * @param address network address - * @return Writable + * @return Message * @throws IOException e */ - public Writable call(Writable param, InetSocketAddress address) + public Message call(RpcRequestBody param, InetSocketAddress address) throws IOException, InterruptedException { return call(param, address, null, 0); } - public Writable call(Writable param, InetSocketAddress addr, + public Message call(RpcRequestBody param, InetSocketAddress addr, User ticket, int rpcTimeout) throws IOException, InterruptedException { return call(param, addr, null, ticket, rpcTimeout); @@ -1145,7 +1157,7 @@ * with the ticket credentials, returning the value. * Throws exceptions if there are network problems or if the remote code * threw an exception. */ - public Writable call(Writable param, InetSocketAddress addr, + public Message call(RpcRequestBody param, InetSocketAddress addr, Class protocol, User ticket, int rpcTimeout) throws InterruptedException, IOException { @@ -1217,14 +1229,14 @@ * corresponding address. When all values are available, or have timed out * or errored, the collected results are returned in an array. The array * contains nulls for calls that timed out or errored. - * @param params writable parameters + * @param params RpcRequestBody parameters * @param addresses socket addresses - * @return Writable[] + * @return RpcResponseBody[] * @throws IOException e - * @deprecated Use {@link #call(Writable[], InetSocketAddress[], Class, User)} instead + * @deprecated Use {@link #call(RpcRequestBody[], InetSocketAddress[], Class, User)} instead */ @Deprecated - public Writable[] call(Writable[] params, InetSocketAddress[] addresses) + public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses) throws IOException, InterruptedException { return call(params, addresses, null, null); } @@ -1233,11 +1245,11 @@ * corresponding address. 
When all values are available, or have timed out * or errored, the collected results are returned in an array. The array * contains nulls for calls that timed out or errored. */ - public Writable[] call(Writable[] params, InetSocketAddress[] addresses, + public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses, Class protocol, User ticket) throws IOException, InterruptedException { - if (addresses.length == 0) return new Writable[0]; + if (addresses.length == 0) return new RpcResponseBody[0]; ParallelResults results = new ParallelResults(params.length); // TODO this synchronization block doesnt make any sense, we should possibly fix it Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (revision 1371001) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (working copy) @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.ReflectionUtils; @@ -46,24 +45,13 @@ * * This is a local hbase copy of the hadoop RPC so we can do things like * address HADOOP-414 for hbase-only and try other hbase-specific - * optimizations like using our own version of ObjectWritable. Class has been - * renamed to avoid confusing it w/ hadoop versions. + * optimizations. Class has been renamed to avoid confusing it w/ hadoop + * versions. *

* * * A protocol is a Java interface. All parameters and return types must - * be one of: - * - *

  • a primitive type, boolean, byte, - * char, short, int, long, - * float, double, or void; or
  • - * - *
  • a {@link String}; or
  • - * - *
  • a {@link Writable}; or
  • - * - *
  • an array of the above types
- * + * be Protobuf objects. * All methods in the protocol should throw only IOException. No field data of * the protocol instance is transmitted. */ @@ -122,7 +110,7 @@ if (engine == null) { // check for a configured default engine Class defaultEngine = - conf.getClass(RPC_ENGINE_PROP, WritableRpcEngine.class); + conf.getClass(RPC_ENGINE_PROP, ProtobufRpcEngine.class); // check for a per interface override Class impl = conf.getClass(RPC_ENGINE_PROP+"."+protocol.getName(), @@ -345,16 +333,6 @@ VersionedProtocol proxy = engine .getProxy(protocol, clientVersion, addr, ticket, conf, factory, Math.min(rpcTimeout, HBaseRPC.getRpcTimeout())); - if (engine instanceof WritableRpcEngine) { - long serverVersion = proxy.getProtocolVersion(protocol.getName(), - clientVersion); - if (serverVersion == clientVersion) { - return proxy; - } - - throw new VersionMismatch(protocol.getName(), clientVersion, - serverVersion); - } return proxy; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java (revision 1371001) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java (working copy) @@ -18,14 +18,13 @@ package org.apache.hadoop.hbase.ipc; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -35,14 +34,20 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.io.DataOutputOutputStream; +import org.apache.hadoop.hbase.HRegionInfo; 
+import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.io.*; +import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Objects; -import org.apache.hadoop.hbase.util.ProtoUtil; +import org.codehaus.jackson.map.ObjectMapper; import com.google.protobuf.Message; import com.google.protobuf.ServiceException; @@ -80,8 +85,9 @@ return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, metaHandlerCount, verbose, highPriorityLevel); } - private static class Invoker implements InvocationHandler { - private final Map returnTypes = + + static class Invoker implements InvocationHandler { + private static final Map returnTypes = new ConcurrentHashMap(); private Class protocol; private InetSocketAddress address; @@ -97,7 +103,7 @@ this.protocol = protocol; this.address = addr; this.ticket = ticket; - this.client = CLIENTS.getClient(conf, factory, RpcResponseWritable.class); + this.client = CLIENTS.getClient(conf, factory); this.rpcTimeout = rpcTimeout; Long version = Invocation.PROTOCOL_VERSION.get(protocol); if (version != null) { @@ -133,6 +139,7 @@ + method.getName() + "]" + ", Expected: 2, Actual: " + params.length); } + builder.setRequestClassName(param.getClass().getName()); builder.setRequest(param.toByteString()); builder.setClientProtocolVersion(clientProtocolVersion); rpcRequest = builder.build(); @@ -166,24 +173,20 @@ } RpcRequestBody rpcRequest = 
constructRpcRequest(method, args); - RpcResponseWritable val = null; + Message val = null; try { - val = (RpcResponseWritable) client.call( - new RpcRequestWritable(rpcRequest), address, protocol, ticket, - rpcTimeout); + val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout); if (LOG.isDebugEnabled()) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } - - Message protoType = null; - protoType = getReturnProtoType(method); - Message returnMessage; - returnMessage = protoType.newBuilderForType() - .mergeFrom(val.responseMessage).build(); - return returnMessage; + return val; } catch (Throwable e) { + if (e instanceof RemoteException) { + Throwable cause = ((RemoteException)e).unwrapRemoteException(); + throw new ServiceException(cause); + } throw new ServiceException(e); } } @@ -195,7 +198,7 @@ } } - private Message getReturnProtoType(Method method) throws Exception { + static Message getReturnProtoType(Method method) throws Exception { if (returnTypes.containsKey(method.getName())) { return returnTypes.get(method.getName()); } @@ -209,78 +212,11 @@ } } - /** - * Writable Wrapper for Protocol Buffer Requests - */ - private static class RpcRequestWritable implements Writable { - RpcRequestBody message; - - @SuppressWarnings("unused") - public RpcRequestWritable() { - } - - RpcRequestWritable(RpcRequestBody message) { - this.message = message; - } - - @Override - public void write(DataOutput out) throws IOException { - ((Message)message).writeDelimitedTo( - DataOutputOutputStream.constructOutputStream(out)); - } - - @Override - public void readFields(DataInput in) throws IOException { - int length = ProtoUtil.readRawVarint32(in); - byte[] bytes = new byte[length]; - in.readFully(bytes); - message = RpcRequestBody.parseFrom(bytes); - } - - public int getSerializedSize() { - return message.getSerializedSize(); - } - - @Override - public String toString() { - return " Client Protocol Version: 
" + - message.getClientProtocolVersion() + " MethodName: " + - message.getMethodName(); - } - } - - /** - * Writable Wrapper for Protocol Buffer Responses - */ - private static class RpcResponseWritable implements Writable { - byte[] responseMessage; - - @SuppressWarnings("unused") - public RpcResponseWritable() { - } - - public RpcResponseWritable(Message message) { - this.responseMessage = message.toByteArray(); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(responseMessage.length); - out.write(responseMessage); - } - - @Override - public void readFields(DataInput in) throws IOException { - int length = in.readInt(); - byte[] bytes = new byte[length]; - in.readFully(bytes); - responseMessage = bytes; - } - } - public static class Server extends WritableRpcEngine.Server { + public static class Server extends HBaseServer { boolean verbose; Object instance; Class implementation; + private Class[] ifaces; private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; private static final String WARN_RESPONSE_SIZE = @@ -295,41 +231,84 @@ private final int warnResponseTime; private final int warnResponseSize; + private static String classNameBase(String className) { + String[] names = className.split("\\.", -1); + if (names == null || names.length == 0) { + return className; + } + return names[names.length-1]; + } public Server(Object instance, final Class[] ifaces, Configuration conf, String bindAddress, int port, int numHandlers, int metaHandlerCount, boolean verbose, int highPriorityLevel) throws IOException { - super(instance, ifaces, RpcRequestWritable.class, conf, bindAddress, port, - numHandlers, metaHandlerCount, verbose, highPriorityLevel); - this.verbose = verbose; + super(bindAddress, port, numHandlers, metaHandlerCount, + conf, classNameBase(instance.getClass().getName()), + highPriorityLevel); this.instance = instance; this.implementation = instance.getClass(); + this.verbose = verbose; + + 
this.ifaces = ifaces; + // create metrics for the advertised interfaces this server implements. String [] metricSuffixes = new String [] {ABOVE_ONE_SEC_METRIC}; - this.rpcMetrics.createMetrics(ifaces, false, metricSuffixes); + this.rpcMetrics.createMetrics(this.ifaces, false, metricSuffixes); this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME); this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE); + this.verbose = verbose; + this.instance = instance; + this.implementation = instance.getClass(); } - private final Map methodArg = + private static final Map methodArg = new ConcurrentHashMap(); - private final Map methodInstances = + private static final Map methodInstances = new ConcurrentHashMap(); + + private AuthenticationTokenSecretManager createSecretManager(){ + if (!User.isSecurityEnabled() || + !(instance instanceof org.apache.hadoop.hbase.Server)) { + return null; + } + org.apache.hadoop.hbase.Server server = + (org.apache.hadoop.hbase.Server)instance; + Configuration conf = server.getConfiguration(); + long keyUpdateInterval = + conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000); + long maxAge = + conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000); + return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(), + server.getServerName().toString(), keyUpdateInterval, maxAge); + } + @Override + public void startThreads() { + AuthenticationTokenSecretManager mgr = createSecretManager(); + if (mgr != null) { + setSecretManager(mgr); + mgr.start(); + } + this.authManager = new ServiceAuthorizationManager(); + HBasePolicyProvider.init(conf, authManager); + + // continue with base startup + super.startThreads(); + } + + @Override /** * This is a server side method, which is invoked over RPC. On success * the return response has protobuf response payload. On failure, the * exception name and the stack trace are returned in the protobuf response. 
*/ - public Writable call(Class protocol, - Writable writableRequest, long receiveTime, MonitoredRPCHandler status) + public Message call(Class protocol, + RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status) throws IOException { try { - RpcRequestWritable request = (RpcRequestWritable) writableRequest; - RpcRequestBody rpcRequest = request.message; String methodName = rpcRequest.getMethodName(); Method method = getMethod(protocol, methodName); if (method == null) { @@ -358,7 +337,7 @@ status.setRPC(rpcRequest.getMethodName(), new Object[]{rpcRequest.getRequest()}, receiveTime); - status.setRPCPacket(writableRequest); + status.setRPCPacket(rpcRequest); status.resume("Servicing call"); //get an instance of the method arg type Message protoType = getMethodArgType(method); @@ -398,7 +377,7 @@ rpcMetrics.rpcProcessingTime.inc(processingTime); rpcMetrics.inc(method.getName(), processingTime); if (verbose) { - WritableRpcEngine.log("Return: "+result, LOG); + log("Return: "+result, LOG); } long responseSize = result.getSerializedSize(); // log any RPC responses that are slower than the configured warn @@ -432,7 +411,7 @@ rpcMetrics.inc(method.getName() + ABOVE_ONE_SEC_METRIC, processingTime); } - return new RpcResponseWritable(result); + return result; } catch (InvocationTargetException e) { Throwable target = e.getTargetException(); if (target instanceof IOException) { @@ -454,7 +433,7 @@ } } - private Method getMethod(Class protocol, + static Method getMethod(Class protocol, String methodName) { Method method = methodInstances.get(methodName); if (method != null) { @@ -472,7 +451,7 @@ return null; } - private Message getMethodArgType(Method method) throws Exception { + static Message getMethodArgType(Method method) throws Exception { Message protoType = methodArg.get(method.getName()); if (protoType != null) { return protoType; @@ -497,5 +476,68 @@ methodArg.put(method.getName(), protoType); return protoType; } + /** + * Logs an RPC response to the 
LOG file, producing valid JSON objects for + * client Operations. + * @param params The parameters received in the call. + * @param methodName The name of the method invoked + * @param call The string representation of the call + * @param tag The tag that will be used to indicate this event in the log. + * @param clientAddress The address of the client who made this call. + * @param startTime The time that the call was initiated, in ms. + * @param processingTime The duration that the call took to run, in ms. + * @param qTime The duration that the call spent on the queue + * prior to being initiated, in ms. + * @param responseSize The size in bytes of the response buffer. + */ + void logResponse(Object[] params, String methodName, String call, String tag, + String clientAddress, long startTime, int processingTime, int qTime, + long responseSize) + throws IOException { + // for JSON encoding + ObjectMapper mapper = new ObjectMapper(); + // base information that is reported regardless of type of call + Map responseInfo = new HashMap(); + responseInfo.put("starttimems", startTime); + responseInfo.put("processingtimems", processingTime); + responseInfo.put("queuetimems", qTime); + responseInfo.put("responsesize", responseSize); + responseInfo.put("client", clientAddress); + responseInfo.put("class", instance.getClass().getSimpleName()); + responseInfo.put("method", methodName); + if (params.length == 2 && instance instanceof HRegionServer && + params[0] instanceof byte[] && + params[1] instanceof Operation) { + // if the slow process is a query, we want to log its table as well + // as its own fingerprint + byte [] tableName = + HRegionInfo.parseRegionName((byte[]) params[0])[0]; + responseInfo.put("table", Bytes.toStringBinary(tableName)); + // annotate the response map with operation details + responseInfo.putAll(((Operation) params[1]).toMap()); + // report to the log file + LOG.warn("(operation" + tag + "): " + + mapper.writeValueAsString(responseInfo)); + } else if 
(params.length == 1 && instance instanceof HRegionServer && + params[0] instanceof Operation) { + // annotate the response map with operation details + responseInfo.putAll(((Operation) params[0]).toMap()); + // report to the log file + LOG.warn("(operation" + tag + "): " + + mapper.writeValueAsString(responseInfo)); + } else { + // can't get JSON details, so just report call.toString() along with + // a more generic tag. + responseInfo.put("call", call); + LOG.warn("(response" + tag + "): " + + mapper.writeValueAsString(responseInfo)); + } + } + protected static void log(String value, Log LOG) { + String v = value; + if (v != null && v.length() > 55) + v = v.substring(0, 55)+"..."; + LOG.info(v); + } } } \ No newline at end of file Index: hbase-server/src/main/resources/hbase-default.xml =================================================================== --- hbase-server/src/main/resources/hbase-default.xml (revision 1371001) +++ hbase-server/src/main/resources/hbase-default.xml (working copy) @@ -523,7 +523,7 @@ hbase.rpc.engine - org.apache.hadoop.hbase.ipc.WritableRpcEngine + org.apache.hadoop.hbase.ipc.ProtobufRpcEngine Implementation of org.apache.hadoop.hbase.ipc.RpcEngine to be used for client / server RPC call marshalling.