Uploaded image for project: 'Ignite'
  1. Ignite
  2. IGNITE-9806

Legacy tx invalidation code breaks data consistency between owners.

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Open
    • Major
    • Resolution: Unresolved
    • None
    • None
    • None
    • None

    Description

      Reproducer:

      /*
       * Licensed to the Apache Software Foundation (ASF) under one or more
       * contributor license agreements.  See the NOTICE file distributed with
       * this work for additional information regarding copyright ownership.
       * The ASF licenses this file to You under the Apache License, Version 2.0
       * (the "License"); you may not use this file except in compliance with
       * the License.  You may obtain a copy of the License at
       *
       *      http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
      
      package org.apache.ignite.internal.processors.cache.transactions;
      
      import java.util.UUID;
      import java.util.function.Supplier;
      import org.apache.ignite.Ignite;
      import org.apache.ignite.IgniteCheckedException;
      import org.apache.ignite.IgniteTransactions;
      import org.apache.ignite.cache.CacheAtomicityMode;
      import org.apache.ignite.cache.CacheMode;
      import org.apache.ignite.cache.CacheWriteSynchronizationMode;
      import org.apache.ignite.configuration.CacheConfiguration;
      import org.apache.ignite.configuration.IgniteConfiguration;
      import org.apache.ignite.internal.IgniteEx;
      import org.apache.ignite.internal.managers.communication.GridIoPolicy;
      import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
      import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
      import org.apache.ignite.internal.util.typedef.G;
      import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
      import org.apache.ignite.testsuites.IgniteIgnore;
      import org.apache.ignite.transactions.Transaction;
      import org.apache.ignite.transactions.TransactionConcurrency;
      import org.apache.ignite.transactions.TransactionIsolation;
      import org.jetbrains.annotations.Nullable;
      import org.mockito.Mockito;
      import org.mockito.invocation.InvocationOnMock;
      import org.mockito.stubbing.Answer;
      
      /**
       * Tests data consistency if transaction is failed due to heuristic exception on originating node.
       */
      public class TxDataConsistencyOnCommitFailureTest extends GridCommonAbstractTest {
          /** */
          public static final int KEY = 0;
      
          /** */
          public static final String CLIENT = "client";
      
          /** */
          private int nodesCnt;
      
          /** */
          private int backups;
      
          /** {@inheritDoc} */
          @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
              IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
      
              cfg.setClientMode(igniteInstanceName.startsWith(CLIENT));
      
              cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME).
                  setCacheMode(CacheMode.PARTITIONED).
                  setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).
                  setBackups(backups).
                  setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC));
      
              return cfg;
          }
      
          /** {@inheritDoc} */
          @Override protected void afterTest() throws Exception {
              super.afterTest();
      
              stopAllGrids();
          }
      
          /** */
          @IgniteIgnore(value = "https://issues.apache.org/jira/browse/IGNITE-590", forceFailure = false)
          public void testCommitErrorOnColocatedNode2PC() throws Exception {
              nodesCnt = 3;
      
              backups = 2;
      
              doTestCommitError(() -> primaryNode(KEY, DEFAULT_CACHE_NAME));
          }
      
          /**
           * @param factory Factory.
           */
          private void doTestCommitError(Supplier<Ignite> factory) throws Exception {
              Ignite crd = startGridsMultiThreaded(nodesCnt);
      
              crd.cache(DEFAULT_CACHE_NAME).put(KEY, KEY);
      
              Ignite ignite = factory.get();
      
              if (ignite == null)
                  ignite = startGrid("client");
      
              assertNotNull(ignite.cache(DEFAULT_CACHE_NAME));
      
              injectMockedTxManager(ignite);
      
              checkKey();
      
              IgniteTransactions transactions = ignite.transactions();
      
              try(Transaction tx = transactions.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ, 0, 1)) {
                  assertNotNull(transactions.tx());
      
                  ignite.cache(DEFAULT_CACHE_NAME).put(KEY, KEY + 1);
      
                  tx.commit();
      
                  fail();
              }
              catch (Exception t) {
                  // No-op.
              }
      
              checkKey();
      
              checkFutures();
          }
      
          /**
           * @param ignite Ignite.
           */
          private void injectMockedTxManager(Ignite ignite) {
              IgniteEx igniteEx = (IgniteEx)ignite;
      
              GridCacheSharedContext<Object, Object> ctx = igniteEx.context().cache().context();
      
              IgniteTxManager tm = ctx.tm();
      
              IgniteTxManager mockTm = Mockito.spy(tm);
      
              MockGridNearTxLocal locTx = new MockGridNearTxLocal(ctx, false, false, false, GridIoPolicy.SYSTEM_POOL,
                  TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ, 0, true, null, 1, null, 0, null);
      
              Mockito.doAnswer(new Answer<GridNearTxLocal>() {
                  @Override public GridNearTxLocal answer(InvocationOnMock invocation) throws Throwable {
                      mockTm.onCreated(null, locTx);
      
                      return locTx;
                  }
              }).when(mockTm).
                  newTx(locTx.implicit(), locTx.implicitSingle(), null, locTx.concurrency(),
                      locTx.isolation(), locTx.timeout(), locTx.storeEnabled(), null, locTx.size(), locTx.label());
      
              ctx.setTxManager(mockTm);
          }
      
          /** */
          private void checkKey() {
              for (Ignite ignite : G.allGrids()) {
                  if (!ignite.configuration().isClientMode())
                      assertNotNull(ignite.cache(DEFAULT_CACHE_NAME).localPeek(KEY));
              }
          }
      
          /** */
          private static class MockGridNearTxLocal extends GridNearTxLocal {
              /** Empty constructor. */
              public MockGridNearTxLocal() {
              }
      
              /**
               * @param ctx Context.
               * @param implicit Implicit.
               * @param implicitSingle Implicit single.
               * @param sys System.
               * @param plc Policy.
               * @param concurrency Concurrency.
               * @param isolation Isolation.
               * @param timeout Timeout.
               * @param storeEnabled Store enabled.
               * @param mvccOp Mvcc op.
               * @param txSize Tx size.
               * @param subjId Subj id.
               * @param taskNameHash Task name hash.
               * @param lb Label.
               */
              public MockGridNearTxLocal(GridCacheSharedContext ctx, boolean implicit, boolean implicitSingle, boolean sys,
                  byte plc, TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout,
                  boolean storeEnabled, Boolean mvccOp, int txSize, @Nullable UUID subjId, int taskNameHash, @Nullable String lb) {
                  super(ctx, implicit, implicitSingle, sys, plc, concurrency, isolation, timeout, storeEnabled, mvccOp,
                      txSize, subjId, taskNameHash, lb);
              }
      
              /** {@inheritDoc} */
              @Override public void userCommit() throws IgniteCheckedException {
                  throw new IgniteCheckedException("Force failure");
              }
          }
      }
      

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              ascherbakov Alexey Scherbakov
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated: