Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: streams
    • Labels:

      Description

      Today in the Kafka Streams DSL, KTable joins are only based on keys. If users want to join a KTable A keyed on a with another KTable B keyed on b via a "foreign key" a, and assuming the two tables are read from topics partitioned on a and b respectively, they need to use the following pattern:

      tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
      
      tableA.join(tableB', joiner);
      

      Even if these two tables are read from two topics that are already partitioned on a, users still need to do this pre-aggregation to bring the two joining streams onto the same key. This is a drawback in programmability and we should fix it.
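
      For comparison, a direct non-key join could look roughly like the sketch below. This is purely illustrative: the method name joinOnForeignKey, the extractor parameter, and the result type are hypothetical, not a committed API.

      // Hypothetical DSL shape (illustration only): tableB is keyed on "b"; the extractor
      // pulls the foreign key "a" out of B's value, and the library would handle the
      // repartitioning internally instead of requiring a manual groupBy + aggregate.
      KTable<BKey, Result> joined = tableB.joinOnForeignKey(
              tableA,                                   // the table keyed on "a"
              bValue -> bValue.getA(),                  // foreign-key extractor (hypothetical)
              (bValue, aValue) -> new Result(bValue, aValue));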

        Issue Links

          Activity

          guozhang Guozhang Wang added a comment -

          Please find the discussion and proposal page here: https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins
          guozhang Guozhang Wang added a comment -

          Another thing we may want to consider is to use a different memtable option in RocksDB: https://github.com/facebook/rocksdb/wiki/Hash-based-memtable-implementations

          jfilipiak Jan Filipiak added a comment - - edited

          A few things I came across building the current implementation based on the Processor API.

          1. Partitioning
          I ended up needing to pass an additional ValueMapper<K,K1> into the method. I had to use it in the sink's partitioner to extract the partition/join key from the key that is used for the repartition topic. It had to be extracted from the key because I still need to be able to route null values (deletes) to the correct partition. This came from not knowing the number of partitions in the processor but only in the partitioner, which made the "API" somewhat complicated.

          2. Range Select
          The ValueMapper mentioned above also had to be passed into the RocksDB iterator. Having KeyValueIterator<K,V> range(K from, K to) is not "natural" for prefix range queries. A KeyValueIterator<K,V> range(K1 prefix), where Serde<K1> produces prefix bytes of Serde<K>, would fit better; a sketch of that interface shape follows.
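
          A rough sketch of that interface shape (illustrative only; the interface name and the extra type parameter are a paraphrase, not an existing Kafka Streams API):

          // Sketch of a prefix-range read interface (not an existing API). Serde<K1> must
          // serialize a prefix key K1 to bytes that are a prefix of Serde<K>'s output for the
          // corresponding full keys, so the store can seek to the prefix and scan forward.
          public interface PrefixRangeReadableStore<K, K1, V> {
              KeyValueIterator<K, V> range(K1 prefix);
          }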

          3. Key expansion
          After a join in this fashion, the key is what I started referring to as widened. Say you have KTable<AK,AV>, it is the table that needs to be repartitioned, and KP is the repartition key; then, independently of the other table, the new key of the result must include both KP and AK, which is a weird thing compared to the traditional relational-database way. Imagine having a result table KTable<Pair<AK,KP>,Pair<AV,XV>>: the previously unique key AK is not unique anymore, and a processor might see the insert in one partition before the delete in the other (e.g. when the row's KP was updated). I think this should be embraced, because that is how it is. It should just be apparent to the user, because it needs to be dealt with in downstream processors.

          Unrelated to the topic of joining: the Processor API is not necessarily comfortable. I appreciate the beauty of the threading model, but stitching graphs together based on processor names and strings is trickier than I thought. Anyhow, a really nice stream processing framework. It feels and looks so much better than what is out there, Spark or Storm. Watching their desperate attempts to put state in is a joy. Nice work. As soon as our implementation has hardened in production, I'll probably be able to share it.

          guozhang Guozhang Wang added a comment - - edited

          Thanks for the feedback!

          Re 1: Not sure I fully understand this. I thought you can pass a StreamPartitioner when calling addSink, which should be sufficient?

          Re 2: We are aware of this, and as discussed in the wiki our current proposal is to use something similar to what you mentioned, i.e. range(K1 prefix), and check key.startsWith(prefix) to stop iterating. There are some optimizations for prefix seeking in RocksDB, but we would need to contribute to RocksDB's JNI to make use of them. A minimal sketch of the range-plus-startsWith idea follows the link below.

          https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins#Discussion:Non-keyKTable-KTableJoins-Simpleapproach:seekwithkeydirectly
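
          A minimal sketch of that emulation over a byte-keyed store (helper names and the exclusive-upper-bound trick are illustrative, not the actual implementation; it assumes serialized keys sort lexicographically, as the RocksDB-backed stores do, and uses org.apache.kafka.common.utils.Bytes with the state-store interfaces):

          // Emulate range(K1 prefix) with range(from, to) plus a startsWith check to stop early.
          static List<KeyValue<Bytes, byte[]>> prefixScan(ReadOnlyKeyValueStore<Bytes, byte[]> store,
                                                          byte[] prefix) {
              List<KeyValue<Bytes, byte[]>> result = new ArrayList<>();
              Bytes from = Bytes.wrap(prefix);
              Bytes to = Bytes.wrap(upperBound(prefix));       // first key beyond the prefix region
              try (KeyValueIterator<Bytes, byte[]> iter = store.range(from, to)) {
                  while (iter.hasNext()) {
                      KeyValue<Bytes, byte[]> kv = iter.next();
                      if (!startsWith(kv.key.get(), prefix)) {
                          break;                               // left the prefix region; stop iterating
                      }
                      result.add(kv);
                  }
              }
              return result;
          }

          // Exclusive upper bound: increment the last byte that is not 0xFF and truncate after it.
          // (Simplified; an all-0xFF prefix is not handled in this sketch.)
          static byte[] upperBound(byte[] prefix) {
              byte[] bound = Arrays.copyOf(prefix, prefix.length);
              for (int i = bound.length - 1; i >= 0; i--) {
                  if (bound[i] != (byte) 0xFF) {
                      bound[i]++;
                      return Arrays.copyOf(bound, i + 1);
                  }
              }
              throw new IllegalArgumentException("all-0xFF prefix not supported in this sketch");
          }

          static boolean startsWith(byte[] key, byte[] prefix) {
              if (key.length < prefix.length) return false;
              for (int i = 0; i < prefix.length; i++) {
                  if (key[i] != prefix[i]) return false;
              }
              return true;
          }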

          Re 3: I think you do not need to keep both the old and new keys for repartitioning even if the old values need to be sent as well; rather, send them as two separate records, <null, old> and <new, null>, since after the repartitioning they may go to two different partitions and hence be processed by two different joiners, which is the expected behavior. More precisely, we are going to send the <new, old> pair separately as two records, <PK-new, <AK, AV-new>> and <PK-old, <AK, AV-old>>, partitioned on PK-new and PK-old respectively. These two records may be sent to two different partitions and hence processed by two different processors.

          For example, if you have two KTables A and B with the following schemas:

          A: {key: a, value: a'}
          B: {key: b, value: a, c}

          and you want to join them by key a. Now let's say table A has just two records:

          {a="a1", a'="a1-pre"}
          {a="a2", a'="a2-pre"}

          and an incoming record for table B comes as:

          {b="b", a="a1", c="c1"}

          Then a join result of

          {a="a1", joined = join("a1-pre", "c1")}

          should be output.

          Later, when table B gets an update on the existing key "b":

          {b="b", a="a2", c="c2"}

          two join results should be output: first, one negating the previous join result,

          {a="a1", joined = null}

          then a new join result on the new re-partitioned key:

          {a="a2", joined = join("a2-pre", "c2")}

          Does that sound good to you?

          jfilipiak Jan Filipiak added a comment - - edited

          I will just shoot a quick reply now; time has somehow become sparse recently. Anyhow, the bottom line of our misunderstandings is always the same thing. My bad that I didn't see the wiki page; if that range-query interface is addressed, that's nice.

          Point 3 is the one that causes the most confusion, I guess. In the repartition case we follow different paths, and I am not sure I was able to communicate mine well enough. I love the idea of having everything be a derived store. In the end, all of this is being used to tail -F mysql-XXXX.bin | kafka | XXX | redis, so Redis becomes a derived store of MySQL which can be used for NoSQL-style reads. I am in fact such a great fan of this concept that I tend to treat everything as a derived store. For me this means a repartitioned topic is a derived store of the source topic. This stands in contrast to making a changelog out of it and materializing the changelog in, say, RocksDB. That leads to the "problem" that the changelog topic is not a derived store anymore, which gives me a personally bad feeling; it just pushes me out of my comfort zone. Confluent folks seem to be in their comfort zone with change-logging topics. In my narrative, shit hits the fan when the property of being a derived store is lost. It leads to all the nasty things, like needing to change-log your RocksDB instances because the intermediate topic won't hold the data forever.

          In contrast to having a change-logging topic that I re-materialize and then change-capture again, I prefer to do the change capturing first and only maintain the state of which downstream partitions a record is currently published to. This works cleanly and nicely but brings with it what I call "key widening". Say I have KTable A and I want to repartition it to A' so that the topic containing A' is a derived store and log-compacted. Then I can't use Key<A> to do this, for two reasons. The StreamPartitioner can only access the key to determine the partition to delete from (deletes come as null values), which means the fields that determine the partition need to be in the key no matter what. Snippet:

          topology.addSink(name, repartitionTopicName, new StreamPartitioner<K, VR>() {
              private final Serializer<KL> intermediateSerializer = intermediateSerde.serializer();

              @Override
              public Integer partition(K key, VR value, int numPartitions) {
                  // Extract the join/partition key from the (widened) record key, so that
                  // deletes (null values) can still be routed to the correct partition.
                  KL newKey = intermediateKeyExtractor.apply(key);
                  // Same hashing as the default partitioner, without needing a Cluster object;
                  // mask to non-negative before taking the modulo.
                  byte[] keyBytes = intermediateSerializer.serialize(repartitionTopicName, newKey);
                  return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
              }
          }, repartitionProcessorName);
          

          As you can see, the resulting key K contains KL (the key of the not-yet-repartitioned table).

          The second reason why this key must be there is that one needs to be able to build a derived stream A''. Since in A' a record can "move" from partition X to Y, there is a race condition between the insert in Y and the delete in X. The repartitioner processor building A'' needs to treat them as different keys; if they were the same key, the delete could wipe the new value. This also puts downstream consumers of A'' in the odd position that at any point in time there can be as many A-keys with different values as there are A' partitions minus one, or a specific A key might vanish completely and then reappear, which is sometimes awkward to work around in the end application. There are enough strategies to solve at least the multiple-A-keys case, but not so much the complete-vanish case. I hope this clarifies things.

          guozhang Guozhang Wang added a comment -

          Yeah, this clarifies a lot. I see that you are not trying to create a "table" inside Kafka Streams, but just want to re-partition the input binlog stream, and you are working with the lower-level APIs (originally I was confused since you mentioned "KTable", which is only available in the higher-level DSL). Here is my understanding / suggestion:

          1. The original Kafka topic that was directly piped from your MySQL binlog has the form (key -> value):

          a -> {a', other-fields}

          and you want to repartition it into a new topic on field a', also log-compacted on a'.

          2. Instead of representing a delete / update record as "a -> null" / "a -> a'-new, other-fields-new", you can send the messages in a different format (there are already a few tools, including Kafka Connect, which allow you to represent your binlog entries with such flexibility):

          a -> {pair{a'-old, a'-new}, other-fields-current-value}

          so that a deletion becomes:

          a -> {pair{a'-old, null}, other-fields}

          3. In this case you can access the value field to extract a' for partitioning even in the deletion case, and for an update record you can then send two records to the re-partition topic:

          a'-old -> null,
          a'-new -> other-fields.
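
          To illustrate the idea, a sketch of a partitioner that routes on a foreign key read from the value (the value type BinlogEntry and its accessor are placeholders, not the actual format):

          // Route records by the foreign key a' carried inside the value rather than by the
          // record key, so deletes (whose value still carries a'-old) reach the partition that
          // currently holds the corresponding row.
          StreamPartitioner<String, BinlogEntry> byForeignKey = (key, value, numPartitions) ->
                  (Utils.murmur2(value.getForeignKey().getBytes(StandardCharsets.UTF_8)) & 0x7fffffff)
                          % numPartitions;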

          Will that work for you?

          jfilipiak Jan Filipiak added a comment - - edited

          Hi, yes, that is roughly where I am coming from, and I completely understand where you are.
          Doing the changelog case (logging Change<> objects) is just one implementation of this repartitioning, and mine is another. I am very familiar with my approach, as I wrote some Samza apps using it. It has many benefits that may or may not be of interest (repartition topics can also be used to bootstrap, fewer copies of the data since there is no need to make state highly available, etc.). What we are still missing here is a mutual understanding of what I think key widening does and how to expose it to users in a sane manner.

          Let me try it with your JSON syntax. This is the very example we have, and where this ticket's feature would allow me to build it at the DSL level of the API.

          So let's say I have three tables, A, B, and C, and I want to reach a point where I have C => <C, List<Join<A,B>>>. This will then be read by our application servers and serve as a faster way to retrieve this data than, say, the original MySQL. B has foreign keys into A and C.

          All tables start off as one topic each, keyed by that table's primary key:
          Topic mysq__jadajadajada_A
          A.PK => A
          Topic mysq_B
          B.PK => B
          Topic mysq_C
          C.PK => C

          I am going to repartition B to A.PK now, in this first example without a widened key.
          The layout stays B.PK => B but is partitioned by A.PK accordingly.

          Then I can do the join with A and get
          B.PK => Joined<B,A>

          as in your previous comment:

          Then a join result of
          {a="a1", joined = join("a1-pre", "c1")}

          Note the key stays B.PK (unwidened).
          Now I am going to repartition based on C.PK, still maintaining
          B.PK => Joined<B,A>
          as the topic layout.
          Now, shit hits the fan. I am doing my aggregation to arrive at
          C.PK => List<Joined<A,B>>

          How would this aggregator look now?

          List<Joined<A,B>> apply(B key, Joined<A,B> value, List<Joined<A,B>> current) {
              // Index the current aggregate by B's primary key (listToMap and the extractor
              // are placeholders for whatever does this in the real code).
              Map<B, Joined<A,B>> m = listToMap(current, bKeyExtractorValueMapper);
              if (value == null) {
                  m.remove(key);        // delete for this B row
              } else {
                  m.put(key, value);    // insert/update for this B row
              }
              return new ArrayList<>(m.values());
          }
          

          This wouldn't be much different with logged Changes<Joined<A,B>>; only the remove and add would be two methods. The problem is that it doesn't look wrong, but this code now has a race condition. Think about an update to the A.PK field of a B record that forces it to switch partitions (while the C.PK value remains). We publish a delete to the old partition and the new value to the new partition, then we do the join, then we repartition on the unchanged C.PK. This makes the code above see B.PK => null (remove) and B.PK => Joined<A,B> (add) in no particular order, so the output is undefined. If the API had forcefully widened the key to Joined<A.PK,B.PK>, the error would not happen and users would be aware of what happens on repartitioning; a sketch of the widened-key version follows. I thought this through, and it also happens with logging Change<>, as that is really just another implementation.
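
          A sketch of the widened-key version (placeholder types; the Pair key and helper names are purely for illustration): because the racing delete from the old partition targets a different map entry than the new insert, their arrival order no longer matters.

          // Placeholder types: with the widened key Pair<A.PK, B.PK>, the delete that raced in
          // from the old partition removes the (A.PK.old, B.PK) entry while the insert writes
          // the (A.PK.new, B.PK) entry, so either arrival order produces the same end state.
          List<Joined<A,B>> apply(Pair<APk, BPk> key, Joined<A,B> value, List<Joined<A,B>> current) {
              Map<Pair<APk, BPk>, Joined<A,B>> m = listToMap(current, widenedKeyExtractor);
              if (value == null) {
                  m.remove(key);
              } else {
                  m.put(key, value);
              }
              return new ArrayList<>(m.values());
          }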

          I hope this finally clarifies the key widening I am talking about. If not, maybe we should have a small Skype call or something.
          My further recommendation is not to implement these joins as logged Changes<>, as that is more resource-intensive and less efficient, and it also makes the API more complicated.

          PS: Hive has seen all the join types, MapJoins, skewed joins, you name it. All of these are applicable to streams as well. Maybe keep them in the back of your head.

          guozhang Guozhang Wang added a comment -

          Yes, this does finally clarify your scenario, thanks!

          I think the Change<> pair can still help in your case, because it has the benefit that, for aggregations for example, you have the explicit information "subtract the old value, add the new value" instead of depending on whether the value is null. For example, the Streams DSL defines the aggregation operator in the following way (in your customized implementation you do not need to strictly follow the same pattern; this is just to illustrate the idea):

          <T> KTable<K, T> aggregate(Initializer<T> initializer,
                                     Aggregator<K, V, T> adder,
                                     Aggregator<K, V, T> subtractor, ...);
          

          Let me try again with your example and the aggregation pattern in the Streams DSL above, and if you do not agree let's have a small Skype chat.

          1. Suppose your current value in Table B is

          B.PK => B.V.old, which contains A.PK.old, C.PK, etc.

          When you join tables A and B, you repartition stream B by A.PK while still maintaining the message format B.PK => B.V, and the join result is in the format:

          B.PK => join<B.V, A.V>

          2. Now suppose you have an update on Table B, B.PK => B.V.new, which contains A.PK.new and C.PK (same value as before), and suppose it is represented as a change pair {old, new}, i.e.

          B.PK => {B.V.old, B.V.new}

          or more specifically:

          B.PK => {<A.PK.old, C.PK, ...>, <A.PK.new, C.PK, ...>}

          3. When you repartition it based on the A.PK value, this results in two pairs sent to potentially two different partitions:

          B.PK => {B.V.old, null}   (sent to partition 1)
          B.PK => {null, B.V.new}   (sent to partition 2)

          4. These two records will be joined independently by two processors, each consuming one of the re-partitioned topic's partitions, and the result is:

          B.PK => {joined(B.V.old, A.V.old), null}   (here A.V.old is the value for key A.PK.old in Table A)
          B.PK => {null, joined(B.V.new, A.V.new)}   (here A.V.new is the value for key A.PK.new in Table A)

          They will then be sent to the second topic, which is partitioned on C.PK, and since their C.PK value is the same they will land in the same partition, but in arbitrary order.

          5. The aggregation function consumes from the second re-partition topic keyed on C.PK, and does the aggregation by 1) calling a subtract function on the old value of the pair, and then 2) calling an add function on the new value of the pair, skipping the call if the value is null. More specifically, the subtract / add functions look like:

          // subtractor
          List<Joined<A,B>> subtractorApply(C key, Joined<A,B> value, List<Joined<A,B>> current) {
              current.remove(value);       // retract the old joined value from the aggregate
              return current;
          }

          // adder
          List<Joined<A,B>> adderApply(C key, Joined<A,B> value, List<Joined<A,B>> current) {
              current.add(value);          // add the new joined value to the aggregate
              return current;
          }
          

          Based on the order in which these two records are received, we will either call subtract(C.PK, joined(B.V.old, A.V.old), current) first and then add(C.PK, joined(B.V.new, A.V.new), current), or vice versa, and either way the result is correct, since B.V.old and B.V.new are different keys.

          guozhang Guozhang Wang added a comment - - edited

          Jan Filipiak After talking with you offline, I am convinced that this combo key is necessary to avoid out-of-order effects. I have updated the design proposal wiki accordingly: https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins, feel free to take a look.

          Just a random thought for your use case specifically: do relations A, B, and C all need to be captured as KTables (i.e. as binlog records from some database table)? If the one with the foreign key can be captured as just a stream (i.e. a KStream), then you can re-model your computation as (stream join table1) join table2, where a stream-table join returns a stream. In the Kafka Streams DSL you can just do stream.selectKey(table1.key).join(table1).selectKey(table2.key).join(table2), roughly as sketched below.
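
          A rough sketch of that chain (topic names, value types, and accessors like getAId() are placeholders; exact builder and overload signatures vary by version):

          KStream<String, B> bStream = builder.stream("topic-B");
          KTable<String, A>  tableA  = builder.table("topic-A");
          KTable<String, C>  tableC  = builder.table("topic-C");

          KStream<String, Result> out = bStream
                  .selectKey((bKey, b) -> b.getAId())        // re-key on B's foreign key into A (triggers repartitioning)
                  .join(tableA, (b, a) -> joinAB(a, b))      // stream-table join
                  .selectKey((abKey, ab) -> ab.getCId())     // re-key on the foreign key into C
                  .join(tableC, (ab, c) -> joinABC(ab, c));  // second stream-table join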

          jfilipiak Jan Filipiak added a comment - - edited

          Something that has started happening to us is that for low-cardinality join columns, the prefix scan on RocksDB can return a large number of values. That leads to too much time spent between poll() calls and to us losing group membership. Maybe one could check whether the consumer needs to poll() while doing context.forward(), as we call context.forward() for every row that comes back from the prefix scan. The fix we are currently using, setting the session timeout very high, is not that good IMO.

          guozhang Guozhang Wang added a comment - - edited

          We are fixing this issue right now on the consumer layer with KIP-62: https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread

          Expecting to have it in the next minor release.
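
          For reference, once that lands the relevant knobs look roughly like the following (values are arbitrary examples, shown as plain consumer configs):

          Properties props = new Properties();
          // KIP-62 splits liveness into two checks: heartbeats from a background thread bounded by
          // session.timeout.ms, plus a separate bound on the time between poll() calls, so long
          // per-record processing (e.g. a large prefix scan) no longer has to fit inside the session timeout.
          props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10_000);
          props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);
          props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);   // bound the work done per poll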

          jfilipiak Jan Filipiak added a comment -

          The change doesn't seem to be in all the places yet. I thought about going with KTable<Pair<K1,K2>,T> but also settled on KTable<K3,T>; the other option would just be generics overkill, even though one could hide it from the user with a PairSerde or something.

          Regarding your idea, I kind of fail to see how an update to table1 or table2 would be reflected in the output with regard to republishing; isn't the only option there to have the stream materialized based on a window? I need to take a deeper look though.

          guozhang Guozhang Wang added a comment -

          Yeah, currently we do not have a windowed stream-table join, and hence the stream is not materialized, only the table. We can add this join type in the next release, though, if we feel it is a common request.

          miguno Michael Noll added a comment - - edited

          Jan Filipiak: Now that support for global KTables is around the corner (see KIP-99 at https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649), would that serve some of your needs here? I am aware that "non-key joining in KTable" and "global KTables" do not fully overlap, but the overlap is still quite significant.

          jfilipiak Jan Filipiak added a comment -

          Hi, with regard to what I am trying to do here, the GlobalKTable is not useful.
          It could be useful if its source also emitted change events. (I would still have to do the range lookup, but I could save a repartition of my "bigger" table.) The current design is not usable here. :'(

          jfilipiak Jan Filipiak added a comment -

          AFAICS the table will also be filled by a different thread; IMO that makes it too hard to reason about the execution. If you look into the table twice while processing a record, it might have different values. GlobalKTable makes me sad.

          vaughanp@advisory.com Paul Vaughan added a comment -

          Healthcare data is intrinsically relational, but the bulk of the data is related to patients in the sense that if the patient did not exist, the data would not exist. For example, these FHIR Resources all depend on a Patient: Encounter, MedicationOrder, Observation, Condition, etc., while Practitioner, Organization, etc. exist independent of patients. This suggests that a good partitioning strategy for processing this data would be to partition by patient in order to get good concurrency with minimal repartitioning/distribution. Data that is independent of the patient would likely need to be distributed to all nodes, but that data is relatively small.

          Processing this data includes checking for referential integrity, dealing with out-of-order data (Encounters that are received before the Patient is received), and re-keying. For example, when an Encounter arrives, a downstream version of that Encounter needs to be created with the patient’s downstream key. Similarly, when a new Patient arrives it should be given a downstream key and any Encounters that reference this patient need to be updated and sent downstream.

          But the data is also intrinsically keyed by something other than the patient. For example, an Encounter has a key and it is possible to get duplicate copies of an Encounter, either as corrections or simple duplicates. Thus it is desirable to use the intrinsic key with compacted topics, rather than using the patient as the Kafka topic key. While it is possible to key by one thing and partition by another using explicit partitioners, that seems both error prone and insufficient to keep the data only where it needs to be.

          Specifically, the High-level Streaming DSL does not seem to support the latter point. Without the foreign key support discussed here, it is necessary to do aggregation and remapping that cause implicit repartitioning. It seems determined to move the data around. It is not clear to me whether this KIP would eliminate that problem. Note that I found the documentation frustrating in that this repartitioning was not apparent from the documentation – it was most apparent by looking at the set of topics that get implicitly created.

          I would like to see the ability to transform a set of related incoming topics into a set of downstream topics including re-keying and sometimes renormalization using the high-level Streaming DSL. This seems like it is a start towards that, but is it sufficient?

          guozhang Guozhang Wang added a comment -

          Jan Filipiak I agree with you that the GlobalKTable is not perfect for non-key joins, and that is because of a couple of trade-offs we had to make in the design. Much of this has been discussed at length in KIP-99:

          https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams

          But let me try to give a very brief summary:

          1) Note that different tasks, even if they are hosted by the same stream thread, can execute at their own pace. For example, task1 could already be at stream time 100 while task2 has not executed a single record.
          2) When adding this globally replicated store, we need to determine whether it is replicated per instance, per thread, or per task. If we want time to be synchronized between them, we need to replicate per task or make the store itself "time-series indexed" so that we can query its snapshots at different points in time beyond just the most recent state. Either of these would largely increase the storage overhead.
          3) That is why we decided to go with the per-instance approach and do not try to synchronize the update rate of global tables with any of the stream tasks.

          If you have some ideas how to improve this scenario, I'd love to hear them. cc Damian Guy

          Paul Vaughan I'm trying to understand your use case a bit better: there are multiple streams that would be joined by one key, "patient-id", but these streams' source topics are partitioned by another key, the "intrinsic key", for key-based log compaction. Is that right? If so, then you need to repartition the data on the join key before executing the join, assuming you are not going to do it as a GlobalKTable-KStream join, and as you mentioned that can be done either explicitly or implicitly:

          encounters = builder.stream("topic1");  // keyed and partitioned on intrinsic key
          patients = builder.stream("topic2");  // keyed and partitioned on intrinsic key
          
          // explicitly
          
          encounters.selectKey(/*patient-id*/).through("repartition-topic1").join(patients.groupBy(/*patient-id*/).agg(...))
          
          // implicitly
          
          encounters.selectKey(/*patient-id*/).join(patients.groupBy(/*patient-id*/).agg(...))
          
          

          About documentation: thanks for your feedback, we can definitely improve our docs on that.

          githubbot ASF GitHub Bot added a comment -

          GitHub user guozhangwang opened a pull request:

          https://github.com/apache/kafka/pull/3720

          [DO NOT MERGE] KAFKA-3705: non-key joins

          This is just for reviewing the diff easily to see how it is done by @jfillipiak.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/Kaiserchen/kafka KAFKA3705

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/kafka/pull/3720.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3720


          commit 3da2b8f787a5d30dee2de71cf0f125ab3e57d89b
          Author: jfilipiak <jan.filipiak@trivago.com>
          Date: 2017-06-30T09:00:39Z

          onetomany join signature to show on mailing list

          commit cc9c6f4a68170fb829adb46a6de40ec0fc75716f
          Author: jfilipiak <jan.filipiak@trivago.com>
          Date: 2017-07-12T14:49:43Z

          stores

          commit 807e90aac82d7659310ce92066ac1df6e339068a
          Author: jfilipiak <jan.filipiak@trivago.com>
          Date: 2017-07-26T06:06:58Z

          just throw in most of the processors, wont build

          commit 1a6ff7b01ad35dd7eedf4c69aa534043ab1a8eb8
          Author: jfilipiak <jan.filipiak@trivago.com>
          Date: 2017-08-18T10:07:34Z

          random clean up

          commit ffe9b9496afbdad73bfcb9c014b6045b8ca95e79
          Author: jfilipiak <jan.filipiak@trivago.com>
          Date: 2017-08-19T19:22:02Z

          clean up as much as possible



            People

            • Assignee: Unassigned
            • Reporter: guozhang Guozhang Wang
            • Votes: 1
            • Watchers: 11
