From 9552c4764d77d084e1f3b9d366ff5c911b2f82e8 Mon Sep 17 00:00:00 2001 From: Zhong Yanghong Date: Sun, 24 Jan 2016 08:02:57 +0000 Subject: [PATCH] Feature changed: automatically append hive dependent jars to 'tmpjars' --- .../kylin/engine/mr/common/AbstractHadoopJob.java | 110 +++++++++++++++++--- 1 files changed, 97 insertions(+), 13 deletions(-) diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 748bac9..9893cfa 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -24,12 +24,14 @@ package org.apache.kylin.engine.mr.common; */ import static org.apache.hadoop.util.StringUtils.*; - +import java.net.URI; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; @@ -145,6 +147,26 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { } } + private static final String KYLIN_HIVE_DEPENDENCY_JARS = "[^,]*hive-exec.jar|[^,]*hive-metastore.jar|[^,]*hive-hcatalog-core[0-9.-]*jar"; + + String filterKylinHiveDependency(String kylinHiveDependency) { + if (StringUtils.isBlank(kylinHiveDependency)) + return ""; + + StringBuilder jarList = new StringBuilder(); + + Pattern hivePattern = Pattern.compile(KYLIN_HIVE_DEPENDENCY_JARS); + Matcher matcher = hivePattern.matcher(kylinHiveDependency); + + while (matcher.find()) { + if (jarList.length() > 0) + jarList.append(","); + jarList.append(matcher.group()); + } + + return jarList.toString(); + } + private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath"; protected void setJobClasspath(Job job) { @@ -159,7 +181,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { String kylinHiveDependency = System.getProperty("kylin.hive.dependency"); String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency"); - logger.info("append kylin.hive.dependency: " + kylinHiveDependency + " and kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH); + logger.info("append kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH); Configuration jobConf = job.getConfiguration(); String classpath = jobConf.get(MAP_REDUCE_CLASSPATH); @@ -175,38 +197,100 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { classpath = classpath + "," + kylinHBaseDependency; } + jobConf.set(MAP_REDUCE_CLASSPATH, classpath); + logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH)); + + /* + * set extra dependencies as tmpjars & tmpfiles if configured + */ + StringBuilder kylinDependency = new StringBuilder(); + + // for hive dependencies if (kylinHiveDependency != null) { // yarn classpath is comma separated kylinHiveDependency = kylinHiveDependency.replace(":", ","); - classpath = classpath + "," + kylinHiveDependency; + + logger.info("Hive Dependencies Before Filtered: "+kylinHiveDependency); + String filteredHive = filterKylinHiveDependency(kylinHiveDependency); + logger.info("Hive Dependencies After Filtered: "+filteredHive); + + if (kylinDependency.length() > 0) + kylinDependency.append(","); + kylinDependency.append(filteredHive); } - jobConf.set(MAP_REDUCE_CLASSPATH, classpath); - logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH)); + // for KylinJobMRLibDir + String mrLibDir = KylinConfig.getInstanceFromEnv().getKylinJobMRLibDir(); + if (!StringUtils.isBlank(mrLibDir)) { + File dirFileMRLIB = new File(mrLibDir); + if(dirFileMRLIB.exists()){ + if (kylinDependency.length() > 0) + kylinDependency.append(","); + kylinDependency.append(mrLibDir); + }else{ + logger.info("The directory '"+mrLibDir+"' for 'kylin.job.mr.lib.dir' does not exist!!!"); + } + } - // set extra dependencies as tmpjars & tmpfiles if configured - setJobTmpJarsAndFiles(job); + setJobTmpJarsAndFiles(job, kylinDependency.toString()); } - private void setJobTmpJarsAndFiles(Job job) { - String mrLibDir = KylinConfig.getInstanceFromEnv().getKylinJobMRLibDir(); - if (StringUtils.isBlank(mrLibDir)) + private void setJobTmpJarsAndFiles(Job job, String kylinDependency) { + if (StringUtils.isBlank(kylinDependency)) + return; + + String[] fNameList = kylinDependency.split(","); + + try { + Configuration jobConf = job.getConfiguration(); + FileSystem fs = FileSystem.getLocal(jobConf); + + StringBuilder jarList = new StringBuilder(); + StringBuilder fileList = new StringBuilder(); + + for (String fileName : fNameList) { + Path p = new Path(fileName); + if (fs.getFileStatus(p).isDirectory()) { + appendTmpDir(job, fileName); + continue; + } + + StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList; + if (list.length() > 0) + list.append(","); + list.append(fs.getFileStatus(p).getPath().toString()); + } + + appendTmpFiles(fileList.toString(), jobConf); + appendTmpJars(jarList.toString(), jobConf); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void appendTmpDir(Job job, String tmpDir) { + if (StringUtils.isBlank(tmpDir)) return; try { Configuration jobConf = job.getConfiguration(); - FileSystem fs = FileSystem.get(new Configuration(jobConf)); - FileStatus[] fList = fs.listStatus(new Path(mrLibDir)); + FileSystem fs = FileSystem.getLocal(jobConf); + FileStatus[] fList = fs.listStatus(new Path(tmpDir)); StringBuilder jarList = new StringBuilder(); StringBuilder fileList = new StringBuilder(); for (FileStatus file : fList) { Path p = file.getPath(); + if (fs.getFileStatus(p).isDirectory()) { + appendTmpDir(job, p.toString()); + continue; + } + StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList; if (list.length() > 0) list.append(","); - list.append(mrLibDir + "/" + file.getPath().getName()); + list.append(fs.getFileStatus(p).getPath().toString()); } appendTmpFiles(fileList.toString(), jobConf); -- 1.7.1