SPARK-24647

Sink Should Return Written Offsets For ProgressReporting


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version: 2.3.1
    • Fix Version: None
    • Component: Structured Streaming

    Description

      To track data lineage for Structured Streaming (I intend to implement this in the open source project Spline), the monitoring needs to track not only where the data was read from but also where the results were written to. To my knowledge, this is best implemented by monitoring StreamingQueryProgress. However, the offsets of the written data are currently not available on the Sink or StreamWriter interfaces. Implementing this proposal would also bring symmetry to the StreamingQueryProgress fields sources and sink.
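
      For context, here is a minimal sketch of how such monitoring hooks into StreamingQueryProgress today via the StreamingQueryListener API (assuming an active SparkSession named spark); note that only the sources side carries offset information:

        import org.apache.spark.sql.streaming.StreamingQueryListener
        import org.apache.spark.sql.streaming.StreamingQueryListener._

        spark.streams.addListener(new StreamingQueryListener {
          override def onQueryStarted(event: QueryStartedEvent): Unit = ()
          override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
          override def onQueryProgress(event: QueryProgressEvent): Unit = {
            val p = event.progress
            // Source offsets are reported per batch...
            p.sources.foreach(s =>
              println(s"${s.description}: ${s.startOffset} -> ${s.endOffset}"))
            // ...but the sink currently exposes only its description.
            println(p.sink.description)
          }
        })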

       

      Similar Proposals

      Similar proposals were made in the following JIRAs; these would not be sufficient for lineage tracking.

       

      Current State

      • Method Sink#addBatch returns Unit (see the interface sketch after the example below).
      • Object WriterCommitMessage does not carry any progress information about committed rows.
      • StreamingQueryProgress reports the offsetSeq start and end via sourceProgress, but sinkProgress only reports the sink's toString value:
        "sources" : [ {
          "description" : "KafkaSource[Subscribe[test-topic]]",
          "startOffset" : null,
          "endOffset" : { "test-topic" : { "0" : 5000 }},
          "numInputRows" : 5000,
          "processedRowsPerSecond" : 645.3278265358803
        } ],
        "sink" : {
          "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@9da556f"
        }
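
      For reference, the v1 sink interface that produces the output above looks roughly as follows (simplified from org.apache.spark.sql.execution.streaming.Sink); because addBatch returns Unit, no written-offset information can surface in the progress report:

        import org.apache.spark.sql.DataFrame

        // Simplified v1 interface: the Unit return type leaves no room
        // for reporting which offsets were actually written.
        trait Sink {
          def addBatch(batchId: Long, data: DataFrame): Unit
        }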
      

       

       

      Proposed State

      • Implement support only for v2 sinks, as those are to be used in the future.
      • WriterCommitMessage to hold optional min and max offset information for the committed rows; Kafka, for example, does this by returning a RecordMetadata object from its send method. See the sketch after the example below.
      • StreamingQueryProgress to incorporate sinkProgress in the same fashion as sourceProgress:

       

       

        "sources" : [ {
          "description" : "KafkaSource[Subscribe[test-topic]]",
          "startOffset" : null,
          "endOffset" : { "test-topic" : { "0" : 5000 }},
          "numInputRows" : 5000,
          "processedRowsPerSecond" : 645.3278265358803
        } ],
        "sink" : {
          "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@9da556f",
         "startOffset" : null,
          "endOffset" { "sinkTopic": { "0": 333 }}
        }
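
      Below is a minimal sketch of the proposed WriterCommitMessage change; the class and field names are illustrative only, not a final API:

        import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

        // Hypothetical commit message that, besides signalling a successful
        // commit, reports the range of offsets written by one writer task.
        // None means the sink cannot determine the offsets.
        case class OffsetRangeCommitMessage(
            minOffset: Option[Long],
            maxOffset: Option[Long]
        ) extends WriterCommitMessage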
      

       

      Implementation

      • PR submitters: myself and wajda, as soon as the prerequisite JIRA is merged.
      • Sinks: Modify all v2 sinks to conform to the new interface or return dummy values.
      • ProgressReporter: Merge offsets from different batches properly, similarly to how it is done for sources (see the sketch below).
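
      A sketch of how that merging could look, reusing the hypothetical OffsetRangeCommitMessage from above; the aggregation mirrors how source offsets are combined per trigger:

        // Combine the per-writer commit messages of one batch into a single
        // offset range suitable for sinkProgress reporting.
        def mergeCommitMessages(
            messages: Seq[OffsetRangeCommitMessage]): (Option[Long], Option[Long]) = {
          val mins = messages.flatMap(_.minOffset)
          val maxs = messages.flatMap(_.maxOffset)
          (mins.reduceOption(_ min _), maxs.reduceOption(_ max _))
        }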

       

People

    • Assignee: Unassigned
    • Reporter: Vaclav Kosar (vackosar)
    • Votes: 10
    • Watchers: 6
