diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8cd489e..f6f18fe 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -60,7 +60,7 @@ class Log(val dir: File,
   private val lastflushedTime = new AtomicLong(time.milliseconds)
 
   /* the actual segments of the log */
-  private val segments: ConcurrentNavigableMap[Long,LogSegment] = new ConcurrentSkipListMap[Long, LogSegment]
+  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
   loadSegments()
 
   /* The number of times the log has been truncated */
@@ -170,7 +170,7 @@ class Log(val dir: File,
       this.recoveryPoint = lastOffset
       return
     }
-    val unflushed = logSegments(segments.floorKey(this.recoveryPoint), Long.MaxValue).iterator
+    val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
     while(unflushed.hasNext) {
       val curr = unflushed.next
       info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
@@ -586,8 +586,8 @@ class Log(val dir: File,
    * All the log segments in this log ordered from oldest to newest
    */
   def logSegments: Iterable[LogSegment] = {
-    import JavaConversions._
-    segments.values
+    import JavaConversions.asIterable
+    asIterable(segments.values)
   }
 
   /**
@@ -595,13 +595,13 @@ class Log(val dir: File,
    * Get all segments beginning with the segment that includes "from" and ending with the segment
    * that includes up to "to-1" or the end of the log (if to > logEndOffset)
    */
   def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
-    import JavaConversions._
+    import JavaConversions.asIterable
     lock synchronized {
-      val floor: java.lang.Long = segments.floorKey(from)
+      val floor = segments.floorKey(from)
       if(floor eq null)
         asIterable(segments.headMap(to).values)
       else
-        asIterable(segments.subMap(floor.longValue, true, to, false).values)
+        asIterable(segments.subMap(floor, true, to, false).values)
     }
   }