  1. Apache Storm
  2. STORM-2334

Bolt for Joining streams

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0, 1.x
    • Fix Version/s: 2.0.0, 1.1.0
    • Component/s: None
    • Labels:
      None

      Description

      Create a general purpose windowed bolt that performs Joins on multiple data streams.

      Depending on the topology configuration, the bolt may receive data on 'default' streams or on named streams, so the join bolt should be able to differentiate incoming data by the names of upstream components as well as by stream names.

      Example:

      The following SQL-style join involving 4 tables:

      select  userId, key4, key2, key3
      from stream1 
      join       stream2  on stream2.userId =  stream1.key1
      join       stream3  on stream3.key3   =  stream2.userId
      left join  stream4  on stream4.key4   =  stream3.key3
      

      could be expressed using the Join Bolt over 4 named streams as:

      new JoinBolt(STREAM, "stream1", "key1") // 'STREAM' indicates that stream1/2/3/4 are names of streams; 'key1' is the key on which stream1 is joined
           .join     ("stream2", "userId",  "stream1") // join stream2 on stream2.userId = stream1.key1
           .join     ("stream3", "key3",    "stream2") // join stream3 on stream3.key3 = stream2.userId
           .leftjoin ("stream4", "key4",    "stream3") // left join stream4 on stream4.key4 = stream3.key3
           .select("userId, key4, key2, key3")         // choose output fields
           .withWindowLength(..)
           .withSlidingInterval(..);
      

      Or, based on named source components:

      new JoinBolt(SOURCE, "kafkaSpout1", "key1") //'SOURCE' arg indicates that kafkaSpout1, hdfsSpout3 etc are names of upstream components 
           .join     ("kafkaSpout2", "userId",    "kafkaSpout1" )    
           .join     ("hdfsSpout3",  "key3",      "kafkaSpout2")
           .leftjoin ("mqttSpout1",  "key4",      "hdfsSpout3")
           .select ("userId, key4, key2, key3")
           .withWindowLength(..)
           .withSlidingInterval(..);
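
To make the intended semantics of the two examples concrete, the following is a minimal, stdlib-only sketch of how one window's worth of tuples could be combined by a chain of inner joins followed by a left join. It is not the JoinBolt implementation and uses no Storm classes: `WindowJoinSketch`, its `join` helper, the representation of tuples as maps, and the sample values are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stdlib-only illustration of inner/left joins over one window of tuples; not Storm code. */
public class WindowJoinSketch {

    /**
     * Joins each partial result in `left` with the rows of `right` for which
     * left[leftKey] equals right[rightKey]. When keepUnmatched is true, a left
     * row without a match is kept unchanged (left join); otherwise it is dropped.
     */
    public static List<Map<String, Object>> join(List<Map<String, Object>> left, String leftKey,
                                                 List<Map<String, Object>> right, String rightKey,
                                                 boolean keepUnmatched) {
        // Index the right side by its join key; rows without the key can never match.
        Map<Object, List<Map<String, Object>>> index = new HashMap<>();
        for (Map<String, Object> row : right) {
            Object key = row.get(rightKey);
            if (key != null) {
                index.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
            }
        }
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> l : left) {
            Object key = l.get(leftKey);
            List<Map<String, Object>> matches = (key == null) ? null : index.get(key);
            if (matches == null) {
                if (keepUnmatched) {
                    out.add(new HashMap<>(l));
                }
                continue;
            }
            for (Map<String, Object> r : matches) {
                Map<String, Object> merged = new HashMap<>(r);
                merged.putAll(l); // assumption: on a field-name clash the earlier stream wins
                out.add(merged);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical contents of one window, one tuple per stream.
        List<Map<String, Object>> s1 = List.of(Map.of("key1", 7, "key2", "a"));
        List<Map<String, Object>> s2 = List.of(Map.of("userId", 7));
        List<Map<String, Object>> s3 = List.of(Map.of("key3", 7));
        List<Map<String, Object>> s4 = List.of(); // stream4 delivered nothing in this window

        List<Map<String, Object>> r = join(s1, "key1", s2, "userId", false); // join stream2
        r = join(r, "userId", s3, "key3", false);                            // join stream3
        r = join(r, "key3", s4, "key4", true);                               // left join stream4
        System.out.println(r);
    }
}
```

Because the last step is a left join, the joined row survives even though stream4 contributed no matching tuple; with an inner join in its place the result would be empty.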
      

      For the tuples to be joined correctly, 'fields grouping' should be employed on the incoming streams. Each stream should be grouped on the same key on which it will be joined against other streams. This is a restriction compared to SQL, which allows joining a table with others on any key and on any number of keys.
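
The reason for this requirement can be sketched without Storm: if every incoming stream is partitioned on the value of its join key, tuples that should match always arrive at the same join task. The class below is only an illustration; `FieldsGroupingSketch`, `taskFor`, and the hash-modulo scheme are assumptions and do not describe how Storm itself assigns tuples to tasks.

```java
/** Stdlib-only illustration of key-based partitioning; the hashing scheme is an assumption. */
public class FieldsGroupingSketch {

    /** Picks a task index for a key value so that equal values always map to the same task. */
    public static int taskFor(Object keyValue, int numTasks) {
        return Math.floorMod(keyValue.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        // stream1 is grouped on key1 and stream2 on userId, the two fields being joined.
        int fromStream1 = taskFor("user-42", numTasks); // stream1 tuple with key1 = "user-42"
        int fromStream2 = taskFor("user-42", numTasks); // stream2 tuple with userId = "user-42"
        System.out.println(fromStream1 == fromStream2); // prints "true"
    }
}
```

If stream1 were instead grouped on some other field, a tuple with key1 = "user-42" could land on any task, and the task holding the matching stream2 tuple might never see it.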

      For example, if 'Stream1' is fields-grouped on 'key1', we cannot use a different key, 'key2', on 'Stream1' to join it with other streams. 'Stream1' can, however, be joined with multiple other streams using the same key. The second join in this SQL is therefore not supported:

      select ....
      from stream1 
      join  stream2  on stream2.userId =  stream1.key1
      join  stream3  on stream3.key3   =  stream1.key2  -- not supported by the Join Bolt
      

      Consequently, the join bolt's syntax is somewhat simplified compared to SQL. The key name for any given stream appears only once, when the stream is first introduced in the join. Thereafter that key is implicitly used for joining. See the case of 'stream3' being joined with both 'stream2' and 'stream4' in the first example.
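
The "declare the key once per stream" rule can be illustrated with a small hypothetical builder that records each stream's key when the stream is introduced and reuses it for later clauses. `JoinSpecSketch` and its methods are invented for this sketch (the `leftjoin` spelling mirrors the example above) and are not the JoinBolt implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical builder showing one join key per stream; not the JoinBolt implementation. */
public class JoinSpecSketch {
    private final Map<String, String> keyOf = new LinkedHashMap<>(); // stream -> its single join key
    private final List<String> clauses = new ArrayList<>();

    public JoinSpecSketch(String firstStream, String key) {
        keyOf.put(firstStream, key);
    }

    public JoinSpecSketch join(String newStream, String key, String priorStream) {
        return add("join", newStream, key, priorStream);
    }

    public JoinSpecSketch leftjoin(String newStream, String key, String priorStream) {
        return add("left join", newStream, key, priorStream);
    }

    private JoinSpecSketch add(String type, String newStream, String key, String priorStream) {
        // The prior stream's key was fixed when that stream was introduced.
        String priorKey = keyOf.get(priorStream);
        if (priorKey == null) {
            throw new IllegalArgumentException("Unknown stream: " + priorStream);
        }
        // A stream may be introduced, and given a key, only once.
        if (keyOf.containsKey(newStream)) {
            throw new IllegalArgumentException("Stream already introduced: " + newStream);
        }
        keyOf.put(newStream, key);
        clauses.add(type + " " + newStream + " on " + newStream + "." + key
                + " = " + priorStream + "." + priorKey);
        return this;
    }

    /** Returns the SQL-like clauses implied by the calls made so far. */
    public List<String> clauses() {
        return clauses;
    }

    public static void main(String[] args) {
        JoinSpecSketch spec = new JoinSpecSketch("stream1", "key1")
                .join("stream2", "userId", "stream1")
                .join("stream3", "key3", "stream2")
                .leftjoin("stream4", "key4", "stream3");
        spec.clauses().forEach(System.out::println);
    }
}
```

Here 'key3' is given once, when stream3 is introduced, and is then reused implicitly when stream4 is joined against stream3.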

              People

              • Assignee:
                roshan_naik Roshan Naik
                Reporter:
                roshan_naik Roshan Naik

                  Time Tracking

                  • Estimated: Not Specified
                  • Remaining: 0h
                  • Logged: 6h 20m