FLINK-20935

Can't write the Flink configuration to a temp file and add it as a local resource in YARN session mode


    Description

      In Flink 1.12.0 or the latest version, when we execute a command such as bin/yarn-session.sh -n 20 -jm 9096 -nm 4096 -st, the deployment fails with the following errors:

      org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
      	at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:411)
      	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:498)
      	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:730)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:422)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
      	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
      	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:730)
      Caused by: java.io.FileNotFoundException: File does not exist: /tmp/application_1573723355201_0036-flink-conf.yaml688141408443326132.tmp
      	at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
      

      When the startAppMaster method in YarnClusterDescriptor is called, it tries to write the Flink configuration to a temp file and add it as a local resource. However, the following code causes the temp file's path to be treated as a path on a distributed file system:

      // Upload the flink configuration
      		// write out configuration file
      		File tmpConfigurationFile = null;
      		try {
      			tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
      			BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);
      
      			String flinkConfigKey = "flink-conf.yaml";
      			fileUploader.registerSingleLocalResource(
      				flinkConfigKey,
      				new Path(tmpConfigurationFile.getAbsolutePath()),
      				"",
      				LocalResourceType.FILE,
      				true,
      				true);
      			classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
      		} finally {
      			if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
      				LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
      			}
      		}
      
      tmpConfigurationFile.getAbsolutePath() returns a path without a file scheme, so the file system is taken to be the distributed file system. The temp file exists only on the local disk, which leads to the FileNotFoundException shown above.
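
      The scheme problem described above can be illustrated with a minimal sketch that uses only the JDK, not Hadoop or Flink classes. It assumes a Unix-like system; the class name SchemeDemo, its helper methods, and the example file name are hypothetical and are not part of Flink. It shows that File.getAbsolutePath() yields a string with no scheme, while File.toURI() yields a "file" URI that names the local file system explicitly.

```java
import java.io.File;
import java.net.URI;

// Hypothetical demo class, not part of Flink or Hadoop.
public class SchemeDemo {

    /** Returns the URI scheme of a path string, or null if it has none. */
    public static String schemeOf(String path) {
        // Assumes the path contains no characters that are illegal in a URI.
        return URI.create(path).getScheme();
    }

    /** Returns a URI for the file that carries an explicit "file" scheme. */
    public static URI qualify(File file) {
        return file.toURI();
    }

    public static void main(String[] args) {
        // Illustrative file name; the file does not need to exist.
        File tmp = new File("/tmp/example-flink-conf.yaml.tmp");

        // Plain absolute path: no scheme, so a consumer has to guess the file system.
        System.out.println("absolute path: " + tmp.getAbsolutePath()
                + " -> scheme " + schemeOf(tmp.getAbsolutePath()));

        // Qualified URI: the scheme says this is a local file.
        URI qualified = qualify(tmp);
        System.out.println("qualified URI: " + qualified
                + " -> scheme " + schemeOf(qualified.toString()));
    }
}
```

      Whether passing a scheme-qualified path to registerSingleLocalResource is the right fix for this issue is not confirmed here; the sketch only shows where the scheme information is lost.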

            People

              Assignee: Unassigned
              Reporter: yuemeng