Description
PeriodicRDDCheckpointer will automatically persist the last 3 datasets called by PeriodicRDDCheckpointer.update.
In GBTs, the last 3 intermediate rdds are still cached after fit()
scala> val dataset = spark.read.format("libsvm").load("./data/mllib/sample_kmeans_data.txt") dataset: org.apache.spark.sql.DataFrame = [label: double, features: vector] scala> dataset.persist() res0: dataset.type = [label: double, features: vector] scala> dataset.count res1: Long = 6 scala> sc.getPersistentRDDs res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(8 -> *FileScan libsvm [label#0,features#1] Batched: false, Format: LibSVM, Location: InMemoryFileIndex[file:/Users/zrf/.dev/spark-2.2.0-bin-hadoop2.7/data/mllib/sample_kmeans_data.txt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<label:double,features:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>> MapPartitionsRDD[8] at persist at <console>:26) scala> import org.apache.spark.ml.regression._ import org.apache.spark.ml.regression._ scala> val model = gbt.fit(dataset) <console>:28: error: not found: value gbt val model = gbt.fit(dataset) ^ scala> val gbt = new GBTRegressor() gbt: org.apache.spark.ml.regression.GBTRegressor = gbtr_da1fe371a25e scala> val model = gbt.fit(dataset) 17/09/20 14:05:33 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:35 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:35 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:35 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:35 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:35 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:36 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:36 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:36 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:36 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:36 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:36 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:38 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) 17/09/20 14:05:38 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances) model: org.apache.spark.ml.regression.GBTRegressionModel = GBTRegressionModel (uid=gbtr_da1fe371a25e) with 20 trees scala> sc.getPersistentRDDs res3: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(322 -> MapPartitionsRDD[322] at mapPartitions at GradientBoostedTrees.scala:134, 307 -> MapPartitionsRDD[307] at mapPartitions at GradientBoostedTrees.scala:134, 8 -> *FileScan libsvm [label#0,features#1] Batched: false, Format: LibSVM, Location: InMemoryFileIndex[file:/Users/zrf/.dev/spark-2.2.0-bin-hadoop2.7/data/mllib/sample_kmeans_data.txt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<label:double,features:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>> MapPartitionsRDD[8] at persist at <console>:26, 292 -> MapPartitionsRDD[292] at mapPartitions at GradientBoostedTrees.scala:134) scala> sc.getPersistentRDDs.size res4: Int = 4