Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java	(revision 1632443)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java	(working copy)
@@ -87,6 +87,9 @@
 
   private final Object endOfEvents = new Object();
 
+  // Events queued by addEvent(); incremented under the sourcesWaitingForEvents lock.
+  private int totalEventCount = 0;
+
   public DynamicPartitionPruner() {
   }
 
@@ -114,7 +117,7 @@
     // synchronous event processing loop. Won't return until all events have
     // been processed.
     this.processEvents();
-    this.prunePartitions(work);
+    this.prunePartitions(work, context);
     LOG.info("Ok to proceed.");
   }
 
@@ -163,12 +166,30 @@
     }
   }
 
-  private void prunePartitions(MapWork work) throws HiveException {
+  /**
+   * Prunes partitions for every SourceInfo of every source, then checks that the number of
+   * queued events equals the task count of each source vertex summed once per SourceInfo.
+   *
+   * @param work map work passed through to prunePartitionSingleSource
+   * @param context initializer context used to look up the task count of each source vertex
+   * @throws HiveException if the received event count differs from the expected count
+   */
+  private void prunePartitions(MapWork work, InputInitializerContext context) throws HiveException {
+    int expectedEvents = 0;
     for (String source : this.sourceInfoMap.keySet()) {
       for (SourceInfo si : this.sourceInfoMap.get(source)) {
+        int taskNum = context.getVertexNumTasks(source);
+        LOG.info("Expecting " + taskNum + " events for vertex " + source);
+        expectedEvents += taskNum;
         prunePartitionSingleSource(source, si, work);
       }
     }
+
+    // sanity check. all tasks must submit events for us to succeed.
+    if (expectedEvents != totalEventCount) {
+      LOG.error("Expecting: " + expectedEvents + ", received: " + totalEventCount);
+      throw new HiveException("Incorrect event count in dynamic partition pruning");
+    }
   }
 
   private void prunePartitionSingleSource(String source, SourceInfo si, MapWork work)
@@ -396,7 +417,8 @@
   public void addEvent(InputInitializerEvent event) {
     synchronized(sourcesWaitingForEvents) {
       if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
-        queue.offer(event);
+        ++totalEventCount;
+        queue.offer(event);
       }
     }
   }