Index: src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (revision 1130308) +++ src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (working copy) @@ -37,6 +37,8 @@ import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.io.*; +//import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; @@ -48,6 +50,10 @@ // DEBUG log level does NOT emit RPC-level logging. private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine"); + //writableRpcVersion should be updated if there is a change + //in format of the rpc messages. + public static long writableRpcVersion = 1L; + /* Cache a client using its socket factory as the hash key */ static private class ClientCache { private Map clients = @@ -300,6 +306,31 @@ call.getParameterClasses()); method.setAccessible(true); + // Verify rpc version + if (call.getRpcVersion() != writableRpcVersion) { + // Client is using a different version of WritableRpc + throw new IOException( + "WritableRpc version mismatch, client side version=" + + call.getRpcVersion() + ", server side version=" + + writableRpcVersion); + } + + //Verify protocol version. 
+ //Bypass the version check for VersionedProtocol + if (!method.getDeclaringClass().equals(VersionedProtocol.class)) { + long clientVersion = call.getProtocolVersion(); + /* + ProtocolSignature serverInfo = ((VersionedProtocol) instance) + .getProtocolSignature(protocol.getCanonicalName(), call + .getProtocolVersion(), call.getClientMethodsHash()); + long serverVersion = serverInfo.getVersion(); + if (serverVersion != clientVersion) { + LOG.warn("Version mismatch: client version=" + clientVersion + + ", server version=" + serverVersion); + throw new RPC.VersionMismatch(protocol.getName(), clientVersion, + serverVersion); + } */ + } Object impl = null; if (protocol.isAssignableFrom(this.implementation)) { impl = this.instance; Index: src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java (revision 1130308) +++ src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java (working copy) @@ -23,10 +23,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.io.Writable; +//import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.VersionedProtocol; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.Method; /** A method invocation, including the method name and its parameters.*/ @@ -36,13 +39,38 @@ protected Class[] parameterClasses; protected Object[] parameters; protected Configuration conf; + private long clientVersion; + private int clientMethodsHash; + //This could be different from static writableRpcVersion when received + //at server, if client is using a different version. 
+ private long rpcVersion; + public Invocation() {} public Invocation(Method method, Object[] parameters) { this.methodName = method.getName(); this.parameterClasses = method.getParameterTypes(); this.parameters = parameters; + rpcVersion = WritableRpcEngine.writableRpcVersion; + if (method.getDeclaringClass().equals(VersionedProtocol.class)) { + //VersionedProtocol is exempted from version check. + clientVersion = 0; + clientMethodsHash = 0; + } else { + try { + Field versionField = method.getDeclaringClass().getField("VERSION"); + versionField.setAccessible(true); + this.clientVersion = versionField.getLong(method.getDeclaringClass()); + } catch (NoSuchFieldException ex) { + throw new RuntimeException("The " + method.getDeclaringClass() + " must have a public VERSION field", ex); + } catch (IllegalAccessException ex) { + throw new RuntimeException(ex); + } + /* + this.clientMethodsHash = ProtocolSignature.getFingerprint(method + .getDeclaringClass().getMethods()); */ + } } /** @return The name of the method invoked. */ @@ -55,8 +83,27 @@ /** @return The parameter instances. */ public Object[] getParameters() { return parameters; } + long getProtocolVersion() { + return clientVersion; + } + + private int getClientMethodsHash() { + return clientMethodsHash; + } + + /** + * Returns the rpc version used by the client. 
+ * @return rpcVersion + */ + public long getRpcVersion() { + return rpcVersion; + } + public void readFields(DataInput in) throws IOException { + rpcVersion = in.readLong(); methodName = in.readUTF(); + clientVersion = in.readLong(); + clientMethodsHash = in.readInt(); parameters = new Object[in.readInt()]; parameterClasses = new Class[parameters.length]; HbaseObjectWritable objectWritable = new HbaseObjectWritable(); @@ -68,7 +115,10 @@ } public void write(DataOutput out) throws IOException { + out.writeLong(rpcVersion); out.writeUTF(this.methodName); + out.writeLong(clientVersion); + out.writeInt(clientMethodsHash); out.writeInt(parameterClasses.length); for (int i = 0; i < parameterClasses.length; i++) { HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i], @@ -87,6 +137,9 @@ buffer.append(parameters[i]); } buffer.append(")"); + buffer.append(", rpc version="+rpcVersion); + buffer.append(", client version="+clientVersion); + buffer.append(", methodsFingerPrint="+clientMethodsHash); return buffer.toString(); }