SPARK-823: spark.default.parallelism's default is inconsistent across scheduler backends

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.7.3, 0.8.0, 0.9.1
    • Fix Version/s: None
    • Labels:
      None

      Description

      The 0.7.3 configuration guide says that spark.default.parallelism's default is 8, but the default is actually max(totalCoreCount, 2) for the standalone scheduler backend, 8 for the Mesos scheduler backend, and the number of local threads for the local scheduler:

      https://github.com/mesos/spark/blob/v0.7.3/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala#L157
      https://github.com/mesos/spark/blob/v0.7.3/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala#L317
      https://github.com/mesos/spark/blob/v0.7.3/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala#L150
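      For reference, a minimal sketch of the three defaults described above. These are hypothetical helper methods, not the actual scheduler classes; the real logic lives in the files linked above, and totalCoreCount and threads stand in for the backends' internal fields:

          // Hypothetical helpers mirroring the 0.7.3 defaults; not the actual backend code.
          object DefaultParallelismSketch {
            // Standalone backend: the cluster's total core count, but never less than 2.
            def standaloneDefault(totalCoreCount: Int): Int = math.max(totalCoreCount, 2)

            // Mesos backend: hard-coded fallback of 8 unless the property is set.
            def mesosDefault(): Int =
              sys.props.getOrElse("spark.default.parallelism", "8").toInt

            // Local scheduler: the number of local threads.
            def localDefault(threads: Int): Int = threads
          }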

      Should this be clarified in the documentation? Should the Mesos scheduler backend's default be revised?

        Activity

        dcarroll@cloudera.com Diana Carroll added a comment -

        Yes, please clarify the documentation; I just ran into this. The Configuration guide (http://spark.apache.org/docs/latest/configuration.html) says the default is 8.

        In testing this on Standalone Spark, there actually is no default value for the variable:
        scala> sc.getConf.contains("spark.default.parallelism")
        res1: Boolean = false

        It looks like, if the variable is not set, the default behavior is decided in code, e.g. in Partitioner.scala:

            // If spark.default.parallelism is set, use the context's defaultParallelism;
            // otherwise fall back to the partition count of the largest parent RDD
            // (bySize is the parent RDDs sorted by number of partitions, descending).
            if (rdd.context.conf.contains("spark.default.parallelism")) {
              new HashPartitioner(rdd.context.defaultParallelism)
            } else {
              new HashPartitioner(bySize.head.partitions.size)
            }
        
        dcarroll@cloudera.com Diana Carroll added a comment -

        Okay, this is definitely more than a documentation bug, because PySpark and Scala behave differently if spark.default.parallelism isn't set by the user. I'm testing with a word count job.

        PySpark: reduceByKey will use the value of sc.defaultParallelism. That value is set to the number of threads when running locally. On my Spark Standalone "cluster", which has a single node with a single core, the value is 2. If I set spark.default.parallelism, it sets sc.defaultParallelism to that value and uses it.

        Scala: reduceByKey will use the number of partitions from my file/map stage and ignore the value of sc.defaultParallelism. sc.defaultParallelism is set by the same logic as in PySpark (number of threads for local, 2 for my microcluster); it is just ignored.

        I'm not sure which approach is correct. Scala works as described here: http://spark.apache.org/docs/latest/tuning.html

        Spark automatically sets the number of “map” tasks to run on each file according to its size (though you can control it through optional parameters to SparkContext.textFile, etc), and for distributed “reduce” operations, such as groupByKey and reduceByKey, it uses the largest parent RDD’s number of partitions. You can pass the level of parallelism as a second argument (see the spark.PairRDDFunctions documentation), or set the config property spark.default.parallelism to change the default. In general, we recommend 2-3 tasks per CPU core in your cluster.
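        To see this concretely, here is a minimal spark-shell sketch of the word count test described above (sc comes from the shell; "README.md" is just a placeholder input path) that prints which value the reduce side actually picks up:

            // Run in spark-shell, which provides sc. The input path is a placeholder.
            val words  = sc.textFile("README.md").flatMap(_.split("\\s+"))
            val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

            println("map-side partitions:    " + words.partitions.size)
            println("sc.defaultParallelism:  " + sc.defaultParallelism)
            // Per the comment above: with the property unset, Scala matches the map side,
            // while PySpark follows sc.defaultParallelism.
            println("reduce-side partitions: " + counts.partitions.size)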

        ilganeli Ilya Ganelin added a comment -

        Hi Josh Rosen, I believe the documentation is up to date; I reviewed all usages of spark.default.parallelism and found no inconsistencies with the documentation. The only thing that is undocumented with regard to spark.default.parallelism is how it's used within the Partitioner class in both Spark and Python. If it is defined, the default number of partitions created is equal to spark.default.parallelism; otherwise, it's the local number of partitions. I think this issue can be closed - I don't think that particular case needs to be publicly documented (it's clearly evident in the code what is going on).
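        For what it's worth, a hedged sketch of how a user sidesteps the backend-dependent default entirely by setting the property up front (the master URL, app name, and the value 8 are placeholders, not recommendations):

            import org.apache.spark.{SparkConf, SparkContext}

            // Setting spark.default.parallelism explicitly overrides the per-backend
            // default discussed above; "local[*]" and the app name are placeholders.
            val conf = new SparkConf()
              .setMaster("local[*]")
              .setAppName("default-parallelism-demo")
              .set("spark.default.parallelism", "8")

            val sc = new SparkContext(conf)
            println(sc.defaultParallelism)  // prints 8 once the property is set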


          People

          • Assignee:
            ilganeli Ilya Ganelin
          • Reporter:
            joshrosen Josh Rosen
          • Votes:
            1
          • Watchers:
            5
