diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
index 45bc0fd..87461df 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
@@ -401,6 +401,11 @@ public void closeOp(boolean abort) throws HiveException {
     }
   }
 
+  @Override
+  protected boolean deferMultipleParentClose() {
+    return true;
+  }
+
   private void fetchOneRow(byte tag) throws HiveException {
     try {
       boolean hasMore = sources[tag].pushRecord();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 47b5793..16aa378 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -712,7 +712,7 @@ private void processKey(Object row,
   @Override
   public void process(Object row, int tag) throws HiveException {
     firstRow = false;
-    ObjectInspector rowInspector = inputObjInspectors[tag];
+    ObjectInspector rowInspector = inputObjInspectors[0];
     // Total number of input rows is needed for hash aggregation only
     if (hashAggr) {
       numRowsInput++;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 00552a8..5c20b52 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -75,6 +75,7 @@
   private transient ExecMapperContext execContext;
   private transient boolean rootInitializeCalled = false;
   protected final transient Collection<Future<?>> asyncInitOperations = new HashSet<>();
+  private transient int deferCloseParentCount;
 
   // It can be optimized later so that an operator operator (init/close) is performed
   // only after that operation has been performed on all the parents. This will require
@@ -111,6 +112,7 @@ protected Operator() {
     childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
     parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
     abortOp = new AtomicBoolean(false);
+    deferCloseParentCount = 0;
   }
 
   public Operator(CompilationOpContext cContext) {
@@ -632,6 +634,27 @@ public void close(boolean abort) throws HiveException {
       return;
     }
 
+    /*
+     * Operators that opt in through deferMultipleParentClose() are not closed until
+     * every parent has called close; each earlier call only bumps the counter.
+     * NOTE(review): the abort flag passed to a deferred call is discarded, so only
+     * the last parent's value is used below -- confirm this is intended.
+     */
+    if (deferMultipleParentClose()) {
+      deferCloseParentCount++;
+      if (deferCloseParentCount < parentOperators.size()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("defer close for operator " + getOperatorId()
+              + "(" + deferCloseParentCount + ")");
+        }
+        return;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("do deferred close for operator " + getOperatorId()
+            + "(" + deferCloseParentCount + ")");
+      }
+    }
+
     // set state as CLOSE as long as all parents are closed
     // state == CLOSE doesn't mean all children are also in state CLOSE
     state = State.CLOSE;
@@ -675,6 +698,17 @@ public void close(boolean abort) throws HiveException {
   protected void closeOp(boolean abort) throws HiveException {
   }
 
+  /**
+   * Tells close whether this operator should stay open until every parent has
+   * called close.
+   *
+   * @return true to defer the close until all parents have called close; this
+   *         base implementation returns false
+   */
+  protected boolean deferMultipleParentClose() {
+    return false;
+  }
+
   private boolean jobCloseDone = false;
 
   // Operator specific logic goes here
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index 23abec3..0642661 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -637,6 +637,11 @@ public void closeOp(boolean abort) throws HiveException {
     }
   }
 
+  @Override
+  protected boolean deferMultipleParentClose() {
+    return true;
+  }
+
   @Override
   protected boolean allInitializedParentsAreClosed() {
     return true;
diff --git ql/src/test/queries/clientpositive/tez_smb_1.q ql/src/test/queries/clientpositive/tez_smb_1.q
index 089ffe3..d521818 100644
--- ql/src/test/queries/clientpositive/tez_smb_1.q
+++ ql/src/test/queries/clientpositive/tez_smb_1.q
@@ -85,3 +85,20 @@ join
 (select rt2.id from
 (select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
 where vt1.id=vt2.id;
+
+explain
+select count(*) from
+(select rt1.key from
+(select t1.key as key, t1.value as value from tab t1 group by key, value) rt1) vt1
+join
+(select rt2.key from
+(select t2.key as key, t2.value as value from tab_part t2 group by key, value) rt2) vt2
+where vt1.key=vt2.key;
+
+select count(*) from
+(select rt1.key from
+(select t1.key as key, t1.value as value from tab t1 group by key, value) rt1) vt1
+join
+(select rt2.key from
+(select t2.key as key, t2.value as value from tab_part t2 group by key, value) rt2) vt2
+where vt1.key=vt2.key;