diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index db94ac0..9d9329f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -415,6 +415,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable } boolean numPartitionsNeeded; + Map assignment; do { numPartitionsNeeded = false; @@ -439,7 +440,20 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable } else { numPartitionsCandidate = metadata.partitionCountForTopic(sourceTopicName); if (numPartitionsCandidate == null) { - repartitionTopicMetadata.get(topicName).numPartitions = NOT_AVAILABLE; + assignment = new HashMap<>(); + for (final ClientMetadata clientMetadata : clientsMetadata.values()) { + for (final String consumerId : clientMetadata.consumers) { + assignment.put(consumerId, new Assignment( + Collections.emptyList(), + new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION, + Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap(), + AssignmentInfo.UNKNOWN_PARTITION).encode() + )); + } + } + return assignment; } } @@ -582,7 +596,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable // construct the global partition assignment per host map final Map> partitionsByHostState = new HashMap<>(); - if (minReceivedMetadataVersion == 2 || minReceivedMetadataVersion == 3) { + if (minReceivedMetadataVersion >= 2) { for (final Map.Entry entry : clientsMetadata.entrySet()) { final HostInfo hostInfo = entry.getValue().hostInfo; @@ -600,7 +614,6 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable } taskManager.setPartitionsByHostState(partitionsByHostState); - final Map assignment; if (versionProbing) { assignment = versionProbingAssignment(clientsMetadata, partitionsForTask, partitionsByHostState, futureConsumers, minReceivedMetadataVersion); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java index c577830..ee5f5fe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java @@ -41,11 +41,13 @@ public class AssignmentInfo { private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class); - public static final int LATEST_SUPPORTED_VERSION = 3; + public static final int LATEST_SUPPORTED_VERSION = 4; static final int UNKNOWN = -1; + public static final int UNKNOWN_PARTITION = 1; private final int usedVersion; private final int latestSupportedVersion; + private int errCode; private List activeTasks; private Map> standbyTasks; private Map> partitionsByHost; @@ -55,6 +57,7 @@ public class AssignmentInfo { final int latestSupportedVersion) { this.usedVersion = version; this.latestSupportedVersion = latestSupportedVersion; + this.errCode = 0; } public AssignmentInfo(final List activeTasks, @@ -74,25 +77,37 @@ public class AssignmentInfo { final List activeTasks, final Map> standbyTasks, final Map> hostState) { - this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState); + this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState, 0); if (version < 1 || version > LATEST_SUPPORTED_VERSION) { throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION + "; was: " + version); } } - + public AssignmentInfo(final int version, + final List activeTasks, + final Map> standbyTasks, + final Map> hostState, + final int errCode) { + this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState, errCode); + if (version < 1 || version > LATEST_SUPPORTED_VERSION) { + throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION + + "; was: " + version); + } + } // for testing only; don't apply version checks AssignmentInfo(final int version, final int latestSupportedVersion, final List activeTasks, final Map> standbyTasks, - final Map> hostState) { + final Map> hostState, + final int errCode) { this.usedVersion = version; this.latestSupportedVersion = latestSupportedVersion; this.activeTasks = activeTasks; this.standbyTasks = standbyTasks; this.partitionsByHost = hostState; + this.errCode = errCode; } public int version() { @@ -133,6 +148,9 @@ public class AssignmentInfo { case 3: encodeVersionThree(out); break; + case 4: + encodeVersionFour(out); + break; default: throw new IllegalStateException("Unknown metadata version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); @@ -202,7 +220,10 @@ public class AssignmentInfo { encodeActiveAndStandbyTaskAssignment(out); encodePartitionsByHost(out); } - + private void encodeVersionFour(final DataOutputStream out) throws IOException { + encodeVersionThree(out); + out.writeInt(errCode); + } /** * @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown */ @@ -228,6 +249,11 @@ public class AssignmentInfo { assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion); decodeVersionThreeData(assignmentInfo, in); break; + case 4: + final int latestSupportedVer = in.readInt(); + assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVer); + decodeVersionFourData(assignmentInfo, in); + break; default: final TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode assignment data: " + "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); @@ -299,7 +325,11 @@ public class AssignmentInfo { decodeStandbyTasks(assignmentInfo, in); decodeGlobalAssignmentData(assignmentInfo, in); } - + private static void decodeVersionFourData(final AssignmentInfo assignmentInfo, + final DataInputStream in) throws IOException { + decodeVersionThreeData(assignmentInfo, in); + assignmentInfo.errCode = in.readInt(); + } @Override public int hashCode() { return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 4327e8f..3df215a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -1277,16 +1277,17 @@ public class StreamsPartitionAssignorTest { final Map assignment = partitionAssignor.assign(metadata, subscriptions); assertThat(assignment.size(), equalTo(2)); - assertThat( - AssignmentInfo.decode(assignment.get("consumer1").userData()), - equalTo(new AssignmentInfo( - new ArrayList<>(activeTasks), - standbyTaskMap, - Collections.>emptyMap() - ))); + AssignmentInfo c1Info = AssignmentInfo.decode(assignment.get("consumer1").userData()); + assertThat(c1Info.standbyTasks(), equalTo(standbyTaskMap)); + assertThat(c1Info.partitionsByHost().isEmpty(), equalTo(true)); + assertThat(c1Info.activeTasks().containsAll(activeTasks), equalTo(true)); + assertThat(activeTasks.containsAll(c1Info.activeTasks()), equalTo(true)); assertThat(assignment.get("consumer1").partitions(), equalTo(Utils.mkList(t1p0, t1p1))); - assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()), equalTo(new AssignmentInfo())); + AssignmentInfo futureInfo = AssignmentInfo.decode(assignment.get("future-consumer").userData()); + assertThat(futureInfo.activeTasks().isEmpty(), equalTo(true)); + assertThat(futureInfo.standbyTasks().isEmpty(), equalTo(true)); + assertThat(futureInfo.partitionsByHost().isEmpty(), equalTo(true)); assertThat(assignment.get("future-consumer").partitions().size(), equalTo(0)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java index c7382e7..2c884de 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java @@ -70,21 +70,21 @@ public class AssignmentInfoTest { @Test public void shouldEncodeAndDecodeVersion1() { final AssignmentInfo info = new AssignmentInfo(1, activeTasks, standbyTasks, globalAssignment); - final AssignmentInfo expectedInfo = new AssignmentInfo(1, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, Collections.>emptyMap()); + final AssignmentInfo expectedInfo = new AssignmentInfo(1, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, Collections.>emptyMap(), 0); assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } @Test public void shouldEncodeAndDecodeVersion2() { final AssignmentInfo info = new AssignmentInfo(2, activeTasks, standbyTasks, globalAssignment); - final AssignmentInfo expectedInfo = new AssignmentInfo(2, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, globalAssignment); + final AssignmentInfo expectedInfo = new AssignmentInfo(2, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, globalAssignment, 0); assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } @Test public void shouldEncodeAndDecodeVersion3() { final AssignmentInfo info = new AssignmentInfo(3, activeTasks, standbyTasks, globalAssignment); - final AssignmentInfo expectedInfo = new AssignmentInfo(3, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment); + final AssignmentInfo expectedInfo = new AssignmentInfo(3, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 0); assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); }