From 63163a280f973f07ce91596848741cb1c07fd7f9 Mon Sep 17 00:00:00 2001 From: terry Date: Tue, 11 Oct 2016 17:33:45 +0800 Subject: [PATCH] KYLIN-1839, support kylin lib in HDFS Signed-off-by: terry --- .../kylin/engine/mr/common/AbstractHadoopJob.java | 29 +++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index af2ed9f..73563d2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -252,14 +253,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { // for KylinJobMRLibDir String mrLibDir = kylinConf.getKylinJobMRLibDir(); if (!StringUtils.isBlank(mrLibDir)) { - File dirFileMRLIB = new File(mrLibDir); - if (dirFileMRLIB.exists()) { - if (kylinDependency.length() > 0) - kylinDependency.append(","); + if(kylinDependency.length() > 0) { + kylinDependency.append(","); + } kylinDependency.append(mrLibDir); - } else { - logger.info("The directory '" + mrLibDir + "' for 'kylin.job.mr.lib.dir' does not exist!!!"); - } } setJobTmpJarsAndFiles(job, kylinDependency.toString()); @@ -300,21 +297,27 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { try { Configuration jobConf = job.getConfiguration(); FileSystem fs = FileSystem.getLocal(jobConf); + FileSystem hdfs = FileSystem.get(jobConf); StringBuilder jarList = new StringBuilder(); StringBuilder fileList = new StringBuilder(); for (String fileName : fNameList) { Path p = new Path(fileName); - if (fs.getFileStatus(p).isDirectory()) { - appendTmpDir(job, fileName); + FileSystem current = (fileName.startsWith(HdfsConstants.HDFS_URI_SCHEME) ? hdfs : fs); + if(!current.exists(p)) { + logger.warn("The directory '" + fileName + "for kylin dependency does not exist!!!"); + continue; + } + if (current.getFileStatus(p).isDirectory()) { + appendTmpDir(job, current, fileName); continue; } StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList; if (list.length() > 0) list.append(","); - list.append(fs.getFileStatus(p).getPath().toString()); + list.append(current.getFileStatus(p).getPath()); } appendTmpFiles(fileList.toString(), jobConf); @@ -324,13 +327,12 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { } } - private void appendTmpDir(Job job, String tmpDir) { + private void appendTmpDir(Job job, FileSystem fs, String tmpDir) { if (StringUtils.isBlank(tmpDir)) return; try { Configuration jobConf = job.getConfiguration(); - FileSystem fs = FileSystem.getLocal(jobConf); FileStatus[] fList = fs.listStatus(new Path(tmpDir)); StringBuilder jarList = new StringBuilder(); @@ -339,7 +341,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { for (FileStatus file : fList) { Path p = file.getPath(); if (fs.getFileStatus(p).isDirectory()) { - appendTmpDir(job, p.toString()); + appendTmpDir(job, fs, p.toString()); continue; } @@ -622,3 +624,4 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { } } + -- 1.7.10.4