Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentStartTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentStartTest.java (revision ) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentStartTest.java (revision ) @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; + +/** + * + */ +public class CacheContinuousQueryConcurrentStartTest extends GridCommonAbstractTest { + /** */ + public static final int KEYS = 10; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 1; + + /** */ + public static final int ITERATION_CNT = 100; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + //cfg.setClientMode(client); + + MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi(); + storeSpi.setExpireCount(100); + + cfg.setEventStorageSpi(storeSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES); + + //client = true; + + //startGrid(NODES - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOnheapTwoBackup() throws Exception { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + ONHEAP_TIERED, PRIMARY_SYNC); + + doConcurrentStart(ccfg); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. + */ + private void doConcurrentStart(final CacheConfiguration ccfg) throws Exception { + final IgniteCache cache = ignite(0).createCache(ccfg); + + int partition = 0; + + final List keys = keysByPartition(partition, 10, affinity(cache)); + + TestCacheStore.blockedKey = keys.get(0); + + IgniteInternalFuture f = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + cache.put(TestCacheStore.blockedKey, 1); + + return null; + } + }, "putter"); + + boolean hang = false; + + try { + f.get(200); + } + catch (IgniteFutureTimeoutCheckedException e) { + hang = true; + } + + assertTrue("Operation was not stuck", hang); + + final List evts = new ArrayList<>(); + + ContinuousQuery qry = new ContinuousQuery(); + + qry.setLocalListener(new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { + for (Object o : iterable) + evts.add(o); + } + }); + + QueryCursor queryCur = cache.query(qry); + + int middle = 5; + + for (int i = 1; i < middle; i++) + cache.put(keys.get(i), ThreadLocalRandom.current().nextInt(100)); + + TestCacheStore.latch.countDown(); + + for (int i = middle; i < keys.size(); i++) + cache.put(keys.get(i), ThreadLocalRandom.current().nextInt(100)); + + final int expSize = keys.size() - 1; + + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return evts.size() == expSize; + } + }, 1_000); + + assertEquals("Unexpected count of events", expSize, evts.size()); + + queryCur.close(); + } + + /** + * @param partition Partition. + * @param cntKeys Count of keys. + * @param aff Affinity. + * @return List of keys per partition. + */ + @NotNull private List keysByPartition(int partition, int cntKeys, Affinity aff) { + final List keys = new ArrayList<>(); + + for (int i = 0; i < 1_000_000; i++) { + if (aff.partition(i) == partition) + keys.add(i); + + if (keys.size() == cntKeys) + break; + } + return keys; + } + + /** + * @param cacheMode Cache mode. + * @param backups Number of backups. + * @param atomicityMode Cache atomicity mode. + * @param memoryMode Cache memory mode. + * @param writeMode Cache write mode. + * @return Cache configuration. + */ + protected CacheConfiguration cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode, + CacheWriteSynchronizationMode writeMode) { + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + backups); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setMemoryMode(memoryMode); + ccfg.setWriteSynchronizationMode(writeMode); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + ccfg.setWriteThrough(true); + ccfg.setCacheStoreFactory((Factory>) + FactoryBuilder.factoryOf(TestCacheStore.class)); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } + + /** + * + */ + public static class TestCacheStore extends CacheStoreAdapter { + /** */ + private static volatile Integer blockedKey; + + /** */ + private static CountDownLatch latch = new CountDownLatch(1); + + /** {@inheritDoc} */ + @Override public Integer load(Integer key) throws CacheLoaderException { + throw new UnsupportedOperationException("unsupported load"); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry entry) + throws CacheWriterException { + if (blockedKey.equals(entry.getKey())) { + try { + U.await(latch); + } + catch (IgniteInterruptedCheckedException e) { + throw new CacheWriterException(e); + } + } + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + // No-op. + } + } +}