From 19cbacf05b641d0a3f892f6f9a7dc288170253da Mon Sep 17 00:00:00 2001
From: vkorukanti
Date: Mon, 4 Aug 2014 09:57:42 -0700
Subject: [PATCH] HIVE-7492: Enhance output collector for Hive(Map|Reduce)Function.

---
 .../ql/exec/spark/HiveBaseFunctionResultList.java  | 127 +++++++++++++++++++++
 .../hive/ql/exec/spark/HiveKVResultCache.java      | 116 +++++++++++++++++++
 .../hadoop/hive/ql/exec/spark/HiveMapFunction.java |  15 +--
 .../ql/exec/spark/HiveMapFunctionResultList.java   |  60 ++++++++++
 .../hive/ql/exec/spark/HiveReduceFunction.java     |  20 +---
 .../exec/spark/HiveReduceFunctionResultList.java   |  61 ++++++++++
 .../hadoop/hive/ql/exec/spark/SparkCollector.java  |  56 ---------
 .../hive/ql/exec/spark/TestHiveKVResultCache.java  |  89 +++++++++++++++
 8 files changed, 460 insertions(+), 84 deletions(-)
 create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
 create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
 create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
 create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
 delete mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java
 create mode 100644 ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java

diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
new file mode 100644
index 0000000..03500df
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Base class for
+ *  - collecting Map/Reduce function output and
+ *  - providing an Iterable interface for fetching output records. Input
+ *    records are processed lazily, i.e. only when output records are
+ *    requested through the Iterator interface.
+ */
+public abstract class HiveBaseFunctionResultList<T> implements
+    Iterable<Tuple2<BytesWritable, BytesWritable>>, OutputCollector<BytesWritable, BytesWritable>, Serializable {
+
+  private final Iterator<T> inputIterator;
+
+  // Contains results from the last processed input record.
+  private final HiveKVResultCache lastRecordOutput;
+  private boolean iteratorAlreadyCreated = false;
+
+  public HiveBaseFunctionResultList(Configuration conf, Iterator<T> inputIterator) {
+    this.inputIterator = inputIterator;
+    this.lastRecordOutput = new HiveKVResultCache(conf);
+  }
+
+  @Override
+  public Iterator<Tuple2<BytesWritable, BytesWritable>> iterator() {
+    Preconditions.checkState(!iteratorAlreadyCreated, "Iterator can only be created once.");
+    iteratorAlreadyCreated = true;
+    return new ResultIterator();
+  }
+
+  @Override
+  public void collect(BytesWritable key, BytesWritable value) throws IOException {
+    lastRecordOutput.add(copyBytesWritable(key), copyBytesWritable(value));
+  }
+
+  private static BytesWritable copyBytesWritable(BytesWritable bw) {
+    BytesWritable copy = new BytesWritable();
+    copy.set(bw);
+    return copy;
+  }
+
+  /** Process the given record. */
+  protected abstract void processNextRecord(T inputRecord) throws IOException;
+
+  /** Returns true if the record processor is done and no more input should be consumed. */
+  protected abstract boolean processingDone();
+
+  /** Close the record processor. */
+  protected abstract void closeRecordProcessor();
+
+  /** Implements the Iterator interface. */
+  public class ResultIterator implements Iterator<Tuple2<BytesWritable, BytesWritable>> {
+    @Override
+    public boolean hasNext() {
+      // Return remaining records (if any) from the last processed input record.
+      if (lastRecordOutput.hasNext()) {
+        return true;
+      }
+
+      lastRecordOutput.clear();
+
+      // Process records from the input iterator until
+      //  - new output records are available for serving the downstream operator,
+      //  - input records are exhausted, or
+      //  - processing is completed.
+      while (inputIterator.hasNext() && !processingDone()) {
+        try {
+          processNextRecord(inputIterator.next());
+          if (lastRecordOutput.hasNext()) {
+            return true;
+          }
+        } catch (IOException ex) {
+          // TODO: better handling of exception.
+          throw new RuntimeException("Error while processing input.", ex);
+        }
+      }
+
+      // At this point we are done processing the input. Close the record processor.
+      closeRecordProcessor();
+      lastRecordOutput.clear();
+      return false;
+    }
+
+    @Override
+    public Tuple2<BytesWritable, BytesWritable> next() {
+      if (hasNext()) {
+        return lastRecordOutput.next();
+      }
+      throw new NoSuchElementException("There are no more elements");
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Iterator.remove() is not supported");
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
new file mode 100644
index 0000000..9725c4f
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.Reporter;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Wrapper around {@link org.apache.hadoop.hive.ql.exec.persistence.RowContainer}.
+ */
+public class HiveKVResultCache {
+
+  public static final int IN_MEMORY_CACHE_SIZE = 512;
+  private static final String COL_NAMES = "key,value";
+  private static final String COL_TYPES =
+      serdeConstants.BINARY_TYPE_NAME + ":" + serdeConstants.BINARY_TYPE_NAME;
+
+  private RowContainer container;
+  private int cursor = 0;
+
+  public HiveKVResultCache(Configuration conf) {
+    initRowContainer(conf);
+  }
+
+  private void initRowContainer(Configuration conf) {
+    try {
+      container = new RowContainer(IN_MEMORY_CACHE_SIZE, conf, Reporter.NULL);
+
+      String fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
+      TableDesc tableDesc =
+          PlanUtils.getDefaultQueryOutputTableDesc(COL_NAMES, COL_TYPES, fileFormat);
+
+      SerDe serDe = (SerDe) tableDesc.getDeserializer();
+      ObjectInspector oi = ObjectInspectorUtils.getStandardObjectInspector(
+          serDe.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
+
+      container.setSerDe(serDe, oi);
+      container.setTableDesc(tableDesc);
+    } catch (Exception ex) {
+      throw new RuntimeException("Failed to create RowContainer", ex);
+    }
+  }
+
+  public void add(BytesWritable key, BytesWritable value) {
+    List<BytesWritable> row = new ArrayList<BytesWritable>(2);
+    row.add(key);
+    row.add(value);
+
+    try {
+      container.addRow(row);
+    } catch (HiveException ex) {
+      throw new RuntimeException("Failed to add KV pair to RowContainer", ex);
+    }
+  }
+
+  public void clear() {
+    try {
+      container.clearRows();
+    } catch (HiveException ex) {
+      throw new RuntimeException("Failed to clear rows in RowContainer", ex);
+    }
+    cursor = 0;
+  }
+
+  public boolean hasNext() {
+    return container.rowCount() > 0 && cursor < container.rowCount();
+  }
+
+  public Tuple2<BytesWritable, BytesWritable> next() {
+    Preconditions.checkState(hasNext());
+
+    try {
+      List<BytesWritable> row;
+      if (cursor == 0) {
+        row = container.first();
+      } else {
+        row = container.next();
+      }
+      cursor++;
+      return new Tuple2<BytesWritable, BytesWritable>(row.get(0), row.get(1));
+    } catch (HiveException ex) {
+      throw new RuntimeException("Failed to get row from RowContainer", ex);
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
index 93a6592..b18dcbf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
@@ -32,8 +32,6 @@ BytesWritable, BytesWritable> {
 
   private static final long serialVersionUID = 1L;
 
-  private transient ExecMapper mapper;
-  private transient SparkCollector collector;
   private transient JobConf jobConf;
 
   private byte[] buffer;
@@ -47,19 +45,12 @@ public HiveMapFunction(byte[] buffer) {
       call(Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
     if (jobConf == null) {
       jobConf = KryoSerializer.deserializeJobConf(this.buffer);
-      mapper = new ExecMapper();
-      mapper.configure(jobConf);
-      collector = new SparkCollector();
     }
 
-    collector.clear();
-    while(it.hasNext() && !ExecMapper.getDone()) {
-      Tuple2<BytesWritable, BytesWritable> input = it.next();
-      mapper.map(input._1(), input._2(), collector, Reporter.NULL);
-    }
+    ExecMapper mapper = new ExecMapper();
+    mapper.configure(jobConf);
 
-    mapper.close();
-    return collector.getResult();
+    return new HiveMapFunctionResultList(jobConf, it, mapper);
   }
 
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
new file mode 100644
index 0000000..90be4d6
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.Reporter;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class HiveMapFunctionResultList extends
+    HiveBaseFunctionResultList<Tuple2<BytesWritable, BytesWritable>> {
+  private final ExecMapper mapper;
+
+  /**
+   * Instantiate the result-set Iterable for Map function output.
+   *
+   * @param inputIterator Input record iterator.
+   * @param mapper Initialized {@link org.apache.hadoop.hive.ql.exec.mr.ExecMapper} instance.
+   */
+  public HiveMapFunctionResultList(Configuration conf,
+      Iterator<Tuple2<BytesWritable, BytesWritable>> inputIterator, ExecMapper mapper) {
+    super(conf, inputIterator);
+    this.mapper = mapper;
+  }
+
+  @Override
+  protected void processNextRecord(Tuple2<BytesWritable, BytesWritable> inputRecord)
+      throws IOException {
+    mapper.map(inputRecord._1(), inputRecord._2(), this, Reporter.NULL);
+  }
+
+  @Override
+  protected boolean processingDone() {
+    return ExecMapper.getDone();
+  }
+
+  @Override
+  protected void closeRecordProcessor() {
+    mapper.close();
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
index dd81013..fa12b0a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
@@ -32,8 +32,6 @@ BytesWritable, BytesWritable> {
 
   private static final long serialVersionUID = 1L;
 
-  private transient ExecReducer reducer;
-  private transient SparkCollector collector;
   private transient JobConf jobConf;
 
   private byte[] buffer;
@@ -47,21 +45,11 @@ public HiveReduceFunction(byte[] buffer) {
       call(Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> it) throws Exception {
     if (jobConf == null) {
       jobConf = KryoSerializer.deserializeJobConf(this.buffer);
-      jobConf.set("mapred.reducer.class", ExecReducer.class.getName());
-
-      reducer = new ExecReducer();
-      reducer.configure(jobConf);
-      collector = new SparkCollector();
-    }
-
-    collector.clear();
-    while (it.hasNext()) {
-      Tuple2<BytesWritable, Iterable<BytesWritable>> tup = it.next();
-      reducer.reduce(tup._1(), tup._2().iterator(), collector, Reporter.NULL);
+      jobConf.set("mapred.reducer.class", ExecReducer.class.getName());
     }
 
-    reducer.close();
-    return collector.getResult();
+    ExecReducer reducer = new ExecReducer();
+    reducer.configure(jobConf);
+    return new HiveReduceFunctionResultList(jobConf, it, reducer);
   }
-
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
new file mode 100644
index 0000000..f877d35
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.Reporter;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class HiveReduceFunctionResultList extends
+    HiveBaseFunctionResultList<Tuple2<BytesWritable, Iterable<BytesWritable>>> {
+  private final ExecReducer reducer;
+
+  /**
+   * Instantiate the result-set Iterable for Reduce function output.
+   *
+   * @param inputIterator Input record iterator.
+   * @param reducer Initialized {@link org.apache.hadoop.hive.ql.exec.mr.ExecReducer} instance.
+   */
+  public HiveReduceFunctionResultList(Configuration conf,
+      Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> inputIterator,
+      ExecReducer reducer) {
+    super(conf, inputIterator);
+    this.reducer = reducer;
+  }
+
+  @Override
+  protected void processNextRecord(Tuple2<BytesWritable, Iterable<BytesWritable>> inputRecord)
+      throws IOException {
+    reducer.reduce(inputRecord._1(), inputRecord._2().iterator(), this, Reporter.NULL);
+  }
+
+  @Override
+  protected boolean processingDone() {
+    return false;
+  }
+
+  @Override
+  protected void closeRecordProcessor() {
+    reducer.close();
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java
deleted file mode 100644
index 3894630..0000000
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.spark;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.OutputCollector;
-
-import scala.Tuple2;
-
-public class SparkCollector implements OutputCollector<BytesWritable, BytesWritable>, Serializable {
-  private static final long serialVersionUID = 1L;
-
-  private List<Tuple2<BytesWritable, BytesWritable>> result = new ArrayList<Tuple2<BytesWritable, BytesWritable>>();
-
-  @Override
-  public void collect(BytesWritable key, BytesWritable value) throws IOException {
-    result.add(new Tuple2<BytesWritable, BytesWritable>(copyBytesWritable(key), copyBytesWritable(value)));
-  }
-
-  // TODO: Move this to a utility class.
-  public static BytesWritable copyBytesWritable(BytesWritable bw) {
-    BytesWritable copy = new BytesWritable();
-    copy.set(bw);
-    return copy;
-  }
-
-  public void clear() {
-    result.clear();
-  }
-
-  public List<Tuple2<BytesWritable, BytesWritable>> getResult() {
-    return result;
-  }
-
-}
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
new file mode 100644
index 0000000..64a4c5a
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.io.BytesWritable;
+import org.junit.Test;
+import scala.Tuple2;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestHiveKVResultCache {
+  @Test
+  public void testSimple() throws Exception {
+    // Create the KV result cache object, add one (k,v) pair and retrieve it.
+    HiveConf conf = new HiveConf();
+    HiveKVResultCache cache = new HiveKVResultCache(conf);
+
+    BytesWritable key = new BytesWritable("key".getBytes());
+    BytesWritable value = new BytesWritable("value".getBytes());
+    cache.add(key, value);
+
+    assertTrue("KV result cache should have at least one element", cache.hasNext());
+
+    Tuple2<BytesWritable, BytesWritable> row = cache.next();
+    assertTrue("Incorrect key", row._1().equals(key));
+    assertTrue("Incorrect value", row._2().equals(value));
+
+    assertTrue("Cache shouldn't have more records", !cache.hasNext());
+  }
+
+  @Test
+  public void testSpilling() throws Exception {
+    HiveConf conf = new HiveConf();
+    HiveKVResultCache cache = new HiveKVResultCache(conf);
+
+    final int recordCount = HiveKVResultCache.IN_MEMORY_CACHE_SIZE * 3;
+
+    // Test using the same cache: first insert n rows, then clear the cache.
+    // Next, reuse the same cache, insert another m rows and verify that the cache
+    // stores them correctly. This simulates reusing the same cache over and over.
+    testSpillingHelper(cache, recordCount);
+    testSpillingHelper(cache, 1);
+    testSpillingHelper(cache, recordCount);
+  }
+
+  /** Helper method which inserts numRecords records into the cache, then retrieves and verifies them. */
+  private void testSpillingHelper(HiveKVResultCache cache, int numRecords) {
+    for (int i = 0; i < numRecords; i++) {
+      String key = "key_" + i;
+      String value = "value_" + i;
+      cache.add(new BytesWritable(key.getBytes()), new BytesWritable(value.getBytes()));
+    }
+
+    int recordsSeen = 0;
+    while (cache.hasNext()) {
+      String key = "key_" + recordsSeen;
+      String value = "value_" + recordsSeen;
+      Tuple2<BytesWritable, BytesWritable> row = cache.next();
+      assertTrue("Unexpected key at position: " + recordsSeen,
+          new String(row._1().getBytes()).equals(key));
+      assertTrue("Unexpected value at position: " + recordsSeen,
+          new String(row._2().getBytes()).equals(value));
+
+      recordsSeen++;
+    }
+
+    assertTrue("Retrieved record count doesn't match inserted record count",
+        numRecords == recordsSeen);
+
+    cache.clear();
+  }
+}
\ No newline at end of file
-- 
1.8.5.2 (Apple Git-48)
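
For reviewers who want to exercise the new classes in isolation, here is a minimal,
hypothetical sketch (not part of the patch) that drives HiveKVResultCache directly,
the same way HiveBaseFunctionResultList.ResultIterator drains it between input
records. The demo class name and the sample keys/values are made up; it lives in
the same package purely for convenient access and assumes the patched hive-exec
classes are on the classpath.

    package org.apache.hadoop.hive.ql.exec.spark;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.io.BytesWritable;
    import scala.Tuple2;

    // Hypothetical demo, not part of the patch.
    public class HiveKVResultCacheDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        HiveKVResultCache cache = new HiveKVResultCache(conf);

        // Buffer a few KV pairs. Beyond IN_MEMORY_CACHE_SIZE rows, the backing
        // RowContainer spills to disk, so memory use stays bounded.
        for (int i = 0; i < 3; i++) {
          cache.add(new BytesWritable(("k" + i).getBytes()),
              new BytesWritable(("v" + i).getBytes()));
        }

        // Drain in insertion order, as ResultIterator.hasNext()/next() does.
        while (cache.hasNext()) {
          Tuple2<BytesWritable, BytesWritable> kv = cache.next();
          System.out.println(new String(kv._1().getBytes()) + " -> "
              + new String(kv._2().getBytes()));
        }

        // Reset the cursor and drop buffered rows so the cache can be reused
        // for the next input record.
        cache.clear();
      }
    }

The lazy design is the point of the change: the old SparkCollector buffered an
entire partition's output in an ArrayList before handing it to Spark, while the
Iterable returned here lets downstream operators pull records as they are
produced, so at most the output of the current input record is buffered (and
spilled to disk past IN_MEMORY_CACHE_SIZE rows).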