Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
Description
If one or more HDFS Sink write/flush/close operations take too long, the worker thread is interrupted by the future.cancel(true) call. The Hadoop RPC client re-sets the interrupt flag on a thread that has been interrupted. Likewise, if the thread is interrupted after the RPC call completes but before the call() method returns, the interrupt flag stays set on the thread. A subsequent HDFS file open on that thread then fails with an exception of this sort:
[SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457) - HDFS IO error
java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020;
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
    at org.apache.hadoop.ipc.Client.call(Client.java:1164)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
    at $Proxy9.create(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
    at $Proxy9.create(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
    at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
    at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
    at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
    at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
    at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
    at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
    at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
    at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
    at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
    at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
    at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
    at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
    at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
    at org.apache.hadoop.ipc.Client.call(Client.java:1140)
    ... 36 more
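The failure mode is easy to reproduce in isolation. Here is a minimal, self-contained sketch (hypothetical demo code, not taken from Flume; the class name and host/port are placeholders): once the interrupt flag is set on a thread, the very next interruptible NIO operation on that thread aborts with ClosedByInterruptException, which is exactly the Caused-by in the trace above.

import java.net.InetSocketAddress;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SocketChannel;

public class InterruptFlagDemo {
  public static void main(String[] args) throws Exception {
    // Stand-in for what Client#call does after a cancelled wait:
    // it swallows InterruptedException but re-sets the flag.
    Thread.currentThread().interrupt();

    try (SocketChannel ch = SocketChannel.open()) {
      // AbstractInterruptibleChannel sees the pending interrupt and
      // aborts the connect, just like the DFS "create" call above.
      ch.connect(new InetSocketAddress("example.com", 8020));
    } catch (ClosedByInterruptException e) {
      System.out.println("connect failed: " + e);
    }
  }
}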
The relevant code that re-sets the interrupt flag is in http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup
The method is Client#call(RPC.RpcKind, Writable, ConnectionId). Note that the InterruptedException is caught and the interrupt flag is re-set on the thread before the method returns:
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
    ConnectionId remoteId) throws InterruptedException, IOException {
  Call call = new Call(rpcKind, rpcRequest);
  Connection connection = getConnection(remoteId, call);
  connection.sendParam(call);                 // send the parameter
  boolean interrupted = false;
  synchronized (call) {
    while (!call.done) {
      try {
        call.wait();                          // wait for the result
      } catch (InterruptedException ie) {
        // save the fact that we were interrupted
        interrupted = true;
      }
    }
    if (interrupted) {
      // set the interrupt flag now that we are done waiting
      Thread.currentThread().interrupt();
    }
    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        throw call.error;
      } else { // local exception
        InetSocketAddress address = connection.getRemoteAddress();
        throw NetUtils.wrapException(address.getHostName(),
                address.getPort(),
                NetUtils.getHostname(),
                0,
                call.error);
      }
    } else {
      return call.getRpcResult();
    }
  }
}
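Since Hadoop deliberately preserves the interrupt status, one possible Flume-side mitigation is to test-and-clear the flag before each HDFS operation and fail fast instead of letting a stale interrupt poison the next RPC connection. The following is a sketch only (the helper name is hypothetical and this is not necessarily the committed patch):

// Hypothetical helper, for illustration: invoke before each HDFS
// open/append/flush/close issued by the sink's worker threads.
private static void failFastIfInterrupted() throws InterruptedException {
  // Thread.interrupted() both tests AND clears the flag, so a stale
  // interrupt left behind by a cancelled future cannot surface later
  // as a ClosedByInterruptException inside the Hadoop RPC client.
  if (Thread.interrupted()) {
    throw new InterruptedException(
        "interrupt flag set before HDFS call; aborting early");
  }
}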
Issue Links
- is related to: HADOOP-9107 "Hadoop IPC client eats InterruptedException and sets interrupt on the thread which is not documented" (Status: Open)