Spark / SPARK-27330

ForeachWriter is not being closed once a batch is aborted

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.0
    • Fix Version/s: 2.4.4, 3.0.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      When a micro-batch is killed (interrupted) while the task is advancing the input iterator, rather than during the actual processing done by ForeachDataWriter, DataWritingSparkTask handles the interruption and calls dataWriter.abort().

      The problem is that ForeachDataWriter has an empty implementation of the abort method.

      As a result, I have tasks that use the foreach writer and, following the documentation, open connections in the "open" method and close them in the "close" method. Because "close" is never called in this case, the connections are never closed.
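
      The leak described above can be illustrated with a small Spark-free model. This is only a sketch: SimpleForeachWriter, ConnectionWriter, LeakyDataWriter, runTask and killedAfter are hypothetical names invented for illustration, loosely mirroring the open/process/close lifecycle of ForeachWriter and the write/commit/abort calls named in this report. It is not the actual Spark source.

```scala
object ForeachLeakModel {
  // Hypothetical stand-in for the ForeachWriter lifecycle (open/process/close).
  trait SimpleForeachWriter[T] {
    def open(partitionId: Long, epochId: Long): Boolean
    def process(value: T): Unit
    def close(errorOrNull: Throwable): Unit
  }

  // Hypothetical user writer: "opens a connection" in open, releases it in close.
  final class ConnectionWriter extends SimpleForeachWriter[String] {
    var connectionOpen = false
    def open(partitionId: Long, epochId: Long): Boolean = { connectionOpen = true; true }
    def process(value: String): Unit = ()
    def close(errorOrNull: Throwable): Unit = { connectionOpen = false }
  }

  // Simplified model of a data writer wrapping the user's writer.
  // Assumption for illustration: abort() is empty, as the report describes.
  final class LeakyDataWriter[T](writer: SimpleForeachWriter[T]) {
    private val opened = writer.open(0L, 0L)
    def write(record: T): Unit = if (opened) {
      try writer.process(record)
      catch { case t: Throwable => writer.close(t); throw t }
    }
    def commit(): Unit = writer.close(null)
    def abort(): Unit = () // nothing is forwarded to the user's writer
  }

  // Simplified task loop: a failure raised by the iterator leads to abort().
  def runTask[T](records: Iterator[T], dataWriter: LeakyDataWriter[T]): Unit =
    try {
      while (records.hasNext) dataWriter.write(records.next())
      dataWriter.commit()
    } catch {
      case t: Throwable =>
        dataWriter.abort()
        throw t
    }

  // Iterator that simulates a kill: hasNext throws after `n` rows were served.
  def killedAfter(n: Int): Iterator[String] = new Iterator[String] {
    private var served = 0
    def hasNext: Boolean = {
      if (served >= n) throw new InterruptedException("task killed")
      true
    }
    def next(): String = { served += 1; s"row-$served" }
  }

  // Returns true when the connection is still open after the simulated kill.
  def leakedAfterKill(): Boolean = {
    val writer = new ConnectionWriter
    try runTask(killedAfter(2), new LeakyDataWriter(writer))
    catch { case _: InterruptedException => () }
    writer.connectionOpen
  }
}
```

      In this model the failure originates in hasNext, outside write(), so neither the error path in write() nor commit() ever reaches close, and leakedAfterKill() reports the connection as still open.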

      This was not the behavior before Spark 2.4.

      My suggestion is to call ForeachWriter.abort() when DataWriter.abort() is called, in order to notify the foreach writer that the task has failed.
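
      One possible reading of this suggestion is sketched below, again as a Spark-free model. Since the documented ForeachWriter lifecycle ends with close(errorOrNull), the sketch assumes the notification is delivered by forwarding abort to close with an error. SimpleForeachWriter, AbortAwareDataWriter, RecordingWriter and the error message are hypothetical and chosen for illustration; this is not necessarily how the fix was implemented in Spark.

```scala
object ForeachAbortSketch {
  // Hypothetical stand-in for the ForeachWriter lifecycle (open/process/close).
  trait SimpleForeachWriter[T] {
    def open(partitionId: Long, epochId: Long): Boolean
    def process(value: T): Unit
    def close(errorOrNull: Throwable): Unit
  }

  // Sketch of a data writer whose abort() notifies the user's writer.
  final class AbortAwareDataWriter[T](writer: SimpleForeachWriter[T]) {
    private val opened = writer.open(0L, 0L)
    private var closeCalled = false

    // Guard so close is delivered at most once, whichever path reaches it first.
    private def closeOnce(errorOrNull: Throwable): Unit =
      if (!closeCalled) {
        closeCalled = true
        writer.close(errorOrNull)
      }

    def write(record: T): Unit = if (opened) {
      try writer.process(record)
      catch { case t: Throwable => closeOnce(t); throw t }
    }
    def commit(): Unit = closeOnce(null)
    // Assumption: abort is reported to the user's writer as a failed close.
    def abort(): Unit = closeOnce(new RuntimeException("task aborted before commit"))
  }

  // Hypothetical writer that records every close call it receives.
  final class RecordingWriter extends SimpleForeachWriter[String] {
    var closeErrors: List[Option[Throwable]] = Nil
    def open(partitionId: Long, epochId: Long): Boolean = true
    def process(value: String): Unit = ()
    def close(errorOrNull: Throwable): Unit = {
      closeErrors = closeErrors :+ Option(errorOrNull)
    }
  }

  // Writes one row, aborts twice, and returns the messages passed to close.
  def demo(): List[Option[String]] = {
    val writer = new RecordingWriter
    val dataWriter = new AbortAwareDataWriter(writer)
    dataWriter.write("row-1")
    dataWriter.abort()
    dataWriter.abort() // second call is a no-op because of the guard
    writer.closeErrors.map(_.map(_.getMessage))
  }
}
```

      The guard is a design choice of this sketch: it keeps close from being delivered twice if abort follows a failure that write() already reported.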

       

      Stack trace from the exception I encountered:
       org.apache.spark.TaskKilledException: null
       at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:149)
       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
       at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
       at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
       at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
       at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
       at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
       at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
      

       

          People

            Assignee: Eyal Zituny (eyalzit)
            Reporter: Eyal Zituny (eyalzit)