Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Description
org.apache.hadoop.ipc.Client sends multiple RPC calls to the server synchronously over the same connection, as in the following synchronized code block:
    synchronized (sendRpcRequestLock) {
      Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
        @Override
        public void run() {
          try {
            synchronized (Connection.this.out) {
              if (shouldCloseConnection.get()) {
                return;
              }
              if (LOG.isDebugEnabled()) {
                LOG.debug(getName() + " sending #" + call.id + " " + call.rpcRequest);
              }
              byte[] data = d.getData();
              int totalLength = d.getLength();
              out.writeInt(totalLength); // Total Length
              out.write(data, 0, totalLength); // RpcRequestHeader + RpcRequest
              out.flush();
            }
          } catch (IOException e) {
            // exception at this point would leave the connection in an
            // unrecoverable state (eg half a call left on the wire).
            // So, close the connection, killing any outstanding calls
            markClosed(e);
          } finally {
            // the buffer is just an in-memory buffer, but it is still polite to
            // close early
            IOUtils.closeStream(d);
          }
        }
      });

      try {
        senderFuture.get();
      } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        // cause should only be a RuntimeException as the Runnable above
        // catches IOException
        if (cause instanceof RuntimeException) {
          throw (RuntimeException) cause;
        } else {
          throw new RuntimeException("unexpected checked exception", cause);
        }
      }
    }
It then waits for the result asynchronously via:
    /* Receive a response.
     * Because only one receiver, so no synchronization on in.
     */
    private void receiveRpcResponse() {
      if (shouldCloseConnection.get()) {
        return;
      }
      touch();

      try {
        int totalLen = in.readInt();
        RpcResponseHeaderProto header =
            RpcResponseHeaderProto.parseDelimitedFrom(in);
        checkResponse(header);

        int headerLen = header.getSerializedSize();
        headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);

        int callId = header.getCallId();
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + " got value #" + callId);

        Call call = calls.get(callId);
        RpcStatusProto status = header.getStatus();
        ......
However, the responses read by receiveRpcResponse() above may arrive in any order.
The following code
int totalLen = in.readInt();
eventually calls one of the following two methods, where the accumulated waiting time is checked against rpcTimeout:
    /** Read a byte from the stream.
     * Send a ping if timeout on read. Retries if no failure is detected
     * until a byte is read.
     * @throws IOException for any IO problem other than socket timeout
     */
    @Override
    public int read() throws IOException {
      int waiting = 0;
      do {
        try {
          return super.read();
        } catch (SocketTimeoutException e) {
          waiting += soTimeout;
          handleTimeout(e, waiting);
        }
      } while (true);
    }

    /** Read bytes into a buffer starting from offset <code>off</code>
     * Send a ping if timeout on read. Retries if no failure is detected
     * until a byte is read.
     *
     * @return the total number of bytes read; -1 if the connection is closed.
     */
    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
      int waiting = 0;
      do {
        try {
          return super.read(buf, off, len);
        } catch (SocketTimeoutException e) {
          waiting += soTimeout;
          handleTimeout(e, waiting);
        }
      } while (true);
    }
However, the waiting counter is initialized to 0 on every one of the above read calls, so each read can block for up to rpcTimeout. The effective time before a call times out therefore appears to be cumulative.
For example, suppose the client issues call1 and call2 and then waits for the results. If call1's response takes (rpcTimeout - 1), there is no timeout; if call2's response then takes another (rpcTimeout - 1), there is again no timeout. But call2 has effectively taken 2*(rpcTimeout - 1), which could be much larger than rpcTimeout, so it should have timed out.
In the worst case, an RPC may take indefinitely long and never time out.
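The two-call example above can be sketched as a small time-free simulation. This is a hypothetical model, not Hadoop code: PerReadTimeoutModel, readCompletes and elapsedForLastCall are names made up for illustration, and the model assumes that handleTimeout gives up once the per-read waiting counter reaches rpcTimeout.

```java
// Hypothetical model of the per-read timeout accounting; not Hadoop code.
class PerReadTimeoutModel {

    /**
     * Models one blocking read whose data arrives after neededMillis.
     * The socket times out every soTimeout ms; the per-read counter starts
     * at 0 and the read is abandoned once the counter reaches rpcTimeout
     * (assumed behaviour of handleTimeout). Returns true if the read completes.
     */
    static boolean readCompletes(long neededMillis, long soTimeout, long rpcTimeout) {
        long waiting = 0; // reset for every read, as in the quoted read() methods
        while (neededMillis - waiting > soTimeout) {
            waiting += soTimeout; // one more socket timeout has fired
            if (rpcTimeout > 0 && waiting >= rpcTimeout) {
                return false; // this single read exceeded rpcTimeout
            }
        }
        return true; // data arrived before the next socket timeout
    }

    /**
     * All calls are assumed to be sent at time 0; gapsMillis[i] is the delay
     * between response i-1 and response i. Returns the elapsed time at which
     * the last response is read, or -1 if any single read timed out.
     */
    static long elapsedForLastCall(long[] gapsMillis, long soTimeout, long rpcTimeout) {
        long elapsed = 0;
        for (long gap : gapsMillis) {
            if (!readCompletes(gap, soTimeout, rpcTimeout)) {
                return -1;
            }
            elapsed += gap;
        }
        return elapsed;
    }
}
```

With rpcTimeout = 10000 and soTimeout = 1000, two gaps of (rpcTimeout - 1) each pass the per-read check, although the second call has been outstanding for 2*(rpcTimeout - 1) by the time its response is read.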
It seems more accurate to remember the time at which an RPC is sent to the server, and then check for a timeout here:
    public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
        ConnectionId remoteId, int serviceClass,
        AtomicBoolean fallbackToSimpleAuth) throws IOException {
      final Call call = createCall(rpcKind, rpcRequest);
      Connection connection = getConnection(remoteId, call, serviceClass,
          fallbackToSimpleAuth);
      try {
        connection.sendRpcRequest(call); // send the rpc request
      } catch (RejectedExecutionException e) {
        throw new IOException("connection has been closed", e);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        LOG.warn("interrupted waiting to send rpc request to server", e);
        throw new IOException(e);
      }

      synchronized (call) {
        while (!call.done) {
          try {
            call.wait(); // wait for the result
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new InterruptedIOException("Call interrupted");
          }
          // <= should check how long it has waited here, and time out if
          //    rpcTimeout has been reached
        }

        if (call.error != null) {
          if (call.error instanceof RemoteException) {
            call.error.fillInStackTrace();
            throw call.error;
          } else { // local exception
            InetSocketAddress address = connection.getRemoteAddress();
            throw NetUtils.wrapException(address.getHostName(),
                address.getPort(),
                NetUtils.getHostname(),
                0,
                call.error);
          }
        } else {
          return call.getRpcResponse();
        }
      }
    }
Basically, we should change the call highlighted above from
public final void wait() throws InterruptedException
to
public final void wait(long timeout, int nanos) throws InterruptedException
and pass rpcTimeout as the parameter value here. (Note that I'm ignoring the time needed to send the RPC to the server; ideally we should include that too, so that rpcTimeout means what it is intended to mean.)
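A minimal sketch of the timed wait, under stated assumptions: TimedCall, complete and await are hypothetical stand-ins for the Hadoop Call object and its wait loop, and throwing SocketTimeoutException is only one possible way to report the timeout. Because a timed wait() returns normally when its timeout elapses, the loop also has to compare the current time against a deadline derived from the send time; otherwise while (!done) would simply wait again.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the Call object; not the Hadoop class.
class TimedCall {
    private boolean done;
    private Object response;

    /** Called by the receiver thread when the response arrives. */
    synchronized void complete(Object value) {
        response = value;
        done = true;
        notifyAll();
    }

    /**
     * Waits until the call completes or rpcTimeoutMillis has passed since
     * sentAtNanos (a System.nanoTime() value taken when the request was sent).
     * A non-positive rpcTimeoutMillis is treated here as "no timeout".
     */
    synchronized Object await(long sentAtNanos, long rpcTimeoutMillis)
            throws InterruptedException, SocketTimeoutException {
        long deadline = sentAtNanos + TimeUnit.MILLISECONDS.toNanos(rpcTimeoutMillis);
        while (!done) {
            if (rpcTimeoutMillis <= 0) {
                wait(); // original behaviour: wait indefinitely
                continue;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new SocketTimeoutException(
                    "call timed out after " + rpcTimeoutMillis + " ms");
            }
            // Wait only for the time that is left; spurious wakeups and
            // early returns are handled by re-checking done and the deadline.
            wait(remainingNanos / 1_000_000L, (int) (remainingNanos % 1_000_000L));
        }
        return response;
    }
}
```

Measuring from the send time rather than from the start of the wait means the time spent sending the request also counts against rpcTimeout.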
Hi daryn and kihwal, would you please take a look at my analysis above to see whether I have misunderstood anything?
Thanks a lot.