Spark / SPARK-23682

Memory issue with Spark structured streaming


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.4.0
    • Component/s: SQL, Structured Streaming

    Description

      There appears to be a memory issue in Structured Streaming. A stream with aggregation (dropDuplicates()) and data partitioning steadily increases memory usage until executors fail with exit code 137 (the container is killed by an external signal, typically for exceeding its memory limit):

      ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
      Container exited with a non-zero exit code 137
      Killed by external signal

      Stream creation looks something like this:

      session
          .readStream()
          .schema(inputSchema)
          .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
          .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
          .csv("s3://test-bucket/input")
          .as(Encoders.bean(TestRecord.class))
          .flatMap(mf, Encoders.bean(TestRecord.class))
          .dropDuplicates("testId", "testName")
          // Partition column derived from the record timestamp ("yyyy" = calendar year).
          .withColumn("year", functions.date_format(functions.col("testTimestamp").cast(DataTypes.DateType), "yyyy"))
          .writeStream()
          .option("path", "s3://test-bucket/output")
          .option("checkpointLocation", "s3://test-bucket/checkpoint")
          .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
          .partitionBy("year")
          .format("parquet")
          .outputMode(OutputMode.Append())
          .queryName("test-stream")
          .start();

      Analyzing the heap dump, I found that most of the memory is used by org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, which is referenced from StateStore.
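
      The same growth can also be observed without a heap dump by polling the running query's progress: the stateOperators metrics report the number of rows and the memory held by the state store. A minimal sketch, assuming the StreamingQuery handle returned by start() above is kept in a variable named query (the variable name is just an illustration):

      import org.apache.spark.sql.streaming.StateOperatorProgress;
      import org.apache.spark.sql.streaming.StreamingQueryProgress;

      // Print state-store metrics from the most recent micro-batch. If numRowsTotal
      // and memoryUsedBytes keep growing while the same input is replayed, the
      // deduplication state is not being cleaned up.
      StreamingQueryProgress progress = query.lastProgress();
      if (progress != null) {
          for (StateOperatorProgress op : progress.stateOperators()) {
              System.out.println("state rows total: " + op.numRowsTotal()
                      + ", state memory used (bytes): " + op.memoryUsedBytes());
          }
      }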

      At first glance this looks normal, since that is how Spark keeps aggregation keys in memory. However, I did my testing by renaming files in the source folder so that they would be picked up by Spark again. Since the input records are the same, all subsequent rows should be rejected as duplicates and memory consumption should not increase, but that is not what happens. Moreover, GC time took more than 30% of total processing time.
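
      One way to bound the deduplication state is to combine dropDuplicates with an event-time watermark, so old keys are eventually evicted from the state store. A minimal sketch, assuming the flat-mapped Dataset<TestRecord> is held in a variable named records and that testTimestamp is a timestamp column usable as event time (the variable name and the 1-hour threshold are only examples, not from the original setup):

      import org.apache.spark.sql.Dataset;

      // Sketch only: a watermark lets Spark drop deduplication state older than
      // the delay threshold instead of keeping every key forever.
      Dataset<TestRecord> deduped = records
          .withWatermark("testTimestamp", "1 hour")
          // Including the watermarked column in the key allows state cleanup.
          .dropDuplicates("testId", "testName", "testTimestamp");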

      Attachments

        1. Spark executors GC time.png
          169 kB
          Yuriy Bondaruk
        2. Screen Shot 2018-03-28 at 16.44.20.png
          504 kB
          Andrew Korzhuev
        3. Screen Shot 2018-03-28 at 16.44.20.png
          464 kB
          Andrew Korzhuev
        4. Screen Shot 2018-03-28 at 16.44.20.png
          332 kB
          Andrew Korzhuev
        5. Screen Shot 2018-03-10 at 18.53.49.png
          458 kB
          Yuriy Bondaruk
        6. Screen Shot 2018-03-07 at 21.52.17.png
          1.02 MB
          Yuriy Bondaruk
        7. screen_shot_2018-03-20_at_15.23.29.png
          133 kB
          Andrew Korzhuev
        8. image-2018-03-22-14-46-31-960.png
          13 kB
          Cristian Cifuentes

            People

              Assignee: Unassigned
              Reporter: Yuriy Bondaruk (bondyk)
              Votes: 6
              Watchers: 15
