Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-18258

Sinks need access to offset representation

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Resolved
    • Major
    • Resolution: Incomplete
    • None
    • None
    • Structured Streaming

    Description

      Transactional "exactly-once" semantics for output require storing an offset identifier in the same transaction as results.

      The Sink.addBatch method currently only has access to batchId and data, not the actual offset representation.

      I want to store the actual offsets, so that they are recoverable as long as the results are and I'm not locked in to a particular streaming engine.

      I could see this being accomplished by adding parameters to Sink.addBatch for the starting and ending offsets (either the offsets themselves, or the SPARK-17829 string/json representation). That would be an API change, but if there's another way to map batch ids to offset representations without changing the Sink api that would work as well.

      I'm assuming we don't need the same level of access to offsets throughout a job as e.g. the Kafka dstream gives, because Sinks are the main place that should need them.

      After SPARK-17829 is complete and offsets have a .json method, an api for this ticket might look like

      trait Sink {
        def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit
      

      where start and end were provided by StreamExecution.runBatch using committedOffsets and availableOffsets.

      I'm not 100% certain that the offsets in the seq could always be mapped back to the correct source when restarting complicated multi-source jobs, but I think it'd be sufficient. Passing the string/json representation of the seq instead of the seq itself would probably be sufficient as well, but the convention of rendering a None as "-" in the json is maybe a little idiosyncratic to parse, and the constant defining that is private.

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              koeninger Cody Koeninger
              Votes:
              0 Vote for this issue
              Watchers:
              9 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: