Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: common
    • Labels:
      None

      Description

      In Client.java, sendRpcRequest does the following:

         /** Initiates a rpc call by sending the rpc request to the remote server.
           * Note: this is not called from the Connection thread, but by other
           * threads.
           * @param call - the rpc request
           */
          public void sendRpcRequest(final Call call)
              throws InterruptedException, IOException {
            if (shouldCloseConnection.get()) {
              return;
            }
      
            // Serialize the call to be sent. This is done from the actual
            // caller thread, rather than the sendParamsExecutor thread,
            // so that if the serialization throws an error, it is reported
            // properly. This also parallelizes the serialization.
            //
            // Format of a call on the wire:
            // 0) Length of rest below (1 + 2)
            // 1) RpcRequestHeader  - is serialized Delimited hence contains length
            // 2) RpcRequest
            //
            // Items '1' and '2' are prepared here. 
            RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
                call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
                clientId);
      
            final ResponseBuffer buf = new ResponseBuffer();
            header.writeDelimitedTo(buf);
            RpcWritable.wrap(call.rpcRequest).writeTo(buf);
      
            synchronized (sendRpcRequestLock) {
              Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
                @Override
                public void run() {
                  try {
                    synchronized (ipcStreams.out) {
                      if (shouldCloseConnection.get()) {
                        return;
                      }
                      if (LOG.isDebugEnabled()) {
                        LOG.debug(getName() + " sending #" + call.id
                            + " " + call.rpcRequest);
                      }
                      // RpcRequestHeader + RpcRequest
                      ipcStreams.sendRequest(buf.toByteArray());
                      ipcStreams.flush();
                    }
                  } catch (IOException e) {
                    // exception at this point would leave the connection in an
                    // unrecoverable state (eg half a call left on the wire).
                    // So, close the connection, killing any outstanding calls
                    markClosed(e);
                  } finally {
                    //the buffer is just an in-memory buffer, but it is still polite to
                    // close early
                    IOUtils.closeStream(buf);
                  }
                }
              });
      
              try {
                senderFuture.get();
              } catch (ExecutionException e) {
                Throwable cause = e.getCause();
      
                // cause should only be a RuntimeException as the Runnable above
                // catches IOException
                if (cause instanceof RuntimeException) {
                  throw (RuntimeException) cause;
                } else {
                  throw new RuntimeException("unexpected checked exception", cause);
                }
              }
            }
          }
      

      It has been observed that the call can get stuck at senderFuture.get() with the following stack trace:

      "Thread-13" #40 prio=5 os_prio=0 tid=0x000000000fb0d000 nid=0xf189c waiting on condition [0x00007f697c582000]
         java.lang.Thread.State: WAITING (parking)
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x00000006187e5ec0> (a java.util.concurrent.FutureTask)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
              at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
              at java.util.concurrent.FutureTask.get(FutureTask.java:191)
              at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1088)
              - locked <0x00000006215c1e08> (a java.lang.Object)
              at org.apache.hadoop.ipc.Client.call(Client.java:1483)
              at org.apache.hadoop.ipc.Client.call(Client.java:1441)
              at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
              at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
              at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:266)
              at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
              at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
              at java.lang.reflect.Method.invoke(Method.java:498)
              at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
              at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
              at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
              at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1323)
              at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1310)
              at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1298)
              at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:309)
              at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:275)
              - locked <0x00000006187e5530> (a java.lang.Object)
              at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:267)
              at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1629)
              at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:338)
              at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:334)
              at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
              at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:334)
      

      Given that we support rpcTimeOut, we could choose the second of the two Future methods below, get(long timeout, TimeUnit unit), so that the wait is bounded:

        /**
           * Waits if necessary for the computation to complete, and then
           * retrieves its result.
           *
           * @return the computed result
           * @throws CancellationException if the computation was cancelled
           * @throws ExecutionException if the computation threw an
           * exception
           * @throws InterruptedException if the current thread was interrupted
           * while waiting
           */
          V get() throws InterruptedException, ExecutionException;
      
          /**
           * Waits if necessary for at most the given time for the computation
           * to complete, and then retrieves its result, if available.
           *
           * @param timeout the maximum time to wait
           * @param unit the time unit of the timeout argument
           * @return the computed result
           * @throws CancellationException if the computation was cancelled
           * @throws ExecutionException if the computation threw an
           * exception
           * @throws InterruptedException if the current thread was interrupted
           * while waiting
           * @throws TimeoutException if the wait timed out
           */
          V get(long timeout, TimeUnit unit)
              throws InterruptedException, ExecutionException, TimeoutException;
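
      A minimal sketch of what a bounded wait could look like, using only java.util.concurrent. The class name BoundedSendSketch, the helper awaitSend and the parameter timeoutMs are hypothetical and are not part of Client.java. Cancelling the task and surfacing the timeout as an IOException are assumptions about a reasonable policy; a real patch would still have to decide whether the connection should also be marked closed.

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedSendSketch {

  /**
   * Waits for the sender task for at most timeoutMs milliseconds.
   * A value <= 0 falls back to the unbounded get() used today.
   * (timeoutMs is a hypothetical parameter for this sketch.)
   */
  static void awaitSend(Future<?> senderFuture, long timeoutMs)
      throws InterruptedException, IOException {
    try {
      if (timeoutMs > 0) {
        senderFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
      } else {
        senderFuture.get();
      }
    } catch (TimeoutException e) {
      // Assumption: interrupt the stuck sender and report the timeout
      // to the caller instead of parking forever.
      senderFuture.cancel(true);
      throw new IOException("Timed out after " + timeoutMs
          + " ms waiting for the RPC request to be sent", e);
    } catch (ExecutionException e) {
      // Same unwrapping as in the existing sendRpcRequest.
      Throwable cause = e.getCause();
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      throw new RuntimeException("unexpected checked exception", cause);
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch never = new CountDownLatch(1);
    // Simulates a sender that never finishes writing.
    Future<?> stuck = executor.submit(() -> {
      try {
        never.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    try {
      awaitSend(stuck, 200);
    } catch (IOException e) {
      System.out.println("caller unblocked: " + e.getMessage());
    } finally {
      executor.shutdownNow();
    }
  }
}
```

      With the unbounded get(), the caller in main would park indefinitely, which matches the stack trace above; with the timed variant it returns control after the timeout.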
      

      In theory, since RPC sends at the client are serialized anyway, we could do the send on the calling thread instead of handing it to a thread pool that creates a new thread. This can be discussed in a separate JIRA.
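
      For that separate discussion, a minimal sketch of a send performed directly on the calling thread. InlineSendSketch and sendInline are hypothetical names, and the length-prefixed frame is only a stand-in for the real wire format. One thing to weigh: in Java, an interrupt delivered to a thread that is blocked writing to an interruptible NIO channel closes that channel, which may be why the write is handed to sendParamsExecutor in the first place. That last point is an assumption, not something stated in this issue.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class InlineSendSketch {

  /**
   * Writes a length-prefixed frame on the calling thread while holding
   * the stream's monitor, so concurrent callers are still serialized.
   */
  static void sendInline(DataOutputStream out, byte[] frame)
      throws IOException {
    synchronized (out) {
      out.writeInt(frame.length); // 4-byte length prefix (illustrative)
      out.write(frame);
      out.flush();
    }
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    sendInline(new DataOutputStream(sink), new byte[] {1, 2, 3});
    System.out.println("bytes written: " + sink.size());
  }
}
```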

      Why the RPC is not processed and answered by the NameNode (NN) is a separate topic (HADOOP-15538).

              People

              • Assignee:
                yzhangal Yongjun Zhang
                Reporter:
                yzhangal Yongjun Zhang