diff --git a/build.gradle b/build.gradle index 696e1c431..fda65f91f 100644 --- a/build.gradle +++ b/build.gradle @@ -527,7 +527,6 @@ subprojects { description = 'Run checkstyle on all test Java sources' } - test.dependsOn('checkstyleMain', 'checkstyleTest') spotbugs { toolVersion = versions.spotbugs diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java index 90d588288..433a18ddc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer; import java.nio.ByteBuffer; import java.util.Objects; -public class ChangedDeserializer implements Deserializer>, WrappingNullableDeserializer, T> { +public class ChangedDeserializer implements Deserializer>, WrappingNullableDeserializer, Void, T> { private static final int NEWFLAG_SIZE = 1; @@ -37,9 +37,9 @@ public class ChangedDeserializer implements Deserializer>, Wrapping } @Override - public void setIfUnset(final Deserializer defaultDeserializer) { + public void setIfUnset(final Deserializer defaultKeyDeserializer, final Deserializer defaultValueDeserializer) { if (inner == null) { - inner = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null"); + inner = Objects.requireNonNull(defaultValueDeserializer); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java index 551d94839..f5d63cdaf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.errors.StreamsException; import java.nio.ByteBuffer; import java.util.Objects; -public class ChangedSerializer implements Serializer>, WrappingNullableSerializer, T> { +public class ChangedSerializer implements Serializer>, WrappingNullableSerializer, Void, T> { private static final int NEWFLAG_SIZE = 1; @@ -38,9 +38,9 @@ public class ChangedSerializer implements Serializer>, WrappingNull } @Override - public void setIfUnset(final Serializer defaultSerializer) { + public void setIfUnset(final Serializer defaultKeySerializer, final Serializer defaultValueSerializer) { if (inner == null) { - inner = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null"); + inner = Objects.requireNonNull(defaultValueSerializer); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java index a57e9a153..d0c0b14c2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java @@ -18,6 +18,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Deserializer; -public interface WrappingNullableDeserializer extends Deserializer { - void setIfUnset(final Deserializer defaultDeserializer); -} +public interface WrappingNullableDeserializer extends Deserializer { + void setIfUnset(final Deserializer defaultKeyDeserializer, final Deserializer defaultValueDeserializer); +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java index 2d28e52db..8854a8d90 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java @@ -18,6 +18,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serializer; -public interface WrappingNullableSerializer extends Serializer { - void setIfUnset(final Serializer defaultSerializer); +public interface WrappingNullableSerializer extends Serializer { + void setIfUnset(final Serializer defaultKeySerializer, final Serializer defaultValueSerializer); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java index 31317c500..86191115f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java @@ -46,7 +46,7 @@ public class SubscriptionResponseWrapperSerde implements Serde - implements Serializer>, WrappingNullableSerializer, V> { + implements Serializer>, WrappingNullableSerializer, Void, V> { private Serializer serializer; @@ -55,9 +55,9 @@ public class SubscriptionResponseWrapperSerde implements Serde defaultSerializer) { + public void setIfUnset(final Serializer defaultKeySerializer, final Serializer defaultValueSerializer) { if (serializer == null) { - serializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null"); + serializer = Objects.requireNonNull(defaultValueSerializer); } } @@ -94,7 +94,7 @@ public class SubscriptionResponseWrapperSerde implements Serde - implements Deserializer>, WrappingNullableDeserializer, V> { + implements Deserializer>, WrappingNullableDeserializer, Void, V> { private Deserializer deserializer; @@ -103,9 +103,9 @@ public class SubscriptionResponseWrapperSerde implements Serde defaultDeserializer) { + public void setIfUnset(final Deserializer defaultKeyDeserializer, final Deserializer defaultValueDeserializer) { if (deserializer == null) { - deserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null"); + deserializer = Objects.requireNonNull(defaultValueDeserializer); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java index 136128c53..d2cc989b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java @@ -50,7 +50,7 @@ public class SubscriptionWrapperSerde implements Serde } private static class SubscriptionWrapperSerializer - implements Serializer>, WrappingNullableSerializer, K> { + implements Serializer>, WrappingNullableSerializer, K, Void> { private final Supplier primaryKeySerializationPseudoTopicSupplier; private String primaryKeySerializationPseudoTopic = null; @@ -63,9 +63,9 @@ public class SubscriptionWrapperSerde implements Serde } @Override - public void setIfUnset(final Serializer defaultSerializer) { + public void setIfUnset(final Serializer defaultKeySerializer, final Serializer defaultValueSerializer) { if (primaryKeySerializer == null) { - primaryKeySerializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be null"); + primaryKeySerializer = Objects.requireNonNull(defaultKeySerializer); } } @@ -110,7 +110,7 @@ public class SubscriptionWrapperSerde implements Serde } private static class SubscriptionWrapperDeserializer - implements Deserializer>, WrappingNullableDeserializer, K> { + implements Deserializer>, WrappingNullableDeserializer, K, Void> { private final Supplier primaryKeySerializationPseudoTopicSupplier; private String primaryKeySerializationPseudoTopic = null; @@ -123,9 +123,9 @@ public class SubscriptionWrapperSerde implements Serde } @Override - public void setIfUnset(final Deserializer defaultDeserializer) { + public void setIfUnset(final Deserializer defaultKeyDeserializer, final Deserializer defaultValueDeserializer) { if (primaryKeyDeserializer == null) { - primaryKeyDeserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot be null"); + primaryKeyDeserializer = Objects.requireNonNull(defaultKeyDeserializer); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index e0f2510de..9b0a254b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -66,10 +66,13 @@ public class SinkNode extends ProcessorNode { valSerializer = (Serializer) context.valueSerde().serializer(); } - // if value serializers are internal wrapping serializers that may need to be given the default serializer + // if serializers are internal wrapping serializers that may need to be given the default serializer // then pass it the default one from the context if (valSerializer instanceof WrappingNullableSerializer) { - ((WrappingNullableSerializer) valSerializer).setIfUnset(context.valueSerde().serializer()); + ((WrappingNullableSerializer) valSerializer).setIfUnset( + context.keySerde().serializer(), + context.valueSerde().serializer() + ); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 717495ec6..8da907442 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer; +import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics; @@ -87,10 +88,13 @@ public class SourceNode extends ProcessorNode { this.valDeserializer = (Deserializer) context.valueSerde().deserializer(); } - // if value deserializers are internal wrapping deserializers that may need to be given the default + // if deserializers are internal wrapping deserializers that may need to be given the default // then pass it the default one from the context if (valDeserializer instanceof WrappingNullableDeserializer) { - ((WrappingNullableDeserializer) valDeserializer).setIfUnset(context.valueSerde().deserializer()); + ((WrappingNullableDeserializer) valDeserializer).setIfUnset( + context.keySerde().deserializer(), + context.valueSerde().deserializer() + ); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest10049.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest10049.java new file mode 100644 index 000000000..14e99a141 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest10049.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.TestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.function.Function; + +import static java.util.Collections.emptyMap; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.MatcherAssert.assertThat; + + +@RunWith(Parameterized.class) +public class KTableKTableForeignKeyJoinMaterializationIntegrationTest10049 { + + private static final String LEFT_TABLE = "left_table"; + private static final String RIGHT_TABLE = "right_table"; + private static final String OUTPUT = "output-topic"; + private final Properties streamsConfig; + private final boolean materialized; + private final boolean queriable; + + + /** + * A serde for any class that implements {@link JSONSerdeCompatible}. Note that the classes also need to + * be registered in the {@code @JsonSubTypes} annotation on {@link JSONSerdeCompatible}. + * + * @param The concrete type of the class that gets de/serialized + */ + public static class JSONSerde implements Serializer, Deserializer, Serde { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public void configure(final Map configs, final boolean isKey) {} + + @SuppressWarnings("unchecked") + @Override + public T deserialize(final String topic, final byte[] data) { + if (data == null) { + return null; + } + + try { + return (T) OBJECT_MAPPER.readValue(data, JSONSerdeCompatible.class); + } catch (final IOException e) { + throw new SerializationException(e); + } + } + + @Override + public byte[] serialize(final String topic, final T data) { + if (data == null) { + return null; + } + + try { + return OBJECT_MAPPER.writeValueAsBytes(data); + } catch (final Exception e) { + throw new SerializationException("Error serializing JSON message", e); + } + } + + @Override + public void close() {} + + @Override + public Serializer serializer() { + return this; + } + + @Override + public Deserializer deserializer() { + return this; + } + } + + /** + * An interface for registering types that can be de/serialized with {@link JSONSerde}. + */ + @SuppressWarnings("DefaultAnnotationParam") // being explicit for the example + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "_t") + @JsonSubTypes({ + @JsonSubTypes.Type(value = User.class, name = "user"), + @JsonSubTypes.Type(value = Foo.class, name = "foo"), + @JsonSubTypes.Type(value = FooUser.class, name = "foouser") + }) + public interface JSONSerdeCompatible { + + } + + // POJO classes + static public class User implements JSONSerdeCompatible { + public String userName; + public String foo; + + public User (String userName, String foo) { + this.userName = userName; + this.foo = foo; + } + + public User() { + super(); + } + } + + static public class Foo implements JSONSerdeCompatible { + public String foo; + public int bar; + + public Foo (String foo, int bar) { + this.foo = foo; + this.bar = bar; + } + + public Foo() { + super(); + } + } + + static public class FooUser implements JSONSerdeCompatible { + public String userName; + public String foo; + public int bar; + + public FooUser (String userName, String foo, int bar) { + this.userName = userName; + this.foo = foo; + this.bar = bar; + } + public FooUser() { + super(); + } + } + + public KTableKTableForeignKeyJoinMaterializationIntegrationTest10049(final boolean materialized, final boolean queriable) { + this.materialized = materialized; + this.queriable = queriable; + streamsConfig = new Properties(); + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application-2"); + streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:9092"); + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class); + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + } + + @Parameterized.Parameters(name = "materialized={0}, queriable={1}") + public static Collection data() { + return Arrays.asList( + new Object[] {false, false}, + new Object[] {true, false}, + new Object[] {true, true} + ); + } + + @Test + public void testHappyPathJoin() { + final Topology topology = getTopology(streamsConfig, "store"); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { + final TestInputTopic userTopic = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new JSONSerde().serializer()); + final TestInputTopic fooTopic = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new JSONSerde().serializer()); + + final TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new JSONSerde().deserializer()); + final KeyValueStore store = driver.getKeyValueStore("store"); + + userTopic.pipeInput("userA", new User("userA", "fooA")); + + assertThat( + outputTopic.readKeyValuesToMap(), + is(emptyMap()) + ); + if (materialized && queriable) { + assertThat( + asMap(store), + is(emptyMap()) + ); + } + + fooTopic.pipeInput("fooA", new Foo("fooA", 11)); + + Map results = outputTopic.readKeyValuesToMap(); + + System.out.println("results = " + results); + System.out.println("results.get(userA).userName = " + results.get("userA").userName); + assertThat("userA userName", results.get("userA").userName.equals("userA")); + assertThat("userA foo", results.get("userA").foo.equals("fooA")); + assertThat("bar must be equal to 11", results.get("userA").bar == 11); + + } + } + + private static Map asMap(final KeyValueStore store) { + final HashMap result = new HashMap<>(); + store.all().forEachRemaining(kv -> result.put(kv.key, kv.value)); + return result; + } + + private Topology getTopology(final Properties streamsConfig, + final String queryableStoreName) { + final StreamsBuilder builder = new StreamsBuilder(); + + final KTable userKTable = builder.table(LEFT_TABLE);//, Consumed.with(Serdes.String(), new JSONSerde<>())); + final KTable fooKTable = builder.table(RIGHT_TABLE);//, Consumed.with(Serdes.String(), new JSONSerde<>())); + + final Function extractor = value -> value.foo; + final ValueJoiner joiner = (user, foo) -> { + return new FooUser(user.userName, foo.foo, foo.bar); + }; + + final Materialized> materialized; + if (queriable) { + materialized = Materialized.>as(queryableStoreName).withValueSerde(new JSONSerde()); + } else { + materialized = Materialized.with(null, new JSONSerde()); + } + + final KTable joinResult; + if (this.materialized) { + joinResult = userKTable.join( + fooKTable, + extractor, + joiner, + materialized + ); + } else { + joinResult = userKTable.join( + fooKTable, + extractor, + joiner + ); + } + + joinResult + .toStream() + .to(OUTPUT, Produced.with(Serdes.String(), new JSONSerde())); + + return builder.build(streamsConfig); + } +}