Details
Description
We ran some example code that uses BinaryClassificationEvaluator in MLlib and found that ShuffledRDD[28] at BinaryClassificationMetrics.scala:155 and UnionRDD[36] at BinaryClassificationMetrics.scala:90 were each used more than once but not cached.
We use spark-2.2.3 and found that the code on the master branch still does not cache these RDDs, so we would like to improve it.
The example code is as follows:
import com.microsoft.ml.spark.lightgbm.LightGBMRegressor
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.types.{DoubleType, IntegerType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Example program: trains a LightGBM model (objective "binary") on `data/hour.csv`
 * through a Spark ML pipeline and scores the held-out split with
 * [[BinaryClassificationEvaluator]].
 */
object LightGBMRegressorTest {

  /**
   * Runs the whole example: load CSV, cast columns, fit the pipeline, evaluate,
   * print the metric and the elapsed wall-clock time.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("LightGBMRegressorTest")
      .master("local[*]")
      .getOrCreate()
    val startTime = System.currentTimeMillis()

    val rawData: DataFrame = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("data/hour.csv")

    val labelCol = "workingday"
    val cateCols = Array("season", "yr", "mnth", "hr")
    val conCols: Array[String] = Array("temp", "atemp", "hum", "casual", "cnt")
    val vecCols = conCols ++ cateCols

    import spark.implicits._

    // Cast every feature column to Double and the label to Int. A fold threads the
    // DataFrame through each cast instead of reassigning a `var` inside `foreach`.
    val originalData: DataFrame = vecCols
      .foldLeft(rawData)((df, name) => df.withColumn(name, $"$name".cast(DoubleType)))
      .withColumn(labelCol, $"$labelCol".cast(IntegerType))

    val assembler = new VectorAssembler().setInputCols(vecCols).setOutputCol("features")

    val classifier: LightGBMRegressor = new LightGBMRegressor()
      .setNumIterations(100).setNumLeaves(31)
      .setBoostFromAverage(false).setFeatureFraction(1.0).setMaxDepth(-1).setMaxBin(255)
      .setLearningRate(0.1).setMinSumHessianInLeaf(0.001).setLambdaL1(0.0).setLambdaL2(0.0)
      .setBaggingFraction(0.5).setBaggingFreq(1).setBaggingSeed(1).setObjective("binary")
      .setLabelCol(labelCol).setCategoricalSlotNames(cateCols).setFeaturesCol("features")
      .setBoostingType("gbdt")

    val pipeline: Pipeline = new Pipeline().setStages(Array(assembler, classifier))

    // NOTE(review): the weights are 0.7 and .03 (not 0.3); possibly a typo — confirm
    // the intended train/test proportions before relying on this split.
    val Array(tr, te) = originalData.randomSplit(Array(0.7, .03), 666)

    val model = pipeline.fit(tr)
    val modelDF = model.transform(te)

    val evaluator = new BinaryClassificationEvaluator()
      .setLabelCol(labelCol)
      .setRawPredictionCol("prediction")
    println(evaluator.evaluate(modelDF))

    // The interpolated string must sit on a single line: a single-quoted string
    // literal cannot contain a line break.
    println(s"time: ${System.currentTimeMillis() - startTime}")

    // Blocks on stdin before exiting — presumably to keep the application alive
    // for inspection after the run; confirm intent.
    System.in.read()
  }
}