Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java (revision c4ec543a8b0da9c5d9a2e25739ac31fd68d13a5d) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java (revision ) @@ -2373,7 +2373,12 @@ byte txState = convertToTxState(txRecord.state()); - cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false); + cctx.coordinators().updateTxState( + txRecord.mvccVersion().coordinatorVersion(), + txRecord.mvccVersion().counter(), + txState, + false + ); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -2515,7 +2520,12 @@ byte txState = convertToTxState(txRecord.state()); - cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false); + cctx.coordinators().updateTxState( + txRecord.mvccVersion().coordinatorVersion(), + txRecord.mvccVersion().counter(), + txState, + false + ); break; Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java (revision c4ec543a8b0da9c5d9a2e25739ac31fd68d13a5d) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java (revision ) @@ -2438,7 +2438,12 @@ if (cctx.wal() != null) ptr = cctx.wal().log(newTxRecord(tx)); - cctx.coordinators().updateState(tx.mvccSnapshot, commit ? TxState.COMMITTED : TxState.ABORTED, tx.local()); + cctx.coordinators().updateTxState( + tx.mvccSnapshot.coordinatorVersion(), + tx.mvccSnapshot.counter(), + commit ? TxState.COMMITTED : TxState.ABORTED, + tx.local() + ); } finally { cctx.database().checkpointReadUnlock(); @@ -2463,7 +2468,12 @@ if (cctx.wal() != null) cctx.wal().log(newTxRecord(tx)); - cctx.coordinators().updateState(tx.mvccSnapshot, TxState.PREPARED); + cctx.coordinators().updateTxState( + tx.mvccSnapshot.coordinatorVersion(), + tx.mvccSnapshot.counter(), + TxState.PREPARED, + true + ); } finally { cctx.database().checkpointReadUnlock(); Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java (revision c4ec543a8b0da9c5d9a2e25739ac31fd68d13a5d) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java (revision ) @@ -1174,7 +1174,7 @@ GridFutureAdapter resFut = new GridFutureAdapter<>(); - IgniteInternalFuture lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer); + IgniteInternalFuture lockFut = cctx.kernalContext().coordinators().waitForLocalTx(cctx, lockVer); lockFut.listen(new MvccUpdateLockListener(tx, this, affNodeId, topVer, val, ttl0, mvccVer, op, needHistory, noCreate, resFut, needOldVal, filter, retVal, entryProc, invokeArgs)); @@ -1323,7 +1323,7 @@ GridFutureAdapter resFut = new GridFutureAdapter<>(); - IgniteInternalFuture lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer); + IgniteInternalFuture lockFut = cctx.kernalContext().coordinators().waitForLocalTx(cctx, lockVer); lockFut.listen(new MvccRemoveLockListener(tx, this, affNodeId, topVer, mvccVer, needHistory, resFut, needOldVal, retVal, filter)); @@ -1415,7 +1415,7 @@ GridFutureAdapter resFut = new GridFutureAdapter<>(); - IgniteInternalFuture lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer); + IgniteInternalFuture lockFut = cctx.kernalContext().coordinators().waitForLocalTx(cctx, lockVer); lockFut.listen(new MvccAcquireLockListener(tx, this, mvccVer, resFut)); @@ -5218,7 +5218,7 @@ else if (res.resultType() == ResultType.LOCKED) { entry.unlockEntry(); - IgniteInternalFuture lockFuture = cctx.kernalContext().coordinators().waitFor(cctx, res.resultVersion()); + IgniteInternalFuture lockFuture = cctx.kernalContext().coordinators().waitForLocalTx(cctx, res.resultVersion()); lockFuture.listen(this); @@ -5349,7 +5349,7 @@ else if (res.resultType() == ResultType.LOCKED) { entry.unlockEntry(); - cctx.kernalContext().coordinators().waitFor(cctx, res.resultVersion()).listen(this); + cctx.kernalContext().coordinators().waitForLocalTx(cctx, res.resultVersion()).listen(this); return; } @@ -5523,7 +5523,7 @@ else if (res.resultType() == ResultType.LOCKED) { entry.unlockEntry(); - cctx.kernalContext().coordinators().waitFor(cctx, res.resultVersion()).listen(this); + cctx.kernalContext().coordinators().waitForLocalTx(cctx, res.resultVersion()).listen(this); return; } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java (revision c4ec543a8b0da9c5d9a2e25739ac31fd68d13a5d) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java (revision ) @@ -107,42 +107,31 @@ * @return State for given mvcc version. * @throws IgniteCheckedException If fails. */ - byte state(long crdVer, long cntr) throws IgniteCheckedException; + byte txState(long crdVer, long cntr) throws IgniteCheckedException; /** - * @param ver Version to check. - * @return State for given mvcc version. - * @throws IgniteCheckedException If fails. - */ - byte state(MvccVersion ver) throws IgniteCheckedException; - - /** - * @param ver Version. - * @param state State. - * @throws IgniteCheckedException If fails; - */ - void updateState(MvccVersion ver, byte state) throws IgniteCheckedException; - - /** - * @param ver Version. + * Update state of transaction in TX log. + * + * @param crdVer MVCC coordinator version. + * @param cntr MVCC counter. * @param state State. * @param primary Flag if this is primary node. * @throws IgniteCheckedException If fails; */ - void updateState(MvccVersion ver, byte state, boolean primary) throws IgniteCheckedException; + void updateTxState(long crdVer, long cntr, byte state, boolean primary) throws IgniteCheckedException; /** * @param crd Mvcc coordinator version. * @param cntr Mvcc counter. */ - void registerLocalTransaction(long crd, long cntr); + void registerLocalTx(long crd, long cntr); /** * @param crd Mvcc coordinator version. * @param cntr Mvcc counter. * @return {@code True} if there is an active local transaction with given version. */ - boolean hasLocalTransaction(long crd, long cntr); + boolean hasLocalTx(long crd, long cntr); /** * @param cctx Cache context. @@ -150,7 +139,7 @@ * @return Future, which is completed as soon as the lock is released. * @throws IgniteCheckedException If failed. */ - IgniteInternalFuture waitFor(GridCacheContext cctx, MvccVersion locked) throws IgniteCheckedException; + IgniteInternalFuture waitForLocalTx(GridCacheContext cctx, MvccVersion locked) throws IgniteCheckedException; /** * @param tracker Query tracker. Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java (revision c4ec543a8b0da9c5d9a2e25739ac31fd68d13a5d) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java (revision ) @@ -1704,7 +1704,7 @@ mvccSnapshot = ver; if(dht()) - cctx.coordinators().registerLocalTransaction(mvccSnapshot.coordinatorVersion(), mvccSnapshot.counter()); + cctx.coordinators().registerLocalTx(mvccSnapshot.coordinatorVersion(), mvccSnapshot.counter()); qryEnlisted = true; } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java (revision c4ec543a8b0da9c5d9a2e25739ac31fd68d13a5d) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java (revision ) @@ -140,9 +140,6 @@ /** */ private static final IgniteProductVersion MVCC_SUPPORTED_SINCE = IgniteProductVersion.fromString("2.7.0"); - /** */ - private static final Waiter LOCAL_TRANSACTION_MARKER = new LocalTransactionMarker(); - /** Dummy tx for vacuum. */ private static final IgniteInternalTx DUMMY_TX = new GridNearTxLocal(); @@ -208,8 +205,8 @@ /** */ private final Map waitTxFuts = new ConcurrentHashMap<>(); - /** */ - private final Map waitMap = new ConcurrentHashMap<>(); + /** Transaction wait map. */ + private final ConcurrentHashMap waitMap = new ConcurrentHashMap<>(); /** */ private final ActiveQueries activeQueries = new ActiveQueries(); @@ -483,64 +480,54 @@ } /** {@inheritDoc} */ - @Override public byte state(MvccVersion ver) throws IgniteCheckedException { - return state(ver.coordinatorVersion(), ver.counter()); - } - - /** {@inheritDoc} */ - @Override public byte state(long crdVer, long cntr) throws IgniteCheckedException { - assert txLog != null && mvccEnabled; - + @Override public byte txState(long crdVer, long cntr) throws IgniteCheckedException { return txLog.get(crdVer, cntr); } /** {@inheritDoc} */ - @Override public void updateState(MvccVersion ver, byte state) throws IgniteCheckedException { - updateState(ver, state, true); - } - - /** {@inheritDoc} */ - @Override public void updateState(MvccVersion ver, byte state, boolean primary) throws IgniteCheckedException { - assert txLog != null && mvccEnabled; - - TxKey key = new TxKey(ver.coordinatorVersion(), ver.counter()); + @Override public void updateTxState(long crdVer, long cntr, byte state, boolean primary) + throws IgniteCheckedException { + TxKey key = new TxKey(crdVer, cntr); txLog.put(key, state, primary); - Waiter waiter; + // Release the next transaction waiting for a lock. + if (primary && (state == TxState.ABORTED || state == TxState.COMMITTED)) { + WaitList waitList = waitMap.remove(key); - if (primary && (state == TxState.ABORTED || state == TxState.COMMITTED) - && (waiter = waitMap.remove(key)) != null) - waiter.run(ctx); + if (waitList != null) { + List waiters = waitList.complete(); + + if (waiters != null) { + for (Runnable waiter : waiters) + waiter.run(); + } + } + } } /** {@inheritDoc} */ - @Override public void registerLocalTransaction(long crd, long cntr) { - Waiter old = waitMap.putIfAbsent(new TxKey(crd, cntr), LOCAL_TRANSACTION_MARKER); - - assert old == null || old.hasLocalTransaction(); + @Override public void registerLocalTx(long crdVer, long cntr) { + waitMap.putIfAbsent(new TxKey(crdVer, cntr), new WaitList()); } /** {@inheritDoc} */ - @Override public boolean hasLocalTransaction(long crd, long cntr) { - Waiter waiter = waitMap.get(new TxKey(crd, cntr)); - - return waiter != null && waiter.hasLocalTransaction(); + @Override public boolean hasLocalTx(long crdVer, long cntr) { + return waitMap.containsKey(new TxKey(crdVer, cntr)); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture waitFor(GridCacheContext cctx, MvccVersion locked) throws IgniteCheckedException { + @Override public IgniteInternalFuture waitForLocalTx(GridCacheContext cctx, MvccVersion locked) { TxKey key = new TxKey(locked.coordinatorVersion(), locked.counter()); LockFuture fut = new LockFuture(cctx.ioPolicy()); - Waiter waiter = waitMap.merge(key, fut, Waiter::concat); + WaitList waitList = waitMap.get(key); - byte state = txLog.get(key); + boolean enqueued = waitList != null && waitList.addWaiter(fut); - if ((state == TxState.ABORTED || state == TxState.COMMITTED) - && !waiter.hasLocalTransaction() && (waiter = waitMap.remove(key)) != null) - waiter.run(ctx); + if (!enqueued) + fut.run(); return fut; } @@ -564,6 +551,54 @@ return tryRequestSnapshotLocal(null); } + /** + * Wait list for specific transaction.. + */ + private static class WaitList { + /** Waiters. */ + private ArrayList waiters; + + /** Completed flag. */ + private boolean completed; + + /** + * Add waiter if possible. + * + * @param waiter Waiter. + * @return {@code True} if waiter was added, {@code false} if wait list is already completed and cannot be used + * any more. + */ + private boolean addWaiter(Runnable waiter) { + synchronized (this) { + if (completed) + return false; + + if (waiters == null) + waiters = new ArrayList<>(2); + + waiters.add(waiter); + + return true; + } + } + + /** + * Complete + * + * @return Waiters available at the time of completion. + */ + private List complete() { + synchronized (this) { + if (completed) + return null; + + completed = true; + + return waiters; + } + } + } + /** {@inheritDoc} */ @Override public MvccSnapshot tryRequestSnapshotLocal(@Nullable IgniteInternalTx tx) throws ClusterTopologyCheckedException { MvccCoordinator crd = currentCoordinator(); @@ -1260,7 +1295,7 @@ if (!( key.major() == snapshot.coordinatorVersion() && key.minor() > snapshot.cleanupVersion() || key.major() > snapshot.coordinatorVersion())) { - byte state = state(key.major(), key.minor()); + byte state = txState(key.major(), key.minor()); assert state == TxState.ABORTED : "tx state=" + state; } @@ -1931,32 +1966,10 @@ } } - /** */ - private interface Waiter { - /** - * @param ctx Grid kernal context. - */ - void run(GridKernalContext ctx); - - /** - * @param other Another waiter. - * @return New compound waiter. - */ - Waiter concat(Waiter other); - - /** - * @return {@code True} if there is an active local transaction - */ - boolean hasLocalTransaction(); - - /** - * @return {@code True} if it is a compound waiter. - */ - boolean compound(); - } - - /** */ - private static class LockFuture extends GridFutureAdapter implements Waiter, Runnable { + /** + * Lock future. + */ + private class LockFuture extends GridFutureAdapter implements Runnable { /** */ private final byte plc; @@ -1969,140 +1982,13 @@ /** {@inheritDoc} */ @Override public void run() { - onDone(); - } - - /** {@inheritDoc} */ - @Override public void run(GridKernalContext ctx) { try { - ctx.pools().poolForPolicy(plc).execute(this); + ctx.pools().poolForPolicy(plc).execute(this::onDone); } catch (IgniteCheckedException e) { U.error(ctx.log(LockFuture.class), e); } } - - /** {@inheritDoc} */ - @Override public Waiter concat(Waiter other) { - return new CompoundWaiterNoLocal(this, other); - } - - /** {@inheritDoc} */ - @Override public boolean hasLocalTransaction() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean compound() { - return false; - } - } - - /** */ - private static class LocalTransactionMarker implements Waiter { - /** {@inheritDoc} */ - @Override public void run(GridKernalContext ctx) { - // No-op - } - - /** {@inheritDoc} */ - @Override public Waiter concat(Waiter other) { - return new CompoundWaiter(other); - } - - /** {@inheritDoc} */ - @Override public boolean hasLocalTransaction() { - return true; - } - - /** {@inheritDoc} */ - @Override public boolean compound() { - return false; - } - } - - /** */ - @SuppressWarnings("unchecked") - private static class CompoundWaiter implements Waiter { - /** */ - private final Object inner; - - /** - * @param waiter Waiter to wrap. - */ - private CompoundWaiter(Waiter waiter) { - inner = waiter.compound() ? ((CompoundWaiter)waiter).inner : waiter; - } - - /** - * @param first First waiter. - * @param second Second waiter. - */ - private CompoundWaiter(Waiter first, Waiter second) { - ArrayList list = new ArrayList<>(); - - add(list, first); - add(list, second); - - inner = list; - } - - /** */ - private void add(List to, Waiter waiter) { - if (!waiter.compound()) - to.add(waiter); - else if (((CompoundWaiter)waiter).inner.getClass() == ArrayList.class) - to.addAll((List)((CompoundWaiter)waiter).inner); - else - to.add((Waiter)((CompoundWaiter)waiter).inner); - } - - /** {@inheritDoc} */ - @Override public void run(GridKernalContext ctx) { - if (inner.getClass() == ArrayList.class) { - for (Waiter waiter : (List)inner) { - waiter.run(ctx); - } - } - else - ((Waiter)inner).run(ctx); - } - - /** {@inheritDoc} */ - @Override public Waiter concat(Waiter other) { - return new CompoundWaiter(this, other); - } - - /** {@inheritDoc} */ - @Override public boolean hasLocalTransaction() { - return true; - } - - /** {@inheritDoc} */ - @Override public boolean compound() { - return true; - } - } - - /** */ - private static class CompoundWaiterNoLocal extends CompoundWaiter { - /** - * @param first First waiter. - * @param second Second waiter. - */ - private CompoundWaiterNoLocal(Waiter first, Waiter second) { - super(first, second); - } - - /** {@inheritDoc} */ - @Override public Waiter concat(Waiter other) { - return new CompoundWaiterNoLocal(this, other); - } - - /** {@inheritDoc} */ - @Override public boolean hasLocalTransaction() { - return false; - } } /** Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java (revision c4ec543a8b0da9c5d9a2e25739ac31fd68d13a5d) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java (revision ) @@ -140,7 +140,7 @@ byte state = state(cctx, mvccCrd, mvccCntr, 0); return state != TxState.COMMITTED && state != TxState.ABORTED - || cctx.kernalContext().coordinators().hasLocalTransaction(mvccCrd, mvccCntr); + || cctx.kernalContext().coordinators().hasLocalTx(mvccCrd, mvccCntr); } /** @@ -184,7 +184,7 @@ if ((mvccOpCntr & MVCC_HINTS_MASK) != 0) return (byte)(mvccOpCntr >>> MVCC_HINTS_BIT_OFF); - byte state = proc.state(mvccCrd, mvccCntr); + byte state = proc.txState(mvccCrd, mvccCntr); if ((state == TxState.NA || state == TxState.PREPARED) && (proc.currentCoordinator() == null // Recovery from WAL. Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccProcessorTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccProcessorTest.java (revision c4ec543a8b0da9c5d9a2e25739ac31fd68d13a5d) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccProcessorTest.java (revision ) @@ -64,16 +64,17 @@ MvccProcessorImpl mvccProcessor = mvccProcessor(grid); - assertEquals(TxState.NA, mvccProcessor.state(new MvccVersionImpl(1, 1, MvccUtils.MVCC_OP_COUNTER_NA))); + assertEquals(TxState.NA, mvccProcessor.txState(1, 1)); grid.context().cache().context().database().checkpointReadLock(); + try { - mvccProcessor.updateState(new MvccVersionImpl(1, 1, MvccUtils.MVCC_OP_COUNTER_NA), TxState.PREPARED); - mvccProcessor.updateState(new MvccVersionImpl(1, 2, MvccUtils.MVCC_OP_COUNTER_NA), TxState.PREPARED); - mvccProcessor.updateState(new MvccVersionImpl(1, 3, MvccUtils.MVCC_OP_COUNTER_NA), TxState.COMMITTED); - mvccProcessor.updateState(new MvccVersionImpl(1, 4, MvccUtils.MVCC_OP_COUNTER_NA), TxState.ABORTED); - mvccProcessor.updateState(new MvccVersionImpl(1, 5, MvccUtils.MVCC_OP_COUNTER_NA), TxState.ABORTED); - mvccProcessor.updateState(new MvccVersionImpl(1, 6, MvccUtils.MVCC_OP_COUNTER_NA), TxState.PREPARED); + mvccProcessor.updateTxState(1, 1, TxState.PREPARED, true); + mvccProcessor.updateTxState(1, 2, TxState.PREPARED, true); + mvccProcessor.updateTxState(1, 3, TxState.COMMITTED, true); + mvccProcessor.updateTxState(1, 4, TxState.ABORTED, true); + mvccProcessor.updateTxState(1, 5, TxState.ABORTED, true); + mvccProcessor.updateTxState(1, 6, TxState.PREPARED, true); } finally { grid.context().cache().context().database().checkpointReadUnlock(); @@ -88,20 +89,20 @@ mvccProcessor = mvccProcessor(grid); } - assertEquals(TxState.PREPARED, mvccProcessor.state(new MvccVersionImpl(1, 1, MvccUtils.MVCC_OP_COUNTER_NA))); - assertEquals(TxState.PREPARED, mvccProcessor.state(new MvccVersionImpl(1, 2, MvccUtils.MVCC_OP_COUNTER_NA))); - assertEquals(TxState.COMMITTED, mvccProcessor.state(new MvccVersionImpl(1, 3, MvccUtils.MVCC_OP_COUNTER_NA))); - assertEquals(TxState.ABORTED, mvccProcessor.state(new MvccVersionImpl(1, 4, MvccUtils.MVCC_OP_COUNTER_NA))); - assertEquals(TxState.ABORTED, mvccProcessor.state(new MvccVersionImpl(1, 5, MvccUtils.MVCC_OP_COUNTER_NA))); - assertEquals(TxState.PREPARED, mvccProcessor.state(new MvccVersionImpl(1, 6, MvccUtils.MVCC_OP_COUNTER_NA))); + assertEquals(TxState.PREPARED, mvccProcessor.txState(1, 1)); + assertEquals(TxState.PREPARED, mvccProcessor.txState(1, 2)); + assertEquals(TxState.COMMITTED, mvccProcessor.txState(1, 3)); + assertEquals(TxState.ABORTED, mvccProcessor.txState(1, 4)); + assertEquals(TxState.ABORTED, mvccProcessor.txState(1, 5)); + assertEquals(TxState.PREPARED, mvccProcessor.txState(1, 6)); mvccProcessor.removeUntil(new MvccVersionImpl(1, 5, MvccUtils.MVCC_OP_COUNTER_NA)); - assertEquals(TxState.NA, mvccProcessor.state(new MvccVersionImpl(1, 1, MvccUtils.MVCC_OP_COUNTER_NA))); - assertEquals(TxState.NA, mvccProcessor.state(new MvccVersionImpl(1, 2, MvccUtils.MVCC_OP_COUNTER_NA))); - assertEquals(TxState.NA, mvccProcessor.state(new MvccVersionImpl(1, 3, MvccUtils.MVCC_OP_COUNTER_NA))); - assertEquals(TxState.NA, mvccProcessor.state(new MvccVersionImpl(1, 4, MvccUtils.MVCC_OP_COUNTER_NA))); - assertEquals(TxState.NA, mvccProcessor.state(new MvccVersionImpl(1, 5, MvccUtils.MVCC_OP_COUNTER_NA))); - assertEquals(TxState.PREPARED, mvccProcessor.state(new MvccVersionImpl(1, 6, MvccUtils.MVCC_OP_COUNTER_NA))); + assertEquals(TxState.NA, mvccProcessor.txState(1, 1)); + assertEquals(TxState.NA, mvccProcessor.txState(1, 2)); + assertEquals(TxState.NA, mvccProcessor.txState(1, 3)); + assertEquals(TxState.NA, mvccProcessor.txState(1, 4)); + assertEquals(TxState.NA, mvccProcessor.txState(1, 5)); + assertEquals(TxState.PREPARED, mvccProcessor.txState(1, 6)); } }