Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version: 2.4.0
Description
When a micro-batch task is killed (interrupted) while its input iterator is being consumed, rather than during the actual processing done by ForeachDataWriter, DataWritingSparkTask handles the interruption and calls dataWriter.abort().
The problem is that ForeachDataWriter has an empty implementation of the abort() method.
Because of that, my tasks that use the foreach writer, and that (as the documentation recommends) open connections in the "open" method and close them in the "close" method, never get "close" called, so the connections are never closed.
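For illustration, a minimal sketch of the kind of writer involved (the connection type and factory below are placeholders, not my real code), following the documented open/process/close pattern:

import org.apache.spark.sql.ForeachWriter

// Placeholder connection type and factory, used only for illustration.
class MyConnection { def close(): Unit = () }
object MyConnectionFactory { def connect(): MyConnection = new MyConnection }

class ConnectionForeachWriter extends ForeachWriter[String] {
  // One connection per task: acquired in open(), released in close(),
  // following the pattern described in the ForeachWriter documentation.
  private var connection: MyConnection = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = MyConnectionFactory.connect()
    true // returning true means process() will be called for this partition
  }

  override def process(value: String): Unit = {
    // write `value` over `connection`
  }

  override def close(errorOrNull: Throwable): Unit = {
    // The only place the connection is released; if close() is never invoked
    // (e.g. the task is killed and only abort() runs), the connection leaks.
    if (connection != null) connection.close()
  }
}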
This was not the behavior prior to Spark 2.4.
My suggestion is to call ForeachWriter.close() (passing the failure cause) when DataWriter.abort() is called, in order to notify the foreach writer that the task has failed.
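A rough sketch of what I have in mind, simplified and not the actual Spark source (class and commit-message names here are illustrative, against the 2.4 DataSourceV2 writer interface):

import org.apache.spark.SparkException
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// Sketch only: a simplified stand-in for ForeachDataWriter showing the proposed abort() behavior.
case object SketchCommitMessage extends WriterCommitMessage

class ForeachDataWriterSketch[T](writer: ForeachWriter[T], partitionId: Long, epochId: Long)
    extends DataWriter[T] {

  // Mirrors the documented lifecycle: open() up front, process() per record.
  private val opened = writer.open(partitionId, epochId)

  override def write(record: T): Unit = {
    if (opened) writer.process(record)
  }

  override def commit(): WriterCommitMessage = {
    writer.close(null) // task succeeded: close with no error
    SketchCommitMessage
  }

  override def abort(): Unit = {
    // In 2.4.0 this is effectively a no-op; the suggestion is to notify the user's
    // ForeachWriter here so it can release resources opened in open().
    writer.close(new SparkException(s"Foreach writer task for partition $partitionId was aborted"))
  }
}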
Stack trace from the exception I encountered:
org.apache.spark.TaskKilledException: null
at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:149)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)