Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java (revision 43ebe7a5b3a4a19a1ef842b6d80417a4fc3580a0) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java (revision ) @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -126,6 +127,9 @@ * Managed transaction adapter. */ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implements IgniteInternalTx, Externalizable { + /** Tx prepare commit counters. */ + public static final ConcurrentHashMap TX_PREPARE_COMMIT_COUNTERS = new ConcurrentHashMap<>(); + /** */ private static final long serialVersionUID = 0L; @@ -1197,6 +1201,27 @@ if (!txState().mvccEnabled()) ptr = cctx.tm().logTxRecord(this); } + + if (state == PREPARED) { + TX_PREPARE_COMMIT_COUNTERS.compute(nearXidVersion(), (k, v) -> { + if (v == null) + return 1; + + if (v == 0) + throw new AssertionError("PCPC problem"); + + return v + 1; + }); + } + + if (state == COMMITTED) { + TX_PREPARE_COMMIT_COUNTERS.compute(nearXidVersion(), (k, v) -> { + if (v == 0) + throw new AssertionError("There are more prepares than commits"); + + return v - 1; + }); + } } } Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnePhaseCommitTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnePhaseCommitTest.java (revision ) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnePhaseCommitTest.java (revision ) @@ -0,0 +1,88 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class TxOnePhaseCommitTest extends GridCommonAbstractTest { + /** Ip finder. */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Cache name. */ + private static final String CACHE_NAME = "cache"; + + /** Nodes count (actually two times more nodes will started: server + client). */ + private static final int NODES_CNT = 3; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setCacheConfiguration(new CacheConfiguration(CACHE_NAME) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setBackups(1)); // One-phase commit works when there are one primary and one backup. + + return cfg; + } + + /** + * + */ + public void test() throws Exception { + IgniteEx ig = (IgniteEx)startGrids(NODES_CNT); + + IgniteCache cache = ig.cache(CACHE_NAME); + + AtomicBoolean stop = new AtomicBoolean(false); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + while (!stop.get()) + cache.put(ThreadLocalRandom.current().nextInt(), ThreadLocalRandom.current().nextInt()); + } + }, 3, "tx-loader"); + + U.sleep(10_000); + + stop.set(true); + + fut.get(); + } +}