Description
Hi,
Consider the following example:
inp = LOAD '$INPUT' AS (memberId:long, shopId:long, score:int);
tuplified = FOREACH inp GENERATE (memberId, shopId) AS tuplify, score;
D1 = FOREACH tuplified GENERATE tuplify.memberId AS memberId, tuplify.shopId AS shopId, score AS score;
D2 = FOREACH tuplified GENERATE tuplify.memberId AS memberId, tuplify.shopId AS shopId, score AS score;
J = JOIN D1 BY shopId, D2 BY shopId;
K = FOREACH J GENERATE D1::memberId AS member_id1, D2::memberId AS member_id2, D1::shopId AS shop;
EXPLAIN K;
DUMP K;
Written like this it looks a bit contrived, but it provides a minimal reproduction case (in the real case, the "tuplified" step came from a multi-key grouping).
On the following input data:
1 1001 101
1 1002 103
1 1003 102
1 1004 102
2 1005 101
2 1003 101
2 1002 123
3 1042 101
3 1005 101
3 1002 133
this produces wrong output like:
(1,1001,1001)
(1,1002,1002)
(1,1002,1002)
(1,1002,1002)
The second column should be a member id (a small value like 1, 2 or 3), not a copy of the shop id.
In the initial case there was a FILTER (member_id1 < member_id2) after K, and the computation failed because the PushUpFilter optimization mistakenly moved the LOFilter operator before the join, to a place where it tried to operate on a tuple and failed.
My understanding of the issue is that when the ImplicitSplitInserter creates the LOSplitOutputs, it correctly resets the schema, and each LOSplitOutput regenerates uids for the fields of D1 and D2 ... but it does not do so for the members of the tuple.
The logical plan after the ImplicitSplitInserter looks like this (simplified):
|---D1: (Name: LOForEach Schema: memberId#124:long,shopId#125:long)ColumnPrune:InputUids=[127]ColumnPrune:OutputUids=[125, 124]
    |---tuplified: (Name: LOSplitOutput Schema: tuplify#127:tuple(memberId#124:long,shopId#125:long))ColumnPrune:InputUids=[123]ColumnPrune:OutputUids=[127]
        |---tuplified: (Name: LOSplit Schema: tuplify#123:tuple(memberId#124:long,shopId#125:long))ColumnPrune:InputUids=[123]ColumnPrune:OutputUids=[123]
|---D2: (Name: LOForEach Schema: memberId#124:long,shopId#125:long)ColumnPrune:InputUids=[130]ColumnPrune:OutputUids=[125, 124]
    |---tuplified: (Name: LOSplitOutput Schema: tuplify#130:tuple(memberId#124:long,shopId#125:long))ColumnPrune:InputUids=[123]ColumnPrune:OutputUids=[130]
        |---tuplified: (Name: LOSplit Schema: tuplify#123:tuple(memberId#124:long,shopId#125:long))ColumnPrune:InputUids=[123]ColumnPrune:OutputUids=[123]
tuplified correctly gets a new uid (127 and 130), but the members of the tuple do not. When they are reprojected, both branches end up with the same uids, and the join looks like:
|---J: (Name: LOJoin(HASH) Schema: D1::memberId#124:long,D1::shopId#125:long,D2::memberId#139:long,D2::shopId#132:long)ColumnPrune:InputUids=[125, 124, 132]ColumnPrune:OutputUids=[125, 124, 132]
    |   |
    |   shopId:(Name: Project Type: long Uid: 125 Input: 0 Column: 1)
    |   |
    |   shopId:(Name: Project Type: long Uid: 125 Input: 1 Column: 1)
If, for example, we project "memberId + 0" instead of reprojecting "memberId", a new expression node is created, and ultimately the two branches of the join correctly get separate uids.
My understanding is that LOSplitOutput.getSchema() should recurse into nested schema fields. However, I only have a shallow understanding of the logical plan handling, so I may be completely wrong.
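To make that concrete, here is a minimal sketch of the recursion I have in mind. It is illustrative only: the helper name is mine, and I am assuming the newplan LogicalSchema / LogicalFieldSchema classes and LogicalExpression.getNextUid() are the right tools (the attached patch may well do it differently):

import org.apache.pig.data.DataType;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;

public final class NestedUidRegenerator {
    private NestedUidRegenerator() {}

    // Assign a fresh uid to every field, descending into tuple/bag inner
    // schemas so the two split branches stop sharing uids for tuple members
    // (memberId#124 / shopId#125 in the plan above).
    public static void regenerateUids(LogicalSchema schema) {
        if (schema == null) {
            return;
        }
        for (LogicalFieldSchema field : schema.getFields()) {
            // Top-level fields already get a fresh uid today; the missing
            // part is doing the same for the fields of a nested schema.
            field.uid = LogicalExpression.getNextUid();
            if ((field.type == DataType.TUPLE || field.type == DataType.BAG)
                    && field.schema != null) {
                regenerateUids(field.schema);
            }
        }
    }
}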
Attached is a draft patch and a test reproducing the issue. Unfortunately, I have not been able to run all the unit tests with the "fix" (I get some weird hangs).
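In case the attachment is hard to run, here is roughly the shape of a standalone local-mode reproduction. This is an illustrative sketch, not the attached test; the class name, file handling, and assertion are mine:

import java.io.File;
import java.io.PrintWriter;
import java.util.Iterator;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;

public class SplitOutputUidRepro {
    public static void main(String[] args) throws Exception {
        // Write the sample input from the description to a temp file
        // (tab-separated, for the default PigStorage loader).
        String[] rows = {
            "1\t1001\t101", "1\t1002\t103", "1\t1003\t102", "1\t1004\t102",
            "2\t1005\t101", "2\t1003\t101", "2\t1002\t123",
            "3\t1042\t101", "3\t1005\t101", "3\t1002\t133"
        };
        File input = File.createTempFile("tuplify", ".txt");
        input.deleteOnExit();
        try (PrintWriter out = new PrintWriter(input)) {
            for (String row : rows) {
                out.println(row);
            }
        }

        PigServer pig = new PigServer(ExecType.LOCAL);
        pig.registerQuery("inp = LOAD '" + input.getAbsolutePath()
                + "' AS (memberId:long, shopId:long, score:int);");
        pig.registerQuery("tuplified = FOREACH inp GENERATE (memberId, shopId) AS tuplify, score;");
        // Workaround noted above: projecting "tuplify.memberId + 0" instead
        // of "tuplify.memberId" forces a new expression node and fresh uids.
        pig.registerQuery("D1 = FOREACH tuplified GENERATE tuplify.memberId AS memberId, tuplify.shopId AS shopId, score AS score;");
        pig.registerQuery("D2 = FOREACH tuplified GENERATE tuplify.memberId AS memberId, tuplify.shopId AS shopId, score AS score;");
        pig.registerQuery("J = JOIN D1 BY shopId, D2 BY shopId;");
        pig.registerQuery("K = FOREACH J GENERATE D1::memberId AS member_id1, D2::memberId AS member_id2, D1::shopId AS shop;");

        Iterator<Tuple> it = pig.openIterator("K");
        while (it.hasNext()) {
            Tuple t = it.next();
            long memberId2 = (Long) t.get(1);
            // With the bug, the second column holds shop ids (>= 1000)
            // instead of member ids (1..3).
            if (memberId2 >= 1000) {
                throw new AssertionError("second column is not a member id: " + t);
            }
        }
        System.out.println("OK: second column contains member ids");
    }
}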
I'd be happy if you could tell me whether this looks like completely the wrong way to fix the issue.