Spark / SPARK-27330

ForeachWriter is not being closed once a batch is aborted

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.0
    • Fix Version/s: 2.4.4, 3.0.0
    • Component/s: Structured Streaming
    • Labels:
      None

      Description

      When a micro-batch is killed (interrupted) while DataWritingSparkTask is iterating over the input rows, rather than during the processing done inside ForeachDataWriter, DataWritingSparkTask handles the interruption and calls dataWriter.abort().

      The problem is that ForeachDataWriter has an empty implementation of abort(), so the wrapped ForeachWriter is never told that the task failed.

      As a result, I have tasks that use the foreach writer and, as the documentation recommends, open connections in the "open" method and close them in the "close" method. Since "close" is never called, those connections are never closed.
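The call order described above can be reproduced without Spark in a small Scala sketch. All types here (SimpleForeachWriter, SimpleDataWriter, ForeachDataWriterModel, ConnectionWriter, TaskModel) are hypothetical stand-ins that only mirror the shape of ForeachWriter, DataWriter and the DataWritingSparkTask loop. They are not Spark classes. The kill is simulated by an iterator that throws from hasNext, roughly as InterruptibleIterator does in the stack trace below. Under these assumptions, TaskModel.main reports one connection still open after the killed task.

```scala
// Hypothetical, dependency-free model of the call order described above.
// None of these types are Spark classes; they only mirror their shape.

// Same shape as ForeachWriter: open / process / close(errorOrNull).
trait SimpleForeachWriter[T] {
  def open(partitionId: Long, epochId: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

// Same shape as the DataWriter calls used here: write / commit / abort.
trait SimpleDataWriter[T] {
  def write(record: T): Unit
  def commit(): Unit
  def abort(): Unit
}

// Adapter with the reported behaviour: close() runs on commit or when
// process() throws, but abort() is empty.
class ForeachDataWriterModel[T](writer: SimpleForeachWriter[T]) extends SimpleDataWriter[T] {
  private val opened = writer.open(0L, 0L)

  def write(record: T): Unit =
    if (opened) {
      try writer.process(record)
      catch { case t: Throwable => writer.close(t); throw t }
    }

  def commit(): Unit = writer.close(null)

  def abort(): Unit = () // the wrapped writer is never told the task failed
}

// User-style writer: "opens a connection" in open(), releases it in close().
class ConnectionWriter extends SimpleForeachWriter[String] {
  var openConnections = 0
  def open(partitionId: Long, epochId: Long): Boolean = { openConnections += 1; true }
  def process(value: String): Unit = ()
  def close(errorOrNull: Throwable): Unit = openConnections -= 1
}

object TaskModel {
  // Stand-in for TaskKilledException.
  final class KilledException extends RuntimeException("task killed")

  // Throws from hasNext once `killAfter` rows have been consumed, so the kill
  // surfaces while the task iterates, not inside the writer.
  def killable[T](rows: Iterator[T], killAfter: Int): Iterator[T] = new Iterator[T] {
    private var consumed = 0
    def hasNext: Boolean = {
      if (consumed >= killAfter) throw new KilledException
      rows.hasNext
    }
    def next(): T = { consumed += 1; rows.next() }
  }

  // Loop modelled on DataWritingSparkTask: write all rows, commit on success,
  // call abort() on any failure and rethrow.
  def run[T](rows: Iterator[T], dataWriter: SimpleDataWriter[T]): Unit =
    try {
      while (rows.hasNext) dataWriter.write(rows.next())
      dataWriter.commit()
    } catch {
      case t: Throwable => dataWriter.abort(); throw t
    }

  // Runs one killed task and returns how many connections are still open.
  def leakedConnections(): Int = {
    val writer = new ConnectionWriter
    val rows = killable(Iterator("a", "b", "c"), killAfter = 2)
    try run(rows, new ForeachDataWriterModel[String](writer))
    catch { case _: KilledException => () }
    writer.openConnections
  }

  def main(args: Array[String]): Unit =
    println(s"connections left open: ${leakedConnections()}") // prints 1
}
```

Because abort() is a no-op in this model, the only paths that reach close() are commit() and a failure thrown from process(); a kill raised by the iterator takes neither path.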

      This was not the behavior before Spark 2.4.

      My suggestion is to call ForeachWriter.close() with a non-null error when DataWriter.abort() is called, in order to notify the foreach writer that the task has failed.
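A minimal sketch of that suggestion as a dependency-free Scala model, not the actual Spark change. SimpleForeachWriter, SimpleDataWriter, ForeachDataWriterFixed, RecordingWriter and FixDemo are hypothetical names that only mirror the shape of ForeachWriter, DataWriter and the DataWritingSparkTask loop. abort() forwards to close() with a non-null error. The `closed` flag is an added assumption so that close() is not called twice when process() has already failed and the task then calls abort().

```scala
// Hypothetical, dependency-free model; none of these types are Spark classes.
trait SimpleForeachWriter[T] {
  def open(partitionId: Long, epochId: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

trait SimpleDataWriter[T] {
  def write(record: T): Unit
  def commit(): Unit
  def abort(): Unit
}

// Adapter with the suggested behaviour: abort() forwards to close() with a
// non-null error. A flag guarantees close() runs at most once, because
// write() may already have closed the writer before the task calls abort().
class ForeachDataWriterFixed[T](writer: SimpleForeachWriter[T]) extends SimpleDataWriter[T] {
  private val opened = writer.open(0L, 0L)
  private var closed = false

  private def closeOnce(errorOrNull: Throwable): Unit =
    if (!closed) { closed = true; writer.close(errorOrNull) }

  def write(record: T): Unit =
    if (opened) {
      try writer.process(record)
      catch { case t: Throwable => closeOnce(t); throw t }
    }

  def commit(): Unit = closeOnce(null)

  def abort(): Unit =
    closeOnce(new RuntimeException("foreach writer aborted due to a task failure"))
}

// Records how close() was called; process() fails on the row `failOn`.
class RecordingWriter(failOn: String) extends SimpleForeachWriter[String] {
  var openConnections = 0
  var closeCalls = 0
  var lastError: Throwable = null
  def open(partitionId: Long, epochId: Long): Boolean = { openConnections += 1; true }
  def process(value: String): Unit =
    if (value == failOn) throw new IllegalStateException(s"bad row: $value")
  def close(errorOrNull: Throwable): Unit = {
    openConnections -= 1; closeCalls += 1; lastError = errorOrNull
  }
}

object FixDemo {
  // Task loop modelled on DataWritingSparkTask: abort() on any failure, rethrow.
  def run[T](rows: Iterator[T], dataWriter: SimpleDataWriter[T]): Unit =
    try {
      rows.foreach(dataWriter.write)
      dataWriter.commit()
    } catch {
      case t: Throwable => dataWriter.abort(); throw t
    }

  // Once the given rows are exhausted, hasNext throws (simulated kill).
  def killedAfter(rows: List[String]): Iterator[String] = new Iterator[String] {
    private val it = rows.iterator
    def hasNext: Boolean = if (it.hasNext) true else throw new RuntimeException("task killed")
    def next(): String = it.next()
  }

  // Task killed while iterating: only abort() can release the connection.
  def killedTask(): RecordingWriter = {
    val writer = new RecordingWriter(failOn = "")
    try run(killedAfter(List("a", "b")), new ForeachDataWriterFixed[String](writer))
    catch { case _: RuntimeException => () }
    writer
  }

  // process() throws on row "b": write() closes the writer, then the task
  // calls abort(); the guard keeps close() from running a second time.
  def failingRowTask(): RecordingWriter = {
    val writer = new RecordingWriter(failOn = "b")
    try run(Iterator("a", "b", "c"), new ForeachDataWriterFixed[String](writer))
    catch { case _: IllegalStateException => () }
    writer
  }

  def main(args: Array[String]): Unit = {
    val k = killedTask()
    println(s"killed task: open=${k.openConnections}, closeCalls=${k.closeCalls}") // open=0, closeCalls=1
    val f = failingRowTask()
    println(s"failing row: open=${f.openConnections}, closeCalls=${f.closeCalls}") // open=0, closeCalls=1
  }
}
```

In both scenarios every opened connection is released exactly once, and the writer receives a non-null error, so a user-defined close() can tell a failed task apart from a successful commit.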

       

      Stack trace from the exception I encountered:
       org.apache.spark.TaskKilledException: null
       at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:149)
       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
       at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
       at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
       at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
       at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
       at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
       at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
      

       


            People

            • Assignee: Eyal Zituny (eyalzit)
            • Reporter: Eyal Zituny (eyalzit)
            • Votes: 0
            • Watchers: 4
