Uploaded image for project: 'Ignite'
  1. Ignite
  2. IGNITE-7523

Exception on data expiration after sharedRDD.saveValues call

    XML | Word | Printable | JSON

    Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 2.9
    • Fix Version/s: None
    • Component/s: spark
    • Labels:
      None

      Description

      Reproducer:

      package rdd_expiration;
      
      import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.List;
      import java.util.UUID;
      import java.util.concurrent.atomic.AtomicLong;
      import javax.cache.Cache;
      import javax.cache.expiry.CreatedExpiryPolicy;
      import javax.cache.expiry.Duration;
      import org.apache.ignite.Ignite;
      import org.apache.ignite.IgniteCache;
      import org.apache.ignite.Ignition;
      import org.apache.ignite.configuration.CacheConfiguration;
      import org.apache.ignite.configuration.DataRegionConfiguration;
      import org.apache.ignite.configuration.DataStorageConfiguration;
      import org.apache.ignite.configuration.IgniteConfiguration;
      import org.apache.ignite.lang.IgniteOutClosure;
      import org.apache.ignite.spark.JavaIgniteContext;
      import org.apache.ignite.spark.JavaIgniteRDD;
      import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
      import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
      import org.apache.log4j.Level;
      import org.apache.log4j.Logger;
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      
      import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
      import static org.apache.ignite.cache.CacheMode.PARTITIONED;
      import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
      
      /**
       * Reproducer for IGNITE-7523: exception on data expiration after a {@code sharedRDD.saveValues} call.
       * <p>
       * The program runs in three phases:
       * <ol>
       *   <li>It starts {@code SERVER_COUNT} Ignite server nodes in this JVM. All of them use the persistent
       *       configuration from {@link #createIgniteCfg()}. It then activates the cluster.</li>
       *   <li>For about {@code LOAD_DURATION_MS} milliseconds, it repeatedly writes batches of {@code BATCH_SIZE}
       *       integers into the {@code "sharedRDD"} cache through {@link JavaIgniteRDD#saveValues}. The Spark context
       *       uses a local master. The {@link JavaIgniteContext} is created with its {@code standalone} flag set to
       *       {@code true}, using the client-mode configuration from {@link #createIgniteCfg()}.</li>
       *   <li>It loops forever, iterating the cache from a server node and printing how many entries remain.
       *       This lets you watch the one-minute {@link CreatedExpiryPolicy} remove the loaded data.</li>
       * </ol>
       * The program never terminates on its own and must be stopped externally.
       */
      public class RddExpiration {
          /** Name of the cache backing the shared RDD; the same name is configured in {@link #createIgniteCfg()}. */
          private static final String CACHE_NAME = "sharedRDD";

          /** Number of Ignite server nodes started inside this JVM. */
          private static final int SERVER_COUNT = 4;

          /** Number of values written by each {@code saveValues} call. */
          private static final int BATCH_SIZE = 20_000;

          /**
           * How long to keep loading data, in milliseconds. This is slightly shorter than the one-minute expiry
           * configured in {@link #createIgniteCfg()}, so the earliest entries start expiring soon after loading stops.
           */
          private static final long LOAD_DURATION_MS = 55_000;

          /**
           * Runs the reproducer.
           *
           * @param args command line arguments; none are used.
           * @throws InterruptedException declared for compatibility; not thrown by the current body.
           */
          public static void main(String[] args) throws InterruptedException {
              Ignite server = null;

              // Start the server nodes. The last one started is kept to activate the cluster and inspect the cache.
              for (int i = 0; i < SERVER_COUNT; i++) {
                  IgniteConfiguration serverCfg = createIgniteCfg();
                  serverCfg.setClientMode(false);
                  serverCfg.setIgniteInstanceName("Server" + i);
                  server = Ignition.start(serverCfg);
              }

              // The default data region has persistence enabled, and a persistent cluster must be activated before use.
              server.active(true);

              // Spark configuration.
              SparkConf sparkConf = new SparkConf()
                  .setAppName("JavaIgniteRDDExample")
                  .setMaster("local")
                  .set("spark.executor.instances", "2");

              // Spark context.
              JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

              // Adjust the logger to exclude the logs of no interest.
              Logger.getRootLogger().setLevel(Level.ERROR);
              Logger.getLogger("org.apache.ignite").setLevel(Level.INFO);

              // Ignite context for the Spark side. The final 'true' is the standalone flag.
              // Its nodes use the client-mode configuration returned by createIgniteCfg().
              JavaIgniteContext<UUID, Integer> igniteContext = new JavaIgniteContext<UUID, Integer>(
                  sparkContext,
                  new IgniteOutClosure<IgniteConfiguration>() {
                      @Override public IgniteConfiguration apply() {
                          return createIgniteCfg();
                      }
                  },
                  true);

              // Ignite-backed RDD of (UUID, Integer) pairs stored in the CACHE_NAME cache.
              JavaIgniteRDD<UUID, Integer> sharedRDD = igniteContext.<UUID, Integer>fromCache(CACHE_NAME);

              // The batch contents never change, so the values are built once and reused on every pass.
              List<Integer> data = new ArrayList<>(BATCH_SIZE);
              for (int i = 0; i < BATCH_SIZE; i++) {
                  data.add(i);
              }

              long start = System.currentTimeMillis();
              long totalLoaded = 0;

              while (System.currentTimeMillis() - start < LOAD_DURATION_MS) {
                  JavaRDD<Integer> javaRDD = sparkContext.parallelize(data);

                  sharedRDD.saveValues(javaRDD);

                  totalLoaded += BATCH_SIZE;
              }
              System.out.println("Loaded " + totalLoaded);

              // Look the cache up once. The loop below never exits, so igniteContext is intentionally never closed.
              IgniteCache<Object, Object> cache = server.getOrCreateCache(CACHE_NAME);

              for (;;) {
                  System.out.println(">>> Iterating over Ignite Shared RDD...");

                  // Single-threaded count of the entries that have not expired yet.
                  long recordsLeft = 0;
                  for (Cache.Entry<Object, Object> ignored : cache) {
                      recordsLeft++;
                  }
                  System.out.println("Left: " + recordsLeft);
              }
          }

          /**
           * Builds a fresh node configuration shared by the server nodes and the Spark-side Ignite nodes.
           * <p>
           * The configuration has these settings:
           * <ul>
           *   <li>Client mode is on. {@link #main} turns it off for server nodes.</li>
           *   <li>Native persistence is enabled in the default data region, with a 128 MiB max size and a
           *       16 MiB checkpoint page buffer.</li>
           *   <li>Discovery uses a non-shared static IP finder over {@code localhost:47500..47600}.</li>
           *   <li>It declares the {@code CACHE_NAME} cache: atomic, partitioned, one backup, {@code FULL_SYNC}
           *       writes, eager TTL, and entries expiring one minute after creation.</li>
           * </ul>
           *
           * @return a new configuration instance that callers may modify freely.
           */
          private static IgniteConfiguration createIgniteCfg() {
              IgniteConfiguration cfg = new IgniteConfiguration();

              cfg.setClientMode(true);

              DataStorageConfiguration memCfg = new DataStorageConfiguration()
                  .setDefaultDataRegionConfiguration(
                      new DataRegionConfiguration()
                          .setCheckpointPageBufferSize(16 * 1024 * 1024)
                          .setMaxSize(8 * 16 * 1024 * 1024)
                          .setPersistenceEnabled(true));

              cfg.setDataStorageConfiguration(memCfg);

              TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder(false);
              finder.setAddresses(Arrays.asList("localhost:47500..47600"));

              cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(finder));

              CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>(CACHE_NAME);
              cacheCfg.setAtomicityMode(ATOMIC);
              cacheCfg.setCacheMode(PARTITIONED);
              cacheCfg.setBackups(1);
              cacheCfg.setWriteSynchronizationMode(FULL_SYNC);

              // Eager TTL removes expired entries in the background instead of only when they are accessed.
              cacheCfg.setEagerTtl(true);

              // Entries expire one minute after creation; this expiration is what the reproducer exercises.
              cacheCfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(Duration.ONE_MINUTE));

              cfg.setCacheConfiguration(cacheCfg);

              return cfg;
          }
      }
      

        Attachments

          Activity

            People

            • Assignee:
              zaleslaw Alexey Zinoviev
              Reporter:
              mcherkasov Mikhail Cherkasov
            • Votes:
              0 (Vote for this issue)
              Watchers:
              3 (Start watching this issue)

              Dates

              • Created:
                Updated: