From 227e1c80b5f155e520aa1d92ca76b6ce3d31875b Mon Sep 17 00:00:00 2001 From: pnahas Date: Mon, 5 Apr 2021 19:11:43 -0400 Subject: [PATCH] Backoff between directory lock attempts --- .../processor/internals/StreamThread.java | 3 ++- .../streams/processor/internals/TaskManager.java | 16 ++++++++++++++-- .../processor/internals/StreamThreadTest.java | 3 ++- .../processor/internals/TaskManagerTest.java | 3 ++- 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 2ca41840d..1e110dd84 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -345,7 +345,8 @@ public class StreamThread extends Thread { builder, adminClient, stateDirectory, - StreamThread.processingMode(config) + StreamThread.processingMode(config), + config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG) ); log.info("Creating consumer client"); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 2001c9157..48c0234f3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskIdFormatException; @@ -90,6 +91,7 @@ public class TaskManager { // includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance private final Set lockedTaskDirectories = new HashSet<>(); private java.util.function.Consumer> resetter; + private final long lockExceptionRetryBackoffMs; TaskManager(final ChangelogReader changelogReader, final UUID processId, @@ -99,7 +101,8 @@ public class TaskManager { final InternalTopologyBuilder builder, final Admin adminClient, final StateDirectory stateDirectory, - final StreamThread.ProcessingMode processingMode) { + final StreamThread.ProcessingMode processingMode, + final long lockExceptionRetryBackoffMs) { this.changelogReader = changelogReader; this.processId = processId; this.logPrefix = logPrefix; @@ -109,6 +112,7 @@ public class TaskManager { this.adminClient = adminClient; this.stateDirectory = stateDirectory; this.processingMode = processingMode; + this.lockExceptionRetryBackoffMs = lockExceptionRetryBackoffMs; final LogContext logContext = new LogContext(logPrefix); log = logContext.logger(getClass()); @@ -419,12 +423,15 @@ public class TaskManager { */ boolean tryToCompleteRestoration() { boolean allRunning = true; - + boolean mustBackoff = false; final List activeTasks = new LinkedList<>(); for (final Task task : tasks.values()) { try { task.initializeIfNeeded(); } catch (final LockException | TimeoutException e) { + if (e instanceof LockException) { + mustBackoff = true; // Prevent infinite looping + } // it is possible that if there are multiple threads within the instance that one thread // trying to grab the task from the other, while the other has not released the lock since // it did not participate in the rebalance. In this case we can just retry in the next iteration @@ -437,6 +444,11 @@ public class TaskManager { } } + if (mustBackoff) { + // Sleep before retrying the lock to prevent effectively running while(true) { tryLock(); } + Utils.sleep(lockExceptionRetryBackoffMs); + } + if (allRunning && !activeTasks.isEmpty()) { final Set restored = changelogReader.completedChangelogs(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 7ab5b56d6..5a8f184a2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -732,7 +732,8 @@ public class StreamThreadTest { null, null, null, - null + null, + 0 ) { @Override int commit(final Collection tasksToCommit) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 444437a45..e2e82d321 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -174,7 +174,8 @@ public class TaskManagerTest { topologyBuilder, adminClient, stateDirectory, - processingMode + processingMode, + 0 ); taskManager.setMainConsumer(consumer); } -- 2.28.0.windows.1