diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala index d1ff674..fd2c482 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala @@ -53,8 +53,8 @@ object ImplicitConversions { implicit def wrapTimeWindowedKStream[K, V](inner: TimeWindowedKStreamJ[K, V]): TimeWindowedKStream[K, V] = new TimeWindowedKStream[K, V](inner) - implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] = - new KTable[K, V](inner) + // implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] = + // new KTable[K, V](inner) implicit def wrapKGroupedTable[K, V](inner: KGroupedTableJ[K, V]): KGroupedTable[K, V] = new KGroupedTable[K, V](inner) diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala index fcec778..0511a5a 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala @@ -112,7 +112,9 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { * @see `org.apache.kafka.streams.StreamsBuilder#table` */ def table[K, V](topic: String)(implicit consumed: Consumed[K, V]): KTable[K, V] = - inner.table[K, V](topic, consumed) + new KTable[K, V]( + inner.table[K, V](topic, consumed) + ) /** * Create a [[kstream.KTable]] from the specified topic. @@ -126,7 +128,9 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])( implicit consumed: Consumed[K, V] ): KTable[K, V] = - inner.table[K, V](topic, consumed, materialized) + new KTable[K, V]( + inner.table[K, V](topic, consumed, materialized) + ) /** * Create a `GlobalKTable` from the specified topic. The serializers from the implicit `Consumed` diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala index f6a22d9..158c40b 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala @@ -47,7 +47,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) { */ def count()(implicit materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = { val c: KTable[K, java.lang.Long] = - inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]]) + new KTable[K, java.lang.Long]( + inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]]) + ) c.mapValues[Long](Long2long _) } @@ -63,7 +65,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) { def reduce(reducer: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = // need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place // works perfectly with Scala 2.12 though - inner.reduce(reducer.asReducer, materialized) + new KTable[K, V]( + inner.reduce(reducer.asReducer, materialized) + ) /** * Aggregate the values of records in this stream by the grouped key. @@ -78,7 +82,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) { def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)( implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore] ): KTable[K, VR] = - inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized) + new KTable[K, VR]( + inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized) + ) /** * Create a new [[SessionWindowedKStream]] instance that can be used to perform session windowed aggregations. diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala index 76ea9ed..caa5922 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala @@ -46,7 +46,9 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) { */ def count()(implicit materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = { val c: KTable[K, java.lang.Long] = - inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]]) + new KTable[K, java.lang.Long]( + inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]]) + ) c.mapValues[Long](Long2long _) } @@ -65,7 +67,9 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) { subtractor: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = // need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place // works perfectly with Scala 2.12 though - inner.reduce(adder.asReducer, subtractor.asReducer, materialized) + new KTable[K, V]( + inner.reduce(adder.asReducer, subtractor.asReducer, materialized) + ) /** * Aggregate the value of records of the original [[KTable]] that got [[KTable#groupBy]] @@ -82,5 +86,7 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) { def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)( implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore] ): KTable[K, VR] = - inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator, materialized) + new KTable[K, VR]( + inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator, materialized) + ) } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala index b669771..8f4354f 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala @@ -46,7 +46,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#filter` */ def filter(predicate: (K, V) => Boolean): KTable[K, V] = - inner.filter(predicate(_, _)) + new KTable[K, V]( + inner.filter(predicate.asPredicate) + ) /** * Create a new [[KTable]] that consists all records of this [[KTable]] which satisfies the given @@ -59,7 +61,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#filter` */ def filter(predicate: (K, V) => Boolean, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = - inner.filter(predicate.asPredicate, materialized) + new KTable[K, V]( + inner.filter(predicate.asPredicate, materialized) + ) /** * Create a new [[KTable]] that consists all records of this [[KTable]] which do not satisfy the given @@ -70,7 +74,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#filterNot` */ def filterNot(predicate: (K, V) => Boolean): KTable[K, V] = - inner.filterNot(predicate(_, _)) + new KTable[K, V]( + inner.filterNot(predicate.asPredicate) + ) /** * Create a new [[KTable]] that consists all records of this [[KTable]] which do not satisfy the given @@ -83,7 +89,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#filterNot` */ def filterNot(predicate: (K, V) => Boolean, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = - inner.filterNot(predicate.asPredicate, materialized) + new KTable[K, V]( + inner.filterNot(predicate.asPredicate, materialized) + ) /** * Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value @@ -96,7 +104,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#mapValues` */ def mapValues[VR](mapper: V => VR): KTable[K, VR] = - inner.mapValues[VR](mapper.asValueMapper) + new KTable[K, VR]( + inner.mapValues[VR](mapper.asValueMapper) + ) /** * Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value @@ -111,7 +121,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#mapValues` */ def mapValues[VR](mapper: V => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = - inner.mapValues[VR](mapper.asValueMapper, materialized) + new KTable[K, VR]( + inner.mapValues[VR](mapper.asValueMapper, materialized) + ) /** * Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value @@ -124,7 +136,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#mapValues` */ def mapValues[VR](mapper: (K, V) => VR): KTable[K, VR] = - inner.mapValues[VR](mapper.asValueMapperWithKey) + new KTable[K, VR]( + inner.mapValues[VR](mapper.asValueMapperWithKey) + ) /** * Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value @@ -139,7 +153,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#mapValues` */ def mapValues[VR](mapper: (K, V) => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = - inner.mapValues[VR](mapper.asValueMapperWithKey) + new KTable[K, VR]( + inner.mapValues[VR](mapper.asValueMapperWithKey) + ) /** * Convert this changelog stream to a [[KStream]]. @@ -183,7 +199,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { */ def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR], stateStoreNames: String*): KTable[K, VR] = - inner.transformValues[VR](valueTransformerWithKeySupplier, stateStoreNames: _*) + new KTable[K, VR]( + inner.transformValues[VR](valueTransformerWithKeySupplier, stateStoreNames: _*) + ) /** * Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type). @@ -208,7 +226,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR], materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]], stateStoreNames: String*): KTable[K, VR] = - inner.transformValues[VR](valueTransformerWithKeySupplier, materialized, stateStoreNames: _*) + new KTable[K, VR]( + inner.transformValues[VR](valueTransformerWithKeySupplier, materialized, stateStoreNames: _*) + ) /** * Re-groups the records of this [[KTable]] using the provided key/value mapper @@ -232,7 +252,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#join` */ def join[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, VR] = - inner.join[VO, VR](other.inner, joiner.asValueJoiner) + new KTable[K, VR]( + inner.join[VO, VR](other.inner, joiner.asValueJoiner) + ) /** * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner equi join. @@ -249,7 +271,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { joiner: (V, VO) => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore] ): KTable[K, VR] = - inner.join[VO, VR](other.inner, joiner.asValueJoiner, materialized) + new KTable[K, VR]( + inner.join[VO, VR](other.inner, joiner.asValueJoiner, materialized) + ) /** * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left equi join. @@ -261,7 +285,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#leftJoin` */ def leftJoin[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, VR] = - inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner) + new KTable[K, VR]( + inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner) + ) /** * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left equi join. @@ -278,7 +304,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { joiner: (V, VO) => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore] ): KTable[K, VR] = - inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized) + new KTable[K, VR]( + inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized) + ) /** * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed outer equi join. @@ -290,7 +318,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#leftJoin` */ def outerJoin[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, VR] = - inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner) + new KTable[K, VR]( + inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner) + ) /** * Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed outer equi join. @@ -307,7 +337,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { joiner: (V, VO) => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore] ): KTable[K, VR] = - inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized) + new KTable[K, VR]( + inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized) + ) /** * Get the name of the local state store used that can be used to query this [[KTable]]. diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala index a602767..4ed1783 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala @@ -49,7 +49,10 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) { def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)( implicit materialized: Materialized[K, VR, ByteArraySessionStore] ): KTable[Windowed[K], VR] = - inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, materialized) + new KTable[Windowed[K], VR]( + inner + .aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, materialized) + ) /** * Count the number of records in this stream by the grouped key into `SessionWindows`. @@ -61,7 +64,10 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) { */ def count()(implicit materialized: Materialized[K, Long, ByteArraySessionStore]): KTable[Windowed[K], Long] = { val c: KTable[Windowed[K], java.lang.Long] = - inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArraySessionStore]]) + new KTable[Windowed[K], java.lang.Long]( + inner + .count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArraySessionStore]]) + ) c.mapValues[Long](Long2long _) } @@ -77,5 +83,7 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) { def reduce(reducer: (V, V) => V)( implicit materialized: Materialized[K, V, ByteArraySessionStore] ): KTable[Windowed[K], V] = - inner.reduce(reducer.asReducer, materialized) + new KTable[Windowed[K], V]( + inner.reduce(reducer.asReducer, materialized) + ) } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala index 9be5794..e99d418 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala @@ -48,7 +48,9 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) { def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)( implicit materialized: Materialized[K, VR, ByteArrayWindowStore] ): KTable[Windowed[K], VR] = - inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized) + new KTable[Windowed[K], VR]( + inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized) + ) /** * Count the number of records in this stream by the grouped key and the defined windows. @@ -60,7 +62,9 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) { */ def count()(implicit materialized: Materialized[K, Long, ByteArrayWindowStore]): KTable[Windowed[K], Long] = { val c: KTable[Windowed[K], java.lang.Long] = - inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayWindowStore]]) + new KTable[Windowed[K], java.lang.Long]( + inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayWindowStore]]) + ) c.mapValues[Long](Long2long _) } @@ -76,5 +80,7 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) { def reduce(reducer: (V, V) => V)( implicit materialized: Materialized[K, V, ByteArrayWindowStore] ): KTable[Windowed[K], V] = - inner.reduce(reducer.asReducer, materialized) + new KTable[Windowed[K], V]( + inner.reduce(reducer.asReducer, materialized) + ) } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala index 3d1bab5..80bf3d3 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -57,7 +57,10 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic) - val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic) + var userRegionsTable: KTable[String, String] = builder.table[String, String](userRegionsTopicJ) + userRegionsTable.filter { (_, _) => + true + } // Compute the total per region by summing the individual click counts per region. val clicksPerRegion: KTable[String, Long] =