Details
Type: Improvement
Status: Resolved
Priority: Major
Resolution: Fixed
Description
We need to submit another PR against Spark to invoke the task failure callbacks before Spark calls close on its various output streams.
For example, in the following code (in PairRDDFunctions.scala), we need to intercept any exception and call TaskContext.markTaskFailed before close is invoked:
Utils.tryWithSafeFinally {
  while (iter.hasNext) {
    val record = iter.next()
    writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

    // Update bytes written metric every few records
    maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
    recordsWritten += 1
  }
} {
  writer.close()
}
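The pattern the PR needs can be sketched as a small, self-contained helper. This is not Spark's actual code; the name tryWithFailureCallback and the onFailure parameter are illustrative stand-ins for Utils.tryWithSafeFinally and TaskContext.markTaskFailed:

```scala
object TaskCallbacks {
  // Run `block`; if it throws, report the failure to `onFailure` (standing
  // in for TaskContext.markTaskFailed) BEFORE running `doFinally` (standing
  // in for writer.close()), then rethrow the original exception.
  def tryWithFailureCallback[T](block: => T)
                               (onFailure: Throwable => Unit)
                               (doFinally: => Unit): T = {
    try {
      block
    } catch {
      case t: Throwable =>
        // Failure callback fires first; a callback failure must not mask
        // the original exception, so it is attached as suppressed.
        try onFailure(t)
        catch { case cb: Throwable => t.addSuppressed(cb) }
        throw t
    } finally {
      doFinally // runs in both the success and the failure path
    }
  }
}
```

The key ordering property is that the callback sees the failure while the streams are still open, which is exactly what the current tryWithSafeFinally-based code cannot guarantee.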
Changes to Spark should include unit tests to make sure this always works in the future.
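Such a unit test could assert on the ordering of events directly. A framework-free sketch (object and event names are invented, not Spark's test suite; sys.error stands in for writer.write throwing):

```scala
object CallbackBeforeCloseTest {
  def main(args: Array[String]): Unit = {
    val events = scala.collection.mutable.ArrayBuffer.empty[String]
    try {
      try {
        sys.error("simulated write failure") // stands in for writer.write throwing
      } catch {
        case t: Throwable =>
          events += "markTaskFailed" // the failure callback must fire first
          throw t
      } finally {
        events += "close" // the stream is closed afterwards
      }
    } catch {
      case _: RuntimeException => // the original exception still propagates
    }
    // The property the PR must preserve: callback strictly before close.
    assert(events.toList == List("markTaskFailed", "close"))
  }
}
```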