diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java index 54d0907..ba1b0ac 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java @@ -110,6 +110,9 @@ public static final String MAPPER_MEMORY_MB = "templeton.mapper.memory.mb"; public static final String MR_AM_MEMORY_MB = "templeton.mr.am.memory.mb"; public static final String TEMPLETON_JOBSLIST_ORDER = "templeton.jobs.listorder"; + public static final String EXEC_MAX_JOB_SUBMIT_PROCS = "templeton.exec.job.submit.max-procs"; + public static final String EXEC_MAX_JOB_STATUS_PROCS = "templeton.exec.job.status.max-procs"; + public static final String EXEC_MAX_JOB_LIST_PROCS = "templeton.exec.job.list.max-procs"; /** * see webhcat-default.xml @@ -179,17 +182,17 @@ private void init() { for (Map.Entry e : System.getenv().entrySet()) set("env." + e.getKey(), e.getValue()); + String hadoopConfDir = getHadoopConfDir(); + for (String fname : HADOOP_CONF_FILENAMES) { + logConfigLoadAttempt(hadoopConfDir + File.separator + fname); + loadOneFileConfig(hadoopConfDir, fname); + } String templetonDir = getTempletonDir(); for (String fname : TEMPLETON_CONF_FILENAMES) { logConfigLoadAttempt(templetonDir + File.separator + fname); if (! loadOneClasspathConfig(fname)) loadOneFileConfig(templetonDir, fname); } - String hadoopConfDir = getHadoopConfDir(); - for (String fname : HADOOP_CONF_FILENAMES) { - logConfigLoadAttempt(hadoopConfDir + File.separator + fname); - loadOneFileConfig(hadoopConfDir, fname); - } ProxyUserSupport.processProxyuserConfig(this); handleHiveProperties(); LOG.info(dumpEnvironent()); diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/BusyException.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/BusyException.java index 64261fa..ffc33c2 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/BusyException.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/BusyException.java @@ -27,4 +27,8 @@ public BusyException() { super(HttpStatus.SERVICE_UNAVAILABLE_503, "Busy, please retry"); } + + public BusyException(String msg) { + super(HttpStatus.SERVICE_UNAVAILABLE_503, msg); + } } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java index b3f44a2..1b68612 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Semaphore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +51,10 @@ static public enum JobType {JAR, STREAMING, PIG, HIVE, SQOOP} private boolean secureMeatastoreAccess = false; private final String HIVE_SHIMS_FILENAME_PATTERN = ".*hive-shims.*"; + private static int submitProcs = Main.getAppConfigInstance().getInt(AppConfig.EXEC_MAX_JOB_SUBMIT_PROCS, 0); + + // Semaphore to control the number of concurrent job submit requests to be serviced. + private static Semaphore submitAvail = submitProcs > 0 ? new Semaphore(submitProcs) : null; public LauncherDelegator(AppConfig appConf) { super(appConf); @@ -71,12 +76,42 @@ public void registerJob(String id, String user, String callback, } /** - * Enqueue the TempletonControllerJob directly calling doAs. + * Acquires semaphore and then calls enqueueJob to submit job request. If semaphore is + * not acquired then responds back with BusyException exception. */ public EnqueueBean enqueueController(String user, Map userArgs, String callback, List args) throws NotAuthorizedException, BusyException, IOException, QueueException { + boolean acquired = false; + try { + if (submitAvail != null) { + acquired = submitAvail.tryAcquire(); + if (!acquired) { + LOG.debug("Unable to acquire semaphore for submit job request."); + throw new BusyException("Unable to service the job submit request as templeton service is busy " + + "with too many submit job requests. Please wait for some time before retrying the operation." + + " Please refer to the config " + AppConfig.EXEC_MAX_JOB_SUBMIT_PROCS + + " to configure concurrent requests."); + } + } + + return enqueueJob(user, userArgs, callback, args); + } finally { + if (acquired && submitAvail != null) { + LOG.debug("Releasing the semaphore after submit job request is completed."); + submitAvail.release(); + } + } + } + + /** + * Enqueue the TempletonControllerJob directly calling doAs. + */ + public EnqueueBean enqueueJob(String user, Map userArgs, String callback, + List args) + throws NotAuthorizedException, BusyException, + IOException, QueueException { try { UserGroupInformation ugi = UgiFactory.getUgi(user); diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java index a30ecd1..c772bd1 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java @@ -21,7 +21,10 @@ import java.io.IOException; import java.util.List; import java.util.ArrayList; +import java.util.concurrent.Semaphore; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.JobStatus; @@ -31,12 +34,47 @@ * List jobs owned by a user. */ public class ListDelegator extends TempletonDelegator { + private static final Log LOG = LogFactory.getLog(ListDelegator.class); + private static int listProcs = Main.getAppConfigInstance().getInt(AppConfig.EXEC_MAX_JOB_LIST_PROCS, 0); + + // Semaphore to control the number of concurrent list jobs requests to be serviced. + private static Semaphore listAvail = listProcs > 0 ? new Semaphore(listProcs) : null; + public ListDelegator(AppConfig appConf) { super(appConf); } + /** + * Acquires semaphore and then calls listJobs to list jobs. If semaphore is + * not acquired then responds back with BusyException exception. + */ public List run(String user, boolean showall) - throws NotAuthorizedException, BadParam, IOException, InterruptedException { + throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException { + + boolean acquired = false; + try { + if (listAvail != null) { + acquired = listAvail.tryAcquire(); + if (!acquired) { + LOG.debug("Unable to acquire semaphore for list jobs request."); + throw new BusyException("Unable to service the list jobs request as templeton service is busy " + + "with too many list jobs requests. Please wait for some time before retrying the operation." + + " Please refer to the config " + AppConfig.EXEC_MAX_JOB_LIST_PROCS + + " to configure concurrent requests."); + } + } + + return listJobs(user, showall); + } finally { + if (acquired && listAvail != null) { + LOG.debug("Releasing the semaphore after list jobs request is completed."); + listAvail.release(); + } + } + } + + public List listJobs(String user, boolean showall) + throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException { UserGroupInformation ugi = UgiFactory.getUgi(user); WebHCatJTShim tracker = null; diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index 2da0204..d8a37e5 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -891,7 +891,7 @@ public EnqueueBean hive(@FormParam("execute") String execute, @Path("jobs/{jobid}") @Produces({MediaType.APPLICATION_JSON}) public QueueStatusBean showJobId(@PathParam("jobid") String jobid) - throws NotAuthorizedException, BadParam, IOException, InterruptedException { + throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException { verifyUser(); verifyParam(jobid, ":jobid"); @@ -968,7 +968,7 @@ public QueueStatusBean deleteJobId(@PathParam("jobid") String jobid) @QueryParam("showall") boolean showall, @QueryParam("jobid") String jobid, @QueryParam("numrecords") String numrecords) - throws NotAuthorizedException, BadParam, IOException, InterruptedException { + throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException { verifyUser(); diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java index fac0170..01dc3f8 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java @@ -19,6 +19,7 @@ package org.apache.hive.hcatalog.templeton; import java.io.IOException; +import java.util.concurrent.Semaphore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,13 +42,46 @@ */ public class StatusDelegator extends TempletonDelegator { private static final Logger LOG = LoggerFactory.getLogger(StatusDelegator.class); + private static int submitProcs = Main.getAppConfigInstance().getInt(AppConfig.EXEC_MAX_JOB_STATUS_PROCS, 0); + + // Semaphore to control the number of concurrent job status requests to be serviced. + private static Semaphore statusAvail = submitProcs > 0 ? new Semaphore(submitProcs) : null; public StatusDelegator(AppConfig appConf) { super(appConf); } + /** + * Acquires semaphore and then calls getJobStatus to get the status of job. If semaphore is + * not acquired then responds back with BusyException exception. + */ public QueueStatusBean run(String user, String id) - throws NotAuthorizedException, BadParam, IOException, InterruptedException + throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException + { + boolean acquired = false; + try { + if (statusAvail != null) { + acquired = statusAvail.tryAcquire(); + if (!acquired) { + LOG.debug("Unable to acquire the semaphore for job status request."); + throw new BusyException("Unable to service the job status request as templeton service is busy " + + "with too many job status requests. Please wait for some time before retrying the operation." + + " Please refer to the config " + AppConfig.EXEC_MAX_JOB_STATUS_PROCS + + " to configure concurrent requests."); + } + } + + return getJobStatus(user, id); + } finally { + if (acquired && statusAvail != null) { + LOG.debug("Releasing the semaphore after job status request is completed."); + statusAvail.release(); + } + } + } + + public QueueStatusBean getJobStatus(String user, String id) + throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException { WebHCatJTShim tracker = null; JobState state = null; diff --git hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java new file mode 100644 index 0000000..7d583a4 --- /dev/null +++ hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.eclipse.jetty.http.HttpStatus; + +/* + * Test submission of concurrent job requests with the controlled number of concurrent + * Requests. Verify that we get busy exception and appropriate message. + */ +public class TestConcurrentJobRequests extends TestConcurrentJobRequestsBase { + + private static AppConfig config; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void setUp() { + final String[] args = new String[] {}; + Main main = new Main(args); + config = Main.getAppConfigInstance(); + } + + @Test + public void ConcurrentJobsStatusBusyException() { + try { + config.setInt(AppConfig.EXEC_MAX_JOB_STATUS_PROCS, 5); + JobRunnable jobRunnable = ConcurrentJobsStatus(6, config); + assertTrue(jobRunnable.exception instanceof BusyException); + assertTrue(jobRunnable.exception != null); + BusyException ex = (BusyException)jobRunnable.exception; + assertTrue(ex.httpCode == HttpStatus.SERVICE_UNAVAILABLE_503); + String expectedMessage = "Unable to service the job status request as templeton service is busy " + + "with too many job status requests. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.exec.job.status.max-procs to configure concurrent requests."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsBusyException() { + try { + config.setInt(AppConfig.EXEC_MAX_JOB_LIST_PROCS, 5); + JobRunnable jobRunnable = ConcurrentListJobs(6, config); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof BusyException); + BusyException ex = (BusyException)jobRunnable.exception; + assertTrue(ex.httpCode == HttpStatus.SERVICE_UNAVAILABLE_503); + String expectedMessage = "Unable to service the list jobs request as templeton service is busy " + + "with too many list jobs requests. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.exec.job.list.max-procs to configure concurrent requests."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void SubmitConcurrentJobsBusyException() { + try { + config.setInt(AppConfig.EXEC_MAX_JOB_SUBMIT_PROCS, 5); + LauncherDelegator delegator = new LauncherDelegator(config); + JobRunnable jobRunnable = SubmitConcurrentJobs(6, delegator); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof BusyException); + BusyException ex = (BusyException)jobRunnable.exception; + assertTrue(ex.httpCode == HttpStatus.SERVICE_UNAVAILABLE_503); + String expectedMessage = "Unable to service the job submit request as templeton service is busy " + + "with too many submit job requests. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.exec.job.submit.max-procs to configure concurrent requests."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + } catch (Exception e) { + assertTrue(false); + } + } +} diff --git hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsBase.java hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsBase.java new file mode 100644 index 0000000..c9f388c --- /dev/null +++ hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsBase.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.List; +import java.util.Map; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/* + * Base class for mocking job operations with concurrent requests. + */ +public class TestConcurrentJobRequestsBase { + + public JobRunnable ConcurrentJobsStatus(int threadCount, AppConfig appConfig) + throws IOException, InterruptedException, QueueException, NotAuthorizedException, BadParam, BusyException { + + StatusDelegator delegator = new StatusDelegator(appConfig); + final StatusDelegator mockDelegator = Mockito.spy(delegator); + + final QueueStatusBean statusBean = new QueueStatusBean("job_1000", "Job not found"); + Mockito.doAnswer(new Answer() { + @Override + public QueueStatusBean answer(InvocationOnMock invocation){ + try { + Thread.sleep(4000); + } catch (InterruptedException e) { + } + return statusBean; + } + }).when(mockDelegator).getJobStatus(Mockito.any(String.class), Mockito.any(String.class)); + + JobRunnable statusJobRunnable = new JobRunnable() { + @Override + public void run() { + try { + mockDelegator.run("admin", "job_1000"); + } catch (Exception ex) { + exception = ex; + } + } + }; + + executeJobOperations(statusJobRunnable, threadCount); + return statusJobRunnable; + } + + public JobRunnable ConcurrentListJobs(int threadCount, AppConfig config) + throws IOException, InterruptedException, QueueException, NotAuthorizedException, BadParam, BusyException { + + ListDelegator delegator = new ListDelegator(config); + final ListDelegator mockDelegator = Mockito.spy(delegator); + + final List jobIds = new ArrayList(); + Mockito.doAnswer(new Answer>() { + @Override + public List answer(InvocationOnMock invocation){ + try { + Thread.sleep(4000); + } catch (InterruptedException e) { + } + return jobIds; + } + }).when(mockDelegator).listJobs(Mockito.any(String.class), Mockito.any(boolean.class)); + + JobRunnable listJobRunnable = new JobRunnable() { + @Override + public void run() { + try { + mockDelegator.run("admin", true); + } catch (Exception ex) { + exception = ex; + } + } + }; + + executeJobOperations(listJobRunnable, threadCount); + return listJobRunnable; + } + + public JobRunnable SubmitConcurrentJobs(int threadCount, LauncherDelegator delegator) + throws IOException, InterruptedException, QueueException, NotAuthorizedException, BusyException { + + final LauncherDelegator mockDelegator = Mockito.spy(delegator); + final List listArgs = new ArrayList(); + + final EnqueueBean bean = new EnqueueBean("job_1000"); + Mockito.doAnswer(new Answer() { + @Override + public EnqueueBean answer(InvocationOnMock invocation){ + try { + Thread.sleep(4000); + } catch (InterruptedException e) { + System.out.println("Got inturrpted unexpecteadly"); + } + return bean; + } + }).when(mockDelegator).enqueueJob(Mockito.any(String.class), Mockito.any(Map.class), Mockito.any(String.class), Mockito.any(List.class)); + + JobRunnable submitJobRunnable = new JobRunnable() { + @Override + public void run() { + try { + mockDelegator.enqueueController("admin", null, "", listArgs); + } catch (Exception ex) { + exception = ex; + } + } + }; + + executeJobOperations(submitJobRunnable, threadCount); + return submitJobRunnable; + } + + public void executeJobOperations(Runnable jobRunnable, int threadCount) + throws IOException, InterruptedException, QueueException, NotAuthorizedException { + + try{ + ExecutorService executorService = new ThreadPoolExecutor(threadCount, threadCount, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue());; + + for (int i = 0; i < threadCount; i++) { + executorService.submit(jobRunnable); + } + + executorService.shutdown(); + + try { + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException ex) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } catch (Exception e) { + e.printStackTrace(System.out); + throw new QueueException("Unable to execute job operations " + e); + } + } + + public abstract class JobRunnable implements Runnable { + public volatile Exception exception = null; + } +} diff --git hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsNoSemaphore.java hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsNoSemaphore.java new file mode 100644 index 0000000..71e8302 --- /dev/null +++ hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsNoSemaphore.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/* + * Test submission of concurrent job requests. + */ +public class TestConcurrentJobRequestsNoSemaphore extends TestConcurrentJobRequestsBase { + + private static AppConfig config; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void setUp() { + final String[] args = new String[] {}; + Main main = new Main(args); + config = Main.getAppConfigInstance(); + } + + @Test + public void ConcurrentJobsStatusSuccess() { + try { + config.setInt(AppConfig.EXEC_MAX_JOB_STATUS_PROCS, 0); + JobRunnable jobRunnable = ConcurrentJobsStatus(6, config); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsSuccess() { + try { + config.setInt(AppConfig.EXEC_MAX_JOB_LIST_PROCS, 0); + JobRunnable jobRunnable = ConcurrentListJobs(6, config); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void SubmitConcurrentJobssuccess() { + try { + config.setInt(AppConfig.EXEC_MAX_JOB_SUBMIT_PROCS, 0); + LauncherDelegator delegator = new LauncherDelegator(config); + JobRunnable jobRunnable = SubmitConcurrentJobs(6, delegator); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } +}