diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java index 54d0907..8c8f9b6 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java @@ -110,6 +110,12 @@ public static final String MAPPER_MEMORY_MB = "templeton.mapper.memory.mb"; public static final String MR_AM_MEMORY_MB = "templeton.mr.am.memory.mb"; public static final String TEMPLETON_JOBSLIST_ORDER = "templeton.jobs.listorder"; + public static final String EXEC_MAX_JOB_SUBMIT_PROCS = "templeton.exec.job.submit.max-procs"; + public static final String EXEC_MAX_JOB_STATUS_PROCS = "templeton.exec.job.status.max-procs"; + public static final String EXEC_MAX_JOB_LIST_PROCS = "templeton.exec.job.list.max-procs"; + public static final String EXEC_JOB_SUBMIT_TIMEOUT = "templeton.exec.job.submit.timeout"; + public static final String EXEC_JOB_STATUS_TIMEOUT = "templeton.exec.job.status.timeout"; + public static final String EXEC_JOB_LIST_TIMEOUT = "templeton.exec.job.list.timeout"; /** * see webhcat-default.xml diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/BusyException.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/BusyException.java index 64261fa..1f05516 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/BusyException.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/BusyException.java @@ -25,6 +25,10 @@ */ public class BusyException extends SimpleWebException { public BusyException() { - super(HttpStatus.SERVICE_UNAVAILABLE_503, "Busy, please retry"); + this("Busy, please retry"); + } + + public BusyException(String msg) { + super(HttpStatus.SERVICE_UNAVAILABLE_503, msg); } } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java index f0296cb..bd3efdf 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java @@ -49,7 +49,7 @@ public EnqueueBean run(String user, Map userArgs, String statusdir, String callback, String completedUrl, boolean enablelog, Boolean enableJobReconnect) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException + ExecuteException, IOException, InterruptedException, Exception { runAs = user; List args = makeArgs(execute, srcFile, defines, hiveArgs, otherFiles, statusdir, diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java index 84cd5b9..07f4e76 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java @@ -46,7 +46,7 @@ public EnqueueBean run(String user, Map userArgs, String jar, St boolean usesHcatalog, String completedUrl, boolean enablelog, Boolean enableJobReconnect, JobType jobType) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, Exception { runAs = user; List args = makeArgs(jar, mainClass, libjars, files, jarArgs, defines, diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java new file mode 100644 index 0000000..e1e24b7 --- /dev/null +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public class JobRequestExecutor { + private static final Logger LOG = LoggerFactory.getLogger(JobRequestExecutor.class); + private static AppConfig appConf = Main.getAppConfigInstance(); + + /* + * Thread pool to execute job requests. + */ + private ExecutorService executePool = null; + + /* + * Thread pool to execute callable actions on job request time out. + */ + private ExecutorService timeoutPool = null; + + /* + * Type of job request. + */ + private JobRequestType requestType; + + /* + * Config name used to find the number of concurrent requests. + */ + private String configName; + + /* + * Config name used to find the maximum time job request can be executed. + */ + private String timeoutConfigName; + + /* + * Job request execution time out in milliseconds. If it is 0 then request + * will not be timed out. + */ + private int requestExecutionTimeout = 0; + + /* + * Job Request type. + */ + public enum JobRequestType { + Submit, + Status, + List + } + + /* + * Prevent default constructor to trigger. + */ + private JobRequestExecutor() { + } + + /* + * Creates a job request object and sets up execution environment. Creates thread pool + * to execute job requests and time out pool to execute tasks on time out. + * + * @param requestType + * Job request type + * + * @param configName + * Config name to be used to extract number of maximum active requests to be serviced. + * + * @param timeoutConfigName + * Config name to be used to extract maximum time a task can execute a request. + */ + public JobRequestExecutor(JobRequestType requestType, String configName, String timeoutConfigName) { + this.configName = configName; + + /* + * The default number of threads will be 0. That means thread pool is not used and + * operation is executed within the current thread. + */ + int threads = !StringUtils.isEmpty(configName) ? appConf.getInt(configName, 0) : 0; + + if (threads > 0) { + /* + * Create a thread pool with no queue wait time to execute the operation. This will ensure + * that job requests are rejected if there are already maximum number of threads busy. + */ + this.executePool = new ThreadPoolExecutor(threads, threads, 0, + TimeUnit.SECONDS, new SynchronousQueue(), + new JobOperationThreadFactory("Job " + requestType + " Execute")); + + /* + * If Job request execution times out then it may have callable action on time out. + * Create a thread pool with fixed number of threads and unbounded queue such that + * these time out operations will get finished eventually. + */ + this.timeoutPool = Executors.newFixedThreadPool(threads, + new JobOperationThreadFactory("Job " + requestType + " Timeout")); + } + + this.requestType = requestType; + this.timeoutConfigName = timeoutConfigName; + + if (!StringUtils.isEmpty(timeoutConfigName)) { + /* + * Retrieve the job request time out information if configured. + */ + this.requestExecutionTimeout = appConf.getInt(timeoutConfigName, 0); + } + + LOG.info("Configured " + threads + " threads for job request type " + this.requestType + + " with time out " + this.requestExecutionTimeout + " ms."); + } + + /* + * Executes job request operation. If thread pool is not created then job request is + * executed in current thread itself. + * + * @param callable + * Callable object to run the job request task. + * + */ + public T execute(Callable callable) throws Exception { + + if (callable == null ) { + throw new NullPointerException("callable can't be null"); + } + + String type = this.requestType.toString().toLowerCase(); + + /* + * If no threads are configured then execute the job request with current thread itself + * with unbounded time. + */ + if (this.executePool == null) { + LOG.info("No thread pool configured for " + type + " job request. Executing " + + "with the job request in current thread."); + return callable.call(); + } + + LOG.debug("Starting new " + type + " job request with time out " + this.requestExecutionTimeout); + Future future = null; + + try { + future = this.executePool.submit(callable); + } catch (RejectedExecutionException rejectedException) { + /* + * Not able to find thread to execute the job request. Raise Busy exception and client + * can retry the operation. + */ + LOG.info("Unable to find thread for " + type + " job request."); + throw new BusyException("Unable to service the " + type + " job request as " + + "templeton service is busy with too many " + type + " job requests. " + + "Please wait for some time before retrying the operation. Please refer " + + "to the config " + configName + " to configure concurrent requests."); + } + + try { + return this.requestExecutionTimeout > 0 + ? future.get(this.requestExecutionTimeout, TimeUnit.MILLISECONDS) : future.get(); + } catch (ExecutionException e) { + /* + * Throw execution exception to caller. + */ + throw (Exception) e.getCause(); + } catch (TimeoutException e) { + String message = this.requestType + " job request got timed out. Please retry the operation after " + + "waiting for some time. Please refer to the config " + timeoutConfigName + + " to configure job request time out."; + LOG.info(message); + + /* + * Throw TimeoutException to caller. + */ + throw new TimeoutException(message); + } finally { + /* + * If the task still active then cancel it. This may happen in case task got interrupted, + * or timed out. + */ + if (!future.isDone()) { + LOG.info("Task is still executing for " + type + " job request. Cancelling it."); + future.cancel(true); + } + } + } + + /* + * Submits a on task on time out thread pool. The executes until it completes or terminates. + * If thread pool is not created then job request is executed in current thread itself. + * + * @param runnableOnTimeout + * Runnable object to run on the time out thread pool. + * + */ + public void submitTimeOutTask(Runnable runnableOnTimeout) { + + if (runnableOnTimeout == null ) { + throw new NullPointerException("runnableOnTimeout can't be null"); + } + + String type = this.requestType.toString().toLowerCase(); + + if (this.timeoutPool == null) { + LOG.info("No thread pool configured to execute time out tasks for " + type + + " job request. Executing with the task in current thread."); + runnableOnTimeout.run(); + } + + this.executePool.submit(runnableOnTimeout); + } + + /* + * A ThreadFactory for job operation threads with meaningful names. Helpful + * for debugging purposes. + */ + static class JobOperationThreadFactory implements ThreadFactory { + + private String threadIdPrefix = "JobThread"; + + /* + * Atomic integer to provide thread id for thread names. + */ + private AtomicInteger threadSequenceNumber = new AtomicInteger(0); + + public JobOperationThreadFactory(String prefix) { + threadIdPrefix = prefix; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + + /* + * Use current thread name as part in naming thread. Useful for debgging purpose. + */ + t.setName(String.format("%s-%s-%d", threadIdPrefix, Thread.currentThread().getName(), + threadSequenceNumber.getAndIncrement())); + return t; + } + + } + +} diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java index b3f44a2..6b151a1 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java @@ -23,6 +23,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.ExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,13 +35,14 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.hive.hcatalog.templeton.tool.JobState; import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob; import org.apache.hive.hcatalog.templeton.tool.TempletonStorage; import org.apache.hive.hcatalog.templeton.tool.TempletonUtils; import org.apache.hive.hcatalog.templeton.tool.ZooKeeperStorage; +import com.google.common.annotations.VisibleForTesting; /** * The helper class for all the Templeton delegator classes that @@ -49,8 +53,16 @@ protected String runAs = null; static public enum JobType {JAR, STREAMING, PIG, HIVE, SQOOP} private boolean secureMeatastoreAccess = false; + private TempletonControllerJob ctrl = null; private final String HIVE_SHIMS_FILENAME_PATTERN = ".*hive-shims.*"; + /* + * Job request executor to submit job requests. + */ + private static JobRequestExecutor jobRequest = + new JobRequestExecutor(JobRequestExecutor.JobRequestType.Submit, + AppConfig.EXEC_MAX_JOB_SUBMIT_PROCS, AppConfig.EXEC_JOB_SUBMIT_TIMEOUT); + public LauncherDelegator(AppConfig appConf) { super(appConf); } @@ -70,13 +82,68 @@ public void registerJob(String id, String user, String callback, } } + /* + * Submit job request. If maximum concurrent job submit requests are configured then submit + * request will be executed on a thread from thread pool. If job submit request time out is + * configured then request execution thread will be interrupted if thread times out. Also + * does best efforts to identify if job is submitted and kill it quietly. + */ + public EnqueueBean enqueueController(final String user, final Map userArgs, + final String callback, final List args) + throws NotAuthorizedException, BusyException, IOException, QueueException, Exception { + Callable callable = new Callable() { + @Override + public EnqueueBean call() throws NotAuthorizedException, BusyException, IOException, + QueueException, Exception { + return enqueueJob(user, userArgs, callback, args); + } + }; + + EnqueueBean bean = null; + ctrl = getTempletonController(); + + try { + bean = jobRequest.execute(callable); + } catch (TimeoutException e) { + final String jobId = ctrl.getSubmittedId(); + LOG.info("Job submission got timed out."); + + /* + * Check if the job submission has gone through and if job is valid. + */ + if (StringUtils.startsWith(jobId, "job_")) { + LOG.info("Timed out while submitting the job " + jobId + + ". Started killing the job. Please retry after waiting for some time."); + + /* + * There might be a job request submitted to resource manager before the request + * got timed out. Send kill job request to ensure that there will not be duplicate + * jobs if client retries the job submit request. + */ + Runnable runnableOnTimeout = new Runnable() { + @Override + public void run() { + killJob(user, jobId); + } + }; + + jobRequest.submitTimeOutTask(runnableOnTimeout); + } else { + LOG.info("Invalid job id found after job request time out."); + } + + throw e; + } + + return bean; + } + /** * Enqueue the TempletonControllerJob directly calling doAs. */ - public EnqueueBean enqueueController(String user, Map userArgs, String callback, + public EnqueueBean enqueueJob(String user, Map userArgs, String callback, List args) - throws NotAuthorizedException, BusyException, - IOException, QueueException { + throws NotAuthorizedException, BusyException, IOException, QueueException, TimeoutException { try { UserGroupInformation ugi = UgiFactory.getUgi(user); @@ -100,20 +167,54 @@ public EnqueueBean enqueueController(String user, Map userArgs, } private String queueAsUser(UserGroupInformation ugi, final List args) - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { if(LOG.isDebugEnabled()) { LOG.debug("Launching job: " + args); } + + final String user = ugi.getUserName(); + return ugi.doAs(new PrivilegedExceptionAction() { public String run() throws Exception { - String[] array = new String[args.size()]; - TempletonControllerJob ctrl = new TempletonControllerJob(secureMeatastoreAccess, appConf); - ToolRunner.run(ctrl, args.toArray(array)); + runTempletonControllerJob(ctrl, args); return ctrl.getSubmittedId(); } }); } + /* + * Gets new templeton controller objects. + */ + @VisibleForTesting + TempletonControllerJob getTempletonController() { + return new TempletonControllerJob(secureMeatastoreAccess, appConf); + } + + /* + * Runs the templeton controller job with 'args'. Utilizes ToolRunner to run + * the actual job. + */ + @VisibleForTesting + int runTempletonControllerJob(TempletonControllerJob ctrl, List args) + throws IOException, InterruptedException, TimeoutException, Exception { + String[] array = new String[args.size()]; + return ToolRunner.run(ctrl, args.toArray(array)); + } + + /* + * Uses DeleteDelegator to kill a job and ignores all exceptions. + */ + @VisibleForTesting + void killJob(String user, String jobId) { + try { + DeleteDelegator d = new DeleteDelegator(appConf); + d.run(user, jobId); + } + catch (Exception e) { + LOG.info("Failed to kill job " + jobId + " with message " + e.getMessage()); + } + } + public List makeLauncherArgs(AppConfig appConf, String statusdir, String completedUrl, List copyFiles, @@ -262,8 +363,8 @@ public static void addDef(List args, String name, String val) { } } /** - * This is called by subclasses when they determined that the sumbmitted job requires - * metastore access (e.g. Pig job that uses HCatalog). This then determines if + * This is called by subclasses when they determined that the submitted job requires + * metastore access (e.g. Pig job that uses HCatalog). This then determines if * secure access is required and causes TempletonControllerJob to set up a delegation token. * @see TempletonControllerJob */ diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java index a30ecd1..6b202aa 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java @@ -19,9 +19,14 @@ package org.apache.hive.hcatalog.templeton; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.ArrayList; +import java.util.concurrent.Semaphore; +import java.util.concurrent.Callable; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.JobStatus; @@ -31,20 +36,56 @@ * List jobs owned by a user. */ public class ListDelegator extends TempletonDelegator { + private static final Log LOG = LogFactory.getLog(ListDelegator.class); + + /* + * Job request executor to list job status requests. + */ + private static JobRequestExecutor> jobRequest = + new JobRequestExecutor>(JobRequestExecutor.JobRequestType.List, + AppConfig.EXEC_MAX_JOB_LIST_PROCS, AppConfig.EXEC_JOB_LIST_TIMEOUT); + public ListDelegator(AppConfig appConf) { super(appConf); } - public List run(String user, boolean showall) - throws NotAuthorizedException, BadParam, IOException, InterruptedException { + /* + * List status jobs request. If maximum concurrent job list requests are configured then + * list request will be executed on a thread from thread pool. If job list request time out + * is configured then request execution thread will be interrupted if thread times out and + * does no action. + */ + public List run(final String user, final boolean showall, final String jobId, + final String numrecords, final boolean showDetails) + throws NotAuthorizedException, BadParam, IOException, + InterruptedException, BusyException, Exception { + + Callable> callable = new Callable>() { + @Override + public List call() throws NotAuthorizedException, BadParam, IOException, + InterruptedException, Exception { + return listJobs(user, showall, jobId, numrecords, showDetails); + } + }; + + return jobRequest.execute(callable); + } + + /* + * Gets list of job ids and calls getJobStatus to get status for each job id. + */ + public List listJobs(String user, boolean showall, String jobId, + String numrecords, boolean showDetails) + throws NotAuthorizedException, BadParam, IOException, + InterruptedException, BusyException { UserGroupInformation ugi = UgiFactory.getUgi(user); WebHCatJTShim tracker = null; + ArrayList ids = new ArrayList(); + try { tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi); - ArrayList ids = new ArrayList(); - JobStatus[] jobs = tracker.getAllJobs(); if (jobs != null) { @@ -54,13 +95,97 @@ public ListDelegator(AppConfig appConf) { ids.add(id); } } - - return ids; } catch (IllegalStateException e) { throw new BadParam(e.getMessage()); } finally { if (tracker != null) tracker.close(); } + + return getJobStatus(ids, user, showall, jobId, numrecords, showDetails); + } + + /* + * Returns job status for list of input jobs as a list. + */ + public List getJobStatus(ArrayList jobIds, String user, boolean showall, + String jobId, String numrecords, boolean showDetails) + throws BadParam, IOException, InterruptedException, + BusyException { + + List detailList = new ArrayList(); + int currRecord = 0; + int numRecords; + + // Parse numrecords to an integer + try { + if (numrecords != null) { + numRecords = Integer.parseInt(numrecords); + if (numRecords <= 0) { + throw new BadParam("numrecords should be an integer > 0"); + } + } + else { + numRecords = -1; + } + } + catch(Exception e) { + throw new BadParam("Invalid numrecords format: numrecords should be an integer > 0"); + } + + // Sort the list as requested + boolean isAscendingOrder = true; + switch (appConf.getListJobsOrder()) { + case lexicographicaldesc: + Collections.sort(jobIds, Collections.reverseOrder()); + isAscendingOrder = false; + break; + case lexicographicalasc: + default: + Collections.sort(jobIds); + break; + } + + for (String job : jobIds) { + // If numRecords = -1, fetch all records. + // Hence skip all the below checks when numRecords = -1. + if (numRecords != -1) { + // If currRecord >= numRecords, we have already fetched the top #numRecords + if (currRecord >= numRecords) { + break; + } + else if (jobId == null || jobId.trim().length() == 0) { + currRecord++; + } + // If the current record needs to be returned based on the + // filter conditions specified by the user, increment the counter + else if (isAscendingOrder && job.compareTo(jobId) > 0 || !isAscendingOrder && job.compareTo(jobId) < 0) { + currRecord++; + } + // The current record should not be included in the output detailList. + else { + continue; + } + } + JobItemBean jobItem = new JobItemBean(); + jobItem.id = job; + if (showDetails) { + StatusDelegator sd = new StatusDelegator(appConf); + try { + jobItem.detail = sd.run(user, job); + } + catch(Exception ex) { + /* + * if we could not get status for some reason, log it, and send empty status back with + * just the ID so that caller knows to even look in the log file + */ + LOG.info("Failed to get status detail for jobId='" + job + "'", ex); + jobItem.detail = new QueueStatusBean(job, "Failed to retrieve status; see WebHCat logs"); + } + } + detailList.add(jobItem); + } + + return detailList; } } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java index aeb89df..ccc2d36 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java @@ -51,7 +51,7 @@ public EnqueueBean run(String user, Map userArgs, boolean usesHcatalog, String completedUrl, boolean enablelog, Boolean enableJobReconnect) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, Exception { runAs = user; List args = makeArgs(execute, srcFile, pigArgs, diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index 2da0204..4879045 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -650,7 +650,7 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, Exception { verifyUser(); verifyParam(inputs, "input"); verifyParam(mapper, "mapper"); @@ -704,7 +704,7 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, Exception { verifyUser(); verifyParam(jar, "jar"); verifyParam(mainClass, "class"); @@ -754,7 +754,7 @@ public EnqueueBean pig(@FormParam("execute") String execute, @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, Exception { verifyUser(); if (execute == null && srcFile == null) { throw new BadParam("Either execute or file parameter required"); @@ -805,7 +805,7 @@ public EnqueueBean sqoop(@FormParam("command") String command, @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - IOException, InterruptedException { + IOException, InterruptedException, Exception { verifyUser(); if (command == null && optionsFile == null) throw new BadParam("Must define Sqoop command or a optionsfile contains Sqoop command to run Sqoop job."); @@ -859,7 +859,7 @@ public EnqueueBean hive(@FormParam("execute") String execute, @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, Exception { verifyUser(); if (execute == null && srcFile == null) { throw new BadParam("Either execute or file parameter required"); @@ -891,7 +891,8 @@ public EnqueueBean hive(@FormParam("execute") String execute, @Path("jobs/{jobid}") @Produces({MediaType.APPLICATION_JSON}) public QueueStatusBean showJobId(@PathParam("jobid") String jobid) - throws NotAuthorizedException, BadParam, IOException, InterruptedException { + throws NotAuthorizedException, BadParam, IOException, InterruptedException, + BusyException, Exception { verifyUser(); verifyParam(jobid, ":jobid"); @@ -968,7 +969,8 @@ public QueueStatusBean deleteJobId(@PathParam("jobid") String jobid) @QueryParam("showall") boolean showall, @QueryParam("jobid") String jobid, @QueryParam("numrecords") String numrecords) - throws NotAuthorizedException, BadParam, IOException, InterruptedException { + throws NotAuthorizedException, BadParam, IOException, InterruptedException, + BusyException, Exception { verifyUser(); @@ -981,78 +983,7 @@ public QueueStatusBean deleteJobId(@PathParam("jobid") String jobid) } ListDelegator ld = new ListDelegator(appConf); - List list = ld.run(getDoAsUser(), showall); - List detailList = new ArrayList(); - int currRecord = 0; - int numRecords; - - // Parse numrecords to an integer - try { - if (numrecords != null) { - numRecords = Integer.parseInt(numrecords); - if (numRecords <= 0) { - throw new BadParam("numrecords should be an integer > 0"); - } - } - else { - numRecords = -1; - } - } - catch(Exception e) { - throw new BadParam("Invalid numrecords format: numrecords should be an integer > 0"); - } - - // Sort the list as requested - boolean isAscendingOrder = true; - switch (appConf.getListJobsOrder()) { - case lexicographicaldesc: - Collections.sort(list, Collections.reverseOrder()); - isAscendingOrder = false; - break; - case lexicographicalasc: - default: - Collections.sort(list); - break; - } - - for (String job : list) { - // If numRecords = -1, fetch all records. - // Hence skip all the below checks when numRecords = -1. - if (numRecords != -1) { - // If currRecord >= numRecords, we have already fetched the top #numRecords - if (currRecord >= numRecords) { - break; - } - else if (jobid == null || jobid.trim().length() == 0) { - currRecord++; - } - // If the current record needs to be returned based on the - // filter conditions specified by the user, increment the counter - else if (isAscendingOrder && job.compareTo(jobid) > 0 || !isAscendingOrder && job.compareTo(jobid) < 0) { - currRecord++; - } - // The current record should not be included in the output detailList. - else { - continue; - } - } - JobItemBean jobItem = new JobItemBean(); - jobItem.id = job; - if (showDetails) { - StatusDelegator sd = new StatusDelegator(appConf); - try { - jobItem.detail = sd.run(getDoAsUser(), job); - } - catch(Exception ex) { - /*if we could not get status for some reason, log it, and send empty status back with - * just the ID so that caller knows to even look in the log file*/ - LOG.info("Failed to get status detail for jobId='" + job + "'", ex); - jobItem.detail = new QueueStatusBean(job, "Failed to retrieve status; see WebHCat logs"); - } - } - detailList.add(jobItem); - } - return detailList; + return ld.run(getDoAsUser(), showall, jobid, numrecords, showDetails); } /** diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java index fde5f60..97d5c4e 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java @@ -50,7 +50,7 @@ public EnqueueBean run(String user, String callback, String completedUrl, boolean enablelog, Boolean enableJobReconnect, String libdir) throws NotAuthorizedException, BadParam, BusyException, QueueException, - IOException, InterruptedException + IOException, InterruptedException, Exception { if(TempletonUtils.isset(appConf.sqoopArchive())) { if(!TempletonUtils.isset(appConf.sqoopPath()) && !TempletonUtils.isset(appConf.sqoopHome())) { diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java index fac0170..cb92150 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java @@ -19,6 +19,8 @@ package org.apache.hive.hcatalog.templeton; import java.io.IOException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.Callable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,12 +44,39 @@ public class StatusDelegator extends TempletonDelegator { private static final Logger LOG = LoggerFactory.getLogger(StatusDelegator.class); + /* + * Job status request executor to get status of a job. + */ + private static JobRequestExecutor jobRequest = + new JobRequestExecutor(JobRequestExecutor.JobRequestType.Status, + AppConfig.EXEC_MAX_JOB_STATUS_PROCS, AppConfig.EXEC_JOB_STATUS_TIMEOUT); + public StatusDelegator(AppConfig appConf) { super(appConf); } - public QueueStatusBean run(String user, String id) - throws NotAuthorizedException, BadParam, IOException, InterruptedException + /* + * Gets status of job form job id. If maximum concurrent job status requests are configured + * then status request will be executed on a thread from thread pool. If job status request + * time out is configured then request execution thread will be interrupted if thread times out. + * and does no action. + */ + public QueueStatusBean run(final String user, final String id) + throws NotAuthorizedException, BadParam, IOException, InterruptedException, + BusyException, Exception { + Callable callable = new Callable() { + @Override + public QueueStatusBean call() throws NotAuthorizedException, BadParam, IOException, + InterruptedException, BusyException, Exception { + return getJobStatus(user, id); + } + }; + + return jobRequest.execute(callable); + } + + public QueueStatusBean getJobStatus(String user, String id) + throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException { WebHCatJTShim tracker = null; JobState state = null; diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java index 839b56a..fa72cbd 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java @@ -51,7 +51,7 @@ public EnqueueBean run(String user, Map userArgs, Boolean enableJobReconnect, JobType jobType) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, Exception { List args = makeArgs(inputs, inputreader, output, mapper, reducer, combiner, fileList, cmdenvs, jarArgs); diff --git hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java new file mode 100644 index 0000000..7238cb6 --- /dev/null +++ hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.util.ArrayList; + +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/* + * Test submission of concurrent job requests. + */ +public class TestConcurrentJobRequests extends TestConcurrentJobRequestsBase { + + private static AppConfig config; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void setUp() { + final String[] args = new String[] {}; + Main main = new Main(args); + config = Main.getAppConfigInstance(); + config.setInt(AppConfig.EXEC_MAX_JOB_STATUS_PROCS, 0); + config.setInt(AppConfig.EXEC_MAX_JOB_LIST_PROCS, 0); + config.setInt(AppConfig.EXEC_MAX_JOB_SUBMIT_PROCS, 0); + } + + @Test + public void ConcurrentJobsStatusSuccess() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, + statusJobHelper.getDelayedResonseAnswer(4, new QueueStatusBean("job_1000", "Job not found"))); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsSuccess() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, + listJobHelper.getDelayedResonseAnswer(4, new ArrayList())); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsSuccess() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, + submitJobHelper.getDelayedResonseAnswer(4, 0), + killJobHelper.getDelayedResonseAnswer(4, null), "job_1000"); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } +} diff --git hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsBase.java hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsBase.java new file mode 100644 index 0000000..45069ad --- /dev/null +++ hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsBase.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeoutException; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/* + * Base class for mocking job operations with concurrent requests. + */ +public class TestConcurrentJobRequestsBase { + private static final Logger LOG = LoggerFactory.getLogger(TestConcurrentJobRequestsBase.class); + + TestMockAnswerHelper statusJobHelper = new TestMockAnswerHelper(); + TestMockAnswerHelper killJobHelper = new TestMockAnswerHelper(); + TestMockAnswerHelper> listJobHelper = new TestMockAnswerHelper>(); + TestMockAnswerHelper submitJobHelper = new TestMockAnswerHelper(); + + public JobRunnable ConcurrentJobsStatus(int threadCount, AppConfig appConfig, + final boolean killThreads, final Answer answer) + throws IOException, InterruptedException, QueueException, NotAuthorizedException, + BadParam, BusyException { + + StatusDelegator delegator = new StatusDelegator(appConfig); + final StatusDelegator mockDelegator = Mockito.spy(delegator); + + Mockito.doAnswer(answer).when(mockDelegator).getJobStatus(Mockito.any(String.class), + Mockito.any(String.class)); + + JobRunnable statusJobRunnable = new JobRunnable() { + @Override + public void run() { + try { + mockDelegator.run("admin", "job_1000"); + } catch (Exception ex) { + exception = ex; + } + } + }; + + executeJobOperations(statusJobRunnable, threadCount, killThreads); + return statusJobRunnable; + } + + public JobRunnable ConcurrentListJobs(int threadCount, AppConfig config, + final boolean killThreads, final Answer> answer) + throws IOException, InterruptedException, QueueException, NotAuthorizedException, + BadParam, BusyException { + + ListDelegator delegator = new ListDelegator(config); + final ListDelegator mockDelegator = Mockito.spy(delegator); + + Mockito.doAnswer(answer).when(mockDelegator).listJobs(Mockito.any(String.class), + Mockito.any(boolean.class), Mockito.any(String.class), + Mockito.any(String.class), Mockito.any(boolean.class)); + + JobRunnable listJobRunnable = new JobRunnable() { + @Override + public void run() { + try { + mockDelegator.run("admin", true, "", "10", true); + } catch (Exception ex) { + exception = ex; + } + } + }; + + executeJobOperations(listJobRunnable, threadCount, killThreads); + return listJobRunnable; + } + + public JobRunnable SubmitConcurrentJobs(int threadCount, AppConfig config, + final boolean killThreads, final Answer responseAnswer, + final Answer timeoutResponseAnswer, final String jobIdResponse) + throws IOException, InterruptedException, QueueException, NotAuthorizedException, + BusyException, TimeoutException, Exception { + + LauncherDelegator delegator = new LauncherDelegator(config); + final LauncherDelegator mockDelegator = Mockito.spy(delegator); + final List listArgs = new ArrayList(); + + TempletonControllerJob mockCtrl = Mockito.mock(TempletonControllerJob.class); + + Mockito.doReturn(jobIdResponse).when(mockCtrl).getSubmittedId(); + + Mockito.doReturn(mockCtrl).when(mockDelegator).getTempletonController(); + + Mockito.doAnswer(responseAnswer).when(mockDelegator).runTempletonControllerJob( + Mockito.any(TempletonControllerJob.class), Mockito.any(List.class)); + + Mockito.doAnswer(timeoutResponseAnswer).when(mockDelegator).killJob( + Mockito.any(String.class), Mockito.any(String.class)); + + Mockito.doNothing().when(mockDelegator).registerJob(Mockito.any(String.class), + Mockito.any(String.class), Mockito.any(String.class), Mockito.any(Map.class)); + + JobRunnable submitJobRunnable = new JobRunnable() { + @Override + public void run() { + try { + mockDelegator.enqueueController("admin", null, "", listArgs); + } catch (Exception ex) { + exception = ex; + } + } + }; + + executeJobOperations(submitJobRunnable, threadCount, killThreads); + return submitJobRunnable; + } + + public void executeJobOperations(Runnable jobRunnable, int threadCount, boolean killThreads) + throws IOException, InterruptedException, QueueException, NotAuthorizedException { + + ExecutorService executorService = new ThreadPoolExecutor(threadCount, threadCount, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue());; + + try{ + for (int i = 0; i < threadCount; i++) { + executorService.submit(jobRunnable); + } + + if (killThreads) { + executorService.shutdownNow(); + } else { + executorService.shutdown(); + } + + } catch (Exception ex) { + // Do nothing. Exception already stored in jobRunnable. + } + + // Shutdown threads if there are any. + try { + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + LOG.info("Force Shutting down the pool\n"); + executorService.shutdownNow(); + } + } catch (Exception e) { + // Ignore any exceptions to showndown pool. + } + } + + public abstract class JobRunnable implements Runnable { + public volatile Exception exception = null; + } +} diff --git hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java new file mode 100644 index 0000000..c1caf8e --- /dev/null +++ hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.util.ArrayList; + +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.eclipse.jetty.http.HttpStatus; +import java.util.concurrent.TimeoutException; + +/* + * Test submission of concurrent job requests with the controlled number of concurrent + * Requests. Verify that we get busy exception and appropriate message. + */ +public class TestConcurrentJobRequestsThreadsAndTimeout extends TestConcurrentJobRequestsBase { + + private static AppConfig config; + private static QueueStatusBean statusBean; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void setUp() { + final String[] args = new String[] {}; + Main main = new Main(args); + config = Main.getAppConfigInstance(); + config.setInt(AppConfig.EXEC_MAX_JOB_STATUS_PROCS, 5); + config.setInt(AppConfig.EXEC_MAX_JOB_LIST_PROCS, 5); + config.setInt(AppConfig.EXEC_MAX_JOB_SUBMIT_PROCS, 5); + config.setInt(AppConfig.EXEC_JOB_SUBMIT_TIMEOUT, 5000); + config.setInt(AppConfig.EXEC_JOB_STATUS_TIMEOUT, 5000); + config.setInt(AppConfig.EXEC_JOB_LIST_TIMEOUT, 5000); + statusBean = new QueueStatusBean("job_1000", "Job not found"); + } + + @Test + public void ConcurrentJobsStatusBusyException() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, + statusJobHelper.getDelayedResonseAnswer(4, statusBean)); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof BusyException); + BusyException ex = (BusyException)jobRunnable.exception; + assertTrue(ex.httpCode == HttpStatus.SERVICE_UNAVAILABLE_503); + String expectedMessage = "Unable to service the status job request as templeton service is busy " + + "with too many status job requests. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.exec.job.status.max-procs to configure concurrent requests."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsBusyException() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, + listJobHelper.getDelayedResonseAnswer(4, new ArrayList())); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof BusyException); + BusyException ex = (BusyException)jobRunnable.exception; + assertTrue(ex.httpCode == HttpStatus.SERVICE_UNAVAILABLE_503); + String expectedMessage = "Unable to service the list job request as templeton service is busy " + + "with too many list job requests. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.exec.job.list.max-procs to configure concurrent requests."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsBusyException() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, + submitJobHelper.getDelayedResonseAnswer(4, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof BusyException); + BusyException ex = (BusyException)jobRunnable.exception; + assertTrue(ex.httpCode == HttpStatus.SERVICE_UNAVAILABLE_503); + String expectedMessage = "Unable to service the submit job request as templeton service is busy " + + "with too many submit job requests. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.exec.job.submit.max-procs to configure concurrent requests."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentJobsStatusTimeOutException() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(5, config, false, + statusJobHelper.getDelayedResonseAnswer(10, statusBean)); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof TimeoutException); + String expectedMessage = "Status job request got timed out. Please retry the operation after waiting " + + "for some time. Please refer to the config templeton.exec.job.status.timeout " + + "to configure job request time out."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + jobRunnable = ConcurrentJobsStatus(5, config, false, + statusJobHelper.getDelayedResonseAnswer(0, statusBean)); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsTimeOutException() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(5, config, false, + listJobHelper.getDelayedResonseAnswer(10, new ArrayList())); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof TimeoutException); + String expectedMessage = "List job request got timed out. Please retry the operation after waiting " + + "for some time. Please refer to the config templeton.exec.job.list.timeout " + + "to configure job request time out."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + jobRunnable = ConcurrentListJobs(5, config, false, + listJobHelper.getDelayedResonseAnswer(1, new ArrayList())); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsTimeOutException() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(5, config, false, + submitJobHelper.getDelayedResonseAnswer(10, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof TimeoutException); + String expectedMessage = "Submit job request got timed out. Please retry the operation after waiting " + + "for some time. Please refer to the config templeton.exec.job.submit.timeout " + + "to configure job request time out."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + jobRunnable = SubmitConcurrentJobs(5, config, false, + submitJobHelper.getDelayedResonseAnswer(0, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentStatusJobsKillThreads() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(5, config, true, + statusJobHelper.getDelayedResonseAnswer(4, statusBean)); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + jobRunnable = ConcurrentJobsStatus(5, config, false, + statusJobHelper.getDelayedResonseAnswer(0, statusBean)); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsKillThreads() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(5, config, true, + listJobHelper.getDelayedResonseAnswer(4, new ArrayList())); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + jobRunnable = ConcurrentListJobs(5, config, false, + listJobHelper.getDelayedResonseAnswer(0, new ArrayList())); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsKillThreads() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(5, config, true, + submitJobHelper.getDelayedResonseAnswer(4, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + jobRunnable = SubmitConcurrentJobs(5, config, false, + submitJobHelper.getDelayedResonseAnswer(0, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentStatusJobsInterruptedException() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(5, config, true, + statusJobHelper.getInterruptedExceptionAnswer()); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + jobRunnable = ConcurrentJobsStatus(5, config, false, + statusJobHelper.getDelayedResonseAnswer(0, statusBean)); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsInterruptedException() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(5, config, true, + listJobHelper.getInterruptedExceptionAnswer()); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + jobRunnable = ConcurrentListJobs(5, config, false, + listJobHelper.getDelayedResonseAnswer(0, new ArrayList())); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsInterruptedException() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(5, config, false, + submitJobHelper.getInterruptedExceptionAnswer(), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof QueueException); + jobRunnable = SubmitConcurrentJobs(5, config, false, + submitJobHelper.getDelayedResonseAnswer(0, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } +} diff --git hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestMockAnswerHelper.java hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestMockAnswerHelper.java new file mode 100644 index 0000000..38878a8 --- /dev/null +++ hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestMockAnswerHelper.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/* + * Helper class to generate mocked response. + */ +public class TestMockAnswerHelper { + public Answer getInterruptedExceptionAnswer() { + return new Answer() { + @Override + public T answer(InvocationOnMock invocation) throws InterruptedException { + throw new InterruptedException("Thread got interrupted manually."); + } + }; + } + + public Answer getQueueExceptionAnswer() { + return new Answer() { + @Override + public T answer(InvocationOnMock invocation) throws QueueException { + throw new QueueException("QueueException raised manually."); + } + }; + } + + public Answer getDelayedResonseAnswer(final int delayInSeconds, final T response) { + return new Answer() { + @Override + public T answer(InvocationOnMock invocation) throws InterruptedException { + Thread.sleep(1000 * delayInSeconds); + return response; + } + }; + } +}