Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Description
This getSplits method calls Hadoop FileInputFormat's getSplits method; the related Hadoop code is here. A simplified version looks like this:
// Hadoop FileInputFormat
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  StopWatch sw = new StopWatch().start();
  FileStatus[] stats = listStatus(job);
  ......
}

protected FileStatus[] listStatus(JobConf job) throws IOException {
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }

  // get tokens for all the required FileSystems..
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);

  // Whether we need to recursive look into the directory structure
  ......
}
In the listStatus method, delegation tokens are obtained by calling TokenCache.obtainTokensForNamenodes. However, this method only gives up fetching new delegation tokens when the corresponding credentials are already present in the JobConf.
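For context, the following is a paraphrased sketch of the per-FileSystem check that obtainTokensForNamenodes ultimately performs; the class and method names are illustrative and this is not the verbatim Hadoop source:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;

import java.io.IOException;

// Illustrative only: a new delegation token is requested from the NameNode
// only when none is already present in the supplied credentials.
final class TokenFetchSketch {

    static void obtainTokenIfMissing(FileSystem fs, Credentials credentials, String renewer)
            throws IOException {
        String serviceName = fs.getCanonicalServiceName();
        if (serviceName == null) {
            return; // this file system does not issue delegation tokens
        }
        Text service = new Text(serviceName);
        if (credentials.getToken(service) == null) {
            // No token for this NameNode yet: fetching a fresh one needs a
            // Kerberos TGT or keytab and fails when only tokens are available.
            Token<?> token = fs.getDelegationToken(renewer);
            if (token != null) {
                credentials.addToken(service, token);
            }
        }
        // If a token already exists, nothing new is requested.
    }
}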
Therefore, it is necessary to inject the current UGI's credentials into the JobConf.
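A minimal sketch of that injection, assuming a hypothetical helper invoked before delegating to Hadoop's FileInputFormat.getSplits (the class and method names are illustrative, not the actual Flink patch):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;

// Copy the tokens held by the current UGI into the JobConf's credentials so
// that TokenCache.obtainTokensForNamenodes finds them and does not try to
// fetch new delegation tokens from the NameNode.
final class UgiCredentialInjectionSketch {

    static void injectCurrentUserCredentials(JobConf jobConf) throws IOException {
        Credentials currentUserCredentials =
                UserGroupInformation.getCurrentUser().getCredentials();
        // addAll merges tokens and secret keys without removing anything
        // already present in the JobConf's own credentials.
        jobConf.getCredentials().addAll(currentUserCredentials);
    }
}

Calling injectCurrentUserCredentials(jobConf) right before Hadoop's FileInputFormat.getSplits lets listStatus see the pre-existing tokens, so no new token fetch is attempted.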
Besides, once Flink supports delegation tokens directly without a keytab (refer to this PR), TokenCache.obtainTokensForNamenodes will fail without this patch because there are no corresponding credentials.