diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java index 54d0907..df7de4e 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java @@ -110,6 +110,9 @@ public static final String MAPPER_MEMORY_MB = "templeton.mapper.memory.mb"; public static final String MR_AM_MEMORY_MB = "templeton.mr.am.memory.mb"; public static final String TEMPLETON_JOBSLIST_ORDER = "templeton.jobs.listorder"; + public static final String EXEC_MAX_JOB_SUBMIT_PROCS = "templeton.exec.job.submit.max-procs"; + public static final String EXEC_MAX_JOB_STATUS_PROCS = "templeton.exec.job.status.max-procs"; + public static final String EXEC_MAX_JOB_LIST_PROCS = "templeton.exec.job.list.max-procs"; /** * see webhcat-default.xml diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/BusyException.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/BusyException.java index 64261fa..1f05516 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/BusyException.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/BusyException.java @@ -25,6 +25,10 @@ */ public class BusyException extends SimpleWebException { public BusyException() { - super(HttpStatus.SERVICE_UNAVAILABLE_503, "Busy, please retry"); + this("Busy, please retry"); + } + + public BusyException(String msg) { + super(HttpStatus.SERVICE_UNAVAILABLE_503, msg); } } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java index f0296cb..bd3efdf 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java @@ -49,7 +49,7 @@ public EnqueueBean run(String user, Map userArgs, String statusdir, String callback, String completedUrl, boolean enablelog, Boolean enableJobReconnect) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException + ExecuteException, IOException, InterruptedException, Exception { runAs = user; List args = makeArgs(execute, srcFile, defines, hiveArgs, otherFiles, statusdir, diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java index 84cd5b9..07f4e76 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java @@ -46,7 +46,7 @@ public EnqueueBean run(String user, Map userArgs, String jar, St boolean usesHcatalog, String completedUrl, boolean enablelog, Boolean enableJobReconnect, JobType jobType) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, Exception { runAs = user; List args = makeArgs(jar, mainClass, libjars, files, jarArgs, defines, diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequest.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequest.java new file mode 100644 index 0000000..e805869 --- /dev/null +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.Callable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JobRequest { + private static final Logger LOG = LoggerFactory.getLogger(JobRequest.class); + private static AppConfig appConf = Main.getAppConfigInstance(); + + // Semaphore to control concurrent job requests. + private Semaphore avail; + + // Type of job request. Useful for logging purpose. + private JobRequestType requestType; + + // Config name used to find the number of concurrent requets. Useful for logging purpose. + private String configName; + + // Job Request type. Useful for logging purpose. + public enum JobRequestType { + SUBMIT, + STATUS, + LIST + } + + // Prevent default constructor to trigger. + private JobRequest() { + } + + // Creates a job request. Retrieves number of concurrent requests from 'configName' and + // creates Semaphores. In case configName is not found in config then 'defaultValue' is used. + public JobRequest(String configName, int defaultValue, JobRequestType requestType) { + this.configName = configName; + int procs = appConf.getInt(configName, defaultValue); + this.avail = procs > 0 ? new Semaphore(procs) : null; + this.requestType = requestType; + LOG.debug("Configured " + procs + " semaphores for job request type " + this.requestType); + } + + // Acquire semaphore if it exists and then executes the actual operation. + T execute(Callable callable) throws Exception { + boolean acquired = false; + String type = this.requestType.toString().toLowerCase(); + try { + if (avail != null) { + acquired = avail.tryAcquire(); + if (!acquired) { + LOG.info("Unable to acquire the semaphore for " + type + " job request."); + throw new BusyException("Unable to service the " + type + " job request as templeton service is busy " + + "with too many " + type + " job requests. Please wait for some time before retrying" + + " the operation. Please refer to the config " + configName + " to configure concurrent requests."); + } else { + LOG.debug("Acquired semaphore for " + type + " job request."); + } + } else { + LOG.debug("No semaphore found for " + type + " job request."); + } + + // Acquired semaphore. Execute the actual job operation. + return callable.call(); + } finally { + if (acquired && avail != null) { + LOG.debug("Releasing the semaphore after " + type + " job request is completed."); + avail.release(); + } + } + } +} diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java index b3f44a2..e4d467e 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java @@ -23,6 +23,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.Callable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +52,9 @@ static public enum JobType {JAR, STREAMING, PIG, HIVE, SQOOP} private boolean secureMeatastoreAccess = false; private final String HIVE_SHIMS_FILENAME_PATTERN = ".*hive-shims.*"; + private static JobRequest jobRequest = + new JobRequest(AppConfig.EXEC_MAX_JOB_SUBMIT_PROCS, 0, + JobRequest.JobRequestType.SUBMIT); public LauncherDelegator(AppConfig appConf) { super(appConf); @@ -71,9 +76,26 @@ public void registerJob(String id, String user, String callback, } /** + * Acquires semaphore and then calls enqueueJob to submit job request. If semaphore is + * not acquired then responds back with BusyException exception. + */ + public EnqueueBean enqueueController(final String user, final Map userArgs, final String callback, + final List args) + throws NotAuthorizedException, BusyException, IOException, QueueException, Exception { + Callable callable = new Callable() { + @Override + public EnqueueBean call() throws NotAuthorizedException, BusyException, IOException, QueueException, Exception { + return enqueueJob(user, userArgs, callback, args); + } + }; + + return jobRequest.execute(callable); + } + + /** * Enqueue the TempletonControllerJob directly calling doAs. */ - public EnqueueBean enqueueController(String user, Map userArgs, String callback, + public EnqueueBean enqueueJob(String user, Map userArgs, String callback, List args) throws NotAuthorizedException, BusyException, IOException, QueueException { diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java index a30ecd1..f44557f 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java @@ -21,7 +21,11 @@ import java.io.IOException; import java.util.List; import java.util.ArrayList; +import java.util.concurrent.Semaphore; +import java.util.concurrent.Callable; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.JobStatus; @@ -31,12 +35,34 @@ * List jobs owned by a user. */ public class ListDelegator extends TempletonDelegator { + private static final Log LOG = LogFactory.getLog(ListDelegator.class); + private static JobRequest> jobRequest = + new JobRequest>(AppConfig.EXEC_MAX_JOB_LIST_PROCS, 0, + JobRequest.JobRequestType.LIST); + public ListDelegator(AppConfig appConf) { super(appConf); } - public List run(String user, boolean showall) - throws NotAuthorizedException, BadParam, IOException, InterruptedException { + /** + * Acquires semaphore and then calls listJobs to list jobs. If semaphore is + * not acquired then responds back with BusyException exception. + */ + public List run(final String user, final boolean showall) + throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException, Exception { + + Callable> callable = new Callable>() { + @Override + public List call() throws NotAuthorizedException, BadParam, IOException, InterruptedException, Exception { + return listJobs(user, showall); + } + }; + + return jobRequest.execute(callable); + } + + public List listJobs(String user, boolean showall) + throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException { UserGroupInformation ugi = UgiFactory.getUgi(user); WebHCatJTShim tracker = null; diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java index aeb89df..ccc2d36 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java @@ -51,7 +51,7 @@ public EnqueueBean run(String user, Map userArgs, boolean usesHcatalog, String completedUrl, boolean enablelog, Boolean enableJobReconnect) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, Exception { runAs = user; List args = makeArgs(execute, srcFile, pigArgs, diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index 2da0204..2e418da 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -650,7 +650,7 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, Exception { verifyUser(); verifyParam(inputs, "input"); verifyParam(mapper, "mapper"); @@ -704,7 +704,7 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, Exception { verifyUser(); verifyParam(jar, "jar"); verifyParam(mainClass, "class"); @@ -754,7 +754,7 @@ public EnqueueBean pig(@FormParam("execute") String execute, @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, Exception { verifyUser(); if (execute == null && srcFile == null) { throw new BadParam("Either execute or file parameter required"); @@ -805,7 +805,7 @@ public EnqueueBean sqoop(@FormParam("command") String command, @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - IOException, InterruptedException { + IOException, InterruptedException, Exception { verifyUser(); if (command == null && optionsFile == null) throw new BadParam("Must define Sqoop command or a optionsfile contains Sqoop command to run Sqoop job."); @@ -859,7 +859,7 @@ public EnqueueBean hive(@FormParam("execute") String execute, @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, Exception { verifyUser(); if (execute == null && srcFile == null) { throw new BadParam("Either execute or file parameter required"); @@ -891,7 +891,8 @@ public EnqueueBean hive(@FormParam("execute") String execute, @Path("jobs/{jobid}") @Produces({MediaType.APPLICATION_JSON}) public QueueStatusBean showJobId(@PathParam("jobid") String jobid) - throws NotAuthorizedException, BadParam, IOException, InterruptedException { + throws NotAuthorizedException, BadParam, IOException, InterruptedException, + BusyException, Exception { verifyUser(); verifyParam(jobid, ":jobid"); @@ -968,7 +969,8 @@ public QueueStatusBean deleteJobId(@PathParam("jobid") String jobid) @QueryParam("showall") boolean showall, @QueryParam("jobid") String jobid, @QueryParam("numrecords") String numrecords) - throws NotAuthorizedException, BadParam, IOException, InterruptedException { + throws NotAuthorizedException, BadParam, IOException, InterruptedException, + BusyException, Exception { verifyUser(); diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java index fde5f60..97d5c4e 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java @@ -50,7 +50,7 @@ public EnqueueBean run(String user, String callback, String completedUrl, boolean enablelog, Boolean enableJobReconnect, String libdir) throws NotAuthorizedException, BadParam, BusyException, QueueException, - IOException, InterruptedException + IOException, InterruptedException, Exception { if(TempletonUtils.isset(appConf.sqoopArchive())) { if(!TempletonUtils.isset(appConf.sqoopPath()) && !TempletonUtils.isset(appConf.sqoopHome())) { diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java index fac0170..00ce83d 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java @@ -19,6 +19,8 @@ package org.apache.hive.hcatalog.templeton; import java.io.IOException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.Callable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,13 +43,35 @@ */ public class StatusDelegator extends TempletonDelegator { private static final Logger LOG = LoggerFactory.getLogger(StatusDelegator.class); + // Semaphore to control the number of concurrent job status requests to be serviced. + private static JobRequest jobRequest = + new JobRequest(AppConfig.EXEC_MAX_JOB_STATUS_PROCS, 0, + JobRequest.JobRequestType.STATUS); public StatusDelegator(AppConfig appConf) { super(appConf); } - public QueueStatusBean run(String user, String id) - throws NotAuthorizedException, BadParam, IOException, InterruptedException + /** + * Acquires semaphore and then calls getJobStatus to get the status of job. If semaphore is + * not acquired then responds back with BusyException exception. + */ + public QueueStatusBean run(final String user, final String id) + throws NotAuthorizedException, BadParam, IOException, InterruptedException, + BusyException, Exception { + Callable callable = new Callable() { + @Override + public QueueStatusBean call() throws NotAuthorizedException, BadParam, IOException, + InterruptedException, BusyException, Exception { + return getJobStatus(user, id); + } + }; + + return jobRequest.execute(callable); + } + + public QueueStatusBean getJobStatus(String user, String id) + throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException { WebHCatJTShim tracker = null; JobState state = null; diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java index 839b56a..fa72cbd 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java @@ -51,7 +51,7 @@ public EnqueueBean run(String user, Map userArgs, Boolean enableJobReconnect, JobType jobType) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, Exception { List args = makeArgs(inputs, inputreader, output, mapper, reducer, combiner, fileList, cmdenvs, jarArgs); diff --git hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerHelper.java hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerHelper.java new file mode 100644 index 0000000..01c0450 --- /dev/null +++ hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerHelper.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/* + * Helper class to generate mocked response. + */ +public class MockAnswerHelper { + public Answer getInterruptedExceptionAnswer() { + return new Answer() { + @Override + public T answer(InvocationOnMock invocation) throws InterruptedException { + throw new InterruptedException("Thread got interrupted manually."); + } + }; + } + + public Answer getQueueExceptionAnswer() { + return new Answer() { + @Override + public T answer(InvocationOnMock invocation) throws QueueException { + throw new QueueException("QueueException raised manually."); + } + }; + } + + public Answer getDelayedResonseAnswer(final int delayInSeconds, final T response) { + return new Answer() { + @Override + public T answer(InvocationOnMock invocation) throws InterruptedException { + Thread.sleep(1000 * delayInSeconds); + return response; + } + }; + } +} diff --git hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java new file mode 100644 index 0000000..f37b059 --- /dev/null +++ hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.util.ArrayList; + +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.eclipse.jetty.http.HttpStatus; + +/* + * Test submission of concurrent job requests with the controlled number of concurrent + * Requests. Verify that we get busy exception and appropriate message. + */ +public class TestConcurrentJobRequests extends TestConcurrentJobRequestsBase { + + private static AppConfig config; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void setUp() { + final String[] args = new String[] {}; + Main main = new Main(args); + config = Main.getAppConfigInstance(); + config.setInt(AppConfig.EXEC_MAX_JOB_STATUS_PROCS, 5); + config.setInt(AppConfig.EXEC_MAX_JOB_LIST_PROCS, 5); + config.setInt(AppConfig.EXEC_MAX_JOB_SUBMIT_PROCS, 5); + } + + @Test + public void ConcurrentJobsStatusBusyException() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, + statusJobHelper.getDelayedResonseAnswer(4, new QueueStatusBean("job_1000", "Job not found"))); + assertTrue(jobRunnable.exception instanceof BusyException); + assertTrue(jobRunnable.exception != null); + BusyException ex = (BusyException)jobRunnable.exception; + assertTrue(ex.httpCode == HttpStatus.SERVICE_UNAVAILABLE_503); + String expectedMessage = "Unable to service the status job request as templeton service is busy " + + "with too many status job requests. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.exec.job.status.max-procs to configure concurrent requests."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsBusyException() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, + listJobHelper.getDelayedResonseAnswer(4, new ArrayList())); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof BusyException); + BusyException ex = (BusyException)jobRunnable.exception; + assertTrue(ex.httpCode == HttpStatus.SERVICE_UNAVAILABLE_503); + String expectedMessage = "Unable to service the list job request as templeton service is busy " + + "with too many list job requests. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.exec.job.list.max-procs to configure concurrent requests."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsBusyException() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, + submitJobHelper.getDelayedResonseAnswer(4, new EnqueueBean("job_1000"))); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof BusyException); + BusyException ex = (BusyException)jobRunnable.exception; + assertTrue(ex.httpCode == HttpStatus.SERVICE_UNAVAILABLE_503); + String expectedMessage = "Unable to service the submit job request as templeton service is busy " + + "with too many submit job requests. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.exec.job.submit.max-procs to configure concurrent requests."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentStatusJobsKillThreads() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, true, + statusJobHelper.getDelayedResonseAnswer(4, new QueueStatusBean("job_1000", "Job not found"))); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + jobRunnable = ConcurrentJobsStatus(5, config, false, + statusJobHelper.getDelayedResonseAnswer(4, new QueueStatusBean("job_1000", "Job not found"))); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsKillThreads() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(6, config, true, + listJobHelper.getDelayedResonseAnswer(4, new ArrayList())); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + jobRunnable = ConcurrentListJobs(5, config, false, + listJobHelper.getDelayedResonseAnswer(4, new ArrayList())); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsKillThreads() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, true, + submitJobHelper.getDelayedResonseAnswer(4, new EnqueueBean("job_1000"))); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + jobRunnable = SubmitConcurrentJobs(5, config, false, + submitJobHelper.getDelayedResonseAnswer(4, new EnqueueBean("job_1000"))); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsInterruptedException() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, + submitJobHelper.getInterruptedExceptionAnswer()); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + jobRunnable = SubmitConcurrentJobs(5, config, false, + submitJobHelper.getDelayedResonseAnswer(4, new EnqueueBean("job_1000"))); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobQueueFail() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, + submitJobHelper.getQueueExceptionAnswer()); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof QueueException); + jobRunnable = SubmitConcurrentJobs(5, config, false, + submitJobHelper.getDelayedResonseAnswer(4, new EnqueueBean("job_1000"))); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } +} diff --git hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsBase.java hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsBase.java new file mode 100644 index 0000000..46dddbe --- /dev/null +++ hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsBase.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/* + * Base class for mocking job operations with concurrent requests. + */ +public class TestConcurrentJobRequestsBase { + private static final Logger LOG = LoggerFactory.getLogger(TestConcurrentJobRequestsBase.class); + + MockAnswerHelper statusJobHelper = new MockAnswerHelper(); + MockAnswerHelper> listJobHelper = new MockAnswerHelper>(); + MockAnswerHelper submitJobHelper = new MockAnswerHelper(); + + public JobRunnable ConcurrentJobsStatus(int threadCount, AppConfig appConfig, + final boolean killThreads, final Answer answer) + throws IOException, InterruptedException, QueueException, NotAuthorizedException, + BadParam, BusyException { + + StatusDelegator delegator = new StatusDelegator(appConfig); + final StatusDelegator mockDelegator = Mockito.spy(delegator); + + Mockito.doAnswer(answer).when(mockDelegator).getJobStatus(Mockito.any(String.class), + Mockito.any(String.class)); + + JobRunnable statusJobRunnable = new JobRunnable() { + @Override + public void run() { + try { + mockDelegator.run("admin", "job_1000"); + } catch (Exception ex) { + exception = ex; + } + } + }; + + executeJobOperations(statusJobRunnable, threadCount, killThreads); + return statusJobRunnable; + } + + public JobRunnable ConcurrentListJobs(int threadCount, AppConfig config, + final boolean killThreads, final Answer> answer) + throws IOException, InterruptedException, QueueException, NotAuthorizedException, + BadParam, BusyException { + + ListDelegator delegator = new ListDelegator(config); + final ListDelegator mockDelegator = Mockito.spy(delegator); + + Mockito.doAnswer(answer).when(mockDelegator).listJobs(Mockito.any(String.class), + Mockito.any(boolean.class)); + + JobRunnable listJobRunnable = new JobRunnable() { + @Override + public void run() { + try { + mockDelegator.run("admin", true); + } catch (Exception ex) { + exception = ex; + } + } + }; + + executeJobOperations(listJobRunnable, threadCount, killThreads); + return listJobRunnable; + } + + public JobRunnable SubmitConcurrentJobs(int threadCount, AppConfig config, + final boolean killThreads, final Answer responseAnswer) + throws IOException, InterruptedException, QueueException, NotAuthorizedException, BusyException { + + LauncherDelegator delegator = new LauncherDelegator(config); + final LauncherDelegator mockDelegator = Mockito.spy(delegator); + final List listArgs = new ArrayList(); + + Mockito.doAnswer(responseAnswer).when(mockDelegator).enqueueJob( + Mockito.any(String.class), Mockito.any(Map.class), Mockito.any(String.class), + Mockito.any(List.class)); + + JobRunnable submitJobRunnable = new JobRunnable() { + @Override + public void run() { + try { + mockDelegator.enqueueController("admin", null, "", listArgs); + } catch (Exception ex) { + exception = ex; + } + } + }; + + executeJobOperations(submitJobRunnable, threadCount, killThreads); + return submitJobRunnable; + } + + public void executeJobOperations(Runnable jobRunnable, int threadCount, boolean killThreads) + throws IOException, InterruptedException, QueueException, NotAuthorizedException { + + try{ + ExecutorService executorService = new ThreadPoolExecutor(threadCount, threadCount, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue());; + + for (int i = 0; i < threadCount; i++) { + executorService.submit(jobRunnable); + } + + if (killThreads) { + executorService.shutdownNow(); + } else { + executorService.shutdown(); + } + + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + public abstract class JobRunnable implements Runnable { + public volatile Exception exception = null; + } +} diff --git hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsNoSemaphore.java hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsNoSemaphore.java new file mode 100644 index 0000000..7b54019 --- /dev/null +++ hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsNoSemaphore.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.util.ArrayList; + +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/* + * Test submission of concurrent job requests. + */ +public class TestConcurrentJobRequestsNoSemaphore extends TestConcurrentJobRequestsBase { + + private static AppConfig config; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void setUp() { + final String[] args = new String[] {}; + Main main = new Main(args); + config = Main.getAppConfigInstance(); + config.setInt(AppConfig.EXEC_MAX_JOB_STATUS_PROCS, 0); + config.setInt(AppConfig.EXEC_MAX_JOB_LIST_PROCS, 0); + config.setInt(AppConfig.EXEC_MAX_JOB_SUBMIT_PROCS, 0); + } + + @Test + public void ConcurrentJobsStatusSuccess() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, + statusJobHelper.getDelayedResonseAnswer(4, new QueueStatusBean("job_1000", "Job not found"))); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsSuccess() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, + listJobHelper.getDelayedResonseAnswer(4, new ArrayList())); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsSuccess() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, + submitJobHelper.getDelayedResonseAnswer(4, new EnqueueBean("job_1000"))); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } +}