  Spark / SPARK-40249

Some cache miss cases in MLlib


Details

    • Type: Improvement
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 3.2.0
    • Fix Version/s: None
    • Component/s: MLlib
    • Labels: None
    • Environment:
      Spark core/sql/mllib 3.2.0
      xgboost4j-spark_2.12 1.6.2
      synapseml_2.12 0.10.0

    Description

      We developed a tool named AutoCache that can detect cache miss cases in Spark applications. After running many tests, we detected several cache miss cases in MLlib; could you please consider fixing these cases?

      • mllib/src/main/scala/org/apache/spark/ml/Predictor.scala:81
      • mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:246
      • mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:105
      • mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:255
      • mllib/src/main/scala/org/apache/spark/ml/Predictor.scala:81
      • mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:188
      • mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:189
      • mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala:161
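
      For context, here is a minimal sketch of the kind of pattern the tool reports (illustrative code only; the object name and data below are made up and are not taken from the files listed above): a dataset that feeds more than one action is recomputed for each action unless it is persisted first.

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.col

      // Illustrative sketch of a cache miss: `scored` is consumed by two actions,
      // so the projection that produces it is computed twice unless it is cached.
      object CacheMissSketch {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder
            .appName("CacheMissSketch")
            .master("local[*]")
            .getOrCreate()

          // Some derived dataset that is expensive to recompute.
          val scored = spark.range(0, 1000000L)
            .selectExpr("id % 10 as label", "id * 2.0 as score")

          // Cache miss: two actions over the same uncached dataset.
          val positives = scored.filter(col("label") === 1).count()
          val total = scored.count()

          // Fix: scored.cache() before the first action, scored.unpersist() after the last.
          println(s"$positives / $total")
          spark.stop()
        }
      }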

      The relevant test programs are:

      import com.microsoft.azure.synapse.ml.train.{ComputeModelStatistics, TrainClassifier}
      import org.apache.spark.ml.classification.LogisticRegression
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.col
      
      
      object TrainClassifierTest {
      
        def main(args: Array[String]): Unit = {
      
          val spark: SparkSession = SparkSession.builder
            .appName("MLtest")
            .master("local[*]")
            .getOrCreate()
      
          val dataFile = "data/AdultCensusIncome.csv"
      
          // Spark's CSV reader has no "schema" option; cast "hours-per-week" to float explicitly.
          val dataFrame = spark.read.format("csv")
            .option("sep", ", ")
            .option("header", "true")
            .load(dataFile)
            .withColumn("hours-per-week", col("hours-per-week").cast("float"))
      
          val data = dataFrame.select("education", "marital-status", "hours-per-week", "income")
          val splits = data.randomSplit(Array(0.75, 0.25), 123)
          val train = splits(0)
          val test = splits(1)
      
          val startTime = System.currentTimeMillis()
      
          val model = new TrainClassifier()
            .setModel(new LogisticRegression())
            .setLabelCol("income")
            .fit(train)
      
          val pred = model.transform(test)
          new ComputeModelStatistics().transform(pred)
      
      
          println(s"time: ${System.currentTimeMillis() - startTime}")
          System.in.read()
        }
      }
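
      As a user-side workaround for the program above (only a mitigation sketch, assuming the extra recomputation starts from the CSV scan and the random split; it does not fix the MLlib lines listed earlier), the split DataFrames can be persisted before training:

      // Possible mitigation in TrainClassifierTest: persist the splits so that
      // fit(), transform() and ComputeModelStatistics do not re-read and re-split the CSV.
      val splits = data.randomSplit(Array(0.75, 0.25), 123)
      val train = splits(0).cache()
      val test = splits(1).cache()
      // ... train the model and compute statistics as above ...
      train.unpersist()
      test.unpersist()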
      
      import com.microsoft.azure.synapse.ml.automl.FindBestModel
      import com.microsoft.azure.synapse.ml.featurize.text.TextFeaturizer
      import com.microsoft.azure.synapse.ml.train.TrainClassifier
      import org.apache.spark.ml.Transformer
      import org.apache.spark.ml.classification.LogisticRegression
      import org.apache.spark.sql.SparkSession
      
      object FindBestModelTest {
        def main(args: Array[String]): Unit = {
          val spark: SparkSession = SparkSession.builder
            .appName("MLtest")
            .master("local[*]")
            .getOrCreate()
      
          val data = spark.read.parquet(
            "data/BookReviewsFromAmazon10K.parquet"
          ).cache()
          //    data.repartition(1).write.format("parquet").save("data/BookReviewsFromAmazon10K")
          val textFeaturizer = new TextFeaturizer()
            .setInputCol("text")
            .setOutputCol("features")
            .setUseStopWordsRemover(true)
            .setUseIDF(true)
            .setMinDocFreq(5)
            .setNumFeatures(1 << 16)
            .fit(data)
      
          val processedData = textFeaturizer.transform(data)
          val processed = processedData
            .withColumn("label", processedData.col("rating") > 3)
            .select("features", "label")
      
          val splits = processed.randomSplit(Array(0.80, 0.20), seed = 42)
      
          val train = splits(0)
          val test = splits(1)
      
          val lrHyperParams = Array(0.05, 0.1)
      
          val lrs = lrHyperParams.map(p => new LogisticRegression().setRegParam(p))
      
          val lrmodels: Array[Transformer] = lrs.map(lr => new TrainClassifier().setModel(lr).setLabelCol("label").fit(train))
      
          new FindBestModel()
            .setModels(lrmodels)
            .setEvaluationMetric("AUC")
            .fit(test)
      
          System.in.read()
        }
      }
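
      Likewise for the second program (again only a sketch, assuming the repeated work comes from re-running the featurization for every candidate model), the splits can be cached before TrainClassifier and FindBestModel consume them:

      // Possible mitigation in FindBestModelTest: persist the splits, since every
      // candidate model is fit on `train` and then evaluated on `test`.
      val Array(train, test) = processed.randomSplit(Array(0.80, 0.20), seed = 42).map(_.cache())
      // ... fit the candidate models and run FindBestModel as above ...
      train.unpersist()
      test.unpersist()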
      
      import org.apache.spark.ml.{Pipeline, PipelineModel}
      import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
      import org.apache.spark.ml.feature._
      import org.apache.spark.ml.tuning._
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.types._
      
      import ml.dmlc.xgboost4j.scala.spark.{XGBoostClassifier, XGBoostClassificationModel}
      
      // this example works with Iris dataset (https://archive.ics.uci.edu/ml/datasets/iris)
      
      object SparkMLlibPipelineTest {
      
        def main(args: Array[String]): Unit = {
      
          val inputPath = "data/iris.csv"
          // Use distinct paths: saving the pipeline with overwrite() would otherwise
          // clobber the native booster file written to the same location.
          val nativeModelPath = "data/nativeModel"
          val pipelineModelPath = "data/pipelineModel"
      
          val (treeMethod, numWorkers) = if (args.length == 4 && args(3) == "gpu") {
            ("gpu_hist", 1)
          } else ("auto", 2)
      
          val spark = SparkSession
            .builder()
            .appName("XGBoost4J-Spark Pipeline Example")
            .master("local[*]")
            .getOrCreate()
      
          // Load dataset
          val schema = new StructType(Array(
            StructField("sepal length", DoubleType, true),
            StructField("sepal width", DoubleType, true),
            StructField("petal length", DoubleType, true),
            StructField("petal width", DoubleType, true),
            StructField("class", StringType, true)))
      
          val rawInput = spark.read.schema(schema).option("header", false).csv(inputPath)
      
          // Split training and test dataset
          val Array(training, test) = rawInput.randomSplit(Array(0.8, 0.2), 123)
      
          // Build ML pipeline, it includes 4 stages:
          // 1, Assemble all features into a single vector column.
          // 2, From string label to indexed double label.
          // 3, Use XGBoostClassifier to train classification model.
          // 4, Convert indexed double label back to original string label.
          val assembler = new VectorAssembler()
            .setInputCols(Array("sepal length", "sepal width", "petal length", "petal width"))
            .setOutputCol("features")
          val labelIndexer = new StringIndexer()
            .setInputCol("class")
            .setOutputCol("classIndex")
            .fit(training)
          val booster = new XGBoostClassifier(
            Map("eta" -> 0.1f,
              "max_depth" -> 2,
              "objective" -> "multi:softprob",
              "num_class" -> 3,
              "num_round" -> 100,
              "num_workers" -> numWorkers,
              "tree_method" -> treeMethod
            )
          )
          booster.setFeaturesCol("features")
          booster.setLabelCol("classIndex")
          val labelConverter = new IndexToString()
            .setInputCol("prediction")
            .setOutputCol("realLabel")
            .setLabels(labelIndexer.labels)
      
          val pipeline = new Pipeline()
            .setStages(Array(assembler, labelIndexer, booster, labelConverter))
          val model = pipeline.fit(training)
      
          // Batch prediction
          val prediction = model.transform(test)
      
          // Model evaluation
          val evaluator = new MulticlassClassificationEvaluator()
          evaluator.setLabelCol("classIndex")
          evaluator.setPredictionCol("prediction")
          val accuracy = evaluator.evaluate(prediction)
          println("The model accuracy is : " + accuracy)
      
          // Tune model using cross validation
          val paramGrid = new ParamGridBuilder()
            .addGrid(booster.maxDepth, Array(3, 8))
            .addGrid(booster.eta, Array(0.2, 0.6))
            .build()
          val cv = new CrossValidator()
            .setEstimator(pipeline)
            .setEvaluator(evaluator)
            .setEstimatorParamMaps(paramGrid)
            .setNumFolds(3)
      
          val cvModel = cv.fit(training)
      
          val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel].stages(2)
            .asInstanceOf[XGBoostClassificationModel]
          println("The params of best XGBoostClassification model : " +
            bestModel.extractParamMap())
          println("The training summary of best XGBoostClassificationModel : " +
            bestModel.summary)
      
          // Export the XGBoostClassificationModel as local XGBoost model,
          // then you can load it back in local Python environment.
          bestModel.nativeBooster.saveModel(nativeModelPath)
      
          // ML pipeline persistence
          model.write.overwrite().save(pipelineModelPath)
      
          // Load a saved model and serving
          val model2 = PipelineModel.load(pipelineModelPath)
          model2.transform(test)
      
          System.in.read()
        }
      }
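
      And for the third program (again only a sketch, on the assumption that the repeated scans of the input are what the tool is flagging), both splits can be persisted, since `training` is consumed by StringIndexer.fit, Pipeline.fit and CrossValidator.fit, and `test` is transformed twice:

      // Possible mitigation in SparkMLlibPipelineTest: persist both splits before
      // building and tuning the pipeline.
      val Array(training, test) = rawInput.randomSplit(Array(0.8, 0.2), 123).map(_.cache())
      // ... build the pipeline, train, evaluate and cross-validate as above ...
      training.unpersist()
      test.unpersist()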
      

      Attachments

      Activity

      People

        Assignee: Unassigned
        Reporter: waruto210 (Mingchao Wu)
        Votes: 0
        Watchers: 1

      Dates

        Created:
        Updated: