From 7ce15ccee153bb1eb5094a238d688966f22cc25e Mon Sep 17 00:00:00 2001 From: Zhong Yanghong Date: Sun, 24 Jan 2016 09:35:22 +0000 Subject: [PATCH] Feature changed: automatically append hive dependent jars to 'tmpjars' --- .../apache/kylin/job/hadoop/AbstractHadoopJob.java | 123 ++++++++++++++++--- 1 files changed, 103 insertions(+), 20 deletions(-) diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java index a851756..f5c85eb 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java @@ -30,6 +30,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; @@ -127,6 +129,26 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { return retVal; } + private static final String KYLIN_HIVE_DEPENDENCY_JARS = "[^,]*hive-exec.jar|[^,]*hive-metastore.jar|[^,]*hive-hcatalog-core[0-9.-]*jar"; + + String filterKylinHiveDependency(String kylinHiveDependency) { + if (StringUtils.isBlank(kylinHiveDependency)) + return ""; + + StringBuilder jarList = new StringBuilder(); + + Pattern hivePattern = Pattern.compile(KYLIN_HIVE_DEPENDENCY_JARS); + Matcher matcher = hivePattern.matcher(kylinHiveDependency); + + while (matcher.find()) { + if (jarList.length() > 0) + jarList.append(","); + jarList.append(matcher.group()); + } + + return jarList.toString(); + } + private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath"; protected void setJobClasspath(Job job) { @@ -141,14 +163,13 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { String kylinHiveDependency = System.getProperty("kylin.hive.dependency"); String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency"); - logger.info("append kylin.hive.dependency: " + kylinHiveDependency + " and kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH); + logger.info("append kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH); Configuration jobConf = job.getConfiguration(); String classpath = jobConf.get(MAP_REDUCE_CLASSPATH); if (classpath == null || classpath.length() == 0) { logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value."); classpath = getDefaultMapRedClasspath(); - classpath = classpath.replace(":", ","); // yarn classpath is comma separated logger.info("The default mapred classpath is: " + classpath); } @@ -158,40 +179,102 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { classpath = classpath + "," + kylinHBaseDependency; } + jobConf.set(MAP_REDUCE_CLASSPATH, classpath); + logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH)); + + /* + * set extra dependencies as tmpjars & tmpfiles if configured + */ + StringBuilder kylinDependency = new StringBuilder(); + + // for hive dependencies if (kylinHiveDependency != null) { // yarn classpath is comma separated kylinHiveDependency = kylinHiveDependency.replace(":", ","); - classpath = classpath + "," + kylinHiveDependency; + + logger.info("Hive Dependencies Before Filtered: " + kylinHiveDependency); + String filteredHive = filterKylinHiveDependency(kylinHiveDependency); + logger.info("Hive Dependencies After Filtered: " + filteredHive); + + if (kylinDependency.length() > 0) + kylinDependency.append(","); + kylinDependency.append(filteredHive); } - jobConf.set(MAP_REDUCE_CLASSPATH, classpath); - logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH)); + // for KylinJobMRLibDir + String mrLibDir = KylinConfig.getInstanceFromEnv().getKylinJobMRLibDir(); + if (!StringUtils.isBlank(mrLibDir)) { + File dirFileMRLIB = new File(mrLibDir); + if (dirFileMRLIB.exists()) { + if (kylinDependency.length() > 0) + kylinDependency.append(","); + kylinDependency.append(mrLibDir); + } else { + logger.info("The directory '" + mrLibDir + "' for 'kylin.job.mr.lib.dir' does not exist!!!"); + } + } - // set extra dependencies as tmpjars & tmpfiles if configured - setJobTmpJarsAndFiles(job); + setJobTmpJarsAndFiles(job, kylinDependency.toString()); } - private void setJobTmpJarsAndFiles(Job job) { - String mrLibDir = KylinConfig.getInstanceFromEnv().getKylinJobMRLibDir(); - if (StringUtils.isBlank(mrLibDir)) + private void setJobTmpJarsAndFiles(Job job, String kylinDependency) { + if (StringUtils.isBlank(kylinDependency)) return; + String[] fNameList = kylinDependency.split(","); + try { Configuration jobConf = job.getConfiguration(); - FileSystem fs = FileSystem.get(new Configuration(jobConf)); - FileStatus[] fList = fs.listStatus(new Path(mrLibDir)); - + FileSystem fs = FileSystem.getLocal(jobConf); + StringBuilder jarList = new StringBuilder(); StringBuilder fileList = new StringBuilder(); - + + for (String fileName : fNameList) { + Path p = new Path(fileName); + if (fs.getFileStatus(p).isDirectory()) { + appendTmpDir(job, fileName); + continue; + } + + StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList; + if (list.length() > 0) + list.append(","); + list.append(fs.getFileStatus(p).getPath().toString()); + } + + appendTmpFiles(fileList.toString(), jobConf); + appendTmpJars(jarList.toString(), jobConf); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void appendTmpDir(Job job, String tmpDir) { + if (StringUtils.isBlank(tmpDir)) + return; + + try { + Configuration jobConf = job.getConfiguration(); + FileSystem fs = FileSystem.getLocal(jobConf); + FileStatus[] fList = fs.listStatus(new Path(tmpDir)); + + StringBuilder jarList = new StringBuilder(); + StringBuilder fileList = new StringBuilder(); + for (FileStatus file : fList) { Path p = file.getPath(); + if (fs.getFileStatus(p).isDirectory()) { + appendTmpDir(job, p.toString()); + continue; + } + StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList; if (list.length() > 0) list.append(","); - list.append(mrLibDir + "/" + file.getPath().getName()); + list.append(fs.getFileStatus(p).getPath().toString()); } - + appendTmpFiles(fileList.toString(), jobConf); appendTmpJars(jarList.toString(), jobConf); @@ -203,7 +286,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { private void appendTmpJars(String jarList, Configuration conf) { if (StringUtils.isBlank(jarList)) return; - + String tmpJars = conf.get("tmpjars", null); if (tmpJars == null) { tmpJars = jarList; @@ -217,7 +300,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { private void appendTmpFiles(String fileList, Configuration conf) { if (StringUtils.isBlank(fileList)) return; - + String tmpFiles = conf.get("tmpfiles", null); if (tmpFiles == null) { tmpFiles = fileList; @@ -338,10 +421,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { else hdfsMetaDir = "file:///" + hdfsMetaDir; logger.info("HDFS meta dir is: " + hdfsMetaDir); - + appendTmpFiles(hdfsMetaDir, conf); } - + private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList dumpList) throws IOException { ResourceStore from = ResourceStore.getStore(kylinConfig); KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()); -- 1.7.1