diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 2a096e1..18d6b7e 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -111,15 +111,21 @@ abstract class DelayedOperation(override val delayMs: Long,
    * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
    * the operation is actually completed.
+   *
+   * @return a pair `(lockFailed, done)`: `lockFailed` is true if this call never acquired the lock;
+   *         `done` is the result of the last `tryComplete` made by this call (false if none was made)
    */
-  private[server] def maybeTryComplete(): Boolean = {
+  private[server] def maybeTryComplete(): (Boolean, Boolean) = {
     var retry = false
     var done = false
+    // stays true unless this thread acquires the lock in at least one iteration below
+    var lockFailed = true
     do {
       if (lock.tryLock()) {
         try {
           tryCompletePending.set(false)
           done = tryComplete()
         } finally {
+          lockFailed = false
           lock.unlock()
         }
         // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
@@ -133,7 +139,7 @@ abstract class DelayedOperation(override val delayMs: Long,
         retry = !tryCompletePending.getAndSet(true)
       }
     } while (!isCompleted && retry)
-    done
+    (lockFailed, done)
   }
 
   /*
@@ -180,8 +186,14 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
   /* background thread expiring operations that have timed out */
   private val expirationReaper = new ExpiredOperationReaper()
 
+  /* background thread retrying operations whose lock was held by another thread when they were checked */
+  private val retryExecutor = new RetryOperationExecutor()
+
   private val metricsTags = Map("delayedOperation" -> purgatoryName)
 
+  /* operations waiting for the retry thread, filled by the watchers when the lock cannot be acquired */
+  private val retryQueue = new ConcurrentLinkedQueue[T]()
+
   newGauge(
     "PurgatorySize",
     new Gauge[Int] {
@@ -201,6 +213,8 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
   if (reaperEnabled)
     expirationReaper.start()
 
+  retryExecutor.start()
+
   /**
    * Check if the operation can be completed, if not watch it based on the given watch keys
    *
@@ -229,7 +243,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
 
     // At this point the only thread that can attempt this operation is this current thread
     // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
+    val isCompletedByMe = operation.tryComplete()
     if (isCompletedByMe)
       return true
 
@@ -246,9 +260,10 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
       }
     }
 
-    isCompletedByMe = operation.maybeTryComplete()
-    if (isCompletedByMe)
-      return true
+    operation.maybeTryComplete() match {
+      case (_, true) => return true
+      case _ =>
+    }
 
     // if it cannot be completed by now and hence is watched, add to the expire queue also
     if (!operation.isCompleted) {
@@ -339,6 +354,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
   def shutdown() {
     if (reaperEnabled)
       expirationReaper.shutdown()
+    retryExecutor.shutdown()
     timeoutTimer.shutdown()
   }
 
@@ -368,9 +384,15 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
         if (curr.isCompleted) {
           // another thread has completed this operation, just remove it
           iter.remove()
-        } else if (curr.maybeTryComplete()) {
-          iter.remove()
-          completed += 1
+        } else {
+          val (lockFailed, completedByMe) = curr.maybeTryComplete()
+          if (completedByMe) {
+            iter.remove()
+            completed += 1
+          } else if (lockFailed) {
+            // the check never ran because the lock was unavailable: hand it over to the retry thread
+            retryQueue.add(curr)
+          }
         }
       }
 
@@ -440,4 +462,52 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
       advanceClock(200L)
     }
   }
+
+  /**
+   * A background thread that re-attempts the operations placed on `retryQueue`, i.e. those whose
+   * completion check was skipped by a watcher because the operation's lock could not be acquired
+   */
+  private class RetryOperationExecutor extends ShutdownableThread(
+    "RetryExecutor-%d-%s".format(brokerId, purgatoryName),
+    isInterruptible = false) {
+
+    private val retryIntervalMs = 100L
+
+    override def doWork(): Unit = {
+      if (!retryQueue.isEmpty)
+        retryPendingOperations()
+      Thread.sleep(retryIntervalMs)
+    }
+
+    // drain the retry queue once, re-queueing only the operations whose lock is still contended
+    private def retryPendingOperations(): Unit = {
+      var removed = 0
+      var completed = 0
+      var total = 0
+      // only this thread touches the staging list, so it does not need to be a concurrent collection
+      val nextRetry = new java.util.ArrayList[T]()
+
+      debug("Start Retrying Operations")
+      var curr = retryQueue.poll()
+      while (curr != null) {
+        total += 1
+        if (curr.isCompleted) {
+          // another thread has completed this operation, just remove it
+          removed += 1
+        } else {
+          curr.maybeTryComplete() match {
+            case (_, true) => completed += 1
+            // the lock was unavailable again, so the check was skipped: retry on the next pass
+            case (true, false) => nextRetry.add(curr)
+            // the check ran but the operation cannot complete yet; the watchers never removed it
+            // from their lists when queueing it here, so there is no need to keep polling it
+            case (false, false) =>
+          }
+        }
+        curr = retryQueue.poll()
+      }
+      debug("Checked %d, completed %d, removed %d".format(total, completed, removed))
+      retryQueue.addAll(nextRetry)
+    }
+  }
 }