
SPARK-16320: Document G1 heap region's effect on Spark 2.0 vs 1.6


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.1, 2.1.0
    • Component/s: Documentation, SQL
    • Labels: None

    Description

      I ran some tests on a parquet file with many nested columns (about 30 GB in
      400 partitions), and Spark 2.0 is sometimes slower than Spark 1.6.

      I tested the following queries:
      1)

      select count(*) where id > some_id

      For this query the performance is similar (about 1 second).

      2)

      select count(*) where nested_column.id > some_id

      Spark 1.6 -> 1.6 min
      Spark 2.0 -> 2.1 min

      Should I expect such a drop in performance?

      I don't know how to prepare sample data that demonstrates the problem.
      Any ideas? Or is there public data with many nested columns?
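
      (For clarity, a minimal sketch of the two queries above through the SQL API,
      assuming a HiveContext sqlctx and the parquet path as in the script below;
      the table name "t" and the SOME_ID threshold are placeholders, not names
      from this dataset.)

      # Sketch only: the flat-column and nested-column queries via the SQL API.
      # "t" and SOME_ID are placeholders.
      df = sqlctx.read.parquet(path)
      df.registerTempTable("t")

      SOME_ID = 1000  # placeholder threshold

      # Query 1: predicate on a flat top-level column (similar timing on 1.6 and 2.0)
      sqlctx.sql("SELECT count(*) FROM t WHERE id > {}".format(SOME_ID)).show()

      # Query 2: predicate on a nested column (noticeably slower on 2.0)
      sqlctx.sql("SELECT count(*) FROM t WHERE nested_column.id > {}".format(SOME_ID)).show()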

      UPDATE
      I created a script to generate data and confirm the problem.

      #Initialization
      from pyspark import SparkContext, SparkConf
      from pyspark.sql import HiveContext
      from pyspark.sql.functions import struct

      conf = SparkConf()
      conf.set('spark.cores.max', 15)
      conf.set('spark.executor.memory', '30g')
      conf.set('spark.driver.memory', '30g')
      sc = SparkContext(conf=conf)
      sqlctx = HiveContext(sc)

      #Data creation
      MAX_SIZE = 2**32 - 1

      path = '/mnt/mfs/parquet_nested'

      def create_sample_data(levels, rows, path):

          def _create_column_data(cols):
              # One row: a dict of `cols` columns with random integer values.
              import random
              random.seed()
              return {"column{}".format(i): random.randint(0, MAX_SIZE) for i in range(cols)}

          def _create_sample_df(cols, rows):
              # Flat DataFrame with `rows` rows and `cols` integer columns.
              rdd = sc.parallelize(range(rows))
              data = rdd.map(lambda r: _create_column_data(cols))
              df = sqlctx.createDataFrame(data)
              return df

          def _create_nested_data(levels, rows):
              # Recursively wrap the flat DataFrame in struct columns,
              # one nesting level per entry in `levels`.
              if len(levels) == 1:
                  return _create_sample_df(levels[0], rows).cache()
              else:
                  df = _create_nested_data(levels[1:], rows)
                  return df.select([struct(df.columns).alias("column{}".format(i)) for i in range(levels[0])])

          df = _create_nested_data(levels, rows)
          df.write.mode('overwrite').parquet(path)

      #Sample data: 2 top-level structs x 10 nested structs x 200 leaf columns, 1M rows
      create_sample_data([2,10,200], 1000000, path)

      #Query (the %%timeit below is IPython cell magic; run it as a separate cell)
      df = sqlctx.read.parquet(path)

      %%timeit
      df.where("column1.column5.column50 > {}".format(int(MAX_SIZE / 2))).count()

      Results
      Spark 1.6
      1 loop, best of 3: 1min 5s per loop
      Spark 2.0
      1 loop, best of 3: 1min 21s per loop
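
      (Note: the %%timeit cell above requires IPython/Jupyter. Outside a notebook,
      a plain timing loop along these lines, assuming the df and MAX_SIZE defined
      in the script above, gives a comparable best-of-3 figure; this is a sketch,
      not part of the original measurement.)

      # Sketch only: best-of-3 wall-clock timing without IPython's %%timeit.
      import time

      def time_nested_query(df, runs=3):
          best = float('inf')
          for _ in range(runs):
              start = time.time()
              df.where("column1.column5.column50 > {}".format(int(MAX_SIZE / 2))).count()
              best = min(best, time.time() - start)
          return best

      print("best of 3: {:.1f}s per loop".format(time_nested_query(df)))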

      UPDATE 2
      The analysis in https://issues.apache.org/jira/browse/SPARK-16321 points to the same root cause.
      I attached some VisualVM profiles there.
      The most interesting ones are from the queries:
      https://issues.apache.org/jira/secure/attachment/12818785/spark16_query.nps
      https://issues.apache.org/jira/secure/attachment/12818784/spark2_query.nps
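
      (For context on the resolution: the title refers to documenting the effect
      of G1's heap region size. A minimal sketch of how one might enable G1 and
      raise its region size for the executors is shown below; the 16m value is
      only an illustrative assumption, not a setting taken from this issue.)

      # Sketch only: enable G1 and increase its heap region size for executors.
      # The 16m region size is an illustrative guess, not a recommendation here.
      from pyspark import SparkConf
      conf = SparkConf()
      conf.set('spark.executor.extraJavaOptions',
               '-XX:+UseG1GC -XX:G1HeapRegionSize=16m')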

      Attachments

        1. spark1.6-ui.png (100 kB, uploaded by Maciej Bryński)
        2. spark2-ui.png (103 kB, uploaded by Maciej Bryński)


            People

              Assignee: Sean R. Owen (srowen)
              Reporter: Maciej Bryński (maver1ck)
              Votes: 1
              Watchers: 15
