diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index c4ff39d..1b8d41c 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -174,13 +174,14 @@ public class CacheConfiguration extends MutableConfiguration { /** Default size for onheap SQL row cache size. */ public static final int DFLT_SQL_ONHEAP_ROW_CACHE_SIZE = 10 * 1024; - /** Default threshold for concurrent load-all */ - public static final int DFLT_LOAD_ALL_THRESHOLD = 5; + /** Default threshold for concurrent loading of keys from {@link CacheStore}. */ + public static final int DFLT_CONCURRENT_LOAD_ALL_THRESHOLD = 5; /** Cache name. */ private String name; - private int storeConcurrentLoadAllThreshold = DFLT_LOAD_ALL_THRESHOLD; + /** Threshold for concurrent loading of keys from {@link CacheStore}. */ + private int storeConcurrentLoadAllThreshold = DFLT_CONCURRENT_LOAD_ALL_THRESHOLD; /** Rebalance thread pool size. */ @Deprecated @@ -839,6 +840,37 @@ public class CacheConfiguration extends MutableConfiguration { } /** + * Gets the threshold used in cases when values for multiple keys are being loaded from an underlying + * {@link CacheStore} in parallel. In the situation when several threads load the same or intersecting set of keys + * and the total number of keys to load is less or equal to this threshold then there will be no a second call to + * the storage in order to load a key from thread A if the same key is already being loaded by thread B. + * + * The threshold should be controlled wisely. On the one hand if it's set to a big value then the interaction with + * a storage during the load of missing keys will be minimal. On the other hand the big value may result in + * significant performance degradation because it is needed to check for every key whether it's being loaded or not. + * + * When not set, default value is {@link #DFLT_CONCURRENT_LOAD_ALL_THRESHOLD}. + * + * @return The concurrent load-all threshold. + */ + public int getStoreConcurrentLoadAllThreshold() { + return storeConcurrentLoadAllThreshold; + } + + /** + * Sets the concurrent load-all threshold used for cases when keys' values are being loaded from {@link CacheStore} + * in parallel. + * + * @param storeConcurrentLoadAllThreshold The concurrent load-all threshold. + * @return {@code this} for chaining. + */ + public CacheConfiguration setStoreConcurrentLoadAllThreshold(int storeConcurrentLoadAllThreshold) { + this.storeConcurrentLoadAllThreshold = storeConcurrentLoadAllThreshold; + + return this; + } + + /** * Gets key topology resolver to provide mapping from keys to nodes. * * @return Key topology resolver to provide mapping from keys to nodes. @@ -1887,24 +1919,6 @@ public class CacheConfiguration extends MutableConfiguration { } /** - * The key-count threshold above which concurrent loads are applied - * @return The concurrent load-all threshold - */ - public int getStoreConcurrentLoadAllThreshold() { - return storeConcurrentLoadAllThreshold; - } - - /** - * Sets the key count threshold above which concurrent loads are applied - * @param storeConcurrentLoadAllThreshold The concurrent load-all threshold - * @return {@code this} for chaining. - */ - public CacheConfiguration setStoreConcurrentLoadAllThreshold(int storeConcurrentLoadAllThreshold) { - this.storeConcurrentLoadAllThreshold = storeConcurrentLoadAllThreshold; - return this; - } - - /** * Filter that accepts all nodes. */ public static class IgniteAllNodesPredicate implements IgnitePredicate { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java index 2636272..8992326 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java @@ -40,7 +40,7 @@ import org.jsr166.ConcurrentHashMap8; */ public class CacheStoreBalancingWrapper implements CacheStore { /** */ - public static final int DFLT_LOAD_ALL_THRESHOLD = CacheConfiguration.DFLT_LOAD_ALL_THRESHOLD; + public static final int DFLT_LOAD_ALL_THRESHOLD = CacheConfiguration.DFLT_CONCURRENT_LOAD_ALL_THRESHOLD; /** Delegate store. */ private CacheStore delegate; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index e18c5e6..6bfafd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -112,7 +112,8 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt store = cacheStoreWrapper(ctx, cfgStore, cfg); - singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store, cfg.getStoreConcurrentLoadAllThreshold()); + singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store, + cfg.getStoreConcurrentLoadAllThreshold()); ThreadLocal sesHolder0 = null; diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java index 1e3e4b4..bfbb08c 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.cache.Cache; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.CacheStoreBalancingWrapper; @@ -127,15 +128,35 @@ public class GridCacheBalancingStoreSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testConcurrentLoad() throws Exception { - int threads = 5; + CacheConfiguration cfg = new CacheConfiguration(); - final int keys = 50; + assertEquals(CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD, cfg.getStoreConcurrentLoadAllThreshold()); + doTestConcurrentLoad(5, 50, CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentLoadCustomThreshold() throws Exception { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setStoreConcurrentLoadAllThreshold(15); + + assertEquals(15, cfg.getStoreConcurrentLoadAllThreshold()); + + doTestConcurrentLoad(5, 50, cfg.getStoreConcurrentLoadAllThreshold()); + } + + /** + * @throws Exception If failed. + */ + private void doTestConcurrentLoad(int threads, final int keys, int threshold) throws Exception { final CyclicBarrier beforeBarrier = new CyclicBarrier(threads); ConcurrentVerifyStore store = new ConcurrentVerifyStore(keys); - final CacheStoreBalancingWrapper wrapper =new CacheStoreBalancingWrapper<>(store); + final CacheStoreBalancingWrapper wrapper = new CacheStoreBalancingWrapper<>(store, threshold); GridTestUtils.runMultiThreaded(new Runnable() { @Override public void run() { @@ -159,17 +180,35 @@ public class GridCacheBalancingStoreSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testConcurrentLoadAll() throws Exception { - int threads = 5; + CacheConfiguration cfg = new CacheConfiguration(); - final int threshold = 5; + assertEquals(CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD, cfg.getStoreConcurrentLoadAllThreshold()); - final int keysCnt = 100; + doTestConcurrentLoadAll(5, CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD, 150); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentLoadAllCustomThreshold() throws Exception { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setStoreConcurrentLoadAllThreshold(15); + assertEquals(15, cfg.getStoreConcurrentLoadAllThreshold()); + + doTestConcurrentLoadAll(5, cfg.getStoreConcurrentLoadAllThreshold(), 150); + } + + /** + * @throws Exception If failed. + */ + private void doTestConcurrentLoadAll(int threads, final int threshold, final int keysCnt) throws Exception { final CyclicBarrier beforeBarrier = new CyclicBarrier(threads); ConcurrentVerifyStore store = new ConcurrentVerifyStore(keysCnt); - final CacheStoreBalancingWrapper wrapper = new CacheStoreBalancingWrapper<>(store); + final CacheStoreBalancingWrapper wrapper = new CacheStoreBalancingWrapper<>(store, threshold); GridTestUtils.runMultiThreaded(new Runnable() { @Override public void run() {