Index: modules/core/src/test/java/org/apache/ignite/cache/NPEReproducer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/cache/NPEReproducer.java (date 1596540012335) +++ modules/core/src/test/java/org/apache/ignite/cache/NPEReproducer.java (date 1596540012335) @@ -0,0 +1,195 @@ +package org.apache.ignite.cache; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.CountDownLatch; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; +import org.apache.ignite.internal.util.GridRandom; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.junit.Test; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +public class NPEReproducer extends GridCommonAbstractTest { + /** Cache1 name. */ + private static final String CACHE_1_NAME = "cache1"; + + /** Cache2 name. */ + private static final String CACHE_2_NAME = "cache2"; + + /** rnd instance. */ + private static final GridRandom rnd = new GridRandom(); + + /** */ + private CacheConfiguration ccfg1; + + /** */ + private CacheConfiguration ccfg2; + + Person[] persons = createTestData(); + + Random random = new Random(); + + /** */ + private ListeningTestLogger testLogger = new ListeningTestLogger(log); + + /** */ + private LogListener lsnr = LogListener.matches("NullPointerException").build(); + + /** + * {@inheritDoc} + */ + @Override + protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + + cfg.setGridLogger(testLogger); + + ccfg1 = new CacheConfiguration(CACHE_1_NAME) + .setCacheMode(PARTITIONED) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setWriteSynchronizationMode(FULL_SYNC) + .setBackups(1); + ccfg2 = new CacheConfiguration(CACHE_2_NAME) + .setCacheMode(PARTITIONED) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setWriteSynchronizationMode(FULL_SYNC) + .setBackups(1); + + return cfg.setCacheConfiguration(ccfg1,ccfg2) + .setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(true))); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + + testLogger.registerListener(lsnr); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + testLogger.unregisterListener(lsnr); + } + + @Test + public void NPEReproducer() throws Exception { + + startGridsMultiThreaded(2); + + Ignition.setClientMode(true); + + IgniteEx client = startGrid("client"); + + client.cluster().active(true); + + CountDownLatch destroyLatch = new CountDownLatch(1); + + final IgniteCache cache = client.getOrCreateCache(ccfg1); + + final IgniteCache cache2 = client.getOrCreateCache(ccfg2); + + TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(client); + + IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> { + try { + destroyLatch.await(); + + IgniteInternalFuture f = GridTestUtils.runAsync(() -> { + doSleep(rnd.nextInt(500)); + + spi.stopBlock(); + }); + + cache.destroy(); + + f.get(); + } + catch (Exception e) { + e.printStackTrace(); + } + }); + + spi.blockMessages((node, msg) -> { + if (msg instanceof GridNearTxPrepareRequest) + destroyLatch.countDown(); + + return false; + }); + + Map values = new HashMap<>(); + + for (int i = 0; i < 100; i++) + values.put(random.nextInt(1000), persons[random.nextInt(100)]); + + IgniteTransactions txs = client.transactions(); + + IgniteInternalFuture f1 = GridTestUtils.runAsync(() -> { + try (Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED, 5000, 200)) { + + cache.putAll(values); + + cache2.putAll(values); + + tx.commit(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + f0.get(); + f1.get(); + + assertFalse(lsnr.check()); + } + + /** + * + */ + private Person[] createTestData() { + final int num = 100; + + Person[] res = new Person[num]; + + for (int i = 0; i < num; i++) + res[i] = new Person("John", "Dow", i); + + return res; + } + + private class Person { + String firstName; + String lastName; + int age; + + public Person(String firstName, String lastName, int age) { + this.firstName = firstName; + this.lastName = lastName; + this.age = age; + } + } +} Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java (revision 726d03b3e6b8011ff4ec4c9061547225567a302c) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java (date 1596539929721) @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.LinkedList; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; @@ -981,6 +982,12 @@ key.finishUnmarshal(coctx, clsLdr); + try { + GridCacheContext.latch.await(500, TimeUnit.MILLISECONDS); + } + catch (Exception e){ + } + val.unmarshal(coctx, clsLdr); if (expiryPlcBytes != null && expiryPlc == null) Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java (revision 726d03b3e6b8011ff4ec4c9061547225567a302c) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java (date 1596539929847) @@ -42,6 +42,7 @@ import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; import org.apache.ignite.internal.processors.cache.CacheStoppedException; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; @@ -61,7 +62,7 @@ import org.junit.Test; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; +import static org.apache.ignite.transactions.TransactionIsolation.*; /** * @@ -150,6 +151,7 @@ */ @Test public void testTxOnCacheStopNoMessageBlock() throws Exception { + //Reproduce here runTxOnCacheStop(false); } @@ -172,8 +174,10 @@ ig.cluster().active(true); for (TransactionConcurrency conc : TransactionConcurrency.values()) { - for (TransactionIsolation iso : TransactionIsolation.values()) + for (TransactionIsolation iso : TransactionIsolation.values()) { + GridCacheContext.latch = new CountDownLatch(1); runTxOnCacheStop(conc, iso, ig, block); + } } } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java (revision 726d03b3e6b8011ff4ec4c9061547225567a302c) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java (date 1596539929752) @@ -58,6 +58,8 @@ /** {@inheritDoc} */ @Override public void finishUnmarshal(CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException { + if (ctx == null) + throw new NullPointerException(); // No-op. } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java (revision 726d03b3e6b8011ff4ec4c9061547225567a302c) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java (date 1596539929768) @@ -2051,6 +2051,8 @@ return updatesAllowed; } + public static CountDownLatch latch = new CountDownLatch(1); + /** * Nulling references to potentially leak-prone objects. */ @@ -2062,6 +2064,8 @@ dataStructuresMgr = null; cacheObjCtx = null; + latch.countDown(); + if (expiryPlc instanceof Closeable) U.closeQuiet((Closeable)expiryPlc);