Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java (revision f329383119cb4af9b8f7a27dde12cf05372ed1f5) +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java (revision b4957cf52a1e176acbfb00244798820aea7fa25d) @@ -331,6 +331,9 @@ /** {@inheritDoc} */ @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) { + cctx.kernalContext().log(getClass()).warning("--> storeWriteThrough " + + "activeCacheIds=" + activeCacheIds + " size="+activeCacheIds.size()); + if (!activeCacheIds.isEmpty()) { for (int i = 0; i < activeCacheIds.size(); i++) { int cacheId = activeCacheIds.get(i); Index: modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java (revision f329383119cb4af9b8f7a27dde12cf05372ed1f5) +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridIntList.java (revision b4957cf52a1e176acbfb00244798820aea7fa25d) @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.NoSuchElementException; +import org.apache.ignite.internal.util.lang.RunnableX; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.plugin.extensions.communication.Message; @@ -47,6 +48,9 @@ /** */ private int idx; + /** */ + public static RunnableX onBackupTxAdd; + /** * */ @@ -171,7 +175,12 @@ else if (arr.length == idx) arr = Arrays.copyOf(arr, arr.length << 1); - arr[idx++] = x; + int index = idx++; + + if (Thread.currentThread().getName().startsWith("sys")) + onBackupTxAdd.run(); + + arr[index] = x; } /** Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/TxTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/TxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/TxTest.java new file mode 100644 --- /dev/null (revision b4957cf52a1e176acbfb00244798820aea7fa25d) +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/TxTest.java (revision b4957cf52a1e176acbfb00244798820aea7fa25d) @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.AbstractFailureHandler; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.util.GridIntList; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.junit.Test; + +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; + +/** */ +public class TxTest extends GridCommonAbstractTest { + /** */ + private final AtomicReference err = new AtomicReference<>(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setCommunicationSpi(new TestRecordingCommunicationSpi()) + .setFailureHandler(new AbstractFailureHandler() { + @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) { + err.compareAndSet(null, failureCtx.error()); + + return false; + } + }); + } + + /** */ + @Test + public void test() throws Exception { + IgniteEx node0 = startGrid(0); + IgniteEx node1 = startGrid(1); + + IgniteCache cache0 = node0.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setBackups(1)); + + GridIntList.onBackupTxAdd = () -> { + // Tx recovery should occur simultaneously with active caches adding. + node0.context().discovery().failNode(node1.localNode().id(), "test reason"); + + // Wait for PME, tx recovery. + U.sleep(3000); + }; + + Integer key = backupKey(cache0); + + try (Transaction tx = node0.transactions().txStart(OPTIMISTIC, READ_COMMITTED)) { + cache0.put(key, key); + + tx.commit(); + } + catch (Exception e) { + // No-op. + } + + // Wait for FH triggered. + U.sleep(5000); + assertNull(err.get()); + + IgniteCache cache1 = node0.cache(DEFAULT_CACHE_NAME); + + assertNull(cache1.get(key)); + } +}