Details
Type: Improvement
Priority: Major
Status: Resolved
Resolution: Incomplete
Description
Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.
The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.
I want to store the actual offsets so that they're recoverable as long as the results are, and I'm not locked into a particular streaming engine.
I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/json representation). That would be an API change, but if there's another way to map batch IDs to offset representations without changing the Sink API, that would work as well.
I'm assuming we don't need the same level of access to offsets throughout a job as, e.g., the Kafka DStream gives, because Sinks are the main place that should need them. A rough sketch of what a transactional sink can do today follows.
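To illustrate the current limitation, here's a rough sketch of what a transactional sink can do with only batchId and data; the table names and JDBC plumbing are just illustrative, not part of Spark:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Idempotent sink: the only recoverable identifier it can store with the results
// is the engine-specific batchId, so recovery stays tied to Spark's own checkpoint.
class IdempotentJdbcSink(connect: () => java.sql.Connection) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val conn = connect()
    try {
      conn.setAutoCommit(false)
      // Skip batches that were already committed (replays after a restart).
      val check = conn.prepareStatement("SELECT 1 FROM committed_batches WHERE batch_id = ?")
      check.setLong(1, batchId)
      if (!check.executeQuery().next()) {
        // ... write the rows of data ...
        val mark = conn.prepareStatement("INSERT INTO committed_batches (batch_id) VALUES (?)")
        mark.setLong(1, batchId)
        mark.executeUpdate()
        conn.commit()
      }
    } finally {
      conn.close()
    }
  }
}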
After SPARK-17829 is complete and offsets have a .json method, an API for this ticket might look like:
trait Sink {
  def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit
}
where start and end would be provided by StreamExecution.runBatch using committedOffsets and availableOffsets.
I'm not 100% certain that the offsets in the seq could always be mapped back to the correct source when restarting complicated multi-source jobs, but I think it'd be sufficient. Passing the string/json representation of the seq instead of the seq itself would probably be sufficient as well, but the convention of rendering a None as "-" in the json is maybe a little idiosyncratic to parse, and the constant defining that is private.
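To make the proposal concrete, here's a rough sketch of a sink written against the proposed signature. The OffsetAwareSink trait is hypothetical (it just restates the change proposed above), the import path for OffsetSeq is an assumption, the string rendering of the offsets is an assumption pending SPARK-17829, and the JDBC plumbing is again only illustrative:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.OffsetSeq

// Hypothetical trait with the extra parameters proposed above (not the current Spark API).
trait OffsetAwareSink {
  def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit
}

// Stores the offsets in the same transaction as the results, so progress is
// recoverable from the sink itself, independent of the streaming engine.
class OffsetTrackingJdbcSink(connect: () => java.sql.Connection) extends OffsetAwareSink {
  def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit = {
    val conn = connect()
    try {
      conn.setAutoCommit(false)
      // ... write the rows of data ...
      val stmt = conn.prepareStatement(
        "INSERT INTO sink_offsets (batch_id, start_offsets, end_offsets) VALUES (?, ?, ?)")
      stmt.setLong(1, batchId)
      // Assumes some engine-agnostic string/json rendering of the offsets (SPARK-17829).
      stmt.setString(2, start.toString)
      stmt.setString(3, end.toString)
      stmt.executeUpdate()
      conn.commit()
    } finally {
      conn.close()
    }
  }
}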
Issue Links
- relates to: SPARK-24647 Sink Should Return Written Offsets For ProgressReporting (Resolved)
- links to