Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Description
When Gobblin attempts to pull records from Kafka, it fails to fetch the message buffer for some partitions of the Kafka topic and therefore extracts 0 records from them (as in the log snippet below). In the same job, it successfully fetches records from the other partitions. The exception thrown for the failed attempt is `java.lang.RuntimeException: error code 1`.
```
2016-03-09 18:22:39,776 INFO [main] gobblin.runtime.mapreduce.MRJobLauncher: 1 out of 1 tasks of job job_GobblinKafka_1457576535185 are running in container attempt_1457496256361_12490_m_000001_0
2016-03-09 18:22:39,963 INFO [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor: Pulling topic KafkaTopic
2016-03-09 18:22:39,963 INFO [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor: Pulling partition KafkaTopic:15 from offset 1803914315 to 1832644466, range=28730151
2016-03-09 18:22:40,547 WARN [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaWrapper: Fetch message buffer for partition KafkaTopic:15 has failed: java.lang.RuntimeException: error code 1. Will refresh topic metadata and retry
2016-03-09 18:22:40,547 INFO [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaWrapper: Fetching topic metadata from broker kafka_broker_name:9092
2016-03-09 18:22:40,639 WARN [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaWrapper: Fetch message buffer for partition KafkaTopic:15 has failed: java.lang.RuntimeException: error code 1. This partition will be skipped.
2016-03-09 18:22:40,639 INFO [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor: Finished pulling topic KafkaTopic
2016-03-09 18:22:40,640 INFO [TaskExecutor-0] gobblin.runtime.Task: Extracted 0 data records
```
Github Url : https://github.com/linkedin/gobblin/issues/823
Github Reporter : asrayousuf
Github Created At : 2016-03-10T05:12:39Z
Github Updated At : 2017-01-12T04:47:32Z
Comments
zliu41 wrote on 2016-03-11T01:02:11Z : Error code 1 means the offset is out of range, i.e. 1803914315 is not a valid offset for partition 15. My guess is that 1803914315 was the earliest available offset for that partition when Gobblin asked for the earliest offset, but by the time it fetched the message buffer, that offset was no longer there.
Github Url : https://github.com/linkedin/gobblin/issues/823#issuecomment-195123622
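The race described in the comment above can be illustrated with a small toy model. This is only a sketch under stated assumptions: `OffsetRaceDemo`, `ToyPartition`, `expireUpTo`, and `fetch` are hypothetical names invented for illustration and are not part of the Kafka or Gobblin APIs. The only details taken from the thread are the offsets in the log and the reading of error code 1 as "offset out of range".

```java
/** Toy model of the race; not the Kafka or Gobblin API. All names are hypothetical. */
class OffsetRaceDemo {
    static final short NO_ERROR = 0;
    static final short OFFSET_OUT_OF_RANGE = 1; // the "error code 1" seen in the log

    /** In-memory stand-in for the offset range one partition still retains. */
    static class ToyPartition {
        private long earliest;      // oldest offset still retained
        private final long latest;  // upper bound of the retained range

        ToyPartition(long earliest, long latest) {
            this.earliest = earliest;
            this.latest = latest;
        }

        long earliestOffset() {
            return earliest;
        }

        /** Simulates retention deleting every record below newEarliest. */
        void expireUpTo(long newEarliest) {
            earliest = Math.max(earliest, Math.min(newEarliest, latest));
        }

        /** Returns an error code, roughly the way a fetch response would. */
        short fetch(long offset) {
            return (offset < earliest || offset > latest) ? OFFSET_OUT_OF_RANGE : NO_ERROR;
        }
    }

    public static void main(String[] args) {
        ToyPartition partition = new ToyPartition(1803914315L, 1832644466L);
        long plannedStart = partition.earliestOffset(); // read when the pull range is computed
        partition.expireUpTo(plannedStart + 1000);      // retention runs before the fetch
        System.out.println("error code " + partition.fetch(plannedStart)); // prints "error code 1"
    }
}
```

The planned start offset was valid when it was read, but it falls below the retained range by the time the fetch is issued, which is consistent with the warning in the log.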
asrayousuf wrote on 2016-03-14T10:09:17Z : @zliu41 Thanks for pointing this out. This does seem to be the case: I ran a separate Kafka consumer to fetch the earliest and the latest offsets, and the ones returned for some of the partitions were completely different from the ones that Gobblin was fetching at the same instant. For these partitions, the earliest offset fetched by Gobblin no longer exists. Hence, the data for these partitions is not extracted, resulting in no data getting published.
Why is Gobblin fetching the wrong offset? How can I ensure that the complete data for all the partitions gets pulled?
Github Url : https://github.com/linkedin/gobblin/issues/823#issuecomment-196236739
zliu41 wrote on 2016-03-15T00:15:39Z : @asrayousuf I have no idea. If Gobblin and your separate kafka consumer fetch the earliest offset at the same time, they should get the same value.
The code for fetching earliest offsets in Gobblin is in method `getEarliestOffset` in `KafkaWrapper`, which is based on Kafka's SimpleConsumer. You may want to do some testing on that piece of code.
Github Url : https://github.com/linkedin/gobblin/issues/823#issuecomment-196580940
stakiar wrote on 2016-03-28T16:05:21Z : @asrayousuf did you ever figure out what the problem was here?
Github Url : https://github.com/linkedin/gobblin/issues/823#issuecomment-202461181
asrayousuf wrote on 2016-03-29T13:12:52Z : The problem is along the lines that @zliu41 pointed out. The refresh interval of my Kafka is 1 sec, so fetching the data for the earliest offset fails for some partitions because the offset no longer exists when the fetch happens. I haven't yet found a solution for the scenario where I have to run it from the earliest offset.
Github Url : https://github.com/linkedin/gobblin/issues/823#issuecomment-202886079
stakiar wrote on 2016-04-26T22:05:26Z : The fetching of the offsets happens in the `Source`, correct? That means there is a delay between when the offset is fetched and when the `Extractor` starts to read the data. When running Gobblin in MR mode, there may be long delays between when the `Source` runs and when the map tasks actually get launched, especially on busy clusters.
Could we simply add a feature to re-fetch the offset if it is stale and `fetch.from.earliest.offset=true`?
Github Url : https://github.com/linkedin/gobblin/issues/823#issuecomment-214902180
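The re-fetch idea proposed above could look roughly like the following. This is a minimal sketch, not Gobblin's implementation: `StaleOffsetCheck` and `resolveStartOffset` are hypothetical names, the `LongSupplier` stands in for whatever call would look up the current earliest offset at read time, and the boolean is assumed to mirror the intent of `fetch.from.earliest.offset` as named in the comment.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper sketching the proposed stale-offset check; not Gobblin code. */
class StaleOffsetCheck {

    /**
     * Decides which offset to start pulling from.
     *
     * @param plannedStart      offset chosen earlier, when the pull range was computed
     * @param currentEarliest   looks up the partition's earliest offset right before reading
     * @param fetchFromEarliest assumed to mirror the fetch.from.earliest.offset setting
     */
    static long resolveStartOffset(long plannedStart,
                                   LongSupplier currentEarliest,
                                   boolean fetchFromEarliest) {
        long earliestNow = currentEarliest.getAsLong();
        if (plannedStart >= earliestNow) {
            return plannedStart; // planned offset is still retained, nothing to do
        }
        if (fetchFromEarliest) {
            return earliestNow;  // planned offset is stale, move forward to what still exists
        }
        return plannedStart;     // leave existing error handling in charge
    }

    public static void main(String[] args) {
        // Planned offset from the log; the "current earliest" value is made up for the demo.
        long start = resolveStartOffset(1803914315L, () -> 1803920000L, true);
        System.out.println("start offset: " + start);
    }
}
```

Records between the planned offset and the current earliest offset have already been removed from the partition, so moving the start forward only recovers what is still available; it does not restore the expired range.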