FLINK-33155: Flink ResourceManager continuously fails to start TM container on YARN when Kerberos enabled


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Deployment / YARN
    • Labels: None

    Description

      When Kerberos is enabled (with a keytab), after about one day (once the token shipped with the container has expired), Flink fails to create the TaskManager container on YARN due to the following exception.

       

      2023-09-25 16:48:50,030 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker container_1695106898104_0003_01_000069 is terminated. Diagnostics: Container container_1695106898104_0003_01_000069 was invalid. Diagnostics: [2023-09-25 16:48:45.710]token (token for hadoop: HDFS_DELEGATION_TOKEN owner=hadoop/master-1-1.c-5ee7bdc598b6e1cc.cn-beijing.emr.aliyuncs.com@EMR.C-5EE7BDC598B6E1CC.COM, renewer=, realUser=, issueDate=1695196431487, maxDate=1695801231487, sequenceNumber=12, masterKeyId=3) can't be found in cache
      org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for hadoop: HDFS_DELEGATION_TOKEN owner=xxxx, renewer=, realUser=, issueDate=1695196431487, maxDate=1695801231487, sequenceNumber=12, masterKeyId=3) can't be found in cache
          at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1545)
          at org.apache.hadoop.ipc.Client.call(Client.java:1491)
          at org.apache.hadoop.ipc.Client.call(Client.java:1388)
          at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
          at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
          at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
          at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:907)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:431)
          at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:166)
          at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:158)
          at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:96)
          at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:362)
          at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
          at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1666)
          at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576)
          at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1573)
          at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
          at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1588)
          at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:269)
          at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:67)
          at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:414)
          at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:411)
          at java.security.AccessController.doPrivileged(Native Method)
          at javax.security.auth.Subject.doAs(Subject.java:422)
          at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
          at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:411)
          at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:243)
          at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:236)
          at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:224)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:750) 

      The root cause might be that we read the delegation tokens from a local file on the JobManager [1]. Those tokens expire after about one day. When an old TaskManager container crashes and the ResourceManager requests a new one, the YARN NodeManager uses the expired tokens to localize the TaskManager's resources and then fails. A simplified sketch of this pattern follows.
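      To make the pattern concrete, the sketch below is not the actual flink-yarn Utils.java code; the class name and file path are placeholders. It only illustrates the idea of tokens serialized once at submission time being read back from local disk and reused for every new container:

          import java.io.File;
          import java.io.IOException;

          import org.apache.hadoop.conf.Configuration;
          import org.apache.hadoop.security.Credentials;

          class StaleTokenPatternSketch {
              // Tokens written to the JobManager's disk at submission time are read
              // back much later and attached to new TaskManager containers as-is.
              static Credentials readSubmissionTimeTokens() throws IOException {
                  File tokenFile = new File("/tmp/jobmanager/container_tokens"); // placeholder path
                  // ~24h later the HDFS_DELEGATION_TOKEN inside this file is no longer in
                  // the NameNode's cache, so NodeManager localization fails as logged above.
                  return Credentials.readTokenStorageFile(tokenFile, new Configuration());
              }
          }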

      Instead, we could read the latest valid tokens from the current UGI and set them in the ContainerLaunchContext.
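      A minimal sketch of that approach, assuming the JobManager's token renewal keeps the current UGI's credentials up to date (the class and method names below are hypothetical):

          import java.io.IOException;
          import java.nio.ByteBuffer;

          import org.apache.hadoop.io.DataOutputBuffer;
          import org.apache.hadoop.security.Credentials;
          import org.apache.hadoop.security.UserGroupInformation;
          import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;

          class LatestTokensSketch {
              // Attach the freshest tokens known to the current UGI to the TaskManager's
              // launch context instead of replaying the ones captured at submission time.
              static void setLatestTokens(ContainerLaunchContext taskManagerContext) throws IOException {
                  Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();

                  // Serialize the credentials into the byte layout YARN expects for tokens.
                  DataOutputBuffer dob = new DataOutputBuffer();
                  credentials.writeTokenStorageToStream(dob);
                  ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

                  // The NodeManager uses these tokens for resource localization, so each new
                  // container request carries tokens that are still valid at allocation time.
                  taskManagerContext.setTokens(tokens);
              }
          }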

       

      [1]. https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java#L434


People

    Assignee: Unassigned
    Reporter: Yang Wang (wangyang0918)
    Votes: 0
    Watchers: 2
