diff --git pom.xml pom.xml
index d95f60252bf3c2bb9784b988baa3bc5d63903a9a..670278ca56eeba7b068e3c4126df2f1ffeed8c3d 100644
--- pom.xml
+++ pom.xml
@@ -151,7 +151,7 @@
     1.0.1
     1.7.5
     4.0.4
-    0.5.0
+    0.6.0-SNAPSHOT
     2.2.0
     1.1
     0.2
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
index 7315be5ef423349cd2f26af06be90c0c3439983d..99bc6a7d6c81e716921e8bbffea28899ec17cd7a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java
@@ -60,9 +60,6 @@ public void initializeOp(Configuration hconf) throws HiveException {
   private void initDataBuffer(boolean skipPruning) throws HiveException {
     buffer = new DataOutputBuffer();
     try {
-      // where does this go to?
-      buffer.writeUTF(((TezContext) TezContext.get()).getTezProcessorContext().getTaskVertexName());
-
       // add any other header info
       getConf().writeEventHeader(buffer);
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
index 78d6cf508dfd838c890ff46f14fa993365c0b700..9227b7a6f760f7c29220f2a0c316b75f11a68025 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
@@ -31,6 +31,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -59,6 +60,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.runtime.api.InputInitializerContext;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 
@@ -77,12 +79,13 @@
 
   private final BytesWritable writable = new BytesWritable();
 
-  private final BlockingQueue<InputInitializerEvent> queue =
-      new LinkedBlockingQueue<InputInitializerEvent>();
+  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
+
+  private final Set<String> sourcesWaitingForEvents = new ConcurrentSkipListSet<String>();
 
   private int sourceInfoCount = 0;
 
-  private InputInitializerContext context;
+  private final Object endOfEvents = new Object();
 
   public DynamicPartitionPruner() {
   }
@@ -91,8 +94,16 @@ public void prune(MapWork work, JobConf jobConf, InputInitializerContext context
       throws SerDeException, IOException,
       InterruptedException, HiveException {
 
-    this.context = context;
-    this.initialize(work, jobConf);
+    initialize(work, jobConf);
+
+    if (sourcesWaitingForEvents.isEmpty()) {
+      return;
+    }
+
+    Set<VertexState> states = Collections.singleton(VertexState.SUCCEEDED);
+    for (String source : sourcesWaitingForEvents) {
+      context.registerForVertexStateUpdates(source, states);
+    }
 
     LOG.info("Waiting for events (" + sourceInfoCount + " items) ...");
     // synchronous event processing loop. Won't return until all events have
@@ -102,7 +113,7 @@ public void prune(MapWork work, JobConf jobConf, InputInitializerContext context
     LOG.info("Ok to proceed.");
   }
 
-  public BlockingQueue<InputInitializerEvent> getQueue() {
+  public BlockingQueue<Object> getQueue() {
     return queue;
   }
 
@@ -111,11 +122,14 @@ private void clear() {
     sourceInfoCount = 0;
   }
 
-  private void initialize(MapWork work, JobConf jobConf) throws SerDeException {
+  public void initialize(MapWork work, JobConf jobConf) throws SerDeException {
     this.clear();
     Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>();
+    Set<String> sources = work.getEventSourceTableDescMap().keySet();
 
-    for (String s : work.getEventSourceTableDescMap().keySet()) {
+    sourcesWaitingForEvents.addAll(sources);
+
+    for (String s : sources) {
       List<TableDesc> tables = work.getEventSourceTableDescMap().get(s);
       List<String> columnNames = work.getEventSourceColumnNameMap().get(s);
       List<ExprNodeDesc> partKeyExprs = work.getEventSourcePartKeyExprMap().get(s);
@@ -277,46 +291,35 @@ public SourceInfo(TableDesc table, ExprNodeDesc partKey, String columnName, JobC
 
   private void processEvents() throws SerDeException, IOException, InterruptedException {
     int eventCount = 0;
 
-    int neededEvents = getExpectedNumberOfEvents();
-    while (neededEvents > eventCount) {
-      InputInitializerEvent event = queue.take();
+    while (true) {
+      Object element = queue.take();
+
+      if (element == endOfEvents) {
+        // we're done processing events
+        break;
+      }
+
+      InputInitializerEvent event = (InputInitializerEvent) element;
+
+      if (!sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
+        // don't need the event
+        continue;
+      }
+
       LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName()
           + ", " + (event.getUserPayload().limit() - event.getUserPayload().position()));
-      processPayload(event.getUserPayload());
+      processPayload(event.getUserPayload(), event.getSourceVertexName());
       eventCount += 1;
-      neededEvents = getExpectedNumberOfEvents();
-      LOG.info("Needed events: " + neededEvents + ", received events: " + eventCount);
     }
-  }
-
-  private int getExpectedNumberOfEvents() throws InterruptedException {
-    int neededEvents = 0;
-
-    boolean notInitialized;
-    do {
-      neededEvents = 0;
-      notInitialized = false;
-      for (String s : sourceInfoMap.keySet()) {
-        int multiplier = sourceInfoMap.get(s).size();
-        int taskNum = context.getVertexNumTasks(s);
-        LOG.info("Vertex " + s + " has " + taskNum + " events.");
-        if (taskNum < 0) {
-          notInitialized = true;
-          Thread.sleep(10);
-          continue;
-        }
-        neededEvents += (taskNum * multiplier);
-      }
-    } while (notInitialized);
-
-    return neededEvents;
+    LOG.info("Received events: " + eventCount);
   }
 
   @SuppressWarnings("deprecation")
-  private String processPayload(ByteBuffer payload) throws SerDeException, IOException {
+  private String processPayload(ByteBuffer payload, String sourceName) throws SerDeException,
+      IOException {
+
     DataInputStream in = new DataInputStream(new ByteBufferBackedInputStream(payload));
-    String sourceName = in.readUTF();
     String columnName = in.readUTF();
     boolean skip = in.readBoolean();
 
@@ -390,4 +393,15 @@ public int read(byte[] bytes, int off, int len) throws IOException {
     }
   }
 
+  public void processVertex(String name) {
+    LOG.info("Vertex succeeded: " + name);
+    sourcesWaitingForEvents.remove(name);
+
+    if (sourcesWaitingForEvents.isEmpty()) {
+      // we've got what we need; mark the queue
+      queue.offer(endOfEvents);
+    } else {
+      LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " events.");
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 6e1379edfbb197b2a687fcf00d29b1f516f0fcb1..44d8b7f2e464e0979df3d3fdac9f9bce2eb52e29 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -38,8 +38,9 @@
 import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
@@ -230,6 +231,11 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) {
   }
 
   @Override
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+    pruner.processVertex(stateUpdate.getVertexName());
+  }
+
+  @Override
   public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
     for (InputInitializerEvent e : events) {
       pruner.getQueue().put(e);
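The core change above replaces the pruner's old bookkeeping (polling getVertexNumTasks to compute an expected event count) with vertex-state tracking: the initializer registers for VertexState.SUCCEEDED updates, and once every source vertex in sourcesWaitingForEvents has succeeded, a sentinel object (endOfEvents) is offered onto the shared queue so the blocking consumer loop in processEvents can terminate. What follows is a minimal, self-contained sketch of that producer/consumer idiom using only java.util.concurrent types; the class and method names are hypothetical stand-ins for illustration, not the actual Hive or Tez APIs.

import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the pruner's coordination logic; not Hive/Tez code.
public class SentinelQueueSketch {

  // Events and the end-of-events marker share one queue of Object,
  // mirroring the BlockingQueue<Object> change in the patch.
  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
  private final Set<String> sourcesWaitingForEvents = new ConcurrentSkipListSet<String>();
  private final Object endOfEvents = new Object();

  public SentinelQueueSketch(Set<String> sources) {
    sourcesWaitingForEvents.addAll(sources);
  }

  // Analogous to handleInputInitializerEvent: called from the event-delivery side.
  public void onEvent(String sourceVertex, String payload) throws InterruptedException {
    queue.put(sourceVertex + ": " + payload);
  }

  // Analogous to onVertexStateUpdated -> processVertex: a source vertex succeeded.
  public void onSourceSucceeded(String vertexName) {
    sourcesWaitingForEvents.remove(vertexName);
    if (sourcesWaitingForEvents.isEmpty()) {
      // every registered source has finished, so no more events can arrive;
      // wake the consumer loop with the sentinel
      queue.offer(endOfEvents);
    }
  }

  // Analogous to processEvents: blocks until the sentinel is seen.
  public void drain() throws InterruptedException {
    int eventCount = 0;
    while (true) {
      Object element = queue.take();
      if (element == endOfEvents) {
        break;
      }
      System.out.println("processing " + element);
      eventCount++;
    }
    System.out.println("received " + eventCount + " events");
  }

  public static void main(String[] args) throws InterruptedException {
    SentinelQueueSketch sketch = new SentinelQueueSketch(
        new ConcurrentSkipListSet<String>(Arrays.asList("Map 1", "Map 3")));
    sketch.onEvent("Map 1", "pruning payload A");   // events are queued first...
    sketch.onEvent("Map 3", "pruning payload B");
    sketch.onSourceSucceeded("Map 1");              // ...then the success notifications,
    sketch.onSourceSucceeded("Map 3");              // which enqueue the sentinel last
    sketch.drain();                                 // processes both events, then stops
  }
}

In the sketch, because events and the sentinel travel through the same FIFO queue and the sentinel is only enqueued after the last waiting source succeeds, the consumer drains every delivered event before stopping; the patch itself additionally skips any event whose source vertex is not in sourcesWaitingForEvents.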