Index: pom.xml
===================================================================
--- pom.xml (revision 1629103)
+++ pom.xml (working copy)
@@ -151,7 +151,7 @@
     <stax.version>1.0.1</stax.version>
     <slf4j.version>1.7.5</slf4j.version>
     <ST4.version>4.0.4</ST4.version>
-    <tez.version>0.5.0</tez.version>
+    <tez.version>0.5.1</tez.version>
     <super-csv.version>2.2.0</super-csv.version>
     <tempus-fugit.version>1.1</tempus-fugit.version>
     <snappy.version>0.2</snappy.version>
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java (revision 1629103)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java (working copy)
@@ -60,9 +60,6 @@
   protected void initDataBuffer(boolean skipPruning) throws HiveException {
     buffer = new DataOutputBuffer();
    try {
-      // where does this go to?
-      buffer.writeUTF(((TezContext) TezContext.get()).getTezProcessorContext().getTaskVertexName());
-
      // add any other header info
      getConf().writeEventHeader(buffer);

Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (revision 1629103)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (working copy)
@@ -31,6 +31,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;

@@ -59,6 +60,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.runtime.api.InputInitializerContext;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;

@@ -77,12 +79,13 @@

   private final BytesWritable writable = new BytesWritable();

-  private final BlockingQueue<InputInitializerEvent> queue =
-      new LinkedBlockingQueue<InputInitializerEvent>();
+  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();

+  private final Set<String> sourcesWaitingForEvents = new HashSet<String>();
+
   private int sourceInfoCount = 0;

-  private InputInitializerContext context;
+  private final Object endOfEvents = new Object();

   public DynamicPartitionPruner() {
   }
@@ -91,9 +94,22 @@
       throws SerDeException, IOException, InterruptedException, HiveException {

-    this.context = context;
-    this.initialize(work, jobConf);
+    synchronized(sourcesWaitingForEvents) {
+      initialize(work, jobConf);

+      if (sourcesWaitingForEvents.isEmpty()) {
+        return;
+      }
+
+      Set<VertexState> states = Collections.singleton(VertexState.SUCCEEDED);
+      for (String source : sourcesWaitingForEvents) {
+        // we need to get state transition updates for the vertices that will send
+        // events to us. once we have received all events and a vertex has succeeded,
+        // we can move to do the pruning.
+        context.registerForVertexStateUpdates(source, states);
+      }
+    }
+
     LOG.info("Waiting for events (" + sourceInfoCount + " items) ...");

     // synchronous event processing loop. Won't return until all events have
     // been processed.
@@ -102,7 +118,7 @@
     LOG.info("Ok to proceed.");
   }

-  public BlockingQueue<InputInitializerEvent> getQueue() {
+  public BlockingQueue<Object> getQueue() {
     return queue;
   }

@@ -111,11 +127,14 @@
     sourceInfoCount = 0;
   }

-  private void initialize(MapWork work, JobConf jobConf) throws SerDeException {
+  public void initialize(MapWork work, JobConf jobConf) throws SerDeException {
     this.clear();
     Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>();
+    Set<String> sources = work.getEventSourceTableDescMap().keySet();

-    for (String s : work.getEventSourceTableDescMap().keySet()) {
+    sourcesWaitingForEvents.addAll(sources);
+
+    for (String s : sources) {
       List<TableDesc> tables = work.getEventSourceTableDescMap().get(s);
       List<String> columnNames = work.getEventSourceColumnNameMap().get(s);
       List<ExprNodeDesc> partKeyExprs = work.getEventSourcePartKeyExprMap().get(s);
@@ -277,46 +296,30 @@
   private void processEvents() throws SerDeException, IOException, InterruptedException {
     int eventCount = 0;

-    int neededEvents = getExpectedNumberOfEvents();
-    while (neededEvents > eventCount) {
-      InputInitializerEvent event = queue.take();
+    while (true) {
+      Object element = queue.take();
+
+      if (element == endOfEvents) {
+        // we're done processing events
+        break;
+      }
+
+      InputInitializerEvent event = (InputInitializerEvent) element;
+
       LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName()
           + ", " + (event.getUserPayload().limit() - event.getUserPayload().position()));
-      processPayload(event.getUserPayload());
+      processPayload(event.getUserPayload(), event.getSourceVertexName());
       eventCount += 1;
-      neededEvents = getExpectedNumberOfEvents();
-      LOG.info("Needed events: " + neededEvents + ", received events: " + eventCount);
     }
+    LOG.info("Received events: " + eventCount);
   }

-  private int getExpectedNumberOfEvents() throws InterruptedException {
-    int neededEvents = 0;
+  @SuppressWarnings("deprecation")
+  private String processPayload(ByteBuffer payload, String sourceName) throws SerDeException,
+      IOException {

-    boolean notInitialized;
-    do {
-      neededEvents = 0;
-      notInitialized = false;
-      for (String s : sourceInfoMap.keySet()) {
-        int multiplier = sourceInfoMap.get(s).size();
-        int taskNum = context.getVertexNumTasks(s);
-        LOG.info("Vertex " + s + " has " + taskNum + " events.");
-        if (taskNum < 0) {
-          notInitialized = true;
-          Thread.sleep(10);
-          continue;
-        }
-        neededEvents += (taskNum * multiplier);
-      }
-    } while (notInitialized);
-
-    return neededEvents;
-  }
-
-  @SuppressWarnings("deprecation")
-  private String processPayload(ByteBuffer payload) throws SerDeException, IOException {
     DataInputStream in = new DataInputStream(new ByteBufferBackedInputStream(payload));
-    String sourceName = in.readUTF();
     String columnName = in.readUTF();
     boolean skip = in.readBoolean();
@@ -390,4 +393,26 @@
     }
   }

+  public void addEvent(InputInitializerEvent event) {
+    synchronized(sourcesWaitingForEvents) {
+      if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
+        queue.offer(event);
+      }
+    }
+  }
+
+  public void processVertex(String name) {
+    LOG.info("Vertex succeeded: " + name);
+
+    synchronized(sourcesWaitingForEvents) {
+      sourcesWaitingForEvents.remove(name);
+
+      if (sourcesWaitingForEvents.isEmpty()) {
+        // we've got what we need; mark the queue
+        queue.offer(endOfEvents);
+      } else {
+        LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " events.");
+      }
+    }
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (revision 1629103)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (working copy)
@@ -38,8 +38,9 @@
 import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
@@ -243,9 +244,14 @@
   }

   @Override
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+    pruner.processVertex(stateUpdate.getVertexName());
+  }
+
+  @Override
   public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
     for (InputInitializerEvent e : events) {
-      pruner.getQueue().put(e);
+      pruner.addEvent(e);
     }
   }
 }
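
Note on the pattern (commentary, not part of the patch): the diff removes the old polling loop in getExpectedNumberOfEvents(), which recomputed the expected event count from vertex task counts and slept 10 ms whenever a vertex was not yet initialized, and replaces it with event-driven termination. The pruner registers for VertexState.SUCCEEDED updates, drops events from sources it is not waiting on, and pushes a private sentinel object onto the queue once the last source succeeds, which unblocks the consumer. Below is a minimal, self-contained sketch of that sentinel-terminated queue pattern. It assumes Java 8+ (java.util.function.Consumer), and the names EventDrainer, sourceDone, and drain are illustrative only; the patch's counterparts are DynamicPartitionPruner.addEvent(), processVertex(), and processEvents().

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class EventDrainer<E> {
  // Object rather than E: the queue must also carry the end-of-events sentinel.
  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
  private final Set<String> sourcesWaitingForEvents = new HashSet<String>();
  private final Object endOfEvents = new Object();

  public EventDrainer(Set<String> sources) {
    sourcesWaitingForEvents.addAll(sources);
  }

  // Delivery thread: ignore events from sources we are not waiting on.
  public void addEvent(String source, E event) {
    synchronized (sourcesWaitingForEvents) {
      if (sourcesWaitingForEvents.contains(source)) {
        queue.offer(event);
      }
    }
  }

  // State-update thread: a source finished (e.g. its vertex succeeded).
  // When the waiting set drains, the sentinel is offered to unblock the consumer.
  public void sourceDone(String source) {
    synchronized (sourcesWaitingForEvents) {
      sourcesWaitingForEvents.remove(source);
      if (sourcesWaitingForEvents.isEmpty()) {
        queue.offer(endOfEvents);
      }
    }
  }

  // Consumer thread: blocks until every source is done; returns the event count.
  public int drain(Consumer<E> handler) throws InterruptedException {
    int eventCount = 0;
    while (true) {
      Object element = queue.take();
      if (element == endOfEvents) {
        return eventCount;
      }
      @SuppressWarnings("unchecked")
      E event = (E) element;
      handler.accept(event);
      eventCount++;
    }
  }
}

Identity comparison (==) against the sentinel is safe because no other code can obtain a reference to it, and typing the queue as BlockingQueue<Object> is exactly why the patch widens getQueue()'s signature. As in the patch, the consumer must not block when the source set starts out empty, since the sentinel is only offered on the last completion; DynamicPartitionPruner handles that case with the early return in prune().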