Details
-
Sub-task
-
Status: Open
-
Major
-
Resolution: Unresolved
-
None
-
None
-
ghx-label-9
Description
improvement idea for the future:
If Flink always writes EQ-delete files, and uses the same primary key a lot, we will have the same entry in the HashMap with multiple data sequence numbers. Then during probing, for each hash table lookup we need to loop over all the sequence numbers and check them. Actually we only need the largest data sequence number, the lower sequence numbers with the same primary keys don't add any value.
So we could add an Aggregation node to the right side of the join, like "PK1, PK2, ..., max(data_sequence_number), group by PK1, PK2, ...".
Now, we would need to decide when to add this node to the plan, or when we shouldn't. We should also avoid having an EXCHANGE between the aggregation node and the JOIN node, as it would be redundant as they would use the same partition key expressions (the primary keys).
If we had "hash teams" in Impala, we could always add this aggregator operator, as it would be in the same "hash team" with the JOIN operator, i.e. we wouldn't need to build the hash table twice. Microsoft's paper about hash joins and hash teams: https://citeseerx.ist.psu.edu/document?repid=rep1&type=pdf&doi=fc1c78cbef5062cf49fdb309b1935af08b759d2d