Flink / FLINK-25099

Flink on YARN: accessing two HDFS clusters


Details

    Description

      Flink 1.13 supports setting Hadoop properties in flink-conf.yaml through the flink.hadoop.* prefix. We need to write checkpoints to an SSD-backed HDFS cluster (cluster B) to speed up checkpoint writes, but that cluster is not the default HDFS of the Flink client (the default is cluster A). flink-conf.yaml is therefore configured with the nameservices of both cluster A and cluster B, similar to an HDFS federation setup.

      The configuration is as follows:

       

      flink.hadoop.dfs.nameservices: ACluster,BCluster
      flink.hadoop.fs.defaultFS: hdfs://BCluster
      
      flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2
      flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.xxxx:9000
      flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.xxxx:50070
      flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xxxxxx:9000
      flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xxxxxx:50070
      flink.hadoop.dfs.client.failover.proxy.provider.ACluster: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
      
      flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2
      flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xxxxxx:9000
      flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xxxxxx:50070
      flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xxxxxx:9000
      flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.xxxxx:50070
      flink.hadoop.dfs.client.failover.proxy.provider.BCluster: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
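
      Before looking at the runtime error, it can help to rule out a simple omission in the keys above. The following is a minimal sketch, not part of Flink or Hadoop: the helper names parse_flat_conf and check_ha_nameservices are hypothetical, and the parser only handles flat "key: value" lines like the ones shown here. It checks that every nameservice listed in dfs.nameservices has its NameNode list, one RPC address per NameNode, and a failover proxy provider.

```python
def parse_flat_conf(text):
    """Parse flat 'key: value' lines (hypothetical helper, not a full YAML parser)."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first colon only, so values such as hdfs://BCluster stay intact.
        key, sep, value = line.partition(":")
        if sep:
            conf[key.strip()] = value.strip()
    return conf


def _csv(value):
    """Split a comma-separated value into trimmed, non-empty items."""
    return [item.strip() for item in value.split(",") if item.strip()]


def check_ha_nameservices(conf, prefix="flink.hadoop."):
    """Return a list of missing HA keys; an empty list means the set is complete."""
    hadoop = {k[len(prefix):]: v for k, v in conf.items() if k.startswith(prefix)}
    problems = []

    services = _csv(hadoop.get("dfs.nameservices", ""))
    if not services:
        problems.append("dfs.nameservices is missing or empty")

    for ns in services:
        namenodes = _csv(hadoop.get(f"dfs.ha.namenodes.{ns}", ""))
        if not namenodes:
            problems.append(f"dfs.ha.namenodes.{ns} is missing")
        for nn in namenodes:
            key = f"dfs.namenode.rpc-address.{ns}.{nn}"
            if key not in hadoop:
                problems.append(f"{key} is missing")
        provider = f"dfs.client.failover.proxy.provider.{ns}"
        if provider not in hadoop:
            problems.append(f"{provider} is missing")

    return problems
```

      Calling check_ha_nameservices(parse_flat_conf(text)) on the contents of flink-conf.yaml returns an empty list when the key set is complete. This only checks that the keys exist; it says nothing about whether the addresses are reachable or which process actually loads them.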
      

       

      However, the job fails at startup with the error shown below.

      (If the setting is changed to cluster A, the Flink client's default HDFS cluster, the job starts normally: flink.hadoop.fs.defaultFS: hdfs://ACluster)

      Caused by: BCluster
      java.net.UnknownHostException: BCluster
      at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
      at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
      at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:374)
      at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:308)
      at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
      at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
      at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
      at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
      at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
      at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
      at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
      at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)
      at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
      at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
      at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
      at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:412)
      at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:247)
      at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:240)
      at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:228)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
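
      The FSDownload and ContainerLocalizer frames in the trace run inside the YARN NodeManager, which suggests that hdfs://BCluster is being resolved with the NodeManager's own Hadoop configuration rather than with the flink.hadoop.* settings from the client. The sketch below is a simplified model of that behaviour, not Hadoop's actual code: the function classify_authority, both configuration dictionaries, and the path /flink/app.jar are hypothetical, and it assumes the NodeManager only knows about ACluster. It treats a URI authority as a logical nameservice only when a failover proxy provider is configured for it, and otherwise as a plain hostname that would need a DNS lookup.

```python
from urllib.parse import urlparse

PROVIDER_PREFIX = "dfs.client.failover.proxy.provider."


def classify_authority(uri, hadoop_conf):
    """Simplified model: decide how an HDFS client would interpret the URI authority."""
    # netloc keeps the original case of the authority; drop an optional :port suffix.
    authority = urlparse(uri).netloc.split(":")[0]
    if PROVIDER_PREFIX + authority in hadoop_conf:
        return "logical nameservice"
    # Without a proxy provider the name is looked up as a host, and an
    # unresolvable name surfaces as java.net.UnknownHostException.
    return "plain hostname"


PROVIDER = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"

# Assumption: what the Flink client sees after applying flink.hadoop.* overrides.
client_conf = {
    PROVIDER_PREFIX + "ACluster": PROVIDER,
    PROVIDER_PREFIX + "BCluster": PROVIDER,
}

# Assumption: the NodeManager's hdfs-site.xml only defines cluster A.
node_manager_conf = {
    PROVIDER_PREFIX + "ACluster": PROVIDER,
}

for name, conf in [("client", client_conf), ("NodeManager", node_manager_conf)]:
    print(name, "->", classify_authority("hdfs://BCluster/flink/app.jar", conf))
```

      Under these assumptions the same URI is a valid nameservice on the client but an unknown host on the NodeManager, which would be consistent with the job starting normally when fs.defaultFS points to ACluster.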

      Is there a solution to this problem? What we need is for Flink to be able to access two HDFS clusters, preferably configured entirely through flink-conf.yaml.

       


          People

            Assignee: Unassigned
            Reporter: Qizhu Chan (libra_816)