diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8264b16..a6af7e7 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -687,16 +687,9 @@ // need to remove by hive .13. Also, do not change default (see SMB operator) HIVEMAPJOINBUCKETCACHESIZE("hive.mapjoin.bucket.cache.size", 100, ""), - HIVEMAPJOINUSEOPTIMIZEDTABLE("hive.mapjoin.optimized.hashtable", true, + HIVEMAPJOINUSEOPTIMIZEDTABLE("hive.mapjoin.optimized.hashtable", false, "Whether Hive should use memory-optimized hash table for MapJoin. Only works on Tez,\n" + "because memory-optimized hashtable cannot be serialized."), - HIVEMAPJOINUSEOPTIMIZEDKEYS("hive.mapjoin.optimized.keys", true, - "Whether MapJoin hashtable should use optimized (size-wise), keys, allowing the table to take less\n" + - "memory. Depending on key, the memory savings for entire table can be 5-15% or so."), - HIVEMAPJOINLAZYHASHTABLE("hive.mapjoin.lazy.hashtable", true, - "Whether MapJoin hashtable should deserialize values on demand. Depending on how many values in\n" + - "the table the join will actually touch, it can save a lot of memory by not creating objects for\n" + - "rows that are not needed. If all rows are needed obviously there's no gain."), HIVEHASHTABLEWBSIZE("hive.mapjoin.optimized.hashtable.wbsize", 10 * 1024 * 1024, "Optimized hashtable (see hive.mapjoin.optimized.hashtable) uses a chain of buffers to\n" + "store data. This is one buffer size. 
HT may be slightly faster if this is larger, but for small\n" + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/FlatRowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/FlatRowContainer.java new file mode 100644 index 0000000..80315c3 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/FlatRowContainer.java @@ -0,0 +1,401 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.persistence; + +import java.io.ObjectOutputStream; +import java.util.AbstractCollection; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.NoSuchElementException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; + +@SuppressWarnings("deprecation") +public class FlatRowContainer extends AbstractCollection + implements MapJoinRowContainer, AbstractRowContainer.RowIterator>, List { + private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0]; + private static final int UNKNOWN = Integer.MAX_VALUE; + + private static Log LOG = LogFactory.getLog(FlatRowContainer.class); + + /** + * Rows are stored flat and fully deserialized (this container has no lazy mode): the array + * holds row count * row length elements - a matrix of rows laid out one after another. + * It stays empty when rows have zero length (see rowLength javadoc). + */ + private Object[] array; + /** + * This is kind of tricky. UNKNOWN number means unknown. Other positive numbers represent + * row length (see array javadoc). Non-positive numbers mean row length is zero (thus, + * array is empty); they represent (negated) number of rows (for joins w/o projections). 
+ */ + private int rowLength = UNKNOWN; + private byte aliasFilter = (byte) 0xff; + private boolean isAliasFilterSet = true; // by default assume no filter tag so we are good + + public FlatRowContainer() { + this.array = EMPTY_OBJECT_ARRAY; + } + + /** Called when loading the hashtable. */ + public void add(MapJoinObjectSerDeContext context, + BytesWritable value) throws HiveException { + SerDe serde = context.getSerDe(); + isAliasFilterSet = !context.hasFilterTag(); // has tag => need to set later + if (rowLength == UNKNOWN) { + try { + rowLength = ObjectInspectorUtils.getStructSize(serde.getObjectInspector()); + } catch (SerDeException ex) { + throw new HiveException("Get structure size error", ex); + } + if (rowLength == 0) { + array = EMPTY_OBJECT_ARRAY; + } + } + if (rowLength > 0) { + int rowCount = (array.length / rowLength); + listRealloc(array.length + rowLength); + read(serde, value, rowCount); + } else { + --rowLength; // see rowLength javadoc + } + } + + // Implementation of AbstractRowContainer and assorted methods + + @Override + public void addRow(List t) throws HiveException { + LOG.debug("Add is called with " + t.size() + " objects"); + // This is not called when building HashTable; we don't expect it to be called ever. + int offset = prepareForAdd(t.size()); + if (offset < 0) return; + for (int i = 0; i < t.size(); ++i) { + this.array[offset + i] = t.get(i); + } + } + + @Override + public void addRow(Object[] value) throws HiveException { + LOG.debug("Add is called with " + value.length + " objects"); + // This is not called when building HashTable; we don't expect it to be called ever. 
+ int offset = prepareForAdd(value.length); + if (offset < 0) return; + System.arraycopy(value, 0, this.array, offset, value.length); + } + + private int prepareForAdd(int len) throws HiveException { + if (rowLength < 0) { + if (len != 0) { + throw new HiveException("Different size rows: 0 and " + len); + } + --rowLength; // see rowLength javadoc + return -1; + } + if (rowLength != len) { + throw new HiveException("Different size rows: " + rowLength + " and " + len); + } + int oldLen = this.array.length; + listRealloc(oldLen + len); + return oldLen; + } + + @Override + public void write(MapJoinObjectSerDeContext valueContext, ObjectOutputStream out) { + throw new UnsupportedOperationException(this.getClass().getName() + " cannot be serialized"); + } + + @Override + public AbstractRowContainer.RowIterator> rowIter() throws HiveException { + if (array.length == rowLength) { + // optimize for common case - just one row for a key, container acts as iterator + return this; + } + return rowLength > 0 ? new RowIterator() : new EmptyRowIterator(-rowLength); + } + + @Override + public List first() throws HiveException { + if (array.length != rowLength) { + throw new AssertionError("Incorrect iterator usage, not single-row"); + } + return this; // optimize for common case - just one row for a key, container acts as row + } + + @Override + public List next() { + return null; // single-row case, there's no next + } + + /** Iterator for row length 0. */ + private static class EmptyRowIterator implements AbstractRowContainer.RowIterator> { + private static final List EMPTY_ROW = new ArrayList(); + private int rowCount; + public EmptyRowIterator(int rowCount) { + this.rowCount = rowCount; + } + + @Override + public List first() throws HiveException { + return next(); + } + + @Override + public List next() throws HiveException { + return (--rowCount < 0) ? null : EMPTY_ROW; + } + } + + /** Row iterator for non-zero-length rows. 
*/ + private class RowIterator implements AbstractRowContainer.RowIterator> { + private int index = 0; + + @Override + public List first() throws HiveException { + index = 0; + if (array.length > 0) { + return new ReadOnlySubList(0, rowLength); + } + return null; + } + + @Override + public List next() { + index += rowLength; + if (index < array.length) { + return new ReadOnlySubList(index, rowLength); + } + return null; + } + } + + private void read(SerDe serde, Writable writable, int rowOffset) throws HiveException { + try { + ObjectInspectorUtils.copyStructToArray( + serde.deserialize(writable), serde.getObjectInspector(), + ObjectInspectorCopyOption.WRITABLE, this.array, rowOffset * rowLength); + } catch (SerDeException ex) { + throw new HiveException("Lazy deserialize error", ex); + } + } + + @Override + public int rowCount() throws HiveException { + return rowLength > 0 ? (array.length / rowLength) : -rowLength; // see rowLength javadoc + } + + @Override + public void clearRows() { + array = EMPTY_OBJECT_ARRAY; + rowLength = 0; + } + + @Override + public byte getAliasFilter() throws HiveException { + ensureAliasFilter(); + return this.aliasFilter; + } + + private void ensureAliasFilter() throws HiveException { + if (!isAliasFilterSet && rowLength > 0) { + for (int offset = rowLength - 1; offset < array.length; offset += rowLength) { + aliasFilter &= ((ShortWritable)array[offset]).get(); + } + } + isAliasFilterSet = true; + } + + @Override + public MapJoinRowContainer copy() throws HiveException { + FlatRowContainer result = new FlatRowContainer(); + result.array = new Object[this.array.length]; + System.arraycopy(this.array, 0, result.array, 0, this.array.length); + result.rowLength = rowLength; + result.aliasFilter = aliasFilter; + return result; + } + + // Implementation of List and assorted methods + + private void listRealloc(int length) { + Object[] array = new Object[length]; + if (this.array.length > 0) { + System.arraycopy(this.array, 0, array, 0, 
this.array.length); + } + this.array = array; + } + + @Override + public int size() { + checkSingleRow(); + return array.length; + } + + @Override + public Object get(int index) { + return array[index]; + } + + private class ReadOnlySubList extends AbstractList { + private int offset; + private int size; + + ReadOnlySubList(int from, int size) { + this.offset = from; + this.size = size; + } + + public Object get(int index) { + return array[index + offset]; + } + + public int size() { + return size; + } + + public Iterator iterator() { + return listIterator(); + } + + public ListIterator listIterator(int index) { + return listIteratorInternal(offset + index, offset, offset + size); + } + + public List subList(int fromIndex, int toIndex) { + return new ReadOnlySubList(offset + fromIndex, toIndex - fromIndex); + } + + public Object[] toArray() { + Object[] result = new Object[size]; + System.arraycopy(array, offset, result, 0, size); + return result; + } + } // end ReadOnlySubList + + @Override + public Object[] toArray() { + checkSingleRow(); + return array; + } + + @Override + public Iterator iterator() { + return listIterator(); + } + + @Override + public ListIterator listIterator() { + return listIterator(0); + } + + @Override + public ListIterator listIterator(final int index) { + checkSingleRow(); + return listIteratorInternal(index, 0, array.length); + } + + private ListIterator listIteratorInternal( + final int index, final int iterMinPos, final int iterMaxPos) { + return new ListIterator() { + private int pos = index - 1; + public int nextIndex() { + return pos + 1; + } + public int previousIndex() { + return pos - 1; + } + public boolean hasNext() { + return nextIndex() < iterMaxPos; + } + public boolean hasPrevious() { + return previousIndex() >= iterMinPos; + } + public Object next() { + if (!hasNext()) throw new NoSuchElementException(); + return get(++pos); + } + public Object previous() { + if (!hasPrevious()) throw new NoSuchElementException(); + 
return get(--pos); + } + + public void remove() { throw new UnsupportedOperationException(); } + public void set(Object e) { throw new UnsupportedOperationException(); } + public void add(Object e) { throw new UnsupportedOperationException(); } + }; // end ListIterator + } + + @Override + public int indexOf(Object o) { + checkSingleRow(); + for (int i = 0; i < array.length; ++i) { + if (o == null) { + if (array[i] == null) return i; + } else { + if (o.equals(array[i])) return i; + } + } + return -1; + } + + private void checkSingleRow() throws AssertionError { + if (array.length != rowLength) { + throw new AssertionError("Incorrect list usage, not single-row"); + } + } + + @Override + public int lastIndexOf(Object o) { + checkSingleRow(); + for (int i = array.length - 1; i >= 0; --i) { + if (o == null) { + if (array[i] == null) return i; + } else { + if (o.equals(array[i])) return i; + } + } + return -1; + } + + @Override + public List subList(int fromIndex, int toIndex) { + checkSingleRow(); + return new ReadOnlySubList(fromIndex, toIndex - fromIndex); + } + + public boolean addAll(int index, Collection c) { + throw new UnsupportedOperationException(); + } + public Object set(int index, Object element) { throw new UnsupportedOperationException(); } + public void add(int index, Object element) { throw new UnsupportedOperationException(); } + public Object remove(int index) { throw new UnsupportedOperationException(); } +} + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java index cebe2ff..3adaab7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java @@ -58,8 +58,6 @@ private static final float LOADFACTOR = 0.75f; private final HashMap mHash; // main memory HashMap private MapJoinKey lastKey = null; - private final boolean useLazyRows; - private 
final boolean useOptimizedKeys; private Output output = new Output(0); // Reusable output for serialization public HashMapWrapper(Map metaData) { @@ -67,30 +65,24 @@ public HashMapWrapper(Map metaData) { int threshold = Integer.parseInt(metaData.get(THESHOLD_NAME)); float loadFactor = Float.parseFloat(metaData.get(LOAD_NAME)); mHash = new HashMap(threshold, loadFactor); - useLazyRows = useOptimizedKeys = false; } public HashMapWrapper() { this(HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT.defaultFloatVal, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD.defaultIntVal, - HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR.defaultFloatVal, false, false, -1); + HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR.defaultFloatVal, -1); } public HashMapWrapper(Configuration hconf, long keyCount) { this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT), HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD), - HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR), - HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINLAZYHASHTABLE), - HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDKEYS), keyCount); + HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR), keyCount); } - private HashMapWrapper(float keyCountAdj, int threshold, float loadFactor, - boolean useLazyRows, boolean useOptimizedKeys, long keyCount) { + private HashMapWrapper(float keyCountAdj, int threshold, float loadFactor, long keyCount) { super(createConstructorMetaData(threshold, loadFactor)); threshold = calculateTableSize(keyCountAdj, threshold, loadFactor, keyCount); mHash = new HashMap(threshold, loadFactor); - this.useLazyRows = useLazyRows; - this.useOptimizedKeys = useOptimizedKeys; } public static int calculateTableSize( @@ -131,21 +123,14 @@ public void clear() { public MapJoinKey putRow(MapJoinObjectSerDeContext keyContext, Writable currentKey, MapJoinObjectSerDeContext valueContext, Writable currentValue) throws 
SerDeException, HiveException { - // We pass key in as reference, to find out quickly if optimized keys can be used. - // However, we do not reuse the object since we are putting them into the hashmap. - // Later, we don't create optimized keys in MapJoin if hash map doesn't have optimized keys. - if (lastKey == null && !useOptimizedKeys) { - lastKey = new MapJoinKeyObject(); - } - - lastKey = MapJoinKey.read(output, lastKey, keyContext, currentKey, false); - LazyFlatRowContainer values = (LazyFlatRowContainer)get(lastKey); + MapJoinKey key = MapJoinKey.read(output, keyContext, currentKey); + FlatRowContainer values = (FlatRowContainer)get(key); if (values == null) { - values = new LazyFlatRowContainer(); - put(lastKey, values); + values = new FlatRowContainer(); + put(key, values); } - values.add(valueContext, (BytesWritable)currentValue, useLazyRows); - return lastKey; + values.add(valueContext, (BytesWritable)currentValue); + return key; } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/LazyFlatRowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/LazyFlatRowContainer.java deleted file mode 100644 index 089871c..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/LazyFlatRowContainer.java +++ /dev/null @@ -1,495 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.persistence; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.ObjectOutputStream; -import java.util.AbstractCollection; -import java.util.AbstractList; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.ListIterator; -import java.util.NoSuchElementException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.io.ShortWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; -import org.apache.hadoop.io.BinaryComparable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.Writable; - -@SuppressWarnings("deprecation") -public class LazyFlatRowContainer extends AbstractCollection - implements MapJoinRowContainer, AbstractRowContainer.RowIterator>, List { - private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0]; - private static final int UNKNOWN = Integer.MAX_VALUE; - - private static Log LOG = LogFactory.getLog(LazyFlatRowContainer.class); - - /** - * In lazy mode, 0s element contains context for deserialization and all the other - * elements contains byte arrays to be deserialized. 
After deserialization, the array - * contains row count * row size elements - a matrix of rows stored. - */ - private Object[] array; - /** - * This is kind of tricky. UNKNOWN number means lazy mode. Other positive numbers represent - * row length (see array javadoc). Non-positive numbers mean row length is zero (thus, - * array is empty); they represent (negated) number of rows (for joins w/o projections). - */ - private int rowLength = UNKNOWN; - private byte aliasFilter = (byte) 0xff; - - public LazyFlatRowContainer() { - this.array = EMPTY_OBJECT_ARRAY; - } - - /** Called when loading the hashtable. */ - public void add(MapJoinObjectSerDeContext context, - BytesWritable value, boolean allowLazy) throws HiveException { - if (allowLazy) { - addLazy(context, value); - return; - } - SerDe serde = context.getSerDe(); - boolean hasValues = isLazy() ? setRowLength(serde, 0) : (rowLength > 0); - int rowCount = rowCount(); - if (hasValues) { - listRealloc(array.length + rowLength); - read(serde, value, rowCount); - } else { - --rowLength; // see rowLength javadoc - } - } - - private void addLazy(MapJoinObjectSerDeContext valueContext, BytesWritable currentValue) { - if (!isLazy()) { - throw new AssertionError("Not in lazy mode"); - } - int size = this.array.length; - if (size == 0) { - // TODO: we store valueContext needlessly in each RowContainer because the - // accessor cannot pass it to us for lazy deserialization. 
- listRealloc(2); - this.array[0] = valueContext; - ++size; - } else { - if (this.array[0] != valueContext) { - throw new AssertionError("Different valueContext for the same table"); - } - listRealloc(size + 1); - } - byte[] rawData = new byte[currentValue.getSize()]; - System.arraycopy(currentValue.getBytes(), 0, rawData, 0, rawData.length); - this.array[size] = rawData; - } - - // Implementation of AbstractRowContainer and assorted methods - - @Override - public void addRow(List t) throws HiveException { - LOG.debug("Add is called with " + t.size() + " objects"); - // This is not called when building HashTable; we don't expect it to be called ever. - int offset = prepareForAdd(t.size()); - if (offset < 0) return; - for (int i = 0; i < t.size(); ++i) { - this.array[offset + i] = t.get(i); - } - } - - @Override - public void addRow(Object[] value) throws HiveException { - LOG.debug("Add is called with " + value.length + " objects"); - // This is not called when building HashTable; we don't expect it to be called ever. 
- int offset = prepareForAdd(value.length); - if (offset < 0) return; - System.arraycopy(value, 0, this.array, offset, value.length); - } - - private int prepareForAdd(int len) throws HiveException { - if (isLazy()) { - throw new AssertionError("Cannot add in lazy mode"); - } - if (rowLength < 0) { - if (len != 0) { - throw new HiveException("Different size rows: 0 and " + len); - } - --rowLength; // see rowLength javadoc - return -1; - } - if (rowLength != len) { - throw new HiveException("Different size rows: " + rowLength + " and " + len); - } - int oldLen = this.array.length; - listRealloc(oldLen + len); - return oldLen; - } - - @Override - public void write(MapJoinObjectSerDeContext valueContext, ObjectOutputStream out) { - throw new UnsupportedOperationException(this.getClass().getName() + " cannot be serialized"); - } - - @Override - public AbstractRowContainer.RowIterator> rowIter() throws HiveException { - ensureEager(); // if someone wants an iterator they are probably going to use it - if (array.length == rowLength) { - // optimize for common case - just one row for a key, container acts as iterator - return this; - } - return rowLength > 0 ? new RowIterator() : new EmptyRowIterator(-rowLength); - } - - @Override - public List first() throws HiveException { - if (isLazy()) { - throw new AssertionError("In lazy mode"); - } - if (array.length != rowLength) { - throw new AssertionError("Incorrect iterator usage, not single-row"); - } - return this; // optimize for common case - just one row for a key, container acts as row - } - - @Override - public List next() { - return null; // single-row case, there's no next - } - - /** Iterator for row length 0. 
*/ - private static class EmptyRowIterator implements AbstractRowContainer.RowIterator> { - private static final List EMPTY_ROW = new ArrayList(); - private int rowCount; - public EmptyRowIterator(int rowCount) { - this.rowCount = rowCount; - } - - @Override - public List first() throws HiveException { - return next(); - } - - @Override - public List next() throws HiveException { - return (--rowCount < 0) ? null : EMPTY_ROW; - } - } - - /** Row iterator for non-zero-length rows. */ - private class RowIterator implements AbstractRowContainer.RowIterator> { - private int index = 0; - - @Override - public List first() throws HiveException { - assert !isLazy(); - index = 0; - if (array.length > 0) { - return new ReadOnlySubList(0, rowLength); - } - return null; - } - - @Override - public List next() { - assert !isLazy(); - index += rowLength; - if (index < array.length) { - return new ReadOnlySubList(index, rowLength); - } - return null; - } - } - - private void ensureEager() throws HiveException { - if (!isLazy()) return; - if (this.array.length == 0) { - rowLength = 0; - return; - } - Object[] lazyObjs = this.array; - assert lazyObjs.length > 1; - MapJoinObjectSerDeContext context = (MapJoinObjectSerDeContext)lazyObjs[0]; - SerDe serde = context.getSerDe(); - int rowCount = lazyObjs.length - 1; - if (!setRowLength(serde, rowCount)) return; - - this.array = new Object[rowLength * rowCount]; - ByteBufferWritable writable = new ByteBufferWritable(); - for (int i = 0; i < rowCount; ++i) { - writable.setBytes((byte[])lazyObjs[i + 1]); - read(serde, writable, i); - } - setAliasFilter(context); - } - - private boolean setRowLength(SerDe serde, int rowCount) throws HiveException { - try { - rowLength = ObjectInspectorUtils.getStructSize(serde.getObjectInspector()); - } catch (SerDeException ex) { - throw new HiveException("Get structure size error", ex); - } - if (rowLength == 0) { - rowLength = -rowCount; // see javadoc for rowLength - array = EMPTY_OBJECT_ARRAY; - return 
false; - } - return true; - } - - private void read(SerDe serde, Writable writable, int rowOffset) throws HiveException { - try { - ObjectInspectorUtils.copyStructToArray( - serde.deserialize(writable), serde.getObjectInspector(), - ObjectInspectorCopyOption.WRITABLE, this.array, rowOffset * rowLength); - } catch (SerDeException ex) { - throw new HiveException("Lazy deserialize error", ex); - } - } - - private boolean isLazy() { - return rowLength == UNKNOWN; - } - - @Override - public int rowCount() throws HiveException { - ensureEager(); - return rowLength > 0 ? (array.length / rowLength) : -rowLength; // see rowLength javadoc - } - - @Override - public void clearRows() { - assert !isLazy(); - array = EMPTY_OBJECT_ARRAY; - rowLength = 0; - } - - @Override - public byte getAliasFilter() throws HiveException { - ensureEager(); - return this.aliasFilter; - } - - private void setAliasFilter(MapJoinObjectSerDeContext context) throws HiveException { - if (isLazy()) { - throw new AssertionError("In lazy mode"); - } - if (rowLength <= 0 || !context.hasFilterTag()) return; - for (int offset = rowLength - 1; offset < array.length; offset += rowLength) { - aliasFilter &= ((ShortWritable)array[offset]).get(); - } - } - - @Override - public MapJoinRowContainer copy() throws HiveException { - ensureEager(); // If someone wants a copy they are probably going to use it. 
- LazyFlatRowContainer result = new LazyFlatRowContainer(); - result.array = new Object[this.array.length]; - System.arraycopy(this.array, 0, result.array, 0, this.array.length); - result.rowLength = rowLength; - result.aliasFilter = aliasFilter; - return result; - } - - // Implementation of List and assorted methods - - private void listRealloc(int length) { - Object[] array = new Object[length]; - if (this.array.length > 0) { - System.arraycopy(this.array, 0, array, 0, this.array.length); - } - this.array = array; - } - - @Override - public int size() { - checkSingleRow(); - return array.length; - } - - @Override - public Object get(int index) { - return array[index]; - } - - private class ReadOnlySubList extends AbstractList { - private int offset; - private int size; - - ReadOnlySubList(int from, int size) { - this.offset = from; - this.size = size; - } - - public Object get(int index) { - return array[index + offset]; - } - - public int size() { - return size; - } - - public Iterator iterator() { - return listIterator(); - } - - public ListIterator listIterator(int index) { - return listIteratorInternal(offset + index, offset, offset + size); - } - - public List subList(int fromIndex, int toIndex) { - return new ReadOnlySubList(offset + fromIndex, toIndex - fromIndex); - } - - public Object[] toArray() { - Object[] result = new Object[size]; - System.arraycopy(array, offset, result, 0, size); - return result; - } - } // end ReadOnlySubList - - @Override - public Object[] toArray() { - checkSingleRow(); - return array; - } - - @Override - public Iterator iterator() { - return listIterator(); - } - - @Override - public ListIterator listIterator() { - return listIterator(0); - } - - @Override - public ListIterator listIterator(final int index) { - checkSingleRow(); - return listIteratorInternal(index, 0, array.length); - } - - private ListIterator listIteratorInternal( - final int index, final int iterMinPos, final int iterMaxPos) { - return new ListIterator() { 
- private int pos = index - 1; - public int nextIndex() { - return pos + 1; - } - public int previousIndex() { - return pos - 1; - } - public boolean hasNext() { - return nextIndex() < iterMaxPos; - } - public boolean hasPrevious() { - return previousIndex() >= iterMinPos; - } - public Object next() { - if (!hasNext()) throw new NoSuchElementException(); - return get(++pos); - } - public Object previous() { - if (!hasPrevious()) throw new NoSuchElementException(); - return get(--pos); - } - - public void remove() { throw new UnsupportedOperationException(); } - public void set(Object e) { throw new UnsupportedOperationException(); } - public void add(Object e) { throw new UnsupportedOperationException(); } - }; // end ListIterator - } - - /** Fake writable that can be reset with different bytes. */ - private static class ByteBufferWritable extends BinaryComparable implements Writable { - byte[] bytes = null; - - @Override - public byte[] getBytes() { - return bytes; - } - - @Override - public int getLength() { - return bytes.length; - } - - public void setBytes(byte[] bytes) { - this.bytes = bytes; - } - - public void readFields(DataInput arg0) { throw new UnsupportedOperationException(); } - public void write(DataOutput arg0) { throw new UnsupportedOperationException(); } - } // end ByteBufferWritable - - - @Override - public int indexOf(Object o) { - checkSingleRow(); - for (int i = 0; i < array.length; ++i) { - if (o == null) { - if (array[i] == null) return i; - } else { - if (o.equals(array[i])) return i; - } - } - return -1; - } - - private void checkSingleRow() throws AssertionError { - if (array.length != rowLength) { - throw new AssertionError("Incorrect list usage, not single-row"); - } - } - - @Override - public int lastIndexOf(Object o) { - checkSingleRow(); - for (int i = array.length - 1; i >= 0; --i) { - if (o == null) { - if (array[i] == null) return i; - } else { - if (o.equals(array[i])) return i; - } - } - return -1; - } - - @Override - public 
List subList(int fromIndex, int toIndex) { - checkSingleRow(); - return new ReadOnlySubList(fromIndex, toIndex - fromIndex); - } - - public boolean addAll(int index, Collection c) { - throw new UnsupportedOperationException(); - } - public Object set(int index, Object element) { throw new UnsupportedOperationException(); } - public void add(int index, Object element) { throw new UnsupportedOperationException(); } - public Object remove(int index) { throw new UnsupportedOperationException(); } -} - diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java index 962af39..6a3c300 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java @@ -57,22 +57,11 @@ public abstract void write(MapJoinObjectSerDeContext context, ObjectOutputStream public abstract boolean hasAnyNulls(int fieldCount, boolean[] nullsafes); @SuppressWarnings("deprecation") - public static MapJoinKey read(Output output, MapJoinKey key, - MapJoinObjectSerDeContext context, Writable writable, boolean mayReuseKey) - throws SerDeException, HiveException { + public static MapJoinKey read(Output output, MapJoinObjectSerDeContext context, + Writable writable) throws SerDeException, HiveException { SerDe serde = context.getSerDe(); Object obj = serde.deserialize(writable); - boolean useOptimized = useOptimizedKeyBasedOnPrev(key); - if (useOptimized || key == null) { - byte[] structBytes = serialize(output, obj, serde.getObjectInspector(), !useOptimized); - if (structBytes != null) { - return MapJoinKeyBytes.fromBytes(key, mayReuseKey, structBytes); - } else if (useOptimized) { - throw new SerDeException( - "Failed to serialize " + obj + " even though optimized keys are used"); - } - } - MapJoinKeyObject result = mayReuseKey ? 
(MapJoinKeyObject)key : new MapJoinKeyObject(); + MapJoinKeyObject result = new MapJoinKeyObject(); result.read(serde.getObjectInspector(), obj); return result; } @@ -98,35 +87,6 @@ public static MapJoinKey read(Output output, MapJoinKey key, SUPPORTED_PRIMITIVES.add(PrimitiveCategory.CHAR); } - private static byte[] serialize(Output byteStream, - Object obj, ObjectInspector oi, boolean checkTypes) throws HiveException { - if (null == obj || !(oi instanceof StructObjectInspector)) { - return null; // not supported - } - StructObjectInspector soi = (StructObjectInspector)oi; - List fields = soi.getAllStructFieldRefs(); - int size = fields.size(); - if (size > 8) { - return null; // not supported - } else if (size == 0) { - return EMPTY_BYTE_ARRAY; // shortcut for null keys - } - Object[] fieldData = new Object[size]; - List fieldOis = new ArrayList(size); - for (int i = 0; i < size; ++i) { - StructField field = fields.get(i); - ObjectInspector foi = field.getFieldObjectInspector(); - if (checkTypes && !isSupportedField(foi)) { - return null; - } - fieldData[i] = soi.getStructFieldData(obj, field); - fieldOis.add(foi); - } - - byteStream = serializeRow(byteStream, fieldData, fieldOis, null); - return Arrays.copyOf(byteStream.getData(), byteStream.getLength()); - } - public static boolean isSupportedField(ObjectInspector foi) { if (foi.getCategory() != Category.PRIMITIVE) return false; // not supported PrimitiveCategory pc = ((PrimitiveObjectInspector)foi).getPrimitiveCategory(); @@ -136,19 +96,6 @@ public static boolean isSupportedField(ObjectInspector foi) { public static MapJoinKey readFromVector(Output output, MapJoinKey key, Object[] keyObject, List keyOIs, boolean mayReuseKey) throws HiveException { - boolean useOptimized = useOptimizedKeyBasedOnPrev(key); - if (useOptimized || key == null) { - if (keyObject.length <= 8) { - output = serializeRow(output, keyObject, keyOIs, null); - return MapJoinKeyBytes.fromBytes(key, mayReuseKey, - 
Arrays.copyOf(output.getData(), output.getLength())); - } - if (useOptimized) { - throw new HiveException( - "Failed to serialize " + Arrays.toString(keyObject) + - " even though optimized keys are used"); - } - } MapJoinKeyObject result = mayReuseKey ? (MapJoinKeyObject)key : new MapJoinKeyObject(); result.setKeyObjects(keyObject); return result; @@ -178,32 +125,11 @@ public static Output serializeVector(Output byteStream, VectorHashKeyWrapper kw, public static MapJoinKey readFromRow(Output output, MapJoinKey key, Object[] keyObject, List keyFieldsOI, boolean mayReuseKey) throws HiveException { - boolean useOptimized = useOptimizedKeyBasedOnPrev(key); - if (useOptimized || key == null) { - if (keyObject.length <= 8) { - byte[] structBytes; - if (keyObject.length == 0) { - structBytes = EMPTY_BYTE_ARRAY; // shortcut for null keys - } else { - output = serializeRow(output, keyObject, keyFieldsOI, null); - structBytes = Arrays.copyOf(output.getData(), output.getLength()); - } - return MapJoinKeyBytes.fromBytes(key, mayReuseKey, structBytes); - } - if (useOptimized) { - throw new HiveException( - "Failed to serialize " + Arrays.toString(keyObject) + - " even though optimized keys are used"); - } - } MapJoinKeyObject result = mayReuseKey ? (MapJoinKeyObject)key : new MapJoinKeyObject(); result.readFromRow(keyObject, keyFieldsOI); return result; } - private static final Log LOG = LogFactory.getLog(MapJoinKey.class); - - /** * Serializes row to output. * @param byteStream Output to reuse. Can be null, in that case a new one would be created. 
@@ -228,8 +154,4 @@ public static Output serializeRow(Output byteStream, Object[] fieldData, } return byteStream; } - - private static boolean useOptimizedKeyBasedOnPrev(MapJoinKey key) { - return (key != null) && (key instanceof MapJoinKeyBytes); - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKeyBytes.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKeyBytes.java deleted file mode 100644 index 20d2bfd..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKeyBytes.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.persistence; - -import java.io.ObjectOutputStream; -import java.util.Arrays; - -import org.apache.hadoop.util.hash.MurmurHash; - -/** - * Size-optimized implementation of MapJoinKeyBase. MJK only needs to support equality and - * hashCode, so for simple cases we can write the requisite writables that are part of the - * key into byte array and retain the functionality without storing the writables themselves. 
- */ -@SuppressWarnings("deprecation") -public class MapJoinKeyBytes extends MapJoinKey { - private static final MurmurHash hash = (MurmurHash)MurmurHash.getInstance(); - /** - * First byte is field count. The rest is written using BinarySortableSerDe. - */ - private byte[] array; - - private void setBytes(byte[] array) { - this.array = array; - } - - @Override - public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) { - throw new UnsupportedOperationException(this.getClass().getName() + " cannot be serialized"); - } - - @Override - public boolean equals(Object obj) { - if (obj == null || !(obj instanceof MapJoinKeyBytes)) return false; - MapJoinKeyBytes other = (MapJoinKeyBytes)obj; - return Arrays.equals(this.array, other.array); - } - - @Override - public int hashCode() { - return hash.hash(array); - } - - @Override - public boolean hasAnyNulls(int fieldCount, boolean[] nullsafes) { - if (this.array.length == 0) return false; // null key - byte nulls = (byte)(this.array[0]); - for (int i = 0; i < fieldCount; ++i) { - if (((nulls & 1) == 0) && (nullsafes == null || !nullsafes[i])) return true; - nulls >>>= 1; - } - return false; - } - - public static MapJoinKey fromBytes(MapJoinKey key, boolean mayReuseKey, byte[] structBytes) { - MapJoinKeyBytes result = (mayReuseKey && key != null) - ? 
(MapJoinKeyBytes)key : new MapJoinKeyBytes(); - result.setBytes(structBytes); - return result; - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index 105a3db..7402ba3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -57,8 +57,6 @@ private ExecMapperContext context; private Configuration hconf; private MapJoinDesc desc; - private MapJoinKey lastKey = null; - private int rowCount = 0; @Override public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) { @@ -111,8 +109,7 @@ public void load( : new HashMapWrapper(hconf, keyCount); while (kvReader.next()) { - rowCount++; - lastKey = tableContainer.putRow(keyCtx, (Writable)kvReader.getCurrentKey(), + tableContainer.putRow(keyCtx, (Writable)kvReader.getCurrentKey(), valCtx, (Writable)kvReader.getCurrentValue()); }