Description
Create a general-purpose windowed bolt that performs joins on multiple data streams.
Depending on the topology config, the bolt could be receiving data either on 'default' streams or on named streams, so the join bolt should be able to differentiate the incoming data based on the names of upstream components as well as on stream names.
Example:
The following SQL-style join involving 4 tables:
select userId, key4, key2, key3
from stream1
join stream2 on stream2.userId = stream1.key1
join stream3 on stream3.key3 = stream2.userId
left join stream4 on stream4.key4 = stream3.key3
could be expressed using the Join Bolt over 4 named streams as:
new JoinBolt(STREAM, "stream1", "key1")      // 'STREAM' arg indicates that stream1/2/3/4 are names of streams; 'key1' is the key on which stream1 is joined
    .join("stream2", "userId", "stream1")    // join stream2 on stream2.userId = stream1.key1
    .join("stream3", "key3", "stream2")      // join stream3 on stream3.key3 = stream2.userId
    .leftjoin("stream4", "key4", "stream3")  // left join stream4 on stream4.key4 = stream3.key3
    .select("userId, key4, key2, key3")      // choose output fields
    .withWindowLength(..)
    .withSlidingInterval(..);
Or based on named source components:
new JoinBolt(SOURCE, "kafkaSpout1", "key1")      // 'SOURCE' arg indicates that kafkaSpout1, hdfsSpout3 etc. are names of upstream components
    .join("kafkaSpout2", "userId", "kafkaSpout1")
    .join("hdfsSpout3", "key3", "kafkaSpout2")
    .leftjoin("mqttSpout1", "key4", "hdfsSpout3")
    .select("userId, key4, key2, key3")
    .withWindowLength(..)
    .withSlidingInterval(..);
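To make the intended inner join and left join behaviour concrete, here is a minimal in-memory sketch in plain Java. It is not the proposed bolt: the class WindowJoinSketch and its row and join helpers are hypothetical names used only for illustration. Each list stands in for the tuples of one stream that fall inside the current window, and field-name clashes are resolved by keeping the left-hand value, which is an assumption rather than something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Storm: joins two in-memory "windows" of tuples.
final class WindowJoinSketch {

    // Builds a two-field tuple as a map of field name to value.
    static Map<String, Object> row(String k1, Object v1, String k2, Object v2) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put(k1, v1);
        m.put(k2, v2);
        return m;
    }

    // Joins rows where left[leftKey] equals right[rightKey].
    // With keepUnmatched = true (left join), left rows without a match are kept
    // unchanged, so the right-hand fields are simply absent from those rows.
    static List<Map<String, Object>> join(List<Map<String, Object>> left, String leftKey,
                                          List<Map<String, Object>> right, String rightKey,
                                          boolean keepUnmatched) {
        // Index the right side by its join key; rows with a null key never match.
        Map<Object, List<Map<String, Object>>> index = new HashMap<>();
        for (Map<String, Object> r : right) {
            Object key = r.get(rightKey);
            if (key != null) {
                index.computeIfAbsent(key, k -> new ArrayList<>()).add(r);
            }
        }
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> l : left) {
            Object key = l.get(leftKey);
            List<Map<String, Object>> matches = (key == null) ? null : index.get(key);
            if (matches == null) {
                if (keepUnmatched) {
                    out.add(new LinkedHashMap<>(l));
                }
                continue;
            }
            for (Map<String, Object> r : matches) {
                Map<String, Object> merged = new LinkedHashMap<>(l);
                r.forEach(merged::putIfAbsent); // assumption: left value wins on a name clash
                out.add(merged);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> stream1 = List.of(row("key1", 1, "a", "x"), row("key1", 2, "a", "y"));
        List<Map<String, Object>> stream2 = List.of(row("userId", 1, "key2", "p"));
        System.out.println(join(stream1, "key1", stream2, "userId", false)); // inner join
        System.out.println(join(stream1, "key1", stream2, "userId", true));  // left join
    }
}
```

The inner join drops the stream1 tuple with key1 = 2 because stream2 has no matching userId, while the left join keeps it without any stream2 fields. Chaining several joins, as in the examples above, amounts to feeding the result of one call in as the left side of the next.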
In order for the tuples to be joined correctly, 'fields grouping' should be employed on the incoming streams. Each stream should be grouped on the same key on which it will be joined against other streams. This is a restriction compared to SQL, which allows joining a table with others on any key and any number of keys.
For example: if 'Stream1' is fields-grouped on 'key1', we cannot use a different key 'key2' on 'Stream1' to join it with other streams. 'Stream1' can, however, be joined with multiple other streams using the same key. The following SQL joins stream1 on two different keys and therefore cannot be expressed:
select ....
from stream1
join stream2 on stream2.userId = stream1.key1
join stream3 on stream3.key3 = stream1.key2 // not supportable in Join Bolt
Consequently, the join bolt's syntax is somewhat simpler than SQL. The key name for any given stream appears only once, when the stream is first introduced in the join. Thereafter that key is used implicitly for joining. See the case of 'stream3' being joined with both 'stream2' and 'stream4' in the first example.
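The fields grouping restriction can be illustrated with a small hash-partitioning sketch. This is only an approximation for illustration: FieldsGroupingSketch and taskFor are hypothetical names, and Storm's actual task-selection logic may differ. The point is that tuples meet in the same bolt task only when the value they are grouped on is the same value they are joined on.

```java
import java.util.Objects;

// Hypothetical illustration of hash partitioning; not Storm's actual grouping code.
final class FieldsGroupingSketch {

    // Picks a task index in [0, numTasks) from the value of the grouping field.
    static int taskFor(Object keyValue, int numTasks) {
        return Math.floorMod(Objects.hashCode(keyValue), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;

        // stream1 is grouped on key1 and stream2 on userId; both tuples carry the value 42,
        // so they are routed to the same task and can be joined there.
        int stream1Task = taskFor(42, numTasks);
        int stream2Task = taskFor(42, numTasks);
        System.out.println(stream1Task == stream2Task); // true

        // If the join instead used stream1.key2 (value 7 on the same tuple), the matching
        // stream3 tuple would be routed by 7 while the stream1 tuple was routed by 42.
        int stream3Task = taskFor(7, numTasks);
        System.out.println(stream1Task == stream3Task); // false: the pair never meets
    }
}
```

With 4 tasks, the value 42 maps to task 2 and the value 7 maps to task 3, so a join on a key other than the grouping key would miss matches whenever the two values hash to different tasks.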