While migrating from Spark 1.6 to 2.0, I observed an OffsetOutOfRangeException reported by the Kafka client. In our scenario we create a single DStream as a union of multiple DStreams, one DStream per Kafka cluster (a multi-DC setup). Both Kafka clusters have the same topics and the same number of partitions.
After a quick investigation, I found that the DirectKafkaInputDStream class keeps offset state per topic and partition, but it is not aware of different Kafka clusters.
For every topic, a single DStream is created as a union of the DStreams from all configured Kafka clusters.
As a result, offsets from one Kafka cluster overwrite offsets from the other. Fortunately, the OffsetOutOfRangeException was thrown because the offsets in the two Kafka clusters differ significantly; had they been close, the overwrite could have gone unnoticed.
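To illustrate the suspected mechanism, here is a minimal toy model in plain Python. It is not Spark code and does not reproduce the internals of DirectKafkaInputDStream; the cluster names (`dc1`, `dc2`), the topic name, the offsets, and the helper `record_offsets` are all hypothetical. It only shows what happens when offset state is keyed by (topic, partition) without a cluster identifier.

```python
from typing import Dict, Tuple

# Hypothetical key type: (topic, partition), with no cluster identifier.
TopicPartition = Tuple[str, int]


def record_offsets(state: Dict[TopicPartition, int],
                   cluster_offsets: Dict[TopicPartition, int]) -> None:
    """Store the latest offsets; an existing (topic, partition) key is overwritten."""
    state.update(cluster_offsets)


# Hypothetical offsets: same topic and partition, very different positions.
dc1_latest = {("events", 0): 1_000_000}
dc2_latest = {("events", 0): 50}

# Hypothetical range of offsets still retained by dc1 (earliest, latest).
dc1_range = (900_000, 1_000_000)

# State shared across clusters: the second write replaces the first.
shared_state: Dict[TopicPartition, int] = {}
record_offsets(shared_state, dc1_latest)
record_offsets(shared_state, dc2_latest)

# The offset that would be used to resume reading from dc1 now belongs to dc2.
resume_offset = shared_state[("events", 0)]
in_range = dc1_range[0] <= resume_offset <= dc1_range[1]

# Cluster-aware alternative: include the cluster in the key so entries coexist.
keyed_state: Dict[Tuple[str, str, int], int] = {}
for cluster, offsets in (("dc1", dc1_latest), ("dc2", dc2_latest)):
    for (topic, partition), offset in offsets.items():
        keyed_state[(cluster, topic, partition)] = offset
```

In this sketch `in_range` ends up false, which corresponds to the case where the Kafka client would report an out-of-range offset. With the cluster-aware key, both offsets are kept separately.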