diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java index c4b99e5..cc4477f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java @@ -17,12 +17,10 @@ */ package org.apache.hadoop.hive.ql.exec.tez.tools; -import java.util.List; +import java.util.IdentityHashMap; +import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.tez.runtime.api.Input; -import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.MergedLogicalInput; import org.apache.tez.runtime.api.Reader; @@ -32,6 +30,8 @@ */ public class TezMergedLogicalInput extends MergedLogicalInput { + private Map readyInputs = new IdentityHashMap(); + @Override public Reader getReader() throws Exception { return new InputMerger(getInputs()); @@ -39,6 +39,11 @@ public Reader getReader() throws Exception { @Override public void setConstituentInputIsReady(Input input) { - // ignore notification + synchronized (this) { + readyInputs.put(input, Boolean.TRUE); + } + if (readyInputs.size() == getInputs().size()) { + informInputReady(); + } } }