Uploaded image for project: 'Ignite'
  1. Ignite
  2. IGNITE-11724

IgniteSpark integration forget to close the IgniteContext and stops the client node in case if error during PairFunction logic

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Not A Problem
    • 2.8
    • 2.8
    • spark

    Description

      Next code could hang in case if PairFunction logic will throw the exception:

      public class Example {
          public static void main(String[] args) {
              String configPath = "/home/andrei/BDP/big-data-accelerator/modules/gridgain-spark-loader-examples/config/client.xml";
              IgniteSparkSession igniteSession = IgniteSparkSession.builder()
                      .appName("Spark Ignite catalog example")
                      .master("local")
                      .config("ignite.disableSparkSQLOptimization", true)
                      .igniteConfig(configPath)
                      .getOrCreate();
              JavaSparkContext sparkCtx = new JavaSparkContext(igniteSession.sparkContext());
              final JavaRDD<Row> records = sparkCtx.parallelize(Arrays.asList(
                      new GenericRow()
              ));
              JavaPairRDD<Integer, Integer> rdd_records = records.mapToPair(new PairFunction<Row, Integer, Integer>() {
                  @Override
                  public Tuple2<Integer, Integer> call(Row row) throws Exception {
                      throw new IllegalStateException("some error");
                  }
              });
              JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<>(sparkCtx, configPath);
              JavaIgniteRDD<Integer, Integer> igniteRdd = igniteContext.<Integer, Integer>fromCache("Person");
              igniteRdd.savePairs(rdd_records);
              igniteContext.close(true);
          }
      }
      
      Looks like next internal code (saveValues method)should also close the IgniteContext in case of an unexpected exception, not only data streamer:
      
      try {
           it.foreach(value ⇒
      
      {          val key = affinityKeyFunc(value, node.orNull)           streamer.addData(key, value)        }
      
      )
           }
           finally
      
      {         streamer.close()     }
      
      })
       }
      
      

      Attachments

        1. logs.txt
          52 kB
          Andrei Aleksandrov

        Activity

          People

            zaleslaw Alexey Zinoviev
            aealeksandrov Andrei Aleksandrov
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Time Tracking

                Estimated:
                Original Estimate - Not Specified
                Not Specified
                Remaining:
                Remaining Estimate - 0h
                0h
                Logged:
                Time Spent - 0.5h
                0.5h