diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java index 7abd94d..833f5da 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java @@ -106,7 +106,7 @@ public DynamicPartitionPruner(InputInitializerContext context, MapWork work, Job this.context = context; this.work = work; this.jobConf = jobConf; - synchronized (this) { + synchronized (sourcesWaitingForEvents) { initialize(); } } @@ -116,21 +116,11 @@ public void prune() InterruptedException, HiveException { synchronized(sourcesWaitingForEvents) { - if (sourcesWaitingForEvents.isEmpty()) { return; } - - Set states = Collections.singleton(VertexState.SUCCEEDED); - for (String source : sourcesWaitingForEvents) { - // we need to get state transition updates for the vertices that will send - // events to us. once we have received all events and a vertex has succeeded, - // we can move to do the pruning. - context.registerForVertexStateUpdates(source, states); - } } - LOG.info("Waiting for events (" + sourceInfoCount + " sources) ..."); // synchronous event processing loop. Won't return until all events have // been processed. this.processEvents(); @@ -138,17 +128,7 @@ public void prune() LOG.info("Ok to proceed."); } - public BlockingQueue getQueue() { - return queue; - } - - private void clear() { - sourceInfoMap.clear(); - sourceInfoCount = 0; - } - private void initialize() throws SerDeException { - this.clear(); Map columnMap = new HashMap(); // sources represent vertex names Set sources = work.getEventSourceTableDescMap().keySet(); @@ -196,6 +176,18 @@ private void initialize() throws SerDeException { columnMap.put(columnName, si); } } + + if (!sourcesWaitingForEvents.isEmpty()) { + Set states = Collections.singleton(VertexState.SUCCEEDED); + for (String source : sourcesWaitingForEvents) { + // we need to get state transition updates for the vertices that will send + // events to us. once we have received all events and a vertex has succeeded, + // we can move to do the pruning. + context.registerForVertexStateUpdates(source, states); + } + + LOG.info("Waiting for events (" + sourceInfoCount + " sources) ..."); + } } private void prunePartitions() throws HiveException { @@ -476,6 +468,8 @@ public void addEvent(InputInitializerEvent event) { numEventsSeenPerSource.get(event.getSourceVertexName()).increment(); queue.offer(event); checkForSourceCompletion(event.getSourceVertexName()); + } else { + LOG.info("Arrived event from " + event.getSourceVertexName() + ", which was not expected"); } } }