Uploaded image for project: 'Ignite'
  1. Ignite
  2. IGNITE-19561

Ignite thin client continuous query listener cannot listen to all events

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 2.15
    • None
    • cache, thin client
    • None
    • JDK 1.8 

      Windows 10

    • Docs Required, Release Notes Required

    Description

      Problem scenario:

      Start the Ignite server of one node, start one thin client and create a continuous query listener, and then use 50 threads to add 500 data to the cache concurrently.

      Problem phenomenon:

      Through the information printed on the listener, it was found that the number of events listened to each time varies, possibly 496, 498, 499 or 500...

      Test Code:

      import org.apache.ignite.Ignite;
      import org.apache.ignite.Ignition;
      import org.apache.ignite.configuration.IgniteConfiguration;
      import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
      import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
      
      import java.util.ArrayList;
      import java.util.List;
      
      public class StartServer {
          public static void main(String[] args) {
              IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
              TcpDiscoverySpi spi = new TcpDiscoverySpi();
              List<String> addrList = new ArrayList<>();
              addrList.add("127.0.0.1:47500");
              TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
              ipFinder.setAddresses(addrList);
              spi.setIpFinder(ipFinder);
              igniteConfiguration.setDiscoverySpi(spi);
              Ignite ignite = Ignition.start(igniteConfiguration);
          }
      }
      
      import org.apache.ignite.Ignition;
      import org.apache.ignite.cache.query.ContinuousQuery;
      import org.apache.ignite.client.ClientCache;
      import org.apache.ignite.client.IgniteClient;
      import org.apache.ignite.configuration.ClientConfiguration;
      
      import javax.cache.event.CacheEntryEvent;
      import javax.cache.event.CacheEntryListenerException;
      import javax.cache.event.CacheEntryUpdatedListener;
      import java.util.Iterator;
      
      public class StartThinClient {
          public static void main(String[] args) throws InterruptedException {
              String addr = "127.0.0.1:10800";
      
              int threadNmu = 50;
      
              ClientConfiguration clientConfiguration = new ClientConfiguration();
              clientConfiguration.setAddresses(addr);
      
              IgniteClient client1 = Ignition.startClient(clientConfiguration);
      
              ClientCache<Object, Object> cache1 = client1.getOrCreateCache("test");
      
              ContinuousQuery<Object, Object> query = new ContinuousQuery<>();
              query.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
                  @Override
                  public void onUpdated(Iterable<CacheEntryEvent<?, ?>> cacheEntryEvents) throws CacheEntryListenerException {
                      Iterator<CacheEntryEvent<?, ?>> iterator = cacheEntryEvents.iterator();
                      while (iterator.hasNext()) {
                          CacheEntryEvent<?, ?> next = iterator.next();
                          System.out.println("----" + next.getKey());
                      }
                  }
              });
      
              cache1.query(query);
      
              IgniteClient client2 = Ignition.startClient(clientConfiguration);
              ClientCache<Object, Object> cache2 = client2.cache("test");
      
              Thread[] threads = new Thread[threadNmu];
              for (int i = 0; i < threads.length; ++i) {
                  threads[i] = new Thread(new OperationInsert(cache2, i, 500, threadNmu));
              }
              for (int i = 0; i < threads.length; ++i) {
                  threads[i].start();
              }
              for (Thread thread : threads) {
                  thread.join();
              }
      
              Thread.sleep(60000);
      
          }
      
          static class OperationInsert implements Runnable {
      
              private ClientCache<Object, Object> cache;
              private int k;
              private Integer test_rows;
              private Integer thread_cnt;
      
              public OperationInsert(ClientCache<Object, Object> cache, int k, Integer test_rows, Integer thread_cnt) {
                  this.cache = cache;
                  this.k = k;
                  this.test_rows = test_rows;
                  this.thread_cnt = thread_cnt;
              }
      
              @Override
              public void run() {
                  for (int i = 1000000 + (test_rows/thread_cnt) * k; i < 1000000 + (test_rows/thread_cnt) * (k + 1); i++) {
                      cache.put("" + i, "aaa");
                  }
              }
          }
      
      } 

      Running results:

      result1.logresult2.log

      Version:

      The testing program uses Ignite version 2.15.0.

      I attempted to insert data using one thread and did not observe any event loss. In addition, I also attempted an Ignite cluster with two or three nodes, which can still listen to all 500 events even when inserting data using multiple threads.This problem seems to only occur when concurrent threads insert data into a node.

      Attachments

        1. result1.log
          6 kB
          Mengyu Jing
        2. result2.log
          6 kB
          Mengyu Jing

        Activity

          People

            ptupitsyn Pavel Tupitsyn
            lonesomerain Mengyu Jing
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: