Description
We developed a tool named AutoCache that detects cache miss cases in Spark applications, i.e. datasets that are recomputed because they are reused without being persisted. After running many tests, we detected the following cases in MLlib; could you please consider fixing them? A minimal sketch of the pattern follows the list below.
- mllib/src/main/scala/org/apache/spark/ml/Predictor.scala:81
- mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:246
- mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:105
- mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:255
- mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:188
- mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:189
- mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala:161
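For clarity, here is a minimal sketch of the pattern AutoCache reports (all names here are hypothetical; this is the shape of the problem, not the code at the locations above): an RDD consumed by more than one action is recomputed from its lineage on every action unless it is persisted first.

import org.apache.spark.sql.SparkSession

object CacheMissSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    val expensive = spark.sparkContext
      .parallelize(1 to 1000000)
      .map(x => math.sqrt(x.toDouble)) // stand-in for costly per-record work

    // Two actions over the same uncached RDD: the map above runs twice,
    // once per action. This is the "cache miss" pattern AutoCache flags.
    println(expensive.count())
    println(expensive.sum())

    // The fix is to persist before the first action:
    // val cached = expensive.cache()

    spark.stop()
  }
}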
The relevant test programs are:
import com.microsoft.azure.synapse.ml.train.{ComputeModelStatistics, TrainClassifier}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.SparkSession

object TrainClassifierTest {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .appName("MLtest")
      .master("local[*]")
      .getOrCreate()

    val dataFile = "data/AdultCensusIncome.csv"
    val dataFrame = spark.read.format("csv")
      .option("sep", ", ")
      .option("header", "true")
      .option("schema", "hours-per-week: float")
      .load(dataFile)
    val data = dataFrame.select("education", "marital-status", "hours-per-week", "income")

    val splits = data.randomSplit(Array(0.75, 0.25), 123)
    val train = splits(0)
    val test = splits(1)

    val startTime = System.currentTimeMillis()
    val model = new TrainClassifier()
      .setModel(new LogisticRegression())
      .setLabelCol("income")
      .fit(train)
    val pred = model.transform(test)
    new ComputeModelStatistics().transform(pred)
    println(s"time: ${System.currentTimeMillis() - startTime}")
    System.in.read()
  }
}
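If the report at Predictor.scala:81 corresponds to the uncached training input (an assumption on our part; the tool only reports the location), a user-side mitigation in this test program would be to persist the splits before fitting, shown here as a replacement for the two split assignments above:

// Assumed mitigation (the proper fix would be inside MLlib itself):
// persist the reused splits so the iterative optimizer and the later
// transform do not re-read and re-parse the CSV each time.
val train = splits(0).cache()
val test = splits(1).cache()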
import com.microsoft.azure.synapse.ml.automl.FindBestModel
import com.microsoft.azure.synapse.ml.featurize.text.TextFeaturizer
import com.microsoft.azure.synapse.ml.train.TrainClassifier
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.SparkSession

object FindBestModelTest {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .appName("MLtest")
      .master("local[*]")
      .getOrCreate()

    val data = spark.read.parquet("data/BookReviewsFromAmazon10K.parquet").cache()
    // data.repartition(1).write.format("parquet").save("data/BookReviewsFromAmazon10K")

    val textFeaturizer = new TextFeaturizer()
      .setInputCol("text")
      .setOutputCol("features")
      .setUseStopWordsRemover(true)
      .setUseIDF(true)
      .setMinDocFreq(5)
      .setNumFeatures(1 << 16)
      .fit(data)

    val processedData = textFeaturizer.transform(data)
    val processed = processedData
      .withColumn("label", processedData.col("rating") > 3)
      .select("features", "label")

    val splits = processed.randomSplit(Array(0.80, 0.20), seed = 42)
    val train = splits(0)
    val test = splits(1)

    val lrHyperParams = Array(0.05, 0.1)
    val lrs = lrHyperParams.map(p => new LogisticRegression().setRegParam(p))
    val lrmodels: Array[Transformer] =
      lrs.map(lr => new TrainClassifier().setModel(lr).setLabelCol("label").fit(train))

    new FindBestModel()
      .setModels(lrmodels)
      .setEvaluationMetric("AUC")
      .fit(test)

    System.in.read()
  }
}
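In this second program both splits are reused: train by the two TrainClassifier fits, and test by FindBestModel's per-model evaluation. Assuming the flagged BinaryClassificationMetrics lines stem from repeated evaluation over an uncached dataset (again our assumption), persisting the featurized dataset once would help, shown here as a replacement for the definition of processed above:

// Assumed mitigation: cache the featurized dataset so the two
// TrainClassifier fits and the per-model FindBestModel evaluation
// reuse it instead of re-running the TextFeaturizer lineage.
val processed = processedData
  .withColumn("label", processedData.col("rating") > 3)
  .select("features", "label")
  .cache()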
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.ml.tuning._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import ml.dmlc.xgboost4j.scala.spark.{XGBoostClassifier, XGBoostClassificationModel}

// This example works with the Iris dataset (https://archive.ics.uci.edu/ml/datasets/iris).
object SparkMLlibPipelineTest {
  def main(args: Array[String]): Unit = {
    val inputPath = "data/iris.csv"
    val nativeModelPath = "data/model"
    val pipelineModelPath = "data/model"

    val (treeMethod, numWorkers) =
      if (args.length == 4 && args(3) == "gpu") ("gpu_hist", 1) else ("auto", 2)

    val spark = SparkSession
      .builder()
      .appName("XGBoost4J-Spark Pipeline Example")
      .master("local[*]")
      .getOrCreate()

    // Load dataset
    val schema = new StructType(Array(
      StructField("sepal length", DoubleType, true),
      StructField("sepal width", DoubleType, true),
      StructField("petal length", DoubleType, true),
      StructField("petal width", DoubleType, true),
      StructField("class", StringType, true)))
    val rawInput = spark.read.schema(schema).option("header", false).csv(inputPath)

    // Split training and test dataset
    val Array(training, test) = rawInput.randomSplit(Array(0.8, 0.2), 123)

    // Build ML pipeline; it includes 4 stages:
    // 1. Assemble all features into a single vector column.
    // 2. Index the string label as a double label.
    // 3. Use XGBoostClassifier to train the classification model.
    // 4. Convert the indexed double label back to the original string label.
    val assembler = new VectorAssembler()
      .setInputCols(Array("sepal length", "sepal width", "petal length", "petal width"))
      .setOutputCol("features")
    val labelIndexer = new StringIndexer()
      .setInputCol("class")
      .setOutputCol("classIndex")
      .fit(training)
    val booster = new XGBoostClassifier(
      Map("eta" -> 0.1f,
        "max_depth" -> 2,
        "objective" -> "multi:softprob",
        "num_class" -> 3,
        "num_round" -> 100,
        "num_workers" -> numWorkers,
        "tree_method" -> treeMethod
      )
    )
    booster.setFeaturesCol("features")
    booster.setLabelCol("classIndex")
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("realLabel")
      .setLabels(labelIndexer.labels)
    val pipeline = new Pipeline()
      .setStages(Array(assembler, labelIndexer, booster, labelConverter))
    val model = pipeline.fit(training)

    // Batch prediction
    val prediction = model.transform(test)

    // Model evaluation
    val evaluator = new MulticlassClassificationEvaluator()
    evaluator.setLabelCol("classIndex")
    evaluator.setPredictionCol("prediction")
    val accuracy = evaluator.evaluate(prediction)
    println("The model accuracy is : " + accuracy)

    // Tune model using cross validation
    val paramGrid = new ParamGridBuilder()
      .addGrid(booster.maxDepth, Array(3, 8))
      .addGrid(booster.eta, Array(0.2, 0.6))
      .build()
    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(3)
    val cvModel = cv.fit(training)

    val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel].stages(2)
      .asInstanceOf[XGBoostClassificationModel]
    println("The params of best XGBoostClassification model : " + bestModel.extractParamMap())
    println("The training summary of best XGBoostClassificationModel : " + bestModel.summary)

    // Export the XGBoostClassificationModel as a local XGBoost model,
    // so it can be loaded back in a local Python environment.
    bestModel.nativeBooster.saveModel(nativeModelPath)

    // ML pipeline persistence
    model.write.overwrite().save(pipelineModelPath)

    // Load the saved pipeline model and use it for serving
    val model2 = PipelineModel.load(pipelineModelPath)
    model2.transform(test)

    System.in.read()
  }
}
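The CrossValidator.scala:161 report is presumably triggered by the cv.fit(training) call in this third program: cross-validation fits the pipeline numFolds * paramGrid.size times (3 * 4 = 12 here), so an uncached input is re-derived from the CSV on each fit. Assuming that is the flagged case, a user-side mitigation would be:

// Assumed mitigation: persist the training input so the 12 pipeline
// fits performed by the cross-validation do not each re-read iris.csv.
val cvModel = cv.fit(training.cache())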