diff --git hcatalog/src/test/e2e/templeton/README.txt hcatalog/src/test/e2e/templeton/README.txt
index 3011e5c..00ea693 100644
--- hcatalog/src/test/e2e/templeton/README.txt
+++ hcatalog/src/test/e2e/templeton/README.txt
@@ -141,8 +141,10 @@ In order for this test suite to work, webhcat-site.xml should have webhcat.proxy
and webhcat.proxyuser.hue.hosts defined, i.e. 'hue' should be allowed to impersonate 'joe'.
[Of course, 'hcat' proxyuser should be configured in core-site.xml for the command to succeed.]
-Furthermore, metastore side file based security should be enabled. To do this 3 properties in
-hive-site.xml should be configured:
+Furthermore, metastore-side file-based security should be enabled.
+(See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization#LanguageManualAuthorization-MetastoreServerSecurity for more info)
+
+To do this, 3 properties in hive-site.xml should be configured:
1) hive.security.metastore.authorization.manager set to
org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider
2) hive.security.metastore.authenticator.manager set to
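For illustration only (not part of this patch), a minimal Java sketch that reads hive-site.xml and prints the two settings named above, so a test setup can verify them; the property names come from the README text, the class name and everything else is assumed:

    import org.apache.hadoop.conf.Configuration;

    public class MetastoreSecurityCheck {
      public static void main(String[] args) {
        //assumes hive-site.xml is on this tool's classpath
        Configuration conf = new Configuration(false);
        conf.addResource("hive-site.xml");
        //the metastore-side security settings the README asks for
        System.out.println("hive.security.metastore.authorization.manager = "
            + conf.get("hive.security.metastore.authorization.manager"));
        System.out.println("hive.security.metastore.authenticator.manager = "
            + conf.get("hive.security.metastore.authenticator.manager"));
      }
    }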
diff --git hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
index dcd6465..66a0717 100644
--- hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
+++ hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
@@ -788,7 +788,8 @@ sub compare
if ( (defined $testCmd->{'check_job_created'})
|| (defined $testCmd->{'check_job_complete'})
- || (defined $testCmd->{'check_job_exit_value'}) ) {
+ || (defined $testCmd->{'check_job_exit_value'})
+ || (defined $testCmd->{'check_job_percent_complete'}) ) {
my $jobid = $json_hash->{'id'};
if (!defined $jobid) {
print $log "$0::$subName WARN check failed: "
@@ -803,7 +804,8 @@ sub compare
. "jobresult not defined ";
$result = 0;
}
- if (defined($testCmd->{'check_job_complete'}) || defined($testCmd->{'check_job_exit_value'})) {
+ if (defined($testCmd->{'check_job_complete'}) || defined($testCmd->{'check_job_exit_value'})
+ || defined($testCmd->{'check_job_percent_complete'})) {
my $jobComplete;
my $NUM_RETRIES = 60;
my $SLEEP_BETWEEN_RETRIES = 5;
@@ -841,6 +843,15 @@ sub compare
$result = 0;
}
}
+ # check the percentComplete value
+ if (defined($testCmd->{'check_job_percent_complete'})) {
+ my $pcValue = $res_hash->{'percentComplete'};
+ my $expectedPercentComplete = $testCmd->{'check_job_percent_complete'};
+ if ( (!defined $pcValue) || $pcValue ne $expectedPercentComplete ) {
+ my $displayValue = defined $pcValue ? $pcValue : '(undef)';
+ print $log "check_job_percent_complete failed. got percentComplete '$displayValue', expected '$expectedPercentComplete'\n";
+ $result = 0;
+ }
+ }
}
#Check userargs
diff --git hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
index 240b01e..01b2b68 100644
--- hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
+++ hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
@@ -73,6 +73,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_percent_complete' => 'map 100% reduce 100%',
'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -166,6 +167,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_percent_complete' => '100% complete',
'check_job_exit_value' => 0,
'check_call_back' => 1,
},
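The expected strings above ('map 100% reduce 100%' for MapReduce, '100% complete' for Hive) are whatever LaunchMapper's Watcher scrapes from the child job's stderr via TempletonUtils.extractPercentComplete and stores as percentComplete. A rough, self-contained sketch of that kind of matching; the regexes here are assumptions for illustration, the real patterns live in TempletonUtils:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class PercentCompleteSketch {
      //hypothetical patterns; the real ones live in TempletonUtils.extractPercentComplete
      private static final Pattern MAP_REDUCE = Pattern.compile("map \\d+%\\s+reduce \\d+%");
      private static final Pattern HIVE_STYLE = Pattern.compile("\\d+% complete");

      public static String extract(String line) {
        Matcher m = MAP_REDUCE.matcher(line);
        if (m.find()) {
          return m.group();
        }
        m = HIVE_STYLE.matcher(line);
        return m.find() ? m.group() : null;
      }

      public static void main(String[] args) {
        //stderr lines like these yield the values the tests above expect
        System.out.println(extract("13/10/07 12:00:01 INFO mapreduce.Job: map 100% reduce 100%"));
        System.out.println(extract("2013-10-07 12:00:02 [main] INFO exec.Task: 100% complete"));
      }
    }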
diff --git hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf
index 5e17221..b23fd4b 100644
--- hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf
+++ hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf
@@ -54,7 +54,9 @@ $cfg =
},
{
#-ve test - no input file
- 'num' => 2,
+ #The TempletonController job status should be success, but the exit value should be 1.
+ #If the yarn log is redirected to stderr, check_job_complete is FAILURE; if not, it is SUCCESS (HIVE-5511)
+ 'num' => 2,
'method' => 'POST',
'url' => ':TEMPLETON_URL:/templeton/v1/mapreduce/streaming',
'post_options' => ['user.name=:UNAME:','input=:INPDIR_HDFS:/nums.txt','input=:INPDIR_HDFS:/nums.txt','output=:OUTDIR:/mycounts',
diff --git hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh
index c8899b6..6b0b578 100644
--- hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh
+++ hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh
@@ -75,7 +75,7 @@ elif [ -e "${WEBHCAT_PREFIX}/conf/webhcat-env.sh" -o -e "${WEBHCAT_PREFIX}/etc/w
else
DEFAULT_CONF_DIR="/etc/webhcat"
fi
-WEBHCAT_CONF_DIR="${WEBHCAT_CONF_DIR:-$DEFAULT_CONF_DIR}"
+export WEBHCAT_CONF_DIR="${WEBHCAT_CONF_DIR:-$DEFAULT_CONF_DIR}"
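+#exported so that the server JVM sees it in its environment
+#(AppConfig.getWebhcatConfDir(), added in this patch, reads it via System.getenv)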
#users can add various env vars to webhcat-env.sh in the conf
#rather than having to export them before running the command
diff --git hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties
new file mode 100644
index 0000000..f6b740f
--- /dev/null
+++ hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+#
+# This log4j config overrides hadoop-yarn-server-nodemanager-2.1.0-beta.jar/container-log4j.properties.
+# In Hadoop 2, (by default) the log information about M/R job progress is not sent to stderr,
+# which is where LaunchMapper expects it. Thus WebHCat is unable to report the
+# percentComplete attribute in job status. There is something broken in YARN that doesn't allow
+# its log4j properties to be overridden. Thus for now (10/07/2013) we resort to overriding it
+# using this file, whose log4j.rootLogger specifies an additional 'console' appender. This file
+# is made available through the DistributedCache. See TrivialExecService and
+# TempletonControllerJob for more info.
+
+hadoop.root.logger=INFO,CLA
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, console, EventCounter
+
+# Logging Threshold
+log4j.threshold=ALL
+
+#
+# ContainerLog Appender
+#
+
+#Default values
+yarn.app.container.log.dir=null
+yarn.app.container.log.filesize=100
+
+log4j.appender.CLA=org.apache.hadoop.yarn.ContainerLogAppender
+log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir}
+log4j.appender.CLA.totalLogFileSize=${yarn.app.container.log.filesize}
+
+log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter
+
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
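For context: this file only helps if the child process is actually told to load it. The wiring is in TrivialExecService (hadoop2LogRedirect, later in this patch) plus the DistributedCache upload in TempletonControllerJob. A rough sketch of the environment rewrite involved; the exact HADOOP_CLIENT_OPTS contents being rewritten are an assumption, not something this patch spells out at this point:

    import java.util.Map;

    public class Log4jOverrideSketch {
      //same file name as JobSubmissionConstants.CONTAINER_LOG4J_PROPS
      static final String CONTAINER_LOG4J_PROPS = "override-container-log4j.properties";

      //Rough sketch: if the child's HADOOP_CLIENT_OPTS selects a log4j config, point it at
      //the override file shipped via DistributedCache instead. The option being replaced
      //here is assumed for illustration.
      static void redirect(ProcessBuilder processBuilder) {
        Map<String, String> env = processBuilder.environment();
        String opts = env.get("HADOOP_CLIENT_OPTS");
        if (opts != null && opts.contains("-Dlog4j.configuration=")) {
          env.put("HADOOP_CLIENT_OPTS",
              opts.replaceAll("-Dlog4j\\.configuration=\\S+",
                  "-Dlog4j.configuration=" + CONTAINER_LOG4J_PROPS));
        }
      }
    }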
diff --git hcatalog/webhcat/svr/src/main/config/webhcat-default.xml hcatalog/webhcat/svr/src/main/config/webhcat-default.xml
index 1bef3c6..d369d5d 100644
--- hcatalog/webhcat/svr/src/main/config/webhcat-default.xml
+++ hcatalog/webhcat/svr/src/main/config/webhcat-default.xml
@@ -116,6 +116,25 @@
+ <property>
+ <name>templeton.hive.home</name>
+ <value>hive-0.13.0-SNAPSHOT-bin.tar.gz/hive-0.13.0-SNAPSHOT-bin</value>
+ <description>
+ The path to the Hive home within the tar. This is needed if Hive is not installed on all
+ nodes in the cluster and needs to be shipped to the target node in the cluster to execute a
+ Pig job which uses HCat, a Hive query, etc. Has no effect if templeton.hive.archive is not set.
+ </description>
+ </property>
+
+ <property>
+ <name>templeton.hcat.home</name>
+ <value>hive-0.13.0-SNAPSHOT-bin.tar.gz/hive-0.13.0-SNAPSHOT-bin/hcatalog</value>
+ <description>
+ The path to the HCat home within the tar. This is needed if Hive is not installed on all
+ nodes in the cluster and needs to be shipped to the target node in the cluster to execute a
+ Pig job which uses HCat, a Hive query, etc. Has no effect if templeton.hive.archive is not set.
+ </description>
+ </property>
+
 <name>templeton.hive.properties</name>
 <value>hive.metastore.local=false,hive.metastore.uris=thrift://localhost:9933,hive.metastore.sasl.enabled=false</value>
 <description>Properties to set when running hive.</description>
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
index 4783ca9..10fc6df 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
@@ -71,6 +71,7 @@
};
public static final String TEMPLETON_HOME_VAR = "TEMPLETON_HOME";
+ public static final String WEBHCAT_CONF_DIR = "WEBHCAT_CONF_DIR";
public static final String[] TEMPLETON_CONF_FILENAMES = {
"webhcat-default.xml",
@@ -90,6 +91,14 @@
public static final String PYTHON_NAME = "templeton.python";
public static final String HIVE_ARCHIVE_NAME = "templeton.hive.archive";
public static final String HIVE_PATH_NAME = "templeton.hive.path";
+ /**
+ * see webhcat-default.xml
+ */
+ public static final String HIVE_HOME_PATH = "templeton.hive.home";
+ /**
+ * see webhcat-default.xml
+ */
+ public static final String HCAT_HOME_PATH = "templeton.hcat.home";
public static final String HIVE_PROPS_NAME = "templeton.hive.properties";
public static final String LIB_JARS_NAME = "templeton.libjars";
public static final String PIG_ARCHIVE_NAME = "templeton.pig.archive";
@@ -153,6 +162,9 @@ public String getHadoopConfDir() {
public static String getTempletonDir() {
return System.getenv(TEMPLETON_HOME_VAR);
}
+ public static String getWebhcatConfDir() {
+ return System.getenv(WEBHCAT_CONF_DIR);
+ }
private boolean loadOneFileConfig(String dir, String fname) {
if (dir != null) {
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java
index 5f35176..1b9663d 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java
@@ -68,7 +68,7 @@ public CompleteBean run(String id, String jobStatus)
try {
state = new JobState(id, Main.getAppConfigInstance());
if (state.getCompleteStatus() == null)
- failed("Job not yet complete. jobId=" + id + " Status from JT=" + jobStatus, null);
+ failed("Job not yet complete. jobId=" + id + " Status from JobTracker=" + jobStatus, null);
Long notified = state.getNotifiedTime();
if (notified != null) {
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
index c616d6c..a9e13c6 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
@@ -128,7 +128,7 @@ public EnqueueBean run(String user, Map userArgs,
if (appConf.hiveArchive() != null && !appConf.hiveArchive().equals(""))
{
- args.add("-archives");
+ args.add(ARCHIVES);
args.add(appConf.hiveArchive());
}
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
index bea08bb..0d51bc7 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
@@ -43,14 +43,14 @@ public EnqueueBean run(String user, Map userArgs, String jar, St
String libjars, String files,
List jarArgs, List defines,
String statusdir, String callback,
- boolean usehcatalog, String completedUrl,
+ boolean usesHcatalog, String completedUrl,
boolean enablelog, JobType jobType)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
ExecuteException, IOException, InterruptedException {
runAs = user;
List args = makeArgs(jar, mainClass,
libjars, files, jarArgs, defines,
- statusdir, usehcatalog, completedUrl, enablelog, jobType);
+ statusdir, usesHcatalog, completedUrl, enablelog, jobType);
return enqueueController(user, userArgs, callback, args);
}
@@ -58,7 +58,7 @@ public EnqueueBean run(String user, Map userArgs, String jar, St
private List makeArgs(String jar, String mainClass,
String libjars, String files,
List jarArgs, List defines,
- String statusdir, boolean usehcatalog, String completedUrl,
+ String statusdir, boolean usesHcatalog, String completedUrl,
boolean enablelog, JobType jobType)
throws BadParam, IOException, InterruptedException {
ArrayList args = new ArrayList();
@@ -72,7 +72,7 @@ public EnqueueBean run(String user, Map userArgs, String jar, St
TempletonUtils.addCmdForWindows(args);
//check if the rest command specified explicitly to use hcatalog
- if(usehcatalog){
+ if(usesHcatalog){
addHiveMetaStoreTokenArg();
}
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
index b8b5973..854aa99 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
@@ -97,6 +97,9 @@ public EnqueueBean enqueueController(String user, Map userArgs,
private String queueAsUser(UserGroupInformation ugi, final List args)
throws IOException, InterruptedException {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Launching job: " + args);
+ }
return ugi.doAs(new PrivilegedExceptionAction() {
public String run() throws Exception {
String[] array = new String[args.size()];
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
index 0f37278..c52ea77 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
@@ -202,10 +202,13 @@ public Server runServer(int port)
public FilterHolder makeAuthFilter() {
FilterHolder authFilter = new FilterHolder(AuthFilter.class);
if (UserGroupInformation.isSecurityEnabled()) {
+ //http://hadoop.apache.org/docs/r1.1.1/api/org/apache/hadoop/security/authentication/server/AuthenticationFilter.html
authFilter.setInitParameter("dfs.web.authentication.signature.secret",
conf.kerberosSecret());
+ //http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-1.2.2/bk_installing_manually_book/content/rpm-chap14-2-3-2.html
authFilter.setInitParameter("dfs.web.authentication.kerberos.principal",
conf.kerberosPrincipal());
+ //http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-1.2.2/bk_installing_manually_book/content/rpm-chap14-2-3-2.html
authFilter.setInitParameter("dfs.web.authentication.kerberos.keytab",
conf.kerberosKeytab());
}
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
index 9adb1d9..64a68bd 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
@@ -29,6 +29,7 @@
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hive.hcatalog.templeton.tool.JobSubmissionConstants;
import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
import org.apache.hive.hcatalog.templeton.tool.TempletonUtils;
@@ -47,13 +48,13 @@ public EnqueueBean run(String user, Map userArgs,
String execute, String srcFile,
List pigArgs, String otherFiles,
String statusdir, String callback,
- boolean usehcatalog, String completedUrl, boolean enablelog)
+ boolean usesHcatalog, String completedUrl, boolean enablelog)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
ExecuteException, IOException, InterruptedException {
runAs = user;
List args = makeArgs(execute,
srcFile, pigArgs,
- otherFiles, statusdir, usehcatalog, completedUrl, enablelog);
+ otherFiles, statusdir, usesHcatalog, completedUrl, enablelog);
return enqueueController(user, userArgs, callback, args);
}
@@ -64,20 +65,23 @@ public EnqueueBean run(String user, Map userArgs,
* @param pigArgs pig command line arguments
* @param otherFiles files to be copied to the map reduce cluster
* @param statusdir status dir location
- * @param usehcatalog whether the command uses hcatalog/needs to connect
+ * @param usesHcatalog whether the command uses hcatalog/needs to connect
* to hive metastore server
* @param completedUrl call back url
- * @return
* @throws BadParam
* @throws IOException
* @throws InterruptedException
*/
private List makeArgs(String execute, String srcFile,
List pigArgs, String otherFiles,
- String statusdir, boolean usehcatalog,
- String completedUrl, boolean enablelog)
- throws BadParam, IOException, InterruptedException {
+ String statusdir, boolean usesHcatalog,
+ String completedUrl, boolean enablelog) throws BadParam, IOException,
+ InterruptedException {
ArrayList args = new ArrayList();
+ //check if the REST command explicitly asked to use hcatalog,
+ // or implied it via the pig -useHCatalog arg
+ boolean needsMetastoreAccess = usesHcatalog || hasPigArgUseHcat(pigArgs);
+
try {
ArrayList allFiles = new ArrayList();
if (TempletonUtils.isset(srcFile)) {
@@ -89,12 +93,32 @@ public EnqueueBean run(String user, Map userArgs,
}
args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, enablelog, JobType.PIG));
- if (appConf.pigArchive() != null && !appConf.pigArchive().equals(""))
- {
- args.add("-archives");
- args.add(appConf.pigArchive());
+ boolean shipPigTar = appConf.pigArchive() != null && !appConf.pigArchive().equals("");
+ boolean shipHiveTar = needsMetastoreAccess && appConf.hiveArchive() != null
+ && !appConf.hiveArchive().equals("");
+ if(shipPigTar || shipHiveTar) {
+ args.add(ARCHIVES);
+ StringBuilder archives = new StringBuilder();
+ if(shipPigTar) {
+ archives.append(appConf.pigArchive());
+ }
+ if(shipPigTar && shipHiveTar) {
+ archives.append(",");
+ }
+ if(shipHiveTar) {
+ archives.append(appConf.hiveArchive());
+ }
+ args.add(archives.toString());
+ }
+ if(shipHiveTar) {
+ addDef(args, JobSubmissionConstants.PigConstants.HIVE_HOME,
+ appConf.get(AppConfig.HIVE_HOME_PATH));
+ addDef(args, JobSubmissionConstants.PigConstants.HCAT_HOME,
+ appConf.get(AppConfig.HCAT_HOME_PATH));
+ //Pig which uses HCat will pass this to HCat so that it can find the metastore
+ addDef(args, JobSubmissionConstants.PigConstants.PIG_OPTS,
+ appConf.get(AppConfig.HIVE_PROPS_NAME));
}
-
args.add("--");
TempletonUtils.addCmdForWindows(args);
args.add(appConf.pigPath());
@@ -104,9 +128,7 @@ public EnqueueBean run(String user, Map userArgs,
for (String pigArg : pigArgs) {
args.add(TempletonUtils.quoteForWindows(pigArg));
}
- //check if the REST command specified explicitly to use hcatalog
- // or if it says that implicitly using the pig -useHCatalog arg
- if(usehcatalog || hasPigArgUseHcat(pigArgs)){
+ if(needsMetastoreAccess) {
addHiveMetaStoreTokenArg();
}
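For illustration, when both tars are configured the launcher args built above carry a single -archives flag with a comma-separated value. A toy Java sketch of the same join, with archive names assumed:

    import java.util.ArrayList;
    import java.util.List;

    public class ArchivesArgSketch {
      public static void main(String[] args) {
        //hypothetical values standing in for appConf.pigArchive()/appConf.hiveArchive()
        String pigArchive = "pig-0.12.0.tar.gz";
        String hiveArchive = "hive-0.13.0-SNAPSHOT-bin.tar.gz";
        List<String> launcherArgs = new ArrayList<String>();
        launcherArgs.add("-archives");
        launcherArgs.add(pigArchive + "," + hiveArchive);
        //prints: [-archives, pig-0.12.0.tar.gz,hive-0.13.0-SNAPSHOT-bin.tar.gz]
        System.out.println(launcherArgs);
      }
    }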
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
index f41450c..665e5f9 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
@@ -635,7 +635,7 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs,
/**
* Run a MapReduce Jar job.
* Params correspond to the REST api params
- * @param usehcatalog if {@code true}, means the Jar uses HCat and thus needs to access
+ * @param usesHcatalog if {@code true}, means the Jar uses HCat and thus needs to access
* metastore, which requires additional steps for WebHCat to perform in a secure cluster.
* @param callback URL which WebHCat will call when the hive job finishes
* @see org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob
@@ -651,7 +651,7 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar,
@FormParam("define") List defines,
@FormParam("statusdir") String statusdir,
@FormParam("callback") String callback,
- @FormParam("usehcatalog") boolean usehcatalog,
+ @FormParam("usehcatalog") boolean usesHcatalog,
@FormParam("enablelog") boolean enablelog)
throws NotAuthorizedException, BusyException, BadParam, QueueException,
ExecuteException, IOException, InterruptedException {
@@ -677,14 +677,14 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar,
return d.run(getDoAsUser(), userArgs,
jar, mainClass,
libjars, files, args, defines,
- statusdir, callback, usehcatalog, getCompletedUrl(), enablelog, JobType.JAR);
+ statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog, JobType.JAR);
}
/**
* Run a Pig job.
- * Params correspond to the REST api params. If '-useHCatalog' is in the {@code pigArgs, usehcatalog},
+ * Params correspond to the REST api params. If '-useHCatalog' is in the {@code pigArgs}, {@code usesHcatalog}
* is interpreted as true.
- * @param usehcatalog if {@code true}, means the Pig script uses HCat and thus needs to access
+ * @param usesHcatalog if {@code true}, means the Pig script uses HCat and thus needs to access
* metastore, which requires additional steps for WebHCat to perform in a secure cluster.
* This does nothing to ensure that Pig is installed on target node in the cluster.
* @param callback URL which WebHCat will call when the hive job finishes
@@ -699,7 +699,7 @@ public EnqueueBean pig(@FormParam("execute") String execute,
@FormParam("files") String otherFiles,
@FormParam("statusdir") String statusdir,
@FormParam("callback") String callback,
- @FormParam("usehcatalog") boolean usehcatalog,
+ @FormParam("usehcatalog") boolean usesHcatalog,
@FormParam("enablelog") boolean enablelog)
throws NotAuthorizedException, BusyException, BadParam, QueueException,
ExecuteException, IOException, InterruptedException {
@@ -725,7 +725,7 @@ public EnqueueBean pig(@FormParam("execute") String execute,
return d.run(getDoAsUser(), userArgs,
execute, srcFile,
pigArgs, otherFiles,
- statusdir, callback, usehcatalog, getCompletedUrl(), enablelog);
+ statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog);
}
/**
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TempletonDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TempletonDelegator.java
index 532a191..cd20c26 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TempletonDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TempletonDelegator.java
@@ -24,6 +24,11 @@
* or hive.
*/
public class TempletonDelegator {
+ /**
+ * http://hadoop.apache.org/docs/r1.0.4/commands_manual.html#Generic+Options
+ */
+ public static final String ARCHIVES = "-archives";
+
protected AppConfig appConf;
public TempletonDelegator(AppConfig appConf) {
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
index dfb66ca..fb46b58 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
@@ -91,6 +91,12 @@ public String getField(Type type, String id, String key) {
BufferedReader in = null;
Path p = new Path(getPath(type) + "/" + id + "/" + key);
try {
+ if(!fs.exists(p)) {
+ //check first, otherwise webhcat.log is full of stack traces from FileSystem when
+ //clients check for status ('exitValue', 'completed', etc.)
+ LOG.debug(p + " does not exist.");
+ return null;
+ }
in = new BufferedReader(new InputStreamReader(fs.open(p)));
String line = null;
String val = "";
@@ -102,9 +108,7 @@ public String getField(Type type, String id, String key) {
}
return val;
} catch (Exception e) {
- //don't print stack trace since clients poll for 'exitValue', 'completed',
- //files which are not there until job completes
- LOG.info("Couldn't find " + p + ": " + e.getMessage());
+ LOG.error("Couldn't find " + p + ": " + e.getMessage(), e);
} finally {
close(in);
}
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java
index 9d3e05c..f46edb3 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java
@@ -32,6 +32,6 @@
@Override
List parseJobID() throws IOException {
- return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern);
+ return parseJobID(JobSubmissionConstants.STDERR_FNAME, jobidPattern);
}
}
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java
index 2ef404f..2d68918 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java
@@ -32,7 +32,7 @@
@Override
List parseJobID() throws IOException {
- return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern);
+ return parseJobID(JobSubmissionConstants.STDERR_FNAME, jobidPattern);
}
}
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
new file mode 100644
index 0000000..482e993
--- /dev/null
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton.tool;
+
+public interface JobSubmissionConstants {
+ public static final String COPY_NAME = "templeton.copy";
+ public static final String STATUSDIR_NAME = "templeton.statusdir";
+ public static final String ENABLE_LOG = "templeton.enablelog";
+ public static final String JOB_TYPE = "templeton.jobtype";
+ public static final String JAR_ARGS_NAME = "templeton.args";
+ public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath";
+ public static final String OVERRIDE_CONTAINER_LOG4J_PROPS = "override.containerLog4j";
+ //name of the log4j properties file shipped to the cluster via DistributedCache
+ static final String CONTAINER_LOG4J_PROPS = "override-container-log4j.properties";
+ public static final String STDOUT_FNAME = "stdout";
+ public static final String STDERR_FNAME = "stderr";
+ public static final String EXIT_FNAME = "exit";
+ public static final int WATCHER_TIMEOUT_SECS = 10;
+ public static final int KEEP_ALIVE_MSEC = 60 * 1000;
+ public static final String TOKEN_FILE_ARG_PLACEHOLDER = "__WEBHCAT_TOKEN_FILE_LOCATION__";
+ /**
+ * constants needed for Pig job submission
+ */
+ public static interface PigConstants {
+ public static final String HIVE_HOME = "HIVE_HOME";
+ public static final String HCAT_HOME = "HCAT_HOME";
+ public static final String PIG_OPTS = "PIG_OPTS";
+ }
+}
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
new file mode 100644
index 0000000..db8928a
--- /dev/null
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton.tool;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Shell;
+import org.apache.hive.hcatalog.templeton.BadParam;
+import org.apache.hive.hcatalog.templeton.LauncherDelegator;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Note that this class is used in a different JVM than the WebHCat server. Thus it should not
+ * call any classes not available on every node in the cluster (i.e. outside the webhcat jar).
+ * TempletonControllerJob#run() calls Job.setJarByClass(LaunchMapper.class), which
+ * causes the webhcat jar to be shipped to the target node, but not its transitive closure.
+ * Long term we need to clean up this separation and create a separate jar to ship so that the
+ * dependencies are clear. (This used to be an inner class of TempletonControllerJob.)
+ */
+@InterfaceAudience.Private
+public class LaunchMapper extends Mapper<NullWritable, NullWritable, Text, NullWritable>
+ implements JobSubmissionConstants {
+ /**
+ * This class currently sends everything to stderr, but it should probably use Log4J -
+ * it will end up in 'syslog' of this Map task. For example, look for KeepAlive heartbeat msgs.
+ */
+ private static final Log LOG = LogFactory.getLog(LaunchMapper.class);
+ /**
+ * When a Pig job is submitted and it uses HCat, WebHCat may be configured to ship the hive tar
+ * to the target node. Pig on the target node needs some env vars configured.
+ */
+ private static void handlePigEnvVars(Configuration conf, Map<String, String> env) {
+ if(conf.get(PigConstants.HIVE_HOME) != null) {
+ env.put(PigConstants.HIVE_HOME, new File(conf.get(PigConstants.HIVE_HOME)).getAbsolutePath());
+ }
+ if(conf.get(PigConstants.HCAT_HOME) != null) {
+ env.put(PigConstants.HCAT_HOME, new File(conf.get(PigConstants.HCAT_HOME)).getAbsolutePath());
+ }
+ if(conf.get(PigConstants.PIG_OPTS) != null) {
+ StringBuilder pigOpts = new StringBuilder();
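+ //illustration (values assumed): templeton.hive.properties such as
+ //"hive.metastore.uris=thrift://host:9933,hive.metastore.sasl.enabled=false" becomes
+ //PIG_OPTS="-Dhive.metastore.uris=thrift://host:9933 -Dhive.metastore.sasl.enabled=false "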
+ for(String prop : conf.get(PigConstants.PIG_OPTS).split(",")) {
+ pigOpts.append("-D").append(prop).append(" ");
+ }
+ env.put(PigConstants.PIG_OPTS, pigOpts.toString());
+ }
+ }
+ protected Process startJob(Context context, String user, String overrideClasspath)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ copyLocal(COPY_NAME, conf);
+ String[] jarArgs = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME));
+
+ ArrayList<String> removeEnv = new ArrayList<String>();
+ //why are these removed?
+ removeEnv.add("HADOOP_ROOT_LOGGER");
+ removeEnv.add("hadoop-command");
+ removeEnv.add("CLASS");
+ removeEnv.add("mapredcommand");
+ Map<String, String> env = TempletonUtils.hadoopUserEnv(user, overrideClasspath);
+ handlePigEnvVars(conf, env);
+ List<String> jarArgsList = new LinkedList<String>(Arrays.asList(jarArgs));
+ String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
+
+
+ if (tokenFile != null) {
+ //Token is available, so replace the placeholder
+ tokenFile = tokenFile.replaceAll("\"", "");
+ String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile;
+ if (Shell.WINDOWS) {
+ try {
+ tokenArg = TempletonUtils.quoteForWindows(tokenArg);
+ } catch (BadParam e) {
+ String msg = "cannot pass " + tokenFile + " to mapreduce.job.credentials.binary";
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+ }
+ for(int i = 0; i < jarArgsList.size(); i++) {
+ String newArg = jarArgsList.get(i).replace(TOKEN_FILE_ARG_PLACEHOLDER, tokenArg);
+ jarArgsList.set(i, newArg);
+ }
+ } else {
+ //No token, so remove the placeholder arg
+ Iterator<String> it = jarArgsList.iterator();
+ while(it.hasNext()){
+ String arg = it.next();
+ if(arg.contains(TOKEN_FILE_ARG_PLACEHOLDER)){
+ it.remove();
+ }
+ }
+ }
+ boolean overrideLog4jProps = conf.get(OVERRIDE_CONTAINER_LOG4J_PROPS) == null ?
+ false : Boolean.valueOf(conf.get(OVERRIDE_CONTAINER_LOG4J_PROPS));
+ return TrivialExecService.getInstance().run(jarArgsList, removeEnv, env, overrideLog4jProps);
+ }
+
+ private void copyLocal(String var, Configuration conf) throws IOException {
+ String[] filenames = TempletonUtils.decodeArray(conf.get(var));
+ if (filenames != null) {
+ for (String filename : filenames) {
+ Path src = new Path(filename);
+ Path dst = new Path(src.getName());
+ FileSystem fs = src.getFileSystem(conf);
+ LOG.info("templeton: copy " + src + " => " + dst);
+ fs.copyToLocalFile(src, dst);
+ }
+ }
+ }
+
+ @Override
+ public void run(Context context) throws IOException, InterruptedException {
+
+ Configuration conf = context.getConfiguration();
+
+ Process proc = startJob(context,
+ conf.get("user.name"),
+ conf.get(OVERRIDE_CLASSPATH));
+
+ String statusdir = conf.get(STATUSDIR_NAME);
+
+ if (statusdir != null) {
+ try {
+ statusdir = TempletonUtils.addUserHomeDirectoryIfApplicable(statusdir,
+ conf.get("user.name"));
+ } catch (URISyntaxException e) {
+ String msg = "Invalid status dir URI";
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+ }
+
+ Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG));
+ LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE));
+
+ ExecutorService pool = Executors.newCachedThreadPool();
+ executeWatcher(pool, conf, context.getJobID(),
+ proc.getInputStream(), statusdir, STDOUT_FNAME);
+ executeWatcher(pool, conf, context.getJobID(),
+ proc.getErrorStream(), statusdir, STDERR_FNAME);
+ KeepAlive keepAlive = startCounterKeepAlive(pool, context);
+
+ proc.waitFor();
+ keepAlive.sendReport = false;
+ pool.shutdown();
+ if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) {
+ pool.shutdownNow();
+ }
+
+ writeExitValue(conf, proc.exitValue(), statusdir);
+ JobState state = new JobState(context.getJobID().toString(), conf);
+ state.setExitValue(proc.exitValue());
+ state.setCompleteStatus("done");
+ state.close();
+
+ if (enablelog && TempletonUtils.isset(statusdir)) {
+ LOG.info("templeton: collecting logs for " + context.getJobID().toString()
+ + " to " + statusdir + "/logs");
+ LogRetriever logRetriever = new LogRetriever(statusdir, jobType, conf);
+ logRetriever.run();
+ }
+
+ if (proc.exitValue() != 0) {
+ LOG.info("templeton: job failed with exit code "
+ + proc.exitValue());
+ } else {
+ LOG.info("templeton: job completed with exit code 0");
+ }
+ }
+
+ private void executeWatcher(ExecutorService pool, Configuration conf, JobID jobid, InputStream in,
+ String statusdir, String name) throws IOException {
+ Watcher w = new Watcher(conf, jobid, in, statusdir, name);
+ pool.execute(w);
+ }
+
+ private KeepAlive startCounterKeepAlive(ExecutorService pool, Context context) throws IOException {
+ KeepAlive k = new KeepAlive(context);
+ pool.execute(k);
+ return k;
+ }
+
+ private void writeExitValue(Configuration conf, int exitValue, String statusdir)
+ throws IOException {
+ if (TempletonUtils.isset(statusdir)) {
+ Path p = new Path(statusdir, EXIT_FNAME);
+ FileSystem fs = p.getFileSystem(conf);
+ OutputStream out = fs.create(p);
+ LOG.info("templeton: Writing exit value " + exitValue + " to " + p);
+ PrintWriter writer = new PrintWriter(out);
+ writer.println(exitValue);
+ writer.close();
+ }
+ }
+
+
+ private static class Watcher implements Runnable {
+ private final InputStream in;
+ private OutputStream out;
+ private final JobID jobid;
+ private final Configuration conf;
+
+ public Watcher(Configuration conf, JobID jobid, InputStream in, String statusdir, String name)
+ throws IOException {
+ this.conf = conf;
+ this.jobid = jobid;
+ this.in = in;
+
+ if (name.equals(STDERR_FNAME)) {
+ out = System.err;
+ } else {
+ out = System.out;
+ }
+
+ if (TempletonUtils.isset(statusdir)) {
+ Path p = new Path(statusdir, name);
+ FileSystem fs = p.getFileSystem(conf);
+ out = fs.create(p);
+ LOG.info("templeton: Writing status to " + p);
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ InputStreamReader isr = new InputStreamReader(in);
+ BufferedReader reader = new BufferedReader(isr);
+ PrintWriter writer = new PrintWriter(out);
+
+ String line;
+ while ((line = reader.readLine()) != null) {
+ writer.println(line);
+ JobState state = null;
+ try {
+ String percent = TempletonUtils.extractPercentComplete(line);
+ String childid = TempletonUtils.extractChildJobId(line);
+
+ if (percent != null || childid != null) {
+ state = new JobState(jobid.toString(), conf);
+ state.setPercentComplete(percent);
+ state.setChildId(childid);
+ }
+ } catch (IOException e) {
+ LOG.error("templeton: state error: ", e);
+ } finally {
+ if (state != null) {
+ try {
+ state.close();
+ } catch (IOException e) {
+ LOG.warn(e);
+ }
+ }
+ }
+ }
+ writer.flush();
+ if(out != System.err && out != System.out) {
+ //depending on FileSystem implementation flush() may or may not do anything
+ writer.close();
+ }
+ } catch (IOException e) {
+ LOG.error("templeton: execute error: ", e);
+ }
+ }
+ }
+
+ private static class KeepAlive implements Runnable {
+ private final Context context;
+ private volatile boolean sendReport;
+
+ public KeepAlive(Context context)
+ {
+ this.sendReport = true;
+ this.context = context;
+ }
+ private static StringBuilder makeDots(int count) {
+ StringBuilder sb = new StringBuilder();
+ for(int i = 0; i < count; i++) {
+ sb.append('.');
+ }
+ return sb;
+ }
+
+ @Override
+ public void run() {
+ try {
+ int count = 0;
+ while (sendReport) {
+ // Periodically report progress on the Context object
+ // to prevent TaskTracker from killing the Templeton
+ // Controller task
+ context.progress();
+ count++;
+ String msg = "KeepAlive Heart beat" + makeDots(count);
+ LOG.info(msg);
+ Thread.sleep(KEEP_ALIVE_MSEC);
+ }
+ } catch (InterruptedException e) {
+ // Ok to be interrupted
+ }
+ }
+ }
+}
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java
index 007ac2f..048440c 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java
@@ -32,6 +32,6 @@
@Override
List parseJobID() throws IOException {
- return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern);
+ return parseJobID(JobSubmissionConstants.STDERR_FNAME, jobidPattern);
}
}
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
index 110ddbb..c1dedce 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
@@ -18,23 +18,11 @@
*/
package org.apache.hive.hcatalog.templeton.tool;
-import java.io.BufferedReader;
+import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.net.URISyntaxException;
+import java.net.URI;
import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,24 +30,24 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Tool;
-import org.apache.hive.hcatalog.templeton.BadParam;
-import org.apache.hive.hcatalog.templeton.LauncherDelegator;
+import org.apache.hive.hcatalog.templeton.AppConfig;
+import org.apache.hive.hcatalog.templeton.Main;
import org.apache.hive.hcatalog.templeton.SecureProxySupport;
import org.apache.hive.hcatalog.templeton.UgiFactory;
import org.apache.thrift.TException;
@@ -83,281 +71,97 @@
* parameter supplied in the REST call. WebHcat takes care of cancelling the token when the job
* is complete.
*/
-public class TempletonControllerJob extends Configured implements Tool {
- public static final String COPY_NAME = "templeton.copy";
- public static final String STATUSDIR_NAME = "templeton.statusdir";
- public static final String ENABLE_LOG = "templeton.enablelog";
- public static final String JOB_TYPE = "templeton.jobtype";
- public static final String JAR_ARGS_NAME = "templeton.args";
- public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath";
-
- public static final String STDOUT_FNAME = "stdout";
- public static final String STDERR_FNAME = "stderr";
- public static final String EXIT_FNAME = "exit";
-
- public static final int WATCHER_TIMEOUT_SECS = 10;
- public static final int KEEP_ALIVE_MSEC = 60 * 1000;
-
- public static final String TOKEN_FILE_ARG_PLACEHOLDER
- = "__WEBHCAT_TOKEN_FILE_LOCATION__";
-
- private static TrivialExecService execService = TrivialExecService.getInstance();
-
+@InterfaceAudience.Private
+public class TempletonControllerJob extends Configured implements Tool, JobSubmissionConstants {
private static final Log LOG = LogFactory.getLog(TempletonControllerJob.class);
- private final boolean secureMetastoreAccess;
+ //file to add to DistributedCache
+ private static URI overrideLog4jURI = null;
+ private static boolean overrideContainerLog4jProps;
+ //Jar cmd submission likely will be affected, Pig likely not
+ private static final String affectedMsg = "Monitoring of Hadoop jobs submitted through WebHCat " +
+ "may be affected.";
/**
- * @param secureMetastoreAccess - if true, a delegation token will be created
- * and added to the job
+ * Copy the file from the local file system to the cluster FileSystem (under hadoop.tmp.dir),
+ * from where it can later be added to the DistributedCache
*/
- public TempletonControllerJob(boolean secureMetastoreAccess) {
- super();
- this.secureMetastoreAccess = secureMetastoreAccess;
- }
- public static class LaunchMapper
- extends Mapper {
- protected Process startJob(Context context, String user,
- String overrideClasspath)
- throws IOException, InterruptedException {
- Configuration conf = context.getConfiguration();
- copyLocal(COPY_NAME, conf);
- String[] jarArgs
- = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME));
-
- ArrayList removeEnv = new ArrayList();
- removeEnv.add("HADOOP_ROOT_LOGGER");
- removeEnv.add("hadoop-command");
- removeEnv.add("CLASS");
- removeEnv.add("mapredcommand");
- Map env = TempletonUtils.hadoopUserEnv(user,
- overrideClasspath);
- List jarArgsList = new LinkedList(Arrays.asList(jarArgs));
- String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
-
-
- if (tokenFile != null) {
- //Token is available, so replace the placeholder
- tokenFile = tokenFile.replaceAll("\"", "");
- String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile;
- if (Shell.WINDOWS) {
- try {
- tokenArg = TempletonUtils.quoteForWindows(tokenArg);
- } catch (BadParam e) {
- throw new IOException("cannot pass " + tokenFile + " to mapreduce.job.credentials.binary", e);
- }
- }
- for(int i = 0; i < jarArgsList.size(); i++) {
- String newArg = jarArgsList.get(i).replace(TOKEN_FILE_ARG_PLACEHOLDER, tokenArg);
- jarArgsList.set(i, newArg);
- }
- } else {
- //No token, so remove the placeholder arg
- Iterator<String> it = jarArgsList.iterator();
- while(it.hasNext()){
- String arg = it.next();
- if(arg.contains(TOKEN_FILE_ARG_PLACEHOLDER)){
- it.remove();
- }
- }
- }
- return execService.run(jarArgsList, removeEnv, env);
- }
-
- private void copyLocal(String var, Configuration conf)
- throws IOException {
- String[] filenames = TempletonUtils.decodeArray(conf.get(var));
- if (filenames != null) {
- for (String filename : filenames) {
- Path src = new Path(filename);
- Path dst = new Path(src.getName());
- FileSystem fs = src.getFileSystem(conf);
- System.err.println("templeton: copy " + src + " => " + dst);
- fs.copyToLocalFile(src, dst);
+ private static URI copyLog4JtoFileSystem(final String localFile) throws IOException,
+ InterruptedException {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ return ugi.doAs(new PrivilegedExceptionAction<URI>() {
+ @Override
+ public URI run() throws IOException {
+ AppConfig appConfig = Main.getAppConfigInstance();
+ String fsTmpDir = appConfig.get("hadoop.tmp.dir");
+ if(fsTmpDir == null || fsTmpDir.length() <= 0) {
+ fsTmpDir = Path.SEPARATOR + "users";
+ LOG.warn("Could not find 'hadoop.tmp.dir'; will try " + fsTmpDir);
}
- }
- }
-
- @Override
- public void run(Context context)
- throws IOException, InterruptedException {
-
- Configuration conf = context.getConfiguration();
-
- Process proc = startJob(context,
- conf.get("user.name"),
- conf.get(OVERRIDE_CLASSPATH));
-
- String statusdir = conf.get(STATUSDIR_NAME);
-
- if (statusdir != null) {
- try {
- statusdir = TempletonUtils.addUserHomeDirectoryIfApplicable(statusdir,
- conf.get("user.name"));
- } catch (URISyntaxException e) {
- throw new IOException("Invalid status dir URI", e);
+ FileSystem fs = FileSystem.get(appConfig);
+ Path dirPath = new Path(fsTmpDir);
+ if(!fs.exists(dirPath)) {
+ fs.mkdirs(dirPath, new FsPermission((short)0775));
}
+ Path dst = fs.makeQualified(new Path(fsTmpDir, CONTAINER_LOG4J_PROPS));
+ fs.copyFromLocalFile(new Path(localFile), dst);
+ //make readable by all users since TempletonControllerJob#run() is run as submitting user
+ fs.setPermission(dst, new FsPermission((short)0644));
+ return dst.toUri();
}
-
- Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG));
- LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE));
-
- ExecutorService pool = Executors.newCachedThreadPool();
- executeWatcher(pool, conf, context.getJobID(),
- proc.getInputStream(), statusdir, STDOUT_FNAME);
- executeWatcher(pool, conf, context.getJobID(),
- proc.getErrorStream(), statusdir, STDERR_FNAME);
- KeepAlive keepAlive = startCounterKeepAlive(pool, context);
-
- proc.waitFor();
- keepAlive.sendReport = false;
- pool.shutdown();
- if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) {
- pool.shutdownNow();
- }
-
- writeExitValue(conf, proc.exitValue(), statusdir);
- JobState state = new JobState(context.getJobID().toString(), conf);
- state.setExitValue(proc.exitValue());
- state.setCompleteStatus("done");
- state.close();
-
- if (enablelog && TempletonUtils.isset(statusdir)) {
- System.err.println("templeton: collecting logs for " + context.getJobID().toString()
- + " to " + statusdir + "/logs");
- LogRetriever logRetriever = new LogRetriever(statusdir, jobType, conf);
- logRetriever.run();
- }
-
- if (proc.exitValue() != 0) {
- System.err.println("templeton: job failed with exit code "
- + proc.exitValue());
- }
- else {
- System.err.println("templeton: job completed with exit code 0");
- }
- }
-
- private void executeWatcher(ExecutorService pool, Configuration conf,
- JobID jobid, InputStream in, String statusdir,
- String name)
- throws IOException {
- Watcher w = new Watcher(conf, jobid, in, statusdir, name);
- pool.execute(w);
- }
-
- private KeepAlive startCounterKeepAlive(ExecutorService pool, Context context)
- throws IOException {
- KeepAlive k = new KeepAlive(context);
- pool.execute(k);
- return k;
- }
-
- private void writeExitValue(Configuration conf, int exitValue, String statusdir)
- throws IOException {
- if (TempletonUtils.isset(statusdir)) {
- Path p = new Path(statusdir, EXIT_FNAME);
- FileSystem fs = p.getFileSystem(conf);
- OutputStream out = fs.create(p);
- System.err.println("templeton: Writing exit value "
- + exitValue + " to " + p);
- PrintWriter writer = new PrintWriter(out);
- writer.println(exitValue);
- writer.close();
- }
- }
+ });
}
-
- private static class Watcher implements Runnable {
- private final InputStream in;
- private OutputStream out;
- private final JobID jobid;
- private final Configuration conf;
-
- public Watcher(Configuration conf, JobID jobid, InputStream in,
- String statusdir, String name)
- throws IOException {
- this.conf = conf;
- this.jobid = jobid;
- this.in = in;
-
- if (name.equals(STDERR_FNAME))
- out = System.err;
- else
- out = System.out;
-
- if (TempletonUtils.isset(statusdir)) {
- Path p = new Path(statusdir, name);
- FileSystem fs = p.getFileSystem(conf);
- out = fs.create(p);
- System.err.println("templeton: Writing status to " + p);
- }
- }
-
- @Override
- public void run() {
- try {
- InputStreamReader isr = new InputStreamReader(in);
- BufferedReader reader = new BufferedReader(isr);
- PrintWriter writer = new PrintWriter(out);
-
- String line;
- while ((line = reader.readLine()) != null) {
- writer.println(line);
- JobState state = null;
+ /**
+ * @return path to the override log4j properties file on the local file system
+ */
+ private static String getLog4JPropsLocal() {
+ return AppConfig.getWebhcatConfDir() + File.separator + CONTAINER_LOG4J_PROPS;
+ }
+ static {
+ //initialize once-per-JVM (i.e. one running WebHCat server) state and log it once since it's
+ // the same for every job
+ try {
+ //safe (thread) publication
+ // http://docs.oracle.com/javase/specs/jls/se5.0/html/execution.html#12.4.2
+ LOG.info("Using Hadoop Version: " + ShimLoader.getMajorVersion());
+ overrideContainerLog4jProps = "0.23".equals(ShimLoader.getMajorVersion());
+ if(overrideContainerLog4jProps) {
+ //see detailed note in CONTAINER_LOG4J_PROPS file
+ LOG.info(AppConfig.WEBHCAT_CONF_DIR + "=" + AppConfig.getWebhcatConfDir());
+ File localFile = new File(getLog4JPropsLocal());
+ if(localFile.exists()) {
+ LOG.info("Found " + localFile.getAbsolutePath() + " to use for job submission.");
try {
- String percent = TempletonUtils.extractPercentComplete(line);
- String childid = TempletonUtils.extractChildJobId(line);
-
- if (percent != null || childid != null) {
- state = new JobState(jobid.toString(), conf);
- state.setPercentComplete(percent);
- state.setChildId(childid);
- }
- } catch (IOException e) {
- System.err.println("templeton: state error: " + e);
- } finally {
- if (state != null) {
- try {
- state.close();
- } catch (IOException e) {
- }
- }
+ overrideLog4jURI = copyLog4JtoFileSystem(getLog4JPropsLocal());
+ LOG.info("Job submission will use log4j.properties=" + overrideLog4jURI);
}
+ catch(IOException ex) {
+ LOG.warn("Will not add " + CONTAINER_LOG4J_PROPS + " to Distributed Cache. " +
+ "Some fields in job status may be unavailable", ex);
+ }
+ }
+ else {
+ LOG.warn("Could not find " + localFile.getAbsolutePath() + ". " + affectedMsg);
}
- writer.flush();
- } catch (IOException e) {
- System.err.println("templeton: execute error: " + e);
}
}
+ catch(Throwable t) {
+ //this intentionally doesn't use TempletonControllerJob.class.getName() to be able to
+ //log errors which may be due to class loading
+ String msg = "org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob is not " +
+ "properly initialized. " + affectedMsg;
+ LOG.error(msg, t);
+ }
}
- public static class KeepAlive implements Runnable {
- private Context context;
- public boolean sendReport;
-
- public KeepAlive(Context context)
- {
- this.sendReport = true;
- this.context = context;
- }
+ private final boolean secureMetastoreAccess;
- @Override
- public void run() {
- try {
- while (sendReport) {
- // Periodically report progress on the Context object
- // to prevent TaskTracker from killing the Templeton
- // Controller task
- context.progress();
- System.err.println("KeepAlive Heart beat");
- Thread.sleep(KEEP_ALIVE_MSEC);
- }
- } catch (InterruptedException e) {
- // Ok to be interrupted
- }
- }
+ /**
+ * @param secureMetastoreAccess - if true, a delegation token will be created
+ * and added to the job
+ */
+ public TempletonControllerJob(boolean secureMetastoreAccess) {
+ super();
+ this.secureMetastoreAccess = secureMetastoreAccess;
}
private JobID submittedJobId;
@@ -365,8 +169,7 @@ public void run() {
public String getSubmittedId() {
if (submittedJobId == null) {
return null;
- }
- else {
+ } else {
return submittedJobId.toString();
}
}
@@ -376,20 +179,42 @@ public String getSubmittedId() {
* @see org.apache.hive.hcatalog.templeton.CompleteDelegator
*/
@Override
- public int run(String[] args)
- throws IOException, InterruptedException, ClassNotFoundException, TException {
+ public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException,
+ TException {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Preparing to submit job: " + Arrays.toString(args));
+ }
Configuration conf = getConf();
-
+
conf.set(JAR_ARGS_NAME, TempletonUtils.encodeArray(args));
String user = UserGroupInformation.getCurrentUser().getShortUserName();
conf.set("user.name", user);
+ if(overrideContainerLog4jProps && overrideLog4jURI != null) {
+ //must be done before Job object is created
+ conf.set(OVERRIDE_CONTAINER_LOG4J_PROPS, Boolean.TRUE.toString());
+ }
Job job = new Job(conf);
- job.setJarByClass(TempletonControllerJob.class);
- job.setJobName("TempletonControllerJob");
+ job.setJarByClass(LaunchMapper.class);
+ job.setJobName(TempletonControllerJob.class.getSimpleName());
job.setMapperClass(LaunchMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(SingleInputFormat.class);
+ if(overrideContainerLog4jProps && overrideLog4jURI != null) {
+ FileSystem fs = FileSystem.get(conf);
+ if(fs.exists(new Path(overrideLog4jURI))) {
+ ShimLoader.getHadoopShims().getWebHCatShim(conf, UgiFactory.getUgi(user)).addCacheFile(
+ overrideLog4jURI, job);
+ LOG.debug("added " + overrideLog4jURI + " to Dist Cache");
+ }
+ else {
+        //in case this file was deleted by someone, issue a warning, but don't try to add it
+        //to the DistributedCache as that would throw and fail job submission
+ LOG.warn("Cannot find " + overrideLog4jURI + " which is created on WebHCat startup/job " +
+ "submission. " + affectedMsg);
+ }
+ }
+
    NullOutputFormat<Text, Text> of = new NullOutputFormat<Text, Text>();
job.setOutputFormatClass(of.getClass());
job.setNumReduceTasks(0);
@@ -404,13 +229,16 @@ public int run(String[] args)
job.submit();
submittedJobId = job.getJobID();
-
if(metastoreTokenStrForm != null) {
//so that it can be cancelled later from CompleteDelegator
DelegationTokenCache.getStringFormTokenCache().storeDelegationToken(
submittedJobId.toString(), metastoreTokenStrForm);
- LOG.debug("Added metastore delegation token for jobId=" + submittedJobId.toString() + " " +
- "user=" + user);
+ LOG.debug("Added metastore delegation token for jobId=" + submittedJobId.toString() +
+ " user=" + user);
+ }
+ if(overrideContainerLog4jProps && overrideLog4jURI == null) {
+ //do this here so that log msg has JobID
+ LOG.warn("Could not override container log4j properties for " + submittedJobId);
}
return 0;
}
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
index 2de3f27..7140caa 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
@@ -18,21 +18,30 @@
*/
package org.apache.hive.hcatalog.templeton.tool;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* Execute a local program. This is a singleton service that will
 * execute programs on the local box.
+ *
+ * Note that it is executed from LaunchMapper, which runs in a
+ * different JVM from the WebHCat (Templeton) server. Thus it should not call any
+ * classes that are not available on every node in the cluster (outside the webhcat jar).
*/
-public class TrivialExecService {
- private static volatile TrivialExecService theSingleton;
+final class TrivialExecService {
+ //with default log4j config, this output ends up in 'syslog' of the LaunchMapper task
private static final Log LOG = LogFactory.getLog(TrivialExecService.class);
-
+ private static volatile TrivialExecService theSingleton;
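+  //env var through which Hadoop's launcher scripts pass extra JVM options
+  //to client processes, such as the child forked by run() below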
+ private static final String HADOOP_CLIENT_OPTS = "HADOOP_CLIENT_OPTS";
/**
* Retrieve the singleton.
*/
@@ -41,32 +50,58 @@ public static synchronized TrivialExecService getInstance() {
theSingleton = new TrivialExecService();
return theSingleton;
}
-
+ /**
+ * See {@link JobSubmissionConstants#CONTAINER_LOG4J_PROPS} file for details.
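+   * In short: if HADOOP_CLIENT_OPTS points log4j at container-log4j.properties (as
+   * Hadoop 2 container launch does), re-point it at the localized copy of that file
+   * via an absolute file: URI so the forked child uses the same configuration.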
+ */
+ private static void hadoop2LogRedirect(ProcessBuilder processBuilder) {
+    Map<String, String> env = processBuilder.environment();
+ if(!env.containsKey(HADOOP_CLIENT_OPTS)) {
+ return;
+ }
+ String hcopts = env.get(HADOOP_CLIENT_OPTS);
+ if(!hcopts.contains("log4j.configuration=container-log4j.properties")) {
+ return;
+ }
+ //TempletonControllerJob ensures that this file is in DistributedCache
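+    //(the cache is expected to localize it into the task's working directory, so the
+    //relative name below resolves to a real file on the task node)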
+ File log4jProps = new File(JobSubmissionConstants.CONTAINER_LOG4J_PROPS);
+ hcopts = hcopts.replace("log4j.configuration=container-log4j.properties",
+ "log4j.configuration=file://" + log4jProps.getAbsolutePath());
+ //helps figure out what log4j is doing, but may confuse
+ //some jobs due to extra output to stdout
+ //hcopts = hcopts + " -Dlog4j.debug=true";
+ env.put(HADOOP_CLIENT_OPTS, hcopts);
+ }
  public Process run(List<String> cmd, List<String> removeEnv,
-                    Map<String, String> environmentVariables)
+                    Map<String, String> environmentVariables, boolean overrideContainerLog4jProps)
throws IOException {
- logDebugCmd(cmd, environmentVariables);
+ logDebugMsg("run(cmd, removeEnv, environmentVariables, " + overrideContainerLog4jProps + ")");
+ logDebugMsg("Starting cmd: " + cmd);
ProcessBuilder pb = new ProcessBuilder(cmd);
- for (String key : removeEnv)
+ for (String key : removeEnv) {
+ if(pb.environment().containsKey(key)) {
+ logDebugMsg("Removing env var: " + key + "=" + pb.environment().get(key));
+ }
pb.environment().remove(key);
+ }
pb.environment().putAll(environmentVariables);
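+    //on Hadoop 2, re-point log4j at the localized config before forking the child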
+ if(overrideContainerLog4jProps) {
+ hadoop2LogRedirect(pb);
+ }
+ logDebugInfo("Starting process with env:", pb.environment());
return pb.start();
}
-
-  private void logDebugCmd(List<String> cmd,
-                           Map<String, String> environmentVariables) {
-    if(!LOG.isDebugEnabled()){
-      return;
-    }
-    LOG.debug("starting " + cmd);
-    LOG.debug("With environment variables: " );
-    for(Map.Entry<String, String> keyVal : environmentVariables.entrySet()){
-      LOG.debug(keyVal.getKey() + "=" + keyVal.getValue());
-    }
-    LOG.debug("With environment variables already set: " );
-    Map<String, String> env = System.getenv();
-    for (String envName : env.keySet()) {
-      LOG.debug(envName + "=" + env.get(envName));
-    }
+  private static void logDebugInfo(String msg, Map<String, String> props) {
+ logDebugMsg(msg);
+    List<String> keys = new ArrayList<String>();
+ keys.addAll(props.keySet());
+ Collections.sort(keys);
+ for(String key : keys) {
+ logDebugMsg(key + "=" + props.get(key));
+ }
+ }
+ //private static final String MSG_PREFIX = TrivialExecService.class.getSimpleName() + ": ";
+ private static void logDebugMsg(String msg) {
+ //System.err.println(MSG_PREFIX + msg);
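+    //logged at INFO (not DEBUG) so that it shows up under the default container
+    //log4j configuration; see the class comment about where this output lands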
+ LOG.info(msg);
}
}
diff --git hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java
index a873a96..b76e69a 100644
--- hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java
+++ hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java
@@ -38,7 +38,7 @@ public void test() {
Process process = TrivialExecService.getInstance()
.run(list,
             new ArrayList<String>(),
-            new HashMap<String, String>());
+            new HashMap<String, String>(), false);
out = new BufferedReader(new InputStreamReader(
process.getInputStream()));
err = new BufferedReader(new InputStreamReader(
diff --git shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
index ce2e5b7..470dc76 100644
--- shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
+++ shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
@@ -18,82 +18,89 @@
package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.URI;
/**
* This is in org.apache.hadoop.mapred package because it relies on
* JobSubmissionProtocol which is package private
*/
public class WebHCatJTShim20S implements WebHCatJTShim {
- private JobSubmissionProtocol cnx;
+ private JobSubmissionProtocol cnx;
- /**
- * Create a connection to the Job Tracker.
- */
- public WebHCatJTShim20S(Configuration conf, UserGroupInformation ugi)
- throws IOException {
- cnx = (JobSubmissionProtocol)
- RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID,
- getAddress(conf),
- ugi,
- conf,
- NetUtils.getSocketFactory(conf,
- JobSubmissionProtocol.class));
- }
+ /**
+ * Create a connection to the Job Tracker.
+ */
+ public WebHCatJTShim20S(Configuration conf, UserGroupInformation ugi)
+ throws IOException {
+ cnx = (JobSubmissionProtocol)
+ RPC.getProxy(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID,
+ getAddress(conf),
+ ugi,
+ conf,
+ NetUtils.getSocketFactory(conf,
+ JobSubmissionProtocol.class));
+ }
- /**
- * Grab a handle to a job that is already known to the JobTracker.
- *
- * @return Profile of the job, or null if not found.
- */
- public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid)
- throws IOException {
- return cnx.getJobProfile(jobid);
- }
+ /**
+ * Grab a handle to a job that is already known to the JobTracker.
+ *
+ * @return Profile of the job, or null if not found.
+ */
+ public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid)
+ throws IOException {
+ return cnx.getJobProfile(jobid);
+ }
- /**
- * Grab a handle to a job that is already known to the JobTracker.
- *
- * @return Status of the job, or null if not found.
- */
- public org.apache.hadoop.mapred.JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid)
- throws IOException {
- return cnx.getJobStatus(jobid);
- }
+ /**
+ * Grab a handle to a job that is already known to the JobTracker.
+ *
+ * @return Status of the job, or null if not found.
+ */
+ public org.apache.hadoop.mapred.JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid)
+ throws IOException {
+ return cnx.getJobStatus(jobid);
+ }
- /**
- * Kill a job.
- */
- public void killJob(org.apache.hadoop.mapred.JobID jobid)
- throws IOException {
- cnx.killJob(jobid);
- }
+ /**
+ * Kill a job.
+ */
+ public void killJob(org.apache.hadoop.mapred.JobID jobid)
+ throws IOException {
+ cnx.killJob(jobid);
+ }
- /**
- * Get all the jobs submitted.
- */
- public org.apache.hadoop.mapred.JobStatus[] getAllJobs()
- throws IOException {
- return cnx.getAllJobs();
- }
+ /**
+ * Get all the jobs submitted.
+ */
+ public org.apache.hadoop.mapred.JobStatus[] getAllJobs()
+ throws IOException {
+ return cnx.getAllJobs();
+ }
- /**
- * Close the connection to the Job Tracker.
- */
- public void close() {
- RPC.stopProxy(cnx);
- }
- private InetSocketAddress getAddress(Configuration conf) {
- String jobTrackerStr = conf.get("mapred.job.tracker", "localhost:8012");
- return NetUtils.createSocketAddr(jobTrackerStr);
- }
+ /**
+ * Close the connection to the Job Tracker.
+ */
+ public void close() {
+ RPC.stopProxy(cnx);
+ }
+ private InetSocketAddress getAddress(Configuration conf) {
+ String jobTrackerStr = conf.get("mapred.job.tracker", "localhost:8012");
+ return NetUtils.createSocketAddr(jobTrackerStr);
+ }
+ @Override
+ public void addCacheFile(URI uri, Job job) {
+ DistributedCache.addCacheFile(uri, job.getConfiguration());
}
+}
diff --git shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
index abb3911..d0a4bf7 100644
--- shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
+++ shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
@@ -18,10 +18,12 @@
package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
import java.io.IOException;
+import java.net.URI;
public class WebHCatJTShim23 implements WebHCatJTShim {
private JobClient jc;
@@ -88,4 +90,8 @@ public void close() {
} catch (IOException e) {
}
}
+ @Override
+ public void addCacheFile(URI uri, Job job) {
+ job.addCacheFile(uri);
+ }
}
diff --git shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
index c7529dc..62ff878 100644
--- shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -561,6 +561,11 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporte
* Close the connection to the Job Tracker.
*/
public void close();
+ /**
+   * Does exactly what org.apache.hadoop.mapreduce.Job#addCacheFile(URI) does in Hadoop 2.
+ * Assumes that both parameters are not {@code null}.
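+   * Typical (illustrative) call site, mirroring TempletonControllerJob:
+   * ShimLoader.getHadoopShims().getWebHCatShim(conf, ugi).addCacheFile(uri, job);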
+ */
+ public void addCacheFile(URI uri, Job job);
}
/**