diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java index dabd657..2d0939d 100644 --- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java +++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/api/server/TestExecutor.java @@ -53,8 +53,8 @@ private final BlockingQueue mTestQueue; private final PTest.Builder mPTestBuilder; private ExecutionContext mExecutionContext; - private boolean execute; + public TestExecutor(ExecutionContextConfiguration executionContextConfiguration, ExecutionContextProvider executionContextProvider, BlockingQueue testQueue, PTest.Builder pTestBuilder) { @@ -111,10 +111,12 @@ public void run() { testConfiguration.setPatch(startRequest.getPatchURL()); testConfiguration.setJiraName(startRequest.getJiraName()); testConfiguration.setClearLibraryCache(startRequest.isClearLibraryCache()); + LocalCommandFactory localCommandFactory = new LocalCommandFactory(logger); PTest ptest = mPTestBuilder.build(testConfiguration, mExecutionContext, test.getStartRequest().getTestHandle(), logDir, - new LocalCommandFactory(logger), new SSHCommandExecutor(logger), - new RSyncCommandExecutor(logger), logger); + localCommandFactory, new SSHCommandExecutor(logger), + new RSyncCommandExecutor(logger, mExecutionContextConfiguration.getMaxRsyncThreads(), + localCommandFactory), logger); int result = ptest.run(); if(result == Constants.EXIT_CODE_SUCCESS) { test.setStatus(Status.ok()); diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java index 093b675..03e44bf 100644 --- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java +++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java @@ -345,9 +345,10 @@ public static void main(String[] args) throws Exception { 
executionContextProvider = executionContextConfiguration .getExecutionContextProvider(); executionContext = executionContextProvider.createExecutionContext(); + LocalCommandFactory localCommandFactory = new LocalCommandFactory(LOG); PTest ptest = new PTest(conf, executionContext, buildTag, logDir, - new LocalCommandFactory(LOG), new SSHCommandExecutor(LOG), - new RSyncCommandExecutor(LOG), LOG); + localCommandFactory, new SSHCommandExecutor(LOG), + new RSyncCommandExecutor(LOG, 10, localCommandFactory), LOG); exitCode = ptest.run(); } finally { if(executionContext != null) { diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/conf/ExecutionContextConfiguration.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/conf/ExecutionContextConfiguration.java index 9a631c7..945ad77 100644 --- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/conf/ExecutionContextConfiguration.java +++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/conf/ExecutionContextConfiguration.java @@ -37,11 +37,14 @@ public static final String WORKING_DIRECTORY = "workingDirectory"; public static final String PROFILE_DIRECTORY = "profileDirectory"; public static final String MAX_LOG_DIRS_PER_PROFILE = "maxLogDirectoriesPerProfile"; + private static final String MAX_RSYNC_THREADS = "maxRsyncThreads"; + private static final int MAX_RSYNC_THREADS_DEFAULT = 10; private final ExecutionContextProvider mExecutionContextProvider; private final String mWorkingDirectory; private final String mGlobalLogDirectory; private final String mProfileDirectory; private final int mMaxLogDirectoriesPerProfile; + private final int mMaxRsyncThreads; @VisibleForTesting public ExecutionContextConfiguration(Context context) @@ -52,6 +55,7 @@ public ExecutionContextConfiguration(Context context) Preconditions.checkArgument(!mProfileDirectory.isEmpty(), PROFILE_DIRECTORY + " is required"); mGlobalLogDirectory = Dirs.create(new File(mWorkingDirectory, 
"logs")).getAbsolutePath(); mMaxLogDirectoriesPerProfile = context.getInteger(MAX_LOG_DIRS_PER_PROFILE, 10); + mMaxRsyncThreads = context.getInteger(MAX_RSYNC_THREADS, MAX_RSYNC_THREADS_DEFAULT); String executionContextProviderBuilder = context.getString("executionContextProvider", FixedExecutionContextProvider.Builder.class.getName()).trim(); try { @@ -66,6 +70,9 @@ public ExecutionContextConfiguration(Context context) throw Throwables.propagate(e); } } + public int getMaxRsyncThreads() { + return mMaxRsyncThreads; + } public int getMaxLogDirectoriesPerProfile() { return mMaxLogDirectoriesPerProfile; } diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/CloudComputeService.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/CloudComputeService.java index 1ba6f0e..cfb10c3 100644 --- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/CloudComputeService.java +++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/CloudComputeService.java @@ -34,6 +34,8 @@ import org.jclouds.compute.domain.NodeMetadata.Status; import org.jclouds.compute.domain.Template; import org.jclouds.logging.log4j.config.Log4JLoggingModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Predicate; import com.google.common.base.Strings; @@ -41,6 +43,8 @@ import com.google.common.collect.Sets; public class CloudComputeService { + private static final Logger LOG = LoggerFactory + .getLogger(CloudComputeService.class); private final ComputeServiceContext mComputeServiceContext; private final ComputeService mComputeService; private final String mInstanceType; @@ -49,9 +53,12 @@ private final String mImageId; private final String mkeyPair; private final String mSecurityGroup; - private final float mMaxBid; + /** + * JClouds requests on-demand instances when null + */ + private final Float mMaxBid; public CloudComputeService(String apiKey, String accessKey, 
String instanceType, String groupName, - String imageId, String keyPair, String securityGroup, float maxBid) { + String imageId, String keyPair, String securityGroup, Float maxBid) { mInstanceType = instanceType; mGroupName = groupName; mImageId = imageId; @@ -90,15 +97,20 @@ public boolean apply(ComputeMetadata computeMetadata) { return nodeMetadata.getStatus() == Status.RUNNING && isPTestHost(nodeMetadata); } private boolean isPTestHost(NodeMetadata node) { + String result = "false non-ptest host"; if(groupName.equalsIgnoreCase(node.getGroup())) { + result = "true due to group " + groupName; return true; } if(Strings.nullToEmpty(node.getName()).startsWith(groupName)) { + result = "true due to name " + groupName; return true; } if(node.getTags().contains(groupTag)) { + result = "true due to tag " + groupName; return true; } + LOG.debug("Found node: " + node + ", Result: " + result); return false; } }; diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/CloudExecutionContextProvider.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/CloudExecutionContextProvider.java index 5fb45e0..9ef9cd6 100644 --- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/CloudExecutionContextProvider.java +++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/CloudExecutionContextProvider.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Date; import java.util.HashSet; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; @@ -58,6 +59,7 @@ public static final String API_KEY = "apiKey"; public static final String ACCESS_KEY = "accessKey"; public static final String NUM_HOSTS = "numHosts"; + public static final String MAX_HOSTS_PER_CREATE_REQUEST = "maxHostsPerCreateRequest"; public static final String GROUP_NAME = "groupName"; public static final String IMAGE_ID = "imageId"; public static final String KEY_PAIR = 
"keyPair"; @@ -74,9 +76,11 @@ private final String[] mSlaveLocalDirs; private final int mNumThreads; private final int mNumHosts; + private final int mMaxHostsPerCreateRequest; private final long mRetrySleepInterval; private final CloudComputeService mCloudComputeService; private final Map mTerminatedHosts; + private final Map mLiveHosts; private final ExecutorService mTerminationExecutor; private final File mWorkingDir; private final SSHCommandExecutor mSSHCommandExecutor; @@ -85,8 +89,9 @@ CloudExecutionContextProvider(String dataDir, int numHosts, CloudComputeService cloudComputeService, SSHCommandExecutor sshCommandExecutor, String workingDirectory, String privateKey, String user, String[] slaveLocalDirs, int numThreads, - long retrySleepInterval) throws IOException { + long retrySleepInterval, int maxHostsPerCreateRequest) throws IOException { mNumHosts = numHosts; + mMaxHostsPerCreateRequest = maxHostsPerCreateRequest; mCloudComputeService = cloudComputeService; mPrivateKey = privateKey; mUser = user; @@ -95,6 +100,7 @@ mRetrySleepInterval = retrySleepInterval; mSSHCommandExecutor = sshCommandExecutor; mWorkingDir = Dirs.create(new File(workingDirectory, "working")); + mLiveHosts = Collections.synchronizedMap(new HashMap()); mTerminatedHosts = Collections .synchronizedMap(new LinkedHashMap() { private static final long serialVersionUID = 1L; @@ -110,6 +116,7 @@ public boolean removeEldestEntry(Map.Entry entry) { } private void initialize() throws IOException { + LOG.info("CloudExecutionContextProvider maxHostsPerCreateRequest = " + mMaxHostsPerCreateRequest); Set hosts = Sets.newHashSet(); String host = null; mHostLog.seek(0); // should already be true @@ -164,7 +171,7 @@ public void replaceBadHosts(ExecutionContext executionContext) terminate(hostsToTerminate, true); Set nodes = createNodes(hostsToTerminate.size()); for (NodeMetadata node : nodes) { - executionContext.addHost(new Host(node.getHostname(), mUser, mSlaveLocalDirs, + executionContext.addHost(new 
Host(publicIp(node), mUser, mSlaveLocalDirs, mNumThreads)); } } @@ -179,8 +186,8 @@ public synchronized ExecutionContext createExecutionContext() Set nodes = createNodes(mNumHosts); Set hosts = Sets.newHashSet(); for (NodeMetadata node : nodes) { - hosts.add(new Host(node.getHostname(), mUser, mSlaveLocalDirs, - mNumThreads)); + hosts.add(new Host(publicIp(node), mUser, mSlaveLocalDirs, + mNumThreads)); } return new ExecutionContext(this, hosts, mWorkingDir.getAbsolutePath(), mPrivateKey); @@ -204,7 +211,7 @@ public synchronized ExecutionContext createExecutionContext() boolean error = false; LOG.info("Attempting to create " + numRequired + " nodes"); try { - result.addAll(mCloudComputeService.createNodes(Math.min(2, numRequired))); + result.addAll(mCloudComputeService.createNodes(Math.min(mMaxHostsPerCreateRequest, numRequired))); } catch (RunNodesException e) { error = true; LOG.warn("Error creating nodes", e); @@ -212,6 +219,9 @@ public synchronized ExecutionContext createExecutionContext() result.addAll(e.getSuccessfulNodes()); } result = verifyHosts(result); + for (NodeMetadata node : result) { + mLiveHosts.put(publicIpOrHostname(node), System.currentTimeMillis()); + } LOG.info("Successfully created " + result.size() + " nodes"); numRequired = numHosts - result.size(); if (numRequired > 0) { @@ -247,6 +257,23 @@ public void close() { } } + + private static String publicIpOrHostname(NodeMetadata node) { + Set publicIps = node.getPublicAddresses(); + if (publicIps.size() == 1) { + return Iterables.getOnlyElement(publicIps); + } + return node.getHostname(); + } + + private static String publicIp(NodeMetadata node) { + Set publicIps = node.getPublicAddresses(); + if (publicIps.size() == 1) { + return Iterables.getOnlyElement(publicIps); + } + throw new IllegalStateException("Node does not have exactly one public ip: " + node); + } + private Set verifyHosts(Set hosts) throws CreateHostsFailedException { final Set result = Collections.synchronizedSet(new HashSet()); 
@@ -258,7 +285,8 @@ public void close() { executorService.submit(new Runnable() { @Override public void run() { - SSHCommand command = new SSHCommand(mSSHCommandExecutor, mPrivateKey, mUser, node.getHostname(), 0, "pkill -f java"); + String ip = publicIpOrHostname(node); + SSHCommand command = new SSHCommand(mSSHCommandExecutor, mPrivateKey, mUser, ip, 0, "pkill -f java"); mSSHCommandExecutor.execute(command); if(command.getExitCode() == Constants.EXIT_CODE_UNKNOWN || command.getException() != null) { @@ -293,10 +321,13 @@ private synchronized void performBackgroundWork() { terminatedHosts.putAll(mTerminatedHosts); } for (NodeMetadata node : getRunningNodes()) { - if (terminatedHosts.containsKey(node.getHostname())) { + String ip = publicIpOrHostname(node); + if (terminatedHosts.containsKey(ip)) { terminateInternal(node); LOG.warn("Found zombie node: " + node + " previously terminated at " - + new Date(terminatedHosts.get(node.getHostname()))); + + new Date(terminatedHosts.get(ip))); + } else if(!mLiveHosts.containsKey(ip)) { + LOG.warn("Found zombie node: " + node + " previously unknown to ptest"); } } } @@ -318,6 +349,7 @@ private void terminateInternal(Set nodes) { private void terminateInternal(final NodeMetadata node) { LOG.info("Submitting termination for " + node); + mLiveHosts.remove(publicIpOrHostname(node)); mTerminationExecutor.submit(new Runnable() { @Override public void run() { @@ -328,9 +360,10 @@ public void run() { Thread.currentThread().interrupt(); } try { - LOG.info("Terminating " + node.getHostname()); - if (!mTerminatedHosts.containsKey(node.getHostname())) { - mTerminatedHosts.put(node.getHostname(), System.currentTimeMillis()); + String ip = publicIpOrHostname(node); + LOG.info("Terminating " + ip); + if (!mTerminatedHosts.containsKey(ip)) { + mTerminatedHosts.put(ip, System.currentTimeMillis()); } mCloudComputeService.destroyNode(node.getId()); } catch (Exception e) { @@ -343,8 +376,9 @@ public void run() { private void 
persistHostnamesToLog(Set nodes) { for (NodeMetadata node : nodes) { try { - if(!Strings.nullToEmpty(node.getHostname()).trim().isEmpty()) { - mHostLog.writeBytes(node.getHostname() + "\n"); + String ip = publicIpOrHostname(node); + if(!Strings.nullToEmpty(ip).trim().isEmpty()) { + mHostLog.writeBytes(ip + "\n"); } } catch (IOException e) { Throwables.propagate(e); @@ -364,7 +398,8 @@ private void terminate(Set hosts, boolean warnIfHostsNotFound) { LOG.info("Requesting termination of " + hosts); Set nodesToTerminate = Sets.newHashSet(); for (NodeMetadata node : getRunningNodes()) { - if (hosts.contains(node.getHostname())) { + String ip = publicIpOrHostname(node); + if (hosts.contains(ip)) { nodesToTerminate.add(node); } } @@ -391,6 +426,7 @@ private static CloudExecutionContextProvider create(Context context, API_KEY + " is required"); String accessKey = Preconditions.checkNotNull( context.getString(ACCESS_KEY), ACCESS_KEY + " is required"); + int maxHostsPerCreateRequest = context.getInteger(MAX_HOSTS_PER_CREATE_REQUEST, 2); Integer numHosts = context.getInteger(NUM_HOSTS, 8); Preconditions.checkArgument(numHosts > 0, NUM_HOSTS + " must be greater than zero"); @@ -401,10 +437,9 @@ private static CloudExecutionContextProvider create(Context context, KEY_PAIR + " is required"); String securityGroup = Preconditions.checkNotNull( context.getString(SECURITY_GROUP), SECURITY_GROUP + " is required"); - Float maxBid = Preconditions.checkNotNull(context.getFloat(MAX_BID), - MAX_BID + " is required"); - Preconditions.checkArgument(maxBid > 0, MAX_BID - + " must be greater than zero"); + Float maxBid = context.getFloat(MAX_BID); + Preconditions.checkArgument(maxBid == null || maxBid > 0, MAX_BID + + " must be null or greater than zero"); String privateKey = Preconditions.checkNotNull( context.getString(PRIVATE_KEY), PRIVATE_KEY + " is required"); String user = context.getString(USERNAME, "hiveptest"); @@ -417,7 +452,7 @@ private static CloudExecutionContextProvider 
create(Context context, instanceType, groupName, imageId, keyPair, securityGroup, maxBid); CloudExecutionContextProvider service = new CloudExecutionContextProvider( dataDir, numHosts, cloudComputeService, new SSHCommandExecutor(LOG), workingDirectory, - privateKey, user, localDirs, numThreads, 60); + privateKey, user, localDirs, numThreads, 60, maxHostsPerCreateRequest); return service; } } diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ssh/RSyncCommandExecutor.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ssh/RSyncCommandExecutor.java index bc62aa8..86c8796 100644 --- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ssh/RSyncCommandExecutor.java +++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ssh/RSyncCommandExecutor.java @@ -30,20 +30,21 @@ public class RSyncCommandExecutor { private final Logger mLogger; + private final int mMaxRsyncThreads; private final LocalCommandFactory mLocalCommandFactory; private final Semaphore mSemaphore; private volatile boolean mShutdown; - public RSyncCommandExecutor(Logger logger, LocalCommandFactory localCommandFactory) { + public RSyncCommandExecutor(Logger logger, int maxRsyncThreads, LocalCommandFactory localCommandFactory) { mLogger = logger; + mMaxRsyncThreads = Math.min(Runtime.getRuntime().availableProcessors() * 5, maxRsyncThreads); mLocalCommandFactory = localCommandFactory; - mSemaphore = new Semaphore(Math.min(Runtime.getRuntime().availableProcessors() * 5, 10)); + mSemaphore = new Semaphore(mMaxRsyncThreads); mShutdown = false; - } - public RSyncCommandExecutor(Logger logger) { - this(logger, new LocalCommandFactory(logger)); + mLogger.info("RSyncCommandExecutor has " + mMaxRsyncThreads + " threads on " + Runtime.getRuntime() .availableProcessors() + " cpus"); } /** @@ -105,4 +105,4 @@ boolean isShutdown() { public void shutdownNow() { this.mShutdown = true; } -} \ No newline at end of file +} diff --git 
a/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/MockRSyncCommandExecutor.java b/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/MockRSyncCommandExecutor.java index b55fa90..6347ce5 100644 --- a/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/MockRSyncCommandExecutor.java +++ b/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/MockRSyncCommandExecutor.java @@ -34,7 +34,7 @@ private final List mCommands; private final Map> mFailures; public MockRSyncCommandExecutor(Logger logger) { - super(logger); + super(logger, 0, null); mCommands = Lists.newArrayList(); mFailures = Maps.newHashMap(); } diff --git a/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/context/TestCloudExecutionContextProvider.java b/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/context/TestCloudExecutionContextProvider.java index 37734e7..35a669f 100644 --- a/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/context/TestCloudExecutionContextProvider.java +++ b/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/context/TestCloudExecutionContextProvider.java @@ -80,8 +80,11 @@ public void setup() throws Exception { when(template.getImage()).thenReturn(mock(Image.class)); when(template.getHardware()).thenReturn(mock(Hardware.class)); when(node1.getHostname()).thenReturn("node1"); + when(node1.getPublicAddresses()).thenReturn(Collections.singleton("1.1.1.1")); when(node2.getHostname()).thenReturn("node2"); + when(node2.getPublicAddresses()).thenReturn(Collections.singleton("1.1.1.2")); when(node3.getHostname()).thenReturn("node3"); + when(node3.getPublicAddresses()).thenReturn(Collections.singleton("1.1.1.3")); runNodesException = new RunNodesException("", 2, template, Collections.singleton(node1), Collections.emptyMap(), Collections.singletonMap(node2, new Exception("For testing"))); @@ -105,12 +108,12 @@ public void testRetry() throws Exception { } }); CloudExecutionContextProvider 
provider = new CloudExecutionContextProvider(dataDir, NUM_NODES, - cloudComputeService, sshCommandExecutor, workingDir, PRIVATE_KEY, USER, SLAVE_DIRS, 1, 0); + cloudComputeService, sshCommandExecutor, workingDir, PRIVATE_KEY, USER, SLAVE_DIRS, 1, 0, 1); ExecutionContext executionContext = provider.createExecutionContext(); Set hosts = Sets.newHashSet(); for(Host host : executionContext.getHosts()) { hosts.add(host.getName()); } - Assert.assertEquals(Sets.newHashSet("node1", "node3"), hosts); + Assert.assertEquals(Sets.newHashSet("1.1.1.1", "1.1.1.3"), hosts); } } diff --git a/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/ssh/TestRSyncCommandExecutor.java b/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/ssh/TestRSyncCommandExecutor.java index 173a57e..cad1577 100644 --- a/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/ssh/TestRSyncCommandExecutor.java +++ b/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/ssh/TestRSyncCommandExecutor.java @@ -50,7 +50,7 @@ public void setup() throws Exception { public void testShutdownBeforeWaitFor() throws Exception { LocalCommand localCommand = mock(LocalCommand.class); localCommandFactory.setInstance(localCommand); - RSyncCommandExecutor executor = new RSyncCommandExecutor(LOG, localCommandFactory); + RSyncCommandExecutor executor = new RSyncCommandExecutor(LOG, 1, localCommandFactory); Assert.assertFalse(executor.isShutdown()); executor.shutdownNow(); RSyncCommand command = new RSyncCommand(executor, "privateKey", "user", "host", 1, "local", "remote", RSyncCommand.Type.FROM_LOCAL); @@ -66,7 +66,7 @@ public void testShutdownBeforeWaitFor() throws Exception { public void testShutdownDuringWaitFor() throws Exception { LocalCommand localCommand = mock(LocalCommand.class); localCommandFactory.setInstance(localCommand); - final RSyncCommandExecutor executor = new RSyncCommandExecutor(LOG, localCommandFactory); + final RSyncCommandExecutor executor = new 
RSyncCommandExecutor(LOG, 1, localCommandFactory); Assert.assertFalse(executor.isShutdown()); when(localCommand.getExitCode()).thenAnswer(new Answer() { @Override @@ -84,4 +84,4 @@ public Integer answer(InvocationOnMock invocation) throws Throwable { } verify(localCommand, never()).kill(); } -} \ No newline at end of file +}