diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
index d1ff674..fd2c482 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
@@ -53,8 +53,6 @@ object ImplicitConversions {
implicit def wrapTimeWindowedKStream[K, V](inner: TimeWindowedKStreamJ[K, V]): TimeWindowedKStream[K, V] =
new TimeWindowedKStream[K, V](inner)
- implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =
- new KTable[K, V](inner)
implicit def wrapKGroupedTable[K, V](inner: KGroupedTableJ[K, V]): KGroupedTable[K, V] =
new KGroupedTable[K, V](inner)
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index fcec778..0511a5a 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -112,7 +112,9 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
* @see `org.apache.kafka.streams.StreamsBuilder#table`
*/
def table[K, V](topic: String)(implicit consumed: Consumed[K, V]): KTable[K, V] =
- inner.table[K, V](topic, consumed)
+ new KTable[K, V](
+ inner.table[K, V](topic, consumed)
+ )
/**
* Create a [[kstream.KTable]] from the specified topic.
@@ -126,7 +128,9 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
implicit consumed: Consumed[K, V]
): KTable[K, V] =
- inner.table[K, V](topic, consumed, materialized)
+ new KTable[K, V](
+ inner.table[K, V](topic, consumed, materialized)
+ )
/**
* Create a `GlobalKTable` from the specified topic. The serializers from the implicit `Consumed`
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index f6a22d9..158c40b 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -47,7 +47,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
*/
def count()(implicit materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = {
val c: KTable[K, java.lang.Long] =
- inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
+ new KTable[K, java.lang.Long](
+ inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
+ )
c.mapValues[Long](Long2long _)
}
@@ -63,7 +65,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
def reduce(reducer: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
// works perfectly with Scala 2.12 though
- inner.reduce(reducer.asReducer, materialized)
+ new KTable[K, V](
+ inner.reduce(reducer.asReducer, materialized)
+ )
/**
* Aggregate the values of records in this stream by the grouped key.
@@ -78,7 +82,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(
implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
): KTable[K, VR] =
- inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized)
+ new KTable[K, VR](
+ inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized)
+ )
/**
* Create a new [[SessionWindowedKStream]] instance that can be used to perform session windowed aggregations.
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
index 76ea9ed..caa5922 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -46,7 +46,9 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
*/
def count()(implicit materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = {
val c: KTable[K, java.lang.Long] =
- inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
+ new KTable[K, java.lang.Long](
+ inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
+ )
c.mapValues[Long](Long2long _)
}
@@ -65,7 +67,9 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
subtractor: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
// works perfectly with Scala 2.12 though
- inner.reduce(adder.asReducer, subtractor.asReducer, materialized)
+ new KTable[K, V](
+ inner.reduce(adder.asReducer, subtractor.asReducer, materialized)
+ )
/**
* Aggregate the value of records of the original [[KTable]] that got [[KTable#groupBy]]
@@ -82,5 +86,7 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(
implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
): KTable[K, VR] =
- inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator, materialized)
+ new KTable[K, VR](
+ inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator, materialized)
+ )
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index b669771..8f4354f 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -46,7 +46,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#filter`
*/
def filter(predicate: (K, V) => Boolean): KTable[K, V] =
- inner.filter(predicate(_, _))
+ new KTable[K, V](
+ inner.filter(predicate.asPredicate)
+ )
/**
* Create a new [[KTable]] that consists all records of this [[KTable]] which satisfies the given
@@ -59,7 +61,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#filter`
*/
def filter(predicate: (K, V) => Boolean, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
- inner.filter(predicate.asPredicate, materialized)
+ new KTable[K, V](
+ inner.filter(predicate.asPredicate, materialized)
+ )
/**
* Create a new [[KTable]] that consists all records of this [[KTable]] which do not satisfy the given
@@ -70,7 +74,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#filterNot`
*/
def filterNot(predicate: (K, V) => Boolean): KTable[K, V] =
- inner.filterNot(predicate(_, _))
+ new KTable[K, V](
+ inner.filterNot(predicate.asPredicate)
+ )
/**
* Create a new [[KTable]] that consists all records of this [[KTable]] which do not satisfy the given
@@ -83,7 +89,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#filterNot`
*/
def filterNot(predicate: (K, V) => Boolean, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
- inner.filterNot(predicate.asPredicate, materialized)
+ new KTable[K, V](
+ inner.filterNot(predicate.asPredicate, materialized)
+ )
/**
* Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
@@ -96,7 +104,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
*/
def mapValues[VR](mapper: V => VR): KTable[K, VR] =
- inner.mapValues[VR](mapper.asValueMapper)
+ new KTable[K, VR](
+ inner.mapValues[VR](mapper.asValueMapper)
+ )
/**
* Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
@@ -111,7 +121,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
*/
def mapValues[VR](mapper: V => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
- inner.mapValues[VR](mapper.asValueMapper, materialized)
+ new KTable[K, VR](
+ inner.mapValues[VR](mapper.asValueMapper, materialized)
+ )
/**
* Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
@@ -124,7 +136,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
*/
def mapValues[VR](mapper: (K, V) => VR): KTable[K, VR] =
- inner.mapValues[VR](mapper.asValueMapperWithKey)
+ new KTable[K, VR](
+ inner.mapValues[VR](mapper.asValueMapperWithKey)
+ )
/**
* Create a new [[KTable]] by transforming the value of each record in this [[KTable]] into a new value
@@ -139,7 +153,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
*/
def mapValues[VR](mapper: (K, V) => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
- inner.mapValues[VR](mapper.asValueMapperWithKey)
+ new KTable[K, VR](
+ inner.mapValues[VR](mapper.asValueMapperWithKey, materialized)
+ )
/**
* Convert this changelog stream to a [[KStream]].
@@ -183,7 +199,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
*/
def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
stateStoreNames: String*): KTable[K, VR] =
- inner.transformValues[VR](valueTransformerWithKeySupplier, stateStoreNames: _*)
+ new KTable[K, VR](
+ inner.transformValues[VR](valueTransformerWithKeySupplier, stateStoreNames: _*)
+ )
/**
* Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type).
@@ -208,7 +226,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
stateStoreNames: String*): KTable[K, VR] =
- inner.transformValues[VR](valueTransformerWithKeySupplier, materialized, stateStoreNames: _*)
+ new KTable[K, VR](
+ inner.transformValues[VR](valueTransformerWithKeySupplier, materialized, stateStoreNames: _*)
+ )
/**
* Re-groups the records of this [[KTable]] using the provided key/value mapper
@@ -232,7 +252,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#join`
*/
def join[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, VR] =
- inner.join[VO, VR](other.inner, joiner.asValueJoiner)
+ new KTable[K, VR](
+ inner.join[VO, VR](other.inner, joiner.asValueJoiner)
+ )
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed inner equi join.
@@ -249,7 +271,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
joiner: (V, VO) => VR,
materialized: Materialized[K, VR, ByteArrayKeyValueStore]
): KTable[K, VR] =
- inner.join[VO, VR](other.inner, joiner.asValueJoiner, materialized)
+ new KTable[K, VR](
+ inner.join[VO, VR](other.inner, joiner.asValueJoiner, materialized)
+ )
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left equi join.
@@ -261,7 +285,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
*/
def leftJoin[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, VR] =
- inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner)
+ new KTable[K, VR](
+ inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner)
+ )
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed left equi join.
@@ -278,7 +304,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
joiner: (V, VO) => VR,
materialized: Materialized[K, VR, ByteArrayKeyValueStore]
): KTable[K, VR] =
- inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
+ new KTable[K, VR](
+ inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
+ )
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed outer equi join.
@@ -290,7 +318,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
*/
def outerJoin[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, VR] =
- inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner)
+ new KTable[K, VR](
+ inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner)
+ )
/**
* Join records of this [[KTable]] with another [[KTable]]'s records using non-windowed outer equi join.
@@ -307,7 +337,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
joiner: (V, VO) => VR,
materialized: Materialized[K, VR, ByteArrayKeyValueStore]
): KTable[K, VR] =
- inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
+ new KTable[K, VR](
+ inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
+ )
/**
* Get the name of the local state store used that can be used to query this [[KTable]].
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index a602767..4ed1783 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -49,7 +49,10 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)(
implicit materialized: Materialized[K, VR, ByteArraySessionStore]
): KTable[Windowed[K], VR] =
- inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, materialized)
+ new KTable[Windowed[K], VR](
+ inner
+ .aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, materialized)
+ )
/**
* Count the number of records in this stream by the grouped key into `SessionWindows`.
@@ -61,7 +64,10 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
*/
def count()(implicit materialized: Materialized[K, Long, ByteArraySessionStore]): KTable[Windowed[K], Long] = {
val c: KTable[Windowed[K], java.lang.Long] =
- inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArraySessionStore]])
+ new KTable[Windowed[K], java.lang.Long](
+ inner
+ .count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArraySessionStore]])
+ )
c.mapValues[Long](Long2long _)
}
@@ -77,5 +83,7 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
def reduce(reducer: (V, V) => V)(
implicit materialized: Materialized[K, V, ByteArraySessionStore]
): KTable[Windowed[K], V] =
- inner.reduce(reducer.asReducer, materialized)
+ new KTable[Windowed[K], V](
+ inner.reduce(reducer.asReducer, materialized)
+ )
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index 9be5794..e99d418 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -48,7 +48,9 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(
implicit materialized: Materialized[K, VR, ByteArrayWindowStore]
): KTable[Windowed[K], VR] =
- inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized)
+ new KTable[Windowed[K], VR](
+ inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized)
+ )
/**
* Count the number of records in this stream by the grouped key and the defined windows.
@@ -60,7 +62,9 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
*/
def count()(implicit materialized: Materialized[K, Long, ByteArrayWindowStore]): KTable[Windowed[K], Long] = {
val c: KTable[Windowed[K], java.lang.Long] =
- inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayWindowStore]])
+ new KTable[Windowed[K], java.lang.Long](
+ inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayWindowStore]])
+ )
c.mapValues[Long](Long2long _)
}
@@ -76,5 +80,7 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
def reduce(reducer: (V, V) => V)(
implicit materialized: Materialized[K, V, ByteArrayWindowStore]
): KTable[Windowed[K], V] =
- inner.reduce(reducer.asReducer, materialized)
+ new KTable[Windowed[K], V](
+ inner.reduce(reducer.asReducer, materialized)
+ )
}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 3d1bab5..80bf3d3 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -57,7 +57,10 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)
- val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
+ val userRegionsTable: KTable[String, String] =
+ builder.table[String, String](userRegionsTopic).filter { (_, _) =>
+ true
+ }
// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTable[String, Long] =