Description
When Spark Structured Streaming reads data from Kafka, the poll request can time out if a producer writes to the topic in transactional mode but does not commit the transaction for a long time.
When this happens, `r.isEmpty` is `true` but `offsetAfterPoll` has changed. No error or warning is logged in this case, so users have no idea why the related Spark task was delayed by `pollTimeoutMs`. Here is the related code:

    if (r.isEmpty) {
      // We cannot fetch anything after `poll`. Two possible cases:
      // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
      //   be thrown.
      // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
      // - Fetched something but all of them are not invisible. This is a valid case and let the
      //   caller handles this.
      if (offset < range.earliest || offset >= range.latest) {
        throw new OffsetOutOfRangeException(
          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
      } else if (offset == offsetAfterPoll) {
        throw new TimeoutException(
          s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
      }
    }

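The quoted branch reduces to three comparisons. The sketch below is a dependency-free model of them, written only to make the gap visible: the third outcome, where the offset moved but no records came back, produces neither an exception nor a log line. The names `EmptyPollOutcome`, `EmptyPollModel.classify`, and the outcome objects are hypothetical and not part of Spark; `earliest` and `latest` stand in for `range.earliest` and `range.latest`.

```scala
// Hypothetical, dependency-free model of the empty-poll branch quoted above.
// It mirrors only the comparisons in that code; it does not call Kafka or Spark.
sealed trait EmptyPollOutcome
case object OffsetOutOfRange extends EmptyPollOutcome       // exception is thrown
case object PollTimedOut extends EmptyPollOutcome           // exception is thrown
case object OffsetAdvancedSilently extends EmptyPollOutcome // nothing is thrown or logged

object EmptyPollModel {
  /** Classifies a poll that returned no records for the partition.
    *
    * @param offset          offset requested before polling
    * @param offsetAfterPoll consumer position after polling
    * @param earliest        earliest available offset (stands in for range.earliest)
    * @param latest          latest available offset (stands in for range.latest)
    */
  def classify(offset: Long, offsetAfterPoll: Long, earliest: Long, latest: Long): EmptyPollOutcome =
    if (offset < earliest || offset >= latest) OffsetOutOfRange
    else if (offset == offsetAfterPoll) PollTimedOut
    else OffsetAdvancedSilently
}
```

For example, requesting offset 12 from a partition whose available range is [10, 20) and ending the poll at position 15 falls into `OffsetAdvancedSilently`, which is exactly the case this issue asks to surface.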
To make troubleshooting easier when a poll returns zero records before the timeout but `offsetAfterPoll` has changed, a warning- or error-level log should be added.
A sample looks like the following:

    if (r.isEmpty) {
      // We cannot fetch anything after `poll`. Three possible cases:
      // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
      //   be thrown.
      // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
      // - Fetched something but all of them are invisible. This is a valid case and let the
      //   caller handle this. Although this is a valid case, log a warning to indicate that
      //   zero records were returned before the timeout.
      if (offset < range.earliest || offset >= range.latest) {
        throw new OffsetOutOfRangeException(
          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
      } else if (offset == offsetAfterPoll) {
        throw new TimeoutException(
          s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
      } else {
        logWarning(s"After polling for $pollTimeoutMs milliseconds, zero records were fetched " +
          s"while the offset changed from $offset to $offsetAfterPoll.")
      }
    }

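One way to check the proposed branch without a running broker is to pull it into a function that takes the logger as a parameter. This is a minimal sketch under stated assumptions: `EmptyPollCheck.checkEmptyPoll` and its `warn` callback are hypothetical stand-ins for the code above and for Spark's `logWarning`, and `IllegalStateException` replaces Kafka's `OffsetOutOfRangeException` so the snippet needs no Kafka dependency.

```scala
import java.util.concurrent.TimeoutException

object EmptyPollCheck {
  // Hypothetical stand-in for the proposed empty-poll branch.
  // `warn` replaces Spark's logWarning; IllegalStateException replaces
  // Kafka's OffsetOutOfRangeException so this compiles without Kafka.
  def checkEmptyPoll(
      offset: Long,
      offsetAfterPoll: Long,
      earliest: Long,
      latest: Long,
      pollTimeoutMs: Long,
      warn: String => Unit): Unit = {
    if (offset < earliest || offset >= latest) {
      throw new IllegalStateException(s"Offset $offset is out of range [$earliest, $latest)")
    } else if (offset == offsetAfterPoll) {
      throw new TimeoutException(
        s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
    } else {
      // New branch: the offset moved but nothing was returned, so say so.
      warn(s"After polling for $pollTimeoutMs milliseconds, zero records were fetched " +
        s"while the offset changed from $offset to $offsetAfterPoll.")
    }
  }
}
```

Injecting the logger keeps the new branch unit-testable: a test can pass a callback that collects messages into a buffer and assert that exactly one warning is emitted when the offset advances, and none when a `TimeoutException` is thrown instead.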
By the way, also fix the typo in the comment, changing `Fetched something but all of them are not invisible` to `Fetched something but all of them are invisible`.