diff --git hcatalog/src/test/e2e/templeton/README.txt hcatalog/src/test/e2e/templeton/README.txt index a3a6822..a52a946 100644 --- hcatalog/src/test/e2e/templeton/README.txt +++ hcatalog/src/test/e2e/templeton/README.txt @@ -151,6 +151,27 @@ To do this 3 properties in hive-site.xml should be configured: org.apache.hadoop.hive.ql.security.authorization.AuthorizationPreEventListener 4) hive.metastore.execute.setugi set to true +Running Sqoop jobsubmission tests +--------------------------------- +ant clean test -Dinpdir.hdfs= -Ddb.connection.string= + -Ddb.user.name= -Ddb.password= -Dtest.user.name= + -Dharness.webhdfs.url= -Dharness.templeton.url= + -Dtests.to.run=-t TestSqoop + +In order to run Sqoop jobsubmission tests, an RDBMS like MySQL or SQL server should be installed. Also, since +the Sqoop export command requires that the table already exist in the database, a table "PERSON" needs to be created under +the default database of the RDBMS installed. + +Here is the schema of the table written in MySQL: + CREATE TABLE `world`.`person` ( + `id` INT NOT NULL, + `name` VARCHAR(45) NULL, + `occupation` VARCHAR(45) NULL, + PRIMARY KEY (`id`)); + +To prevent primary key violation and sqoop import directory conflict, make sure the "PERSON" table is empty +and the folder hdfs://hostname:8020/sqoopoutputdir doesn't exist before running the test. 
+ Notes ----- diff --git hcatalog/src/test/e2e/templeton/build.xml hcatalog/src/test/e2e/templeton/build.xml index 17cfd67..147a6c8 100644 --- hcatalog/src/test/e2e/templeton/build.xml +++ hcatalog/src/test/e2e/templeton/build.xml @@ -98,6 +98,9 @@ + + + @@ -131,6 +134,9 @@ + + + @@ -171,6 +177,9 @@ + + + diff --git hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm index 545e041..9c73048 100644 --- hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm +++ hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm @@ -188,6 +188,9 @@ sub globalSetup $globalHash->{'inpdir_local'} = $ENV{'TH_INPDIR_LOCAL'}; $globalHash->{'inpdir_hdfs'} = $ENV{'TH_INPDIR_HDFS'}; + $globalHash->{'db_connection_string'} = $ENV{'DB_CONNECTION_STRING'}; + $globalHash->{'db_user_name'} = $ENV{'DB_USER_NAME'}; + $globalHash->{'db_password'} = $ENV{'DB_PASSWORD'}; $globalHash->{'is_secure_mode'} = $ENV{'SECURE_MODE'}; @@ -355,6 +358,9 @@ sub replaceParametersInArg $arg =~ s/:OUTDIR:/$outdir/g; $arg =~ s/:INPDIR_HDFS:/$testCmd->{'inpdir_hdfs'}/g; $arg =~ s/:INPDIR_LOCAL:/$testCmd->{'inpdir_local'}/g; + $arg =~ s/:DB_CONNECTION_STRING:/$testCmd->{'db_connection_string'}/g; + $arg =~ s/:DB_USER_NAME:/$testCmd->{'db_user_name'}/g; + $arg =~ s/:DB_PASSWORD:/$testCmd->{'db_password'}/g; $arg =~ s/:TNUM:/$testCmd->{'num'}/g; return $arg; } diff --git hcatalog/src/test/e2e/templeton/inpdir/sqoop/person.txt hcatalog/src/test/e2e/templeton/inpdir/sqoop/person.txt new file mode 100644 index 0000000..930ae71 --- /dev/null +++ hcatalog/src/test/e2e/templeton/inpdir/sqoop/person.txt @@ -0,0 +1,3 @@ +1,Jason,Doctor +2,David,Engineer +3,John,Teacher \ No newline at end of file diff --git hcatalog/src/test/e2e/templeton/inpdir/sqoopcommand.txt hcatalog/src/test/e2e/templeton/inpdir/sqoopcommand.txt new file mode 100644 index 0000000..61bd0c4 --- /dev/null +++ hcatalog/src/test/e2e/templeton/inpdir/sqoopcommand.txt @@ -0,0 +1,4 @@ +--table 
+person +--target-dir +/sqoopoutputdir \ No newline at end of file diff --git hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf index a8a959d..0f6f72e 100644 --- hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf +++ hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf @@ -218,8 +218,8 @@ $cfg = 'status_code' => 400, }, - - { + + { #Auto add quote around args 'ignore' => 'MS9 feature, will reenable later', 'num' => 9, @@ -383,7 +383,7 @@ $cfg = #test select a,b 'num' => 7, 'method' => 'POST', - 'url' => ':TEMPLETON_URL:/templeton/v1/hive', + 'url' => ':TEMPLETON_URL:/templeton/v1/hive', 'post_options' => ['user.name=:UNAME:','execute=select count(*) from mynums', ], 'json_field_substr_match' => { 'id' => '\d+'}, #results @@ -477,7 +477,7 @@ $cfg = 'num' => 13, 'ignore23' => 'Log collector does not work with Hadoop 2', 'method' => 'POST', - 'url' => ':TEMPLETON_URL:/templeton/v1/hive', + 'url' => ':TEMPLETON_URL:/templeton/v1/hive', 'post_options' => ['user.name=:UNAME:','execute=select a,b from mynums', 'statusdir=:OUTDIR:/status', 'enablelog=true'], 'json_field_substr_match' => { 'id' => '\d+'}, #results @@ -490,6 +490,43 @@ $cfg = }, ] }, +##============================================================================================================= + { + 'name' => 'TestSqoop', + 'tests' => + [ + { + # Test Sqoop Export + 'num' => 1, + 'method' => 'POST', + 'url' => ':TEMPLETON_URL:/templeton/v1/sqoop', + 'post_options' => ['user.name=:UNAME:','command=export --connect :DB_CONNECTION_STRING: --username :DB_USER_NAME: --password :DB_PASSWORD: --export-dir :INPDIR_HDFS:/sqoop --table person','statusdir=sqoop.output' ], + 'json_field_substr_match' => { 'id' => '\d+'}, + #results + 'status_code' => 200, + 'check_job_created' => 1, + 'check_job_complete' => 'SUCCESS', + 'check_job_percent_complete' => 'map 100% reduce 0%', + 'check_job_exit_value' => 0, + 'check_call_back' => 1, + }, + { + # 
Test Sqoop Import and option file + 'num' => 2, + 'method' => 'POST', + 'url' => ':TEMPLETON_URL:/templeton/v1/sqoop', + 'post_options' => ['user.name=:UNAME:','files=:INPDIR_HDFS:/sqoopcommand.txt','command=import --connect :DB_CONNECTION_STRING: --username :DB_USER_NAME: --password :DB_PASSWORD: --options-file sqoopcommand.txt','statusdir=sqoop.output' ], + 'json_field_substr_match' => { 'id' => '\d+'}, + #results + 'status_code' => 200, + 'check_job_created' => 1, + 'check_job_complete' => 'SUCCESS', + 'check_job_percent_complete' => 'map 100% reduce 0%', + 'check_job_exit_value' => 0, + 'check_call_back' => 1, + }, + ] + }, ] }, ; diff --git hcatalog/webhcat/svr/src/main/config/webhcat-default.xml hcatalog/webhcat/svr/src/main/config/webhcat-default.xml index c9af3b6..4bcc43b 100644 --- hcatalog/webhcat/svr/src/main/config/webhcat-default.xml +++ hcatalog/webhcat/svr/src/main/config/webhcat-default.xml @@ -141,6 +141,12 @@ + templeton.sqoop.path + ${env.SQOOP_HOME}/bin/sqoop.cmd + The path to the Sqoop executable. + + + templeton.exec.encoding UTF-8 The encoding of the stdout and stderr data. 
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java index d49c70c..8176a1b 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java @@ -106,6 +106,8 @@ */ public static final String HCAT_HOME_PATH = "templeton.hcat.home"; public static final String HIVE_PROPS_NAME = "templeton.hive.properties"; + public static final String SQOOP_ARCHIVE_NAME = "templeton.sqoop.archive"; + public static final String SQOOP_PATH_NAME = "templeton.sqoop.path"; public static final String LIB_JARS_NAME = "templeton.libjars"; public static final String PIG_ARCHIVE_NAME = "templeton.pig.archive"; public static final String PIG_PATH_NAME = "templeton.pig.path"; @@ -241,6 +243,8 @@ private boolean loadOneClasspathConfig(String fname) { public String pigArchive() { return get(PIG_ARCHIVE_NAME); } public String hivePath() { return get(HIVE_PATH_NAME); } public String hiveArchive() { return get(HIVE_ARCHIVE_NAME); } + public String sqoopPath() { return get(SQOOP_PATH_NAME); } + public String sqoopArchive() { return get(SQOOP_ARCHIVE_NAME); } public String streamingJar() { return get(STREAMING_JAR_NAME); } public String kerberosSecret() { return get(KERBEROS_SECRET); } public String kerberosPrincipal(){ return get(KERBEROS_PRINCIPAL); } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java index 854aa99..04a5c6f 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java @@ -44,7 +44,7 @@ public class LauncherDelegator extends TempletonDelegator { private static 
final Log LOG = LogFactory.getLog(LauncherDelegator.class); protected String runAs = null; - static public enum JobType {JAR, STREAMING, PIG, HIVE} + static public enum JobType {JAR, STREAMING, PIG, HIVE, SQOOP} private boolean secureMeatastoreAccess = false; public LauncherDelegator(AppConfig appConf) { diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java index b481ac5..d1f45f3 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java @@ -182,6 +182,8 @@ public Server runServer(int port) FilterMapping.REQUEST); root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/hive/*", FilterMapping.REQUEST); + root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/sqoop/*", + FilterMapping.REQUEST); root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/queue/*", FilterMapping.REQUEST); root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/jobs/*", diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index c6a3429..9a22ecf 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -762,6 +762,46 @@ public EnqueueBean pig(@FormParam("execute") String execute, statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog); } + /** + * Run a Sqoop job. 
+ * @param optionsFile name of option file which contains Sqoop command to run + * @param otherFiles additional files to be shipped to the launcher, such as option + files which contain part of the Sqoop command + * @param statusdir where the stderr/stdout of templeton controller job goes + * @param callback URL which WebHCat will call when the sqoop job finishes + * @param enablelog whether to collect mapreduce log into statusdir/logs + */ + @POST + @Path("sqoop") + @Produces({MediaType.APPLICATION_JSON}) + public EnqueueBean sqoop(@FormParam("command") String command, + @FormParam("file") String optionsFile, + @FormParam("files") String otherFiles, + @FormParam("statusdir") String statusdir, + @FormParam("callback") String callback, + @FormParam("enablelog") boolean enablelog) + throws NotAuthorizedException, BusyException, BadParam, QueueException, + ExecuteException, IOException, InterruptedException { + verifyUser(); + if (command == null && optionsFile == null) + throw new BadParam("Must define Sqoop command or a file contains Sqoop command to run Sqoop job"); + if (enablelog == true && !TempletonUtils.isset(statusdir)) + throw new BadParam("enablelog is only applicable when statusdir is set"); + + //add all function arguments to a map + Map userArgs = new HashMap(); + userArgs.put("user.name", getDoAsUser()); + userArgs.put("command", command); + userArgs.put("file", optionsFile); + userArgs.put("files", otherFiles); + userArgs.put("statusdir", statusdir); + userArgs.put("callback", callback); + userArgs.put("enablelog", Boolean.toString(enablelog)); + SqoopDelegator d = new SqoopDelegator(appConf); + return d.run(getDoAsUser(), userArgs, command, optionsFile, otherFiles, + statusdir, callback, getCompletedUrl(), enablelog); + } + /** * Run a Hive job. 
* @param execute SQL statement to run, equivalent to "-e" from hive command line diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java new file mode 100644 index 0000000..645acb3 --- /dev/null +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.commons.exec.ExecuteException; +import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob; +import org.apache.hive.hcatalog.templeton.tool.TempletonUtils; + +/** + * Submit a Sqoop job. + * + * This is the backend of the Sqoop web service. 
+ */ +public class SqoopDelegator extends LauncherDelegator { + + public SqoopDelegator(AppConfig appConf) { + super(appConf); + } + + public EnqueueBean run(String user, + Map userArgs, String command, + String optionsFile, String otherFiles, String statusdir, + String callback, String completedUrl, boolean enablelog) + throws NotAuthorizedException, BadParam, BusyException, QueueException, + ExecuteException, IOException, InterruptedException + { + runAs = user; + List args = makeArgs(command, optionsFile, otherFiles, statusdir, + completedUrl, enablelog); + + return enqueueController(user, userArgs, callback, args); + } + + List makeArgs(String command, String optionsFile, String otherFiles, + String statusdir, String completedUrl, boolean enablelog) + throws BadParam, IOException, InterruptedException + { + ArrayList args = new ArrayList(); + try { + args.addAll(makeBasicArgs(optionsFile, otherFiles, statusdir, completedUrl, enablelog)); + args.add("--"); + TempletonUtils.addCmdForWindows(args); + args.add(appConf.sqoopPath()); + if (TempletonUtils.isset(command)) { + String[] temArgs = command.split(" "); + for (int i = 0; i < temArgs.length; i++) { + args.add(TempletonUtils.quoteForWindows(temArgs[i])); + //the token file location should be right after the tool argument + if (i == 0 && !temArgs[i].startsWith("--")) { + args.add("-D" + TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER); + } + } + } else if (TempletonUtils.isset(optionsFile)) { + args.add("--options-file"); + args.add(TempletonUtils.hadoopFsPath(optionsFile, appConf, runAs) + .getName()); + } + } catch (FileNotFoundException e) { + throw new BadParam(e.getMessage()); + } catch (URISyntaxException e) { + throw new BadParam(e.getMessage()); + } + return args; + } + + private List makeBasicArgs(String optionsFile, String otherFiles, + String statusdir, String completedUrl, boolean enablelog) + throws URISyntaxException, FileNotFoundException, IOException, + InterruptedException + { + ArrayList 
args = new ArrayList(); + ArrayList allFiles = new ArrayList(); + if (TempletonUtils.isset(optionsFile)) + allFiles.add(TempletonUtils.hadoopFsFilename(optionsFile, appConf, + runAs)); + if (TempletonUtils.isset(otherFiles)) { + String[] ofs = TempletonUtils.hadoopFsListAsArray(otherFiles, appConf, runAs); + allFiles.addAll(Arrays.asList(ofs)); + } + args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, + enablelog, JobType.SQOOP)); + if (appConf.sqoopArchive() != null && !appConf.sqoopArchive().equals("")) + { + args.add("-archives"); + args.add(appConf.sqoopArchive()); + } + + return args; + } +} diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LogRetriever.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LogRetriever.java index a5821b2..6dc27f4 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LogRetriever.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LogRetriever.java @@ -127,6 +127,7 @@ public void run() throws IOException { case HIVE: jobIDParser = new HiveJobIDParser(statusDir, conf); break; + case SQOOP: case JAR: case STREAMING: jobIDParser = new JarJobIDParser(statusDir, conf); @@ -134,7 +135,7 @@ public void run() throws IOException { default: System.err .println("Unknown job type:" + jobType!=null? jobType.toString():"null" - + ", only pig/hive/jar/streaming are supported, skip logs"); + + ", only pig/hive/jar/streaming/sqoop are supported, skip logs"); return; } List jobs = new ArrayList();