Spark / SPARK-49123

Improve logging when Spark fetches zero records from Kafka after a poll timeout


Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.3.0
    • Fix Version/s: None
    • Component/s: SQL
    • Environment:

      Java version: openjdk version "1.8.0_402"
      Python version: 3.10.2
      OS: Amazon Linux 2

    Description

      When using Spark Structured Streaming to read data from Kafka, a poll request can time out if a producer writes to the topic in transactional mode but does not commit the transaction for a long time.

      When this happens, `r.isEmpty` is `true` but `offsetAfterPoll` has changed. Spark neither throws an error nor logs a warning in this case, so users have no way to tell why the affected Spark task was delayed by `pollTimeoutMs`. Here is the relevant code:

          if (r.isEmpty) {
            // We cannot fetch anything after `poll`. Two possible cases:
            // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
            //   be thrown.
            // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
            // - Fetched something but all of them are not invisible. This is a valid case and let the
            //   caller handles this.
            if (offset < range.earliest || offset >= range.latest) {
              throw new OffsetOutOfRangeException(
                Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
            } else if (offset == offsetAfterPoll) {
              throw new TimeoutException(
                s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
            }
          }
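
      To make the branching above easier to reason about outside of Spark, here is a minimal, dependency-free Scala sketch of how an empty poll is classified. The names `EmptyPollClassifier`, `Outcome`, `SilentlySkipped`, and `classify` are hypothetical (they are not Spark APIs), and `earliest`/`latest` stand in for `range.earliest`/`range.latest`. The third outcome is the case in which the quoted code neither throws nor logs.

```scala
// Hypothetical, dependency-free model of the `if (r.isEmpty)` branch quoted above.
// None of these names exist in Spark; they only illustrate the three outcomes.
object EmptyPollClassifier {
  sealed trait Outcome
  // `offset` is outside [earliest, latest): Spark throws OffsetOutOfRangeException.
  case object OutOfRange extends Outcome
  // The consumer position did not move: Spark throws TimeoutException.
  case object TimedOut extends Outcome
  // The position moved but no visible record was returned: Spark returns
  // normally without any log, which is the behavior this issue is about.
  final case class SilentlySkipped(from: Long, to: Long) extends Outcome

  // Same branch order as the quoted code: range check first, then position check.
  def classify(offset: Long, offsetAfterPoll: Long, earliest: Long, latest: Long): Outcome =
    if (offset < earliest || offset >= latest) OutOfRange
    else if (offset == offsetAfterPoll) TimedOut
    else SilentlySkipped(offset, offsetAfterPoll)
}
```

      For example, `classify(12L, 15L, 10L, 20L)` yields `SilentlySkipped(12, 15)`: the offset is in range and the position advanced, so nothing is thrown and nothing is reported.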

      To make troubleshooting easier when a poll returns zero records within `pollTimeoutMs` but `offsetAfterPoll` has changed, Spark should emit a warning- or error-level log. A sample change looks like the following:

          if (r.isEmpty) {
            // We cannot fetch anything after `poll`. Three possible cases:
            // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
            //   be thrown.
            // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
            // - Fetched something but all of them are invisible. This is a valid case and the caller
            //   handles it. Although this case is valid, log a warning to indicate that zero records
            //   were returned before the timeout.
            if (offset < range.earliest || offset >= range.latest) {
              throw new OffsetOutOfRangeException(
                Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
            } else if (offset == offsetAfterPoll) {
              throw new TimeoutException(
                s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
            } else {
              logWarning(s"After taking $pollTimeoutMs milliseconds polling, ZERO record fetched " +
                s"while offset changed from $offset to $offsetAfterPoll.")
            }
          }
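
      The proposed `else` branch can be exercised in isolation with the sketch below. It is a simplified model under stated assumptions, not Spark code: `EmptyPollLogging.handleEmptyPoll` is hypothetical, the `warn` callback stands in for Spark's `logWarning`, and `IllegalArgumentException` / `java.util.concurrent.TimeoutException` stand in for the Kafka exceptions so that no Kafka dependency is needed. The warning text follows the sample above.

```scala
import java.util.concurrent.TimeoutException

// Hypothetical model of the proposed change; not Spark code.
object EmptyPollLogging {
  def handleEmptyPoll(
      offset: Long,
      offsetAfterPoll: Long,
      earliest: Long,
      latest: Long,
      pollTimeoutMs: Long,
      warn: String => Unit): Unit = {
    if (offset < earliest || offset >= latest) {
      // Stand-in for Kafka's OffsetOutOfRangeException.
      throw new IllegalArgumentException(
        s"Offset $offset is out of range [$earliest, $latest)")
    } else if (offset == offsetAfterPoll) {
      // Stand-in for Kafka's TimeoutException: the position did not move.
      throw new TimeoutException(
        s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
    } else {
      // Proposed branch: still a valid case, but no longer silent.
      warn(s"After taking $pollTimeoutMs milliseconds polling, ZERO record fetched " +
        s"while offset changed from $offset to $offsetAfterPoll.")
    }
  }
}
```

      Passing a callback that collects messages into a buffer makes it easy to check that exactly one warning is emitted when the position moves from 100 to 105 with no visible records, while the out-of-range and unchanged-position cases still throw. Injecting the logger this way is only a testing convenience; in Spark the branch would call `logWarning` directly.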

      Also, fix the typo in the existing comment: `Fetched something but all of them are not invisible` should read `Fetched something but all of them are invisible`.

            People

              Assignee: Unassigned
              Reporter: Yang Guo (yangguo)
              Votes: 0
              Watchers: 1

            Time Tracking

              Original Estimate: 48h
              Remaining Estimate: 48h
              Time Spent: Not Specified