diff --git hcatalog/src/test/e2e/templeton/README.txt hcatalog/src/test/e2e/templeton/README.txt index 3011e5c..00ea693 100644 --- hcatalog/src/test/e2e/templeton/README.txt +++ hcatalog/src/test/e2e/templeton/README.txt @@ -141,8 +141,10 @@ In order for this test suite to work, webhcat-site.xml should have webhcat.proxyuser.hue.groups and webhcat.proxyuser.hue.hosts defined, i.e. 'hue' should be allowed to impersonate 'joe'. [Of course, 'hcat' proxyuser should be configured in core-site.xml for the command to succeed.] -Furthermore, metastore side file based security should be enabled. To do this 3 properties in -hive-site.xml should be configured: +Furthermore, metastore side file based security should be enabled. +(See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization#LanguageManualAuthorization-MetastoreServerSecurity for more info) + +To do this, 3 properties in hive-site.xml should be configured: 1) hive.security.metastore.authorization.manager set to org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider 2) hive.security.metastore.authenticator.manager set to diff --git hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm index dcd6465..66a0717 100644 --- hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm +++ hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm @@ -788,7 +788,8 @@ sub compare if ( (defined $testCmd->{'check_job_created'}) || (defined $testCmd->{'check_job_complete'}) - || (defined $testCmd->{'check_job_exit_value'}) ) { + || (defined $testCmd->{'check_job_exit_value'}) + || (defined $testCmd->{'check_job_percent_complete'}) ) { my $jobid = $json_hash->{'id'}; if (!defined $jobid) { print $log "$0::$subName WARN check failed: " @@ -803,7 +804,8 @@ sub compare . "jobresult not defined "; $result = 0; } - if (defined($testCmd->{'check_job_complete'}) || defined($testCmd->{'check_job_exit_value'})) { + if (defined($testCmd->{'check_job_complete'}) || defined($testCmd->{'check_job_exit_value'}) + || defined($testCmd->{'check_job_percent_complete'})) { my $jobComplete; my $NUM_RETRIES = 60; my $SLEEP_BETWEEN_RETRIES = 5; @@ -841,6 +843,15 @@ sub compare $result = 0; } } + # check the percentComplete value + if (defined($testCmd->{'check_job_percent_complete'})) { + my $pcValue = $res_hash->{'percentComplete'}; + my $expectedPercentComplete = $testCmd->{'check_job_percent_complete'}; + if ( (!defined $pcValue) || $pcValue ne $expectedPercentComplete ) { + print $log "check_job_percent_complete failed.
got percentComplete $pcValue, expected $expectedPercentComplete"; + $result = 0; + } + } } #Check userargs diff --git hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf index 240b01e..01b2b68 100644 --- hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf +++ hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf @@ -73,6 +73,7 @@ $cfg = 'status_code' => 200, 'check_job_created' => 1, 'check_job_complete' => 'SUCCESS', + 'check_job_percent_complete' => 'map 100% reduce 100%', 'check_job_exit_value' => 0, 'check_call_back' => 1, }, @@ -166,6 +167,7 @@ $cfg = 'status_code' => 200, 'check_job_created' => 1, 'check_job_complete' => 'SUCCESS', + 'check_job_percent_complete' => '100% complete', 'check_job_exit_value' => 0, 'check_call_back' => 1, }, diff --git hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf index 5e17221..b23fd4b 100644 --- hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf +++ hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf @@ -54,7 +54,9 @@ $cfg = }, { #-ve test - no input file - 'num' => 2, + #TempletonController job status should be success, but exit value should be 1 + #if yarn log is redirected to stderr check_job_complete is FAILURE, if not SUCCESS (HIVE-5511) + 'num' => 2, 'method' => 'POST', 'url' => ':TEMPLETON_URL:/templeton/v1/mapreduce/streaming', 'post_options' => ['user.name=:UNAME:','input=:INPDIR_HDFS:/nums.txt','input=:INPDIR_HDFS:/nums.txt','output=:OUTDIR:/mycounts', diff --git hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh index c8899b6..6b0b578 100644 --- hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh +++ hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh @@ -75,7 +75,7 @@ elif [ -e "${WEBHCAT_PREFIX}/conf/webhcat-env.sh" -o -e "${WEBHCAT_PREFIX}/etc/w else DEFAULT_CONF_DIR="/etc/webhcat" fi -WEBHCAT_CONF_DIR="${WEBHCAT_CONF_DIR:-$DEFAULT_CONF_DIR}" +export WEBHCAT_CONF_DIR="${WEBHCAT_CONF_DIR:-$DEFAULT_CONF_DIR}" #users can add various env vars to webhcat-env.sh in the conf #rather than having to export them before running the command diff --git hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties new file mode 100644 index 0000000..f6b740f --- /dev/null +++ hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +# +# This log4j config overrides hadoop-yarn-server-nodemanager-2.1.0-beta.jar/container-log4j.properties. 
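This override exists so that job-progress strings reach stderr, where WebHCat's LaunchMapper scans each line via TempletonUtils.extractPercentComplete(). A rough sketch of such a scan — an illustration, not the actual TempletonUtils implementation — matching the two formats asserted in jobsubmission.conf above ('map 100% reduce 100%' and '100% complete'):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class PercentCompleteSketch {
      // "map 100% reduce 100%" style, as printed by MapReduce jobs
      private static final Pattern MR_STYLE = Pattern.compile("(map \\d+% reduce \\d+%)");
      // "100% complete" style, as printed by Hive
      private static final Pattern HIVE_STYLE = Pattern.compile("(\\d+% complete)");

      public static String extractPercentComplete(String line) {
        Matcher m = MR_STYLE.matcher(line);
        if (m.find()) {
          return m.group(1);
        }
        m = HIVE_STYLE.matcher(line);
        return m.find() ? m.group(1) : null; // null: the line carries no progress info
      }

      public static void main(String[] args) {
        // prints "map 100% reduce 100%" then "100% complete"
        System.out.println(extractPercentComplete("13/10/07 12:00:00 INFO mapreduce.Job: map 100% reduce 100%"));
        System.out.println(extractPercentComplete("Stage-1: 100% complete"));
      }
    }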
+#In Hadoop 2, (by default) the log information about M/R job progress is not sent to stderr, +#which is where LaunchMapper expects it. Thus WebHCat is unable to report the +#percentComplete attribute in job status. There is something broken in YARN that doesn't allow +#its log4j properties to be overridden. Thus for now (10/07/2013) we resort to overriding it +#using this file, where log4j.rootLogger specifies an additional 'console' appender. This file is made +#available through DistributedCache. See TrivialExecService and TempletonControllerJob for more +#info. + +hadoop.root.logger=INFO,CLA + +# Define the root logger to the system property "hadoop.root.logger". +log4j.rootLogger=${hadoop.root.logger}, console, EventCounter + +# Logging Threshold +log4j.threshold=ALL + +# +# ContainerLog Appender +# + +#Default values +yarn.app.container.log.dir=null +yarn.app.container.log.filesize=100 + +log4j.appender.CLA=org.apache.hadoop.yarn.ContainerLogAppender +log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir} +log4j.appender.CLA.totalLogFileSize=${yarn.app.container.log.filesize} + +log4j.appender.CLA.layout=org.apache.log4j.PatternLayout +log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n + +# +# Event Counter Appender +# Sends counts of logging messages at different severity levels to Hadoop Metrics. +# +log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter + + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n diff --git hcatalog/webhcat/svr/src/main/config/webhcat-default.xml hcatalog/webhcat/svr/src/main/config/webhcat-default.xml index 1bef3c6..d369d5d 100644 --- hcatalog/webhcat/svr/src/main/config/webhcat-default.xml +++ hcatalog/webhcat/svr/src/main/config/webhcat-default.xml @@ -116,6 +116,25 @@ + <property> + <name>templeton.hive.home</name> + <value>hive-0.13.0-SNAPSHOT-bin.tar.gz/hive-0.13.0-SNAPSHOT-bin</value> + <description> + The path to the Hive home within the tar. This is needed if Hive is not installed on all + nodes in the cluster and needs to be shipped to the target node in the cluster to execute Pig + job which uses HCat, Hive query, etc. Has no effect if templeton.hive.archive is not set. + </description> + </property> + + <property> + <name>templeton.hcat.home</name> + <value>hive-0.13.0-SNAPSHOT-bin.tar.gz/hive-0.13.0-SNAPSHOT-bin/hcatalog</value> + <description> + The path to the HCat home within the tar. This is needed if Hive is not installed on all + nodes in the cluster and needs to be shipped to the target node in the cluster to execute Pig + job which uses HCat, Hive query, etc. Has no effect if templeton.hive.archive is not set. + </description> + </property> + <property> <name>templeton.hive.properties</name> <value>hive.metastore.local=false,hive.metastore.uris=thrift://localhost:9933,hive.metastore.sasl.enabled=false</value> <description>Properties to set when running hive.
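The two home-path properties above are relative paths: YARN extracts templeton.hive.archive into the container's working directory under the archive's own file name, so 'hive-0.13.0-SNAPSHOT-bin.tar.gz/hive-0.13.0-SNAPSHOT-bin' resolves beneath that directory. A toy sketch of the resolution LaunchMapper.handlePigEnvVars() performs (the working directory itself is whatever YARN assigns):

    import java.io.File;

    public class HiveHomeSketch {
      public static void main(String[] args) {
        // default value of templeton.hive.home from webhcat-default.xml above
        String hiveHome = "hive-0.13.0-SNAPSHOT-bin.tar.gz/hive-0.13.0-SNAPSHOT-bin";
        // handlePigEnvVars() exports the absolute form, resolved against the
        // container's current working directory:
        System.out.println("HIVE_HOME=" + new File(hiveHome).getAbsolutePath());
      }
    }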
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java index 4783ca9..10fc6df 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java @@ -71,6 +71,7 @@ }; public static final String TEMPLETON_HOME_VAR = "TEMPLETON_HOME"; + public static final String WEBHCAT_CONF_DIR = "WEBHCAT_CONF_DIR"; public static final String[] TEMPLETON_CONF_FILENAMES = { "webhcat-default.xml", @@ -90,6 +91,14 @@ public static final String PYTHON_NAME = "templeton.python"; public static final String HIVE_ARCHIVE_NAME = "templeton.hive.archive"; public static final String HIVE_PATH_NAME = "templeton.hive.path"; + /** + * see webhcat-default.xml + */ + public static final String HIVE_HOME_PATH = "templeton.hive.home"; + /** + * see webhcat-default.xml + */ + public static final String HCAT_HOME_PATH = "templeton.hcat.home"; public static final String HIVE_PROPS_NAME = "templeton.hive.properties"; public static final String LIB_JARS_NAME = "templeton.libjars"; public static final String PIG_ARCHIVE_NAME = "templeton.pig.archive"; @@ -153,6 +162,9 @@ public String getHadoopConfDir() { public static String getTempletonDir() { return System.getenv(TEMPLETON_HOME_VAR); } + public static String getWebhcatConfDir() { + return System.getenv(WEBHCAT_CONF_DIR); + } private boolean loadOneFileConfig(String dir, String fname) { if (dir != null) { diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java index 5f35176..1b9663d 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java @@ -68,7 +68,7 @@ public CompleteBean run(String id, String jobStatus) try { state = new JobState(id, Main.getAppConfigInstance()); if (state.getCompleteStatus() == null) - failed("Job not yet complete. jobId=" + id + " Status from JT=" + jobStatus, null); + failed("Job not yet complete. 
jobId=" + id + " Status from JobTracker=" + jobStatus, null); Long notified = state.getNotifiedTime(); if (notified != null) { diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java index c616d6c..a9e13c6 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java @@ -128,7 +128,7 @@ public EnqueueBean run(String user, Map userArgs, if (appConf.hiveArchive() != null && !appConf.hiveArchive().equals("")) { - args.add("-archives"); + args.add(ARCHIVES); args.add(appConf.hiveArchive()); } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java index bea08bb..0d51bc7 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java @@ -43,14 +43,14 @@ public EnqueueBean run(String user, Map userArgs, String jar, St String libjars, String files, List jarArgs, List defines, String statusdir, String callback, - boolean usehcatalog, String completedUrl, + boolean usesHcatalog, String completedUrl, boolean enablelog, JobType jobType) throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { runAs = user; List args = makeArgs(jar, mainClass, libjars, files, jarArgs, defines, - statusdir, usehcatalog, completedUrl, enablelog, jobType); + statusdir, usesHcatalog, completedUrl, enablelog, jobType); return enqueueController(user, userArgs, callback, args); } @@ -58,7 +58,7 @@ public EnqueueBean run(String user, Map userArgs, String jar, St private List makeArgs(String jar, String mainClass, String libjars, String files, List jarArgs, List defines, - String statusdir, boolean usehcatalog, String completedUrl, + String statusdir, boolean usesHcatalog, String completedUrl, boolean enablelog, JobType jobType) throws BadParam, IOException, InterruptedException { ArrayList args = new ArrayList(); @@ -72,7 +72,7 @@ public EnqueueBean run(String user, Map userArgs, String jar, St TempletonUtils.addCmdForWindows(args); //check if the rest command specified explicitly to use hcatalog - if(usehcatalog){ + if(usesHcatalog){ addHiveMetaStoreTokenArg(); } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java index b8b5973..854aa99 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java @@ -97,6 +97,9 @@ public EnqueueBean enqueueController(String user, Map userArgs, private String queueAsUser(UserGroupInformation ugi, final List args) throws IOException, InterruptedException { + if(LOG.isDebugEnabled()) { + LOG.debug("Launching job: " + args); + } return ugi.doAs(new PrivilegedExceptionAction() { public String run() throws Exception { String[] array = new String[args.size()]; diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java index 
0f37278..c52ea77 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java @@ -202,10 +202,13 @@ public Server runServer(int port) public FilterHolder makeAuthFilter() { FilterHolder authFilter = new FilterHolder(AuthFilter.class); if (UserGroupInformation.isSecurityEnabled()) { + //http://hadoop.apache.org/docs/r1.1.1/api/org/apache/hadoop/security/authentication/server/AuthenticationFilter.html authFilter.setInitParameter("dfs.web.authentication.signature.secret", conf.kerberosSecret()); + //http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-1.2.2/bk_installing_manually_book/content/rpm-chap14-2-3-2.html authFilter.setInitParameter("dfs.web.authentication.kerberos.principal", conf.kerberosPrincipal()); + //http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-1.2.2/bk_installing_manually_book/content/rpm-chap14-2-3-2.html authFilter.setInitParameter("dfs.web.authentication.kerberos.keytab", conf.kerberosKeytab()); } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java index 9adb1d9..64a68bd 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java @@ -29,6 +29,7 @@ import org.apache.commons.exec.ExecuteException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hive.hcatalog.templeton.tool.JobSubmissionConstants; import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob; import org.apache.hive.hcatalog.templeton.tool.TempletonUtils; @@ -47,13 +48,13 @@ public EnqueueBean run(String user, Map userArgs, String execute, String srcFile, List pigArgs, String otherFiles, String statusdir, String callback, - boolean usehcatalog, String completedUrl, boolean enablelog) + boolean usesHcatalog, String completedUrl, boolean enablelog) throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { runAs = user; List args = makeArgs(execute, srcFile, pigArgs, - otherFiles, statusdir, usehcatalog, completedUrl, enablelog); + otherFiles, statusdir, usesHcatalog, completedUrl, enablelog); return enqueueController(user, userArgs, callback, args); } @@ -64,20 +65,23 @@ public EnqueueBean run(String user, Map userArgs, * @param pigArgs pig command line arguments * @param otherFiles files to be copied to the map reduce cluster * @param statusdir status dir location - * @param usehcatalog whether the command uses hcatalog/needs to connect + * @param usesHcatalog whether the command uses hcatalog/needs to connect * to hive metastore server * @param completedUrl call back url - * @return * @throws BadParam * @throws IOException * @throws InterruptedException */ private List makeArgs(String execute, String srcFile, List pigArgs, String otherFiles, - String statusdir, boolean usehcatalog, - String completedUrl, boolean enablelog) - throws BadParam, IOException, InterruptedException { + String statusdir, boolean usesHcatalog, + String completedUrl, boolean enablelog) throws BadParam, IOException, + InterruptedException { ArrayList args = new ArrayList(); + //check if the REST command specified explicitly to use hcatalog + // or if it says that implicitly using the pig -useHCatalog arg + boolean
needsMetastoreAccess = usesHcatalog || hasPigArgUseHcat(pigArgs); + try { ArrayList allFiles = new ArrayList(); if (TempletonUtils.isset(srcFile)) { @@ -89,12 +93,32 @@ public EnqueueBean run(String user, Map userArgs, } args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, enablelog, JobType.PIG)); - if (appConf.pigArchive() != null && !appConf.pigArchive().equals("")) - { - args.add("-archives"); - args.add(appConf.pigArchive()); + boolean shipPigTar = appConf.pigArchive() != null && !appConf.pigArchive().equals(""); + boolean shipHiveTar = needsMetastoreAccess && appConf.hiveArchive() != null + && !appConf.hiveArchive().equals(""); + if(shipPigTar || shipHiveTar) { + args.add(ARCHIVES); + StringBuilder archives = new StringBuilder(); + if(shipPigTar) { + archives.append(appConf.pigArchive()); + } + if(shipPigTar && shipHiveTar) { + archives.append(","); + } + if(shipHiveTar) { + archives.append(appConf.hiveArchive()); + } + args.add(archives.toString()); + } + if(shipHiveTar) { + addDef(args, JobSubmissionConstants.PigConstants.HIVE_HOME, + appConf.get(AppConfig.HIVE_HOME_PATH)); + addDef(args, JobSubmissionConstants.PigConstants.HCAT_HOME, + appConf.get(AppConfig.HCAT_HOME_PATH)); + //Pig which uses HCat will pass this to HCat so that it can find the metastore + addDef(args, JobSubmissionConstants.PigConstants.PIG_OPTS, + appConf.get(AppConfig.HIVE_PROPS_NAME)); } - args.add("--"); TempletonUtils.addCmdForWindows(args); args.add(appConf.pigPath()); @@ -104,9 +128,7 @@ public EnqueueBean run(String user, Map userArgs, for (String pigArg : pigArgs) { args.add(TempletonUtils.quoteForWindows(pigArg)); } - //check if the REST command specified explicitly to use hcatalog - // or if it says that implicitly using the pig -useHCatalog arg - if(usehcatalog || hasPigArgUseHcat(pigArgs)){ + if(needsMetastoreAccess) { addHiveMetaStoreTokenArg(); } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index f41450c..665e5f9 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -635,7 +635,7 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, /** * Run a MapReduce Jar job. * Params correspond to the REST api params - * @param usehcatalog if {@code true}, means the Jar uses HCat and thus needs to access + * @param usesHcatalog if {@code true}, means the Jar uses HCat and thus needs to access * metastore, which requires additional steps for WebHCat to perform in a secure cluster. 
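To ground these REST parameters, a hypothetical client-side submission (host, user, and file names are invented; 50111 is WebHCat's usual default port):

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class WebHCatSubmitSketch {
      public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:50111/templeton/v1/pig");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
        // usehcatalog=true triggers the metastore-token and hive-tar logic in this patch
        String body = "user.name=joe&file=/user/joe/wordcount.pig&usehcatalog=true";
        OutputStream out = conn.getOutputStream();
        out.write(body.getBytes("UTF-8"));
        out.close();
        System.out.println("HTTP " + conn.getResponseCode()); // expect 200 and {"id":"job_..."}
      }
    }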
* @param callback URL which WebHCat will call when the hive job finishes * @see org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob @@ -651,7 +651,7 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, @FormParam("define") List defines, @FormParam("statusdir") String statusdir, @FormParam("callback") String callback, - @FormParam("usehcatalog") boolean usehcatalog, + @FormParam("usehcatalog") boolean usesHcatalog, @FormParam("enablelog") boolean enablelog) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { @@ -677,14 +677,14 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, return d.run(getDoAsUser(), userArgs, jar, mainClass, libjars, files, args, defines, - statusdir, callback, usehcatalog, getCompletedUrl(), enablelog, JobType.JAR); + statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog, JobType.JAR); } /** * Run a Pig job. - * Params correspond to the REST api params. If '-useHCatalog' is in the {@code pigArgs, usehcatalog}, + * Params correspond to the REST api params. If '-useHCatalog' is in the {@code pigArgs, usesHcatalog}, * is interpreted as true. - * @param usehcatalog if {@code true}, means the Pig script uses HCat and thus needs to access + * @param usesHcatalog if {@code true}, means the Pig script uses HCat and thus needs to access * metastore, which requires additional steps for WebHCat to perform in a secure cluster. * This does nothing to ensure that Pig is installed on target node in the cluster. * @param callback URL which WebHCat will call when the hive job finishes @@ -699,7 +699,7 @@ public EnqueueBean pig(@FormParam("execute") String execute, @FormParam("files") String otherFiles, @FormParam("statusdir") String statusdir, @FormParam("callback") String callback, - @FormParam("usehcatalog") boolean usehcatalog, + @FormParam("usehcatalog") boolean usesHcatalog, @FormParam("enablelog") boolean enablelog) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { @@ -725,7 +725,7 @@ public EnqueueBean pig(@FormParam("execute") String execute, return d.run(getDoAsUser(), userArgs, execute, srcFile, pigArgs, otherFiles, - statusdir, callback, usehcatalog, getCompletedUrl(), enablelog); + statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog); } /** diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TempletonDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TempletonDelegator.java index 532a191..cd20c26 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TempletonDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TempletonDelegator.java @@ -24,6 +24,11 @@ * or hive. 
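The ARCHIVES constant defined just below is the generic Hadoop option that ships archives via the DistributedCache. A sketch of the argument pair PigDelegator.makeArgs() above builds with it when both tars are configured (archive locations are hypothetical):

    import java.util.ArrayList;
    import java.util.List;

    public class ArchivesArgSketch {
      static final String ARCHIVES = "-archives";

      public static void main(String[] args) {
        String pigArchive = "hdfs:///apps/webhcat/pig.tar.gz";   // hypothetical
        String hiveArchive = "hdfs:///apps/webhcat/hive.tar.gz"; // hypothetical
        List<String> cmd = new ArrayList<String>();
        cmd.add(ARCHIVES);
        cmd.add(pigArchive + "," + hiveArchive); // comma-joined, as in makeArgs()
        System.out.println(cmd); // [-archives, hdfs:///apps/webhcat/pig.tar.gz,hdfs:///apps/webhcat/hive.tar.gz]
      }
    }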
*/ public class TempletonDelegator { + /** + * http://hadoop.apache.org/docs/r1.0.4/commands_manual.html#Generic+Options + */ + public static final String ARCHIVES = "-archives"; + protected AppConfig appConf; public TempletonDelegator(AppConfig appConf) { diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java index dfb66ca..fb46b58 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java @@ -91,6 +91,12 @@ public String getField(Type type, String id, String key) { BufferedReader in = null; Path p = new Path(getPath(type) + "/" + id + "/" + key); try { + if(!fs.exists(p)) { + //check first, otherwise webhcat.log is full of stack traces from FileSystem when + //clients check for status ('exitValue', 'completed', etc.) + LOG.debug(p + " does not exist."); + return null; + } in = new BufferedReader(new InputStreamReader(fs.open(p))); String line = null; String val = ""; @@ -102,9 +108,7 @@ public String getField(Type type, String id, String key) { } return val; } catch (Exception e) { - //don't print stack trace since clients poll for 'exitValue', 'completed', - //files which are not there until job completes - LOG.info("Couldn't find " + p + ": " + e.getMessage()); + LOG.error("Couldn't find " + p + ": " + e.getMessage(), e); } finally { close(in); } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java index 9d3e05c..f46edb3 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java @@ -32,6 +32,6 @@ @Override List parseJobID() throws IOException { - return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern); + return parseJobID(JobSubmissionConstants.STDERR_FNAME, jobidPattern); } } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java index 2ef404f..2d68918 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java @@ -32,7 +32,7 @@ @Override List parseJobID() throws IOException { - return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern); + return parseJobID(JobSubmissionConstants.STDERR_FNAME, jobidPattern); } } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java new file mode 100644 index 0000000..482e993 --- /dev/null +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton.tool; + +public interface JobSubmissionConstants { + public static final String COPY_NAME = "templeton.copy"; + public static final String STATUSDIR_NAME = "templeton.statusdir"; + public static final String ENABLE_LOG = "templeton.enablelog"; + public static final String JOB_TYPE = "templeton.jobtype"; + public static final String JAR_ARGS_NAME = "templeton.args"; + public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath"; + public static final String OVERRIDE_CONTAINER_LOG4J_PROPS = "override.containerLog4j"; + //name of file + static final String CONTAINER_LOG4J_PROPS = "override-container-log4j.properties"; + public static final String STDOUT_FNAME = "stdout"; + public static final String STDERR_FNAME = "stderr"; + public static final String EXIT_FNAME = "exit"; + public static final int WATCHER_TIMEOUT_SECS = 10; + public static final int KEEP_ALIVE_MSEC = 60 * 1000; + public static final String TOKEN_FILE_ARG_PLACEHOLDER = "__WEBHCAT_TOKEN_FILE_LOCATION__"; + /** + * constants needed for Pig job submission + */ + public static interface PigConstants { + public static final String HIVE_HOME = "HIVE_HOME"; + public static final String HCAT_HOME = "HCAT_HOME"; + public static final String PIG_OPTS = "PIG_OPTS"; + } +} diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java new file mode 100644 index 0000000..db8928a --- /dev/null +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java @@ -0,0 +1,344 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
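One contract worth spelling out from JobSubmissionConstants above: TOKEN_FILE_ARG_PLACEHOLDER is written into the stored job args on the server side and replaced inside the map task, where HADOOP_TOKEN_FILE_LOCATION is finally known. A toy illustration (the arg shape and token path are hypothetical):

    public class TokenPlaceholderSketch {
      static final String PLACEHOLDER = "__WEBHCAT_TOKEN_FILE_LOCATION__";

      public static void main(String[] args) {
        String storedArg = "-D" + PLACEHOLDER;                               // illustrative shape
        String tokenFile = "/grid/yarn/local/container_01/container_tokens"; // hypothetical path
        String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile;   // as built in LaunchMapper.startJob()
        System.out.println(storedArg.replace(PLACEHOLDER, tokenArg));
        // => -Dmapreduce.job.credentials.binary=/grid/yarn/local/container_01/container_tokens
      }
    }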
+ */ +package org.apache.hive.hcatalog.templeton.tool; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.util.Shell; +import org.apache.hive.hcatalog.templeton.BadParam; +import org.apache.hive.hcatalog.templeton.LauncherDelegator; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Note that this class is used in a different JVM than the WebHCat server. Thus it should not call + * any classes not available on every node in the cluster (outside webhcat jar). + * TempletonControllerJob#run() calls Job.setJarByClass(LaunchMapper.class) which + * causes webhcat jar to be shipped to target node, but not its transitive closure. + * Long term we need to clean up this separation and create a separate jar to ship so that the + * dependencies are clear. (This used to be an inner class of TempletonControllerJob) + */ +@InterfaceAudience.Private +public class LaunchMapper extends Mapper implements + JobSubmissionConstants { + /** + * This class currently sends everything to stderr, but it should probably use Log4J - + * it will end up in 'syslog' of this Map task. For example, look for KeepAlive heartbeat msgs. + */ + private static final Log LOG = LogFactory.getLog(LaunchMapper.class); + /** + * When a Pig job is submitted and it uses HCat, WebHCat may be configured to ship hive tar + * to the target node. Pig on the target node needs some env vars configured. + */ + private static void handlePigEnvVars(Configuration conf, Map env) { + if(conf.get(PigConstants.HIVE_HOME) != null) { + env.put(PigConstants.HIVE_HOME, new File(conf.get(PigConstants.HIVE_HOME)).getAbsolutePath()); + } + if(conf.get(PigConstants.HCAT_HOME) != null) { + env.put(PigConstants.HCAT_HOME, new File(conf.get(PigConstants.HCAT_HOME)).getAbsolutePath()); + } + if(conf.get(PigConstants.PIG_OPTS) != null) { + StringBuilder pigOpts = new StringBuilder(); + for(String prop : conf.get(PigConstants.PIG_OPTS).split(",")) { + pigOpts.append("-D").append(prop).append(" "); + } + env.put(PigConstants.PIG_OPTS, pigOpts.toString()); + } + } + protected Process startJob(Context context, String user, String overrideClasspath) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + copyLocal(COPY_NAME, conf); + String[] jarArgs = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME)); + + ArrayList removeEnv = new ArrayList(); + //remove env vars set by the parent's hadoop launcher scripts so they don't leak into the child invocation; HADOOP_ROOT_LOGGER in particular would override the child's log4j setup
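A standalone rerun of the handlePigEnvVars() transformation above, using the default templeton.hive.properties value from webhcat-default.xml:

    public class PigOptsSketch {
      public static void main(String[] args) {
        String props = "hive.metastore.local=false,hive.metastore.uris=thrift://localhost:9933,"
            + "hive.metastore.sasl.enabled=false";
        StringBuilder pigOpts = new StringBuilder();
        for (String prop : props.split(",")) {
          pigOpts.append("-D").append(prop).append(" "); // same loop as handlePigEnvVars()
        }
        // => PIG_OPTS=-Dhive.metastore.local=false -Dhive.metastore.uris=thrift://localhost:9933 ...
        System.out.println("PIG_OPTS=" + pigOpts);
      }
    }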
+ removeEnv.add("HADOOP_ROOT_LOGGER"); + removeEnv.add("hadoop-command"); + removeEnv.add("CLASS"); + removeEnv.add("mapredcommand"); + Map env = TempletonUtils.hadoopUserEnv(user, overrideClasspath); + handlePigEnvVars(conf, env); + List jarArgsList = new LinkedList(Arrays.asList(jarArgs)); + String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION"); + + + if (tokenFile != null) { + //Token is available, so replace the placeholder + tokenFile = tokenFile.replaceAll("\"", ""); + String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile; + if (Shell.WINDOWS) { + try { + tokenArg = TempletonUtils.quoteForWindows(tokenArg); + } catch (BadParam e) { + String msg = "cannot pass " + tokenFile + " to mapreduce.job.credentials.binary"; + LOG.error(msg, e); + throw new IOException(msg, e); + } + } + for(int i=0; i it = jarArgsList.iterator(); + while(it.hasNext()){ + String arg = it.next(); + if(arg.contains(TOKEN_FILE_ARG_PLACEHOLDER)){ + it.remove(); + } + } + } + boolean overrideLog4jProps = conf.get(OVERRIDE_CONTAINER_LOG4J_PROPS) == null ? + false : Boolean.valueOf(conf.get(OVERRIDE_CONTAINER_LOG4J_PROPS)); + return TrivialExecService.getInstance().run(jarArgsList, removeEnv, env, overrideLog4jProps); + } + + private void copyLocal(String var, Configuration conf) throws IOException { + String[] filenames = TempletonUtils.decodeArray(conf.get(var)); + if (filenames != null) { + for (String filename : filenames) { + Path src = new Path(filename); + Path dst = new Path(src.getName()); + FileSystem fs = src.getFileSystem(conf); + LOG.info("templeton: copy " + src + " => " + dst); + fs.copyToLocalFile(src, dst); + } + } + } + + @Override + public void run(Context context) throws IOException, InterruptedException { + + Configuration conf = context.getConfiguration(); + + Process proc = startJob(context, + conf.get("user.name"), + conf.get(OVERRIDE_CLASSPATH)); + + String statusdir = conf.get(STATUSDIR_NAME); + + if (statusdir != null) { + try { + statusdir = TempletonUtils.addUserHomeDirectoryIfApplicable(statusdir, + conf.get("user.name")); + } catch (URISyntaxException e) { + String msg = "Invalid status dir URI"; + LOG.error(msg, e); + throw new IOException(msg, e); + } + } + + Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG)); + LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE)); + + ExecutorService pool = Executors.newCachedThreadPool(); + executeWatcher(pool, conf, context.getJobID(), + proc.getInputStream(), statusdir, STDOUT_FNAME); + executeWatcher(pool, conf, context.getJobID(), + proc.getErrorStream(), statusdir, STDERR_FNAME); + KeepAlive keepAlive = startCounterKeepAlive(pool, context); + + proc.waitFor(); + keepAlive.sendReport = false; + pool.shutdown(); + if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) { + pool.shutdownNow(); + } + + writeExitValue(conf, proc.exitValue(), statusdir); + JobState state = new JobState(context.getJobID().toString(), conf); + state.setExitValue(proc.exitValue()); + state.setCompleteStatus("done"); + state.close(); + + if (enablelog && TempletonUtils.isset(statusdir)) { + LOG.info("templeton: collecting logs for " + context.getJobID().toString() + + " to " + statusdir + "/logs"); + LogRetriever logRetriever = new LogRetriever(statusdir, jobType, conf); + logRetriever.run(); + } + + if (proc.exitValue() != 0) { + LOG.info("templeton: job failed with exit code " + + proc.exitValue()); + } else { + LOG.info("templeton: job completed with exit code 0"); + } + } + + private 
void executeWatcher(ExecutorService pool, Configuration conf, JobID jobid, InputStream in, + String statusdir, String name) throws IOException { + Watcher w = new Watcher(conf, jobid, in, statusdir, name); + pool.execute(w); + } + + private KeepAlive startCounterKeepAlive(ExecutorService pool, Context context) throws IOException { + KeepAlive k = new KeepAlive(context); + pool.execute(k); + return k; + } + + private void writeExitValue(Configuration conf, int exitValue, String statusdir) + throws IOException { + if (TempletonUtils.isset(statusdir)) { + Path p = new Path(statusdir, EXIT_FNAME); + FileSystem fs = p.getFileSystem(conf); + OutputStream out = fs.create(p); + LOG.info("templeton: Writing exit value " + exitValue + " to " + p); + PrintWriter writer = new PrintWriter(out); + writer.println(exitValue); + writer.close(); + } + } + + + private static class Watcher implements Runnable { + private final InputStream in; + private OutputStream out; + private final JobID jobid; + private final Configuration conf; + + public Watcher(Configuration conf, JobID jobid, InputStream in, String statusdir, String name) + throws IOException { + this.conf = conf; + this.jobid = jobid; + this.in = in; + + if (name.equals(STDERR_FNAME)) { + out = System.err; + } else { + out = System.out; + } + + if (TempletonUtils.isset(statusdir)) { + Path p = new Path(statusdir, name); + FileSystem fs = p.getFileSystem(conf); + out = fs.create(p); + LOG.info("templeton: Writing status to " + p); + } + } + + @Override + public void run() { + try { + InputStreamReader isr = new InputStreamReader(in); + BufferedReader reader = new BufferedReader(isr); + PrintWriter writer = new PrintWriter(out); + + String line; + while ((line = reader.readLine()) != null) { + writer.println(line); + JobState state = null; + try { + String percent = TempletonUtils.extractPercentComplete(line); + String childid = TempletonUtils.extractChildJobId(line); + + if (percent != null || childid != null) { + state = new JobState(jobid.toString(), conf); + state.setPercentComplete(percent); + state.setChildId(childid); + } + } catch (IOException e) { + LOG.error("templeton: state error: ", e); + } finally { + if (state != null) { + try { + state.close(); + } catch (IOException e) { + LOG.warn(e); + } + } + } + } + writer.flush(); + if(out != System.err && out != System.out) { + //depending on FileSystem implementation flush() may or may not do anything + writer.close(); + } + } catch (IOException e) { + LOG.error("templeton: execute error: ", e); + } + } + } + + private static class KeepAlive implements Runnable { + private final Context context; + private volatile boolean sendReport; + + public KeepAlive(Context context) + { + this.sendReport = true; + this.context = context; + } + private static StringBuilder makeDots(int count) { + StringBuilder sb = new StringBuilder(); + for(int i = 0; i < count; i++) { + sb.append('.'); + } + return sb; + } + + @Override + public void run() { + try { + int count = 0; + while (sendReport) { + // Periodically report progress on the Context object + // to prevent TaskTracker from killing the Templeton + // Controller task + context.progress(); + count++; + String msg = "KeepAlive Heart beat" + makeDots(count); + LOG.info(msg); + Thread.sleep(KEEP_ALIVE_MSEC); + } + } catch (InterruptedException e) { + // Ok to be interrupted + } + } + } +} diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java 
hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java index 007ac2f..048440c 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java @@ -32,6 +32,6 @@ @Override List parseJobID() throws IOException { - return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern); + return parseJobID(JobSubmissionConstants.STDERR_FNAME, jobidPattern); } } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java index 110ddbb..c1dedce 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java @@ -18,23 +18,11 @@ */ package org.apache.hive.hcatalog.templeton.tool; -import java.io.BufferedReader; +import java.io.File; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.PrintWriter; -import java.net.URISyntaxException; +import java.net.URI; import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,24 +30,24 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Tool; -import org.apache.hive.hcatalog.templeton.BadParam; -import org.apache.hive.hcatalog.templeton.LauncherDelegator; +import org.apache.hive.hcatalog.templeton.AppConfig; +import org.apache.hive.hcatalog.templeton.Main; import org.apache.hive.hcatalog.templeton.SecureProxySupport; import org.apache.hive.hcatalog.templeton.UgiFactory; import org.apache.thrift.TException; @@ -83,281 +71,97 @@ * parameter supplied in the REST call. WebHcat takes care of cancelling the token when the job * is complete. 
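Condensing the class javadoc here: the controller is a map-only job whose single mapper execs the real client program. A sketch of the job shape that run() (below) configures — assuming it lives in the same package as LaunchMapper and SingleInputFormat; an outline, not the method itself:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

    public class ControllerJobSketch {
      @SuppressWarnings("deprecation")
      static Job makeControllerJob(Configuration conf) throws Exception {
        Job job = new Job(conf);                          // pre-Job.getInstance() API, as in the patch
        job.setJarByClass(LaunchMapper.class);            // ships the webhcat jar to the cluster
        job.setJobName("TempletonControllerJob");
        job.setMapperClass(LaunchMapper.class);           // single mapper that execs pig/hive/hadoop
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(SingleInputFormat.class); // exactly one split => one mapper
        job.setOutputFormatClass(NullOutputFormat.class); // the real output is the child process's work
        job.setNumReduceTasks(0);
        return job;
      }
    }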
*/ -public class TempletonControllerJob extends Configured implements Tool { - public static final String COPY_NAME = "templeton.copy"; - public static final String STATUSDIR_NAME = "templeton.statusdir"; - public static final String ENABLE_LOG = "templeton.enablelog"; - public static final String JOB_TYPE = "templeton.jobtype"; - public static final String JAR_ARGS_NAME = "templeton.args"; - public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath"; - - public static final String STDOUT_FNAME = "stdout"; - public static final String STDERR_FNAME = "stderr"; - public static final String EXIT_FNAME = "exit"; - - public static final int WATCHER_TIMEOUT_SECS = 10; - public static final int KEEP_ALIVE_MSEC = 60 * 1000; - - public static final String TOKEN_FILE_ARG_PLACEHOLDER - = "__WEBHCAT_TOKEN_FILE_LOCATION__"; - - private static TrivialExecService execService = TrivialExecService.getInstance(); - +@InterfaceAudience.Private +public class TempletonControllerJob extends Configured implements Tool, JobSubmissionConstants { private static final Log LOG = LogFactory.getLog(TempletonControllerJob.class); - private final boolean secureMetastoreAccess; + //file to add to DistributedCache + private static URI overrideLog4jURI = null; + private static boolean overrideContainerLog4jProps; + //Jar cmd submission likely will be affected, Pig likely not + private static final String affectedMsg = "Monitoring of Hadoop jobs submitted through WebHCat " + + "may be affected."; /** - * @param secureMetastoreAccess - if true, a delegation token will be created - * and added to the job + * Copy the file from local file system to home dir of the user WebHCat is running as */ - public TempletonControllerJob(boolean secureMetastoreAccess) { - super(); - this.secureMetastoreAccess = secureMetastoreAccess; - } - public static class LaunchMapper - extends Mapper { - protected Process startJob(Context context, String user, - String overrideClasspath) - throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); - copyLocal(COPY_NAME, conf); - String[] jarArgs - = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME)); - - ArrayList removeEnv = new ArrayList(); - removeEnv.add("HADOOP_ROOT_LOGGER"); - removeEnv.add("hadoop-command"); - removeEnv.add("CLASS"); - removeEnv.add("mapredcommand"); - Map env = TempletonUtils.hadoopUserEnv(user, - overrideClasspath); - List jarArgsList = new LinkedList(Arrays.asList(jarArgs)); - String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION"); - - - if (tokenFile != null) { - //Token is available, so replace the placeholder - tokenFile = tokenFile.replaceAll("\"", ""); - String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile; - if (Shell.WINDOWS) { - try { - tokenArg = TempletonUtils.quoteForWindows(tokenArg); - } catch (BadParam e) { - throw new IOException("cannot pass " + tokenFile + " to mapreduce.job.credentials.binary", e); - } - } - for(int i=0; i it = jarArgsList.iterator(); - while(it.hasNext()){ - String arg = it.next(); - if(arg.contains(TOKEN_FILE_ARG_PLACEHOLDER)){ - it.remove(); - } - } - } - return execService.run(jarArgsList, removeEnv, env); - } - - private void copyLocal(String var, Configuration conf) - throws IOException { - String[] filenames = TempletonUtils.decodeArray(conf.get(var)); - if (filenames != null) { - for (String filename : filenames) { - Path src = new Path(filename); - Path dst = new Path(src.getName()); - FileSystem fs = src.getFileSystem(conf); - 
System.err.println("templeton: copy " + src + " => " + dst); - fs.copyToLocalFile(src, dst); + private static URI copyLog4JtoFileSystem(final String localFile) throws IOException, + InterruptedException { + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + return ugi.doAs(new PrivilegedExceptionAction() { + @Override + public URI run() throws IOException { + AppConfig appConfig = Main.getAppConfigInstance(); + String fsTmpDir = appConfig.get("hadoop.tmp.dir"); + if(fsTmpDir == null || fsTmpDir.length() <= 0) { + fsTmpDir = Path.SEPARATOR + "users"; + LOG.warn("Could not find 'hadoop.tmp.dir'; will try " + fsTmpDir); } - } - } - - @Override - public void run(Context context) - throws IOException, InterruptedException { - - Configuration conf = context.getConfiguration(); - - Process proc = startJob(context, - conf.get("user.name"), - conf.get(OVERRIDE_CLASSPATH)); - - String statusdir = conf.get(STATUSDIR_NAME); - - if (statusdir != null) { - try { - statusdir = TempletonUtils.addUserHomeDirectoryIfApplicable(statusdir, - conf.get("user.name")); - } catch (URISyntaxException e) { - throw new IOException("Invalid status dir URI", e); + FileSystem fs = FileSystem.get(appConfig); + Path dirPath = new Path(fsTmpDir); + if(!fs.exists(dirPath)) { + fs.mkdirs(dirPath, new FsPermission((short)0775)); } + Path dst = fs.makeQualified(new Path(fsTmpDir, CONTAINER_LOG4J_PROPS)); + fs.copyFromLocalFile(new Path(localFile), dst); + //make readable by all users since TempletonControllerJob#run() is run as submitting user + fs.setPermission(dst, new FsPermission((short)0644)); + return dst.toUri(); } - - Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG)); - LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE)); - - ExecutorService pool = Executors.newCachedThreadPool(); - executeWatcher(pool, conf, context.getJobID(), - proc.getInputStream(), statusdir, STDOUT_FNAME); - executeWatcher(pool, conf, context.getJobID(), - proc.getErrorStream(), statusdir, STDERR_FNAME); - KeepAlive keepAlive = startCounterKeepAlive(pool, context); - - proc.waitFor(); - keepAlive.sendReport = false; - pool.shutdown(); - if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) { - pool.shutdownNow(); - } - - writeExitValue(conf, proc.exitValue(), statusdir); - JobState state = new JobState(context.getJobID().toString(), conf); - state.setExitValue(proc.exitValue()); - state.setCompleteStatus("done"); - state.close(); - - if (enablelog && TempletonUtils.isset(statusdir)) { - System.err.println("templeton: collecting logs for " + context.getJobID().toString() - + " to " + statusdir + "/logs"); - LogRetriever logRetriever = new LogRetriever(statusdir, jobType, conf); - logRetriever.run(); - } - - if (proc.exitValue() != 0) { - System.err.println("templeton: job failed with exit code " - + proc.exitValue()); - } - else { - System.err.println("templeton: job completed with exit code 0"); - } - } - - private void executeWatcher(ExecutorService pool, Configuration conf, - JobID jobid, InputStream in, String statusdir, - String name) - throws IOException { - Watcher w = new Watcher(conf, jobid, in, statusdir, name); - pool.execute(w); - } - - private KeepAlive startCounterKeepAlive(ExecutorService pool, Context context) - throws IOException { - KeepAlive k = new KeepAlive(context); - pool.execute(k); - return k; - } - - private void writeExitValue(Configuration conf, int exitValue, String statusdir) - throws IOException { - if (TempletonUtils.isset(statusdir)) { 
- Path p = new Path(statusdir, EXIT_FNAME); - FileSystem fs = p.getFileSystem(conf); - OutputStream out = fs.create(p); - System.err.println("templeton: Writing exit value " - + exitValue + " to " + p); - PrintWriter writer = new PrintWriter(out); - writer.println(exitValue); - writer.close(); - } - } + }); } - - private static class Watcher implements Runnable { - private final InputStream in; - private OutputStream out; - private final JobID jobid; - private final Configuration conf; - - public Watcher(Configuration conf, JobID jobid, InputStream in, - String statusdir, String name) - throws IOException { - this.conf = conf; - this.jobid = jobid; - this.in = in; - - if (name.equals(STDERR_FNAME)) - out = System.err; - else - out = System.out; - - if (TempletonUtils.isset(statusdir)) { - Path p = new Path(statusdir, name); - FileSystem fs = p.getFileSystem(conf); - out = fs.create(p); - System.err.println("templeton: Writing status to " + p); - } - } - - @Override - public void run() { - try { - InputStreamReader isr = new InputStreamReader(in); - BufferedReader reader = new BufferedReader(isr); - PrintWriter writer = new PrintWriter(out); - - String line; - while ((line = reader.readLine()) != null) { - writer.println(line); - JobState state = null; + /** + * local file system + * @return + */ + private static String getLog4JPropsLocal() { + return AppConfig.getWebhcatConfDir() + File.separator + CONTAINER_LOG4J_PROPS; + } + static { + //initialize once-per-JVM (i.e. one running WebHCat server) state and log it once since it's + // the same for every job + try { + //safe (thread) publication + // http://docs.oracle.com/javase/specs/jls/se5.0/html/execution.html#12.4.2 + LOG.info("Using Hadoop Version: " + ShimLoader.getMajorVersion()); + overrideContainerLog4jProps = "0.23".equals(ShimLoader.getMajorVersion()); + if(overrideContainerLog4jProps) { + //see detailed note in CONTAINER_LOG4J_PROPS file + LOG.info(AppConfig.WEBHCAT_CONF_DIR + "=" + AppConfig.getWebhcatConfDir()); + File localFile = new File(getLog4JPropsLocal()); + if(localFile.exists()) { + LOG.info("Found " + localFile.getAbsolutePath() + " to use for job submission."); try { - String percent = TempletonUtils.extractPercentComplete(line); - String childid = TempletonUtils.extractChildJobId(line); - - if (percent != null || childid != null) { - state = new JobState(jobid.toString(), conf); - state.setPercentComplete(percent); - state.setChildId(childid); - } - } catch (IOException e) { - System.err.println("templeton: state error: " + e); - } finally { - if (state != null) { - try { - state.close(); - } catch (IOException e) { - } - } + overrideLog4jURI = copyLog4JtoFileSystem(getLog4JPropsLocal()); + LOG.info("Job submission will use log4j.properties=" + overrideLog4jURI); } + catch(IOException ex) { + LOG.warn("Will not add " + CONTAINER_LOG4J_PROPS + " to Distributed Cache. " + + "Some fields in job status may be unavailable", ex); + } + } + else { + LOG.warn("Could not find " + localFile.getAbsolutePath() + ". " + affectedMsg); } - writer.flush(); - } catch (IOException e) { - System.err.println("templeton: execute error: " + e); } } + catch(Throwable t) { + //this intentionally doesn't use TempletonControllerJob.class.getName() to be able to + //log errors which may be due to class loading + String msg = "org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob is not " + + "properly initialized. 
" + affectedMsg; + LOG.error(msg, t); + } } - public static class KeepAlive implements Runnable { - private Context context; - public boolean sendReport; - - public KeepAlive(Context context) - { - this.sendReport = true; - this.context = context; - } + private final boolean secureMetastoreAccess; - @Override - public void run() { - try { - while (sendReport) { - // Periodically report progress on the Context object - // to prevent TaskTracker from killing the Templeton - // Controller task - context.progress(); - System.err.println("KeepAlive Heart beat"); - Thread.sleep(KEEP_ALIVE_MSEC); - } - } catch (InterruptedException e) { - // Ok to be interrupted - } - } + /** + * @param secureMetastoreAccess - if true, a delegation token will be created + * and added to the job + */ + public TempletonControllerJob(boolean secureMetastoreAccess) { + super(); + this.secureMetastoreAccess = secureMetastoreAccess; } private JobID submittedJobId; @@ -365,8 +169,7 @@ public void run() { public String getSubmittedId() { if (submittedJobId == null) { return null; - } - else { + } else { return submittedJobId.toString(); } } @@ -376,20 +179,42 @@ public String getSubmittedId() { * @see org.apache.hive.hcatalog.templeton.CompleteDelegator */ @Override - public int run(String[] args) - throws IOException, InterruptedException, ClassNotFoundException, TException { + public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, + TException { + if(LOG.isDebugEnabled()) { + LOG.debug("Preparing to submit job: " + Arrays.toString(args)); + } Configuration conf = getConf(); - + conf.set(JAR_ARGS_NAME, TempletonUtils.encodeArray(args)); String user = UserGroupInformation.getCurrentUser().getShortUserName(); conf.set("user.name", user); + if(overrideContainerLog4jProps && overrideLog4jURI != null) { + //must be done before Job object is created + conf.set(OVERRIDE_CONTAINER_LOG4J_PROPS, Boolean.TRUE.toString()); + } Job job = new Job(conf); - job.setJarByClass(TempletonControllerJob.class); - job.setJobName("TempletonControllerJob"); + job.setJarByClass(LaunchMapper.class); + job.setJobName(TempletonControllerJob.class.getSimpleName()); job.setMapperClass(LaunchMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setInputFormatClass(SingleInputFormat.class); + if(overrideContainerLog4jProps && overrideLog4jURI != null) { + FileSystem fs = FileSystem.get(conf); + if(fs.exists(new Path(overrideLog4jURI))) { + ShimLoader.getHadoopShims().getWebHCatShim(conf, UgiFactory.getUgi(user)).addCacheFile( + overrideLog4jURI, job); + LOG.debug("added " + overrideLog4jURI + " to Dist Cache"); + } + else { + //in case this file was deleted by someone issue a warning but don't try to add to + // DistributedCache as that will throw and fail job submission + LOG.warn("Cannot find " + overrideLog4jURI + " which is created on WebHCat startup/job " + + "submission. 
" + affectedMsg); + } + } + NullOutputFormat of = new NullOutputFormat(); job.setOutputFormatClass(of.getClass()); job.setNumReduceTasks(0); @@ -404,13 +229,16 @@ public int run(String[] args) job.submit(); submittedJobId = job.getJobID(); - if(metastoreTokenStrForm != null) { //so that it can be cancelled later from CompleteDelegator DelegationTokenCache.getStringFormTokenCache().storeDelegationToken( submittedJobId.toString(), metastoreTokenStrForm); - LOG.debug("Added metastore delegation token for jobId=" + submittedJobId.toString() + " " + - "user=" + user); + LOG.debug("Added metastore delegation token for jobId=" + submittedJobId.toString() + + " user=" + user); + } + if(overrideContainerLog4jProps && overrideLog4jURI == null) { + //do this here so that log msg has JobID + LOG.warn("Could not override container log4j properties for " + submittedJobId); } return 0; } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java index 2de3f27..7140caa 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java @@ -18,21 +18,30 @@ */ package org.apache.hive.hcatalog.templeton.tool; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; /** * Execute a local program. This is a singleton service that will * execute a programs on the local box. + * + * Note that is is executed from LaunchMapper which is executed in + * different JVM from WebHCat (Templeton) server. Thus it should not call any classes + * not available on every node in the cluster (outside webhcat jar) */ -public class TrivialExecService { - private static volatile TrivialExecService theSingleton; +final class TrivialExecService { + //with default log4j config, this output ends up in 'syslog' of the LaunchMapper task private static final Log LOG = LogFactory.getLog(TrivialExecService.class); - + private static volatile TrivialExecService theSingleton; + private static final String HADOOP_CLIENT_OPTS = "HADOOP_CLIENT_OPTS"; /** * Retrieve the singleton. */ @@ -41,32 +50,58 @@ public static synchronized TrivialExecService getInstance() { theSingleton = new TrivialExecService(); return theSingleton; } - + /** + * See {@link JobSubmissionConstants#CONTAINER_LOG4J_PROPS} file for details. 
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
index 2de3f27..7140caa 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
@@ -18,21 +18,30 @@
  */
 package org.apache.hive.hcatalog.templeton.tool;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;

-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;

 /**
  * Execute a local program.  This is a singleton service that will
  * execute a programs on the local box.
+ *
+ * Note that it is executed from LaunchMapper which is executed in a
+ * different JVM from the WebHCat (Templeton) server.  Thus it should not call any classes
+ * not available on every node in the cluster (outside webhcat jar)
  */
-public class TrivialExecService {
-  private static volatile TrivialExecService theSingleton;
+final class TrivialExecService {
+  //with default log4j config, this output ends up in 'syslog' of the LaunchMapper task
   private static final Log LOG = LogFactory.getLog(TrivialExecService.class);
-
+  private static volatile TrivialExecService theSingleton;
+  private static final String HADOOP_CLIENT_OPTS = "HADOOP_CLIENT_OPTS";
   /**
    * Retrieve the singleton.
    */
@@ -41,32 +50,58 @@ public static synchronized TrivialExecService getInstance() {
       theSingleton = new TrivialExecService();
     return theSingleton;
   }
-
+  /**
+   * See {@link JobSubmissionConstants#CONTAINER_LOG4J_PROPS} file for details.
+   */
+  private static void hadoop2LogRedirect(ProcessBuilder processBuilder) {
+    Map<String, String> env = processBuilder.environment();
+    if(!env.containsKey(HADOOP_CLIENT_OPTS)) {
+      return;
+    }
+    String hcopts = env.get(HADOOP_CLIENT_OPTS);
+    if(!hcopts.contains("log4j.configuration=container-log4j.properties")) {
+      return;
+    }
+    //TempletonControllerJob ensures that this file is in DistributedCache
+    File log4jProps = new File(JobSubmissionConstants.CONTAINER_LOG4J_PROPS);
+    hcopts = hcopts.replace("log4j.configuration=container-log4j.properties",
+      "log4j.configuration=file://" + log4jProps.getAbsolutePath());
+    //helps figure out what log4j is doing, but may confuse
+    //some jobs due to extra output to stdout
+    //hcopts = hcopts + " -Dlog4j.debug=true";
+    env.put(HADOOP_CLIENT_OPTS, hcopts);
+  }
   public Process run(List<String> cmd, List<String> removeEnv,
-                     Map<String, String> environmentVariables)
+                     Map<String, String> environmentVariables, boolean overrideContainerLog4jProps)
     throws IOException {
-    logDebugCmd(cmd, environmentVariables);
+    logDebugMsg("run(cmd, removeEnv, environmentVariables, " + overrideContainerLog4jProps + ")");
+    logDebugMsg("Starting cmd: " + cmd);
     ProcessBuilder pb = new ProcessBuilder(cmd);
-    for (String key : removeEnv)
+    for (String key : removeEnv) {
+      if(pb.environment().containsKey(key)) {
+        logDebugMsg("Removing env var: " + key + "=" + pb.environment().get(key));
+      }
       pb.environment().remove(key);
+    }
     pb.environment().putAll(environmentVariables);
+    if(overrideContainerLog4jProps) {
+      hadoop2LogRedirect(pb);
+    }
+    logDebugInfo("Starting process with env:", pb.environment());
     return pb.start();
   }
-
-  private void logDebugCmd(List<String> cmd,
-                           Map<String, String> environmentVariables) {
-    if(!LOG.isDebugEnabled()){
-      return;
-    }
-    LOG.debug("starting " + cmd);
-    LOG.debug("With environment variables: " );
-    for(Map.Entry<String, String> keyVal : environmentVariables.entrySet()){
-      LOG.debug(keyVal.getKey() + "=" + keyVal.getValue());
-    }
-    LOG.debug("With environment variables already set: " );
-    Map<String, String> env = System.getenv();
-    for (String envName : env.keySet()) {
-      LOG.debug(envName + "=" + env.get(envName));
-    }
+  private static void logDebugInfo(String msg, Map<String, String> props) {
+    logDebugMsg(msg);
+    List<String> keys = new ArrayList<String>();
+    keys.addAll(props.keySet());
+    Collections.sort(keys);
+    for(String key : keys) {
+      logDebugMsg(key + "=" + props.get(key));
+    }
+  }
+  //private static final String MSG_PREFIX = TrivialExecService.class.getSimpleName() + ": ";
+  private static void logDebugMsg(String msg) {
+    //System.err.println(MSG_PREFIX + msg);
+    LOG.info(msg);
   }
 }
diff --git hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java
index a873a96..b76e69a 100644
--- hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java
+++ hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java
@@ -38,7 +38,7 @@ public void test() {
       Process process = TrivialExecService.getInstance()
         .run(list,
           new ArrayList<String>(),
-          new HashMap<String, String>());
+          new HashMap<String, String>(),false);
       out = new BufferedReader(new InputStreamReader(
         process.getInputStream()));
       err = new BufferedReader(new InputStreamReader(
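
hadoop2LogRedirect() above works entirely through the child process environment: if
HADOOP_CLIENT_OPTS still points log4j.configuration at the stock container-log4j.properties
resource, it is re-pointed at the localized override file via a file:// URI, so the child JVM
picks up the WebHCat-friendly layout. Below is a self-contained sketch of the same rewrite,
assuming, as the patch does, that the Distributed Cache has already localized the override
file into the process working directory; the file name mirrors CONTAINER_LOG4J_PROPS.

    import java.io.File;
    import java.util.Map;

    public class LogRedirectSketch {
      public static void main(String[] args) {
        ProcessBuilder pb = new ProcessBuilder("hadoop", "jar", "app.jar");
        Map<String, String> env = pb.environment();
        String hcopts = env.get("HADOOP_CLIENT_OPTS");
        if (hcopts != null
            && hcopts.contains("log4j.configuration=container-log4j.properties")) {
          // A relative name resolves against the container's working directory,
          // which is where the Distributed Cache placed the override file.
          File localized = new File("override-container-log4j.properties");
          env.put("HADOOP_CLIENT_OPTS", hcopts.replace(
              "log4j.configuration=container-log4j.properties",
              "log4j.configuration=file://" + localized.getAbsolutePath()));
        }
        // pb.start() would now launch the child with the redirected log4j config.
      }
    }
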
diff --git shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
index ce2e5b7..470dc76 100644
--- shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
+++ shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
@@ -18,82 +18,89 @@
 package org.apache.hadoop.mapred;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;

 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;

 /**
  * This is in org.apache.hadoop.mapred package because it relies on
  * JobSubmissionProtocol which is package private
  */
 public class WebHCatJTShim20S implements WebHCatJTShim {
-    private JobSubmissionProtocol cnx;
+  private JobSubmissionProtocol cnx;

-    /**
-     * Create a connection to the Job Tracker.
-     */
-    public WebHCatJTShim20S(Configuration conf, UserGroupInformation ugi)
-            throws IOException {
-        cnx = (JobSubmissionProtocol)
-                RPC.getProxy(JobSubmissionProtocol.class,
-                        JobSubmissionProtocol.versionID,
-                        getAddress(conf),
-                        ugi,
-                        conf,
-                        NetUtils.getSocketFactory(conf,
-                                JobSubmissionProtocol.class));
-    }
+  /**
+   * Create a connection to the Job Tracker.
+   */
+  public WebHCatJTShim20S(Configuration conf, UserGroupInformation ugi)
+          throws IOException {
+    cnx = (JobSubmissionProtocol)
+            RPC.getProxy(JobSubmissionProtocol.class,
+                    JobSubmissionProtocol.versionID,
+                    getAddress(conf),
+                    ugi,
+                    conf,
+                    NetUtils.getSocketFactory(conf,
+                            JobSubmissionProtocol.class));
+  }

-    /**
-     * Grab a handle to a job that is already known to the JobTracker.
-     *
-     * @return Profile of the job, or null if not found.
-     */
-    public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid)
-            throws IOException {
-        return cnx.getJobProfile(jobid);
-    }
+  /**
+   * Grab a handle to a job that is already known to the JobTracker.
+   *
+   * @return Profile of the job, or null if not found.
+   */
+  public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid)
+          throws IOException {
+    return cnx.getJobProfile(jobid);
+  }

-    /**
-     * Grab a handle to a job that is already known to the JobTracker.
-     *
-     * @return Status of the job, or null if not found.
-     */
-    public org.apache.hadoop.mapred.JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid)
-            throws IOException {
-        return cnx.getJobStatus(jobid);
-    }
+  /**
+   * Grab a handle to a job that is already known to the JobTracker.
+   *
+   * @return Status of the job, or null if not found.
+   */
+  public org.apache.hadoop.mapred.JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid)
+          throws IOException {
+    return cnx.getJobStatus(jobid);
+  }

-    /**
-     * Kill a job.
-     */
-    public void killJob(org.apache.hadoop.mapred.JobID jobid)
-            throws IOException {
-        cnx.killJob(jobid);
-    }
+  /**
+   * Kill a job.
+   */
+  public void killJob(org.apache.hadoop.mapred.JobID jobid)
+          throws IOException {
+    cnx.killJob(jobid);
+  }

-    /**
-     * Get all the jobs submitted.
-     */
-    public org.apache.hadoop.mapred.JobStatus[] getAllJobs()
-            throws IOException {
-        return cnx.getAllJobs();
-    }
+  /**
+   * Get all the jobs submitted.
+   */
+  public org.apache.hadoop.mapred.JobStatus[] getAllJobs()
+          throws IOException {
+    return cnx.getAllJobs();
+  }

-    /**
-     * Close the connection to the Job Tracker.
-     */
-    public void close() {
-        RPC.stopProxy(cnx);
-    }
-    private InetSocketAddress getAddress(Configuration conf) {
-        String jobTrackerStr = conf.get("mapred.job.tracker", "localhost:8012");
-        return NetUtils.createSocketAddr(jobTrackerStr);
-    }
+  /**
+   * Close the connection to the Job Tracker.
+   */
+  public void close() {
+    RPC.stopProxy(cnx);
+  }
+  private InetSocketAddress getAddress(Configuration conf) {
+    String jobTrackerStr = conf.get("mapred.job.tracker", "localhost:8012");
+    return NetUtils.createSocketAddr(jobTrackerStr);
+  }
+  @Override
+  public void addCacheFile(URI uri, Job job) {
+    DistributedCache.addCacheFile(uri, job.getConfiguration());
   }
+}
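
The 0.20S shim above and the 0.23 shim below give the single WebHCatJTShim interface two
version-specific addCacheFile() bodies: Hadoop 1 has to go through the static DistributedCache
helper, while Hadoop 2 exposes Job#addCacheFile directly. Callers stay version-agnostic by
resolving the shim at runtime, as TempletonControllerJob does. A sketch of that call-site
shape follows; it is illustrative only, the getWebHCatShim usage mirrors the run() hunk
earlier in this patch, and exception handling is simplified.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
    import org.apache.hadoop.hive.shims.ShimLoader;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ShimCallSketch {
      static void shipToCache(Configuration conf, Job job, URI uri) throws Exception {
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        // ShimLoader picks WebHCatJTShim20S or WebHCatJTShim23 based on the
        // Hadoop version on the classpath; the caller never names either class.
        WebHCatJTShim shim = ShimLoader.getHadoopShims().getWebHCatShim(conf, ugi);
        try {
          shim.addCacheFile(uri, job);
        } finally {
          shim.close();
        }
      }
    }
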
diff --git shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
index abb3911..d0a4bf7 100644
--- shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
+++ shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
@@ -18,10 +18,12 @@
 package org.apache.hadoop.mapred;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;

 import java.io.IOException;
+import java.net.URI;

 public class WebHCatJTShim23 implements WebHCatJTShim {
   private JobClient jc;
@@ -88,4 +90,8 @@ public void close() {
     } catch (IOException e) {
     }
   }
+  @Override
+  public void addCacheFile(URI uri, Job job) {
+    job.addCacheFile(uri);
+  }
 }
diff --git shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
index c7529dc..62ff878 100644
--- shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -561,6 +561,11 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporte
      * Close the connection to the Job Tracker.
      */
     public void close();
+    /**
+     * Does exactly what org.apache.hadoop.mapreduce.Job#addCacheFile(URI) does in Hadoop 2.
+     * Assumes that both parameters are not {@code null}.
+     */
+    public void addCacheFile(URI uri, Job job);
   }

  /**