As a Spark user, I'd like fast queries on a Kafka table filtered by timestamp.
I'd like quick answers to questions like:
- What was inserted into Kafka in the past x minutes?
- What was inserted into Kafka in a specified time range?
For example:
select * from kafka_table where timestamp > from_unixtime(unix_timestamp() - 5 * 60, "yyyy-MM-dd HH:mm:ss")
select * from kafka_table where timestamp > $from_time and timestamp < $end_time
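Currently, timestamp restrictions are not pushed down to KafkaRelation, so Spark reads every record between the configured starting and ending offsets (by default, the whole topic) and only then applies the filter. As a result, querying a large Kafka topic by timestamp takes forever to complete.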
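Technically, it's possible to look up Kafka offsets by timestamp with the org.apache.kafka.clients.consumer.Consumer#offsetsForTimes(..) method, which returns, for each partition, the earliest offset whose timestamp is greater than or equal to the given timestamp. We can then query the Kafka topic by the resulting offset ranges.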
Querying by offset range is already implemented, so this change should have a minor impact.
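The timestamp-to-offset translation described above can be sketched in Java. This is a minimal, self-contained illustration, not Spark's or Kafka's implementation: the class TimestampToOffsetRange and its helpers are hypothetical, and an in-memory list of record timestamps per partition (list index = offset) stands in for a real broker. offsetForTime mimics the documented contract of offsetsForTimes (earliest offset with timestamp >= target, or null if there is none), and the partition size plays the role of the latest offset that Consumer#endOffsets(..) would return. The sketch assumes timestamps never decrease within a partition (for example, with LogAppendTime); with out-of-order producer timestamps the offset range is only an approximation, so the original timestamp filter would still need to be applied to the records that are read.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TimestampToOffsetRange {

    /** Half-open offset range [fromOffset, untilOffset) to read from one partition. */
    public static final class OffsetRange {
        public final long fromOffset;
        public final long untilOffset;

        OffsetRange(long fromOffset, long untilOffset) {
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        @Override
        public String toString() {
            return "[" + fromOffset + ", " + untilOffset + ")";
        }
    }

    /**
     * Hypothetical in-memory stand-in for offsetsForTimes on one partition:
     * returns the earliest offset whose timestamp is >= target, or null if none.
     * A partition is modeled as a list of record timestamps; list index = offset.
     */
    static Long offsetForTime(List<Long> timestamps, long target) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= target) {
                return (long) offset;
            }
        }
        return null;
    }

    /**
     * Offsets covering "timestamp > afterTs AND timestamp < beforeTs" (millisecond
     * timestamps). Assumes timestamps never decrease within the partition.
     */
    static OffsetRange toOffsetRange(List<Long> timestamps, long afterTs, long beforeTs) {
        long latest = timestamps.size();                     // next offset to be written, like endOffsets()
        Long start = offsetForTime(timestamps, afterTs + 1); // strict ">" on millisecond timestamps
        Long end = offsetForTime(timestamps, beforeTs);      // first record that fails "<"
        long from = (start == null) ? latest : start;        // no match: empty range at the end
        long until = (end == null) ? latest : end;           // no match: read up to the latest offset
        return new OffsetRange(from, Math.max(from, until)); // never a negative-length range
    }

    /** Applies toOffsetRange to every partition (partition id -> timestamps), ordered by id. */
    static Map<Integer, OffsetRange> toOffsetRanges(
            Map<Integer, List<Long>> topic, long afterTs, long beforeTs) {
        Map<Integer, OffsetRange> ranges = new TreeMap<>();
        for (Map.Entry<Integer, List<Long>> e : topic.entrySet()) {
            ranges.put(e.getKey(), toOffsetRange(e.getValue(), afterTs, beforeTs));
        }
        return ranges;
    }

    public static void main(String[] args) {
        Map<Integer, List<Long>> topic = Map.of(
                0, List.of(100L, 200L, 300L, 400L),
                1, List.of(150L, 350L));
        // Equivalent of: where timestamp > 150 and timestamp < 350
        toOffsetRanges(topic, 150L, 350L)
                .forEach((p, r) -> System.out.println("partition " + p + ": " + r));
        // prints:
        // partition 0: [1, 3)
        // partition 1: [1, 1)
    }
}
```

Against a real broker, each bound would presumably become one consumer.offsetsForTimes(..) call covering all partitions of the topic, whose result map holds a null value for any partition without a matching record; the per-partition ranges are then the kind of input the existing offset-range query path already handles.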