From 49583660d0e66bedd1ec11efe97c16d6a578cdf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=AF=E5=AE=87?= Date: Tue, 8 Sep 2015 11:41:40 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E7=AC=AC=E4=B8=80=E6=AC=A1=E6=8F=90?= =?UTF-8?q?=E4=BA=A4=E4=BD=9C=E4=B8=9A=E6=97=B6=E8=87=AA=E5=8A=A8=E4=B8=8A?= =?UTF-8?q?=E4=BC=A0hadoop=E4=BD=9C=E4=B8=9A=E9=9C=80=E8=A6=81=E7=9A=84jar?= =?UTF-8?q?=20files?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 冯宇 --- .../java/org/apache/kylin/common/KylinConfig.java | 11 +++ conf/kylin.properties | 5 +- .../apache/kylin/job/hadoop/AbstractHadoopJob.java | 92 +++++++++++++++++++++- 3 files changed, 103 insertions(+), 5 deletions(-) diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java index 764986b..d9c12a3 100644 --- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -76,6 +76,8 @@ public class KylinConfig { public static final String KYLIN_JOB_CMD_EXTRA_ARGS = "kylin.job.cmd.extra.args"; public static final String KYLIN_GET_JOB_STATUS_WITH_KERBEROS = "kylin.job.status.with.kerberos"; + + public static final String KYLIN_UPLOAD_DEPENDENCIES_JARS = "kylin.job.upload.jars.enabled"; /** * Toggle to indicate whether to use hive for table flattening. Default * true. 
@@ -279,6 +281,11 @@ public class KylinConfig { } return root + getMetadataUrlPrefix() + "/"; } + + private static String HADOOP_JOB_DEPENDENCIES_JARS_DIR = "kylin-job-dependencies"; + public String getHadoopDependencyJarsLocation() { + return getHdfsWorkingDirectory() + HADOOP_JOB_DEPENDENCIES_JARS_DIR; + } public String getKylinJobLogDir() { return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs"); @@ -378,6 +385,10 @@ public class KylinConfig { public boolean getKylinUseKerberosAuth() { return Boolean.valueOf(getOptional(KYLIN_GET_JOB_STATUS_WITH_KERBEROS, "false")); } + + public boolean isUploadJarsEnabled() { + return Boolean.valueOf(getOptional(KYLIN_UPLOAD_DEPENDENCIES_JARS, "false")); + } public String getOverrideHiveTableLocation(String table) { return getOptional(HIVE_TABLE_LOCATION_PREFIX + table.toUpperCase()); diff --git a/conf/kylin.properties b/conf/kylin.properties index d42c277..3550597 100644 --- a/conf/kylin.properties +++ b/conf/kylin.properties @@ -117,4 +117,7 @@ ext.log.base.dir = /tmp/kylin_log1,/tmp/kylin_log2 query.log.parse.result.table = kylin_query_log #if you should getting job status from RM with kerberos, set it true.. 
-kylin.job.status.with.kerberos=false \ No newline at end of file +kylin.job.status.with.kerberos=false + +#if your hadoop cluster is deployed without the jars that kylin depends on, set this to true +kylin.job.upload.jars.enabled=false \ No newline at end of file diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java index 4c60c52..e814155 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java @@ -138,15 +138,74 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { job.setJarByClass(this.getClass()); } + Configuration jobConf = job.getConfiguration(); + String classpath = getHadoopClasspath(jobConf); + if(!KylinConfig.getInstanceFromEnv().isUploadJarsEnabled()) { + jobConf.set(MAP_REDUCE_CLASSPATH, classpath); + logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH)); + return ; + } + String hdfsLibDir = KylinConfig.getInstanceFromEnv().getHadoopDependencyJarsLocation(); + String normalClasspath = null; + String jarClasspath = null; + try { + FileSystem fs = FileSystem.get(new Configuration(jobConf)); + Path path = new Path(hdfsLibDir); + if(!fs.exists(path)) { + //upload all files only if the directory is empty; if it does not contain all the jars, errors may occur as well...
+ if(!fs.mkdirs(path)) { + logger.warn("Create directory for uploading hadoop dependency jars failed , location " + hdfsLibDir); + return ; + } + long start = System.currentTimeMillis(); + uploadJobClasspath(classpath, hdfsLibDir); + logger.info("Upload all dependency files to HDFS " + hdfsLibDir + ", cost " + (System.currentTimeMillis() - start)); + } + + StringBuffer jarClasspathBuffer = new StringBuffer(); + StringBuffer normalClasspathBuffer = new StringBuffer(); + FileStatus[] fList = fs.listStatus(path); + for(FileStatus file : fList) { + Path p = file.getPath(); + if(p.getName().endsWith(".jar")) { + jarClasspathBuffer.append(hdfsLibDir + "/" + file.getPath().getName()).append(","); + } else { + normalClasspathBuffer.append(hdfsLibDir + "/" + file.getPath().getName()).append(","); + } + } + jarClasspath = jarClasspathBuffer.length() > 1 ? jarClasspathBuffer.substring(0, jarClasspathBuffer.length() - 1) : null; + normalClasspath = normalClasspathBuffer.length() > 1 ? normalClasspathBuffer.substring(0, normalClasspathBuffer.length() - 1) : null; + } catch (IOException e) { + logger.error("Upload all kylin job dependency file to HDFS failed !", e); + return ; + } + + if(jarClasspath != null) { + jobConf.set("tmpjars", jarClasspath); + } + if(normalClasspath != null) { + String preClasspath = jobConf.get("tmpfiles", null); + if(preClasspath != null) + normalClasspath = preClasspath + "," + normalClasspath; + jobConf.set("tmpfiles", normalClasspath); + } + logger.info("Hadoop job jar classpath is: " + job.getConfiguration().get("tmpjars")); + logger.info("Hadoop job file classpath is " + job.getConfiguration().get("tmpfiles")); + } + + private String getHadoopClasspath(Configuration jobConf) { String kylinHiveDependency = System.getProperty("kylin.hive.dependency"); String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency"); logger.info("append kylin.hive.dependency: " + kylinHiveDependency + " and kylin.hbase.dependency: " + kylinHBaseDependency 
+ " to " + MAP_REDUCE_CLASSPATH); - Configuration jobConf = job.getConfiguration(); String classpath = jobConf.get(MAP_REDUCE_CLASSPATH); if (classpath == null || classpath.length() == 0) { logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value."); classpath = getDefaultMapRedClasspath(); + //remove last new line character + if(classpath.endsWith("\n")) + classpath = classpath.substring(0, classpath.length() - 1); + classpath = classpath.replace(":", ","); logger.info("The default mapred classpath is: " + classpath); } @@ -161,9 +220,31 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { kylinHiveDependency = kylinHiveDependency.replace(":", ","); classpath = classpath + "," + kylinHiveDependency; } + return classpath; + } + + private boolean uploadJobClasspath(String classpath, String target) { + if(classpath == null || classpath.isEmpty()) + return true; + + classpath = classpath.replace(",", " "); + return uploadHadoopClasspath(classpath, target); + } + + private boolean uploadHadoopClasspath(String classpath, String target) { + String command = "hadoop fs -put " + classpath + " " + target; + try { + CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor(); + ShellCmdOutput output = new ShellCmdOutput(); + executor.execute(command, output); - jobConf.set(MAP_REDUCE_CLASSPATH, classpath + "," + kylinHiveDependency); - logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH)); + logger.debug("Output : " + output.getOutput()); + } catch (IOException e) { + logger.error("Failed to run: " + command , e); + return false; + } + + return true; } private String getDefaultMapRedClasspath() { @@ -238,8 +319,11 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { // hadoop distributed cache String hdfsMetaDir = "file://" + OptionsHelper.convertToFileURL(metaDir.getAbsolutePath()); 
logger.info("HDFS meta dir is: " + hdfsMetaDir); + String preClasspath = conf.get("tmpfiles", null); + if(preClasspath != null) { + hdfsMetaDir = preClasspath + "," + hdfsMetaDir; + } conf.set("tmpfiles", hdfsMetaDir); - } protected void cleanupTempConfFile(Configuration conf) { -- 1.9.4.msysgit.2