diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index a26822a..24085e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -57,7 +57,7 @@ public class StreamsBuilder { final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder; private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder); - + /** * Create a {@link KStream} from the specified topics. * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value @@ -974,6 +974,7 @@ public class StreamsBuilder { * @param streams the {@link KStream}s to be merged * @return a {@link KStream} containing all records of the given streams */ + @Deprecated public synchronized KStream merge(final KStream... streams) { return internalStreamsBuilder.merge(streams); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 3a51fad..e33920a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -448,6 +448,11 @@ public interface KStream { void print(final Printed printed); /** + * Merges the given streams into one larger stream. + * @param StreamsBuilder instance and streams wanted to be merged. + */ + KStream merge(StreamsBuilder builder, KStream... streams); + /** * Write the records of this stream to a file at the given path. * This function will use the generated name of the parent processor node to label the key/value pairs printed to * the file. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 7201a00..b0c8e21 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.JoinWindows; @@ -153,7 +154,7 @@ public class KStreamImpl extends AbstractStream implements KStream>() { @Override public KeyValue apply(K key, V value) { - return new KeyValue<>(mapper.apply(key, value), value); + return (KeyValue) new KeyValue<>(mapper.apply(key, value), value); } } ), @@ -182,7 +183,9 @@ public class KStreamImpl extends AbstractStream implements KStream(builder, name, sourceNodes, this.repartitionRequired); } - + + + @Override public void print() { print(defaultKeyValueMapper, null, null, this.name); @@ -344,7 +347,12 @@ public class KStreamImpl extends AbstractStream implements KStream KStream merge(StreamsBuilder builder, KStream ... streams) { + return builder.merge(streams); + } + public static KStream merge(final InternalStreamsBuilder builder, final KStream[] streams) { if (streams == null || streams.length == 0) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 5e8687f..a2e5ca3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -467,5 +467,4 @@ public class KStreamImplTest { public void shouldThrowNullPointerOnOuterJoinJoinedIsNull() { testStream.outerJoin(testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10), null); } - }