-
Type:
Bug
-
Status: Open
-
Priority:
Major
-
Resolution: Unresolved
-
Affects Version/s: 2.8
-
Fix Version/s: None
-
Component/s: persistence
-
Labels:None
Currently, FileWriteAheadLogManager#hasIndex first determines that the WAL segment does not exist in the archive (File.exists) and then determines that the index is already in the archive (using Files.list). If the archive file is created between these two operations, hasIndex returns a false-negative result and the partition map exchange fails on this node.
Reproducer:
public class IgniteWalHistoryReservationsWithLoadTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); cfg.setConsistentId("NODE$" + gridName.charAt(gridName.length() - 1)); DataStorageConfiguration memCfg = new DataStorageConfiguration() .setDefaultDataRegionConfiguration( new DataRegionConfiguration() .setMaxSize(200L * 1024 * 1024) .setPersistenceEnabled(true)) .setWalMode(WALMode.LOG_ONLY) .setWalSegmentSize(512 * 1024) .setCheckpointFrequency(500); cfg.setDataStorageConfiguration(memCfg); CacheConfiguration ccfg1 = new CacheConfiguration(); ccfg1.setName("cache1"); ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); ccfg1.setAffinity(new RendezvousAffinityFunction(false, 32)); cfg.setCacheConfiguration(ccfg1); return cfg; } @Test public void testReservationWithConstantLoad() throws Exception { final IgniteEx node = startGrid(0); node.cluster().active(true); AtomicLong cntr = new AtomicLong(100_000); ConstantLoader ldr = new ConstantLoader(node.cache("cache1"), cntr); IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(ldr, 1, "loader"); U.sleep(500); forceCheckpoint(node); // Reserve history from the beginning. 
node.context().cache().context().database().reserveHistoryForExchange(); long endTime = U.currentTimeMillis() + 60_000; GridCacheContext ctx = node.cachex("cache1").context(); int grpId = ctx.groupId(); int parts = ctx.topology().partitions(); try { while (U.currentTimeMillis() < endTime && !Thread.currentThread().isInterrupted()) { try { for (int p = 0; p < parts; p++) { boolean reserved = node.context().cache().context().database().reserveHistoryForPreloading(grpId, p, cntr.get()); assertTrue("Unable to reserve history [p=" + p + ", cntr=" + cntr.get() + "]", reserved); } } finally { node.context().cache().context().database().releaseHistoryForPreloading(); } } } finally { node.context().cache().context().database().releaseHistoryForExchange(); ldr.stop(); } fut.get(10_000); } static class ConstantLoader implements Callable<Void> { private final IgniteCache cache; private final AtomicLong cntr; private volatile boolean stop; ConstantLoader(IgniteCache cache, AtomicLong cntr) { this.cache = cache; this.cntr = cntr; } @Override public Void call() throws Exception { while (!stop && !Thread.currentThread().isInterrupted()) { long n = cntr.getAndIncrement(); cache.put(n, n); if (n % 100_000 == 0) log.info("Loaded " + n); } return null; } public void stop() { stop = true; } } /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { stopAllGrids(); cleanPersistenceDir(); } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { stopAllGrids(); cleanPersistenceDir(); } }
- links to