Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractCacheStorePrepareTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) @@ -0,0 +1,922 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.store.CacheLocalStore; +import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore; +import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.resources.CacheNameResource; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionState; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public abstract class GridCacheAbstractCacheStorePrepareTest extends GridCommonAbstractTest { + /** Local jdbc store on cache0 on node 0. */ + private static final TestLocalCommitStore LOCAL_COMMIT_STORE_0_0 = new TestLocalCommitStore(); + + /** Local jdbc store on cache0 on node 1. */ + private static final TestLocalCommitStore LOCAL_COMMIT_STORE_0_1 = new TestLocalCommitStore(); + + /** Local non-jdbc store on cache1 on node 0. */ + private static final TestLocalStore LOCAL_STORE_1_0 = new TestLocalStore(); + + /** Local non-jdbc store on cache1 on node 1. */ + private static final TestLocalStore LOCAL_STORE_1_1 = new TestLocalStore(); + + /** Global jdbc store on cache0 on node 0. */ + private static final TestGlobalCommitStore GLOBAL_COMMIT_STORE_0_0 = new TestGlobalCommitStore(); + + /** Global jdbc store on cache0 on node 1. */ + private static final TestGlobalCommitStore GLOBAL_COMMIT_STORE_0_1 = new TestGlobalCommitStore(); + + /** Global non-jdbc store on cache1 on node 0. */ + private static final TestGlobalStore GLOBAL_STORE_1_0 = new TestGlobalStore(); + + /** Global non-jdbc store on cache1 on node 1. */ + private static final TestGlobalStore GLOBAL_STORE_1_1 = new TestGlobalStore(); + + /** Cache, configured with local store, which supports prepare-commit. */ + private static final String CACHE_WITH_LOCAL_COMMIT_STORE = "cache0"; + + /** Cache, configured with local store, which doesn't support prepare-commit */ + private static final String CACHE_WITH_LOCAL_STORE = "cache1"; + + /** Cache, configured with global store, which supports prepare-commit. */ + private static final String CACHE_WITH_GLOBAL_COMMIT_STORE = "cache2"; + + /** Cache, configured with global store, which doesn't support prepare-commit. */ + private static final String CACHE_WITH_GLOBAL_STORE = "cache3"; + + /** + * @param cacheName Cache name. + * @return Configuration. + */ + @SuppressWarnings("unchecked") + private CacheConfiguration cache(String cacheName) { + CacheConfiguration cacheCfg = new CacheConfiguration(); + + cacheCfg.setName(cacheName); + cacheCfg.setCacheMode(getCacheMode()); + cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cacheCfg.setNearConfiguration(nearConfiguration()); + cacheCfg.setCacheStoreFactory(new StoreFactory()); + cacheCfg.setWriteBehindEnabled(writeBehindEnabled()); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setWriteThrough(true); + cacheCfg.setReadThrough(true); + cacheCfg.setBackups(1); + cacheCfg.setWriteBehindFlushFrequency(50); + cacheCfg.setWriteBehindFlushThreadCount(Runtime.getRuntime().availableProcessors()); + + return cacheCfg; + } + + /** + * + */ + protected boolean writeBehindEnabled() { + return false; + } + + /** + * @return NearCacheConfiguration. + */ + protected NearCacheConfiguration nearConfiguration() { + return null; + } + + /** + * @return Cache mode. + */ + protected abstract CacheMode getCacheMode(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CacheConfiguration cacheCfg = cache(CACHE_WITH_LOCAL_COMMIT_STORE); + cacheCfg.setAffinity(new RendezvousAffinityFunction()); + + CacheConfiguration cacheCfg2 = cache(CACHE_WITH_LOCAL_STORE); + cacheCfg2.setAffinity(new RendezvousAffinityFunction()); + + CacheConfiguration cacheCfg3 = cache(CACHE_WITH_GLOBAL_COMMIT_STORE); + cacheCfg2.setAffinity(new RendezvousAffinityFunction()); + + CacheConfiguration cacheCfg4 = cache(CACHE_WITH_GLOBAL_STORE); + cacheCfg2.setAffinity(new RendezvousAffinityFunction()); + + cfg.setCommunicationSpi(new BlockTcpCommunicationSpi()); + cfg.setCacheConfiguration(cacheCfg, cacheCfg2, cacheCfg3, cacheCfg4); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(2); + + awaitPartitionMapExchange(true, true, null); + + Thread.sleep(500); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * Test populate global cache stores correctly. + * + * Note, that caches with local and non-local stores can't be enlisted in one transaction). + * + * @throws IgniteCheckedException If failed. + */ + public void testCorrectGlobalStoresPersistence() throws IgniteCheckedException { + IgniteEx ignite0 = grid(0); + + IgniteCache cache1 = ignite0.cache(CACHE_WITH_GLOBAL_COMMIT_STORE); + IgniteCache cache2 = ignite0.cache(CACHE_WITH_GLOBAL_STORE); + + Integer key1 = primaryKey(cache1); + Integer key2 = primaryKey(cache2); + + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + TransactionProxyImpl tx = (TransactionProxyImpl)ignite0.transactions().txStart(concurrency, isolation); + + cache1.put(key1, key1); + cache2.put(key2, key2); + + tx.tx().prepare(true); + + U.sleep(100); + + assertEquals(null, GLOBAL_COMMIT_STORE_0_0.load(key1)); + assertEquals(null, GLOBAL_COMMIT_STORE_0_1.load(key1)); + assertEquals(writeBehindEnabled() ? 0 : 1, GLOBAL_COMMIT_STORE_0_0.writeCount.getAndSet(0)); + assertEquals(0, GLOBAL_COMMIT_STORE_0_1.writeCount.get()); + + assertEquals(null, GLOBAL_STORE_1_0.load(key2)); + assertEquals(null, GLOBAL_STORE_1_1.load(key2)); + assertEquals(0, GLOBAL_STORE_1_0.writeCount.get()); + assertEquals(0, GLOBAL_STORE_1_1.writeCount.get()); + + tx.commit(); + + flushStoreIfWriteBehindEnabled(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + boolean storesFlushed; + + storesFlushed = GLOBAL_COMMIT_STORE_0_0.load(key1) == key1; + + storesFlushed &= GLOBAL_COMMIT_STORE_0_1.load(key1) == key1; + + storesFlushed &= GLOBAL_STORE_1_0.load(key2) == key2; + + storesFlushed &= GLOBAL_STORE_1_1.load(key2) == key2; + + return storesFlushed; + } + }, getTestTimeout()); + + assertEquals(writeBehindEnabled() ? 1 : 0, GLOBAL_COMMIT_STORE_0_0.writeCount.getAndSet(0)); + assertEquals(0, GLOBAL_COMMIT_STORE_0_1.writeCount.get()); + assertEquals(key1, cache1.get(key1)); + + assertEquals(1, GLOBAL_STORE_1_0.writeCount.getAndSet(0)); + assertEquals(0, GLOBAL_STORE_1_1.writeCount.get()); + assertEquals(key2, cache2.get(key2)); + + cache1.remove(key1); + cache2.remove(key2); + + TestGlobalCommitStore.storage.clear(); + TestGlobalStore.storage.clear(); + } + } + } + + /** + * Test populate local cache stores correctly. + * + * Note, that caches with local and non-local stores can't be enlisted in one transaction). + * + * @throws IgniteCheckedException If failed. + */ + public void testCorrectLocalStoresPersistence() throws IgniteCheckedException, InterruptedException { + IgniteEx ignite0 = grid(0); + + IgniteCache cache1 = ignite0.cache(CACHE_WITH_LOCAL_STORE); + IgniteCache cache2 = ignite0.cache(CACHE_WITH_LOCAL_COMMIT_STORE); + + Integer key1 = primaryKey(cache1); + Integer key2 = primaryKey(cache2); + + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + TransactionProxyImpl tx = (TransactionProxyImpl)ignite0.transactions().txStart(concurrency, isolation); + + cache1.put(key1, key1); + cache2.put(key2, key2); + + tx.tx().prepare(true); + + U.sleep(100); + + assertEquals(null, LOCAL_COMMIT_STORE_0_0.load(key1)); + assertEquals(null, LOCAL_COMMIT_STORE_0_1.load(key1)); + assertEquals(writeBehindEnabled() ? 0 : 1, LOCAL_COMMIT_STORE_0_0.writeCount.getAndSet(0)); + assertEquals(writeBehindEnabled() ? 0 : 1, LOCAL_COMMIT_STORE_0_1.writeCount.getAndSet(0)); + + assertEquals(null, LOCAL_STORE_1_0.load(key2)); + assertEquals(null, LOCAL_STORE_1_1.load(key2)); + assertEquals(0, LOCAL_STORE_1_0.writeCount.get()); + assertEquals(0, LOCAL_STORE_1_1.writeCount.get()); + + tx.commit(); + + flushStoreIfWriteBehindEnabled(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + boolean storesFlushed; + + storesFlushed = LOCAL_COMMIT_STORE_0_0.load(key1) == key1; + + storesFlushed &= LOCAL_COMMIT_STORE_0_1.load(key1) == key1; + + storesFlushed &= LOCAL_STORE_1_0.load(key2) == key2; + + storesFlushed &= LOCAL_STORE_1_0.load(key2) == key2; + + return storesFlushed; + } + }, getTestTimeout()); + + assertEquals(writeBehindEnabled() ? 1 : 0, LOCAL_COMMIT_STORE_0_0.writeCount.getAndSet(0)); + assertEquals(writeBehindEnabled() ? 1 : 0, LOCAL_COMMIT_STORE_0_1.writeCount.getAndSet(0)); + assertEquals(key1, cache1.get(key1)); + + assertEquals(1, LOCAL_STORE_1_0.writeCount.getAndSet(0)); + assertEquals(1, LOCAL_STORE_1_1.writeCount.getAndSet(0)); + assertEquals(key2, cache2.get(key2)); + + cache1.remove(key1); + cache2.remove(key2); + + LOCAL_COMMIT_STORE_0_0.storage.clear(); + LOCAL_COMMIT_STORE_0_1.storage.clear(); + LOCAL_STORE_1_0.storage.clear(); + LOCAL_STORE_1_1.storage.clear(); + } + } + } + + /** + * Tests that store is not affected if tx prepare fails. + * + * @throws IgniteCheckedException If failed. + */ + public void testTxFailOnLocalPrimaryNodeWithLocalStore() throws IgniteCheckedException { + IgniteEx ignite0 = grid(0); + + IgniteCache cache0 = ignite0.cache(CACHE_WITH_LOCAL_COMMIT_STORE); + IgniteCache cache1 = ignite0.cache(CACHE_WITH_LOCAL_STORE); + + commSpi(ignite(1)).blockMessage(GridDhtTxPrepareResponse.class); + + Integer key0 = primaryKey(cache0); + Integer key1 = primaryKey(cache1); + + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + Transaction tx = ignite0.transactions().txStart(concurrency, isolation, 700, 2); + + cache0.put(key0, key0); + cache1.put(key1, key1); + + tx.commitAsync(); + + U.sleep(800); + + waitTxFinish(tx); + + tx.close(); + + flushStoreIfWriteBehindEnabled(); + + if (writeBehindEnabled()) { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + boolean storesFlushed; + + storesFlushed = LOCAL_COMMIT_STORE_0_0.load(key0) == key0; + + storesFlushed &= LOCAL_COMMIT_STORE_0_1.load(key0) == key0; + + storesFlushed &= LOCAL_STORE_1_0.load(key1) == key1; + + storesFlushed &= LOCAL_STORE_1_0.load(key1) == key1; + + return storesFlushed; + } + }, getTestTimeout()); + + assertEquals(key1, cache1.get(key1)); + assertEquals(key0, cache0.get(key0)); + } + else { + assertEquals(null, LOCAL_COMMIT_STORE_0_0.load(key0)); + assertEquals(null, LOCAL_COMMIT_STORE_0_1.load(key0)); + assertNull(cache0.get(key0)); + + assertEquals(null, LOCAL_STORE_1_0.load(key1)); + assertEquals(null, LOCAL_STORE_1_1.load(key1)); + assertNull(cache1.get(key1)); + } + + cache0.remove(key0); + cache1.remove(key1); + + LOCAL_COMMIT_STORE_0_0.storage.clear(); + LOCAL_COMMIT_STORE_0_1.storage.clear(); + LOCAL_STORE_1_0.storage.clear(); + LOCAL_STORE_1_1.storage.clear(); + } + } + + LOCAL_COMMIT_STORE_0_0.writeCount.set(0); + LOCAL_COMMIT_STORE_0_1.writeCount.set(0); + LOCAL_STORE_1_0.writeCount.set(0); + LOCAL_STORE_1_1.writeCount.set(0); + + commSpi(ignite(1)).unblockMessage(); + } + + /** + * Tests that store is not affected if tx prepare fails. + * + * @throws IgniteCheckedException If failed. + */ + public void testTxFailOnLocalPrimaryNodeWithGlobalStore() throws IgniteCheckedException { + IgniteEx ignite0 = grid(0); + + IgniteCache cache1 = ignite0.cache(CACHE_WITH_GLOBAL_COMMIT_STORE); + IgniteCache cache2 = ignite0.cache(CACHE_WITH_GLOBAL_STORE); + + commSpi(ignite0).blockMessage(GridDhtTxPrepareRequest.class); + + Integer key1 = primaryKey(cache1); + Integer key2 = primaryKey(cache2); + + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + + Transaction tx = ignite0.transactions().txStart(concurrency, isolation, 700, 2); + + cache1.put(key1, key1); + cache2.put(key2, key2); + + tx.commitAsync(); + + U.sleep(800); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override + public boolean apply() { + return TransactionState.ROLLED_BACK.equals(tx.state()); + } + }, getTestTimeout()); + + tx.close(); + + flushStoreIfWriteBehindEnabled(); + + if (writeBehindEnabled()) { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + boolean storesFlushed; + + storesFlushed = GLOBAL_COMMIT_STORE_0_0.load(key1) == key1; + + storesFlushed &= GLOBAL_COMMIT_STORE_0_1.load(key1) == key1; + + storesFlushed &= GLOBAL_STORE_1_0.load(key2) == key2; + + storesFlushed &= GLOBAL_STORE_1_0.load(key2) == key2; + + return storesFlushed; + } + }, getTestTimeout()); + + assertEquals(key1, cache1.get(key1)); + assertEquals(key2, cache2.get(key2)); + } + else { + assertEquals(null, GLOBAL_COMMIT_STORE_0_0.load(key1)); + assertEquals(null, GLOBAL_COMMIT_STORE_0_1.load(key1)); + assertNull(cache1.get(key1)); + + assertEquals(null, GLOBAL_STORE_1_0.load(key2)); + assertEquals(null, GLOBAL_STORE_1_1.load(key2)); + assertEquals(null, cache2.get(key2)); + } + + cache1.remove(key1); + cache2.remove(key2); + + TestGlobalCommitStore.storage.clear(); + TestGlobalStore.storage.clear(); + } + } + + GLOBAL_COMMIT_STORE_0_0.writeCount.set(0); + GLOBAL_COMMIT_STORE_0_1.writeCount.set(0); + GLOBAL_STORE_1_0.writeCount.set(0); + GLOBAL_STORE_1_1.writeCount.set(0); + + commSpi(ignite0).unblockMessage(); + } + + /** + * Tests that store is not affected if store prepare is failed. + * + * @throws IgniteCheckedException If failed. + */ + public void testPDSPrepareFailOnLocalPrimaryNodeWithLocalStore() throws IgniteCheckedException { + IgniteEx ignite0 = grid(0); + + IgniteCache cache0 = ignite0.cache(CACHE_WITH_LOCAL_COMMIT_STORE); + IgniteCache cache1 = ignite0.cache(CACHE_WITH_LOCAL_STORE); + + Integer key0 = primaryKey(cache0); + Integer key1 = primaryKey(cache1); + + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + LOCAL_COMMIT_STORE_0_0.writeFails = true; + + Transaction tx = ignite0.transactions().txStart(concurrency, isolation); + + cache0.put(key0, key0); + cache1.put(key1, key1); + + try { + tx.commit(); + } + catch (Exception ignore) { + //No-op. + } + + waitTxFinish(tx); + + tx.close(); + + flushStoreIfWriteBehindEnabled(); + + if (writeBehindEnabled()) { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + boolean storesFlushed; + + storesFlushed = LOCAL_COMMIT_STORE_0_0.load(key0) == key0; + + storesFlushed &= LOCAL_COMMIT_STORE_0_1.load(key0) == key0; + + storesFlushed &= LOCAL_STORE_1_0.load(key1) == key1; + + storesFlushed &= LOCAL_STORE_1_0.load(key1) == key1; + + return storesFlushed; + } + }, getTestTimeout()); + + assertEquals(key0, cache0.get(key0)); + assertEquals(key1, cache1.get(key1)); + } + else { + assertEquals(null, LOCAL_COMMIT_STORE_0_0.load(key0)); + assertEquals(null, LOCAL_COMMIT_STORE_0_1.load(key0)); + assertNull(cache0.get(key0)); + + assertEquals(null, LOCAL_STORE_1_0.load(key1)); + assertEquals(null, LOCAL_STORE_1_1.load(key1)); + assertNull(cache1.get(key1)); + } + + cache0.remove(key0); + cache1.remove(key1); + + LOCAL_COMMIT_STORE_0_0.storage.clear(); + LOCAL_COMMIT_STORE_0_1.storage.clear(); + LOCAL_STORE_1_0.storage.clear(); + LOCAL_STORE_1_1.storage.clear(); + } + } + + LOCAL_COMMIT_STORE_0_0.writeCount.set(0); + LOCAL_COMMIT_STORE_0_1.writeCount.set(0); + LOCAL_STORE_1_0.writeCount.set(0); + LOCAL_STORE_1_1.writeCount.set(0); + } + + /** + * Tests that store is not affected if store prepare is failed. + * + * @throws IgniteCheckedException If failed. + */ + public void testPDSPrepareFailOnLocalPrimaryNodeWithGlobalStore() throws IgniteCheckedException { + IgniteEx ignite0 = grid(0); + + IgniteCache cache2 = ignite0.cache(CACHE_WITH_GLOBAL_COMMIT_STORE); + IgniteCache cache3 = ignite0.cache(CACHE_WITH_GLOBAL_STORE); + + Integer key2 = primaryKey(cache2); + Integer key3 = primaryKey(cache3); + + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + GLOBAL_COMMIT_STORE_0_0.writeFails = true; + + Transaction tx = ignite0.transactions().txStart(concurrency, TransactionIsolation.REPEATABLE_READ); + + cache2.put(key2, key2); + cache3.put(key3, key3); + + try { + tx.commit(); + } + catch (Exception ignore) { + //No-op. + } + + waitTxFinish(tx); + + tx.close(); + + flushStoreIfWriteBehindEnabled(); + + if (writeBehindEnabled()) { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + boolean storesFlushed; + + storesFlushed = GLOBAL_COMMIT_STORE_0_0.load(key2) == key2; + + storesFlushed &= GLOBAL_STORE_1_0.load(key3) == key3; + + return storesFlushed; + } + }, getTestTimeout()); + + assertEquals(key2, cache2.get(key2)); + assertEquals(key3, cache3.get(key3)); + } + else { + assertEquals(null, GLOBAL_COMMIT_STORE_0_0.load(key2)); + assertEquals(null, GLOBAL_COMMIT_STORE_0_1.load(key2)); + assertEquals(null, cache2.get(key2)); + + assertEquals(null, GLOBAL_STORE_1_0.load(key3)); + assertEquals(null, GLOBAL_STORE_1_1.load(key3)); + assertEquals(null, cache3.get(key3)); + } + + cache2.remove(key2); + cache3.remove(key3); + + TestGlobalCommitStore.storage.clear(); + TestGlobalStore.storage.clear(); + } + + GLOBAL_COMMIT_STORE_0_0.writeCount.set(0); + GLOBAL_COMMIT_STORE_0_1.writeCount.set(0); + GLOBAL_STORE_1_0.writeCount.set(0); + GLOBAL_STORE_1_1.writeCount.set(0); + } + + /** + * @param ignite Node. + * @return Communication SPI. + */ + protected BlockTcpCommunicationSpi commSpi(Ignite ignite) { + return ((BlockTcpCommunicationSpi)ignite.configuration().getCommunicationSpi()); + } + + /** + * + */ + @CacheLocalStore + private static class TestLocalCommitStore extends MapBasedCacheStore { + /** {@inheritDoc} */ +// @Override public boolean supports2PhaseCommit() { +// return true; +// } + } + + /** + * + */ + @CacheLocalStore + private static class TestLocalStore extends MapBasedCacheStore { + } + + /** + * + */ + private static class TestGlobalCommitStore extends MapBasedCacheStore { + /** Storage. */ + public static HashMap storage = new HashMap<>(); + + /** {@inheritDoc} */ + @Override protected HashMap storage() { + return storage; + } + + /** {@inheritDoc} */ +// @Override public boolean supports2PhaseCommit() { +// return true; +// } + } + + /** + * + */ + private static class TestGlobalStore extends MapBasedCacheStore { + /** Storage. */ + public static HashMap storage = new HashMap<>(); + + /** {@inheritDoc} */ + @Override protected HashMap storage() { + return storage; + } + } + + /** + * Cache store, which stores data in HashMap. + * If 2-phase-commit is supported, than data is put into state on prepare step. + * On commit step, data is persisted into storage. + */ + private static class MapBasedCacheStore extends CacheStoreAdapter { + /** Storage. */ + public HashMap storage = new HashMap<>(); + + /** State. */ + public HashMap state = new HashMap<>(); + + /** Whether commit step must fail with exception. */ + public boolean commitFails; + + /** Whether write must fail with exception. */ + public boolean writeFails; + + /** Write count. Write is performed on prepare step if 2-pahe-commit is enabled. */ + public AtomicInteger writeCount = new AtomicInteger(0); + + private boolean writeBehindEnabled; + + /** + * + */ + protected HashMap storage() { + return storage; + } + + /** {@inheritDoc} */ + @Override public Integer load(Integer key) throws CacheLoaderException { + Object val = storage().get(key); + + if (val instanceof IgniteBiTuple) + return ((IgniteBiTuple)val).get1(); + else + return (Integer)val; + } + + StackTraceElement[] lastThread; + + /** {@inheritDoc} */ + @Override public void write( + Cache.Entry entry) throws CacheWriterException { + if (writeFails) { + writeFails = false; + + throw new CacheWriterException(); + } + +// if (supports2PhaseCommit() && !writeBehindEnabled) +// state.put(entry.getKey(), entry.getValue()); +// else +// storage().put(entry.getKey(), entry.getValue()); + + writeCount.incrementAndGet(); + + lastThread = Thread.currentThread().getStackTrace(); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + state.clear(); + + storage().clear(); + } + + /** {@inheritDoc} */ + @Override public void sessionEnd(boolean commit) { + super.sessionEnd(commit); + +// if (!supports2PhaseCommit()) +// return; + + if (commit) { + if (commitFails) + throw new CacheWriterException(); + + storage().putAll(state); + } + else + state.clear(); + } + + /** + * @param writeBehindEnabled Write behind enabled. + */ + public MapBasedCacheStore withWriteBehindEnabled(boolean writeBehindEnabled) { + this.writeBehindEnabled = writeBehindEnabled; + + return this; + } + } + + /** + * + */ + static class StoreFactory implements Factory { + /** */ + @IgniteInstanceResource + private Ignite node; + + /** */ + @CacheNameResource + private String cacheName; + + /** {@inheritDoc} */ + @Override public CacheStore create() { + String igniteInstanceName = node.configuration().getIgniteInstanceName(); + + boolean writeBehindEnabled = false; + + for (CacheConfiguration configuration : node.configuration().getCacheConfiguration()) { + if (configuration.getName().equals(cacheName)) { + writeBehindEnabled = configuration.isWriteBehindEnabled(); + + break; + } + } + + if (CACHE_WITH_LOCAL_COMMIT_STORE.equals(cacheName) && igniteInstanceName.endsWith("0")) + return LOCAL_COMMIT_STORE_0_0.withWriteBehindEnabled(writeBehindEnabled); + else if (CACHE_WITH_LOCAL_COMMIT_STORE.equals(cacheName) && igniteInstanceName.endsWith("1")) + return LOCAL_COMMIT_STORE_0_1.withWriteBehindEnabled(writeBehindEnabled); + else if (CACHE_WITH_LOCAL_STORE.equals(cacheName) && igniteInstanceName.endsWith("0")) + return LOCAL_STORE_1_0.withWriteBehindEnabled(writeBehindEnabled); + else if (CACHE_WITH_LOCAL_STORE.equals(cacheName) && igniteInstanceName.endsWith("1")) + return LOCAL_STORE_1_1.withWriteBehindEnabled(writeBehindEnabled); + else if (CACHE_WITH_GLOBAL_COMMIT_STORE.equals(cacheName) && igniteInstanceName.endsWith("0")) + return GLOBAL_COMMIT_STORE_0_0.withWriteBehindEnabled(writeBehindEnabled); + else if (CACHE_WITH_GLOBAL_COMMIT_STORE.equals(cacheName) && igniteInstanceName.endsWith("1")) + return GLOBAL_COMMIT_STORE_0_1.withWriteBehindEnabled(writeBehindEnabled); + else if (CACHE_WITH_GLOBAL_STORE.equals(cacheName) && igniteInstanceName.endsWith("0")) + return GLOBAL_STORE_1_0.withWriteBehindEnabled(writeBehindEnabled); + else if (CACHE_WITH_GLOBAL_STORE.equals(cacheName) && igniteInstanceName.endsWith("1")) + return GLOBAL_STORE_1_1.withWriteBehindEnabled(writeBehindEnabled); + + return null; + } + } + + /** + * + */ + protected static class BlockTcpCommunicationSpi extends TcpCommunicationSpi { + /** Message class to block. */ + volatile Class msgCls; + + /** How much time message was blocked. */ + public AtomicInteger blockedTimes = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure ackC) + throws IgniteSpiException { + Class msgCls0 = msgCls; + + if (msgCls0 != null && msg instanceof GridIoMessage + && ((GridIoMessage)msg).message().getClass().equals(msgCls)) { + blockedTimes.incrementAndGet(); + + return; + } + + super.sendMessage(node, msg, ackC); + } + + /** + * @param clazz Class of messages which will be block. + */ + public void blockMessage(Class clazz) { + msgCls = clazz; + } + + /** + * Unlock all message. + */ + public void unblockMessage() { + msgCls = null; + } + } + + /** + * + */ + private void flushStoreIfWriteBehindEnabled() throws IgniteCheckedException { + for (Ignite ignite : G.allGrids()) { + for (String cacheName : ignite.cacheNames()) + internalCache(ignite, cacheName).context().store().forceFlush(); + } + } + + /** + * @param tx Tx. + */ + private void waitTxFinish(Transaction tx) throws IgniteInterruptedCheckedException { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + if (writeBehindEnabled()) + return TransactionState.COMMITTED.equals(tx.state()); + else + return TransactionState.ROLLED_BACK.equals(tx.state()); + } + }, getTestTimeout()); + } + + +} Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedCacheStorePrepareTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.NearCacheConfiguration; + +/** + * + */ +public class GridCachePartitionedCacheStorePrepareTest extends GridCacheAbstractCacheStorePrepareTest { + /** {@inheritDoc} */ + @Override public CacheMode getCacheMode() { + return CacheMode.PARTITIONED; + } +} \ No newline at end of file Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWithNearCacheStorePrepareTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWithNearCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWithNearCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.configuration.NearCacheConfiguration; + +/** + * + */ +public class GridCachePartitionedWithNearCacheStorePrepareTest extends GridCachePartitionedCacheStorePrepareTest { + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return new NearCacheConfiguration(); + } +} Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWithNearWriteBehindCacheStorePrepareTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWithNearWriteBehindCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWithNearWriteBehindCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +/** + * + */ +public class GridCachePartitionedWithNearWriteBehindCacheStorePrepareTest extends GridCachePartitionedWithNearCacheStorePrepareTest { + /** {@inheritDoc} */ + @Override protected boolean writeBehindEnabled() { + return true; + } +} Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWriteBehindCacheStorePrepareTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWriteBehindCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWriteBehindCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +/** + * + */ +public class GridCachePartitionedWriteBehindCacheStorePrepareTest extends GridCachePartitionedCacheStorePrepareTest { + /** {@inheritDoc} */ + @Override protected boolean writeBehindEnabled() { + return true; + } +} Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedCacheStorePrepareTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.NearCacheConfiguration; + +/** + * + */ +public class GridCacheReplicatedCacheStorePrepareTest extends GridCacheAbstractCacheStorePrepareTest { + /** {@inheritDoc} */ + @Override public CacheMode getCacheMode() { + return CacheMode.REPLICATED; + } +} Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedWithNearCacheStorePrepareTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedWithNearCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedWithNearCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.configuration.NearCacheConfiguration; + +/** + * + */ +public class GridCacheReplicatedWithNearCacheStorePrepareTest extends GridCacheReplicatedCacheStorePrepareTest { + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return new NearCacheConfiguration(); + } +} \ No newline at end of file Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedWithNearWriteBehindCacheStorePrepareTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedWithNearWriteBehindCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedWithNearWriteBehindCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +/** + * + */ +public class GridCacheReplicatedWithNearWriteBehindCacheStorePrepareTest extends GridCacheReplicatedWithNearCacheStorePrepareTest { + /** {@inheritDoc} */ + @Override protected boolean writeBehindEnabled() { + return true; + } +} Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedWriteBehindCacheStorePrepareTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedWriteBehindCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedWriteBehindCacheStorePrepareTest.java (revision bd5eda9a8dd76f3bc66c6c292646153520e4b1fa) @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +/** + * + */ +public class GridCacheReplicatedWriteBehindCacheStorePrepareTest extends GridCacheReplicatedCacheStorePrepareTest { + /** {@inheritDoc} */ + @Override protected boolean writeBehindEnabled() { + return true; + } +} Index: modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java (date 1521214778000) @@ -102,4 +102,13 @@ */ @Deprecated public void sessionEnd(boolean commit) throws CacheWriterException; + + /** + * Checks whether cache store supports 2-phase-commit. + * + * @return {@code True} If store supports 2-phase-commit. + */ + default public boolean supports2PhaseCommit() { + return false; + } } \ No newline at end of file Index: modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java (date 1521214778000) @@ -95,6 +95,11 @@ // No-op. } + /** {@inheritDoc} */ + @Override public boolean supports2PhaseCommit() { + return false; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheStoreAdapter.class, this); Index: modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java (date 1521214778000) @@ -1640,6 +1640,11 @@ return ses; } + /** {@inheritDoc} */ + @Override public boolean supports2PhaseCommit() { + return true; + } + /** * Type kind. */ Index: modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStore.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStore.java (date 1521214778000) @@ -206,6 +206,11 @@ log.debug("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']'); } + /** {@inheritDoc} */ + @Override public boolean supports2PhaseCommit() { + return true; + } + /** {@inheritDoc} */ @SuppressWarnings({"RedundantTypeArguments"}) @Override public V load(K key) { Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java (date 1521214778000) @@ -228,6 +228,11 @@ delegate.sessionEnd(commit); } + /** {@inheritDoc} */ + @Override public boolean supports2PhaseCommit() { + return false; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheStoreBalancingWrapper.class, this); Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLoaderWriterStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLoaderWriterStore.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLoaderWriterStore.java (date 1521214778000) @@ -144,6 +144,11 @@ // No-op. } + /** {@inheritDoc} */ + @Override public boolean supports2PhaseCommit() { + return false; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheLoaderWriterStore.class, this); Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java (date 1521214778000) @@ -400,6 +400,8 @@ try { cctx.tm().prepareTx(this, null); + storesPrepare(writeEntries(), true); + if (pessimistic() || isSystemInvalidate()) state(PREPARED); } @@ -487,7 +489,9 @@ List dataEntries = null; - batchStoreCommit(writeMap().values()); + storesPrepare(writeEntries(), false); + + storeFinish(); try { // Node that for near transactions we grab all entries. Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java (date 1521214778000) @@ -515,6 +515,14 @@ cctx.database().checkpointReadUnlock(); } } + + try { + if (tx.dht() || tx.optimistic()) + tx.storesPrepare(req.writes(), true); + } + catch (IgniteCheckedException e) { + onError(e); + } } /** @@ -1656,9 +1664,12 @@ @Override public String toString() { Collection futs = F.viewReadOnly(futures(), new C1, String>() { @Override public String apply(IgniteInternalFuture f) { - return "[node=" + ((MiniFuture)f).node().id() + - ", loc=" + ((MiniFuture)f).node().isLocal() + - ", done=" + f.isDone() + "]"; + if (f instanceof MiniFuture) + return "[node=" + ((MiniFuture)f).node().id() + + ", loc=" + ((MiniFuture)f).node().isLocal() + + ", done=" + f.isDone() + "]"; + else + return ""; } }); Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java (date 1521214778000) @@ -67,10 +67,6 @@ * */ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter { - /** */ - @GridToStringExclude - private KeyLockFuture keyLockFut; - /** */ @GridToStringExclude private ClientRemapFuture remapFut; @@ -398,7 +394,7 @@ while (it.hasNext()) { IgniteInternalFuture fut0 = it.next(); - if (skipFuture(remap, fut0)) + if (!isMini(fut0) || skipFuture(remap, fut0)) continue; MiniFuture fut = (MiniFuture)fut0; @@ -409,7 +405,7 @@ while (it.hasNext()) { fut0 = it.next(); - if (skipFuture(remap, fut0)) + if (!isMini(fut0) || skipFuture(remap, fut0)) continue; fut = (MiniFuture)fut0; @@ -701,9 +697,12 @@ Collection futs = F.viewReadOnly(futures(), new C1, String>() { @Override public String apply(IgniteInternalFuture f) { - return "[node=" + ((MiniFuture)f).primary().id() + - ", loc=" + ((MiniFuture)f).primary().isLocal() + - ", done=" + f.isDone() + "]"; + if (isMini(f)) + return "[node=" + ((MiniFuture)f).primary().id() + + ", loc=" + ((MiniFuture)f).primary().isLocal() + + ", done=" + f.isDone() + "]"; + else + return ""; } }, new P1>() { Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java (date 1521214778000) @@ -77,10 +77,6 @@ */ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter implements IgniteDiagnosticAware { - /** */ - @GridToStringExclude - private KeyLockFuture keyLockFut; - /** */ private int miniId; @@ -844,9 +840,12 @@ @Override public String toString() { Collection futs = F.viewReadOnly(futures(), new C1, String>() { @Override public String apply(IgniteInternalFuture f) { - return "[node=" + ((MiniFuture)f).node().id() + - ", loc=" + ((MiniFuture)f).node().isLocal() + - ", done=" + f.isDone() + "]"; + if (isMini(f)) + return "[node=" + ((MiniFuture)f).node().id() + + ", loc=" + ((MiniFuture)f).node().isLocal() + + ", done=" + f.isDone() + "]"; + else + return ""; } }, new P1>() { @Override public boolean apply(IgniteInternalFuture fut) { Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java (date 1521214778000) @@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.internal.S; @@ -36,6 +37,9 @@ * */ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearTxPrepareFutureAdapter { + /** */ + @GridToStringExclude + protected KeyLockFuture keyLockFut; /** * @param cctx Context. * @param tx Transaction. @@ -171,7 +175,7 @@ /** * Keys lock future. */ - protected static class KeyLockFuture extends GridFutureAdapter { + public static class KeyLockFuture extends GridFutureAdapter { /** */ @GridToStringInclude protected Collection lockKeys = new GridConcurrentHashSet<>(); Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java (date 1521214778000) @@ -76,6 +76,9 @@ boolean found = false; for (IgniteInternalFuture fut : futures()) { + if (!isMini(fut)) + continue; + MiniFuture f = (MiniFuture)fut; if (f.primary().id().equals(nodeId)) { @@ -124,6 +127,14 @@ } } + /** + * @param f Future. + * @return {@code True} if mini-future. + */ + private boolean isMini(IgniteInternalFuture f) { + return f.getClass().equals(MiniFuture.class); + } + /** * Finds pending mini future by the given mini ID. * @@ -138,7 +149,12 @@ // Avoid iterator creation. for (int i = 0; i < size; i++) { - MiniFuture mini = (MiniFuture)future(i); + IgniteInternalFuture fut = future(i); + + if (!isMini(fut)) + continue; + + MiniFuture mini = (MiniFuture)fut; if (mini.futureId() == miniId) { if (!mini.isDone()) @@ -419,9 +435,12 @@ @Override public String toString() { Collection futs = F.viewReadOnly(futures(), new C1, String>() { @Override public String apply(IgniteInternalFuture f) { - return "[node=" + ((MiniFuture)f).primary().id() + - ", loc=" + ((MiniFuture)f).primary().isLocal() + - ", done=" + f.isDone() + "]"; + if (isMini(f)) + return "[node=" + ((MiniFuture)f).primary().id() + + ", loc=" + ((MiniFuture)f).primary().isLocal() + + ", done=" + f.isDone() + "]"; + else + return ""; } }); Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java (date 1521214778000) @@ -106,6 +106,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; +import static org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFutureAdapter.*; import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER; import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER; import static org.apache.ignite.transactions.TransactionState.COMMITTED; @@ -3270,8 +3271,53 @@ mapExplicitLocks(); + final GridFutureAdapter storePrepareFut = new GridFutureAdapter<>(); + + fut.add(storePrepareFut); + fut.prepare(); + if (fut instanceof GridNearOptimisticTxPrepareFutureAdapter) { + KeyLockFuture keyLockFut = ((GridNearOptimisticTxPrepareFutureAdapter)fut).keyLockFut; + + if (keyLockFut != null) { + final GridNearTxPrepareFutureAdapter finalFut = fut; + + keyLockFut.listen(new IgniteInClosure>() { + /** {@inheritDoc} */ + @Override public void apply( + IgniteInternalFuture gridNearTxPrepareResponseIgniteInternalFuture) { + try { + storesPrepare(writeEntries(), true); + + storePrepareFut.onDone(); + } + catch (IgniteCheckedException e) { + finalFut.onDone(e); + + storePrepareFut.onDone(e); + } + + } + }); + } + else + storePrepareFut.onDone(); + + } + else { + try { + storesPrepare(writeEntries(), true); + + storePrepareFut.onDone(); + } + catch (IgniteCheckedException e) { + fut.onDone(e); + + storePrepareFut.onDone(e); + } + } + return fut; } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java (date 1521214778000) @@ -550,6 +550,11 @@ // No-op. } + /** {@inheritDoc} */ + @Override public boolean supports2PhaseCommit() { + return false; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheWriteBehindStore.class, this); Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java (date 1522070594000) @@ -48,17 +48,7 @@ import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; -import org.apache.ignite.internal.processors.cache.CacheLazyEntry; -import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; -import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; -import org.apache.ignite.internal.processors.cache.GridCacheOperation; -import org.apache.ignite.internal.processors.cache.GridCacheReturn; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry; @@ -373,6 +363,31 @@ consistentIdMapper = new ConsistentIdMapper(cctx.discovery()); } + /** + * Checks whether store must be prepared now. + * + * @param onTxPrepare {@code True} If transaction is on prepare phase, otherwise is on finish step. + * @param storeManager Store manager. + * @param txEntry txEntry. + * @return {@code True} If store could be prepared. + */ + private boolean prepareStore(boolean onTxPrepare, CacheStoreManager storeManager, IgniteTxEntry txEntry) { + assert storeManager != null; + + if (!storeManager.configured()) + return true; + if (storeManager.isWriteBehind() && onTxPrepare) + return false; + if (onTxPrepare) + return storeManager.configuredStore().supports2PhaseCommit(); + else if (!storeManager.isLocal() && txEntry.cached().detached()) + return true; + else if (storeManager.configuredStore().supports2PhaseCommit() && storeManager.isWriteBehind()) + return true; + else + return !storeManager.configuredStore().supports2PhaseCommit(); + } + /** * @return Shared cache context. */ @@ -899,6 +914,22 @@ return false; } + /** + * Commits stores. + */ + protected void storeFinish() throws IgniteCheckedException { + if (!storeEnabled() || internal() || + (!local() && near())) // No need to work with local store at GridNearTxRemote. + return; + + Collection stores = txState().stores(cctx); + + if (stores == null || stores.isEmpty() || isRollbackOnly()) + return; + + sessionEnd(stores, true); + } + /** {@inheritDoc} */ @Override public void completedVersions(GridCacheVersion base, Collection committed, Collection txs) { @@ -1255,22 +1286,22 @@ } /** - * Performs batch database operations. This commit must be called - * before cache update. This way if there is a DB failure, - * cache transaction can still be rolled back. + * Iterates through entries and prepares cache stores if they support prepare. On transaction prepare step + * store would be prepared only if it supports preparing. On transaction finish step only stores that don't support + * preparing are written into, because they can persist data right away(without prepare). * - * @param writeEntries Transaction write set. - * @throws IgniteCheckedException If batch update failed. + * @param writeEntries Write entries. + * @param onTxPrepare {@code True} If we perform persistence prepare on transaction prepare phase. */ - @SuppressWarnings({"CatchGenericClass"}) - protected final void batchStoreCommit(Iterable writeEntries) throws IgniteCheckedException { + public final void storesPrepare(Iterable writeEntries, + boolean onTxPrepare) throws IgniteCheckedException { if (!storeEnabled() || internal() || (!local() && near())) // No need to work with local store at GridNearTxRemote. return; Collection stores = txState().stores(cctx); - if (stores == null || stores.isEmpty()) + if (stores == null || stores.isEmpty() || isRollbackOnly()) return; assert isWriteToStoreFromDhtValid(stores) : "isWriteToStoreFromDht can't be different within one transaction"; @@ -1279,178 +1310,172 @@ boolean isWriteToStoreFromDht = first.isWriteToStoreFromDht(); - if ((local() || first.isLocal()) && (near() || isWriteToStoreFromDht)) { + if ((local() || first.isLocal()) && (near() || isWriteToStoreFromDht) && writeEntries != null) { try { - if (writeEntries != null) { - Map> putMap = null; - List rmvCol = null; - CacheStoreManager writeStore = null; + Map> putMap = null; + List rmvCol = null; + CacheStoreManager writeStore = null; - boolean skipNonPrimary = near() && isWriteToStoreFromDht; + boolean skipNonPrimary = near() && isWriteToStoreFromDht; - for (IgniteTxEntry e : writeEntries) { - boolean skip = e.skipStore(); + for (IgniteTxEntry e : writeEntries) { + boolean skip = e.skipStore(); - if (!skip && skipNonPrimary) { - skip = e.cached().isNear() || - e.cached().detached() || - !e.context().affinity().primaryByPartition(e.cached().partition(), topologyVersion()).isLocal(); - } + GridCacheContext cacheCtx = e.context(); + + if (!skip && skipNonPrimary) { + skip = e.cached().isNear() || + e.cached().detached() || + !e.context().affinity().primaryByPartition(e.cached().partition(), topologyVersion()).isLocal(); + + } - if (!skip && !local() && // Update local store at backups only if needed. - cctx.localStorePrimaryOnly()) - skip = true; + if (!skip && !local() && // Update local store at backups only if needed. + cctx.localStorePrimaryOnly()) + skip = true; - if (skip) - continue; + if (!skip) + skip = !prepareStore(onTxPrepare, cacheCtx.store(), e); + + if (skip) + continue; - boolean intercept = e.context().config().getInterceptor() != null; + + boolean intercept = e.context().config().getInterceptor() != null; - if (intercept || !F.isEmpty(e.entryProcessors())) - e.cached().unswap(false); + if (intercept || !F.isEmpty(e.entryProcessors())) + e.cached().unswap(false); - IgniteBiTuple res = applyTransformClosures(e, false, null); + IgniteBiTuple res = applyTransformClosures(e, false, null); - GridCacheContext cacheCtx = e.context(); - - GridCacheOperation op = res.get1(); - KeyCacheObject key = e.key(); - CacheObject val = res.get2(); - GridCacheVersion ver = writeVersion(); + GridCacheOperation op = res.get1(); + KeyCacheObject key = e.key(); + CacheObject val = res.get2(); + GridCacheVersion ver = writeVersion(); - if (op == CREATE || op == UPDATE) { - // Batch-process all removes if needed. - if (rmvCol != null && !rmvCol.isEmpty()) { - assert writeStore != null; + if (op == CREATE || op == UPDATE) { + if (rmvCol != null && !rmvCol.isEmpty()) { + assert writeStore != null; - writeStore.removeAll(this, rmvCol); + writeStore.removeAll(this, rmvCol); - // Reset. - rmvCol.clear(); + // Reset. + rmvCol.clear(); - writeStore = null; - } + writeStore = null; + } - // Batch-process puts if cache ID has changed. - if (writeStore != null && writeStore != cacheCtx.store()) { - if (putMap != null && !putMap.isEmpty()) { - writeStore.putAll(this, putMap); + if (writeStore != null && writeStore != cacheCtx.store()) { + if (putMap != null && !putMap.isEmpty()) { + writeStore.putAll(this, putMap); - // Reset. - putMap.clear(); - } + putMap.clear(); + } - writeStore = null; - } + writeStore = null; + } - if (intercept) { - Object interceptorVal = cacheCtx.config().getInterceptor().onBeforePut( - new CacheLazyEntry( - cacheCtx, - key, - e.cached().rawGet(), - e.keepBinary()), - cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(val, e.keepBinary(), false)); + if (intercept) { + Object interceptorVal = cacheCtx.config().getInterceptor().onBeforePut( + new CacheLazyEntry( + cacheCtx, + key, + e.cached().rawGet(), + e.keepBinary()), + cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(val, e.keepBinary(), false)); - if (interceptorVal == null) - continue; + if (interceptorVal == null) + continue; - val = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(interceptorVal)); - } + val = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(interceptorVal)); + } - if (writeStore == null) - writeStore = cacheCtx.store(); + if (writeStore == null) + writeStore = cacheCtx.store(); - if (writeStore.isWriteThrough()) { - if (putMap == null) - putMap = new LinkedHashMap<>(writeMap().size(), 1.0f); + if (writeStore.isWriteThrough()) { + if (putMap == null) + putMap = new LinkedHashMap<>(writeMap().size(), 1.0f); - putMap.put(key, F.t(val, ver)); - } - } - else if (op == DELETE) { - // Batch-process all puts if needed. - if (putMap != null && !putMap.isEmpty()) { - assert writeStore != null; + putMap.put(key, F.t(val, ver)); + } + } + else if (op == DELETE) { + if (putMap != null && !putMap.isEmpty()) { + assert writeStore != null; - writeStore.putAll(this, putMap); + writeStore.putAll(this, putMap); - // Reset. - putMap.clear(); + putMap.clear(); - writeStore = null; - } + writeStore = null; + } - if (writeStore != null && writeStore != cacheCtx.store()) { - if (rmvCol != null && !rmvCol.isEmpty()) { - writeStore.removeAll(this, rmvCol); + if (writeStore != null && writeStore != cacheCtx.store()) { + if (rmvCol != null && !rmvCol.isEmpty()) { + writeStore.removeAll(this, rmvCol); - // Reset. - rmvCol.clear(); - } + rmvCol.clear(); + } - writeStore = null; - } + writeStore = null; + } - if (intercept) { - IgniteBiTuple t = cacheCtx.config().getInterceptor().onBeforeRemove( - new CacheLazyEntry(cacheCtx, key, e.cached().rawGet(), e.keepBinary())); + if (intercept) { + IgniteBiTuple t = cacheCtx.config().getInterceptor().onBeforeRemove( + new CacheLazyEntry(cacheCtx, key, e.cached().rawGet(), e.keepBinary())); - if (cacheCtx.cancelRemove(t)) - continue; - } + if (cacheCtx.cancelRemove(t)) + continue; + } - if (writeStore == null) - writeStore = cacheCtx.store(); + if (writeStore == null) + writeStore = cacheCtx.store(); - if (writeStore.isWriteThrough()) { - if (rmvCol == null) - rmvCol = new ArrayList<>(); + if (writeStore.isWriteThrough()) { + if (rmvCol == null) + rmvCol = new ArrayList<>(); - rmvCol.add(key); - } - } - else if (log.isDebugEnabled()) - log.debug("Ignoring NOOP entry for batch store commit: " + e); - } + rmvCol.add(key); + } + } + else if (log.isDebugEnabled()) + log.debug("Ignoring NOOP entry for batch store commit: " + e); + } - if (putMap != null && !putMap.isEmpty()) { - assert rmvCol == null || rmvCol.isEmpty(); - assert writeStore != null; + if (putMap != null && !putMap.isEmpty()) { + assert rmvCol == null || rmvCol.isEmpty(); + assert writeStore != null; - // Batch put at the end of transaction. - writeStore.putAll(this, putMap); - } + writeStore.putAll(this, putMap); + } - if (rmvCol != null && !rmvCol.isEmpty()) { - assert putMap == null || putMap.isEmpty(); - assert writeStore != null; + if (rmvCol != null && !rmvCol.isEmpty()) { + assert putMap == null || putMap.isEmpty(); + assert writeStore != null; - // Batch remove at the end of transaction. - writeStore.removeAll(this, rmvCol); - } + writeStore.removeAll(this, rmvCol); } - - // Commit while locks are held. - sessionEnd(stores, true); } catch (IgniteCheckedException ex) { - commitError(ex); + if (!onTxPrepare) { + commitError(ex); - errorWhenCommitting(); + errorWhenCommitting(); - // Safe to remove transaction from committed tx list because nothing was committed yet. - cctx.tm().removeCommittedTx(this); + cctx.tm().removeCommittedTx(this); + } throw ex; } catch (Throwable ex) { - commitError(ex); + if (!onTxPrepare) { + commitError(ex); - errorWhenCommitting(); + errorWhenCommitting(); - // Safe to remove transaction from committed tx list because nothing was committed yet. - cctx.tm().removeCommittedTx(this); + cctx.tm().removeCommittedTx(this); + } if (ex instanceof Error) throw (Error)ex; @@ -1462,8 +1487,6 @@ sessionEnd(stores, false); } } - else - sessionEnd(stores, true); } /** Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java (date 1521214778000) @@ -499,7 +499,9 @@ cctx.tm().addCommittedTx(this); if (!empty) { - batchStoreCommit(writeEntries()); + storesPrepare(writeEntries(), false); + + storeFinish(); WALPointer ptr = null; Index: modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java (date 1531820219000) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java (date 1521214778000) @@ -366,6 +366,11 @@ } } + /** {@inheritDoc} */ + @Override public boolean supports2PhaseCommit() { + return false; + } + /** {@inheritDoc} */ @Override public void start() throws IgniteException { // No-op. Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java (date 1531820219000) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestStore.java (date 1521214778000) @@ -264,6 +264,8 @@ putCnt.incrementAndGet(); } + StackTraceElement[] hist ; + /** {@inheritDoc} */ @Override public void writeAll(Collection> entries) throws CacheWriterException { @@ -277,6 +279,8 @@ this.map.put(e.getKey(), e.getValue()); putAllCnt.incrementAndGet(); + + hist = Thread.currentThread().getStackTrace(); } /** {@inheritDoc} */ Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerFailoverTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerFailoverTest.java (date 1531820219000) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerFailoverTest.java (date 1521214778000) @@ -211,8 +211,12 @@ IgniteCache cache2 = grid(1).cache(DEFAULT_CACHE_NAME); final int key1 = primaryKey(cache1); + final int key2 = primaryKey(cache2); + Thread.sleep(1_000); + + final CountDownLatch latch1 = new CountDownLatch(2); final CountDownLatch latch2 = new CountDownLatch(2);