FLUME-1748: HDFS Sink should check if the thread is interrupted before performing any HDFS operations

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.4.0
    • Component/s: None
    • Labels: None

      Description

      If one or more HDFS Sink write/flush/close operations end up taking too long, the thread is interrupted by the future.cancel(true) call. The Hadoop RPC client catches the resulting InterruptedException and sets the interrupt flag again on the thread. Also, if the thread is interrupted after the RPC call completes but before the call() method returns, the interrupt flag stays set on the thread. A subsequent HDFS file open call then leads to an exception of this sort:

      [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457)  - HDFS IO error
      java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020; 
      	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
      	at org.apache.hadoop.ipc.Client.call(Client.java:1164)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
      	at $Proxy9.create(Unknown Source)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      	at java.lang.reflect.Method.invoke(Method.java:597)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
      	at $Proxy9.create(Unknown Source)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
      	at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
      	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
      	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
      	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
      	at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
      	at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
      	at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
      	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
      	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
      	at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
      	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
      	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
      	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
      	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
      	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      	at java.lang.Thread.run(Thread.java:662)
      Caused by: java.nio.channels.ClosedByInterruptException
      	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
      	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
      	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
      	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
      	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
      	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
      	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
      	at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
      	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
      	at org.apache.hadoop.ipc.Client.call(Client.java:1140)
      	... 36 more
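
      To make the failure mode concrete, here is a minimal, self-contained demonstration (plain Java, not Flume code; the host and port are arbitrary) of how a lingering interrupt flag poisons the next interruptible I/O operation:

        import java.net.InetSocketAddress;
        import java.nio.channels.ClosedByInterruptException;
        import java.nio.channels.SocketChannel;

        public class StuckInterruptFlagDemo {
          public static void main(String[] args) throws Exception {
            Thread worker = new Thread(() -> {
              try {
                Thread.sleep(10_000);               // stands in for a slow RPC wait
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // set the flag again, as Hadoop's Client#call does
              }
              // The flag is still set, so the next interruptible channel operation
              // fails immediately, before any network traffic happens:
              try (SocketChannel ch = SocketChannel.open()) {
                ch.connect(new InetSocketAddress("127.0.0.1", 9));
              } catch (ClosedByInterruptException e) {
                System.out.println("connect failed due to the stale interrupt flag: " + e);
              } catch (Exception other) {
                System.out.println("unexpected: " + other);
              }
            });
            worker.start();
            Thread.sleep(100);
            worker.interrupt();                     // analogous to future.cancel(true)
            worker.join();
          }
        }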
      

      The relevant code that resets the interrupt flag is at http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup

      The method is Client#call(RPC.RpcKind, Writable, ConnectionId). Note that the InterruptedException is caught and the interrupt flag is set again on the thread:

        public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
            ConnectionId remoteId) throws InterruptedException, IOException {
          Call call = new Call(rpcKind, rpcRequest);
          Connection connection = getConnection(remoteId, call);
          connection.sendParam(call);                 // send the parameter
          boolean interrupted = false;
          synchronized (call) {
            while (!call.done) {
              try {
                call.wait();                           // wait for the result
              } catch (InterruptedException ie) {
                // save the fact that we were interrupted
                interrupted = true;
              }
            }
      
            if (interrupted) {
              // set the interrupt flag now that we are done waiting
              Thread.currentThread().interrupt();
            }
      
            if (call.error != null) {
              if (call.error instanceof RemoteException) {
                call.error.fillInStackTrace();
                throw call.error;
              } else { // local exception
                InetSocketAddress address = connection.getRemoteAddress();
                throw NetUtils.wrapException(address.getHostName(),
                        address.getPort(),
                        NetUtils.getHostname(),
                        0,
                        call.error);
              }
            } else {
              return call.getRpcResult();
            }
          }
        }
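
      Given that the flag can stay set, the direction of the fix (per this issue's title) is to check the thread's interrupt status before issuing any HDFS operation and fail fast with a clear exception. A rough sketch of such a guard follows; the method name, message, and surrounding signature are illustrative, and the committed change is in the attached patches:

        // Illustrative sketch only; the real change lives in BucketWriter.java.
        private void checkAndThrowInterruptedException() throws InterruptedException {
          // Thread.interrupted() reports and clears the flag, which is the
          // convention when converting a stale flag into an InterruptedException.
          if (Thread.interrupted()) {
            throw new InterruptedException("Timed out before the HDFS call was made. "
                + "The previous call may have run past the configured call timeout.");
          }
        }

        public synchronized void open() throws IOException, InterruptedException {
          // Fail fast if a cancelled future already set our interrupt flag,
          // instead of letting the RPC layer surface a confusing
          // ClosedByInterruptException later.
          checkAndThrowInterruptedException();
          // ... proceed with the normal HDFS open ...
        }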
      
      Attachments

      1. FLUME-1748-1.patch (4 kB, Hari Shreedharan)
      2. FLUME-1748.patch (4 kB, Hari Shreedharan)


          Activity

          Hari Shreedharan added a comment -

          It seems that a previous Hadoop RPC through the same bucket writer taking too long can cause this issue, because several threads end up blocked on the same synchronized method. The HDFS sink threads are eventually interrupted by the future.cancel call, but they still proceed into the RPC calls, only to be hit by a series of exceptions.

          Hari Shreedharan added a comment -

          Simple patch; I have not run the unit tests. Will update later if necessary. Let me know if you don't agree with this approach.

          Brock Noland added a comment -

          The error messages are missing spaces.

          Hari Shreedharan added a comment -

          Adding some context:

          Because all of the HDFS write operations happen from synchronized methods, threads block on the object monitor to write to HDFS. Since Hadoop RPC does not respond to interrupts until the call actually completes, the timeouts from Flume are pretty much useless. So if one of the calls takes far longer than the timeout, the futures for other writes may be cancelled before those threads even acquire the monitor; those threads therefore end up having their interrupt flag set. The IO operations usually check the interrupt status and fail if the flag is set, typically throwing a ClosedByInterruptException or similar.

          The patch simply throws an exception that makes this failure mode clear.
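
          To see this monitor-blocking scenario in isolation, here is a small self-contained sketch (hypothetical names, not the sink's code): a future is cancelled while its thread is still blocked entering a synchronized block, so the interrupt flag is already set by the time the monitor is finally acquired:

            import java.util.concurrent.ExecutorService;
            import java.util.concurrent.Executors;
            import java.util.concurrent.Future;
            import java.util.concurrent.TimeUnit;

            public class MonitorCancelDemo {
              private static final Object lock = new Object();

              public static void main(String[] args) throws Exception {
                ExecutorService pool = Executors.newFixedThreadPool(2);

                // First task grabs the monitor and holds it, like a slow HDFS
                // call inside a synchronized BucketWriter method.
                pool.submit(() -> {
                  synchronized (lock) {
                    try { Thread.sleep(3_000); } catch (InterruptedException ignored) { }
                  }
                });
                Thread.sleep(100); // let the first task acquire the monitor

                // Second task blocks on the monitor; monitor entry is NOT interruptible.
                Future<?> second = pool.submit(() -> {
                  synchronized (lock) {
                    // cancel(true) fired while we were blocked, so the flag is set:
                    System.out.println("interrupt flag on entry: "
                        + Thread.currentThread().isInterrupted()); // prints true
                  }
                });
                Thread.sleep(100);
                second.cancel(true); // like the sink's call timeout cancelling the write

                pool.shutdown();
                pool.awaitTermination(10, TimeUnit.SECONDS);
              }
            }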

          Hari Shreedharan added a comment -

          Added spaces in the error message.

          Brock Noland added a comment -

          Committed to trunk and 1.4. Thank you Hari for your contribution!

          Hudson added a comment -

          Integrated in flume-trunk #336 (See https://builds.apache.org/job/flume-trunk/336/)
          FLUME-1748: HDFS Sink should check if the thread is interrupted before performing any HDFS operations (Revision aa549c4f27db848cb8900533fd0f16562d971aa2)

          Result = SUCCESS
          brock : http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=aa549c4f27db848cb8900533fd0f16562d971aa2
          Files:

          • flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java

            People

            • Assignee: Hari Shreedharan
            • Reporter: Hari Shreedharan
            • Votes: 0
            • Watchers: 4
