Index: src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (revision 736069) +++ src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (working copy) @@ -25,7 +25,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.ipc.Server; import org.apache.hadoop.metrics.MetricsContext; import org.apache.hadoop.metrics.MetricsRecord; import org.apache.hadoop.metrics.MetricsUtil; @@ -83,9 +82,9 @@ synchronized (metricsList) { // Iterate through the rpcMetrics hashmap to propogate the different rpc metrics. - Set keys = metricsList.keySet(); + Set keys = metricsList.keySet(); - Iterator keyIter = keys.iterator(); + Iterator keyIter = keys.iterator(); while (keyIter.hasNext()) { Object key = keyIter.next(); Index: src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (revision 736069) +++ src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (working copy) @@ -244,9 +244,8 @@ private void handleTimeout(SocketTimeoutException e) throws IOException { if (shouldCloseConnection.get() || !running.get()) { throw e; - } else { - sendPing(); } + sendPing(); } /** Read a byte from the stream. @@ -270,6 +269,7 @@ * * @return the total number of bytes read; -1 if the connection is closed. */ + @Override public int read(byte[] buf, int off, int len) throws IOException { do { try { @@ -589,6 +589,7 @@ } /** Deliver result to result collector. */ + @Override protected void callComplete() { results.callComplete(this); } @@ -679,7 +680,7 @@ * address, returning the value. Throws exceptions if there are * network problems or if the remote code threw an exception. */ public Writable call(Writable param, InetSocketAddress address) - throws InterruptedException, IOException { + throws IOException { return call(param, address, null); } @@ -685,7 +686,7 @@ public Writable call(Writable param, InetSocketAddress addr, UserGroupInformation ticket) - throws InterruptedException, IOException { + throws IOException { Call call = new Call(param); Connection connection = getConnection(addr, ticket, call); connection.sendParam(call); // send the parameter @@ -700,12 +701,10 @@ if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; - } else { // local exception - throw wrapException(addr, call.error); } - } else { - return call.value; + throw wrapException(addr, call.error); } + return call.value; } } @@ -744,8 +743,7 @@ * corresponding address. When all values are available, or have timed out * or errored, the collected results are returned in an array. The array * contains nulls for calls that timed out or errored. */ - public Writable[] call(Writable[] params, InetSocketAddress[] addresses) - throws IOException { + public Writable[] call(Writable[] params, InetSocketAddress[] addresses) { if (addresses.length == 0) return new Writable[0]; ParallelResults results = new ParallelResults(params.length); Index: src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (revision 736069) +++ src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (working copy) @@ -78,7 +78,7 @@ // Leave this out in the hadoop ipc package but keep class name. Do this // so that we dont' get the logging of this class's invocations by doing our // blanket enabling DEBUG on the o.a.h.h. package. - private static final Log LOG = + static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC"); private HBaseRPC() { @@ -256,7 +256,7 @@ client = new HBaseClient(HbaseObjectWritable.class, conf, factory); clients.put(factory, client); } else { - ((HBaseClient)client).incCount(); + client.incCount(); } return client; } @@ -278,12 +278,12 @@ */ private void stopClient(HBaseClient client) { synchronized (this) { - ((HBaseClient)client).decCount(); - if (((HBaseClient)client).isZeroReference()) { - clients.remove(((HBaseClient)client).getSocketFactory()); + client.decCount(); + if (client.isZeroReference()) { + clients.remove(client.getSocketFactory()); } } - if (((HBaseClient)client).isZeroReference()) { + if (client.isZeroReference()) { client.stop(); } } @@ -339,8 +339,9 @@ /** * A version mismatch for the RPC protocol. */ - @SuppressWarnings("serial") public static class VersionMismatch extends IOException { + + private static final long serialVersionUID = 6898932453052518466L; private String interfaceName; private long clientVersion; private long serverVersion; @@ -467,11 +468,10 @@ long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion); if (serverVersion == clientVersion) { - return proxy; - } else { throw new VersionMismatch(protocol.getName(), clientVersion, - serverVersion); + serverVersion); } + return proxy; } /** @@ -513,8 +513,7 @@ * @throws IOException */ public static Object[] call(Method method, Object[][] params, - InetSocketAddress[] addrs, Configuration conf) - throws IOException { + InetSocketAddress[] addrs, Configuration conf) { Invocation[] invocations = new Invocation[params.length]; for (int i = 0; i < params.length; i++) @@ -657,11 +656,10 @@ Throwable target = e.getTargetException(); if (target instanceof IOException) { throw (IOException)target; - } else { - IOException ioe = new IOException(target.toString()); - ioe.setStackTrace(target.getStackTrace()); - throw ioe; } + IOException ioe = new IOException(target.toString()); + ioe.setStackTrace(target.getStackTrace()); + throw ioe; } catch (Throwable e) { IOException ioe = new IOException(e.toString()); ioe.setStackTrace(e.getStackTrace()); @@ -668,11 +666,12 @@ throw ioe; } } - } - - private static void log(String value) { - if (value!= null && value.length() > 55) - value = value.substring(0, 55)+"..."; - LOG.info(value); + + private static void log(String value) { + String val = value; + if (val!= null && val.length() > 55) + val = val.substring(0, 55)+"..."; + LOG.info(val); + } } } Index: src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 736069) +++ src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -178,9 +178,8 @@ if ("Unresolved address".equals(e.getMessage())) { throw new UnknownHostException("Invalid hostname for server: " + address.getHostName()); - } else { - throw e; } + throw e; } } @@ -494,11 +493,7 @@ } for(Call call : calls) { - try { - doPurge(call, now); - } catch (IOException e) { - LOG.warn("Error in purging old calls " + e); - } + doPurge(call, now); } } catch (OutOfMemoryError e) { // @@ -545,14 +540,15 @@ // Remove calls that have been pending in the responseQueue // for a long time. // - private void doPurge(Call call, long now) throws IOException { + private void doPurge(Call call, long now) { LinkedList responseQueue = call.connection.responseQueue; synchronized (responseQueue) { Iterator iter = responseQueue.listIterator(0); + Call responseCall = call; while (iter.hasNext()) { - call = iter.next(); - if (now > call.timestamp + PURGE_INTERVAL) { - closeConnection(call.connection); + responseCall = iter.next(); + if (now > responseCall.timestamp + PURGE_INTERVAL) { + closeConnection(responseCall.connection); break; } } @@ -814,12 +810,11 @@ processData(); data = null; return count; - } else { - processHeader(); - headerRead = true; - data = null; - continue; } + processHeader(); + headerRead = true; + data = null; + continue; } return count; } @@ -850,7 +845,7 @@ callQueue.put(call); // queue the call; maybe blocked here } - private synchronized void close() throws IOException { + private synchronized void close() { data = null; dataLengthBuffer = null; if (!channel.isOpen()) @@ -855,11 +850,11 @@ dataLengthBuffer = null; if (!channel.isOpen()) return; - try {socket.shutdownOutput();} catch(Exception e) {} + try {socket.shutdownOutput();} catch(Exception e) {} //TODO Not nice handling of exception if (channel.isOpen()) { - try {channel.close();} catch(Exception e) {} + try {channel.close();} catch(Exception e) {} //TODO Not nice handling of exception } - try {socket.close();} catch(Exception e) {} + try {socket.close();} catch(Exception e) {} //TODO Not nice handling of exception } } @@ -913,7 +908,7 @@ out.writeInt(call.id); // write call id out.writeBoolean(error != null); // write error flag - if (error == null) { + if (error == null && value != null) { value.write(out); } else { WritableUtils.writeString(out, errorClass); @@ -981,10 +976,7 @@ if (connectionList.remove(connection)) numConnections--; } - try { - connection.close(); - } catch (IOException e) { - } + connection.close(); } /** Sets the socket buffer size used for responding to RPCs */ @@ -991,7 +983,7 @@ public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; } /** Starts the service. Must be called before any calls will be handled. */ - public synchronized void start() throws IOException { + public synchronized void start() { responder.start(); listener.start(); handlers = new Handler[handlerCount];