Index: modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java new file mode 100644 --- /dev/null (date 1675008051613) +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java (date 1675008051613) @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cdc; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.binary.BinaryType; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.cdc.AbstractCdcTest; +import org.apache.ignite.cdc.CdcCacheEvent; +import org.apache.ignite.cdc.CdcConsumer; +import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.cdc.TypeMapping; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.metric.MetricRegistry; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.metric.LongMetric; +import org.junit.Test; + +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH; +import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_IDX; +import static org.apache.ignite.internal.cdc.CdcMain.CUR_SEG_IDX; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId; +import static org.apache.ignite.testframework.GridTestUtils.getFieldValue; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** */ +public class CdcIndexRebuildTest extends AbstractCdcTest { + /** */ + public static final int WAL_ARCHIVE_TIMEOUT = 5_000; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setWalMode(WALMode.LOG_ONLY) + .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setCdcEnabled(true) + .setMaxSize(512 * 1024 * 1024)) + .setWalArchivePath(DFLT_WAL_ARCHIVE_PATH + "/" + U.maskForFileName(igniteInstanceName)) + .setWalSegmentSize(16 * 1024 * 1024) + ); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * + */ + @Test + public void testIndexRebuild() throws Exception { + startGrid(0).cluster().state(ACTIVE); + + int valsCnt = 1024 * 30; + + populateCache(valsCnt); + + CountingCdcConsumer cnsmr = new CountingCdcConsumer(); + + CdcMain cdc = createCdc(cnsmr, getConfiguration(grid(0).name())); + + IgniteInternalFuture fut = runAsync(cdc); + + log.warning(">>>>>> Waiting for all CDC events"); + assertTrue(waitForCondition(() -> cnsmr.commited() && cnsmr.eventsCount() == valsCnt, getTestTimeout())); + + log.warning(String.format(">>>>>> CountingCdcConsumer information [commited=%b, evtsCount=%d]:", + cnsmr.commited(), cnsmr.eventsCount())); + + waitForWalSegmentsHandling(cdc); + + cnsmr.resetEvents(); + + GridCacheContext cctx = grid(0).cachex(DEFAULT_CACHE_NAME).context(); + + log.warning(">>>>>> Forcing index rebuild"); + forceRebuildIndexes(grid(0), cctx); + + IgniteInternalFuture idxRebuildFut = indexRebuildFuture(grid(0), cacheId(DEFAULT_CACHE_NAME)); + + idxRebuildFut.get(getTestTimeout()); + + log.warning(String.format(">>>>>> CountingCdcConsumer information [commited=%b, evtsCount=%d]:", + cnsmr.commited(), cnsmr.eventsCount())); + + // Inserting of value fixes WAL handling: +// runAsync(() -> grid(0).cache(DEFAULT_CACHE_NAME).put(0, new TestVal())); + + waitForWalSegmentsHandling(cdc); + + fut.cancel(); + } + + /** + * @param cdc Cdc. + */ + private void waitForWalSegmentsHandling(CdcMain cdc) throws IgniteInterruptedCheckedException { + assertTrue("Wal segments was not committed by CdcConsumer", waitForCondition(() -> { + long lastArchivedSeg = grid(0).context().cache().context().wal().lastArchivedSegment(); + + MetricRegistry mreg = getFieldValue(cdc, "mreg"); + + long committedCdcSegIdx = mreg.findMetric(COMMITTED_SEG_IDX).value(); + long curCdcSegIdx = mreg.findMetric(CUR_SEG_IDX).value(); + + log.warning(String.format(">>>>>> Information about CDC and WAL: " + + "[lastArchivedSeg=%d, committedCdcSegIdx=%d, curCdcSegIdx=%d]", + lastArchivedSeg, + committedCdcSegIdx, + curCdcSegIdx)); + + return lastArchivedSeg == committedCdcSegIdx; + }, + WAL_ARCHIVE_TIMEOUT * 4, + WAL_ARCHIVE_TIMEOUT)); + } + + /** + * @param valsCnt Vals count. + */ + private int populateCache(int valsCnt) { + IgniteCache cache = grid(0).getOrCreateCache( + new CacheConfiguration(DEFAULT_CACHE_NAME) + .setIndexedTypes(Integer.class, TestVal.class)); + + Map vals = new HashMap<>(valsCnt); + + for (int i = 0; i < valsCnt; i++) + vals.put(i, new TestVal()); + + cache.putAll(vals); + return valsCnt; + } + + /** + * + */ + public static class CountingCdcConsumer implements CdcConsumer { + /** Commited. */ + private final AtomicBoolean commited = new AtomicBoolean(); + + /** Evts count. */ + private final AtomicInteger evtsCnt = new AtomicInteger(); + + + /** {@inheritDoc} */ + @Override public void start(MetricRegistry mreg) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean onEvents(Iterator events) { + commited.set(true); + + while (events.hasNext()) { + events.next(); + + evtsCnt.incrementAndGet(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public void onTypes(Iterator types) { + types.forEachRemaining(t -> { /* No-op */ }); + } + + /** {@inheritDoc} */ + @Override public void onMappings(Iterator mappings) { + mappings.forEachRemaining(m -> { /* No-op */ }); + } + + /** {@inheritDoc} */ + @Override public void onCacheChange(Iterator cacheEvents) { + cacheEvents.forEachRemaining(e -> { /* No-op */ }); + } + + /** {@inheritDoc} */ + @Override public void onCacheDestroy(Iterator caches) { + caches.forEachRemaining(c -> { /* No-op */ }); + } + + /** {@inheritDoc} */ + @Override public void stop() { + // No-op. + } + + /** + * + */ + public void resetEvents() { + commited.set(false); + evtsCnt.set(0); + } + + /** + * @return Commited. + */ + public boolean commited() { + return commited.get(); + } + + /** + * @return Evts count. + */ + public int eventsCount() { + return evtsCnt.get(); + } + } + + /** + * + */ + public static class TestVal { + /** Field 0. */ + @QuerySqlField(index = true, inlineSize = 256) + private final String field0; + + /** Field 0. */ + @QuerySqlField(index = true, inlineSize = 256) + private final String field1; + + /** Field 0. */ + @QuerySqlField(index = true, inlineSize = 256) + private final String field2; + + /** Field 0. */ + @QuerySqlField(index = true, inlineSize = 256) + private final String field3; + + /** Field 0. */ + @QuerySqlField(index = true, inlineSize = 256) + private final String field4; + + /** + * Default constructor. + */ + public TestVal() { + field0 = UUID.randomUUID().toString(); + field1 = UUID.randomUUID().toString(); + field2 = UUID.randomUUID().toString(); + field3 = UUID.randomUUID().toString(); + field4 = UUID.randomUUID().toString(); + } + } +}