Stream-stream inner join is traditionally implemented using a two-way symmetric hash join. At a high level, we want to do the following.
1. For each stream, we maintain the past rows as state in State Store.
- For each joining key, there can be multiple rows that have been received.
- So, we have to effectively maintain a key-to-list-of-values multimap as state for each stream.
2. In each batch, for each input row in each stream
- Look up the other streams state to see if there are matching rows, and output them if they satisfy the joining condition
- Add the input row to corresponding stream’s state.
- If the data has a timestamp/window column with watermark, then we will use that to calculate the threshold for keys that are required to buffered for future matches and drop the rest from the state.
Cleaning up old unnecessary state rows depends completely on whether watermark has been defined and what are join conditions. We definitely want to support state clean up two types of queries that are likely to be common.
- Queries to time range conditions - E.g. SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND leftTime > rightTime - INTERVAL 8 MINUTES AND leftTime < rightTime + INTERVAL 1 HOUR
- Queries with windows as the matching key - E.g. SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND window(leftTime, "1 hour") = window(rightTime, "1 hour") (pseudo-SQL)