diff --git conf/templeton-default.xml conf/templeton-default.xml index bdd86d0..2db7605 100644 --- conf/templeton-default.xml +++ conf/templeton-default.xml @@ -78,6 +78,12 @@ limitations under the License. + templeton.python + ${env.PYTHON_CMD} + The path to the python executable. + + + templeton.pig.archive hdfs:///user/templeton/pig-0.9.2.tar.gz The path to the Pig archive. diff --git conf/templeton-env.sh conf/templeton-env.sh new file mode 100644 index 0000000..2489eec --- /dev/null +++ conf/templeton-env.sh @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +export JAVA_HOME=${JAVA_HOME} + +export PYTHON_CMD=`which python` diff --git src/test/e2e/templeton/build.xml src/test/e2e/templeton/build.xml index ed85eba..64b5180 100644 --- src/test/e2e/templeton/build.xml +++ src/test/e2e/templeton/build.xml @@ -17,76 +17,89 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git src/test/e2e/templeton/drivers/TestDriverCurl.pm src/test/e2e/templeton/drivers/TestDriverCurl.pm index 79373bf..3591397 100644 --- src/test/e2e/templeton/drivers/TestDriverCurl.pm +++ src/test/e2e/templeton/drivers/TestDriverCurl.pm @@ -155,6 +155,9 @@ sub globalSetup # Setup the output path my $me = `whoami`; chomp $me; + #usernames on windows can be "domain\username" change the "\" + # as runid is used in file names + $me =~ s/\\/_/; $globalHash->{'runid'} = $me . "." . time; # if "-ignore false" was provided on the command line, @@ -495,7 +498,7 @@ sub execCurlCmd(){ push @curl_cmd, ("-X", $method, "-o", $res_body, "-D", $res_header); push @curl_cmd, ($url); - print $log "$0:$subName Going to run command : " . join (' ', @curl_cmd); + print $log "$0:$subName Going to run command : " . join (' , ', @curl_cmd); print $log "\n"; diff --git src/test/e2e/templeton/tests/ddl.conf src/test/e2e/templeton/tests/ddl.conf index 673b9f2..36c15f9 100644 --- src/test/e2e/templeton/tests/ddl.conf +++ src/test/e2e/templeton/tests/ddl.conf @@ -337,7 +337,7 @@ $cfg = "fieldsTerminatedBy" : "\u0001", "collectionItemsTerminatedBy" : "\u0002", "mapKeysTerminatedBy" : "\u0003", - "linesTerminatedBy" : "\n", + "linesTerminatedBy" : "\\\n", "serde" : { "name" : "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", @@ -500,15 +500,7 @@ $cfg = 'url' => ':TEMPLETON_URL:/templeton/v1/ddl', 'status_code' => 200, 'post_options' => ['user.name=:UNAME:', - 'exec=create table if not exists templetontest_parts (i int, j bigint, ip STRING COMMENT "IP Address of the User") -COMMENT "This is the page view table" - PARTITIONED BY(dt STRING, country STRING) -ROW FORMAT DELIMITED - FIELDS TERMINATED BY "\001" - COLLECTION ITEMS TERMINATED BY "\002" - MAP KEYS TERMINATED BY "\003" -STORED AS rcfile ---LOCATION "table1_location" '], + 'exec=create table if not exists templetontest_parts (i int, j bigint, ip STRING COMMENT \'IP Address of the User\') COMMENT \'This is the page view table\' PARTITIONED BY(dt STRING, country STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY \'\001\' COLLECTION ITEMS TERMINATED BY \'\002\' MAP KEYS TERMINATED BY \'\003\' STORED AS rcfile --LOCATION \'table1_location\' '], 'json_field_substr_match' => {'stderr' => 'OK'}, }, { @@ -747,7 +739,7 @@ STORED AS rcfile 'method' => 'POST', 'url' => ':TEMPLETON_URL:/templeton/v1/ddl?user.name=:UNAME:', 'status_code' => 200, - 'post_options' => ['user.name=:UNAME:','exec=create table if not exists templeton_testcol_tab (i int comment "column with comment", j bigint) STORED AS rcfile;'], + 'post_options' => ['user.name=:UNAME:','exec=create table if not exists templeton_testcol_tab (i int comment \'column with comment\', j bigint) STORED AS rcfile;'], 'json_field_substr_match' => {'stderr' => 'OK'}, }, { @@ -1074,9 +1066,7 @@ STORED AS rcfile 'status_code' => 200, 'post_options' => ['user.name=:UNAME:', 'permissions=---------', - 'exec=create table templetontest_hcatgp(i int, j bigint) - PARTITIONED BY(dt STRING, country STRING) - STORED AS rcfile;' + 'exec=create table templetontest_hcatgp(i int, j bigint) PARTITIONED BY(dt STRING, 3Bcountry STRING) STORED AS rcfile;' ], 'json_field_substr_match' => {'stderr' => 'OK', 'exitcode' => '^0$'} }, diff --git src/test/e2e/templeton/tests/jobsubmission.conf src/test/e2e/templeton/tests/jobsubmission.conf index 6e4ebed..69a8dea 100644 --- src/test/e2e/templeton/tests/jobsubmission.conf +++ src/test/e2e/templeton/tests/jobsubmission.conf @@ -18,42 +18,6 @@ $cfg = [ ##============================================================================================================= { - 'name' => 'TestStreaming', - 'tests' => - [ - { - 'num' => 1, - 'method' => 'POST', - 'url' => ':TEMPLETON_URL:/templeton/v1/mapreduce/streaming', - 'post_options' => ['user.name=:UNAME:','input=:INPDIR_HDFS:/nums.txt','output=:OUTDIR:/mycounts', - 'mapper=/bin/cat', 'reducer=/usr/bin/wc'], - 'json_field_substr_match' => { 'id' => '\d+'}, - #results - 'status_code' => 200, - 'check_job_created' => 1, - 'check_job_complete' => 'SUCCESS', - 'check_call_back' => 1, - }, - { - #-ve test - no input file - 'num' => 2, - 'ignore' => 'wait for fix in hadoop 1.0.3', - 'method' => 'POST', - 'url' => ':TEMPLETON_URL:/templeton/v1/mapreduce/streaming', - 'post_options' => ['user.name=:UNAME:','input=:INPDIR_HDFS:/nums.txt','output=:OUTDIR:/mycounts', - 'mapper=/bin/ls no_such-file-12e3', 'reducer=/usr/bin/wc'], - 'json_field_substr_match' => { 'id' => '\d+'}, - #results - 'status_code' => 200, - 'check_job_created' => 1, - 'check_job_complete' => 'FAILURE', - 'check_call_back' => 1, - }, - - ] - }, -##============================================================================================================= - { 'name' => 'TestKillJob', 'tests' => [ @@ -62,7 +26,7 @@ $cfg = 'method' => 'POST', 'url' => ':TEMPLETON_URL:/templeton/v1/mapreduce/streaming', 'post_options' => ['user.name=:UNAME:','input=:INPDIR_HDFS:/nums.txt','output=:OUTDIR:/mycounts', - 'mapper=/bin/sleep 100', 'reducer=/usr/bin/wc'], + 'mapper=\cygwin\bin\sleep 100', 'reducer=\cygwin\bin\wc'], 'json_field_substr_match' => { 'id' => '\d+'}, #results 'status_code' => 200, diff --git src/test/e2e/templeton/tests/jobsubmission_streaming_lin.conf src/test/e2e/templeton/tests/jobsubmission_streaming_lin.conf new file mode 100644 index 0000000..ec0107d --- /dev/null +++ src/test/e2e/templeton/tests/jobsubmission_streaming_lin.conf @@ -0,0 +1,60 @@ +############################################################################### +# curl command tests for templeton +# +# + +#use Yahoo::Miners::Test::PigSetup; + +#PigSetup::setup(); + +#my $me = `whoami`; +#chomp $me; + +$cfg = +{ + 'driver' => 'Curl', + + 'groups' => + [ +##============================================================================================================= + { + 'name' => 'TestStreaming', + 'tests' => + [ + { + 'num' => 1, + 'method' => 'POST', + 'url' => ':TEMPLETON_URL:/templeton/v1/mapreduce/streaming', + 'post_options' => ['user.name=:UNAME:','input=:INPDIR_HDFS:/nums.txt','output=:OUTDIR:/mycounts', + 'mapper=/bin/cat', 'reducer=/usr/bin/wc'], + 'json_field_substr_match' => { 'id' => '\d+'}, + #results + 'status_code' => 200, + 'check_job_created' => 1, + 'check_job_complete' => 'SUCCESS', + 'check_call_back' => 1, + }, + { + #-ve test - no input file + 'num' => 2, + 'method' => 'POST', + 'url' => ':TEMPLETON_URL:/templeton/v1/mapreduce/streaming', + 'post_options' => ['user.name=:UNAME:','input=:INPDIR_HDFS:/nums.txt','output=:OUTDIR:/mycounts', + 'mapper=/bin/ls no_such-file-12e3', 'reducer=/usr/bin/wc'], + 'json_field_substr_match' => { 'id' => '\d+'}, + #results + 'status_code' => 200, + 'check_job_created' => 1, + 'check_job_complete' => 'FAILURE', + 'check_call_back' => 1, + }, + + ] + }, +##============================================================================================================= + + + ] +}, + ; + diff --git src/test/e2e/templeton/tests/jobsubmission_streaming_win.conf src/test/e2e/templeton/tests/jobsubmission_streaming_win.conf new file mode 100644 index 0000000..6328d78 --- /dev/null +++ src/test/e2e/templeton/tests/jobsubmission_streaming_win.conf @@ -0,0 +1,60 @@ +############################################################################### +# curl command tests for templeton +# +# + +#use Yahoo::Miners::Test::PigSetup; + +#PigSetup::setup(); + +#my $me = `whoami`; +#chomp $me; + +$cfg = +{ + 'driver' => 'Curl', + + 'groups' => + [ +##============================================================================================================= + { + 'name' => 'TestStreaming', + 'tests' => + [ + { + 'num' => 1, + 'method' => 'POST', + 'url' => ':TEMPLETON_URL:/templeton/v1/mapreduce/streaming', + 'post_options' => ['user.name=:UNAME:','input=:INPDIR_HDFS:/nums.txt','output=:OUTDIR:/mycounts', + 'mapper=\cygwin\bin\cat.exe', 'reducer=\cygwin\bin\wc.exe'], + 'json_field_substr_match' => { 'id' => '\d+'}, + #results + 'status_code' => 200, + 'check_job_created' => 1, + 'check_job_complete' => 'SUCCESS', + 'check_call_back' => 1, + }, + { + #-ve test - no input file + 'num' => 2, + 'method' => 'POST', + 'url' => ':TEMPLETON_URL:/templeton/v1/mapreduce/streaming', + 'post_options' => ['user.name=:UNAME:','input=:INPDIR_HDFS:/nums.txt','output=:OUTDIR:/mycounts', + 'mapper=/bin/ls no_such-file-12e3', 'reducer=/usr/bin/wc'], + 'json_field_substr_match' => { 'id' => '\d+'}, + #results + 'status_code' => 200, + 'check_job_created' => 1, + 'check_job_complete' => 'FAILURE', + 'check_call_back' => 1, + }, + + ] + }, +##============================================================================================================= + + + ] +}, + ; + diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/AppConfig.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/AppConfig.java index 80ddb1e..f39a1a8 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/AppConfig.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/AppConfig.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -84,6 +84,7 @@ public class AppConfig extends Configuration { public static final String HADOOP_NAME = "templeton.hadoop"; public static final String HADOOP_CONF_DIR = "templeton.hadoop.conf.dir"; public static final String HCAT_NAME = "templeton.hcat"; + public static final String PYTHON_NAME = "templeton.python"; public static final String HIVE_ARCHIVE_NAME = "templeton.hive.archive"; public static final String HIVE_PATH_NAME = "templeton.hive.path"; public static final String HIVE_PROPS_NAME = "templeton.hive.properties"; @@ -168,6 +169,7 @@ public class AppConfig extends Configuration { public String libJars() { return get(LIB_JARS_NAME); } public String clusterHadoop() { return get(HADOOP_NAME); } public String clusterHcat() { return get(HCAT_NAME); } + public String clusterPython() { return get(PYTHON_NAME); } public String pigPath() { return get(PIG_PATH_NAME); } public String pigArchive() { return get(PIG_ARCHIVE_NAME); } public String hivePath() { return get(HIVE_PATH_NAME); } @@ -192,14 +194,11 @@ public class AppConfig extends Configuration { public long zkCleanupInterval() { return getLong(ZooKeeperCleanup.ZK_CLEANUP_INTERVAL, - (1000L * 60L * 60L * 12L)); - } - - public long zkMaxAge() { + (1000L * 60L * 60L * 12L)); } + public long zkMaxAge() { return getLong(ZooKeeperCleanup.ZK_CLEANUP_MAX_AGE, - (1000L * 60L * 60L * 24L * 7L)); - } - + (1000L * 60L * 60L * 24L * 7L)); } public String zkHosts() { return get(ZooKeeperStorage.ZK_HOSTS); } - public int zkSessionTimeout() { return getInt(ZooKeeperStorage.ZK_SESSION_TIMEOUT, 30000); } + public int zkSessionTimeout() { return getInt(ZooKeeperStorage.ZK_SESSION_TIMEOUT, + 30000); } } diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ExecServiceImpl.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ExecServiceImpl.java index 41920b5..edbd36a 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ExecServiceImpl.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ExecServiceImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,12 +17,18 @@ */ package org.apache.hcatalog.templeton; +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.PrintWriter; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.Semaphore; import org.apache.commons.exec.CommandLine; @@ -32,6 +38,39 @@ import org.apache.commons.exec.ExecuteWatchdog; import org.apache.commons.exec.PumpStreamHandler; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.Shell; + + +class StreamOutputWriter extends Thread +{ + InputStream is; + String type; + PrintWriter out; + + StreamOutputWriter(InputStream is, String type, OutputStream outStream) + { + this.is = is; + this.type = type; + this.out = new PrintWriter(outStream, true); + } + + @Override + public void run() + { + try + { + BufferedReader br = + new BufferedReader(new InputStreamReader(is)); + String line = null; + while ( (line = br.readLine()) != null){ + out.println(line); + } + } catch (IOException ioe) + { + ioe.printStackTrace(); + } + } +} /** * Execute a local program. This is a singleton service that will @@ -65,14 +104,15 @@ public class ExecServiceImpl implements ExecService { * the number of processes that can simultaneously created for * this instance. * + * @param user A valid user * @param program The program to run - * @param args Arguments to pass to the program * @param env Any extra environment variables to set - * @return The result of the run. + * @returns The result of the run. */ public ExecBean run(String program, List args, Map env) - throws NotAuthorizedException, BusyException, ExecuteException, IOException { + throws NotAuthorizedException, BusyException, ExecuteException, IOException + { boolean aquired = false; try { aquired = avail.tryAcquire(); @@ -92,14 +132,14 @@ public class ExecServiceImpl implements ExecService { * Run the program synchronously as the given user. Warning: * CommandLine will trim the argument strings. * + * @param user A valid user * @param program The program to run. - * @param args Arguments to pass to the program - * @param env Any extra environment variables to set - * @return The result of the run. + * @returns The result of the run. */ public ExecBean runUnlimited(String program, List args, Map env) - throws NotAuthorizedException, ExecuteException, IOException { + throws NotAuthorizedException, ExecuteException, IOException + { try { return auxRun(program, args, env); } catch (IOException e) { @@ -108,12 +148,13 @@ public class ExecServiceImpl implements ExecService { throw e; else throw new IOException("Invalid permissions on Templeton directory: " - + cwd.getCanonicalPath()); + + cwd.getCanonicalPath()); } } private ExecBean auxRun(String program, List args, Map env) - throws NotAuthorizedException, ExecuteException, IOException { + throws NotAuthorizedException, ExecuteException, IOException + { DefaultExecutor executor = new DefaultExecutor(); executor.setExitValues(null); @@ -132,17 +173,54 @@ public class ExecServiceImpl implements ExecService { LOG.info("Running: " + cmd); ExecBean res = new ExecBean(); - res.exitcode = executor.execute(cmd, execEnv(env)); + + if(Shell.WINDOWS){ + //The default executor is sometimes causing failure on windows. hcat + // command sometimes returns non zero exit status with it. It seems + // to hit some race conditions on windows. + env = execEnv(env); + String[] envVals = new String[env.size()]; + int i=0; + for( Entry kv : env.entrySet()){ + envVals[i++] = kv.getKey() + "=" + kv.getValue(); + System.out.println("Setting " + kv.getKey() + "=" + kv.getValue()); + } + Process proc = Runtime.getRuntime().exec(cmd.toStrings(), envVals); + //consume stderr + StreamOutputWriter errorGobbler = new + StreamOutputWriter(proc.getErrorStream(), "ERROR", errStream); + + //consume stdout + StreamOutputWriter outputGobbler = new + StreamOutputWriter(proc.getInputStream(), "OUTPUT", outStream); + + //start collecting input streams + errorGobbler.start(); + outputGobbler.start(); + //execute + try{ + res.exitcode = proc.waitFor(); + } catch (InterruptedException e) { + throw new IOException(e); + } + //flush + errorGobbler.out.flush(); + outputGobbler.out.flush(); + } + else { + res.exitcode = executor.execute(cmd, execEnv(env)); + } String enc = appConf.get(AppConfig.EXEC_ENCODING_NAME); res.stdout = outStream.toString(enc); res.stderr = errStream.toString(enc); - return res; + } private CommandLine makeCommandLine(String program, List args) - throws NotAuthorizedException, IOException { + throws NotAuthorizedException, IOException + { String path = validateProgram(program); CommandLine cmd = new CommandLine(path); if (args != null) @@ -166,9 +244,10 @@ public class ExecServiceImpl implements ExecService { res.put(key, val); } } + if (env != null) res.putAll(env); - for (Map.Entry envs : res.entrySet()) { + for(Map.Entry envs : res.entrySet()){ LOG.info("Env " + envs.getKey() + "=" + envs.getValue()); } return res; @@ -179,10 +258,11 @@ public class ExecServiceImpl implements ExecService { * an exception if the program is missing or not authorized. * * @param path The path of the program. - * @return The path of the validated program. + * @return The path of the validated program. */ public String validateProgram(String path) - throws NotAuthorizedException, IOException { + throws NotAuthorizedException, IOException + { File f = new File(path); if (f.canExecute()) { return f.getCanonicalPath(); diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HcatDelegator.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HcatDelegator.java index 94855dd..cca677e 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HcatDelegator.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HcatDelegator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; + import javax.ws.rs.core.Response; import org.apache.commons.exec.ExecuteException; @@ -34,7 +35,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hcatalog.templeton.tool.TempletonUtils; - /** * Run hcat on the local server using the ExecService. This is * the backend of the ddl web service. @@ -53,7 +53,8 @@ public class HcatDelegator extends LauncherDelegator { */ public ExecBean run(String user, String exec, boolean format, String group, String permissions) - throws NotAuthorizedException, BusyException, ExecuteException, IOException { + throws NotAuthorizedException, BusyException, ExecuteException, IOException + { SecureProxySupport proxy = new SecureProxySupport(); try { List args = makeArgs(exec, format, group, permissions); @@ -64,7 +65,7 @@ public class HcatDelegator extends LauncherDelegator { Map env = TempletonUtils.hadoopUserEnv(user, cp); proxy.addEnv(env); proxy.addArgs(args); - return execService.run(appConf.clusterHcat(), args, env); + return execService.run(appConf.clusterPython(), args, env); } catch (InterruptedException e) { throw new IOException(e); } finally { @@ -76,8 +77,9 @@ public class HcatDelegator extends LauncherDelegator { private List makeArgs(String exec, boolean format, String group, String permissions) { ArrayList args = new ArrayList(); + args.add(appConf.clusterHcat()); args.add("-e"); - args.add(exec); + args.add('"' + exec + '"'); if (TempletonUtils.isset(group)) { args.add("-g"); args.add(group); @@ -103,7 +105,8 @@ public class HcatDelegator extends LauncherDelegator { */ public Response descDatabase(String user, String db, boolean extended) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = "desc database " + db + "; "; if (extended) exec = "desc database extended " + db + "; "; @@ -114,12 +117,12 @@ public class HcatDelegator extends LauncherDelegator { } catch (HcatException e) { if (e.execBean.stderr.indexOf("Error in semantic analysis") > -1) { return JsonBuilder.create(). - put("error", "Database " + db + " does not exist") - .put("errorCode", "404") - .put("database", db).build(); + put("error", "Database " + db + " does not exist") + .put("errorCode", "404") + .put("database", db).build(); } throw new HcatException("unable to describe database: " + db, - e.execBean, exec); + e.execBean, exec); } } @@ -129,7 +132,8 @@ public class HcatDelegator extends LauncherDelegator { */ public Response listDatabases(String user, String dbPattern) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = String.format("show databases like '%s';", dbPattern); try { String res = jsonRun(user, exec); @@ -137,7 +141,7 @@ public class HcatDelegator extends LauncherDelegator { .build(); } catch (HcatException e) { throw new HcatException("unable to show databases for: " + dbPattern, - e.execBean, exec); + e.execBean, exec); } } @@ -146,7 +150,8 @@ public class HcatDelegator extends LauncherDelegator { */ public Response createDatabase(String user, DatabaseDesc desc) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = "create database"; if (desc.ifNotExists) exec += " if not exists"; @@ -157,7 +162,7 @@ public class HcatDelegator extends LauncherDelegator { exec += String.format(" location '%s'", desc.location); if (TempletonUtils.isset(desc.properties)) exec += String.format(" with dbproperties (%s)", - makePropertiesStatement(desc.properties)); + makePropertiesStatement(desc.properties)); exec += ";"; String res = jsonRun(user, exec, desc.group, desc.permissions); @@ -173,7 +178,8 @@ public class HcatDelegator extends LauncherDelegator { boolean ifExists, String option, String group, String permissions) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = "drop database"; if (ifExists) exec += " if exists"; @@ -193,7 +199,8 @@ public class HcatDelegator extends LauncherDelegator { */ public Response createTable(String user, String db, TableDesc desc) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = makeCreateTable(db, desc); try { @@ -205,7 +212,7 @@ public class HcatDelegator extends LauncherDelegator { .build(); } catch (final HcatException e) { throw new HcatException("unable to create table: " + desc.table, - e.execBean, exec); + e.execBean, exec); } } @@ -214,7 +221,8 @@ public class HcatDelegator extends LauncherDelegator { */ public Response createTableLike(String user, String db, TableLikeDesc desc) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = String.format("use %s; create", db); if (desc.external) @@ -233,7 +241,7 @@ public class HcatDelegator extends LauncherDelegator { .build(); } catch (final HcatException e) { throw new HcatException("unable to create table: " + desc.newTable, - e.execBean, exec); + e.execBean, exec); } } @@ -242,7 +250,8 @@ public class HcatDelegator extends LauncherDelegator { */ public Response descTable(String user, String db, String table, boolean extended) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = "use " + db + "; "; if (extended) exec += "desc extended " + table + "; "; @@ -256,7 +265,7 @@ public class HcatDelegator extends LauncherDelegator { .build(); } catch (HcatException e) { throw new HcatException("unable to describe table: " + table, - e.execBean, exec); + e.execBean, exec); } } @@ -266,9 +275,10 @@ public class HcatDelegator extends LauncherDelegator { */ public Response listTables(String user, String db, String tablePattern) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = String.format("use %s; show tables like '%s';", - db, tablePattern); + db, tablePattern); try { String res = jsonRun(user, exec); return JsonBuilder.create(res) @@ -276,7 +286,7 @@ public class HcatDelegator extends LauncherDelegator { .build(); } catch (HcatException e) { throw new HcatException("unable to show tables for: " + tablePattern, - e.execBean, exec); + e.execBean, exec); } } @@ -286,9 +296,10 @@ public class HcatDelegator extends LauncherDelegator { */ public Response descExtendedTable(String user, String db, String table) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = String.format("use %s; show table extended like %s;", - db, table); + db, table); try { String res = jsonRun(user, exec); JsonBuilder jb = JsonBuilder.create(singleTable(res, table)) @@ -355,7 +366,7 @@ public class HcatDelegator extends LauncherDelegator { exec += String.format(" location '%s'", desc.location); if (TempletonUtils.isset(desc.tableProperties)) exec += String.format(" tblproperties (%s)", - makePropertiesStatement(desc.tableProperties)); + makePropertiesStatement(desc.tableProperties)); exec += ";"; return exec; @@ -402,9 +413,9 @@ public class HcatDelegator extends LauncherDelegator { private String makeRowFormat(TableDesc.RowFormatDesc desc) { String res = makeTermBy(desc.fieldsTerminatedBy, "fields") - + makeTermBy(desc.collectionItemsTerminatedBy, "collection items") - + makeTermBy(desc.mapKeysTerminatedBy, "map keys") - + makeTermBy(desc.linesTerminatedBy, "lines"); + + makeTermBy(desc.collectionItemsTerminatedBy, "collection items") + + makeTermBy(desc.mapKeysTerminatedBy, "map keys") + + makeTermBy(desc.linesTerminatedBy, "lines"); if (TempletonUtils.isset(res)) return "row format delimited" + res; @@ -418,7 +429,7 @@ public class HcatDelegator extends LauncherDelegator { private String makeTermBy(String sep, String fieldName) { if (TempletonUtils.isset(sep)) - return String.format(" %s terminated by '%s'", fieldName, sep); + return String.format(" %s terminated by '%s'", fieldName, sep); else return ""; } @@ -428,7 +439,7 @@ public class HcatDelegator extends LauncherDelegator { String res = "row format serde " + desc.name; if (TempletonUtils.isset(desc.properties)) res += String.format(" with serdeproperties (%s)", - makePropertiesStatement(desc.properties)); + makePropertiesStatement(desc.properties)); return res; } @@ -445,13 +456,14 @@ public class HcatDelegator extends LauncherDelegator { String res = String.format("stored by '%s'", desc.className); if (TempletonUtils.isset(desc.properties)) res += String.format(" with serdeproperties (%s)", - makePropertiesStatement(desc.properties)); + makePropertiesStatement(desc.properties)); return res; } // Pull out the first table from the "show extended" json. private String singleTable(String json, String table) - throws IOException { + throws IOException + { Map obj = JsonBuilder.jsonToMap(json); if (JsonBuilder.isError(obj)) return json; @@ -462,8 +474,8 @@ public class HcatDelegator extends LauncherDelegator { else { return JsonBuilder .createError(String.format("Table %s does not exist", table), - JsonBuilder.MISSING). - buildJson(); + JsonBuilder.MISSING). + buildJson(); } } @@ -474,7 +486,8 @@ public class HcatDelegator extends LauncherDelegator { String table, boolean ifExists, String group, String permissions) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = String.format("use %s; drop table", db); if (ifExists) exec += " if exists"; @@ -498,9 +511,10 @@ public class HcatDelegator extends LauncherDelegator { String oldTable, String newTable, String group, String permissions) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = String.format("use %s; alter table %s rename to %s;", - db, oldTable, newTable); + db, oldTable, newTable); try { String res = jsonRun(user, exec, group, permissions, true); return JsonBuilder.create(res) @@ -509,7 +523,7 @@ public class HcatDelegator extends LauncherDelegator { .build(); } catch (HcatException e) { throw new HcatException("unable to rename table: " + oldTable, - e.execBean, exec); + e.execBean, exec); } } @@ -519,7 +533,8 @@ public class HcatDelegator extends LauncherDelegator { public Response descTableProperty(String user, String db, String table, String property) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { Response res = descTable(user, db, table, true); if (res.getStatus() != JsonBuilder.OK) return res; @@ -545,7 +560,8 @@ public class HcatDelegator extends LauncherDelegator { */ public Response listTableProperties(String user, String db, String table) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { Response res = descTable(user, db, table, true); if (res.getStatus() != JsonBuilder.OK) return res; @@ -563,10 +579,11 @@ public class HcatDelegator extends LauncherDelegator { public Response addOneTableProperty(String user, String db, String table, TablePropertyDesc desc) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = String.format("use %s; alter table %s set tblproperties ('%s'='%s');", - db, table, desc.name, desc.value); + db, table, desc.name, desc.value); try { String res = jsonRun(user, exec, desc.group, desc.permissions, true); return JsonBuilder.create(res) @@ -576,12 +593,12 @@ public class HcatDelegator extends LauncherDelegator { .build(); } catch (HcatException e) { throw new HcatException("unable to add table property: " + table, - e.execBean, exec); + e.execBean, exec); } } private Map tableProperties(Object extendedTable) { - if (!(extendedTable instanceof Map)) + if (! (extendedTable instanceof Map)) return null; Map m = (Map) extendedTable; Map tableInfo = (Map) m.get("tableInfo"); @@ -596,7 +613,8 @@ public class HcatDelegator extends LauncherDelegator { */ public Response listPartitions(String user, String db, String table) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = "use " + db + "; "; exec += "show partitions " + table + "; "; try { @@ -607,7 +625,7 @@ public class HcatDelegator extends LauncherDelegator { .build(); } catch (HcatException e) { throw new HcatException("unable to show partitions for table: " + table, - e.execBean, exec); + e.execBean, exec); } } @@ -617,7 +635,8 @@ public class HcatDelegator extends LauncherDelegator { public Response descOnePartition(String user, String db, String table, String partition) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = "use " + db + "; "; exec += "show table extended like " + table + " partition (" + partition + "); "; @@ -631,9 +650,9 @@ public class HcatDelegator extends LauncherDelegator { .build(); } catch (HcatException e) { throw new HcatException("unable to show partition: " - + table + " " + partition, - e.execBean, - exec); + + table + " " + partition, + e.execBean, + exec); } } @@ -643,7 +662,8 @@ public class HcatDelegator extends LauncherDelegator { public Response addOnePartition(String user, String db, String table, PartitionDesc desc) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = String.format("use %s; alter table %s add", db, table); if (desc.ifNotExists) exec += " if not exists"; @@ -655,11 +675,11 @@ public class HcatDelegator extends LauncherDelegator { String res = jsonRun(user, exec, desc.group, desc.permissions, true); if (res.indexOf("AlreadyExistsException") > -1) { return JsonBuilder.create(). - put("error", "Partition already exists") - .put("errorCode", "409") - .put("database", db) - .put("table", table) - .put("partition", desc.partition).build(); + put("error", "Partition already exists") + .put("errorCode", "409") + .put("database", db) + .put("table", table) + .put("partition", desc.partition).build(); } return JsonBuilder.create(res) .put("database", db) @@ -668,7 +688,7 @@ public class HcatDelegator extends LauncherDelegator { .build(); } catch (HcatException e) { throw new HcatException("unable to add partition: " + desc, - e.execBean, exec); + e.execBean, exec); } } @@ -679,7 +699,8 @@ public class HcatDelegator extends LauncherDelegator { String table, String partition, boolean ifExists, String group, String permissions) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = String.format("use %s; alter table %s drop", db, table); if (ifExists) exec += " if exists"; @@ -694,7 +715,7 @@ public class HcatDelegator extends LauncherDelegator { .build(); } catch (HcatException e) { throw new HcatException("unable to drop partition: " + partition, - e.execBean, exec); + e.execBean, exec); } } @@ -704,12 +725,13 @@ public class HcatDelegator extends LauncherDelegator { */ public Response listColumns(String user, String db, String table) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { try { return descTable(user, db, table, false); } catch (HcatException e) { throw new HcatException("unable to show columns for table: " + table, - e.execBean, e.statement); + e.execBean, e.statement); } } @@ -718,7 +740,8 @@ public class HcatDelegator extends LauncherDelegator { */ public Response descOneColumn(String user, String db, String table, String column) throws SimpleWebException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { Response res = listColumns(user, db, table); if (res.getStatus() != JsonBuilder.OK) return res; @@ -727,7 +750,7 @@ public class HcatDelegator extends LauncherDelegator { final Map fields = (o != null && (o instanceof Map)) ? (Map) o : null; if (fields == null) throw new SimpleWebException(500, "Internal error, unable to find column " - + column); + + column); List cols = (List) fields.get("columns"); @@ -742,11 +765,9 @@ public class HcatDelegator extends LauncherDelegator { } if (found == null) throw new SimpleWebException(500, "unable to find column " + column, - new HashMap() { - { - put("description", fields); - } - }); + new HashMap() {{ + put("description", fields); + }}); fields.remove("columns"); fields.put("column", found); return Response.fromResponse(res).entity(fields).build(); @@ -758,9 +779,10 @@ public class HcatDelegator extends LauncherDelegator { public Response addOneColumn(String user, String db, String table, ColumnDesc desc) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { String exec = String.format("use %s; alter table %s add columns (%s %s", - db, table, desc.name, desc.type); + db, table, desc.name, desc.type); if (TempletonUtils.isset(desc.comment)) exec += String.format(" comment '%s'", desc.comment); exec += ");"; @@ -773,7 +795,7 @@ public class HcatDelegator extends LauncherDelegator { .build(); } catch (HcatException e) { throw new HcatException("unable to add column: " + desc, - e.execBean, exec); + e.execBean, exec); } } @@ -806,10 +828,11 @@ public class HcatDelegator extends LauncherDelegator { String group, String permissions, boolean requireEmptyOutput) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { ExecBean res = run(user, exec, true, group, permissions); - if (!isValid(res, requireEmptyOutput)) + if (! isValid(res, requireEmptyOutput)) throw new HcatException("Failure calling hcat: " + exec, res, exec); return res.stdout; @@ -819,7 +842,8 @@ public class HcatDelegator extends LauncherDelegator { // permissions set. private String jsonRun(String user, String exec) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { return jsonRun(user, exec, null, null); } @@ -827,7 +851,8 @@ public class HcatDelegator extends LauncherDelegator { private String jsonRun(String user, String exec, String group, String permissions) throws HcatException, NotAuthorizedException, BusyException, - ExecuteException, IOException { + ExecuteException, IOException + { return jsonRun(user, exec, group, permissions, false); } } diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HiveDelegator.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HiveDelegator.java index d135b8b..2c8844a 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HiveDelegator.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HiveDelegator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -23,6 +23,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import org.apache.commons.exec.ExecuteException; +import org.apache.hadoop.util.Shell; import org.apache.hcatalog.templeton.tool.TempletonUtils; /** @@ -57,6 +58,7 @@ public class HiveDelegator extends LauncherDelegator { try { args.addAll(makeBasicArgs(execute, srcFile, statusdir, completedUrl)); args.add("--"); + TempletonUtils.addCmdForWindows(args); args.add(appConf.hivePath()); args.add("--service"); args.add("cli"); diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/JarDelegator.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/JarDelegator.java index 53a8ca6..9beca0c 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/JarDelegator.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/JarDelegator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,7 +22,6 @@ import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; - import org.apache.commons.exec.ExecuteException; import org.apache.hcatalog.templeton.tool.TempletonUtils; @@ -41,11 +40,12 @@ public class JarDelegator extends LauncherDelegator { List jarArgs, List defines, String statusdir, String callback, String completedUrl) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException + { runAs = user; List args = makeArgs(jar, mainClass, - libjars, files, jarArgs, defines, - statusdir, completedUrl); + libjars, files, jarArgs, defines, + statusdir, completedUrl); return enqueueController(user, callback, args); } @@ -54,15 +54,17 @@ public class JarDelegator extends LauncherDelegator { String libjars, String files, List jarArgs, List defines, String statusdir, String completedUrl) - throws BadParam, IOException, InterruptedException { + throws BadParam, IOException, InterruptedException + { ArrayList args = new ArrayList(); try { ArrayList allFiles = new ArrayList(); allFiles.add(TempletonUtils.hadoopFsFilename(jar, appConf, runAs)); args.addAll(makeLauncherArgs(appConf, statusdir, - completedUrl, allFiles)); + completedUrl, allFiles)); args.add("--"); + TempletonUtils.addCmdForWindows(args); args.add(appConf.clusterHadoop()); args.add("jar"); args.add(TempletonUtils.hadoopFsPath(jar, appConf, runAs).getName()); @@ -71,12 +73,12 @@ public class JarDelegator extends LauncherDelegator { if (TempletonUtils.isset(libjars)) { args.add("-libjars"); args.add(TempletonUtils.hadoopFsListAsString(libjars, appConf, - runAs)); + runAs)); } if (TempletonUtils.isset(files)) { args.add("-files"); args.add(TempletonUtils.hadoopFsListAsString(files, appConf, - runAs)); + runAs)); } for (String d : defines) diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/LauncherDelegator.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/LauncherDelegator.java index 93ab238..7735ece 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/LauncherDelegator.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/LauncherDelegator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,7 +21,7 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; - +import java.util.Map; import org.apache.commons.exec.ExecuteException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,7 +49,8 @@ public class LauncherDelegator extends TempletonDelegator { } public void registerJob(String id, String user, String callback) - throws IOException { + throws IOException + { JobState state = null; try { state = new JobState(id, Main.getAppConfigInstance()); @@ -65,11 +66,12 @@ public class LauncherDelegator extends TempletonDelegator { * Enqueue the TempletonControllerJob directly calling doAs. */ public EnqueueBean enqueueController(String user, String callback, - List args) + List args) throws NotAuthorizedException, BusyException, ExecuteException, - IOException, QueueException { + IOException, QueueException + { try { - UserGroupInformation ugi = UgiFactory.getUgi(user); + UserGroupInformation ugi = UgiFactory.getUgi(user); final long startTime = System.nanoTime(); @@ -90,22 +92,25 @@ public class LauncherDelegator extends TempletonDelegator { } private String queueAsUser(UserGroupInformation ugi, final List args) - throws IOException, InterruptedException { + throws IOException, InterruptedException + { + LOG.debug("Running Toolrunner with args: " + args); String id = ugi.doAs(new PrivilegedExceptionAction() { - public String run() throws Exception { - String[] array = new String[args.size()]; - TempletonControllerJob ctrl = new TempletonControllerJob(); - ToolRunner.run(ctrl, args.toArray(array)); - return ctrl.getSubmittedId(); - } - }); + public String run() throws Exception { + String[] array = new String[args.size()]; + TempletonControllerJob ctrl = new TempletonControllerJob(); + ToolRunner.run(ctrl, args.toArray(array)); + return ctrl.getSubmittedId(); + } + }); return id; } public List makeLauncherArgs(AppConfig appConf, String statusdir, String completedUrl, - List copyFiles) { + List copyFiles) + { ArrayList args = new ArrayList(); args.add("-libjars"); @@ -119,9 +124,9 @@ public class LauncherDelegator extends TempletonDelegator { // Internal vars addDef(args, TempletonControllerJob.STATUSDIR_NAME, statusdir); addDef(args, TempletonControllerJob.COPY_NAME, - TempletonUtils.encodeArray(copyFiles)); + TempletonUtils.encodeArray(copyFiles)); addDef(args, TempletonControllerJob.OVERRIDE_CLASSPATH, - makeOverrideClasspath(appConf)); + makeOverrideClasspath(appConf)); // Job vars addStorageVars(args); @@ -133,21 +138,21 @@ public class LauncherDelegator extends TempletonDelegator { // Storage vars private void addStorageVars(List args) { addDef(args, TempletonStorage.STORAGE_CLASS, - appConf.get(TempletonStorage.STORAGE_CLASS)); + appConf.get(TempletonStorage.STORAGE_CLASS)); addDef(args, TempletonStorage.STORAGE_ROOT, - appConf.get(TempletonStorage.STORAGE_ROOT)); + appConf.get(TempletonStorage.STORAGE_ROOT)); addDef(args, ZooKeeperStorage.ZK_HOSTS, - appConf.get(ZooKeeperStorage.ZK_HOSTS)); + appConf.get(ZooKeeperStorage.ZK_HOSTS)); addDef(args, ZooKeeperStorage.ZK_SESSION_TIMEOUT, - appConf.get(ZooKeeperStorage.ZK_SESSION_TIMEOUT)); + appConf.get(ZooKeeperStorage.ZK_SESSION_TIMEOUT)); } // Completion notifier vars private void addCompletionVars(List args, String completedUrl) { addDef(args, AppConfig.HADOOP_END_RETRY_NAME, - appConf.get(AppConfig.CALLBACK_RETRY_NAME)); + appConf.get(AppConfig.CALLBACK_RETRY_NAME)); addDef(args, AppConfig.HADOOP_END_INTERVAL_NAME, - appConf.get(AppConfig.CALLBACK_INTERVAL_NAME)); + appConf.get(AppConfig.CALLBACK_INTERVAL_NAME)); addDef(args, AppConfig.HADOOP_END_URL_NAME, completedUrl); } diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/PigDelegator.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/PigDelegator.java index 0e6bae4..d9ef578 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/PigDelegator.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/PigDelegator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -23,7 +23,6 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; - import org.apache.commons.exec.ExecuteException; import org.apache.hcatalog.templeton.tool.TempletonUtils; @@ -42,11 +41,12 @@ public class PigDelegator extends LauncherDelegator { List pigArgs, String otherFiles, String statusdir, String callback, String completedUrl) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException + { runAs = user; List args = makeArgs(execute, - srcFile, pigArgs, - otherFiles, statusdir, completedUrl); + srcFile, pigArgs, + otherFiles, statusdir, completedUrl); return enqueueController(user, callback, args); } @@ -54,15 +54,17 @@ public class PigDelegator extends LauncherDelegator { private List makeArgs(String execute, String srcFile, List pigArgs, String otherFiles, String statusdir, String completedUrl) - throws BadParam, IOException, InterruptedException { + throws BadParam, IOException, InterruptedException + { ArrayList args = new ArrayList(); try { ArrayList allFiles = new ArrayList(); if (TempletonUtils.isset(srcFile)) allFiles.add(TempletonUtils.hadoopFsFilename - (srcFile, appConf, runAs)); + (srcFile, appConf, runAs)); if (TempletonUtils.isset(otherFiles)) { - String[] ofs = TempletonUtils.hadoopFsListAsArray(otherFiles, appConf, runAs); + String[] ofs = TempletonUtils.hadoopFsListAsArray + (otherFiles, appConf, runAs); allFiles.addAll(Arrays.asList(ofs)); } @@ -71,6 +73,7 @@ public class PigDelegator extends LauncherDelegator { args.add(appConf.pigArchive()); args.add("--"); + TempletonUtils.addCmdForWindows(args); args.add(appConf.pigPath()); args.addAll(pigArgs); if (TempletonUtils.isset(execute)) { @@ -79,7 +82,7 @@ public class PigDelegator extends LauncherDelegator { } else if (TempletonUtils.isset(srcFile)) { args.add("-file"); args.add(TempletonUtils.hadoopFsPath(srcFile, appConf, runAs) - .getName()); + .getName()); } } catch (FileNotFoundException e) { throw new BadParam(e.getMessage()); diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/SecureProxySupport.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/SecureProxySupport.java index 77ce4d4..cf6eb66 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/SecureProxySupport.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/SecureProxySupport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -49,29 +49,27 @@ public class SecureProxySupport { public SecureProxySupport() { isEnabled = UserGroupInformation.isSecurityEnabled(); + LOG.debug("Hadoop security is " + (isEnabled ? "" : "not ") + "enabled"); } private static final Log LOG = LogFactory.getLog(SecureProxySupport.class); - + /** * The file where we store the auth token */ - public Path getTokenPath() { - return (tokenPath); - } + public Path getTokenPath() { return( tokenPath ); } /** * The token to pass to hcat. */ - public String getHcatServiceStr() { - return (HCAT_SERVICE); - } + public String getHcatServiceStr() { return( HCAT_SERVICE ); } /** * Create the delegation token. */ public Path open(String user, Configuration conf) - throws IOException, InterruptedException { + throws IOException, InterruptedException + { close(); if (isEnabled) { this.user = user; @@ -88,7 +86,7 @@ public class SecureProxySupport { msToken.decodeFromUrlString(hcatTokenStr); msToken.setService(new Text(HCAT_SERVICE)); writeProxyDelegationTokens(fsToken, msToken, conf, user, tokenPath); - + } return tokenPath; } @@ -109,7 +107,7 @@ public class SecureProxySupport { public void addEnv(Map env) { if (isEnabled) { env.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, - getTokenPath().toUri().getPath()); + getTokenPath().toUri().getPath()); } } @@ -121,17 +119,18 @@ public class SecureProxySupport { args.add("-D"); args.add("hive.metastore.token.signature=" + getHcatServiceStr()); args.add("-D"); - args.add("proxy.user.name=" + user); + args.add("proxy.user.name=" + user); } } - - class TokenWrapper { + + class TokenWrapper { Token token; } private Token getFSDelegationToken(String user, - final Configuration conf) - throws IOException, InterruptedException { + final Configuration conf) + throws IOException, InterruptedException + { LOG.info("user: " + user + " loginUser: " + UserGroupInformation.getLoginUser().getUserName()); final UserGroupInformation ugi = UgiFactory.getUgi(user); @@ -139,7 +138,7 @@ public class SecureProxySupport { ugi.doAs(new PrivilegedExceptionAction() { public Object run() throws IOException { FileSystem fs = FileSystem.get(conf); - twrapper.token = fs.getDelegationToken(ugi.getShortUserName()); + twrapper.token = fs.getDelegationToken(ugi.getShortUserName()); return null; } }); @@ -148,43 +147,45 @@ public class SecureProxySupport { } private void writeProxyDelegationTokens(final Token fsToken, - final Token msToken, - final Configuration conf, - String user, - final Path tokenPath) - throws IOException, InterruptedException { - - + final Token msToken, + final Configuration conf, + String user, + final Path tokenPath) + throws IOException, InterruptedException{ + + LOG.info("user: " + user + " loginUser: " + UserGroupInformation.getLoginUser().getUserName()); - final UserGroupInformation ugi = UgiFactory.getUgi(user); - + final UserGroupInformation ugi = UgiFactory.getUgi(user); + ugi.doAs(new PrivilegedExceptionAction() { - public Object run() throws IOException { - Credentials cred = new Credentials(); - cred.addToken(fsToken.getService(), fsToken); - cred.addToken(msToken.getService(), msToken); - cred.writeTokenStorageFile(tokenPath, conf); - return null; - } - }); - + public Object run() throws IOException { + Credentials cred = new Credentials(); + cred.addToken(fsToken.getService(), fsToken); + cred.addToken(msToken.getService(), msToken); + cred.writeTokenStorageFile(tokenPath, conf); + return null; + } + }); + } - + private String buildHcatDelegationToken(String user) - throws IOException, InterruptedException, MetaException, TException { + throws IOException, InterruptedException, MetaException, TException + { HiveConf c = new HiveConf(); final HiveMetaStoreClient client = new HiveMetaStoreClient(c); LOG.info("user: " + user + " loginUser: " + UserGroupInformation.getLoginUser().getUserName()); final TokenWrapper twrapper = new TokenWrapper(); final UserGroupInformation ugi = UgiFactory.getUgi(user); String s = ugi.doAs(new PrivilegedExceptionAction() { - public String run() - throws IOException, MetaException, TException { - String u = ugi.getUserName(); - return client.getDelegationToken(u); - } - }); + public String run() + throws IOException, MetaException, TException + { + String u = ugi.getUserName(); + return client.getDelegationToken(u); + } + }); return s; } } diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java index e839e03..dbdc84c 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -23,8 +23,10 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintWriter; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -51,6 +53,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.hcatalog.templeton.SecureProxySupport; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; /** @@ -66,32 +69,32 @@ import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIden * in hdfs files. */ public class TempletonControllerJob extends Configured implements Tool { - static enum ControllerCounters {SIMPLE_COUNTER} - - ; + static enum ControllerCounters { SIMPLE_COUNTER }; - public static final String COPY_NAME = "templeton.copy"; + public static final String COPY_NAME = "templeton.copy"; public static final String STATUSDIR_NAME = "templeton.statusdir"; - public static final String JAR_ARGS_NAME = "templeton.args"; + public static final String JAR_ARGS_NAME = "templeton.args"; public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath"; - public static final String STDOUT_FNAME = "stdout"; - public static final String STDERR_FNAME = "stderr"; - public static final String EXIT_FNAME = "exit"; + public static final String STDOUT_FNAME = "stdout"; + public static final String STDERR_FNAME = "stderr"; + public static final String EXIT_FNAME = "exit"; public static final int WATCHER_TIMEOUT_SECS = 10; - public static final int KEEP_ALIVE_MSEC = 60 * 1000; + public static final int KEEP_ALIVE_MSEC = 60 * 1000; private static TrivialExecService execService = TrivialExecService.getInstance(); private static final Log LOG = LogFactory.getLog(TempletonControllerJob.class); - - + + public static class LaunchMapper - extends Mapper { + extends Mapper + { protected Process startJob(Context context, String user, String overrideClasspath) - throws IOException, InterruptedException { + throws IOException, InterruptedException + { Configuration conf = context.getConfiguration(); copyLocal(COPY_NAME, conf); String[] jarArgs @@ -99,23 +102,22 @@ public class TempletonControllerJob extends Configured implements Tool { ArrayList removeEnv = new ArrayList(); removeEnv.add("HADOOP_ROOT_LOGGER"); + removeEnv.add("hadoop-command"); + removeEnv.add("CLASS"); + removeEnv.add("mapredcommand"); Map env = TempletonUtils.hadoopUserEnv(user, - overrideClasspath); + overrideClasspath); List jarArgsList = new LinkedList(Arrays.asList(jarArgs)); String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION"); - if (tokenFile != null) { - /* - * The magic number 3 comes from the fact that the -D option can - * be only after the jar command line option but before other - * options. - */ - jarArgsList.add(3, "-Dmapreduce.job.credentials.binary=" + tokenFile); + if(tokenFile != null){ + jarArgsList.add(3, "-Dmapreduce.job.credentials.binary=" + tokenFile ); } return execService.run(jarArgsList, removeEnv, env); } private void copyLocal(String var, Configuration conf) - throws IOException { + throws IOException + { String[] filenames = TempletonUtils.decodeArray(conf.get(var)); if (filenames != null) { for (String filename : filenames) { @@ -130,30 +132,32 @@ public class TempletonControllerJob extends Configured implements Tool { @Override public void run(Context context) - throws IOException, InterruptedException { + throws IOException, InterruptedException + { Configuration conf = context.getConfiguration(); Process proc = startJob(context, - conf.get("user.name"), - conf.get(OVERRIDE_CLASSPATH)); + conf.get("user.name"), + conf.get(OVERRIDE_CLASSPATH)); + System.err.println("job started"); String statusdir = conf.get(STATUSDIR_NAME); Counter cnt = context.getCounter(ControllerCounters.SIMPLE_COUNTER); ExecutorService pool = Executors.newCachedThreadPool(); executeWatcher(pool, conf, context.getJobID(), - proc.getInputStream(), statusdir, STDOUT_FNAME); + proc.getInputStream(), statusdir, STDOUT_FNAME); executeWatcher(pool, conf, context.getJobID(), - proc.getErrorStream(), statusdir, STDERR_FNAME); + proc.getErrorStream(), statusdir, STDERR_FNAME); KeepAlive keepAlive = startCounterKeepAlive(pool, cnt); - + System.err.println("job watchers created"); proc.waitFor(); keepAlive.sendReport = false; pool.shutdown(); - if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) + if (! pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) pool.shutdownNow(); - + System.err.println("job done"); writeExitValue(conf, proc.exitValue(), statusdir); JobState state = new JobState(context.getJobID().toString(), conf); state.setExitValue(proc.exitValue()); @@ -162,7 +166,7 @@ public class TempletonControllerJob extends Configured implements Tool { if (proc.exitValue() != 0) System.err.println("templeton: job failed with exit code " - + proc.exitValue()); + + proc.exitValue()); else System.err.println("templeton: job completed with exit code 0"); } @@ -170,26 +174,29 @@ public class TempletonControllerJob extends Configured implements Tool { private void executeWatcher(ExecutorService pool, Configuration conf, JobID jobid, InputStream in, String statusdir, String name) - throws IOException { + throws IOException + { Watcher w = new Watcher(conf, jobid, in, statusdir, name); pool.execute(w); } private KeepAlive startCounterKeepAlive(ExecutorService pool, Counter cnt) - throws IOException { + throws IOException + { KeepAlive k = new KeepAlive(cnt); pool.execute(k); return k; } private void writeExitValue(Configuration conf, int exitValue, String statusdir) - throws IOException { + throws IOException + { if (TempletonUtils.isset(statusdir)) { Path p = new Path(statusdir, EXIT_FNAME); FileSystem fs = p.getFileSystem(conf); OutputStream out = fs.create(p); System.err.println("templeton: Writing exit value " - + exitValue + " to " + p); + + exitValue + " to " + p); PrintWriter writer = new PrintWriter(out); writer.println(exitValue); writer.close(); @@ -205,7 +212,8 @@ public class TempletonControllerJob extends Configured implements Tool { public Watcher(Configuration conf, JobID jobid, InputStream in, String statusdir, String name) - throws IOException { + throws IOException + { this.conf = conf; this.jobid = jobid; this.in = in; @@ -265,7 +273,8 @@ public class TempletonControllerJob extends Configured implements Tool { private Counter cnt; public boolean sendReport; - public KeepAlive(Counter cnt) { + public KeepAlive(Counter cnt) + { this.cnt = cnt; this.sendReport = true; } @@ -284,7 +293,6 @@ public class TempletonControllerJob extends Configured implements Tool { } private JobID submittedJobId; - public String getSubmittedId() { if (submittedJobId == null) return null; @@ -297,7 +305,8 @@ public class TempletonControllerJob extends Configured implements Tool { */ @Override public int run(String[] args) - throws IOException, InterruptedException, ClassNotFoundException { + throws IOException, InterruptedException, ClassNotFoundException + { Configuration conf = getConf(); conf.set(JAR_ARGS_NAME, TempletonUtils.encodeArray(args)); conf.set("user.name", UserGroupInformation.getCurrentUser().getShortUserName()); @@ -312,9 +321,9 @@ public class TempletonControllerJob extends Configured implements Tool { = new NullOutputFormat(); job.setOutputFormatClass(of.getClass()); job.setNumReduceTasks(0); - + JobClient jc = new JobClient(new JobConf(job.getConfiguration())); - + Token mrdt = jc.getDelegationToken(new Text("mr token")); job.getCredentials().addToken(new Text("mr token"), mrdt); job.submit(); @@ -324,7 +333,7 @@ public class TempletonControllerJob extends Configured implements Tool { return 0; } - + public static void main(String[] args) throws Exception { int ret = ToolRunner.run(new TempletonControllerJob(), args); if (ret != 0) diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java index 054e9d4..79a4443 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,22 +20,26 @@ package org.apache.hcatalog.templeton.tool; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.Reader; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.net.URLConnection; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; /** @@ -68,14 +72,14 @@ public class TempletonUtils { * Is the object non-empty? */ public static boolean isset(Collection col) { - return (col != null) && (!col.isEmpty()); + return (col != null) && (! col.isEmpty()); } /** * Is the object non-empty? */ public static boolean isset(Map col) { - return (col != null) && (!col.isEmpty()); + return (col != null) && (! col.isEmpty()); } @@ -163,7 +167,8 @@ public class TempletonUtils { public static String[] hadoopFsListAsArray(String files, Configuration conf, String user) throws URISyntaxException, FileNotFoundException, IOException, - InterruptedException { + InterruptedException + { if (files == null || conf == null) { return null; } @@ -179,7 +184,8 @@ public class TempletonUtils { public static String hadoopFsListAsString(String files, Configuration conf, String user) throws URISyntaxException, FileNotFoundException, IOException, - InterruptedException { + InterruptedException + { if (files == null || conf == null) { return null; } @@ -188,7 +194,8 @@ public class TempletonUtils { public static String hadoopFsFilename(String fname, Configuration conf, String user) throws URISyntaxException, FileNotFoundException, IOException, - InterruptedException { + InterruptedException + { Path p = hadoopFsPath(fname, conf, user); if (p == null) return null; @@ -201,8 +208,8 @@ public class TempletonUtils { */ public static boolean hadoopFsIsMissing(FileSystem fs, Path p) { try { - return !fs.exists(p); - } catch (Throwable t) { + return ! fs.exists(p); + } catch(Throwable t) { // Got an error, might be there anyway due to a // permissions problem. return false; @@ -211,7 +218,8 @@ public class TempletonUtils { public static Path hadoopFsPath(String fname, Configuration conf, String user) throws URISyntaxException, FileNotFoundException, IOException, - InterruptedException { + InterruptedException + { if (fname == null || conf == null) { return null; } @@ -221,13 +229,13 @@ public class TempletonUtils { UserGroupInformation ugi = UserGroupInformation.getLoginUser(); final FileSystem defaultFs = - ugi.doAs(new PrivilegedExceptionAction() { - public FileSystem run() - throws URISyntaxException, FileNotFoundException, IOException, - InterruptedException { - return FileSystem.get(new URI(finalFName), fConf); - } - }); + ugi.doAs(new PrivilegedExceptionAction() { + public FileSystem run() + throws URISyntaxException, FileNotFoundException, IOException, + InterruptedException { + return FileSystem.get(new URI(finalFName), fConf); + } + }); URI u = new URI(fname); Path p = new Path(u).makeQualified(defaultFs); @@ -242,7 +250,8 @@ public class TempletonUtils { * GET the given url. Returns the number of bytes received. */ public static int fetchUrl(URL url) - throws IOException { + throws IOException + { URLConnection cnx = url.openConnection(); InputStream in = cnx.getInputStream(); @@ -259,7 +268,8 @@ public class TempletonUtils { * Set the environment variables to specify the hadoop user. */ public static Map hadoopUserEnv(String user, - String overrideClasspath) { + String overrideClasspath) + { HashMap env = new HashMap(); env.put("HADOOP_USER_NAME", user); @@ -273,4 +283,12 @@ public class TempletonUtils { return env; } + + public static void addCmdForWindows(ArrayList args) { + if(Shell.WINDOWS){ + args.add("cmd"); + args.add("/c"); + args.add("call"); + } + } } diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TrivialExecService.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TrivialExecService.java index 36759b3..5174e4c 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TrivialExecService.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TrivialExecService.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,13 +21,16 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + /** * Execute a local program. This is a singleton service that will * execute a programs on the local box. */ public class TrivialExecService { private static volatile TrivialExecService theSingleton; - + private static final Log LOG = LogFactory.getLog(TrivialExecService.class); /** * Retrieve the singleton. */ @@ -39,12 +42,12 @@ public class TrivialExecService { public Process run(List cmd, List removeEnv, Map environmentVariables) - throws IOException { - System.err.println("templeton: starting " + cmd); - System.err.print("With environment variables: "); - for (Map.Entry keyVal : environmentVariables.entrySet()) { - System.err.println(keyVal.getKey() + "=" + keyVal.getValue()); - } + throws IOException + { + + logDebugCmd(cmd, environmentVariables); + + ProcessBuilder pb = new ProcessBuilder(cmd); for (String key : removeEnv) pb.environment().remove(key); @@ -52,4 +55,22 @@ public class TrivialExecService { return pb.start(); } + private void logDebugCmd(List cmd, + Map environmentVariables) { + if(!LOG.isDebugEnabled()){ + return; + } + LOG.debug("starting " + cmd); + LOG.debug("With environment variables: " ); + for(Map.Entry keyVal : environmentVariables.entrySet()){ + LOG.debug(keyVal.getKey() + "=" + keyVal.getValue()); + } + LOG.debug("With environment variables already set: " ); + Map env = System.getenv(); + for (String envName : env.keySet()) { + LOG.debug(envName + "=" + env.get(envName)); + } + + } + }