  1. Flume
  2. FLUME-1748

HDFS Sink should check if the thread is interrupted before performing any HDFS operations

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.4.0
    • Component/s: None
    • Labels: None

      Description

      If one or more HDFS Sink write, flush, or close calls take too long, the thread making the call is interrupted by the future.cancel(true) call. The Hadoop RPC client re-sets the interrupt flag on a thread that was interrupted. In addition, if the thread is interrupted after the RPC call completes but before the call() method returns, the interrupt flag remains set on the thread. A later HDFS file open call then fails with an exception like this:

      [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457)  - HDFS IO error
      java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020; 
      	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
      	at org.apache.hadoop.ipc.Client.call(Client.java:1164)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
      	at $Proxy9.create(Unknown Source)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      	at java.lang.reflect.Method.invoke(Method.java:597)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
      	at $Proxy9.create(Unknown Source)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
      	at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
      	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
      	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
      	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
      	at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
      	at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
      	at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
      	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
      	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
      	at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
      	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
      	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
      	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
      	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
      	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      	at java.lang.Thread.run(Thread.java:662)
      Caused by: java.nio.channels.ClosedByInterruptException
      	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
      	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
      	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
      	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
      	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
      	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
      	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
      	at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
      	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
      	at org.apache.hadoop.ipc.Client.call(Client.java:1140)
      	... 36 more
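
The root cause in the trace above is a ClosedByInterruptException raised from an interruptible NIO channel. The following is a minimal sketch of that JDK behavior, not Flume or Hadoop code: the class StaleInterruptDemo and its method are hypothetical names, and a temporary FileChannel stands in for the socket channel that the Hadoop RPC client uses. It shows that a blocking channel operation fails immediately when the calling thread's interrupt flag is already set.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class; not part of Flume or Hadoop.
class StaleInterruptDemo {

    /**
     * Attempts a channel write while the interrupt flag is already set.
     * Returns true if the write was rejected with ClosedByInterruptException.
     */
    static boolean writeFailsWhenFlagIsSet() throws IOException {
        Path tmp = Files.createTempFile("stale-interrupt", ".bin");
        try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
            // Simulate a flag left over from an earlier, cancelled operation.
            Thread.currentThread().interrupt();
            channel.write(ByteBuffer.wrap(new byte[] {1}));
            return false; // the write unexpectedly succeeded
        } catch (ClosedByInterruptException e) {
            return true;  // the channel was closed because the flag was set
        } finally {
            Thread.interrupted();       // clear the flag so cleanup is unaffected
            Files.deleteIfExists(tmp);  // remove the temporary file
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("write rejected: " + writeFailsWhenFlagIsSet());
    }
}
```

The write itself was never slow here. It fails only because of the stale flag, which is the same situation the HDFS file open call is in when it runs on a thread whose flag was left set.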
      

      The relevant code that re-sets the interrupt flag is in http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup

      The method is Client#call(RPC.RpcKind, Writable, ConnectionId). Note that the InterruptedException is caught inside the wait loop and the interrupt flag is re-set once the loop finishes.

        public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
            ConnectionId remoteId) throws InterruptedException, IOException {
          Call call = new Call(rpcKind, rpcRequest);
          Connection connection = getConnection(remoteId, call);
          connection.sendParam(call);                 // send the parameter
          boolean interrupted = false;
          synchronized (call) {
            while (!call.done) {
              try {
                call.wait();                           // wait for the result
              } catch (InterruptedException ie) {
                // save the fact that we were interrupted
                interrupted = true;
              }
            }
      
            if (interrupted) {
              // set the interrupt flag now that we are done waiting
              Thread.currentThread().interrupt();
            }
      
            if (call.error != null) {
              if (call.error instanceof RemoteException) {
                call.error.fillInStackTrace();
                throw call.error;
              } else { // local exception
                InetSocketAddress address = connection.getRemoteAddress();
                throw NetUtils.wrapException(address.getHostName(),
                        address.getPort(),
                        NetUtils.getHostname(),
                        0,
                        call.error);
              }
            } else {
              return call.getRpcResult();
            }
          }
        }
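
As a rough illustration of the check named in the issue title, the sketch below tests the interrupt flag before an HDFS operation is started. It is an assumption about how such a guard could look, not the content of the attached patches: InterruptGuard and callIfNotInterrupted are hypothetical names, and the Callable stands in for an open, write, flush, or close call.

```java
import java.util.concurrent.Callable;

// Hypothetical helper; the names are illustrative and not taken from Flume.
class InterruptGuard {

    /**
     * Runs the given operation only if the current thread has no pending
     * interrupt. Thread.interrupted() both reads and clears the flag, so a
     * stale interrupt is reported once and does not leak into later calls.
     */
    static <T> T callIfNotInterrupted(Callable<T> hdfsOperation) throws Exception {
        if (Thread.interrupted()) {
            throw new InterruptedException(
                "Thread was interrupted before the HDFS operation started");
        }
        return hdfsOperation.call();
    }

    public static void main(String[] args) throws Exception {
        Thread.currentThread().interrupt(); // simulate a leftover interrupt
        try {
            callIfNotInterrupted(() -> "opened");
        } catch (InterruptedException e) {
            System.out.println("skipped: " + e.getMessage());
        }
        // The flag was cleared by the guard, so the next attempt proceeds.
        System.out.println(callIfNotInterrupted(() -> "opened"));
    }
}
```

Clearing the flag with Thread.interrupted() rather than only reading it with isInterrupted() is a design choice in this sketch. It turns the stale interrupt into an explicit InterruptedException instead of a ClosedByInterruptException surfacing from inside the RPC layer.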
      

        Attachments

        1. FLUME-1748.patch
          4 kB
          Hari Shreedharan
        2. FLUME-1748-1.patch
          4 kB
          Hari Shreedharan

              People

              • Assignee: Hari Shreedharan (hshreedharan)
              • Reporter: Hari Shreedharan (hshreedharan)
              • Votes: 0
              • Watchers: 4