Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
1.14.0, 1.12.5, 1.13.2
None
None
Description
This issue was brought up on the ML thread "hdfs lease issues on flink retry". See attached jobmanager.log which was provided by the user:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to CREATE_FILE /user/p2epda/lake/delp_prod/PROD/APPROVED/data/gsam_qa_campaign_extract/NmsRtEvent/2728/temp/data/_temporary/0/_temporary/attempt__0000_r_000008_0/partMapper-r-00008.snappy.parquet for DFSClient_NONMAPREDUCE_-910267331_98 on 10.59.155.152 because this file lease is currently owned by DFSClient_NONMAPREDUCE_1747340094_77 on 10.50.196.245
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3194)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2813)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2702)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2586)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:736)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:409)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
    at org.apache.hadoop.ipc.Client.call(Client.java:1498)
    at org.apache.hadoop.ipc.Client.call(Client.java:1398)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at com.sun.proxy.$Proxy19.create(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:313)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:290)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:202)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:184)
    at com.sun.proxy.$Proxy20.create(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1828)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1712)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1647)
    at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:480)
    at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:476)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:491)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:417)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:930)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:236)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
    at com.gs.ep.lake.flinkbasics.GRHadoopOutputFormat.open(GRHadoopOutputFormat.java:90)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:745)
The user ran into an AlreadyBeingCreatedException when a task tried to create a file for which a lease already existed. Admittedly, they use their own implementation, GRHadoopOutputFormat. But dmvk helped investigate this, as he was familiar with the error.
The problem seems to be in HadoopOutputFormatBase, which uses a fixed attempt id of 0 (see HadoopOutputFormatBase:137). This is visible as the trailing _0 of attempt__0000_r_000008_0 in the path above.
HDFS allows only one writer per file at a time, and the LeaseManager enforces this through leases. It appears that a second task attempt tried to create the same file because HadoopOutputFormatBase generated the same TaskAttemptID for it. The retry interval (10 seconds in this case) was shorter than Hadoop's hard-coded soft lease limit of 1 minute (see hadoop:HdfsConstants:62), so the lease held by the previous attempt had not yet expired.
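The timing argument can be made concrete with a small sketch. It is a simplification under stated assumptions: the class and method names are hypothetical, the 1 minute value is the soft limit quoted in this description, and the real lease bookkeeping lives in the NameNode and depends on when the previous writer last renewed its lease.

```java
import java.time.Duration;

// Hypothetical helper, not part of Flink or Hadoop.
public final class LeaseTiming {

    // Soft lease limit as quoted in this issue (1 minute).
    public static final Duration SOFT_LEASE_LIMIT = Duration.ofMinutes(1);

    private LeaseTiming() {}

    /**
     * Returns true if a create() issued this long after the previous
     * writer's last lease renewal would still fall inside the soft limit,
     * i.e. the old lease is still considered live.
     */
    public static boolean retryMayHitLiveLease(Duration sinceLastRenewal) {
        return sinceLastRenewal.compareTo(SOFT_LEASE_LIMIT) < 0;
    }

    public static void main(String[] args) {
        // 10 seconds is the retry interval reported by the user.
        System.out.println(retryMayHitLiveLease(Duration.ofSeconds(10)));
        // A retry after 90 seconds would be past the soft limit.
        System.out.println(retryMayHitLiveLease(Duration.ofSeconds(90)));
    }
}
```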
We might be able to overcome this by using a dynamic attempt number in the TaskAttemptID instead of the fixed _0.
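A minimal sketch of that idea follows. It only illustrates the string shape seen in the path above (attempt__0000_r_000008_0); the class and method names are hypothetical, and it is not the actual code in HadoopOutputFormatBase. Where the attempt number would come from is an assumption as well; Flink's RuntimeContext#getAttemptNumber() looks like a natural candidate if it is accessible at that point.

```java
import java.util.Locale;

// Hypothetical helper, not the actual Flink implementation.
public final class AttemptIds {

    private AttemptIds() {}

    /**
     * Builds an attempt id string such as attempt__0000_r_000008_1.
     * taskId is zero-padded to six digits; attemptNumber replaces the
     * fixed trailing 0 so that each retry gets a distinct id and thus
     * a distinct temporary directory.
     */
    public static String attemptId(int taskId, int attemptNumber) {
        if (taskId < 0 || attemptNumber < 0) {
            throw new IllegalArgumentException("ids must be non-negative");
        }
        return String.format(Locale.ROOT, "attempt__0000_r_%06d_%d", taskId, attemptNumber);
    }

    public static void main(String[] args) {
        System.out.println(attemptId(8, 0)); // first attempt
        System.out.println(attemptId(8, 1)); // first retry
    }
}
```

With a distinct id per attempt, a retried task would write under a different _temporary/attempt_* directory and would not try to create a file whose lease is still held by the failed attempt.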