  1. Spark
  2. SPARK-19846

Add a flag to disable constraint propagation


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.1.0
    • Fix Version/s: 2.2.0
    • Component/s: SQL
    • Labels: None

    Description

      Constraint propagation can be computationally expensive and can block driver execution for a long time. For example, the benchmark below takes 30 minutes.

      Compared with other attempts to change how constraint propagation works, this is a much simpler option: add a flag that disables constraint propagation.

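          import org.apache.spark.ml.{Pipeline, PipelineStage}
          import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
          import org.apache.spark.sql.internal.SQLConf
          // Needed for toDF and the $"..." column syntax (already in scope in spark-shell).
          import spark.implicits._
      
          // Turn constraint propagation off through the flag proposed in this issue.
          spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)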
      
          val df = (1 to 40).foldLeft(Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0"))((df, i) => df.withColumn(s"x$i", $"x0"))
      
          val indexers = df.columns.tail.map(c => new StringIndexer()
            .setInputCol(c)
            .setOutputCol(s"${c}_indexed")
            .setHandleInvalid("skip"))
      
          val encoders = indexers.map(indexer => new OneHotEncoder()
            .setInputCol(indexer.getOutputCol)
            .setOutputCol(s"${indexer.getOutputCol}_encoded")
            .setDropLast(true))
      
          val stages: Array[PipelineStage] = indexers ++ encoders
          val pipeline = new Pipeline().setStages(stages)
      
          val startTime = System.nanoTime
          pipeline.fit(df).transform(df).show
          val runningTime = System.nanoTime - startTime
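
To compare runs with the flag on and off, a small timing wrapper can help. The following is only a sketch: `timedWithFlag` is a hypothetical helper, and the key string `spark.sql.constraintPropagation.enabled` is assumed to be the value behind `SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key`. The tiny query stands in for the pipeline above.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: this is the key string behind SQLConf.CONSTRAINT_PROPAGATION_ENABLED.
val flagKey = "spark.sql.constraintPropagation.enabled"

// Reuses the existing session in spark-shell; otherwise starts a local one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("constraint-propagation-flag")
  .getOrCreate()

// Hypothetical helper: run `body` with the flag set to `enabled`,
// return the elapsed wall-clock time in milliseconds,
// and restore the previous setting afterwards.
def timedWithFlag(enabled: Boolean)(body: => Unit): Long = {
  val previous = spark.conf.getOption(flagKey)
  spark.conf.set(flagKey, enabled)
  try {
    val start = System.nanoTime
    body
    (System.nanoTime - start) / 1000000L
  } finally {
    previous match {
      case Some(v) => spark.conf.set(flagKey, v)
      case None    => spark.conf.unset(flagKey)
    }
  }
}

// Placeholder workload; substitute pipeline.fit(df).transform(df).show here.
val disabledMs = timedWithFlag(enabled = false) {
  spark.range(10).filter("id > 5").show()
}
println(s"constraint propagation off: $disabledMs ms")
```

Calling the helper a second time with `enabled = true` on the same workload gives the comparison figure; the first call in a fresh session also pays JVM and session warm-up costs, so a warm-up run beforehand makes the two numbers more comparable.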
      

            People

              Assignee: viirya L. C. Hsieh
              Reporter: viirya L. C. Hsieh
              Votes: 0
              Watchers: 6