diff --git hcatalog/src/test/e2e/templeton/README.txt hcatalog/src/test/e2e/templeton/README.txt
index 3011e5c..00ea693 100644
--- hcatalog/src/test/e2e/templeton/README.txt
+++ hcatalog/src/test/e2e/templeton/README.txt
@@ -141,8 +141,10 @@
 In order for this test suite to work, webhcat-site.xml should have webhcat.proxyuser.hue.groups
 and webhcat.proxyuser.hue.hosts defined, i.e. 'hue' should be allowed to impersonate 'joe'.
 [Of course, 'hcat' proxyuser should be configured in core-site.xml for the command to succeed.]
-Furthermore, metastore side file based security should be enabled. To do this 3 properties in
-hive-site.xml should be configured:
+Furthermore, metastore side file based security should be enabled.
+(See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization#LanguageManualAuthorization-MetastoreServerSecurity for more info)
+
+To do this, 3 properties in hive-site.xml should be configured:
 1) hive.security.metastore.authorization.manager set to
 org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider
 2) hive.security.metastore.authenticator.manager set to
diff --git hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
index dcd6465..66a0717 100644
--- hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
+++ hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
@@ -788,7 +788,8 @@ sub compare
     if ( (defined $testCmd->{'check_job_created'})
         || (defined $testCmd->{'check_job_complete'})
-        || (defined $testCmd->{'check_job_exit_value'}) ) {
+        || (defined $testCmd->{'check_job_exit_value'})
+        || (defined $testCmd->{'check_job_percent_complete'}) ) {
       my $jobid = $json_hash->{'id'};
       if (!defined $jobid) {
         print $log "$0::$subName WARN check failed: "
@@ -803,7 +804,8 @@ sub compare
           . "jobresult not defined ";
         $result = 0;
       }
-      if (defined($testCmd->{'check_job_complete'}) || defined($testCmd->{'check_job_exit_value'})) {
+      if (defined($testCmd->{'check_job_complete'}) || defined($testCmd->{'check_job_exit_value'})
+         || defined($testCmd->{'check_job_percent_complete'})) {
         my $jobComplete;
         my $NUM_RETRIES = 60;
         my $SLEEP_BETWEEN_RETRIES = 5;
@@ -841,6 +843,15 @@ sub compare
             $result = 0;
           }
         }
+        # check the percentComplete value
+        if (defined($testCmd->{'check_job_percent_complete'})) {
+          my $pcValue = $res_hash->{'percentComplete'};
+          my $expectedPercentComplete = $testCmd->{'check_job_percent_complete'};
+          if ( (!defined $pcValue) || $pcValue ne $expectedPercentComplete ) {
+            print $log "check_job_percent_complete failed. got percentComplete $pcValue, expected $expectedPercentComplete";
+            $result = 0;
+          }
+        }
       }

       #Check userargs
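Note: the new check_job_percent_complete key above makes the driver poll the job's status JSON and compare its percentComplete field with the expected string (see the jobsubmission*.conf changes below). For readers reproducing the check outside the Perl harness, here is a minimal self-contained Java sketch; the endpoint path, URL handling, and regex-based JSON scraping are illustrative assumptions, not part of this patch.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PercentCompleteCheck {
  // Pulls "percentComplete":"..." out of the status JSON with a plain regex;
  // the Perl driver uses a real JSON parser, this is only a sketch.
  private static final Pattern PC =
    Pattern.compile("\"percentComplete\"\\s*:\\s*\"([^\"]*)\"");

  public static boolean check(String templetonUrl, String jobId, String user,
                              String expected) throws Exception {
    // Assumed endpoint shape; adjust to the WebHCat version under test.
    URL url = new URL(templetonUrl + "/templeton/v1/jobs/" + jobId
      + "?user.name=" + user);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    BufferedReader in =
      new BufferedReader(new InputStreamReader(conn.getInputStream()));
    StringBuilder body = new StringBuilder();
    String line;
    while ((line = in.readLine()) != null) {
      body.append(line);
    }
    in.close();
    Matcher m = PC.matcher(body);
    // Like the Perl check: fail when the field is missing or differs.
    return m.find() && m.group(1).equals(expected);
  }
}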
diff --git hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
index 240b01e..0baf63c 100644
--- hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
+++ hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
@@ -73,6 +73,7 @@ $cfg =
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS',
+     'check_job_percent_complete' => 'map 100% reduce 100%',
      'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
@@ -166,6 +167,7 @@ $cfg =
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS',
+     'check_job_percent_complete' => '100% complete',
      'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
@@ -386,6 +388,7 @@ $cfg =
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS',
+     'check_job_percent_complete' => 'map 100% reduce 100%',
      'check_job_exit_value' => 0,
      'check_call_back' => 1,
diff --git hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf
index 5e17221..b23fd4b 100644
--- hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf
+++ hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf
@@ -54,7 +54,9 @@ $cfg =
     },
     {
      #-ve test - no input file
-     'num' => 2,
+     #The TempletonControllerJob status should be SUCCESS, but the exit value should be 1.
+     #If the YARN log is redirected to stderr, check_job_complete is FAILURE; if not, it is SUCCESS (HIVE-5511).
+     'num' => 2,
      'method' => 'POST',
      'url' => ':TEMPLETON_URL:/templeton/v1/mapreduce/streaming',
      'post_options' => ['user.name=:UNAME:','input=:INPDIR_HDFS:/nums.txt','input=:INPDIR_HDFS:/nums.txt','output=:OUTDIR:/mycounts',
diff --git hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh
index c8899b6..6b0b578 100644
--- hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh
+++ hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh
@@ -75,7 +75,7 @@ elif [ -e "${WEBHCAT_PREFIX}/conf/webhcat-env.sh" -o -e "${WEBHCAT_PREFIX}/etc/w
 else
   DEFAULT_CONF_DIR="/etc/webhcat"
 fi
-WEBHCAT_CONF_DIR="${WEBHCAT_CONF_DIR:-$DEFAULT_CONF_DIR}"
+export WEBHCAT_CONF_DIR="${WEBHCAT_CONF_DIR:-$DEFAULT_CONF_DIR}"

 #users can add various env vars to webhcat-env.sh in the conf
 #rather than having to export them before running the command
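Note: the export added above is what makes WEBHCAT_CONF_DIR visible to the server JVM; the AppConfig change later in this patch reads it back with System.getenv() to locate the override-container-log4j.properties file introduced next. A small illustrative sketch of that lookup follows (the class name and main() are hypothetical, not part of the patch):

import java.io.File;

public class ConfDirLookup {
  // Mirrors AppConfig.getWebhcatConfDir() added by this patch: unless
  // webhcat_config.sh exports WEBHCAT_CONF_DIR, System.getenv() sees nothing.
  public static File containerLog4jProps() {
    String confDir = System.getenv("WEBHCAT_CONF_DIR");
    if (confDir == null) {
      return null; // the variable was set but not exported, or not set at all
    }
    return new File(confDir, "override-container-log4j.properties");
  }

  public static void main(String[] args) {
    File f = containerLog4jProps();
    System.out.println(f != null && f.exists()
      ? "Found " + f.getAbsolutePath()
      : "WEBHCAT_CONF_DIR not exported or file missing");
  }
}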
diff --git hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties
new file mode 100644
index 0000000..f6b740f
--- /dev/null
+++ hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+#
+# This log4j config overrides hadoop-yarn-server-nodemanager-2.1.0-beta.jar/container-log4j.properties.
+# In Hadoop 2, (by default) the log information about M/R job progress is not sent to stderr,
+# which is where LaunchMapper expects it.  Thus WebHCat is unable to report the
+# percentComplete attribute in job status.  There is something broken in YARN that doesn't allow
+# its log4j properties to be overridden.  Thus for now (10/07/2013) we resort to overriding it
+# using this file, where log4j.rootLogger specifies an additional 'console' appender.  This file
+# is made available through DistributedCache.  See TrivialExecService and TempletonControllerJob
+# for more info.
+
+hadoop.root.logger=INFO,CLA
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, console, EventCounter
+
+# Logging Threshold
+log4j.threshold=ALL
+
+#
+# ContainerLog Appender
+#
+
+#Default values
+yarn.app.container.log.dir=null
+yarn.app.container.log.filesize=100
+
+log4j.appender.CLA=org.apache.hadoop.yarn.ContainerLogAppender
+log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir}
+log4j.appender.CLA.totalLogFileSize=${yarn.app.container.log.filesize}
+
+log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter
+
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
index 4783ca9..01dff5b 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
@@ -71,6 +71,7 @@
   };

   public static final String TEMPLETON_HOME_VAR = "TEMPLETON_HOME";
+  public static final String WEBHCAT_CONF_DIR = "WEBHCAT_CONF_DIR";

   public static final String[] TEMPLETON_CONF_FILENAMES = {
     "webhcat-default.xml",
@@ -153,6 +154,9 @@ public String getHadoopConfDir() {
   public static String getTempletonDir() {
     return System.getenv(TEMPLETON_HOME_VAR);
   }
+  public static String getWebhcatConfDir() {
+    return System.getenv(WEBHCAT_CONF_DIR);
+  }

   private boolean loadOneFileConfig(String dir, String fname) {
     if (dir != null) {
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java
index 5f35176..1b9663d 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java
@@ -68,7 +68,7 @@ public CompleteBean run(String id, String jobStatus)
     try {
       state = new JobState(id, Main.getAppConfigInstance());
       if (state.getCompleteStatus() == null)
-        failed("Job not yet complete. jobId=" + id + " Status from JT=" + jobStatus, null);
+        failed("Job not yet complete. 
jobId=" + id + " Status from JobTracker=" + jobStatus, null); Long notified = state.getNotifiedTime(); if (notified != null) { diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java index dfb66ca..fb46b58 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java @@ -91,6 +91,12 @@ public String getField(Type type, String id, String key) { BufferedReader in = null; Path p = new Path(getPath(type) + "/" + id + "/" + key); try { + if(!fs.exists(p)) { + //check first, otherwise webhcat.log is full of stack traces from FileSystem when + //clients check for status ('exitValue', 'completed', etc.) + LOG.debug(p + " does not exist."); + return null; + } in = new BufferedReader(new InputStreamReader(fs.open(p))); String line = null; String val = ""; @@ -102,9 +108,7 @@ public String getField(Type type, String id, String key) { } return val; } catch (Exception e) { - //don't print stack trace since clients poll for 'exitValue', 'completed', - //files which are not there until job completes - LOG.info("Couldn't find " + p + ": " + e.getMessage()); + LOG.error("Couldn't find " + p + ": " + e.getMessage(), e); } finally { close(in); } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java index 9d3e05c..f46edb3 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java @@ -32,6 +32,6 @@ @Override List parseJobID() throws IOException { - return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern); + return parseJobID(JobSubmissionConstants.STDERR_FNAME, jobidPattern); } } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java index 2ef404f..2d68918 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java @@ -32,7 +32,7 @@ @Override List parseJobID() throws IOException { - return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern); + return parseJobID(JobSubmissionConstants.STDERR_FNAME, jobidPattern); } } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java new file mode 100644 index 0000000..1daea39 --- /dev/null +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton.tool; + +public interface JobSubmissionConstants { + public static final String COPY_NAME = "templeton.copy"; + public static final String STATUSDIR_NAME = "templeton.statusdir"; + public static final String ENABLE_LOG = "templeton.enablelog"; + public static final String JOB_TYPE = "templeton.jobtype"; + public static final String JAR_ARGS_NAME = "templeton.args"; + public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath"; + public static final String OVERRIDE_CONTAINER_LOG4J_PROPS = "override.containerLog4j"; + //name of file + static final String CONTAINER_LOG4J_PROPS = "override-container-log4j.properties"; + public static final String STDOUT_FNAME = "stdout"; + public static final String STDERR_FNAME = "stderr"; + public static final String EXIT_FNAME = "exit"; + public static final int WATCHER_TIMEOUT_SECS = 10; + public static final int KEEP_ALIVE_MSEC = 60 * 1000; + public static final String TOKEN_FILE_ARG_PLACEHOLDER = "__WEBHCAT_TOKEN_FILE_LOCATION__"; +} diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java new file mode 100644 index 0000000..126e875 --- /dev/null +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hive.hcatalog.templeton.tool; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.util.Shell; +import org.apache.hive.hcatalog.templeton.BadParam; +import org.apache.hive.hcatalog.templeton.LauncherDelegator; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Note that this class is used in a different JVM than WebHCat server. Thus it should not call + * any classes not available on every node in the cluster (outside webhcat jar). + * TempletonControllerJob#run() calls Job.setJarByClass(LaunchMapper.class) which + * causes webhcat jar to be shipped to target node, but not it's transitive closure. + * Long term we need to clean up this separation and create a separate jar to ship so that the + * dependencies are clear. (This used to be an inner class of TempletonControllerJob) + */ +@InterfaceAudience.Private +public class LaunchMapper extends Mapper implements + JobSubmissionConstants { + /** + * This class currently sends everything to stderr, but it should probably use Log4J - + * it will end up in 'syslog' of this Map task. For example, look for KeepAlive heartbeat msgs. + */ + private static final Log LOG = LogFactory.getLog(LaunchMapper.class); + + + protected Process startJob(Context context, String user, String overrideClasspath) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + copyLocal(COPY_NAME, conf); + String[] jarArgs = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME)); + + ArrayList removeEnv = new ArrayList(); + //todo: we really need some comments to explain exactly why each of these is removed + removeEnv.add("HADOOP_ROOT_LOGGER"); + removeEnv.add("hadoop-command"); + removeEnv.add("CLASS"); + removeEnv.add("mapredcommand"); + Map env = TempletonUtils.hadoopUserEnv(user, + overrideClasspath); + List jarArgsList = new LinkedList(Arrays.asList(jarArgs)); + String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION"); + + + if (tokenFile != null) { + //Token is available, so replace the placeholder + tokenFile = tokenFile.replaceAll("\"", ""); + String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile; + if (Shell.WINDOWS) { + try { + tokenArg = TempletonUtils.quoteForWindows(tokenArg); + } catch (BadParam e) { + String msg = "cannot pass " + tokenFile + " to mapreduce.job.credentials.binary"; + LOG.error(msg, e); + throw new IOException(msg, e); + } + } + for(int i=0; i it = jarArgsList.iterator(); + while(it.hasNext()){ + String arg = it.next(); + if(arg.contains(TOKEN_FILE_ARG_PLACEHOLDER)){ + it.remove(); + } + } + } + boolean overrideLog4jProps = conf.get(OVERRIDE_CONTAINER_LOG4J_PROPS) == null ? 
+ false : Boolean.valueOf(conf.get(OVERRIDE_CONTAINER_LOG4J_PROPS)); + return TrivialExecService.getInstance().run(jarArgsList, removeEnv, env, overrideLog4jProps); + } + + private void copyLocal(String var, Configuration conf) throws IOException { + String[] filenames = TempletonUtils.decodeArray(conf.get(var)); + if (filenames != null) { + for (String filename : filenames) { + Path src = new Path(filename); + Path dst = new Path(src.getName()); + FileSystem fs = src.getFileSystem(conf); + LOG.info("templeton: copy " + src + " => " + dst); + fs.copyToLocalFile(src, dst); + } + } + } + + @Override + public void run(Context context) throws IOException, InterruptedException { + + Configuration conf = context.getConfiguration(); + + Process proc = startJob(context, + conf.get("user.name"), + conf.get(OVERRIDE_CLASSPATH)); + + String statusdir = conf.get(STATUSDIR_NAME); + + if (statusdir != null) { + try { + statusdir = TempletonUtils.addUserHomeDirectoryIfApplicable(statusdir, + conf.get("user.name")); + } catch (URISyntaxException e) { + String msg = "Invalid status dir URI"; + LOG.error(msg, e); + throw new IOException(msg, e); + } + } + + Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG)); + LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE)); + + ExecutorService pool = Executors.newCachedThreadPool(); + executeWatcher(pool, conf, context.getJobID(), + proc.getInputStream(), statusdir, STDOUT_FNAME); + executeWatcher(pool, conf, context.getJobID(), + proc.getErrorStream(), statusdir, STDERR_FNAME); + KeepAlive keepAlive = startCounterKeepAlive(pool, context); + + proc.waitFor(); + keepAlive.sendReport = false; + pool.shutdown(); + if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) { + pool.shutdownNow(); + } + + writeExitValue(conf, proc.exitValue(), statusdir); + JobState state = new JobState(context.getJobID().toString(), conf); + state.setExitValue(proc.exitValue()); + state.setCompleteStatus("done"); + state.close(); + + if (enablelog && TempletonUtils.isset(statusdir)) { + LOG.info("templeton: collecting logs for " + context.getJobID().toString() + + " to " + statusdir + "/logs"); + LogRetriever logRetriever = new LogRetriever(statusdir, jobType, conf); + logRetriever.run(); + } + + if (proc.exitValue() != 0) { + LOG.info("templeton: job failed with exit code " + + proc.exitValue()); + } else { + LOG.info("templeton: job completed with exit code 0"); + } + } + + private void executeWatcher(ExecutorService pool, Configuration conf, JobID jobid, InputStream in, + String statusdir, String name) throws IOException { + Watcher w = new Watcher(conf, jobid, in, statusdir, name); + pool.execute(w); + } + + private KeepAlive startCounterKeepAlive(ExecutorService pool, Context context) throws IOException { + KeepAlive k = new KeepAlive(context); + pool.execute(k); + return k; + } + + private void writeExitValue(Configuration conf, int exitValue, String statusdir) + throws IOException { + if (TempletonUtils.isset(statusdir)) { + Path p = new Path(statusdir, EXIT_FNAME); + FileSystem fs = p.getFileSystem(conf); + OutputStream out = fs.create(p); + LOG.info("templeton: Writing exit value " + exitValue + " to " + p); + PrintWriter writer = new PrintWriter(out); + writer.println(exitValue); + writer.close(); + } + } + + + private static class Watcher implements Runnable { + private final InputStream in; + private OutputStream out; + private final JobID jobid; + private final Configuration conf; + + public Watcher(Configuration conf, 
JobID jobid, InputStream in, String statusdir, String name) + throws IOException { + this.conf = conf; + this.jobid = jobid; + this.in = in; + + if (name.equals(STDERR_FNAME)) { + out = System.err; + } else { + out = System.out; + } + + if (TempletonUtils.isset(statusdir)) { + Path p = new Path(statusdir, name); + FileSystem fs = p.getFileSystem(conf); + out = fs.create(p); + LOG.info("templeton: Writing status to " + p); + } + } + + @Override + public void run() { + try { + InputStreamReader isr = new InputStreamReader(in); + BufferedReader reader = new BufferedReader(isr); + PrintWriter writer = new PrintWriter(out); + + String line; + while ((line = reader.readLine()) != null) { + writer.println(line); + JobState state = null; + try { + String percent = TempletonUtils.extractPercentComplete(line); + String childid = TempletonUtils.extractChildJobId(line); + + if (percent != null || childid != null) { + state = new JobState(jobid.toString(), conf); + state.setPercentComplete(percent); + state.setChildId(childid); + } + } catch (IOException e) { + LOG.error("templeton: state error: ", e); + } finally { + if (state != null) { + try { + state.close(); + } catch (IOException e) { + LOG.warn(e); + } + } + } + } + writer.flush(); + if(out != System.err && out != System.out) { + //depending on FileSystem implementation flush() may or may not do anything + writer.close(); + } + } catch (IOException e) { + LOG.error("templeton: execute error: ", e); + } + } + } + + private static class KeepAlive implements Runnable { + private final Context context; + private volatile boolean sendReport; + + public KeepAlive(Context context) + { + this.sendReport = true; + this.context = context; + } + private static StringBuilder makeDots(int count) { + StringBuilder sb = new StringBuilder(); + for(int i = 0; i < count; i++) { + sb.append('.'); + } + return sb; + } + + @Override + public void run() { + try { + int count = 0; + while (sendReport) { + // Periodically report progress on the Context object + // to prevent TaskTracker from killing the Templeton + // Controller task + context.progress(); + count++; + String msg = "KeepAlive Heart beat" + makeDots(count); + LOG.info(msg); + Thread.sleep(KEEP_ALIVE_MSEC); + } + } catch (InterruptedException e) { + // Ok to be interrupted + } + } + } +} diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java index 007ac2f..048440c 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java @@ -32,6 +32,6 @@ @Override List parseJobID() throws IOException { - return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern); + return parseJobID(JobSubmissionConstants.STDERR_FNAME, jobidPattern); } } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java index 110ddbb..e162f11 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java @@ -18,23 +18,10 @@ */ package org.apache.hive.hcatalog.templeton.tool; -import java.io.BufferedReader; +import java.io.File; import 
java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.PrintWriter; -import java.net.URISyntaxException; +import java.net.URI; import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,24 +29,24 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Tool; -import org.apache.hive.hcatalog.templeton.BadParam; -import org.apache.hive.hcatalog.templeton.LauncherDelegator; +import org.apache.hive.hcatalog.templeton.AppConfig; +import org.apache.hive.hcatalog.templeton.Main; import org.apache.hive.hcatalog.templeton.SecureProxySupport; import org.apache.hive.hcatalog.templeton.UgiFactory; import org.apache.thrift.TException; @@ -83,281 +70,99 @@ * parameter supplied in the REST call. WebHcat takes care of cancelling the token when the job * is complete. 
*/ -public class TempletonControllerJob extends Configured implements Tool { - public static final String COPY_NAME = "templeton.copy"; - public static final String STATUSDIR_NAME = "templeton.statusdir"; - public static final String ENABLE_LOG = "templeton.enablelog"; - public static final String JOB_TYPE = "templeton.jobtype"; - public static final String JAR_ARGS_NAME = "templeton.args"; - public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath"; - - public static final String STDOUT_FNAME = "stdout"; - public static final String STDERR_FNAME = "stderr"; - public static final String EXIT_FNAME = "exit"; - - public static final int WATCHER_TIMEOUT_SECS = 10; - public static final int KEEP_ALIVE_MSEC = 60 * 1000; - - public static final String TOKEN_FILE_ARG_PLACEHOLDER - = "__WEBHCAT_TOKEN_FILE_LOCATION__"; - - private static TrivialExecService execService = TrivialExecService.getInstance(); - +@InterfaceAudience.Private +public class TempletonControllerJob extends Configured implements Tool, JobSubmissionConstants { private static final Log LOG = LogFactory.getLog(TempletonControllerJob.class); - private final boolean secureMetastoreAccess; + //file to add to DistributedCache + private static URI overrideLog4jURI = null; + private static boolean overrideContainerLog4jProps; + //Jar cmd submission likely will be affected, Pig likely not + private static final String affectedMsg = "Monitoring of Hadoop jobs submitted through WebHCat " + + "may be affected."; + private static final String TMP_DIR_PROP = "hadoop.tmp.dir"; /** - * @param secureMetastoreAccess - if true, a delegation token will be created - * and added to the job + * Copy the file from local file system to tmp dir */ - public TempletonControllerJob(boolean secureMetastoreAccess) { - super(); - this.secureMetastoreAccess = secureMetastoreAccess; - } - public static class LaunchMapper - extends Mapper { - protected Process startJob(Context context, String user, - String overrideClasspath) - throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); - copyLocal(COPY_NAME, conf); - String[] jarArgs - = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME)); - - ArrayList removeEnv = new ArrayList(); - removeEnv.add("HADOOP_ROOT_LOGGER"); - removeEnv.add("hadoop-command"); - removeEnv.add("CLASS"); - removeEnv.add("mapredcommand"); - Map env = TempletonUtils.hadoopUserEnv(user, - overrideClasspath); - List jarArgsList = new LinkedList(Arrays.asList(jarArgs)); - String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION"); - - - if (tokenFile != null) { - //Token is available, so replace the placeholder - tokenFile = tokenFile.replaceAll("\"", ""); - String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile; - if (Shell.WINDOWS) { - try { - tokenArg = TempletonUtils.quoteForWindows(tokenArg); - } catch (BadParam e) { - throw new IOException("cannot pass " + tokenFile + " to mapreduce.job.credentials.binary", e); - } - } - for(int i=0; i it = jarArgsList.iterator(); - while(it.hasNext()){ - String arg = it.next(); - if(arg.contains(TOKEN_FILE_ARG_PLACEHOLDER)){ - it.remove(); - } - } - } - return execService.run(jarArgsList, removeEnv, env); - } - - private void copyLocal(String var, Configuration conf) - throws IOException { - String[] filenames = TempletonUtils.decodeArray(conf.get(var)); - if (filenames != null) { - for (String filename : filenames) { - Path src = new Path(filename); - Path dst = new Path(src.getName()); - FileSystem fs = src.getFileSystem(conf); - 
System.err.println("templeton: copy " + src + " => " + dst); - fs.copyToLocalFile(src, dst); + private static URI copyLog4JtoFileSystem(final String localFile) throws IOException, + InterruptedException { + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + return ugi.doAs(new PrivilegedExceptionAction() { + @Override + public URI run() throws IOException { + AppConfig appConfig = Main.getAppConfigInstance(); + String fsTmpDir = appConfig.get(TMP_DIR_PROP); + if(fsTmpDir == null || fsTmpDir.length() <= 0) { + LOG.warn("Could not find 'hadoop.tmp.dir'; " + affectedMsg); + return null; } - } - } - - @Override - public void run(Context context) - throws IOException, InterruptedException { - - Configuration conf = context.getConfiguration(); - - Process proc = startJob(context, - conf.get("user.name"), - conf.get(OVERRIDE_CLASSPATH)); - - String statusdir = conf.get(STATUSDIR_NAME); - - if (statusdir != null) { - try { - statusdir = TempletonUtils.addUserHomeDirectoryIfApplicable(statusdir, - conf.get("user.name")); - } catch (URISyntaxException e) { - throw new IOException("Invalid status dir URI", e); + FileSystem fs = FileSystem.get(appConfig); + Path dirPath = new Path(fsTmpDir); + if(!fs.exists(dirPath)) { + LOG.warn(dirPath + " does not exist; " + affectedMsg); + return null; } + Path dst = fs.makeQualified(new Path(fsTmpDir, CONTAINER_LOG4J_PROPS)); + fs.copyFromLocalFile(new Path(localFile), dst); + //make readable by all users since TempletonControllerJob#run() is run as submitting user + fs.setPermission(dst, new FsPermission((short)0644)); + return dst.toUri(); } - - Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG)); - LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE)); - - ExecutorService pool = Executors.newCachedThreadPool(); - executeWatcher(pool, conf, context.getJobID(), - proc.getInputStream(), statusdir, STDOUT_FNAME); - executeWatcher(pool, conf, context.getJobID(), - proc.getErrorStream(), statusdir, STDERR_FNAME); - KeepAlive keepAlive = startCounterKeepAlive(pool, context); - - proc.waitFor(); - keepAlive.sendReport = false; - pool.shutdown(); - if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) { - pool.shutdownNow(); - } - - writeExitValue(conf, proc.exitValue(), statusdir); - JobState state = new JobState(context.getJobID().toString(), conf); - state.setExitValue(proc.exitValue()); - state.setCompleteStatus("done"); - state.close(); - - if (enablelog && TempletonUtils.isset(statusdir)) { - System.err.println("templeton: collecting logs for " + context.getJobID().toString() - + " to " + statusdir + "/logs"); - LogRetriever logRetriever = new LogRetriever(statusdir, jobType, conf); - logRetriever.run(); - } - - if (proc.exitValue() != 0) { - System.err.println("templeton: job failed with exit code " - + proc.exitValue()); - } - else { - System.err.println("templeton: job completed with exit code 0"); - } - } - - private void executeWatcher(ExecutorService pool, Configuration conf, - JobID jobid, InputStream in, String statusdir, - String name) - throws IOException { - Watcher w = new Watcher(conf, jobid, in, statusdir, name); - pool.execute(w); - } - - private KeepAlive startCounterKeepAlive(ExecutorService pool, Context context) - throws IOException { - KeepAlive k = new KeepAlive(context); - pool.execute(k); - return k; - } - - private void writeExitValue(Configuration conf, int exitValue, String statusdir) - throws IOException { - if (TempletonUtils.isset(statusdir)) { - Path p = new 
Path(statusdir, EXIT_FNAME); - FileSystem fs = p.getFileSystem(conf); - OutputStream out = fs.create(p); - System.err.println("templeton: Writing exit value " - + exitValue + " to " + p); - PrintWriter writer = new PrintWriter(out); - writer.println(exitValue); - writer.close(); - } - } + }); } - - private static class Watcher implements Runnable { - private final InputStream in; - private OutputStream out; - private final JobID jobid; - private final Configuration conf; - - public Watcher(Configuration conf, JobID jobid, InputStream in, - String statusdir, String name) - throws IOException { - this.conf = conf; - this.jobid = jobid; - this.in = in; - - if (name.equals(STDERR_FNAME)) - out = System.err; - else - out = System.out; - - if (TempletonUtils.isset(statusdir)) { - Path p = new Path(statusdir, name); - FileSystem fs = p.getFileSystem(conf); - out = fs.create(p); - System.err.println("templeton: Writing status to " + p); - } - } - - @Override - public void run() { - try { - InputStreamReader isr = new InputStreamReader(in); - BufferedReader reader = new BufferedReader(isr); - PrintWriter writer = new PrintWriter(out); - - String line; - while ((line = reader.readLine()) != null) { - writer.println(line); - JobState state = null; + /** + * local file system + * @return + */ + private static String getLog4JPropsLocal() { + return AppConfig.getWebhcatConfDir() + File.separator + CONTAINER_LOG4J_PROPS; + } + static { + //initialize once-per-JVM (i.e. one running WebHCat server) state and log it once since it's + // the same for every job + try { + //safe (thread) publication + // http://docs.oracle.com/javase/specs/jls/se5.0/html/execution.html#12.4.2 + LOG.info("Using Hadoop Version: " + ShimLoader.getMajorVersion()); + overrideContainerLog4jProps = "0.23".equals(ShimLoader.getMajorVersion()); + if(overrideContainerLog4jProps) { + //see detailed note in CONTAINER_LOG4J_PROPS file + LOG.info(AppConfig.WEBHCAT_CONF_DIR + "=" + AppConfig.getWebhcatConfDir()); + File localFile = new File(getLog4JPropsLocal()); + if(localFile.exists()) { + LOG.info("Found " + localFile.getAbsolutePath() + " to use for job submission."); try { - String percent = TempletonUtils.extractPercentComplete(line); - String childid = TempletonUtils.extractChildJobId(line); - - if (percent != null || childid != null) { - state = new JobState(jobid.toString(), conf); - state.setPercentComplete(percent); - state.setChildId(childid); - } - } catch (IOException e) { - System.err.println("templeton: state error: " + e); - } finally { - if (state != null) { - try { - state.close(); - } catch (IOException e) { - } - } + overrideLog4jURI = copyLog4JtoFileSystem(getLog4JPropsLocal()); + LOG.info("Job submission will use log4j.properties=" + overrideLog4jURI); } + catch(IOException ex) { + LOG.warn("Will not add " + CONTAINER_LOG4J_PROPS + " to Distributed Cache. " + + "Some fields in job status may be unavailable", ex); + } + } + else { + LOG.warn("Could not find " + localFile.getAbsolutePath() + ". " + affectedMsg); } - writer.flush(); - } catch (IOException e) { - System.err.println("templeton: execute error: " + e); } } + catch(Throwable t) { + //this intentionally doesn't use TempletonControllerJob.class.getName() to be able to + //log errors which may be due to class loading + String msg = "org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob is not " + + "properly initialized. 
" + affectedMsg; + LOG.error(msg, t); + } } - public static class KeepAlive implements Runnable { - private Context context; - public boolean sendReport; - - public KeepAlive(Context context) - { - this.sendReport = true; - this.context = context; - } + private final boolean secureMetastoreAccess; - @Override - public void run() { - try { - while (sendReport) { - // Periodically report progress on the Context object - // to prevent TaskTracker from killing the Templeton - // Controller task - context.progress(); - System.err.println("KeepAlive Heart beat"); - Thread.sleep(KEEP_ALIVE_MSEC); - } - } catch (InterruptedException e) { - // Ok to be interrupted - } - } + /** + * @param secureMetastoreAccess - if true, a delegation token will be created + * and added to the job + */ + public TempletonControllerJob(boolean secureMetastoreAccess) { + super(); + this.secureMetastoreAccess = secureMetastoreAccess; } private JobID submittedJobId; @@ -365,8 +170,7 @@ public void run() { public String getSubmittedId() { if (submittedJobId == null) { return null; - } - else { + } else { return submittedJobId.toString(); } } @@ -376,20 +180,39 @@ public String getSubmittedId() { * @see org.apache.hive.hcatalog.templeton.CompleteDelegator */ @Override - public int run(String[] args) - throws IOException, InterruptedException, ClassNotFoundException, TException { + public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, + TException { Configuration conf = getConf(); - + conf.set(JAR_ARGS_NAME, TempletonUtils.encodeArray(args)); String user = UserGroupInformation.getCurrentUser().getShortUserName(); conf.set("user.name", user); + if(overrideContainerLog4jProps && overrideLog4jURI != null) { + //must be done before Job object is created + conf.set(OVERRIDE_CONTAINER_LOG4J_PROPS, Boolean.TRUE.toString()); + } Job job = new Job(conf); - job.setJarByClass(TempletonControllerJob.class); - job.setJobName("TempletonControllerJob"); + job.setJarByClass(LaunchMapper.class); + job.setJobName(TempletonControllerJob.class.getSimpleName()); job.setMapperClass(LaunchMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setInputFormatClass(SingleInputFormat.class); + if(overrideContainerLog4jProps && overrideLog4jURI != null) { + FileSystem fs = FileSystem.get(conf); + if(fs.exists(new Path(overrideLog4jURI))) { + ShimLoader.getHadoopShims().getWebHCatShim(conf, UgiFactory.getUgi(user)).addCacheFile( + overrideLog4jURI, job); + LOG.debug("added " + overrideLog4jURI + " to Dist Cache"); + } + else { + //in case this file was deleted by someone issue a warning but don't try to add to + // DistributedCache as that will throw and fail job submission + LOG.warn("Cannot find " + overrideLog4jURI + " which is created on WebHCat startup/job " + + "submission. 
" + affectedMsg); + } + } + NullOutputFormat of = new NullOutputFormat(); job.setOutputFormatClass(of.getClass()); job.setNumReduceTasks(0); @@ -404,13 +227,16 @@ public int run(String[] args) job.submit(); submittedJobId = job.getJobID(); - if(metastoreTokenStrForm != null) { //so that it can be cancelled later from CompleteDelegator DelegationTokenCache.getStringFormTokenCache().storeDelegationToken( submittedJobId.toString(), metastoreTokenStrForm); - LOG.debug("Added metastore delegation token for jobId=" + submittedJobId.toString() + " " + - "user=" + user); + LOG.debug("Added metastore delegation token for jobId=" + submittedJobId.toString() + + " user=" + user); + } + if(overrideContainerLog4jProps && overrideLog4jURI == null) { + //do this here so that log msg has JobID + LOG.warn("Could not override container log4j properties for " + submittedJobId); } return 0; } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java index 33be932..a2ee1a0 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java @@ -85,10 +85,11 @@ public static boolean isset(char ch) { return (col != null) && (!col.isEmpty()); } - - public static final Pattern JAR_COMPLETE - = Pattern.compile(" map \\d+%\\s+reduce \\d+%$"); + //looking for map 100% reduce 100% + public static final Pattern JAR_COMPLETE = Pattern.compile(" map \\d+%\\s+reduce \\d+%$"); public static final Pattern PIG_COMPLETE = Pattern.compile(" \\d+% complete$"); + //looking for map = 100%, reduce = 100% + public static final Pattern HIVE_COMPLETE = Pattern.compile(" map = \\d+%,\\s+reduce = \\d+%$"); /** * Extract the percent complete line from Pig or Jar jobs. @@ -101,7 +102,19 @@ public static String extractPercentComplete(String line) { Matcher pig = PIG_COMPLETE.matcher(line); if (pig.find()) return pig.group().trim(); - + + Matcher hive = HIVE_COMPLETE.matcher(line); + if(hive.find()) { + StringBuilder sb = new StringBuilder(hive.group().trim()); + String[] toRemove = {"= ", ", "}; + for(String pattern : toRemove) { + int pos; + while((pos = sb.indexOf(pattern)) >= 0) { + sb.delete(pos, pos + pattern.length()); + } + } + return sb.toString();//normalized to look like JAR_COMPLETE + } return null; } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java index 2de3f27..73892cc 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java @@ -18,21 +18,30 @@ */ package org.apache.hive.hcatalog.templeton.tool; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; /** * Execute a local program. This is a singleton service that will * execute a programs on the local box. 
+ *
+ * Note that it is executed from LaunchMapper, which runs in a different JVM
+ * from the WebHCat (Templeton) server.  Thus it should not call any classes
+ * not available on every node in the cluster (outside the webhcat jar)
  */
-public class TrivialExecService {
-  private static volatile TrivialExecService theSingleton;
+final class TrivialExecService {
+  //with default log4j config, this output ends up in 'syslog' of the LaunchMapper task
   private static final Log LOG = LogFactory.getLog(TrivialExecService.class);
-
+  private static volatile TrivialExecService theSingleton;
+  private static final String HADOOP_CLIENT_OPTS = "HADOOP_CLIENT_OPTS";
   /**
    * Retrieve the singleton.
    */
@@ -41,32 +50,53 @@ public static synchronized TrivialExecService getInstance() {
       theSingleton = new TrivialExecService();
     return theSingleton;
   }
-
+  /**
+   * See {@link JobSubmissionConstants#CONTAINER_LOG4J_PROPS} file for details.
+   */
+  private static void hadoop2LogRedirect(ProcessBuilder processBuilder) {
+    Map<String, String> env = processBuilder.environment();
+    if(!env.containsKey(HADOOP_CLIENT_OPTS)) {
+      return;
+    }
+    String hcopts = env.get(HADOOP_CLIENT_OPTS);
+    if(!hcopts.contains("log4j.configuration=container-log4j.properties")) {
+      return;
+    }
+    //TempletonControllerJob ensures that this file is in DistributedCache
+    File log4jProps = new File(JobSubmissionConstants.CONTAINER_LOG4J_PROPS);
+    hcopts = hcopts.replace("log4j.configuration=container-log4j.properties",
+      "log4j.configuration=file://" + log4jProps.getAbsolutePath());
+    //helps figure out what log4j is doing, but may confuse
+    //some jobs due to extra output to stdout
+    //hcopts = hcopts + " -Dlog4j.debug=true";
+    env.put(HADOOP_CLIENT_OPTS, hcopts);
+  }
   public Process run(List<String> cmd, List<String> removeEnv,
-                     Map<String, String> environmentVariables)
+                     Map<String, String> environmentVariables, boolean overrideContainerLog4jProps)
     throws IOException {
-    logDebugCmd(cmd, environmentVariables);
+    LOG.info("run(cmd, removeEnv, environmentVariables, " + overrideContainerLog4jProps + ")");
+    LOG.info("Starting cmd: " + cmd);
     ProcessBuilder pb = new ProcessBuilder(cmd);
-    for (String key : removeEnv)
+    for (String key : removeEnv) {
+      if(pb.environment().containsKey(key)) {
+        LOG.info("Removing env var: " + key + "=" + pb.environment().get(key));
+      }
       pb.environment().remove(key);
+    }
     pb.environment().putAll(environmentVariables);
+    if(overrideContainerLog4jProps) {
+      hadoop2LogRedirect(pb);
+    }
+    logDebugInfo("Starting process with env:", pb.environment());
     return pb.start();
   }
-
-  private void logDebugCmd(List<String> cmd,
-                           Map<String, String> environmentVariables) {
-    if(!LOG.isDebugEnabled()){
-      return;
-    }
-    LOG.debug("starting " + cmd);
-    LOG.debug("With environment variables: " );
-    for(Map.Entry<String, String> keyVal : environmentVariables.entrySet()){
-      LOG.debug(keyVal.getKey() + "=" + keyVal.getValue());
-    }
-    LOG.debug("With environment variables already set: " );
-    Map<String, String> env = System.getenv();
-    for (String envName : env.keySet()) {
-      LOG.debug(envName + "=" + env.get(envName));
-    }
+  private static void logDebugInfo(String msg, Map<String, String> props) {
+    LOG.info(msg);
+    List<String> keys = new ArrayList<String>();
+    keys.addAll(props.keySet());
+    Collections.sort(keys);
+    for(String key : keys) {
+      LOG.info(key + "=" + props.get(key));
+    }
   }
 }
diff --git hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java
index a873a96..b76e69a 100644
--- 
hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java +++ hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java @@ -38,7 +38,7 @@ public void test() { Process process = TrivialExecService.getInstance() .run(list, new ArrayList(), - new HashMap()); + new HashMap(),false); out = new BufferedReader(new InputStreamReader( process.getInputStream())); err = new BufferedReader(new InputStreamReader( diff --git shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java index ce2e5b7..470dc76 100644 --- shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java +++ shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java @@ -18,82 +18,89 @@ package org.apache.hadoop.mapred; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.URI; /** * This is in org.apache.hadoop.mapred package because it relies on * JobSubmissionProtocol which is package private */ public class WebHCatJTShim20S implements WebHCatJTShim { - private JobSubmissionProtocol cnx; + private JobSubmissionProtocol cnx; - /** - * Create a connection to the Job Tracker. - */ - public WebHCatJTShim20S(Configuration conf, UserGroupInformation ugi) - throws IOException { - cnx = (JobSubmissionProtocol) - RPC.getProxy(JobSubmissionProtocol.class, - JobSubmissionProtocol.versionID, - getAddress(conf), - ugi, - conf, - NetUtils.getSocketFactory(conf, - JobSubmissionProtocol.class)); - } + /** + * Create a connection to the Job Tracker. + */ + public WebHCatJTShim20S(Configuration conf, UserGroupInformation ugi) + throws IOException { + cnx = (JobSubmissionProtocol) + RPC.getProxy(JobSubmissionProtocol.class, + JobSubmissionProtocol.versionID, + getAddress(conf), + ugi, + conf, + NetUtils.getSocketFactory(conf, + JobSubmissionProtocol.class)); + } - /** - * Grab a handle to a job that is already known to the JobTracker. - * - * @return Profile of the job, or null if not found. - */ - public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid) - throws IOException { - return cnx.getJobProfile(jobid); - } + /** + * Grab a handle to a job that is already known to the JobTracker. + * + * @return Profile of the job, or null if not found. + */ + public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid) + throws IOException { + return cnx.getJobProfile(jobid); + } - /** - * Grab a handle to a job that is already known to the JobTracker. - * - * @return Status of the job, or null if not found. - */ - public org.apache.hadoop.mapred.JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid) - throws IOException { - return cnx.getJobStatus(jobid); - } + /** + * Grab a handle to a job that is already known to the JobTracker. + * + * @return Status of the job, or null if not found. + */ + public org.apache.hadoop.mapred.JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid) + throws IOException { + return cnx.getJobStatus(jobid); + } - /** - * Kill a job. 
- */ - public void killJob(org.apache.hadoop.mapred.JobID jobid) - throws IOException { - cnx.killJob(jobid); - } + /** + * Kill a job. + */ + public void killJob(org.apache.hadoop.mapred.JobID jobid) + throws IOException { + cnx.killJob(jobid); + } - /** - * Get all the jobs submitted. - */ - public org.apache.hadoop.mapred.JobStatus[] getAllJobs() - throws IOException { - return cnx.getAllJobs(); - } + /** + * Get all the jobs submitted. + */ + public org.apache.hadoop.mapred.JobStatus[] getAllJobs() + throws IOException { + return cnx.getAllJobs(); + } - /** - * Close the connection to the Job Tracker. - */ - public void close() { - RPC.stopProxy(cnx); - } - private InetSocketAddress getAddress(Configuration conf) { - String jobTrackerStr = conf.get("mapred.job.tracker", "localhost:8012"); - return NetUtils.createSocketAddr(jobTrackerStr); - } + /** + * Close the connection to the Job Tracker. + */ + public void close() { + RPC.stopProxy(cnx); + } + private InetSocketAddress getAddress(Configuration conf) { + String jobTrackerStr = conf.get("mapred.job.tracker", "localhost:8012"); + return NetUtils.createSocketAddr(jobTrackerStr); + } + @Override + public void addCacheFile(URI uri, Job job) { + DistributedCache.addCacheFile(uri, job.getConfiguration()); } +} diff --git shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java index abb3911..d0a4bf7 100644 --- shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java +++ shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java @@ -18,10 +18,12 @@ package org.apache.hadoop.mapred; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim; import java.io.IOException; +import java.net.URI; public class WebHCatJTShim23 implements WebHCatJTShim { private JobClient jc; @@ -88,4 +90,8 @@ public void close() { } catch (IOException e) { } } + @Override + public void addCacheFile(URI uri, Job job) { + job.addCacheFile(uri); + } } diff --git shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java index c7529dc..62ff878 100644 --- shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -561,6 +561,11 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporte * Close the connection to the Job Tracker. */ public void close(); + /** + * Does exactly what org.apache.hadoop.mapreduce.Job#addCacheFile(URI) in Hadoop 2. + * Assumes that both parameters are not {@code null}. + */ + public void addCacheFile(URI uri, Job job); } /**
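Note: to see why the new HIVE_COMPLETE pattern and its normalization matter, here is a self-contained demo of the percent-complete extraction this patch adds. The three regexes are copied verbatim from the patched TempletonUtils; the extraction method is a simplified restatement (String.replace instead of the patch's StringBuilder loop, same result), and the sample log lines are invented for illustration.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PercentCompleteDemo {
  // Patterns copied from TempletonUtils in this patch.
  static final Pattern JAR_COMPLETE = Pattern.compile(" map \\d+%\\s+reduce \\d+%$");
  static final Pattern PIG_COMPLETE = Pattern.compile(" \\d+% complete$");
  static final Pattern HIVE_COMPLETE = Pattern.compile(" map = \\d+%,\\s+reduce = \\d+%$");

  // Hive's "map = 100%,  reduce = 100%" is normalized to the JAR_COMPLETE shape,
  // which is what the new check_job_percent_complete test keys expect.
  static String extractPercentComplete(String line) {
    Matcher jar = JAR_COMPLETE.matcher(line);
    if (jar.find()) return jar.group().trim();
    Matcher pig = PIG_COMPLETE.matcher(line);
    if (pig.find()) return pig.group().trim();
    Matcher hive = HIVE_COMPLETE.matcher(line);
    if (hive.find()) {
      // Delete "= " and ", ", as the patched StringBuilder loop does.
      return hive.group().trim().replace("= ", "").replace(", ", "");
    }
    return null;
  }

  public static void main(String[] args) {
    System.out.println(extractPercentComplete(
      "2013-10-07 12:00:00,000 Stage-1 map = 100%,  reduce = 100%")); // map 100% reduce 100%
    System.out.println(extractPercentComplete(
      "13/10/07 12:00:01 INFO mapreduce.Job:  map 100% reduce 100%")); // map 100% reduce 100%
    System.out.println(extractPercentComplete(
      "2013-10-07 12:00:02,000 [main] INFO  org.apache.pig.Main - 100% complete")); // 100% complete
  }
}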