From 852400b2a8e6c6593086279c356dc0d7f4caba63 Mon Sep 17 00:00:00 2001 From: HeryLong Date: Tue, 17 Sep 2019 08:34:47 +0800 Subject: [PATCH 1/2] just restart current reconfigurated connectorTask, not all cached updateTasks --- .../kafka/connect/storage/KafkaConfigBackingStore.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 3572d8cf8..ce911ba87 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -56,6 +56,7 @@ import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; /** *

@@ -633,8 +634,11 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { connectorTaskCounts.put(connectorName, newTaskCount); } - if (started) + if (started) { + // just restart current reconfigurated connectorTask, not all cached updateTasks + updatedTasks = updatedTasks.stream().filter(tasksId -> tasksId.connector().equals(connectorName)).collect(Collectors.toList()); updateListener.onTaskConfigUpdate(updatedTasks); + } } else { log.error("Discarding config update record with invalid key: {}", record.key()); } -- 2.19.1.windows.1