From a0ea98c6dbbd7c8f638eef9eca6c70a9adb3438a Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 19 May 2017 18:17:47 +0800 Subject: [PATCH] test --- .../ipc/TestRpcServerSlowConnectionSetup.java | 120 +++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java new file mode 100644 index 0000000..50fbfdd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.junit.Assert.*; + +import com.google.common.collect.Lists; + +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.MetricsConnection; +import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; +import org.apache.hadoop.hbase.security.AuthMethod; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RPCTests.class, MediumTests.class }) +public class TestRpcServerSlowConnectionSetup { + + private static Configuration CONF = HBaseConfiguration.create(); + + private static RpcServer SERVER; + + private static Socket SOCKET; + + @BeforeClass + public static void setUp() throws IOException { + SERVER = RpcServerFactory.createRpcServer(null, "testRpcServer", + Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); + SERVER.start(); + SOCKET = new Socket("localhost", SERVER.getListenerAddress().getPort()); + } + + @AfterClass + public static void tearDown() throws IOException { + if (SOCKET != null) { + SOCKET.close(); + } + if (SERVER != null) { + SERVER.stop(); + } + } + + @Test + public void test() throws IOException, InterruptedException { + int rpcHeaderLen = HConstants.RPC_HEADER.length; + byte[] preamble = new byte[rpcHeaderLen + 2]; + System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen); + preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION; + preamble[rpcHeaderLen + 1] = AuthMethod.SIMPLE.code; + SOCKET.getOutputStream().write(preamble, 0, rpcHeaderLen + 1); + SOCKET.getOutputStream().flush(); + Thread.sleep(5000); + SOCKET.getOutputStream().write(preamble, rpcHeaderLen + 1, 1); + SOCKET.getOutputStream().flush(); + + ConnectionHeader header = ConnectionHeader.newBuilder() + .setServiceName(TestRpcServiceProtos.TestProtobufRpcProto.getDescriptor().getFullName()) + .setVersionInfo(ProtobufUtil.getVersionInfo()).build(); + DataOutputStream dos = new DataOutputStream(SOCKET.getOutputStream()); + dos.writeInt(header.getSerializedSize()); + header.writeTo(dos); + dos.flush(); + + int callId = 10; + Call call = new Call(callId, TestProtobufRpcProto.getDescriptor().findMethodByName("ping"), + EmptyRequestProto.getDefaultInstance(), null, EmptyResponseProto.getDefaultInstance(), 1000, + HConstants.NORMAL_QOS, null, MetricsConnection.newCallStats()); + RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, null); + dos.writeInt(IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param)); + requestHeader.writeDelimitedTo(dos); + call.param.writeDelimitedTo(dos); + dos.flush(); + + DataInputStream dis = new DataInputStream(new BufferedInputStream(SOCKET.getInputStream())); + int size = dis.readInt(); + ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(dis); + assertEquals(callId, responseHeader.getCallId()); + EmptyResponseProto.Builder builder = EmptyResponseProto.newBuilder(); + builder.mergeDelimitedFrom(dis); + assertEquals(size, IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader, builder.build())); + } +} -- 2.7.4