Index: src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (revision 1134732) +++ src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (working copy) @@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.io.*; -import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.hadoop.conf.*; @@ -48,6 +48,10 @@ // DEBUG log level does NOT emit RPC-level logging. private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine"); + //writableRpcVersion should be updated if there is a change + //in format of the rpc messages. + public static long writableRpcVersion = 1L; + /* Cache a client using its socket factory as the hash key */ static private class ClientCache { private Map clients = @@ -300,6 +304,30 @@ call.getParameterClasses()); method.setAccessible(true); + // Verify rpc version + if (call.getRpcVersion() != writableRpcVersion) { + // Client is using a different version of WritableRpc + throw new IOException( + "WritableRpc version mismatch, client side version=" + + call.getRpcVersion() + ", server side version=" + + writableRpcVersion); + } + + //Verify protocol version. + //Bypass the version check for VersionedProtocol + if (!method.getDeclaringClass().equals(VersionedProtocol.class)) { + long clientVersion = call.getProtocolVersion(); + ProtocolSignature serverInfo = ((VersionedProtocol) instance) + .getProtocolSignature(protocol.getCanonicalName(), call + .getProtocolVersion(), call.getClientMethodsHash()); + long serverVersion = serverInfo.getVersion(); + if (serverVersion != clientVersion) { + LOG.warn("Version mismatch: client version=" + clientVersion + + ", server version=" + serverVersion); + throw new RPC.VersionMismatch(protocol.getName(), clientVersion, + serverVersion); + } + } Object impl = null; if (protocol.isAssignableFrom(this.implementation)) { impl = this.instance; Index: src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java (revision 1134732) +++ src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java (working copy) @@ -27,6 +27,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.Method; /** A method invocation, including the method name and its parameters.*/ @@ -36,13 +37,37 @@ protected Class[] parameterClasses; protected Object[] parameters; protected Configuration conf; + private long clientVersion; + private int clientMethodsHash; + //This could be different from static writableRpcVersion when received + //at server, if client is using a different version. + private long rpcVersion; + public Invocation() {} public Invocation(Method method, Object[] parameters) { this.methodName = method.getName(); this.parameterClasses = method.getParameterTypes(); this.parameters = parameters; + rpcVersion = WritableRpcEngine.writableRpcVersion; + if (method.getDeclaringClass().equals(VersionedProtocol.class)) { + //VersionedProtocol is exempted from version check. + clientVersion = 0; + clientMethodsHash = 0; + } else { + try { + Field versionField = method.getDeclaringClass().getField("VERSION"); + versionField.setAccessible(true); + this.clientVersion = versionField.getLong(method.getDeclaringClass()); + } catch (NoSuchFieldException ex) { + throw new RuntimeException("The " + method.getDeclaringClass(), ex); + } catch (IllegalAccessException ex) { + throw new RuntimeException(ex); + } + this.clientMethodsHash = ProtocolSignature.getFingerprint(method + .getDeclaringClass().getMethods()); + } } /** @return The name of the method invoked. */ @@ -55,8 +80,27 @@ /** @return The parameter instances. */ public Object[] getParameters() { return parameters; } + long getProtocolVersion() { + return clientVersion; + } + + protected int getClientMethodsHash() { + return clientMethodsHash; + } + + /** + * Returns the rpc version used by the client. + * @return rpcVersion + */ + public long getRpcVersion() { + return rpcVersion; + } + public void readFields(DataInput in) throws IOException { + rpcVersion = in.readLong(); methodName = in.readUTF(); + clientVersion = in.readLong(); + clientMethodsHash = in.readInt(); parameters = new Object[in.readInt()]; parameterClasses = new Class[parameters.length]; HbaseObjectWritable objectWritable = new HbaseObjectWritable(); @@ -68,7 +112,10 @@ } public void write(DataOutput out) throws IOException { + out.writeLong(rpcVersion); out.writeUTF(this.methodName); + out.writeLong(clientVersion); + out.writeInt(clientMethodsHash); out.writeInt(parameterClasses.length); for (int i = 0; i < parameterClasses.length; i++) { HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i], @@ -87,6 +134,9 @@ buffer.append(parameters[i]); } buffer.append(")"); + buffer.append(", rpc version="+rpcVersion); + buffer.append(", client version="+clientVersion); + buffer.append(", methodsFingerPrint="+clientMethodsHash); return buffer.toString(); } Index: src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java (revision 1134732) +++ src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java (working copy) @@ -24,7 +24,6 @@ import java.net.InetSocketAddress; import javax.net.SocketFactory; -import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.conf.Configuration; Index: src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (revision 1134732) +++ src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (working copy) @@ -22,7 +22,6 @@ import com.google.common.base.Function; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.ipc.VersionedProtocol; import java.io.IOException; import java.net.InetSocketAddress; Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 1134732) +++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy) @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.ipc.VersionedProtocol; /** * Clients interact with HRegionServers using a handle to the HRegionInterface. Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (revision 1134732) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (working copy) @@ -22,7 +22,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.metrics.MetricsContext; import org.apache.hadoop.metrics.MetricsRecord; import org.apache.hadoop.metrics.MetricsUtil; Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (revision 1134732) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (working copy) @@ -55,7 +55,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.hbase.ipc.VersionedProtocol; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; Index: src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (revision 1134732) +++ src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (working copy) @@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.UnknownRegionException; -import org.apache.hadoop.ipc.VersionedProtocol; /** * Clients interact with the HMasterInterface to gain access to meta-level Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (revision 1134732) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (working copy) @@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; Index: src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java (revision 1134732) +++ src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java (working copy) @@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.ipc.VersionedProtocol; /** * The Master publishes this Interface for RegionServers to register themselves Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1134732) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -103,6 +103,7 @@ import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.ipc.Invocation; +import org.apache.hadoop.hbase.ipc.ProtocolSignature; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerNotRunningException; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; @@ -2565,6 +2570,17 @@ @Override @QosPriority(priority=HIGH_QOS) + public ProtocolSignature getProtocolSignature( + String protocol, long version, int clientMethodsHashCode) + throws IOException { + if (protocol.equals(HRegionInterface.class.getName())) { + return new ProtocolSignature(HRegionInterface.VERSION, null); + } + throw new IOException("Unknown protocol: " + protocol); + } + + @Override + @QosPriority(priority=HIGH_QOS) public long getProtocolVersion(final String protocol, final long clientVersion) throws IOException { if (protocol.equals(HRegionInterface.class.getName())) { Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1134732) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; +import org.apache.hadoop.hbase.ipc.ProtocolSignature; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; import org.apache.hadoop.hbase.master.handler.DeleteTableHandler; @@ -515,6 +516,18 @@ return assigned; } + @Override + public ProtocolSignature getProtocolSignature( + String protocol, long version, int clientMethodsHashCode) + throws IOException { + if (HMasterInterface.class.getName().equals(protocol)) { + return new ProtocolSignature(HMasterInterface.VERSION, null); + } else if (HMasterRegionInterface.class.getName().equals(protocol)) { + return new ProtocolSignature(HMasterRegionInterface.VERSION, null); + } + throw new IOException("Unknown protocol: " + protocol); + } + public long getProtocolVersion(String protocol, long clientVersion) { if (HMasterInterface.class.getName().equals(protocol)) { return HMasterInterface.VERSION; Index: src/main/java/org/apache/hadoop/hbase/ipc/VersionedProtocol.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/VersionedProtocol.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/VersionedProtocol.java (revision 0) @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +/** + * Superclass of all protocols that use Hadoop RPC. + * Subclasses of this interface are also supposed to have + * a static final long versionID field. + */ +public interface VersionedProtocol { + + /** + * Return protocol version corresponding to protocol interface. + * @param protocol The classname of the protocol interface + * @param clientVersion The version of the protocol that the client speaks + * @return the version that the server will speak + * @throws IOException if any IO error occurs + */ + @Deprecated + public long getProtocolVersion(String protocol, + long clientVersion) throws IOException; + + /** + * Return protocol version corresponding to protocol interface. + * @param protocol The classname of the protocol interface + * @param clientVersion The version of the protocol that the client speaks + * @param clientMethodsHash the hashcode of client protocol methods + * @return the server protocol signature containing its version and + * a list of its supported methods + * @see ProtocolSignature#getProtocolSignature(VersionedProtocol, String, + * long, int) for a default implementation + */ + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, + int clientMethodsHash) throws IOException; +} Index: src/main/java/org/apache/hadoop/hbase/ipc/ProtocolSignature.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/ProtocolSignature.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/ProtocolSignature.java (revision 0) @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashMap; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +public class ProtocolSignature implements Writable { + static { // register a ctor + WritableFactories.setFactory + (ProtocolSignature.class, + new WritableFactory() { + public Writable newInstance() { return new ProtocolSignature(); } + }); + } + + private long version; + private int[] methods = null; // an array of method hash codes + + /** + * default constructor + */ + public ProtocolSignature() { + } + + /** + * Constructor + * + * @param version server version + * @param methodHashcodes hash codes of the methods supported by server + */ + public ProtocolSignature(long version, int[] methodHashcodes) { + this.version = version; + this.methods = methodHashcodes; + } + + public long getVersion() { + return version; + } + + public int[] getMethods() { + return methods; + } + + @Override + public void readFields(DataInput in) throws IOException { + version = in.readLong(); + boolean hasMethods = in.readBoolean(); + if (hasMethods) { + int numMethods = in.readInt(); + methods = new int[numMethods]; + for (int i=0; i type : method.getParameterTypes()) { + hashcode = 31*hashcode ^ type.getName().hashCode(); + } + return hashcode; + } + + /** + * Convert an array of Method into an array of hash codes + * + * @param methods + * @return array of hash codes + */ + private static int[] getFingerprints(Method[] methods) { + if (methods == null) { + return null; + } + int[] hashCodes = new int[methods.length]; + for (int i = 0; i + PROTOCOL_FINGERPRINT_CACHE = + new HashMap(); + + /** + * Return a protocol's signature and finger print from cache + * + * @param protocol a protocol class + * @param serverVersion protocol version + * @return its signature and finger print + */ + private static ProtocolSigFingerprint getSigFingerprint( + Class protocol, long serverVersion) { + String protocolName = protocol.getName(); + synchronized (PROTOCOL_FINGERPRINT_CACHE) { + ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName); + if (sig == null) { + int[] serverMethodHashcodes = getFingerprints(protocol.getMethods()); + sig = new ProtocolSigFingerprint( + new ProtocolSignature(serverVersion, serverMethodHashcodes), + getFingerprint(serverMethodHashcodes)); + PROTOCOL_FINGERPRINT_CACHE.put(protocolName, sig); + } + return sig; + } + } + + /** + * Get a server protocol's signature + * + * @param clientMethodsHashCode client protocol methods hashcode + * @param serverVersion server protocol version + * @param protocol protocol + * @return the server's protocol signature + */ + static ProtocolSignature getProtocolSignature( + int clientMethodsHashCode, + long serverVersion, + Class protocol) { + // try to get the finger print & signature from the cache + ProtocolSigFingerprint sig = getSigFingerprint(protocol, serverVersion); + + // check if the client side protocol matches the one on the server side + if (clientMethodsHashCode == sig.fingerprint) { + return new ProtocolSignature(serverVersion, null); // null indicates a match + } + + return sig.signature; + } + + /** + * Get a server protocol's signature + * + * @param server server implementation + * @param protocol server protocol + * @param clientVersion client's version + * @param clientMethodsHash client's protocol's hash code + * @return the server protocol's signature + * @throws IOException if any error occurs + */ + @SuppressWarnings("unchecked") + public static ProtocolSignature getProtocolSignature(VersionedProtocol server, + String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + Class inter; + try { + inter = (Class)Class.forName(protocol); + } catch (Exception e) { + throw new IOException(e); + } + long serverVersion = server.getProtocolVersion(protocol, clientVersion); + return ProtocolSignature.getProtocolSignature( + clientMethodsHash, serverVersion, inter); + } +} Index: src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java (revision 1137262) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java (working copy) @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.ipc.ProtocolSignature; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.util.Pair; @@ -40,6 +41,16 @@ protected static Log log = LogFactory.getLog(AggregateImplementation.class); @Override + public ProtocolSignature getProtocolSignature( + String protocol, long version, int clientMethodsHashCode) + throws IOException { + if (AggregateProtocol.class.getName().equals(protocol)) { + return new ProtocolSignature(AggregateProtocol.VERSION, null); + } + throw new IOException("Unknown protocol: " + protocol); + } + + @Override public T getMax(ColumnInterpreter ci, Scan scan) throws IOException { T temp; Index: src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java (revision 1137262) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java (working copy) @@ -39,6 +39,7 @@ * input parameters. */ public interface AggregateProtocol extends CoprocessorProtocol { + public static final long VERSION = 1L; /** * Gives the maximum for a given combination of column qualifier and column Index: src/main/java/org/apache/hadoop/hbase/coprocessor/BaseEndpointCoprocessor.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/BaseEndpointCoprocessor.java (revision 1137262) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/BaseEndpointCoprocessor.java (working copy) @@ -19,7 +19,8 @@ import java.io.IOException; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; -import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.hbase.ipc.ProtocolSignature; +import org.apache.hadoop.hbase.ipc.VersionedProtocol; /** * This abstract class provides default implementation of an Endpoint. @@ -60,6 +61,13 @@ public void stop(CoprocessorEnvironment env) { } @Override + public ProtocolSignature getProtocolSignature( + String protocol, long version, int clientMethodsHashCode) + throws IOException { + return new ProtocolSignature(VERSION, null); + } + + @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { return VERSION; Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java (revision 1137280) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java (working copy) @@ -36,9 +36,12 @@ import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.ipc.HMasterInterface; +import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; +import org.apache.hadoop.hbase.ipc.ProtocolSignature; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; -import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.hbase.ipc.VersionedProtocol; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -48,6 +51,7 @@ public class TestServerCustomProtocol { /* Test protocol */ private static interface PingProtocol extends CoprocessorProtocol { + static long VERSION = 1; public String ping(); public int getPingCount(); public int incrementCount(int diff); @@ -56,8 +60,6 @@ /* Test protocol implementation */ private static class PingHandler implements PingProtocol, VersionedProtocol { - static int VERSION = 1; - private int counter = 0; @Override public String ping() { @@ -87,6 +89,13 @@ } @Override + public ProtocolSignature getProtocolSignature( + String protocol, long version, int clientMethodsHashCode) + throws IOException { + return new ProtocolSignature(VERSION, null); + } + + @Override public long getProtocolVersion(String s, long l) throws IOException { return VERSION; } Index: src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorProtocol.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorProtocol.java (revision 1137280) +++ src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorProtocol.java (working copy) @@ -19,8 +19,6 @@ */ package org.apache.hadoop.hbase.ipc; -import org.apache.hadoop.ipc.VersionedProtocol; - /** * All custom RPC protocols to be exported by Coprocessors must extend this interface. * @@ -37,4 +35,5 @@ *

