commit b5ad9fe19d00004a4f04e033b42949b235829666 Author: Sahil Takiar Date: Thu May 31 11:39:47 2018 -0500 HIVE-17317: Ability to run multiple pre-commit jobs on a ptest server diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java index d4d28c06ba..066bd15cfb 100644 --- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java +++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java @@ -20,9 +20,14 @@ import java.io.File; import java.io.PrintStream; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.commons.pool.PoolableObjectFactory; +import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.hive.ptest.api.Status; import org.apache.hive.ptest.api.request.TestStartRequest; import org.apache.hive.ptest.execution.Constants; @@ -58,6 +63,11 @@ private ExecutionContext mExecutionContext; private boolean execute; + private static final int NUM_PARALLEL_BUILDS = 2; + + private final ExecutorService es; + private final BlockingQueue workingDirSuffixes; + public TestExecutor(ExecutionContextConfiguration executionContextConfiguration, ExecutionContextProvider executionContextProvider, BlockingQueue testQueue, PTest.Builder pTestBuilder) { @@ -66,91 +76,123 @@ public TestExecutor(ExecutionContextConfiguration executionContextConfiguration, mTestQueue = testQueue; mPTestBuilder = pTestBuilder; execute = true; + es = Executors.newFixedThreadPool(NUM_PARALLEL_BUILDS); + workingDirSuffixes = new ArrayBlockingQueue<>(NUM_PARALLEL_BUILDS); + for (int i = 0; i < NUM_PARALLEL_BUILDS; i++) { + if (!workingDirSuffixes.add(Integer.toString(i))) { + throw new IllegalStateException("Unable to add workingDirSuffix to queue"); + } + } } @Override public void run() { - while(execute) { - Test test = null; + while (execute) { + try { + Test test = mTestQueue.poll(30, TimeUnit.MINUTES); + if (!execute) { + terminateExecutionContext(); + break; + } + if (test == null) { + terminateExecutionContext(); + } else { + es.submit(new TestRunner(test)); + } + } catch (Exception e) { + LOG.error("Unxpected Error", e); + terminateExecutionContext(); + } + } + } + + private class TestRunner implements Runnable { + + private final Test test; + + private TestRunner(Test test) { + this.test = test; + } + + @Override + public void run() { PrintStream logStream = null; Logger logger = null; + String workingDirSuffix = null; try { // start a log cleaner at the start of each test - LogDirectoryCleaner cleaner = new LogDirectoryCleaner(new File(mExecutionContextConfiguration. - getGlobalLogDirectory()), mExecutionContextConfiguration.getMaxLogDirectoriesPerProfile()); + LogDirectoryCleaner cleaner = new LogDirectoryCleaner( + new File(mExecutionContextConfiguration. + getGlobalLogDirectory()), + mExecutionContextConfiguration.getMaxLogDirectoriesPerProfile()); cleaner.setName("LogCleaner-" + mExecutionContextConfiguration. - getGlobalLogDirectory()); + getGlobalLogDirectory()); cleaner.setDaemon(true); cleaner.start(); - test = mTestQueue.poll(30, TimeUnit.MINUTES); - if(!execute) { - terminateExecutionContext(); - break; + + test.setStatus(Status.inProgress()); + test.setDequeueTime(System.currentTimeMillis()); + if (mExecutionContext == null) { + mExecutionContext = createExceutionContext(); } - if(test == null) { - terminateExecutionContext(); + test.setExecutionStartTime(System.currentTimeMillis()); + TestStartRequest startRequest = test.getStartRequest(); + String profile = startRequest.getProfile(); + File profileConfFile = new File(mExecutionContextConfiguration.getProfileDirectory(), + String.format("%s.properties", profile)); + LOG.info("Attempting to run using profile file: {}", profileConfFile); + if (!profileConfFile.isFile()) { + test.setStatus(Status.illegalArgument( + "Profile " + profile + " not found in directory " + + mExecutionContextConfiguration.getProfileDirectory())); + test.setExecutionFinishTime(System.currentTimeMillis()); } else { - test.setStatus(Status.inProgress()); - test.setDequeueTime(System.currentTimeMillis()); - if(mExecutionContext == null) { - mExecutionContext = createExceutionContext(); + File logDir = Dirs.create(new File(mExecutionContextConfiguration. + getGlobalLogDirectory(), test.getStartRequest().getTestHandle())); + File logFile = new File(logDir, "execution.txt"); + test.setOutputFile(logFile); + logStream = new PrintStream(logFile); + logger = new TestLogger(logStream, TestLogger.LEVEL.DEBUG); + + Context.ContextBuilder builder = new Context.ContextBuilder(); + builder.addPropertiesFile(profileConfFile); + + String environmentConfigurationFile = System.getProperty(SERVER_ENV_PROPERTIES, null); + if (environmentConfigurationFile != null) { + builder.addPropertiesFile(environmentConfigurationFile); } - test.setExecutionStartTime(System.currentTimeMillis()); - TestStartRequest startRequest = test.getStartRequest(); - String profile = startRequest.getProfile(); - File profileConfFile = new File(mExecutionContextConfiguration.getProfileDirectory(), - String.format("%s.properties", profile)); - LOG.info("Attempting to run using profile file: {}", profileConfFile); - if(!profileConfFile.isFile()) { - test.setStatus(Status.illegalArgument( - "Profile " + profile + " not found in directory " + - mExecutionContextConfiguration.getProfileDirectory())); - test.setExecutionFinishTime(System.currentTimeMillis()); + + Context ctx = builder.build(); + + TestConfiguration testConfiguration = TestConfiguration.withContext(ctx, logger); + testConfiguration.setPatch(startRequest.getPatchURL()); + testConfiguration.setJiraName(startRequest.getJiraName()); + testConfiguration.setClearLibraryCache(startRequest.isClearLibraryCache()); + LocalCommandFactory localCommandFactory = new LocalCommandFactory(logger); + workingDirSuffix = workingDirSuffixes.poll(6, TimeUnit.HOURS); + + PTest ptest = mPTestBuilder.build(testConfiguration, mExecutionContext, + test.getStartRequest().getTestHandle(), logDir, + localCommandFactory, new SSHCommandExecutor(logger), + new RSyncCommandExecutor(logger, + mExecutionContextConfiguration.getMaxRsyncThreads(), + localCommandFactory), logger, workingDirSuffix); + int result = ptest.run(); + if (result == Constants.EXIT_CODE_SUCCESS) { + test.setStatus(Status.ok()); } else { - File logDir = Dirs.create(new File(mExecutionContextConfiguration. - getGlobalLogDirectory(), test.getStartRequest().getTestHandle())); - File logFile = new File(logDir, "execution.txt"); - test.setOutputFile(logFile); - logStream = new PrintStream(logFile); - logger = new TestLogger(logStream, TestLogger.LEVEL.DEBUG); - - Context.ContextBuilder builder = new Context.ContextBuilder(); - builder.addPropertiesFile(profileConfFile); - - String environmentConfigurationFile = System.getProperty(SERVER_ENV_PROPERTIES, null); - if (environmentConfigurationFile != null) { - builder.addPropertiesFile(environmentConfigurationFile); - } - - Context ctx = builder.build(); - - TestConfiguration testConfiguration = TestConfiguration.withContext(ctx, logger); - testConfiguration.setPatch(startRequest.getPatchURL()); - testConfiguration.setJiraName(startRequest.getJiraName()); - testConfiguration.setClearLibraryCache(startRequest.isClearLibraryCache()); - LocalCommandFactory localCommandFactory = new LocalCommandFactory(logger); - PTest ptest = mPTestBuilder.build(testConfiguration, mExecutionContext, - test.getStartRequest().getTestHandle(), logDir, - localCommandFactory, new SSHCommandExecutor(logger), - new RSyncCommandExecutor(logger, mExecutionContextConfiguration.getMaxRsyncThreads(), - localCommandFactory), logger); - int result = ptest.run(); - if(result == Constants.EXIT_CODE_SUCCESS) { - test.setStatus(Status.ok()); - } else { - test.setStatus(Status.failed("Tests failed with exit code " + result)); - } - logStream.flush(); - // if all drones where abandoned on a host, try replacing them. - mExecutionContext.replaceBadHosts(); + test.setStatus(Status.failed("Tests failed with exit code " + result)); } + logStream.flush(); + // if all drones where abandoned on a host, try replacing them. + mExecutionContext.replaceBadHosts(); } } catch (Exception e) { LOG.error("Unxpected Error", e); - if(test != null) { + if (test != null) { test.setStatus(Status.failed("Tests failed with exception " + - e.getClass().getName() + ": " + e.getMessage())); - if(logger != null) { + e.getClass().getName() + ": " + e.getMessage())); + if (logger != null) { String msg = "Error executing " + test.getStartRequest().getTestHandle(); logger.error(msg, e); } @@ -158,13 +200,18 @@ public void run() { // if we died for any reason lets get a new set of hosts terminateExecutionContext(); } finally { - if(test != null) { + if (test != null) { test.setExecutionFinishTime(System.currentTimeMillis()); } - if(logStream != null) { + if (logStream != null) { logStream.flush(); logStream.close(); } + if (workingDirSuffix != null) { + if (!workingDirSuffixes.add(workingDirSuffix)) { + throw new IllegalStateException("Unable to return workingDirSuffix to queue"); + } + } } } } diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java index 4e6aa6df96..96b21e75d2 100644 --- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java +++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java @@ -20,6 +20,7 @@ import java.io.File; import java.net.URL; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -94,7 +95,7 @@ public PTest(final TestConfiguration configuration, final ExecutionContext executionContext, final String buildTag, final File logDir, final LocalCommandFactory localCommandFactory, final SSHCommandExecutor sshCommandExecutor, final RSyncCommandExecutor rsyncCommandExecutor, - final Logger logger) throws Exception { + final Logger logger, final String workingDirSuffix) throws Exception { mConfiguration = configuration; mLogger = logger; mBuildTag = buildTag; @@ -108,7 +109,7 @@ public PTest(final TestConfiguration configuration, final ExecutionContext execu new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HostExecutor %d").build())); final File failedLogDir = Dirs.create(new File(logDir, "failed")); final File succeededLogDir = Dirs.create(new File(logDir, "succeeded")); - final File scratchDir = Dirs.createEmpty(new File(mExecutionContext.getLocalWorkingDirectory(), "scratch")); + final File scratchDir = Dirs.createEmpty(Paths.get(mExecutionContext.getLocalWorkingDirectory(), "scratch", buildTag).toFile()); File patchDir = Dirs.createEmpty(new File(logDir, "patches")); File patchFile = null; if(!configuration.getPatch().isEmpty()) { @@ -123,7 +124,7 @@ public PTest(final TestConfiguration configuration, final ExecutionContext execu put("buildTool", configuration.getBuildTool()). put("branch", configuration.getBranch()). put("clearLibraryCache", String.valueOf(configuration.isClearLibraryCache())). - put("workingDir", mExecutionContext.getLocalWorkingDirectory()). + put("workingDir", Paths.get(mExecutionContext.getLocalWorkingDirectory(), workingDirSuffix).toString()). put("buildTag", buildTag). put("logDir", logDir.getAbsolutePath()). put("javaHome", configuration.getJavaHome()). @@ -270,9 +271,9 @@ private void publishJiraComment(boolean error, List messages, SortedSet< public static class Builder { public PTest build(TestConfiguration configuration, ExecutionContext executionContext, String buildTag, File logDir, LocalCommandFactory localCommandFactory, SSHCommandExecutor sshCommandExecutor, - RSyncCommandExecutor rsyncCommandExecutor, Logger logger) throws Exception { + RSyncCommandExecutor rsyncCommandExecutor, Logger logger, String workingDirSuffix) throws Exception { return new PTest(configuration, executionContext, buildTag, logDir, localCommandFactory, sshCommandExecutor, - rsyncCommandExecutor, logger); + rsyncCommandExecutor, logger, workingDirSuffix); } } @@ -391,7 +392,7 @@ public static void main(String[] args) throws Exception { LocalCommandFactory localCommandFactory = new LocalCommandFactory(LOG); PTest ptest = new PTest(conf, executionContext, buildTag, logDir, localCommandFactory, new SSHCommandExecutor(LOG, localCommandFactory, conf.getSshOpts()), - new RSyncCommandExecutor(LOG, 10, localCommandFactory), LOG); + new RSyncCommandExecutor(LOG, 10, localCommandFactory), LOG, buildTag); exitCode = ptest.run(); } finally { if(executionContext != null) {