diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
index 79fa1b3..55c5079 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
@@ -691,11 +691,11 @@ protected void checkAndGenObject() throws HiveException {
         Byte alias = order[i];
         AbstractRowContainer<List<Object>> alw = storage[alias];
 
-        if (alw.rowCount() != 1) {
+        if (!alw.isSingleRow()) {
           allOne = false;
         }
-        if (alw.rowCount() == 0) {
+        if (!alw.hasRows()) {
           alw.addRow(dummyObj[i]);
           hasNulls = true;
         } else if (condn[i].getPreserved()) {
@@ -721,16 +721,16 @@ protected void checkAndGenObject() throws HiveException {
         AbstractRowContainer<List<Object>> alw = storage[alias];
 
         if (noOuterJoin) {
-          if (alw.rowCount() == 0) {
+          if (!alw.hasRows()) {
             return;
-          } else if (alw.rowCount() > 1) {
+          } else if (!alw.isSingleRow()) {
             mayHasMoreThanOne = true;
           }
         } else {
-          if (alw.rowCount() == 0) {
+          if (!alw.hasRows()) {
             hasEmpty = true;
             alw.addRow(dummyObj[i]);
-          } else if (!hasEmpty && alw.rowCount() == 1) {
+          } else if (!hasEmpty && alw.isSingleRow()) {
             if (hasAnyFiltered(alias, alw.rowIter().first())) {
               hasEmpty = true;
             }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
index cf0b9f0..fcc6c93 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
@@ -383,7 +383,7 @@ private void joinFinalLeftData() throws HiveException {
       if (candidateStorage[pos] == null) {
         continue;
       }
-      if (this.candidateStorage[pos].rowCount() > 0) {
+      if (this.candidateStorage[pos].hasRows()) {
         dataInCache = true;
         break;
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index ab17821..1833f51 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -342,7 +342,7 @@ private void joinFinalLeftData() throws HiveException {
       joinOneGroup();
       dataInCache = false;
       for (byte pos = 0; pos < order.length; pos++) {
-        if (this.candidateStorage[pos].rowCount() > 0) {
+        if (this.candidateStorage[pos].hasRows()) {
           dataInCache = true;
           break;
         }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java
index 7ef5ebd..c3aa2e1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java
@@ -37,6 +37,17 @@
   public void addRow(ROW t) throws HiveException;
 
   /**
+   * @return whether the row container has at least 1 row.
+   * NOTE: Originally we named this isEmpty, but that name conflicted with another interface.
+   */
+  public boolean hasRows() throws HiveException;
+
+  /**
+   * @return whether the row container has exactly 1 row.
+   */
+  public boolean isSingleRow() throws HiveException;
+
+  /**
    * @return number of elements in the RowContainer
    */
  public int rowCount() throws HiveException;
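The operator changes above all follow one pattern: exact rowCount() comparisons become calls to the two new predicates, which a container can answer without walking all of its values. The sketch below is a hypothetical, minimal stand-in (not part of the patch; the class and its fields are illustrative) showing the intended call pattern, and why it matters: for the hash-map-backed containers later in this patch, an exact count would mean chasing the value list to its end, so rowCount() there now throws and callers must use these predicates instead.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical list-backed container illustrating the new contract:
    // hasRows() replaces rowCount() == 0 checks, isSingleRow() replaces == 1 / > 1.
    class ListRowContainer {
      private final List<List<Object>> rows = new ArrayList<List<Object>>();

      void addRow(List<Object> row) { rows.add(row); }
      boolean hasRows() { return rows.size() > 0; }
      boolean isSingleRow() { return rows.size() == 1; }

      public static void main(String[] args) {
        ListRowContainer alw = new ListRowContainer();
        System.out.println(!alw.hasRows());    // true: a join would add a dummy row here
        alw.addRow(new ArrayList<Object>());
        System.out.println(alw.isSingleRow()); // true: the single-row fast path
      }
    }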
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 5488f8c..0958ca3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -184,6 +184,243 @@ public BytesBytesMultiHashMap(int initialCapacity,
     this(initialCapacity, loadFactor, wbSize, -1);
   }
 
+  /**
+   * The result of looking up a key in the multi-hash map.
+   *
+   * This object can read through the 0, 1, or more values found for the key.
+   */
+  public static class Result {
+
+    // Whether there are more than 0 rows.
+    private boolean hasRows;
+
+    // We need a pointer to the hash map since this class must be static to support having
+    // multiple hash tables with Hybrid Grace partitioning.
+    private BytesBytesMultiHashMap hashMap;
+
+    // These values come from set(), which getValueResult() calls when it finds a key.
+    // They allow this class to read (and re-read) the values.
+    private long firstOffset;
+    private boolean hasList;
+    private long offsetAfterListRecordKeyLen;
+
+    // When we have multiple values, we save the next value record's offset here.
+    private long tailOffset;
+
+    // Whether we have read the current value and have its ByteSegmentRef.
+    private boolean haveReadCurrent;
+
+    // 0-based index of which row we are on.
+    private long readIndex;
+
+    // A reference to the current row.
+    private WriteBuffers.ByteSegmentRef byteSegmentRef;
+
+    public Result() {
+      hasRows = false;
+      byteSegmentRef = new WriteBuffers.ByteSegmentRef();
+    }
+
+    /**
+     * @return Whether there are 1 or more values.
+     */
+    public boolean hasRows() {
+      // NOTE: Originally we named this isEmpty, but that name conflicted with another interface.
+      return hasRows;
+    }
+
+    /**
+     * @return Whether there is just 1 value row.
+     */
+    public boolean isSingleRow() {
+      return !hasList;
+    }
+
+    /**
+     * Set internal values for reading the values after finding a key.
+     *
+     * @param hashMap
+     *          The hash map we found the key in.
+     * @param firstOffset
+     *          The absolute offset of the first record in the write buffers.
+     * @param hasList
+     *          Whether there are multiple values (true) or just a single value (false).
+     * @param offsetAfterListRecordKeyLen
+     *          The offset of just after the key length in the list record.  Or, 0 when single row.
+     */
+    public void set(BytesBytesMultiHashMap hashMap, long firstOffset, boolean hasList,
+        long offsetAfterListRecordKeyLen) {
+
+      this.hashMap = hashMap;
+
+      this.firstOffset = firstOffset;
+      this.hasList = hasList;
+      this.offsetAfterListRecordKeyLen = offsetAfterListRecordKeyLen;
+
+      // Position at first row.
+      readIndex = 0;
+      haveReadCurrent = false;
+      tailOffset = -1;
+
+      hasRows = true;
+    }
+
+    /**
+     * Read the current value.  The byteSegmentRef method returns the ByteSegmentRef
+     * for the value just read.
+     */
+    public void readCurrent() {
+      if (!hasRows) {
+        throw new Error("Hash table result has no rows");
+      }
+
+      if (haveReadCurrent) {
+        return;
+      }
+
+      if (!hasList) {
+
+        /*
+         * Single value.
+         */
+
+        // For a non-list (i.e. single value), the offset is for the variable length long (VLong)
+        // holding the value length (followed by the key length).
+        hashMap.writeBuffers.setReadPoint(firstOffset);
+        int valueLength = (int) hashMap.writeBuffers.readVLong();
+
+        // The value is before the offset.  Make the byte segment reference absolute.
+        byteSegmentRef.reset(firstOffset - valueLength, valueLength);
+        hashMap.writeBuffers.populateValue(byteSegmentRef);
+
+        haveReadCurrent = true;
+        return;
+      }
+
+      /*
+       * Multiple values.
+       */
+
+      if (readIndex == 0) {
+        // For a list, the value and key lengths of the 1st record were overwritten with the
+        // relative offset to a new list record.
+        long relativeOffset = hashMap.writeBuffers.readFiveByteULong(firstOffset);
+
+        // At the beginning of the list record will be the value length.
+        hashMap.writeBuffers.setReadPoint(firstOffset + relativeOffset);
+        int valueLength = (int) hashMap.writeBuffers.readVLong();
+
+        // The value is before the list record offset.  Make the byte segment reference absolute.
+        byteSegmentRef.reset(firstOffset - valueLength, valueLength);
+        hashMap.writeBuffers.populateValue(byteSegmentRef);
+
+        haveReadCurrent = true;
+        return;
+      }
+
+      if (readIndex == 1) {
+        // We remembered the offset of just after the key length in the list record.
+        // Read the absolute offset to the 2nd value.
+        tailOffset = hashMap.writeBuffers.readFiveByteULong(offsetAfterListRecordKeyLen);
+        if (tailOffset <= 0) {
+          throw new Error("Expecting a second value");
+        }
+      }
+
+      hashMap.writeBuffers.setReadPoint(tailOffset);
+
+      // Get the value length.
+      int valueLength = (int) hashMap.writeBuffers.readVLong();
+
+      // Now read the relative offset to the next record.  The next record is always before
+      // the previous record in the write buffers (see the writeBuffers javadoc).
+      long delta = hashMap.writeBuffers.readVLong();
+      long nextTailOffset = delta == 0 ? 0 : (tailOffset - delta);
+
+      // The value is before the value record offset.  Make the byte segment reference absolute.
+      byteSegmentRef.reset(tailOffset - valueLength, valueLength);
+      hashMap.writeBuffers.populateValue(byteSegmentRef);
+
+      tailOffset = nextTailOffset;
+      haveReadCurrent = true;
+    }
+
+    /**
+     * @return The ByteSegmentRef to the current value read with readCurrent.
+     */
+    public WriteBuffers.ByteSegmentRef byteSegmentRef() {
+      return byteSegmentRef;
+    }
+
+    /**
+     * Try to advance to the next value.
+     * @return Whether there was a next value or not.
+     */
+    public boolean advance() {
+
+      haveReadCurrent = false;
+
+      // Single value?
+      if (!hasList) {
+        readIndex++;
+        return false;
+      }
+
+      // Multiple values.
+      readIndex++;
+      if (readIndex == 1) {
+        return true;
+      } else {
+        return (tailOffset > 0);
+      }
+    }
+
+    /**
+     * @return Whether we have read all the values or not.
+     */
+    public boolean isEof() {
+      if (!hasRows) {
+        return true;
+      }
+      if (!hasList) {
+        return (readIndex > 0);
+      } else {
+        // Multiple values.
+        if (readIndex <= 1) {
+          return false;
+        } else {
+          return (tailOffset <= 0);
+        }
+      }
+    }
+
+    /**
+     * Reposition value reading to the beginning.
+     */
+    public void rewind() {
+      // Position at first row.
+      readIndex = 0;
+      haveReadCurrent = false;
+      tailOffset = -1;
+
+      byteSegmentRef.reset(0, 0);
+    }
+
+    /**
+     * Lets go of any reference to the hash map.
+     */
+    public void forget() {
+      hashMap = null;
+      byteSegmentRef.reset(0, 0);
+      hasRows = false;
+      readIndex = 0;
+      haveReadCurrent = false;
+      tailOffset = -1;
+    }
+  }
+
   /** The source of keys and values to put into hashtable; avoids byte copying. */
   public static interface KvSource {
     /** Write key into output. */
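The read protocol for Result, as exercised by debugDumpTable() and the updated unit test later in this patch: position with getValueResult(), then alternate readCurrent() and advance() until advance() returns false. Note that readCurrent() must be called before each advance(): for lists, the offset of the next value record (tailOffset) is only discovered while reading the current one. A sketch under those assumptions; countValues() is a hypothetical helper, not part of the patch:

    // Sketch of the Result read loop; mirrors debugDumpTable() and the tests.
    static int countValues(BytesBytesMultiHashMap map, byte[] keyBytes) {
      BytesBytesMultiHashMap.Result result = new BytesBytesMultiHashMap.Result();
      map.getValueResult(keyBytes, 0, keyBytes.length, result);
      int count = 0;
      if (result.hasRows()) {
        while (true) {
          result.readCurrent();  // must precede advance(): it locates the next record
          WriteBuffers.ByteSegmentRef ref = result.byteSegmentRef();
          count++;               // a real caller would consume ref here
          if (!result.advance()) {
            break;               // result.isEof() is now true
          }
        }
      }
      result.forget();           // let go of the hash map reference when done
      return count;
    }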
@@ -248,51 +485,34 @@ public void put(KvSource kv, int keyHashCode) throws SerDeException {
   }
 
   /**
-   * Gets "lazy" values for a key (as a set of byte segments in underlying buffer).
+   * Finds a key.  Values can be read with the supplied result object.
+   *
    * @param key Key buffer.
+   * @param offset the offset to the key in the buffer
    * @param length Length of the key in buffer.
-   * @param result The list to use to store the results.
-   * @return the state byte for the key (see class description).
+   * @param hashMapResult The object to fill in that can read the values.
+   * @return The state byte.
    */
-  public byte getValueRefs(byte[] key, int length, List<WriteBuffers.ByteSegmentRef> result) {
+  public byte getValueResult(byte[] key, int offset, int length,
+      BytesBytesMultiHashMap.Result hashMapResult) {
+
+    hashMapResult.forget();
+
     // First, find first record for the key.
-    result.clear();
-    long ref = findKeyRefToRead(key, length);
+    long ref = findKeyRefToRead(key, offset, length);
     if (ref == 0) {
       return 0;
     }
+
     boolean hasList = Ref.hasList(ref);
 
     // This relies on findKeyRefToRead doing key equality check and leaving read ptr where needed.
-    long lrPtrOffset = hasList ? writeBuffers.getReadPoint() : 0;
+    long offsetAfterListRecordKeyLen = hasList ? writeBuffers.getReadPoint() : 0;
 
-    writeBuffers.setReadPoint(getFirstRecordLengthsOffset(ref));
-    int valueLength = (int)writeBuffers.readVLong();
-    // LOG.info("Returning value at " + (Ref.getOffset(ref) - valueLength) + " length " + valueLength);
-    result.add(new WriteBuffers.ByteSegmentRef(Ref.getOffset(ref) - valueLength, valueLength));
-    byte stateByte = Ref.getStateByte(ref);
-    if (!hasList) {
-      return stateByte;
-    }
-
-    // There're multiple records for the key; get the offset of the next one.
-    long nextTailOffset = writeBuffers.readFiveByteULong(lrPtrOffset);
-    // LOG.info("Next tail offset " + nextTailOffset);
-
-    while (nextTailOffset > 0) {
-      writeBuffers.setReadPoint(nextTailOffset);
-      valueLength = (int)writeBuffers.readVLong();
-      // LOG.info("Returning value at " + (nextTailOffset - valueLength) + " length " + valueLength);
-      result.add(new WriteBuffers.ByteSegmentRef(nextTailOffset - valueLength, valueLength));
-      // Now read the relative offset to next record. Next record is always before the
-      // previous record in the write buffers (see writeBuffers javadoc).
-      long delta = writeBuffers.readVLong();
-      nextTailOffset = delta == 0 ? 0 : (nextTailOffset - delta);
-      // LOG.info("Delta " + delta + ", next tail offset " + nextTailOffset);
-    }
-    return stateByte;
-  }
+    hashMapResult.set(this, Ref.getOffset(ref), hasList, offsetAfterListRecordKeyLen);
+
+    return Ref.getStateByte(ref);
+  }
 
   /**
    * Take the segment reference from {@link #getValueRefs(byte[], int, List)}
@@ -399,9 +619,9 @@ private int findKeySlotToWrite(long keyOffset, int keyLength, int hashCode) {
    * @param length Read key length.
    * @return The ref to use for reading.
    */
-  private long findKeyRefToRead(byte[] key, int length) {
+  private long findKeyRefToRead(byte[] key, int offset, int length) {
     final int bucketMask = (refs.length - 1);
-    int hashCode = writeBuffers.hashCode(key, 0, length);
+    int hashCode = writeBuffers.hashCode(key, offset, length);
     int slot = hashCode & bucketMask;
     // LOG.info("Read hash code for " + Utils.toStringBinary(key, 0, length)
     //   + " is " + Integer.toBinaryString(hashCode) + " - " + slot);
@@ -413,7 +633,7 @@ private long findKeyRefToRead(byte[] key, int offset, int length) {
       if (ref == 0) {
         return 0;
       }
-      if (isSameKey(key, length, ref, hashCode)) {
+      if (isSameKey(key, offset, length, ref, hashCode)) {
         return ref;
       }
       ++metricGetConflict;
@@ -483,7 +703,7 @@ private boolean isSameKey(long cmpOffset, int cmpLength, long ref, int hashCode)
   /**
    * Same as {@link #isSameKey(long, int, long, int)} but for externally stored key.
    */
-  private boolean isSameKey(byte[] key, int length, long ref, int hashCode) {
+  private boolean isSameKey(byte[] key, int offset, int length, long ref, int hashCode) {
     if (!compareHashBits(ref, hashCode)) {
       return false; // Hash bits don't match.
     }
@@ -491,7 +711,11 @@ private boolean isSameKey(byte[] key, int offset, int length, long ref, int hashCode) {
     int valueLength = (int)writeBuffers.readVLong(), keyLength = (int)writeBuffers.readVLong();
     long keyOffset = Ref.getOffset(ref) - (valueLength + keyLength);
     // See the comment in the other isSameKey
-    return writeBuffers.isEqual(key, length, keyOffset, keyLength);
+    if (offset == 0) {
+      return writeBuffers.isEqual(key, length, keyOffset, keyLength);
+    } else {
+      return writeBuffers.isEqual(key, offset, length, keyOffset, keyLength);
+    }
   }
 
   private boolean compareHashBits(long ref, int hashCode) {
@@ -651,7 +875,6 @@ private long writeValueAndLength(KvSource kv) throws SerDeException {
   public void debugDumpTable() {
     StringBuilder dump = new StringBuilder(keysAssigned + " keys\n");
     TreeMap<Long, Integer> byteIntervals = new TreeMap<Long, Integer>();
-    List<WriteBuffers.ByteSegmentRef> results = new ArrayList<WriteBuffers.ByteSegmentRef>();
     int examined = 0;
     for (int slot = 0; slot < refs.length; ++slot) {
       long ref = refs[slot];
@@ -673,9 +896,21 @@ public void debugDumpTable() {
         byteIntervals.put(keyOffset - 4, keyLength + 4);
         writeBuffers.populateValue(fakeRef);
         System.arraycopy(fakeRef.getBytes(), (int)fakeRef.getOffset(), key, 0, keyLength);
-        getValueRefs(key, key.length, results);
         dump.append(Utils.toStringBinary(key, 0, key.length)).append(" ref [").append(dumpRef(ref))
-            .append("]: ").append(results.size()).append(" rows\n");
+            .append("]: ");
+        Result hashMapResult = new Result();
+        getValueResult(key, 0, key.length, hashMapResult);
+        List<WriteBuffers.ByteSegmentRef> results = new ArrayList<WriteBuffers.ByteSegmentRef>();
+        if (!hashMapResult.isEof()) {
+          while (true) {
+            hashMapResult.readCurrent();
+            results.add(hashMapResult.byteSegmentRef);
+            if (!hashMapResult.advance()) {
+              break;
+            }
+          }
+        }
+        dump.append(results.size()).append(" rows\n");
         for (int i = 0; i < results.size(); ++i) {
           WriteBuffers.ByteSegmentRef segment = results.get(i);
           byteIntervals.put(segment.getOffset(),
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/FlatRowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/FlatRowContainer.java
index 80315c3..70f5605 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/FlatRowContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/FlatRowContainer.java
@@ -208,6 +208,16 @@ private void read(SerDe serde, Writable writable, int rowOffset) throws HiveExce
   }
 
   @Override
+  public boolean hasRows() throws HiveException {
+    return rowCount() > 0;
+  }
+
+  @Override
+  public boolean isSingleRow() throws HiveException {
+    return rowCount() == 1;
+  }
+
+  @Override
   public int rowCount() throws HiveException {
     return rowLength > 0 ? (array.length / rowLength) : -rowLength; // see rowLength javadoc
   }
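An aside on the rowCount() context line above, since hasRows() and isSingleRow() are defined in terms of it here: FlatRowContainer keeps every row in one flat array, and on one reading of the "see rowLength javadoc" comment, rowLength does double duty: a positive value means each row occupies rowLength array slots, while a non-positive value is the negated count of zero-column rows. A tiny sketch of that arithmetic with hypothetical values; this is an interpretation, not code from the patch:

    // Hypothetical values illustrating the encoding behind FlatRowContainer.rowCount().
    public class RowLengthSketch {
      public static void main(String[] args) {
        Object[] array = new Object[12];
        int rowLength = 3;   // positive: 3 fields per row, so 12 / 3 = 4 rows
        System.out.println(rowLength > 0 ? (array.length / rowLength) : -rowLength); // 4

        rowLength = -5;      // non-positive: negated count of zero-column rows
        System.out.println(rowLength > 0 ? (array.length / rowLength) : -rowLength); // 5
      }
    }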
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index 3ad7655..7b61112 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -20,6 +20,7 @@
 import com.esotericsoftware.kryo.Kryo;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -566,7 +567,7 @@ public boolean hasAnyNulls(int fieldCount, boolean[] nullsafes) {
 
   @Override
   public MapJoinRowContainer getCurrentRows() {
-    return currentValue.isEmpty() ? null : currentValue;
+    return !currentValue.hasRows() ? null : currentValue;
   }
 
   @Override
@@ -579,8 +580,8 @@ public MapJoinRowContainer getCurrentRows() {
   private class ReusableRowContainer
     implements MapJoinRowContainer, AbstractRowContainer.RowIterator<List<Object>> {
     private byte aliasFilter;
-    private List<WriteBuffers.ByteSegmentRef> refs;
-    private int currentRow;
+    private BytesBytesMultiHashMap.Result hashMapResult;
+
     /**
      * Sometimes, when container is empty in multi-table mapjoin, we need to add a dummy row.
      * This container does not normally support adding rows; this is for the dummy row.
@@ -600,6 +601,7 @@ public ReusableRowContainer() {
         valueStruct = null; // No rows?
       }
       uselessIndirection = new ByteArrayRef();
+      hashMapResult = new BytesBytesMultiHashMap.Result();
       clearRows();
     }
 
@@ -611,57 +613,59 @@ public ReusableRowContainer() {
      * the evaluation for this big table row will be postponed.
      */
    public JoinUtil.JoinResult setFromOutput(Output output) throws HiveException {
-      if (refs == null) {
-        refs = new ArrayList<WriteBuffers.ByteSegmentRef>(0);
-      }
-
       int keyHash = WriteBuffers.murmurHash(output.getData(), 0, output.getLength());
       partitionId = keyHash & (hashPartitions.length - 1);
 
       // If the target hash table is on disk, spill this row to disk as well to be processed later
       if (isOnDisk(partitionId)) {
         toSpillPartitionId = partitionId;
-        refs.clear();
+        hashMapResult.forget();
         return JoinUtil.JoinResult.SPILL;
       }
       else {
-        byte aliasFilter = hashPartitions[partitionId].hashMap.getValueRefs(
-            output.getData(), output.getLength(), refs);
-        this.aliasFilter = refs.isEmpty() ? (byte) 0xff : aliasFilter;
-        this.dummyRow = null;
-        if (refs.isEmpty()) {
-          return JoinUtil.JoinResult.NOMATCH;
-        }
-        else {
+        aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(output.getData(), 0, output.getLength(), hashMapResult);
+        dummyRow = null;
+        if (hashMapResult.hasRows()) {
           return JoinUtil.JoinResult.MATCH;
+        } else {
+          aliasFilter = (byte) 0xff;
+          return JoinUtil.JoinResult.NOMATCH;
         }
       }
     }
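setFromOutput() above now has a three-way outcome, and the SPILL case is specific to Hybrid Grace: when the row's target partition has been spilled to disk, the big-table row itself must be spilled and re-joined after the partition is reloaded. A hedged sketch of the caller-side branching; rowContainer and output are placeholder names, and the real dispatch lives in the map-join operators:

    // Placeholder sketch (not patch code) of branching on JoinUtil.JoinResult.
    JoinUtil.JoinResult joinResult = rowContainer.setFromOutput(output);
    switch (joinResult) {
      case MATCH:
        // Key found: iterate the matched small-table rows via rowIter().
        break;
      case NOMATCH:
        // No values for the key: aliasFilter has been reset to (byte) 0xff.
        break;
      case SPILL:
        // The target hash partition is on disk: spill this big-table row and
        // process it again when the partition is reloaded.
        break;
    }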
-    public boolean isEmpty() {
-      return refs.isEmpty() && (dummyRow == null);
+    @Override
+    public boolean hasRows() {
+      return hashMapResult.hasRows() || (dummyRow != null);
+    }
+
+    @Override
+    public boolean isSingleRow() {
+      if (!hashMapResult.hasRows()) {
+        return (dummyRow != null);
+      }
+      return hashMapResult.isSingleRow();
     }
 
     // Implementation of row container
 
     @Override
-    public RowIterator<List<Object>> rowIter() throws HiveException {
-      currentRow = -1;
+    public AbstractRowContainer.RowIterator<List<Object>> rowIter() throws HiveException {
+      hashMapResult.rewind();
       return this;
     }
 
     @Override
     public int rowCount() throws HiveException {
-      return dummyRow != null ? 1 : refs.size();
+      // For performance reasons we do not want to chase the values to the end to determine
+      // the count.  Use hasRows and isSingleRow instead.
+      throw new HiveException("Getting the row count not supported");
     }
 
     @Override
     public void clearRows() {
       // Doesn't clear underlying hashtable
-      if (refs != null) {
-        refs.clear();
-      }
+      hashMapResult.forget();
       dummyRow = null;
-      currentRow = -1;
       aliasFilter = (byte) 0xff;
     }
 
@@ -678,36 +682,42 @@ public MapJoinRowContainer copy() throws HiveException {
 
     // Implementation of row iterator
 
     @Override
     public List<Object> first() throws HiveException {
-      currentRow = 0;
-      return next();
+      hashMapResult.rewind();
+      return readCurrent();
     }
 
-    @Override
     public List<Object> next() throws HiveException {
+      if (!hashMapResult.advance()) {
+        return null;
+      }
+      return readCurrent();
+    }
+
+    private List<Object> readCurrent() throws HiveException {
       if (dummyRow != null) {
         List<Object> result = dummyRow;
         dummyRow = null;
         return result;
       }
-      if (currentRow < 0 || refs.size() < currentRow) throw new HiveException("No rows");
-      if (refs.size() == currentRow) return null;
-      WriteBuffers.ByteSegmentRef ref = refs.get(currentRow++);
+
+      if (hashMapResult.isEof()) {
+        throw new HiveException("No more rows");
+      }
+
+      hashMapResult.readCurrent();
+      WriteBuffers.ByteSegmentRef ref = hashMapResult.byteSegmentRef();
       if (ref.getLength() == 0) {
         return EMPTY_LIST; // shortcut, 0 length means no fields
       }
-      if (ref.getBytes() == null) {
-        // partitionId is derived from previously calculated value in setFromOutput()
-        hashPartitions[partitionId].hashMap.populateValue(ref);
-      }
       uselessIndirection.setData(ref.getBytes());
       valueStruct.init(uselessIndirection, (int)ref.getOffset(), ref.getLength());
-      return valueStruct.getFieldsAsList();
+      return valueStruct.getFieldsAsList(); // TODO: should we unset bytes after that?
     }
 
     @Override
     public void addRow(List<Object> t) {
-      if (dummyRow != null || !refs.isEmpty()) {
+      if (dummyRow != null || hashMapResult.hasRows()) {
        throw new RuntimeException("Cannot add rows when not empty");
      }
      dummyRow = t;
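With the Result-backed container above, iteration no longer indexes into a refs list: first() rewinds the shared Result, next() advances it, and the dummy row, if one was added, is handed out exactly once by readCurrent(). A sketch of draining a container after a MATCH; drain() is hypothetical and assumes MapJoinRowContainer exposes the AbstractRowContainer iteration contract shown in this patch:

    // Hypothetical consumption loop for a matched row container.
    static void drain(MapJoinRowContainer container) throws HiveException {
      AbstractRowContainer.RowIterator<List<Object>> it = container.rowIter();
      for (List<Object> row = it.first(); row != null; row = it.next()) {
        // ... hand 'row' to the join as the small-table side ...
      }
    }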
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
index b323e8e..3d6c071 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
@@ -417,7 +417,7 @@ public boolean hasAnyNulls(int fieldCount, boolean[] nullsafes) {
 
   @Override
   public MapJoinRowContainer getCurrentRows() {
-    return currentValue.isEmpty() ? null : currentValue;
+    return !currentValue.hasRows() ? null : currentValue;
   }
 
   @Override
@@ -430,8 +430,8 @@ public MapJoinRowContainer getCurrentRows() {
   private class ReusableRowContainer
     implements MapJoinRowContainer, AbstractRowContainer.RowIterator<List<Object>> {
     private byte aliasFilter;
-    private List<WriteBuffers.ByteSegmentRef> refs;
-    private int currentRow;
+    private BytesBytesMultiHashMap.Result hashMapResult;
+
     /**
      * Sometimes, when container is empty in multi-table mapjoin, we need to add a dummy row.
      * This container does not normally support adding rows; this is for the dummy row.
@@ -449,48 +449,55 @@ public ReusableRowContainer() {
         valueStruct = null; // No rows?
       }
       uselessIndirection = new ByteArrayRef();
+      hashMapResult = new BytesBytesMultiHashMap.Result();
       clearRows();
     }
 
     public JoinUtil.JoinResult setFromOutput(Output output) {
-      if (refs == null) {
-        refs = new ArrayList<WriteBuffers.ByteSegmentRef>();
-      }
-      byte aliasFilter = hashMap.getValueRefs(output.getData(), output.getLength(), refs);
-      this.aliasFilter = refs.isEmpty() ? (byte) 0xff : aliasFilter;
-      this.dummyRow = null;
-      if (refs.isEmpty()) {
-        return JoinUtil.JoinResult.NOMATCH;
-      }
-      else {
+
+      aliasFilter = hashMap.getValueResult(output.getData(), 0, output.getLength(), hashMapResult);
+      dummyRow = null;
+      if (hashMapResult.hasRows()) {
         return JoinUtil.JoinResult.MATCH;
+      } else {
+        aliasFilter = (byte) 0xff;
+        return JoinUtil.JoinResult.NOMATCH;
       }
+
     }
 
-    public boolean isEmpty() {
-      return refs.isEmpty() && (dummyRow == null);
+    @Override
+    public boolean hasRows() {
+      return hashMapResult.hasRows() || (dummyRow != null);
+    }
+
+    @Override
+    public boolean isSingleRow() {
+      if (!hashMapResult.hasRows()) {
+        return (dummyRow != null);
+      }
+      return hashMapResult.isSingleRow();
     }
 
     // Implementation of row container
 
     @Override
     public AbstractRowContainer.RowIterator<List<Object>> rowIter() throws HiveException {
-      currentRow = -1;
+      hashMapResult.rewind();
       return this;
     }
 
     @Override
     public int rowCount() throws HiveException {
-      return dummyRow != null ? 1 : refs.size();
+      // For performance reasons we do not want to chase the values to the end to determine
+      // the count.  Use hasRows and isSingleRow instead.
+      throw new HiveException("Getting the row count not supported");
     }
 
     @Override
     public void clearRows() {
       // Doesn't clear underlying hashtable
-      if (refs != null) {
-        refs.clear();
-      }
+      hashMapResult.forget();
       dummyRow = null;
-      currentRow = -1;
       aliasFilter = (byte) 0xff;
     }
 
@@ -507,30 +514,37 @@ public MapJoinRowContainer copy() throws HiveException {
 
     // Implementation of row iterator
 
     @Override
     public List<Object> first() throws HiveException {
-      currentRow = 0;
-      return nextInternal();
+      hashMapResult.rewind();
+      if (hashMapResult.isEof()) {
+        return null;
+      }
+      return readCurrent();
     }
 
     @Override
     public List<Object> next() throws HiveException {
-      return nextInternal();
+      if (!hashMapResult.advance()) {
+        return null;
+      }
+      return readCurrent();
     }
 
-    private List<Object> nextInternal() throws HiveException {
+    private List<Object> readCurrent() throws HiveException {
       if (dummyRow != null) {
         List<Object> result = dummyRow;
         dummyRow = null;
         return result;
       }
-      if (currentRow < 0 || refs.size() < currentRow) throw new HiveException("No rows");
-      if (refs.size() == currentRow) return null;
-      WriteBuffers.ByteSegmentRef ref = refs.get(currentRow++);
+
+      if (hashMapResult.isEof()) {
+        throw new HiveException("No more rows");
+      }
+
+      hashMapResult.readCurrent();
+      WriteBuffers.ByteSegmentRef ref = hashMapResult.byteSegmentRef();
       if (ref.getLength() == 0) {
         return EMPTY_LIST; // shortcut, 0 length means no fields
       }
-      if (ref.getBytes() == null) {
-        hashMap.populateValue(ref);
-      }
       uselessIndirection.setData(ref.getBytes());
       valueStruct.init(uselessIndirection, (int)ref.getOffset(), ref.getLength());
       return valueStruct.getFieldsAsList(); // TODO: should we unset bytes after that?
@@ -538,7 +552,7 @@ public MapJoinRowContainer copy() throws HiveException {
 
     @Override
     public void addRow(List<Object> t) {
-      if (dummyRow != null || !refs.isEmpty()) {
+      if (dummyRow != null || hashMapResult.hasRows()) {
         throw new RuntimeException("Cannot add rows when not empty");
       }
       dummyRow = t;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java
index 2d2448d..eaeae31 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java
@@ -81,6 +81,16 @@ public void addRow(Object[] t) {
     return null;
   }
 
+  @Override
+  public boolean hasRows() {
+    return list.size() > 0;
+  }
+
+  @Override
+  public boolean isSingleRow() {
+    return list.size() == 1;
+  }
+
   /**
    * Get the number of elements in the RowContainer.
    *
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
index b07fdcf..15bc592 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
@@ -331,6 +331,17 @@ private void spillBlock(ROW[] block, int length) throws HiveException {
     }
   }
 
+
+  @Override
+  public boolean hasRows() {
+    return size > 0;
+  }
+
+  @Override
+  public boolean isSingleRow() {
+    return size == 1;
+  }
+
   /**
    * Get the number of elements in the RowContainer.
    *
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/UnwrapRowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/UnwrapRowContainer.java
index 5c39e11..e7771e6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/UnwrapRowContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/UnwrapRowContainer.java
@@ -95,6 +95,17 @@ public void addRow(List<Object> t) throws HiveException {
     internal.addRow(t);
   }
 
+
+  @Override
+  public boolean hasRows() throws HiveException {
+    return internal.hasRows();
+  }
+
+  @Override
+  public boolean isSingleRow() throws HiveException {
+    return internal.isSingleRow();
+  }
+
   @Override
   public int rowCount() throws HiveException {
     return internal.rowCount();
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java
index 7f0a2e6..fc847e8 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java
@@ -50,10 +50,10 @@ public void testPutGetOne() throws Exception {
     BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(CAPACITY, LOAD_FACTOR, WB_SIZE);
     RandomKvSource kv = new RandomKvSource(0, 0);
     map.put(kv, -1);
-    verifyResults(map, kv.getLastKey(), kv.getLastValue());
+    verifyHashMapResult(map, kv.getLastKey(), kv.getLastValue());
     kv = new RandomKvSource(10, 100);
     map.put(kv, -1);
-    verifyResults(map, kv.getLastKey(), kv.getLastValue());
+    verifyHashMapResult(map, kv.getLastKey(), kv.getLastValue());
   }
 
   @Test
@@ -61,12 +61,12 @@ public void testPutGetMultiple() throws Exception {
     BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(CAPACITY, LOAD_FACTOR, WB_SIZE);
     RandomKvSource kv = new RandomKvSource(0, 100);
     map.put(kv, -1);
-    verifyResults(map, kv.getLastKey(), kv.getLastValue());
+    verifyHashMapResult(map, kv.getLastKey(), kv.getLastValue());
     FixedKeyKvSource kv2 = new FixedKeyKvSource(kv.getLastKey(), 0, 100);
     kv2.values.add(kv.getLastValue());
     for (int i = 0; i < 3; ++i) {
       map.put(kv2, -1);
-      verifyResults(map, kv2.key, kv2.values.toArray(new byte[kv2.values.size()][]));
+      verifyHashMapResult(map, kv2.key, kv2.values.toArray(new byte[kv2.values.size()][]));
     }
   }
 
@@ -80,11 +80,11 @@ public void testGetNonExistent() throws Exception {
     FixedKeyKvSource kv2 = new FixedKeyKvSource(kv.getLastKey(), 0, 100);
     map.put(kv2, -1);
     key[0] = (byte)(key[0] + 1);
-    List<WriteBuffers.ByteSegmentRef> results = new ArrayList<WriteBuffers.ByteSegmentRef>(0);
-    map.getValueRefs(key, key.length, results);
-    assertTrue(results.isEmpty());
-    map.getValueRefs(key, 0, results);
-    assertTrue(results.isEmpty());
+    BytesBytesMultiHashMap.Result hashMapResult = new BytesBytesMultiHashMap.Result();
+    map.getValueResult(key, 0, key.length, hashMapResult);
+    assertTrue(!hashMapResult.hasRows());
+    map.getValueResult(key, 0, 0, hashMapResult);
+    assertTrue(!hashMapResult.hasRows());
   }
 
   @Test
@@ -96,13 +96,12 @@ public void testPutWithFullMap() throws Exception {
       map.put(kv, -1);
     }
     for (int i = 0; i < kv.keys.size(); ++i) {
-      verifyResults(map, kv.keys.get(i), kv.values.get(i));
+      verifyHashMapResult(map, kv.keys.get(i), kv.values.get(i));
     }
     assertEquals(CAPACITY, map.getCapacity());
     // Get of non-existent key should terminate..
-    List<WriteBuffers.ByteSegmentRef> results = new ArrayList<WriteBuffers.ByteSegmentRef>(0);
-    map.getValueRefs(new byte[0], 0, results);
-    assertTrue(results.isEmpty());
+    BytesBytesMultiHashMap.Result hashMapResult = new BytesBytesMultiHashMap.Result();
+    map.getValueResult(new byte[0], 0, 0, hashMapResult);
   }
 
   @Test
@@ -113,23 +112,33 @@ public void testExpand() throws Exception {
     for (int i = 0; i < 18; ++i) {
       map.put(kv, -1);
       for (int j = 0; j <= i; ++j) {
-        verifyResults(map, kv.keys.get(j), kv.values.get(j));
+        verifyHashMapResult(map, kv.keys.get(j), kv.values.get(j));
       }
     }
     assertEquals(1 << 18, map.getCapacity());
   }
 
-  private void verifyResults(BytesBytesMultiHashMap map, byte[] key, byte[]... values) {
-    List<WriteBuffers.ByteSegmentRef> results = new ArrayList<WriteBuffers.ByteSegmentRef>(0);
-    byte state = map.getValueRefs(key, key.length, results);
-    assertEquals(state, results.size());
-    assertEquals(values.length, results.size());
+  private void verifyHashMapResult(BytesBytesMultiHashMap map, byte[] key, byte[]... values) {
+    BytesBytesMultiHashMap.Result hashMapResult = new BytesBytesMultiHashMap.Result();
+    byte state = map.getValueResult(key, 0, key.length, hashMapResult);
     HashSet<ByteBuffer> hs = new HashSet<ByteBuffer>();
-    for (int i = 0; i < results.size(); ++i) {
-      WriteBuffers.ByteSegmentRef result = results.get(i);
-      map.populateValue(result);
-      hs.add(result.copy());
+    int count = 0;
+    if (hashMapResult.hasRows()) {
+      while (true) {
+        hashMapResult.readCurrent();
+        count++;
+        WriteBuffers.ByteSegmentRef result = hashMapResult.byteSegmentRef();
+        hs.add(result.copy());
+        if (!hashMapResult.advance()) {
+          assertTrue(hashMapResult.isEof());
+          break;
+        }
+      }
+    } else {
+      assertTrue(hashMapResult.isEof());
     }
+    assertEquals(state, count);
+    assertEquals(values.length, count);
     for (int i = 0; i < values.length; ++i) {
       assertTrue(hs.contains(ByteBuffer.wrap(values[i])));
     }
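Two invariants the rewritten verifyHashMapResult() checks are worth calling out: a miss reports isEof() immediately, and once advance() returns false, isEof() must hold; the returned state byte is still compared against the number of rows traversed, replacing the old assertEquals(state, results.size()). A distilled, hypothetical JUnit-style helper (not part of the patch) capturing the traversal invariants:

    // Hypothetical helper distilling the Result invariants asserted above.
    static int assertTraversal(BytesBytesMultiHashMap.Result r) {
      if (!r.hasRows()) {
        assertTrue(r.isEof());    // a miss is EOF from the start
        return 0;
      }
      int count = 0;
      while (true) {
        r.readCurrent();          // required before advance(); see Result above
        count++;
        if (!r.advance()) {
          assertTrue(r.isEof());  // exhausted advance() implies EOF
          return count;
        }
      }
    }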
diff --git serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java
index fe7169e..7916a6f 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java
@@ -19,11 +19,13 @@
 package org.apache.hadoop.hive.serde2;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.hadoop.hive.common.io.NonSyncByteArrayInputStream;
 import org.apache.hadoop.hive.common.io.NonSyncByteArrayOutputStream;
 import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
 
 /**
  * Extensions to bytearrayinput/output streams.
@@ -90,14 +92,32 @@ public void writeInt(long offset, int value) {
     }
 
     @Override
+    public void writeByte(long offset, byte value) {
+      getData()[(int) offset] = value;
+    }
+
+    @Override
     public void reserve(int byteCount) {
       for (int i = 0; i < byteCount; ++i) {
         write(0);
       }
     }
+
+    public boolean arraysEquals(Output output) {
+      if (count != output.count) {
+        return false;
+      }
+      for (int i = 0; i < count; i++) {
+        if (buf[i] != output.buf[i]) {
+          return false;
+        }
+      }
+      return true;
+    }
   }
 
   public static interface RandomAccessOutput {
+    public void writeByte(long offset, byte value);
     public void writeInt(long offset, int value);
     public void reserve(int byteCount);
     public void write(int b);
diff --git serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
index f9ab964..0437380 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
@@ -282,6 +282,38 @@ public boolean isEqual(byte[] left, int leftLength, long rightOffset, int rightL
     return true;
   }
 
+  /**
+   * Compares part of the buffer with a part of an external byte array.
+   * Does not modify readPoint.
+   */
+  public boolean isEqual(byte[] left, int leftOffset, int leftLength, long rightOffset, int rightLength) {
+    if (rightLength != leftLength) {
+      return false;
+    }
+    int rightIndex = getBufferIndex(rightOffset), rightFrom = getOffset(rightOffset);
+    byte[] rightBuffer = writeBuffers.get(rightIndex);
+    if (rightFrom + rightLength <= wbSize) {
+      // TODO: allow using unsafe optionally.
+      for (int i = 0; i < leftLength; ++i) {
+        if (left[leftOffset + i] != rightBuffer[rightFrom + i]) {
+          return false;
+        }
+      }
+      return true;
+    }
+    for (int i = 0; i < rightLength; ++i) {
+      if (rightFrom == wbSize) {
+        ++rightIndex;
+        rightBuffer = writeBuffers.get(rightIndex);
+        rightFrom = 0;
+      }
+      if (left[leftOffset + i] != rightBuffer[rightFrom++]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   public void clear() {
     writeBuffers.clear();
     currentWriteBuffer = currentReadBuffer = null;
@@ -493,6 +525,21 @@ public void writeInt(long offset, int v) {
     currentWriteOffset = prevOffset;
   }
 
+
+  @Override
+  public void writeByte(long offset, byte value) {
+    int prevIndex = currentWriteBufferIndex, prevOffset = currentWriteOffset;
+    setWritePoint(offset);
+    if (isAllInOneWriteBuffer(1)) {
+      currentWriteBuffer[currentWriteOffset] = value;
+    } else {
+      setByte(offset, value);
+    }
+    currentWriteBufferIndex = prevIndex;
+    currentWriteBuffer = writeBuffers.get(currentWriteBufferIndex);
+    currentWriteOffset = prevOffset;
+  }
+
   // Lifted from org.apache.hadoop.util.hash.MurmurHash... but supports offset.
   public static int murmurHash(byte[] data, int offset, int length) {
     int m = 0x5bd1e995;
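Finally, the serde-layer additions serve the offset-aware lookups earlier in the patch: getValueResult(), findKeyRefToRead(), and isSameKey() can now probe with a key that sits at a non-zero position inside a larger buffer, with isEqual() growing a matching offset overload (murmurHash() already took an offset), so the key never has to be copied out first. A placeholder sketch of the shape of such a call; the buffer layout and names are illustrative:

    // Probe with a key embedded at [keyStart, keyStart + keyLen) in a larger buffer;
    // before this patch the key bytes had to start at offset 0.
    static byte probe(BytesBytesMultiHashMap map, byte[] buffer, int keyStart, int keyLen) {
      BytesBytesMultiHashMap.Result result = new BytesBytesMultiHashMap.Result();
      return map.getValueResult(buffer, keyStart, keyLen, result);
    }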