  1. Spark
  2. SPARK-24404

Increment currentEpoch when an EpochMarker is encountered in ContinuousQueuedDataReader.next() in CP mode (based on PR #21353, #21332, #21293 and the latest master)

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels: None

    Description

      In CP (continuous processing) mode, based on PR #21353, #21332, #21293 and the latest master, ContinuousQueuedDataReader.next() is invoked by ContinuousDataSourceRDD.compute to return an UnsafeRow. When the currentEntry polled from the ArrayBlockingQueue is an EpochMarker, ContinuousQueuedDataReader sends a `ReportPartitionOffset` message to the epochCoordinator, using the currentEpoch from EpochTracker. currentEpoch is a ThreadLocal variable, but nothing currently invokes `incrementCurrentEpoch` on that thread, so `getCurrentEpoch` always returns `None` (because currentEpoch stays at -1), and calling `None.get` throws an exception. In addition, for the `ReportPartitionOffset` message to have correct semantics, currentEpoch must be incremented before the message is sent.
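
The failure mode described above can be reproduced in isolation with a minimal sketch. `EpochTrackerSketch` and `EpochMarkerDemo` are hypothetical stand-ins written for illustration, not Spark's actual source; the sketch only assumes what the description states: a thread-local epoch that starts at -1, a `getCurrentEpoch` that returns `None` for a negative epoch, and an `incrementCurrentEpoch` that nobody calls.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.util.Try

// Hypothetical stand-in for EpochTracker (an assumption, not Spark's code).
object EpochTrackerSketch {
  // Each thread gets its own counter, starting at -1 ("no epoch yet").
  private val currentEpoch = new ThreadLocal[AtomicLong] {
    override def initialValue(): AtomicLong = new AtomicLong(-1L)
  }

  // Returns None while the calling thread's epoch is still negative.
  def getCurrentEpoch: Option[Long] = {
    val epoch = currentEpoch.get().get()
    if (epoch < 0) None else Some(epoch)
  }

  // Advances the epoch of the calling thread only.
  def incrementCurrentEpoch(): Unit = {
    currentEpoch.get().incrementAndGet()
    ()
  }
}

// Hypothetical model of what happens when an EpochMarker is polled.
object EpochMarkerDemo {
  // Optionally increments the epoch, then reads it as the reporting code would.
  def epochToReport(incrementFirst: Boolean): Try[Long] = Try {
    if (incrementFirst) EpochTrackerSketch.incrementCurrentEpoch()
    // Option.get throws NoSuchElementException when the value is None.
    EpochTrackerSketch.getCurrentEpoch.get
  }

  def main(args: Array[String]): Unit = {
    // Without an increment the epoch is still -1, so the read fails.
    println(epochToReport(incrementFirst = false))
    // Incrementing first moves the epoch to 0, so the read succeeds.
    println(epochToReport(incrementFirst = true))
  }
}
```

Because the counter is thread-local, the increment has to happen on the same thread that later reads the epoch; an increment performed on another thread would not be visible to the reader.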

          People

            Assignee: Unassigned
            Reporter: Liangchang Zhu
            Votes: 0
            Watchers: 3
