diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 2ebd72a..e9cfd10 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -63,7 +63,8 @@ object ConsumerConfig extends Config {
     autoOffsetReset match {
       case OffsetRequest.SmallestTimeString =>
       case OffsetRequest.LargestTimeString =>
-      case _ => throw new InvalidConfigException("Wrong value " + autoOffsetReset + " of autoOffsetReset in ConsumerConfig")
+      case _ => throw new InvalidConfigException("Wrong value " + autoOffsetReset + " of auto.offset.reset in ConsumerConfig; " +
+                                                 "Valid values are " + OffsetRequest.SmallestTimeString + " and " + OffsetRequest.LargestTimeString)
     }
   }
 }
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b2a7170..d431a44 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -453,6 +453,11 @@ private[kafka] class Log(val dir: File,
   def logEndOffset: Long = nextOffset.get
 
   /**
+   * Get the offset of the first message in the log.
+   */
+  def logStartOffset: Long = segments.view(0).start
+
+  /**
    * Roll the log over if necessary
    */
   private def maybeRoll(segment: LogSegment): LogSegment = {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 37b71be..6abeb62 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -65,19 +65,60 @@ class ReplicaFetcherThread(name:String,
 
   // handle a partition whose offset is out of range and return a new fetch offset
   def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
-    // This means the local replica is out of date. Truncate the log and catch up from beginning.
-    val request = OffsetRequest(
+    val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
+    val log = replica.log.get
+
+    /**
+     * The leader's log could be partially overlapping with the follower's log. The only way to get an OffsetOutOfRangeException in such a
+     * situation is when the follower's end offset is ahead of the leader's end offset. This is possible if there is unclean leader election:
+     * A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up and before it has completely
+     * caught up with the leader's logs, the ISR goes down. The follower is now uncleanly elected as the new leader, and it appends messages.
+     * The old leader comes back up, becomes a follower and it may find that the current leader's end offset falls between its own
+     * start offset and its own end offset.
+     *
+     * In such a case, truncate the follower's log to the current leader's end offset and continue fetching.
+     *
+     * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
+     */
+    val leaderEndOffsetRequest = OffsetRequest(
+      replicaId = brokerConfig.brokerId,
+      requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))
+    )
+    val partitionErrorAndEndOffset = simpleConsumer.getOffsetsBefore(leaderEndOffsetRequest).partitionErrorAndOffsets(topicAndPartition)
+    val leaderEndOffset = partitionErrorAndEndOffset.error match {
+      case ErrorMapping.NoError => partitionErrorAndEndOffset.offsets.head
+      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndEndOffset.error)
+    }
+
+    if (log.logStartOffset < leaderEndOffset && log.logEndOffset > leaderEndOffset) {
+      log.truncateTo(leaderEndOffset)
+      return leaderEndOffset
+    }
+
+    /**
+     * Otherwise, the leader's log could be completely non-overlapping with the follower's log:
+     * 1. The follower could have been down for a long time and when it starts up, its end offset could be smaller than or equal to
+     * the leader's start offset because the leader has deleted old logs (log.logEndOffset <= leaderStartOffset). OR
+     * 2. Unclean leader election: A follower could be down for a long time. When it starts up, the ISR goes down before the follower
+     * has the opportunity to even start catching up with the leader's logs. The follower is now uncleanly elected as the new leader.
+     * The old leader comes back up, becomes a follower and it may find that the current leader's end offset is smaller than or
+     * equal to its own start offset (log.logStartOffset >= leaderEndOffset).
+     *
+     * In both these cases, roll out a new log at the follower with the start offset equal to the current leader's start offset
+     * and continue fetching.
+     */
+    val leaderStartOffsetRequest = OffsetRequest(
       replicaId = brokerConfig.brokerId,
       requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
     )
-    val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
-    val offset = partitionErrorAndOffset.error match {
-      case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
-      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
+    val partitionErrorAndStartOffset = simpleConsumer.getOffsetsBefore(leaderStartOffsetRequest).partitionErrorAndOffsets(topicAndPartition)
+    val leaderStartOffset = partitionErrorAndStartOffset.error match {
+      case ErrorMapping.NoError => partitionErrorAndStartOffset.offsets.head
+      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndStartOffset.error)
     }
-    val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
-    replica.log.get.truncateAndStartWithNewOffset(offset)
-    offset
+
+    log.truncateAndStartWithNewOffset(leaderStartOffset)
+    leaderStartOffset
   }
 
   // any logic for partitions whose leader has changed
