From 09747004599347fb9dc9962c92fcb1c3858d02f9 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Thu, 19 Feb 2015 09:38:37 -0800 Subject: [PATCH 1/2] leaner DelayedItem --- core/src/main/scala/kafka/utils/DelayedItem.scala | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/kafka/utils/DelayedItem.scala b/core/src/main/scala/kafka/utils/DelayedItem.scala index a4e0dab..56dff76 100644 --- a/core/src/main/scala/kafka/utils/DelayedItem.scala +++ b/core/src/main/scala/kafka/utils/DelayedItem.scala @@ -20,33 +20,24 @@ package kafka.utils import java.util.concurrent._ import scala.math._ -class DelayedItem(delay: Long, unit: TimeUnit) extends Delayed with Logging { +class DelayedItem(delay: Long) extends Delayed with Logging { - val createdMs = SystemTime.milliseconds - val delayMs = { - val given = unit.toMillis(delay) - if (given < 0 || (createdMs + given) < 0) (Long.MaxValue - createdMs) - else given - } + private val due = SystemTime.milliseconds + delay - def this(delayMs: Long) = - this(delayMs, TimeUnit.MILLISECONDS) + def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay)) /** * The remaining delay time */ def getDelay(unit: TimeUnit): Long = { - val elapsedMs = (SystemTime.milliseconds - createdMs) - unit.convert(max(delayMs - elapsedMs, 0), TimeUnit.MILLISECONDS) + unit.convert(max(due - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS) } def compareTo(d: Delayed): Int = { - val delayed = d.asInstanceOf[DelayedItem] - val myEnd = createdMs + delayMs - val yourEnd = delayed.createdMs + delayed.delayMs + val other = d.asInstanceOf[DelayedItem] - if(myEnd < yourEnd) -1 - else if(myEnd > yourEnd) 1 + if(due < other.due) -1 + else if(due > other.due) 1 else 0 } -- 2.3.0 From 217e2e5ced372bdc1bf9fa1320c8b8e0c0d56a05 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Fri, 20 Feb 2015 15:02:36 -0800 Subject: [PATCH 2/2] renamed variables --- core/src/main/scala/kafka/utils/DelayedItem.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/utils/DelayedItem.scala b/core/src/main/scala/kafka/utils/DelayedItem.scala index 56dff76..cbab2a0 100644 --- a/core/src/main/scala/kafka/utils/DelayedItem.scala +++ b/core/src/main/scala/kafka/utils/DelayedItem.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -20,9 +20,9 @@ package kafka.utils import java.util.concurrent._ import scala.math._ -class DelayedItem(delay: Long) extends Delayed with Logging { +class DelayedItem(delayMs: Long) extends Delayed with Logging { - private val due = SystemTime.milliseconds + delay + private val dueMs = SystemTime.milliseconds + delayMs def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay)) @@ -30,15 +30,15 @@ class DelayedItem(delay: Long) extends Delayed with Logging { * The remaining delay time */ def getDelay(unit: TimeUnit): Long = { - unit.convert(max(due - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS) + unit.convert(max(dueMs - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS) } - + def compareTo(d: Delayed): Int = { val other = d.asInstanceOf[DelayedItem] - if(due < other.due) -1 - else if(due > other.due) 1 + if(dueMs < other.dueMs) -1 + else if(dueMs > other.dueMs) 1 else 0 } - + } -- 2.3.0