Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
2.15
-
None
-
None
-
JDK 1.8
Windows 10
-
Docs Required, Release Notes Required
Description
Problem scenario:
Start the Ignite server of one node, start one thin client and create a continuous query listener, and then use 50 threads to add 500 data to the cache concurrently.
Problem phenomenon:
Through the information printed on the listener, it was found that the number of events listened to each time varies, possibly 496, 498, 499 or 500...
Test Code:
import org.apache.ignite.Ignite; import org.apache.ignite.Ignition; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import java.util.ArrayList; import java.util.List; public class StartServer { public static void main(String[] args) { IgniteConfiguration igniteConfiguration = new IgniteConfiguration(); TcpDiscoverySpi spi = new TcpDiscoverySpi(); List<String> addrList = new ArrayList<>(); addrList.add("127.0.0.1:47500"); TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); ipFinder.setAddresses(addrList); spi.setIpFinder(ipFinder); igniteConfiguration.setDiscoverySpi(spi); Ignite ignite = Ignition.start(igniteConfiguration); } }
import org.apache.ignite.Ignition; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.client.ClientCache; import org.apache.ignite.client.IgniteClient; import org.apache.ignite.configuration.ClientConfiguration; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; import java.util.Iterator; public class StartThinClient { public static void main(String[] args) throws InterruptedException { String addr = "127.0.0.1:10800"; int threadNmu = 50; ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setAddresses(addr); IgniteClient client1 = Ignition.startClient(clientConfiguration); ClientCache<Object, Object> cache1 = client1.getOrCreateCache("test"); ContinuousQuery<Object, Object> query = new ContinuousQuery<>(); query.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> cacheEntryEvents) throws CacheEntryListenerException { Iterator<CacheEntryEvent<?, ?>> iterator = cacheEntryEvents.iterator(); while (iterator.hasNext()) { CacheEntryEvent<?, ?> next = iterator.next(); System.out.println("----" + next.getKey()); } } }); cache1.query(query); IgniteClient client2 = Ignition.startClient(clientConfiguration); ClientCache<Object, Object> cache2 = client2.cache("test"); Thread[] threads = new Thread[threadNmu]; for (int i = 0; i < threads.length; ++i) { threads[i] = new Thread(new OperationInsert(cache2, i, 500, threadNmu)); } for (int i = 0; i < threads.length; ++i) { threads[i].start(); } for (Thread thread : threads) { thread.join(); } Thread.sleep(60000); } static class OperationInsert implements Runnable { private ClientCache<Object, Object> cache; private int k; private Integer test_rows; private Integer thread_cnt; public OperationInsert(ClientCache<Object, Object> cache, int k, Integer test_rows, Integer thread_cnt) { this.cache = cache; this.k = k; this.test_rows = test_rows; this.thread_cnt = thread_cnt; } @Override public void run() { for (int i = 1000000 + (test_rows/thread_cnt) * k; i < 1000000 + (test_rows/thread_cnt) * (k + 1); i++) { cache.put("" + i, "aaa"); } } } }
Running results:
Version:
The testing program uses Ignite version 2.15.0.
I attempted to insert data using one thread and did not observe any event loss. In addition, I also attempted an Ignite cluster with two or three nodes, which can still listen to all 500 events even when inserting data using multiple threads.This problem seems to only occur when concurrent threads insert data into a node.
Attachments
Attachments
Issue Links
- links to