Index: modules/core/src/test/java/org/apache/ignite/internal/GridNestedTaskP2PSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/GridNestedTaskP2PSelfTest.java (revision ) +++ modules/core/src/test/java/org/apache/ignite/internal/GridNestedTaskP2PSelfTest.java (revision ) @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.configuration.DeploymentMode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.failure.FailureHandler; +import org.apache.ignite.failure.StopNodeFailureHandler; +import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestExternalClassLoader; +import org.apache.ignite.testframework.config.GridTestProperties; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; + +/** + * Tests nested task P2P loading. + */ +public class GridNestedTaskP2PSelfTest extends GridCommonAbstractTest { + /** VM ip finder for TCP discovery. */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String EXT_JOB_CLS_NAME = "org.apache.ignite.tests.p2p.ExternalJob"; + + /** */ + private static final String EXT_JOB_NESTED_CLS_NAME = "org.apache.ignite.tests.p2p.ExternalNestedJob"; + + /** */ + private static final long COMPUTE_LOOP_DURATION = 40_000; + + /** URL of classes. */ + private static final URL[] URLS; + + /** Current deployment mode. Used in {@link #getConfiguration(String)}. */ + private DeploymentMode depMode; + + private final AtomicBoolean failure = new AtomicBoolean(false); + + @Override protected boolean isMultiJvm() { + return true; + } + + GridTestExternalClassLoader ldr = new GridTestExternalClassLoader(URLS); + + /** + * Initialize URLs. + */ + static { + try { + URLS = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))}; + } + catch (MalformedURLException e) { + throw new RuntimeException("Define property p2p.uri.cls", e); + } + } + + @Override protected long getTestTimeout() { + return 60 * 1000; + } + + /** + * + */ + public GridNestedTaskP2PSelfTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected FailureHandler getFailureHandler(String igniteInstanceName) { + return new StopNodeFailureHandler(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration c = super.getConfiguration(igniteInstanceName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + c.setDeploymentMode(depMode); + + return c; + } + + /** + * Test {@link DeploymentMode#CONTINUOUS} mode. + * + * @throws Exception if error occur. + */ + public void testContinuousMode() throws Exception { + depMode = DeploymentMode.CONTINUOUS; + + nestedJobTest(); + } + + /** + * Test {@link DeploymentMode#SHARED} mode. + * + * @throws Exception if error occur. + */ + public void testSharedMode() throws Exception { + depMode = DeploymentMode.SHARED; + + nestedJobTest(); + } + + /** @throws Exception If failed. */ + private void nestedJobTest() throws Exception { + Ignite g1 = startGrid(0); + startGrid(1); + startGrid(2); + + try { + ldr.loadClass(EXT_JOB_NESTED_CLS_NAME); + + final IgniteCallable job = (IgniteCallable)ldr.loadClass(EXT_JOB_CLS_NAME).newInstance(); + + IgniteFuture f = waitForLocalEvent(g1.events(), new P1() { + @Override public boolean apply(Event e) { + info("Received grid event: " + e); + + assertTrue(false); + + failure.set(true); + + return true; + } + }, EVT_NODE_LEFT, EVT_NODE_FAILED); + + long startTime = System.currentTimeMillis(); + while(!failure.get() && System.currentTimeMillis() < startTime + COMPUTE_LOOP_DURATION) { + IgniteFuture future = g1.compute(g1.cluster().forRemotes()).callAsync(job); + future.get(1000); + } + } finally { + stopAllGrids(); + } + } + + /** + * @return Additional JVM args for remote instances. + */ + @Override + protected List additionalRemoteJvmArgs() { + return Collections.singletonList("-XX:MaxMetaspaceSize=32m"); + } +} Index: modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/ExternalNestedJob.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/ExternalNestedJob.java (revision ) +++ modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/ExternalNestedJob.java (revision ) @@ -0,0 +1,17 @@ +package org.apache.ignite.tests.p2p; + +import org.apache.ignite.Ignite; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.resources.IgniteInstanceResource; + +public class ExternalNestedJob implements IgniteCallable { + @IgniteInstanceResource + Ignite ignite; + + @Override public Object call() throws Exception { + + ignite.log().info("Nseted call 2 executed on: " + ignite.name()); + + return null; + } +} \ No newline at end of file Index: modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/ExternalJob.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/ExternalJob.java (revision ) +++ modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/ExternalJob.java (revision ) @@ -0,0 +1,19 @@ +package org.apache.ignite.tests.p2p; + +import org.apache.ignite.Ignite; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.resources.IgniteInstanceResource; + +public class ExternalJob implements IgniteCallable { + @IgniteInstanceResource + Ignite ignite; + + @Override public Object call() throws Exception { + + ignite.log().info("My call executed on: " + ignite.name()); + + ignite.compute(ignite.cluster().forServers().forRemotes()).call(new ExternalNestedJob()); + + return null; + } +} \ No newline at end of file