diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
index c4b99e5..cc4477f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
@@ -17,12 +17,10 @@
*/
package org.apache.hadoop.hive.ql.exec.tez.tools;
-import java.util.List;
+import java.util.IdentityHashMap;
+import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.tez.runtime.api.Input;
-import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.Reader;
@@ -32,6 +30,8 @@
*/
public class TezMergedLogicalInput extends MergedLogicalInput {
+ private Map readyInputs = new IdentityHashMap();
+
@Override
public Reader getReader() throws Exception {
return new InputMerger(getInputs());
@@ -39,6 +39,11 @@ public Reader getReader() throws Exception {
@Override
public void setConstituentInputIsReady(Input input) {
- // ignore notification
+ synchronized (this) {
+ readyInputs.put(input, Boolean.TRUE);
+ }
+ if (readyInputs.size() == getInputs().size()) {
+ informInputReady();
+ }
}
}