*/ public interface CoprocessorProtocol extends VersionedProtocol { + public static final long VERSION = 1L; } Index: src/main/java/org/apache/hadoop/hbase/ipc/Status.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/Status.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/Status.java (revision 0) @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +/** + * Status of a Hadoop IPC call. + */ +enum Status { + SUCCESS (0), + ERROR (1), + FATAL (-1); + + int state; + private Status(int state) { + this.state = state; + } +} Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1137362) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.ipc; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -61,7 +62,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.ipc.RPC.VersionMismatch; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; @@ -84,7 +85,7 @@ * The first four bytes of Hadoop RPC connections */ public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes()); - public static final byte CURRENT_VERSION = 3; + public static final byte CURRENT_VERSION = 4; /** * How many calls/handler are allowed in the queue. @@ -294,8 +295,7 @@ new ThreadFactoryBuilder().setNameFormat( "IPC Reader %d on port " + port).build()); for (int i = 0; i < readThreads; ++i) { - Selector readSelector = Selector.open(); - Reader reader = new Reader(readSelector); + Reader reader = new Reader(); readers[i] = reader; readPool.execute(reader); } @@ -309,40 +309,51 @@ private class Reader implements Runnable { private volatile boolean adding = false; - private Selector readSelector = null; + private final Selector readSelector; - Reader(Selector readSelector) { - this.readSelector = readSelector; + Reader() throws IOException { + this.readSelector = Selector.open(); } public void run() { - synchronized(this) { - while (running) { - SelectionKey key = null; - try { - readSelector.select(); - while (adding) { - this.wait(1000); - } + LOG.info("Starting " + getName()); + try { + doRunLoop(); + } finally { + try { + readSelector.close(); + } catch (IOException ioe) { + LOG.error("Error closing read selector in " + getName(), ioe); + } + } + } - Iterator iter = readSelector.selectedKeys().iterator(); - while (iter.hasNext()) { - key = iter.next(); - iter.remove(); - if (key.isValid()) { - if (key.isReadable()) { - doRead(key); - } + private synchronized void doRunLoop() { + while (running) { + SelectionKey key = null; + try { + readSelector.select(); + while (adding) { + this.wait(1000); + } + + Iterator iter = readSelector.selectedKeys().iterator(); + while (iter.hasNext()) { + key = iter.next(); + iter.remove(); + if (key.isValid()) { + if (key.isReadable()) { + doRead(key); } - key = null; } - } catch (InterruptedException e) { - if (running) { // unexpected -- log it - LOG.info(getName() + "caught: " + - StringUtils.stringifyException(e)); - } - } catch (IOException ex) { - LOG.error("Error in Reader", ex); + key = null; } + } catch (InterruptedException e) { + if (running) { // unexpected -- log it + LOG.info(getName() + " unexpectedly interrupted: " + + StringUtils.stringifyException(e)); + } + } catch (IOException ex) { + LOG.error("Error in Reader", ex); } } } @@ -581,7 +592,7 @@ // Sends responses of RPC back to clients. private class Responder extends Thread { - private Selector writeSelector; + private final Selector writeSelector; private int pending; // connections waiting to register final static int PURGE_INTERVAL = 900000; // 15mins @@ -597,6 +608,19 @@ public void run() { LOG.info(getName() + ": starting"); SERVER.set(HBaseServer.this); + try { + doRunLoop(); + } finally { + LOG.info("Stopping " + this.getName()); + try { + writeSelector.close(); + } catch (IOException ioe) { + LOG.error("Couldn't close write selector in " + this.getName(), ioe); + } + } + } + + private void doRunLoop() { long lastPurgeTime = 0; // last check for old calls. while (running) { @@ -937,6 +961,7 @@ hostAddress + ":" + remotePort + " got version " + version + " expected version " + CURRENT_VERSION); + setupBadVersionResponse(version); return -1; } dataLengthBuffer.clear(); @@ -975,6 +1000,40 @@ } } + /** + * Try to set up the response to indicate that the client version + * is incompatible with the server. This can contain special-case + * code to speak enough of past IPC protocols to pass back + * an exception to the caller. + * @param clientVersion the version the caller is using + * @throws IOException + */ + private void setupBadVersionResponse(int clientVersion) throws IOException { + String errMsg = "Server IPC version " + CURRENT_VERSION + + " cannot communicate with client version " + clientVersion; + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + + if (clientVersion >= 3) { + Call fakeCall = new Call(-1, null, this); + // Versions 3 and greater can interpret this exception + // response in the same manner + setupResponse(buffer, fakeCall, Status.FATAL, + null, VersionMismatch.class.getName(), errMsg); + + responder.doRespond(fakeCall); + } else if (clientVersion == 2) { // Hadoop 0.18.3 + Call fakeCall = new Call(0, null, this); + DataOutputStream out = new DataOutputStream(buffer); + out.writeInt(0); // call ID + out.writeBoolean(true); // error + WritableUtils.writeString(out, VersionMismatch.class.getName()); + WritableUtils.writeString(out, errMsg); + fakeCall.setResponse(ByteBuffer.wrap(buffer.toByteArray())); + + responder.doRespond(fakeCall); + } + } + /// Reads the connection header following version private void processHeader() throws IOException { DataInputStream in = @@ -1000,9 +1059,23 @@ if (LOG.isDebugEnabled()) LOG.debug(" got #" + id); - Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param - param.readFields(dis); + Writable param; + try { + param = ReflectionUtils.newInstance(paramClass, conf);//read param + param.readFields(dis); + } catch (Throwable t) { + LOG.warn("Unable to read call parameters for client " + + getHostAddress(), t); + final Call readParamsFailedCall = new Call(id, null, this); + ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); + setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null, + t.getClass().getName(), + "IPC server unable to read call parameters: " + t.getMessage()); + responder.doRespond(readParamsFailedCall); + return; + } + Call call = new Call(id, param, this); if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) { @@ -1207,6 +1280,46 @@ responder = new Responder(); } + /** + * Setup response for the IPC Call. + * + * @param response buffer to serialize the response into + * @param call {@link Call} to which we are setting up the response + * @param status {@link Status} of the IPC call + * @param rv return value for the IPC Call, if the call was successful + * @param errorClass error class, if the the call failed + * @param error error message, if the call failed + * @throws IOException + */ + private void setupResponse(ByteArrayOutputStream response, + Call call, Status status, + Writable rv, String errorClass, String error) + throws IOException { + response.reset(); + DataOutputStream out = new DataOutputStream(response); + out.writeInt(call.id); // write call id + out.writeInt(status.state); // write status + + if (status == Status.SUCCESS) { + try { + rv.write(out); + } catch (Throwable t) { + LOG.warn("Error serializing call response for call " + call, t); + // Call back to same function - this is OK since the + // buffer is reset at the top, and since status is changed + // to ERROR it won't infinite loop. + setupResponse(response, call, Status.ERROR, + null, t.getClass().getName(), + StringUtils.stringifyException(t)); + return; + } + } else { + WritableUtils.writeString(out, errorClass); + WritableUtils.writeString(out, error); + } + call.setResponse(ByteBuffer.wrap(response.toByteArray())); + } + protected void closeConnection(Connection connection) { synchronized (connectionList) { if (connectionList.remove(connection))