Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java (date 1424863838000) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java (date 1424964487000) @@ -59,6 +59,9 @@ /** Affinity mapper function. */ private final CacheAffinityKeyMapper affMapper; + /** Default affinity mapper function. */ + private final CacheAffinityKeyMapper dfltAffMapper; + /** Affinity calculation results cache: topology version => partition => nodes. */ private final ConcurrentMap affCache; @@ -84,6 +87,7 @@ * @param cacheName Cache name. * @param aff Affinity function. * @param affMapper Affinity key mapper. + * @param dfltAffMapper Default affinity key mapper. * @param backups Number of backups. */ @SuppressWarnings("unchecked") @@ -91,6 +95,7 @@ String cacheName, CacheAffinityFunction aff, CacheAffinityKeyMapper affMapper, + CacheAffinityKeyMapper dfltAffMapper, int backups) { this.ctx = ctx; @@ -98,6 +103,7 @@ this.affMapper = affMapper; this.cacheName = cacheName; this.backups = backups; + this.dfltAffMapper = dfltAffMapper; log = ctx.logger(GridAffinityAssignmentCache.class); @@ -305,7 +311,18 @@ } } - return aff.partition(affMapper.affinityKey(key)); + return aff.partition(affinityKey(key)); + } + + /** + * If Key is {@link GridCacheInternal GridCacheInternal} entry when won't passed into user's mapper and + * will use {@link GridCacheDefaultAffinityKeyMapper default}. + * + * @param key Key. + * @return Affinity key. + */ + private Object affinityKey(Object key) { + return (dfltAffMapper != null && key instanceof GridCacheInternal ? dfltAffMapper : affMapper).affinityKey(key); } /** Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java (date 1424863838000) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java (date 1424964487000) @@ -44,8 +44,17 @@ /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { + GridCacheDefaultAffinityKeyMapper dfltAffKeyMapper = null; + + if (!cctx.config().getAffinityMapper().getClass().equals(GridCacheDefaultAffinityKeyMapper.class)) { + dfltAffKeyMapper = new GridCacheDefaultAffinityKeyMapper(); + + dfltAffKeyMapper.setIgnite(cctx.grid()); + dfltAffKeyMapper.setLog(cctx.logger(GridCacheDefaultAffinityKeyMapper.class)); + } + - aff = new GridAffinityAssignmentCache(cctx, cctx.namex(), cctx.config().getAffinity(), + aff = new GridAffinityAssignmentCache(cctx, cctx.namex(), cctx.config().getAffinity(), - cctx.config().getAffinityMapper(), cctx.config().getBackups()); + cctx.config().getAffinityMapper(), dfltAffKeyMapper, cctx.config().getBackups()); // Generate internal keys for partitions. int partCnt = partitions(); Index: ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java (date 1424863838000) +++ ../incubator-ignite/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java (date 1424964487000) @@ -45,7 +45,6 @@ private static final long serialVersionUID = 0L; /** Injected ignite instance. */ - @IgniteInstanceResource private Ignite ignite; /** Reflection cache. */ @@ -75,7 +74,6 @@ ); /** Logger. */ - @LoggerResource private transient IgniteLogger log; /** @@ -123,5 +121,25 @@ /** {@inheritDoc} */ @Override public void reset() { // No-op. + } + + /** + * Sets logger. + * + * @param log Ignite logger. + */ + @LoggerResource + public void setLog(IgniteLogger log) { + this.log = log; + } + + /** + * Sets Ignite. + * + * @param ignite Ignite. + */ + @IgniteInstanceResource + public void setIgnite(Ignite ignite) { + this.ignite = ignite; } } Index: ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java (date 1424964487000) +++ ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java (date 1424964487000) @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; + +/** + * Test affinity mapper. + */ +public abstract class GridCacheAbstractUsersAffinityMapperSelfTest extends GridCommonAbstractTest { + /** */ + private static final int KEY_CNT = 1000; + + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + public static final CacheAffinityKeyMapper AFFINITY_MAPPER = new UsersAffinityKeyMapper(); + + /** */ + public GridCacheAbstractUsersAffinityMapperSelfTest() { + super(false /* doesn't start grid */); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cacheCfg = new CacheConfiguration(); + + cacheCfg.setName(null); + cacheCfg.setCacheMode(getCacheMode()); + cacheCfg.setAtomicityMode(getAtomicMode()); + cacheCfg.setDistributionMode(getDistributionMode()); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setPreloadMode(CachePreloadMode.SYNC); + cacheCfg.setAffinityMapper(AFFINITY_MAPPER); + + cfg.setCacheConfiguration(cacheCfg); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(spi); + + return cfg; + } + + /** + * @return Distribution mode. + */ + protected abstract CacheDistributionMode getDistributionMode(); + + /** + * @return Cache atomicity mode. + */ + protected abstract CacheAtomicityMode getAtomicMode(); + + /** + * @return Cache mode. + */ + protected abstract CacheMode getCacheMode(); + + /** + * @throws Exception If failed. + */ + public void testAffinityMapper() throws Exception { + IgniteCache cache = startGrid(0).jcache(null); + + for (int i = 0; i < KEY_CNT; i++) { + cache.put(String.valueOf(i), String.valueOf(i)); + + cache.put(new TestAffinityKey(i, String.valueOf(i)), i); + } + + assertEquals(1, cache.get(new TestAffinityKey(1, "1"))); + + startGrid(1); + + for (int i = 0; i < KEY_CNT; i++) + grid(i % 2).compute().affinityRun(null, new TestAffinityKey(1, "1"), new NoopClosure()); + } + + /** + * Test key for field annotation. + */ + private static class TestAffinityKey implements Externalizable { + /** Key. */ + private int key; + + /** Affinity key. */ + @CacheAffinityKeyMapped + private String affKey; + + /** + * Constructor. + */ + public TestAffinityKey() { + } + + /** + * Constructor. + * + * @param key Key. + * @param affKey Affinity key. + */ + TestAffinityKey(int key, String affKey) { + this.key = key; + this.affKey = affKey; + } + + /** + * @return Key. + */ + public int key() { + return key; + } + + /** + * @return Affinity key. + */ + public String affinityKey() { + return affKey; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + return o instanceof TestAffinityKey && key == ((TestAffinityKey)o).key(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key + affKey.hashCode(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(key); + out.writeUTF(affKey); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + key = in.readInt(); + affKey = in.readUTF(); + } + } + + /** + * Users affinity mapper. + */ + private static class UsersAffinityKeyMapper extends GridCacheDefaultAffinityKeyMapper{ + /** {@inheritDoc} */ + @Override public Object affinityKey(Object key) { + GridArgumentCheck.notNull(key, "key"); + + assertFalse("GridCacheInternal entry mustn't be passed in user's key mapper.", + key instanceof GridCacheInternal); + + return super.affinityKey(key); + } + } + + /** + * Noop closure. + */ + private static class NoopClosure implements IgniteRunnable { + /** {@inheritDoc} */ + @Override public void run() { + // No-op. + } + } +} Index: ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicUsersAffinityMapperSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicUsersAffinityMapperSelfTest.java (date 1424964487000) +++ ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicUsersAffinityMapperSelfTest.java (date 1424964487000) @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +/** + * Test affinity mapper. + */ +public class GridCacheAtomicUsersAffinityMapperSelfTest extends GridCacheAbstractUsersAffinityMapperSelfTest { + /** */ + public GridCacheAtomicUsersAffinityMapperSelfTest() { + super(); + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode getDistributionMode() { + return CacheDistributionMode.PARTITIONED_ONLY; + }; + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode getAtomicMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheMode getCacheMode() { + return CacheMode.PARTITIONED; + } +} Index: ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedUsersAffinityMapperSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedUsersAffinityMapperSelfTest.java (date 1424964487000) +++ ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedUsersAffinityMapperSelfTest.java (date 1424964487000) @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +/** + * Test affinity mapper. + */ +public class GridCacheReplicatedUsersAffinityMapperSelfTest extends GridCacheAbstractUsersAffinityMapperSelfTest { + /** */ + public GridCacheReplicatedUsersAffinityMapperSelfTest() { + super(); + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode getDistributionMode() { + return CacheDistributionMode.PARTITIONED_ONLY; + }; + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode getAtomicMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheMode getCacheMode() { + return CacheMode.REPLICATED; + } +} Index: ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxUsersAffinityMapperSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxUsersAffinityMapperSelfTest.java (date 1424964487000) +++ ../incubator-ignite/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxUsersAffinityMapperSelfTest.java (date 1424964487000) @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +/** + * Test affinity mapper. + */ +public class GridCacheTxUsersAffinityMapperSelfTest extends GridCacheAbstractUsersAffinityMapperSelfTest { + /** */ + public GridCacheTxUsersAffinityMapperSelfTest() { + super(); + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode getDistributionMode() { + return CacheDistributionMode.PARTITIONED_ONLY; + }; + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode getAtomicMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected CacheMode getCacheMode() { + return CacheMode.PARTITIONED; + } +}