diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java index 00e4158..26855f8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java @@ -23,8 +23,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; @@ -54,7 +53,7 @@ protected transient byte posBigTable = -1; // one of the tables that is not in memory - protected transient RowContainer> emptyList = null; + protected transient RowContainer> emptyList = null; transient int numMapRowsRead; @@ -95,7 +94,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { // all other tables are small, and are cached in the hash table posBigTable = (byte) conf.getPosBigTable(); - emptyList = new RowContainer>(1, hconf, reporter); + emptyList = new RowContainer>(1, hconf, reporter); RowContainer bigPosRC = JoinUtil.getRowContainer(hconf, rowContainerStandardObjectInspectors[posBigTable], @@ -160,7 +159,7 @@ protected boolean hasAnyNulls(Object[] key) { } // returns true if there are elements in key list and any of them is null - protected boolean hasAnyNulls(AbstractMapJoinKey key) { + protected boolean hasAnyNulls(MapJoinKey key) { return key.hasAnyNulls(nullsafes); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java index ca99f9b..d97c7a6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java @@ -93,7 +93,7 @@ protected transient ArrayList[] dummyObj; // empty rows for each table - protected transient RowContainer>[] dummyObjVectors; + protected transient RowContainer>[] dummyObjVectors; protected transient int totalSz; // total size of the composite object @@ -108,7 +108,7 @@ // input is too large // to fit in memory - AbstractRowContainer>[] storage; // map b/w table alias + AbstractRowContainer>[] storage; // map b/w table alias // to RowContainer int joinEmitInterval = -1; int joinCacheSize = 0; @@ -274,7 +274,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { } dummyObj[pos] = nr; // there should be only 1 dummy object in the RowContainer - RowContainer> values = JoinUtil.getRowContainer(hconf, + RowContainer> values = JoinUtil.getRowContainer(hconf, rowContainerStandardObjectInspectors[pos], alias, 1, spillTableDesc, conf, !hasFilter(pos), reporter); @@ -328,7 +328,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { public void startGroup() throws HiveException { LOG.trace("Join: Starting new group"); newGroupStarted = true; - for (AbstractRowContainer> alw : storage) { + for (AbstractRowContainer> alw : storage) { alw.clear(); } } @@ -442,7 +442,7 @@ private void createForwardJoinObject(boolean[] skip) throws HiveException { private void genJoinObject() throws HiveException { boolean rightFirst = true; boolean hasFilter = hasFilter(order[0]); - AbstractRowContainer> aliasRes = storage[order[0]]; + 
AbstractRowContainer> aliasRes = storage[order[0]]; for (List rightObj = aliasRes.first(); rightObj != null; rightObj = aliasRes.next()) { boolean rightNull = rightObj == dummyObj[0]; if (hasFilter) { @@ -470,7 +470,7 @@ private void genObject(int aliasNum, boolean allLeftFirst, boolean allLeftNull) int right = joinCond.getRight(); // search for match in the rhs table - AbstractRowContainer> aliasRes = storage[order[aliasNum]]; + AbstractRowContainer> aliasRes = storage[order[aliasNum]]; boolean done = false; boolean loopAgain = false; @@ -640,8 +640,8 @@ public void endGroup() throws HiveException { private void genUniqueJoinObject(int aliasNum, int forwardCachePos) throws HiveException { - AbstractRowContainer> alias = storage[order[aliasNum]]; - for (ArrayList row = alias.first(); row != null; row = alias.next()) { + AbstractRowContainer> alias = storage[order[aliasNum]]; + for (List row = alias.first(); row != null; row = alias.next()) { int sz = joinValues[order[aliasNum]].size(); int p = forwardCachePos; for (int j = 0; j < sz; j++) { @@ -661,7 +661,7 @@ private void genAllOneUniqueJoinObject() int p = 0; for (int i = 0; i < numAliases; i++) { int sz = joinValues[order[i]].size(); - ArrayList obj = storage[order[i]].first(); + List obj = storage[order[i]].first(); for (int j = 0; j < sz; j++) { forwardCache[p++] = obj.get(j); } @@ -683,7 +683,7 @@ protected void checkAndGenObject() throws HiveException { boolean allOne = true; for (int i = 0; i < numAliases; i++) { Byte alias = order[i]; - AbstractRowContainer> alw = storage[alias]; + AbstractRowContainer> alw = storage[alias]; if (alw.size() != 1) { allOne = false; @@ -716,7 +716,7 @@ protected void checkAndGenObject() throws HiveException { boolean hasEmpty = false; for (int i = 0; i < numAliases; i++) { Byte alias = order[i]; - AbstractRowContainer> alw = storage[alias]; + AbstractRowContainer> alw = storage[alias]; if (noOuterJoin) { if (alw.size() == 0) { @@ -736,7 +736,7 @@ protected void checkAndGenObject() throws HiveException { } else { mayHasMoreThanOne = true; if (!hasEmpty) { - for (ArrayList row = alw.first(); row != null; row = alw.next()) { + for (List row = alw.first(); row != null; row = alw.next()) { reportProgress(); if (hasAnyFiltered(alias, row)) { hasEmpty = true; @@ -783,7 +783,7 @@ protected void reportProgress() { @Override public void closeOp(boolean abort) throws HiveException { LOG.trace("Join Op close"); - for (AbstractRowContainer> alw : storage) { + for (AbstractRowContainer> alw : storage) { if (alw != null) { alw.clear(); // clean up the temp files } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java index fb6ecb4..7c21627 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java @@ -17,7 +17,8 @@ */ package org.apache.hadoop.hive.ql.exec; -import java.io.File; +import java.io.BufferedOutputStream; +import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -28,10 +29,13 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler; import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; -import 
org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc; @@ -41,10 +45,8 @@ import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; -import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.util.ReflectionUtils; @@ -54,128 +56,68 @@ private static final long serialVersionUID = 1L; private static final Log LOG = LogFactory.getLog(HashTableSinkOperator.class.getName()); - protected static MapJoinMetaData metadata = new MapJoinMetaData(); - // from abstract map join operator /** * The expressions for join inputs's join keys. */ - protected transient List[] joinKeys; + private transient List[] joinKeys; /** * The ObjectInspectors for the join inputs's join keys. */ - protected transient List[] joinKeysObjectInspectors; + private transient List[] joinKeysObjectInspectors; /** * The standard ObjectInspectors for the join inputs's join keys. */ - protected transient List[] joinKeysStandardObjectInspectors; + private transient List[] joinKeysStandardObjectInspectors; - protected transient int posBigTableAlias = -1; // one of the tables that is not in memory + private transient int posBigTableAlias = -1; // one of the tables that is not in memory - protected transient RowContainer> emptyList = null; + private transient RowContainer> emptyList = null; transient int numMapRowsRead; - protected transient int totalSz; // total size of the composite object + private transient int totalSz; // total size of the composite object transient boolean firstRow; /** * The filters for join */ - protected transient List[] joinFilters; + private transient List[] joinFilters; - protected transient int[][] filterMaps; + private transient int[][] filterMaps; - protected transient int numAliases; // number of aliases + private transient int numAliases; // number of aliases /** * The expressions for join outputs. */ - protected transient List[] joinValues; + private transient List[] joinValues; /** * The ObjectInspectors for the join inputs. */ - protected transient List[] joinValuesObjectInspectors; + private transient List[] joinValuesObjectInspectors; /** * The ObjectInspectors for join filters. */ - protected transient List[] joinFilterObjectInspectors; + private transient List[] joinFilterObjectInspectors; /** * The standard ObjectInspectors for the join inputs. 
*/ - protected transient List[] joinValuesStandardObjectInspectors; + private transient List[] joinValuesStandardObjectInspectors; - protected transient List[] rowContainerStandardObjectInspectors; + private transient List[] rowContainerStandardObjectInspectors; - protected transient Byte[] order; // order in which the results should - Configuration hconf; - protected transient Byte alias; - protected transient TableDesc[] spillTableDesc; // spill tables are + private transient Byte[] order; // order in which the results should + private Configuration hconf; + private transient Byte alias; + private transient TableDesc[] spillTableDesc; // spill tables are - protected transient HashMapWrapper[] mapJoinTables; - protected transient boolean noOuterJoin; + private transient MapJoinTableContainer[] mapJoinTables; + private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes; + + private transient boolean noOuterJoin; private long rowNumber = 0; - protected transient LogHelper console; + private transient LogHelper console; private long hashTableScale; - private boolean isAbort = false; - - public static class HashTableSinkObjectCtx { - ObjectInspector standardOI; - SerDe serde; - TableDesc tblDesc; - Configuration conf; - boolean hasFilter; - - /** - * @param standardOI - * @param serde - */ - public HashTableSinkObjectCtx(ObjectInspector standardOI, SerDe serde, TableDesc tblDesc, - boolean hasFilter, Configuration conf) { - this.standardOI = standardOI; - this.serde = serde; - this.tblDesc = tblDesc; - this.hasFilter = hasFilter; - this.conf = conf; - } - - /** - * @return the standardOI - */ - public ObjectInspector getStandardOI() { - return standardOI; - } - - /** - * @return the serde - */ - public SerDe getSerDe() { - return serde; - } - - public TableDesc getTblDesc() { - return tblDesc; - } - - public boolean hasFilterTag() { - return hasFilter; - } - - public Configuration getConf() { - return conf; - } - - } - - public static MapJoinMetaData getMetadata() { - return metadata; - } - - private static final transient String[] FATAL_ERR_MSG = { - null, // counter value 0 means no error - "Mapside join exceeds available memory. 
" - + "Please try removing the mapjoin hint."}; - private final int metadataKeyTag = -1; - transient int[] metadataValueTag; - - + private MapJoinMemoryExhaustionHandler memoryExhaustionHandler; + public HashTableSinkOperator() { } @@ -189,6 +131,7 @@ public HashTableSinkOperator(MapJoinOperator mjop) { protected void initializeOp(Configuration hconf) throws HiveException { boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT); console = new LogHelper(LOG, isSilent); + memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage()); numMapRowsRead = 0; firstRow = true; @@ -248,40 +191,42 @@ protected void initializeOp(Configuration hconf) throws HiveException { rowContainerStandardObjectInspectors = getStandardObjectInspectors( rowContainerObjectInspectors, tagLen); } - - metadataValueTag = new int[numAliases]; - for (int pos = 0; pos < numAliases; pos++) { - metadataValueTag[pos] = -1; - } - mapJoinTables = new HashMapWrapper[tagLen]; - + mapJoinTables = new MapJoinTableContainer[tagLen]; + mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen]; int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD); float hashTableLoadFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR); - float hashTableMaxMemoryUsage = this.getConf().getHashtableMemoryUsage(); - hashTableScale = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVEHASHTABLESCALE); if (hashTableScale <= 0) { hashTableScale = 1; } - - // initialize the hash tables for other tables - for (Byte pos : order) { - if (pos == posBigTableAlias) { - continue; + try { + TableDesc keyTableDesc = conf.getKeyTblDesc(); + SerDe keySerde = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), + null); + keySerde.initialize(null, keyTableDesc.getProperties()); + MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerde, false); + for (Byte pos : order) { + if (pos == posBigTableAlias) { + continue; + } + mapJoinTables[pos] = new HashMapWrapper(hashTableThreshold, hashTableLoadFactor); + TableDesc valueTableDesc = conf.getValueTblFilteredDescs().get(pos); + SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null); + valueSerDe.initialize(null, valueTableDesc.getProperties()); + mapJoinTableSerdes[pos] = new MapJoinTableContainerSerDe(keyContext, new MapJoinObjectSerDeContext( + valueSerDe, hasFilter(pos))); } - - HashMapWrapper hashTable = new HashMapWrapper( - hashTableThreshold, hashTableLoadFactor, hashTableMaxMemoryUsage); - - mapJoinTables[pos] = hashTable; + } catch (SerDeException e) { + throw new HiveException(e); } } - protected static List[] getStandardObjectInspectors( + private static List[] getStandardObjectInspectors( List[] aliasToObjectInspectors, int maxTag) { + @SuppressWarnings("unchecked") List[] result = new List[maxTag]; for (byte alias = 0; alias < aliasToObjectInspectors.length; alias++) { List oiList = aliasToObjectInspectors[alias]; @@ -299,104 +244,34 @@ protected void initializeOp(Configuration hconf) throws HiveException { } - private void setKeyMetaData() throws SerDeException { - TableDesc keyTableDesc = conf.getKeyTblDesc(); - SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), - null); - keySerializer.initialize(null, keyTableDesc.getProperties()); - - metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx( - 
ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(), - ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf)); - } - - private boolean hasFilter(int alias) { - return filterMaps != null && filterMaps[alias] != null; - } /* * This operator only process small tables Read the key/value pairs Load them into hashtable */ @Override public void processOp(Object row, int tag) throws HiveException { - // let the mapJoinOp process these small tables - try { - if (firstRow) { - // generate the map metadata - setKeyMetaData(); - firstRow = false; - } - alias = (byte)tag; - - // compute keys and values as StandardObjects - AbstractMapJoinKey keyMap = JoinUtil.computeMapJoinKeys(row, joinKeys[alias], - joinKeysObjectInspectors[alias]); - - Object[] value = JoinUtil.computeMapJoinValues(row, joinValues[alias], - joinValuesObjectInspectors[alias], joinFilters[alias], joinFilterObjectInspectors[alias], - filterMaps == null ? null : filterMaps[alias]); - - HashMapWrapper hashTable = mapJoinTables[alias]; - - MapJoinObjectValue o = hashTable.get(keyMap); - MapJoinRowContainer res = null; - - boolean needNewKey = true; - if (o == null) { - res = new MapJoinRowContainer(); - res.add(value); - - if (metadataValueTag[tag] == -1) { - metadataValueTag[tag] = order[tag]; - setValueMetaData(tag); - } - - // Construct externalizable objects for key and value - if (needNewKey) { - MapJoinObjectValue valueObj = new MapJoinObjectValue( - metadataValueTag[tag], res); - - rowNumber++; - if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) { - isAbort = hashTable.isAbort(rowNumber, console); - if (isAbort) { - throw new HiveException("RunOutOfMeomoryUsage"); - } - } - hashTable.put(keyMap, valueObj); - } - - } else { - res = o.getObj(); - res.add(value); + alias = (byte)tag; + // compute keys and values as StandardObjects + MapJoinKey key = JoinUtil.computeMapJoinKeys(null, row, joinKeys[alias], + joinKeysObjectInspectors[alias]); + Object[] value = JoinUtil.computeMapJoinValues(row, joinValues[alias], + joinValuesObjectInspectors[alias], joinFilters[alias], joinFilterObjectInspectors[alias], + filterMaps == null ? 
null : filterMaps[alias]); + MapJoinTableContainer tableContainer = mapJoinTables[alias]; + MapJoinRowContainer rowContainer = tableContainer.get(key); + if (rowContainer == null) { + rowContainer = new MapJoinRowContainer(); + rowContainer.add(value); + rowNumber++; + if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) { + memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber); } - - - } catch (SerDeException e) { - throw new HiveException(e); + tableContainer.put(key, rowContainer); + } else { + rowContainer.add(value); } - } - - private void setValueMetaData(int tag) throws SerDeException { - TableDesc valueTableDesc = conf.getValueTblFilteredDescs().get(tag); - SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), - null); - - valueSerDe.initialize(null, valueTableDesc.getProperties()); - - List newFields = rowContainerStandardObjectInspectors[alias]; - int length = newFields.size(); - List newNames = new ArrayList(length); - for (int i = 0; i < length; i++) { - String tmp = new String("tmp_" + i); - newNames.add(tmp); - } - StandardStructObjectInspector standardOI = ObjectInspectorFactory - .getStandardStructObjectInspector(newNames, newFields); - - int alias = Integer.valueOf(metadataValueTag[tag]); - metadata.put(Integer.valueOf(metadataValueTag[tag]), new HashTableSinkObjectCtx( - standardOI, valueSerDe, valueTableDesc, hasFilter(alias), hconf)); + private boolean hasFilter(int alias) { + return filterMaps != null && filterMaps[alias] != null; } @Override @@ -406,41 +281,36 @@ public void closeOp(boolean abort) throws HiveException { // get tmp file URI String tmpURI = this.getExecContext().getLocalWork().getTmpFileURI(); LOG.info("Get TMP URI: " + tmpURI); - long fileLength; for (byte tag = 0; tag < mapJoinTables.length; tag++) { // get the key and value - HashMapWrapper hashTable = mapJoinTables[tag]; - if (hashTable == null) { + MapJoinTableContainer tableContainer = mapJoinTables[tag]; + if (tableContainer == null) { continue; } - // get current input file name String bigBucketFileName = getExecContext().getCurrentBigBucketFile(); - String fileName = getExecContext().getLocalWork().getBucketFileName(bigBucketFileName); - // get the tmp URI path; it will be a hdfs path if not local mode String dumpFilePrefix = conf.getDumpFilePrefix(); String tmpURIPath = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName); - hashTable.isAbort(rowNumber, console); console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath); // get the hashtable file and path Path path = new Path(tmpURIPath); FileSystem fs = path.getFileSystem(hconf); - File file = new File(path.toUri().getPath()); - fs.create(path); - fileLength = hashTable.flushMemoryCacheToPersistent(file); + ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(fs.create(path), 4096)); + try { + mapJoinTableSerdes[tag].persist(out, tableContainer); + } finally { + out.close(); + } + tableContainer.clear(); console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath + " File size: " - + fileLength); - - hashTable.close(); + + fs.getFileStatus(path).getLen()); } } - super.closeOp(abort); } catch (Exception e) { LOG.error("Generate Hashtable error", e); - e.printStackTrace(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java index 31dbf41..1d853c0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java @@ -23,10 +23,7 @@ import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinDoubleKeys; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinSingleKey; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -147,42 +144,22 @@ public static int populateJoinKeyValue(List[] outMap, /** * Return the key as a standard object. StandardObject can be inspected by a - * standard ObjectInspector. + * standard ObjectInspector. The first parameter a MapJoinKey can + * be null if the caller would like a new object to be instantiated. */ - public static AbstractMapJoinKey computeMapJoinKeys(Object row, + public static MapJoinKey computeMapJoinKeys(MapJoinKey key, Object row, List keyFields, List keyFieldsOI) throws HiveException { - int size = keyFields.size(); - if(size == 1){ - Object obj = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(0) - .evaluate(row), keyFieldsOI.get(0), - ObjectInspectorCopyOption.WRITABLE)); - MapJoinSingleKey key = new MapJoinSingleKey(obj); - return key; - }else if(size == 2){ - Object obj1 = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(0) - .evaluate(row), keyFieldsOI.get(0), - ObjectInspectorCopyOption.WRITABLE)); - - Object obj2 = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(1) - .evaluate(row), keyFieldsOI.get(1), - ObjectInspectorCopyOption.WRITABLE)); - - MapJoinDoubleKeys key = new MapJoinDoubleKeys(obj1,obj2); - return key; - }else{ - // Compute the keys - Object[] nr = new Object[keyFields.size()]; - for (int i = 0; i < keyFields.size(); i++) { - - nr[i] = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(i) - .evaluate(row), keyFieldsOI.get(i), - ObjectInspectorCopyOption.WRITABLE)); - } - MapJoinObjectKey key = new MapJoinObjectKey(nr); - return key; - } + if(key == null || key.getKey().length != size) { + key = new MapJoinKey(new Object[size]); + } + Object[] array = key.getKey(); + for (int keyIndex = 0; keyIndex < size; keyIndex++) { + array[keyIndex] = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(keyIndex) + .evaluate(row), keyFieldsOI.get(keyIndex), ObjectInspectorCopyOption.WRITABLE)); + } + return key; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java deleted file mode 100644 index ec73022..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx; - -public class MapJoinMetaData { - transient Map mapMetadata = - new HashMap(); - static ArrayList list = new ArrayList(); - - public MapJoinMetaData(){ - - } - public void put(Integer key, HashTableSinkObjectCtx value){ - mapMetadata.put(key, value); - } - public HashTableSinkObjectCtx get(Integer key){ - return mapMetadata.get(key); - } - - public void clear(){ - mapMetadata.clear(); - } - - public static ArrayList getList(){ - list.clear(); - return list; - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 4da1be8..30aaa98 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hive.ql.exec; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.ObjectInputStream; import java.io.Serializable; import java.util.ArrayList; @@ -27,20 +30,17 @@ import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx; -import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey; -import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.util.ReflectionUtils; @@ -51,23 +51,15 @@ private static final long serialVersionUID = 1L; private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName()); - - protected transient HashMapWrapper[] mapJoinTables; - - protected static MapJoinMetaData metadata = new MapJoinMetaData(); - public static MapJoinMetaData getMetadata() { - return metadata; - } - private static final transient String[] FATAL_ERR_MSG = { null, // counter value 0 means no error 
"Mapside join exceeds available memory. " + "Please try removing the mapjoin hint."}; - protected transient MapJoinRowContainer>[] rowContainerMap; - transient int metadataKeyTag; - transient int[] metadataValueTag; - transient boolean hashTblInitedOnce; + private transient MapJoinTableContainer[] mapJoinTables; + private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes; + private transient boolean hashTblInitedOnce; + private transient MapJoinKey key; public MapJoinOperator() { } @@ -77,35 +69,11 @@ public MapJoinOperator(AbstractMapJoinOperator mjop) { } @Override - @SuppressWarnings("unchecked") protected void initializeOp(Configuration hconf) throws HiveException { - super.initializeOp(hconf); - - metadataValueTag = new int[numAliases]; - for (int pos = 0; pos < numAliases; pos++) { - metadataValueTag[pos] = -1; - } - - metadataKeyTag = -1; - int tagLen = conf.getTagLength(); - - mapJoinTables = new HashMapWrapper[tagLen]; - rowContainerMap = new MapJoinRowContainer[tagLen]; - // initialize the hash tables for other tables - for (int pos = 0; pos < numAliases; pos++) { - if (pos == posBigTable) { - continue; - } - - HashMapWrapper hashTable = new HashMapWrapper(); - - mapJoinTables[pos] = hashTable; - MapJoinRowContainer> rowContainer = new MapJoinRowContainer>(); - rowContainerMap[pos] = rowContainer; - } - + mapJoinTables = new MapJoinTableContainer[tagLen]; + mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen]; hashTblInitedOnce = false; } @@ -118,14 +86,12 @@ protected void fatalErrorMessage(StringBuilder errMsg, long counterCode) { public void generateMapMetaData() throws HiveException, SerDeException { // generate the meta data for key // index for key is -1 + TableDesc keyTableDesc = conf.getKeyTblDesc(); SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null); keySerializer.initialize(null, keyTableDesc.getProperties()); - metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx( - ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(), - ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf)); - + MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerializer, false); for (int pos = 0; pos < order.length; pos++) { if (pos == posBigTable) { continue; @@ -139,16 +105,12 @@ public void generateMapMetaData() throws HiveException, SerDeException { SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null); valueSerDe.initialize(null, valueTableDesc.getProperties()); - - ObjectInspector inspector = valueSerDe.getObjectInspector(); - metadata.put(Integer.valueOf(pos), new HashTableSinkObjectCtx(ObjectInspectorUtils - .getStandardObjectInspector(inspector, ObjectInspectorCopyOption.WRITABLE), - valueSerDe, valueTableDesc, hasFilter(pos), hconf)); + MapJoinObjectSerDeContext valueContext = new MapJoinObjectSerDeContext(valueSerDe, hasFilter(pos)); + mapJoinTableSerdes[pos] = new MapJoinTableContainerSerDe(keyContext, valueContext); } } private void loadHashTable() throws HiveException { - if (!this.getExecContext().getLocalWork().getInputFileChangeSensitive()) { if (hashTblInitedOnce) { return; @@ -158,12 +120,9 @@ private void loadHashTable() throws HiveException { } String baseDir = null; - String currentInputFile = getExecContext().getCurrentInputFile(); LOG.info("******* Load from HashTable File: input : " + currentInputFile); - String fileName = 
getExecContext().getLocalWork().getBucketFileName(currentInputFile); - try { if (ShimLoader.getHadoopShims().isLocalMode(hconf)) { baseDir = this.getExecContext().getLocalWork().getTmpFileURI(); @@ -183,18 +142,25 @@ private void loadHashTable() throws HiveException { baseDir = archiveLocalLink.toUri().getPath(); } } - for (byte pos = 0; pos < mapJoinTables.length; pos++) { - HashMapWrapper hashtable = mapJoinTables[pos]; - if (hashtable == null) { + for (int pos = 0; pos < mapJoinTables.length; pos++) { + if (pos == posBigTable) { continue; } - String filePath = Utilities.generatePath(baseDir, conf.getDumpFilePrefix(), pos, fileName); + if(baseDir == null) { + throw new IllegalStateException("baseDir cannot be null"); + } + String filePath = Utilities.generatePath(baseDir, conf.getDumpFilePrefix(), (byte)pos, fileName); Path path = new Path(filePath); - LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path.toString()); - hashtable.initilizePersistentHash(path.toUri().getPath()); + LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path); + ObjectInputStream in = new ObjectInputStream(new BufferedInputStream( + new FileInputStream(path.toUri().getPath()), 4096)); + try{ + mapJoinTables[pos] = mapJoinTableSerdes[pos].load(in); + } finally { + in.close(); + } } } catch (Exception e) { - LOG.error("Load Distributed Cache Error", e); throw new HiveException(e); } } @@ -208,43 +174,33 @@ public void cleanUpInputFileChangedOp() throws HiveException { generateMapMetaData(); firstRow = false; } - loadHashTable(); } catch (SerDeException e) { - e.printStackTrace(); throw new HiveException(e); } } @Override public void processOp(Object row, int tag) throws HiveException { - try { if (firstRow) { // generate the map metadata generateMapMetaData(); firstRow = false; } - alias = (byte)tag; - if ((lastAlias == null) || (!lastAlias.equals(alias))) { nextSz = joinEmitInterval; } - // compute keys and values as StandardObjects - AbstractMapJoinKey key = JoinUtil.computeMapJoinKeys(row, joinKeys[alias], + key = JoinUtil.computeMapJoinKeys(key, row, joinKeys[alias], joinKeysObjectInspectors[alias]); - boolean joinNeeded = false; for (byte pos = 0; pos < order.length; pos++) { if (pos != alias) { - - MapJoinObjectValue o = mapJoinTables[pos].get(key); - MapJoinRowContainer> rowContainer = rowContainerMap[pos]; - + MapJoinRowContainer rowContainer = mapJoinTables[pos].get(key); // there is no join-value or join-key has all null elements - if (o == null || key.hasAnyNulls(nullsafes)) { + if (rowContainer == null || key.hasAnyNulls(nullsafes)) { if (!noOuterJoin) { joinNeeded = true; storage[pos] = dummyObjVectors[pos]; @@ -253,45 +209,36 @@ public void processOp(Object row, int tag) throws HiveException { } } else { joinNeeded = true; - rowContainer.reset(o.getObj()); - storage[pos] = rowContainer; - aliasFilterTags[pos] = o.getAliasFilter(); + storage[pos] = rowContainer.copy(); + aliasFilterTags[pos] = rowContainer.getAliasFilter(); } } } - if (joinNeeded) { ArrayList value = getFilteredValue(alias, row); - // Add the value to the ArrayList storage[alias].add(value); - // generate the output records checkAndGenObject(); } - // done with the row storage[tag].clear(); - for (byte pos = 0; pos < order.length; pos++) { if (pos != tag) { storage[pos] = null; } } - } catch (SerDeException e) { - e.printStackTrace(); throw new HiveException(e); } } @Override public void closeOp(boolean abort) throws HiveException { - if (mapJoinTables != null) { - for (HashMapWrapper hashTable : mapJoinTables) 
{ - if (hashTable != null) { - hashTable.close(); + for (MapJoinTableContainer tableContainer : mapJoinTables) { + if (tableContainer != null) { + tableContainer.clear(); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 1a784b2..097f650 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -64,10 +64,10 @@ private MapredLocalWork localWork = null; private Map aliasToMergeQueue = Collections.emptyMap(); - transient ArrayList[] keyWritables; - transient ArrayList[] nextKeyWritables; - RowContainer>[] nextGroupStorage; - RowContainer>[] candidateStorage; + transient List[] keyWritables; + transient List[] nextKeyWritables; + RowContainer>[] nextGroupStorage; + RowContainer>[] candidateStorage; transient String[] tagToAlias; private transient boolean[] fetchDone; @@ -435,7 +435,7 @@ private void fetchNextGroup(Byte t) throws HiveException { private void promoteNextGroupToCandidate(Byte t) throws HiveException { this.keyWritables[t] = this.nextKeyWritables[t]; this.nextKeyWritables[t] = null; - RowContainer> oldRowContainer = this.candidateStorage[t]; + RowContainer> oldRowContainer = this.candidateStorage[t]; oldRowContainer.clear(); this.candidateStorage[t] = this.nextGroupStorage[t]; this.nextGroupStorage[t] = oldRowContainer; @@ -479,10 +479,10 @@ private void putDummyOrEmpty(Byte i) { private int[] findSmallestKey() { int[] result = new int[order.length]; - ArrayList smallestOne = null; + List smallestOne = null; for (byte pos = 0; pos < order.length; pos++) { - ArrayList key = keyWritables[pos]; + List key = keyWritables[pos]; if (key == null) { continue; } @@ -501,7 +501,7 @@ private void putDummyOrEmpty(Byte i) { private boolean processKey(byte alias, ArrayList key) throws HiveException { - ArrayList keyWritable = keyWritables[alias]; + List keyWritable = keyWritables[alias]; if (keyWritable == null) { //the first group. keyWritables[alias] = key; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java new file mode 100644 index 0000000..dbe00b6 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.mapjoin; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + + + +public class MapJoinMemoryExhaustionException extends HiveException { + private static final long serialVersionUID = 3678353959830506881L; + public MapJoinMemoryExhaustionException(String msg) { + super(msg); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java new file mode 100644 index 0000000..33760aa --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.mapjoin; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.text.NumberFormat; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; + +/** + * Handles the logic around deciding when to throw an {@see MapJoinMemoryExhaustionException} + * for {@see HashTableSinkOperator}. + */ +public class MapJoinMemoryExhaustionHandler { + private static final Log LOG = LogFactory.getLog(MapJoinMemoryExhaustionHandler.class); + + public static final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + + /** + * The percentage of overall heap that the JVM is allowed + * to allocate before failing a MapJoin local task. + */ + private final double maxMemoryUsage; + /** + * The max heap of the JVM in bytes. + */ + private final long maxHeapSize; + private final LogHelper console; + private final NumberFormat percentageNumberFormat; + /** + * Constructor expects a LogHelper object in addition to the max percent + * of heap memory which can be consumed before a {@see MapJoinMemoryExhaustionException} + * is thrown. + */ + public MapJoinMemoryExhaustionHandler(LogHelper console, double maxMemoryUsage) { + this.console = console; + this.maxMemoryUsage = maxMemoryUsage; + long maxHeapSize = memoryMXBean.getHeapMemoryUsage().getMax(); + /* + * getMax() can return -1. In this case default to 200MB. 
This + * likely will never happen.s + */ + if(maxHeapSize == -1) { + this.maxHeapSize = 200L * 1024L * 1024L; + LOG.warn("MemoryMXBean.getHeapMemoryUsage().getMax() returned -1, " + + "defaulting maxHeapSize to 200MB"); + } else { + this.maxHeapSize = maxHeapSize; + } + percentageNumberFormat = NumberFormat.getInstance(); + percentageNumberFormat.setMinimumFractionDigits(2); + LOG.info("JVM Max Heap Size: " + this.maxHeapSize); + } + /** + * Throws {@see MapJoinMemoryExhaustionException} when the JVM has consumed the + * configured percentage of memory. The arguments are used simply for the error + * message. + * + * @param tableContainerSize currently table container size + * @param numRows number of rows processed + * @throws MapJoinMemoryExhaustionException + */ + public void checkMemoryStatus(long tableContainerSize, long numRows) + throws MapJoinMemoryExhaustionException { + long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); + double percentage = (double) usedMemory / (double) maxHeapSize; + String msg = Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t" + + tableContainerSize + "\tMemory usage:\t" + usedMemory + "\tpercentage:\t" + percentageNumberFormat.format(percentage); + console.printInfo(msg); + if(percentage > maxMemoryUsage) { + throw new MapJoinMemoryExhaustionException(msg); + } + } +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index f72ecfb..4d88d39 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; +import java.io.ObjectOutputStream; import java.io.OutputStream; import java.io.Serializable; import java.lang.management.ManagementFactory; @@ -51,9 +52,8 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter; -import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey; -import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext; import org.apache.hadoop.hive.ql.plan.FetchWork; @@ -306,14 +306,13 @@ public int executeFromChildJVM(DriverContext driverContext) { long elapsed = currentTime - startTime; console.printInfo(Utilities.now() + "\tEnd of local task; Time Taken: " + Utilities.showTime(elapsed) + " sec."); - } catch (Throwable e) { - if (e instanceof OutOfMemoryError - || (e instanceof HiveException && e.getMessage().equals("RunOutOfMeomoryUsage"))) { - // Don't create a new object if we are already out of memory + } catch (Throwable throwable) { + if (throwable instanceof OutOfMemoryError + || (throwable instanceof MapJoinMemoryExhaustionException)) { + l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable); return 3; } else { - l4j.error("Hive Runtime Error: Map local work failed"); - e.printStackTrace(); + l4j.error("Hive Runtime Error: Map local work failed", throwable); return 2; } } @@ -323,7 +322,6 @@ public int executeFromChildJVM(DriverContext 
driverContext) { private void startForward(boolean inputFileChangeSenstive, String bigTableBucket) throws Exception { for (Map.Entry entry : fetchOperators.entrySet()) { - int fetchOpRows = 0; String alias = entry.getKey(); FetchOperator fetchOp = entry.getValue(); @@ -351,7 +349,6 @@ private void startForward(boolean inputFileChangeSenstive, String bigTableBucket forwardOp.close(false); break; } - fetchOpRows++; forwardOp.process(row.o, 0); // check if any operator had a fatal error or early exit during // execution @@ -412,7 +409,8 @@ private void initializeOperators(Map fetchOpJobConfMap) } } - private void generateDummyHashTable(String alias, String bigBucketFileName) throws HiveException,IOException { + private void generateDummyHashTable(String alias, String bigBucketFileName) + throws HiveException,IOException { // find the (byte)tag for the map join(HashTableSinkOperator) Operator parentOp = work.getAliasToWork().get(alias); Operator childOp = parentOp.getChildOperators().get(0); @@ -429,8 +427,6 @@ private void generateDummyHashTable(String alias, String bigBucketFileName) thro // generate empty hashtable for this (byte)tag String tmpURI = this.getWork().getTmpFileURI(); - HashMapWrapper hashTable = - new HashMapWrapper(); String fileName = work.getBucketFileName(bigBucketFileName); @@ -440,12 +436,14 @@ private void generateDummyHashTable(String alias, String bigBucketFileName) thro console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath); Path path = new Path(tmpURIPath); FileSystem fs = path.getFileSystem(job); - File file = new File(path.toUri().getPath()); - fs.create(path); - long fileLength = hashTable.flushMemoryCacheToPersistent(file); + ObjectOutputStream out = new ObjectOutputStream(fs.create(path)); + try { + MapJoinTableContainerSerDe.persistDummyTable(out); + } finally { + out.close(); + } console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath + " File size: " - + fileLength); - hashTable.close(); + + fs.getFileStatus(path).getLen()); } private void setUpFetchOpContext(FetchOperator fetchOp, String alias, String currentInputFile) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinKey.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinKey.java deleted file mode 100644 index e30da7c..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinKey.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.persistence; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; - -/** - * Map Join Object used for both key. 
- */ -public abstract class AbstractMapJoinKey implements Externalizable { - - protected static int metadataTag = -1; - - public AbstractMapJoinKey() { - } - - @Override - public abstract boolean equals(Object o); - - @Override - public abstract int hashCode(); - - public abstract void readExternal(ObjectInput in) throws IOException, ClassNotFoundException; - - public abstract void writeExternal(ObjectOutput out) throws IOException; - - public abstract boolean hasAnyNulls(boolean[] nullsafes); - -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinTableContainer.java new file mode 100644 index 0000000..8854b19 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractMapJoinTableContainer.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.persistence; + +import java.util.Collections; +import java.util.Map; + +public abstract class AbstractMapJoinTableContainer implements MapJoinTableContainer { + private final Map metaData; + + protected AbstractMapJoinTableContainer(Map metaData) { + this.metaData = metaData; + } + @Override + public Map getMetaData() { + return Collections.unmodifiableMap(metaData); + } + + protected void putMetaData(String key, String value) { + metaData.put(key, value); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java index 00021c9..6306e6f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java @@ -20,17 +20,17 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; -public abstract class AbstractRowContainer { +public abstract class AbstractRowContainer { public AbstractRowContainer() { } - public abstract void add(Row t) throws HiveException; + public abstract void add(ROW t) throws HiveException; - public abstract Row first() throws HiveException; + public abstract ROW first() throws HiveException; - public abstract Row next() throws HiveException; + public abstract ROW next() throws HiveException; /** * Get the number of elements in the RowContainer. 
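The MapJoinTableContainer interface that HashMapWrapper now implements is not included in this excerpt. Judging from the calls made on it in HashTableSinkOperator and MapJoinOperator above, and from the methods HashMapWrapper and AbstractMapJoinTableContainer override below, it is roughly the following shape; this is a sketch inferred from the patch, not the committed source, and the real interface may declare more.

// Sketch of MapJoinTableContainer as implied by this patch; the method names come
// from the @Override'd methods in HashMapWrapper/AbstractMapJoinTableContainer and
// the calls in the operators, but the javadoc and any additional members are assumptions.
package org.apache.hadoop.hive.ql.exec.persistence;

import java.util.Map;
import java.util.Set;

public interface MapJoinTableContainer {
  // Look up the row container for a join key; null when the key is absent.
  MapJoinRowContainer get(MapJoinKey key);

  // Associate a row container with a join key.
  void put(MapJoinKey key, MapJoinRowContainer value);

  // Number of distinct keys currently held.
  int size();

  // Iterate over all entries, e.g. when MapJoinTableContainerSerDe persists the table.
  Set<Map.Entry<MapJoinKey, MapJoinRowContainer>> entrySet();

  // Drop all held rows and release any temporary resources.
  void clear();

  // Construction parameters (threshold, load factor, ...) recorded by
  // AbstractMapJoinTableContainer so the container can be rebuilt on load.
  Map<String, String> getMetaData();
}

Putting the hash table behind this interface, instead of exposing HashMapWrapper's raw HashMap, is what lets HashTableSinkOperator and MapJoinOperator share MapJoinTableContainerSerDe for the dump/load round trip.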
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/DCLLItem.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/DCLLItem.java deleted file mode 100644 index a919cd2..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/DCLLItem.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.persistence; - -/** - * Doubly circular linked list item. - */ -public class DCLLItem { - - DCLLItem prev; - DCLLItem next; - - DCLLItem() { - prev = next = this; - } - - /** - * Get the next item. - * - * @return the next item. - */ - public DCLLItem getNext() { - return next; - } - - /** - * Get the previous item. - * - * @return the previous item. - */ - public DCLLItem getPrev() { - return prev; - } - - /** - * Set the next item as itm. - * - * @param itm - * the item to be set as next. - */ - public void setNext(DCLLItem itm) { - next = itm; - } - - /** - * Set the previous item as itm. - * - * @param itm - * the item to be set as previous. - */ - public void setPrev(DCLLItem itm) { - prev = itm; - } - - /** - * Remove the current item from the doubly circular linked list. - */ - public void remove() { - next.prev = prev; - prev.next = next; - prev = next = null; - } - - /** - * Add v as the previous of the current list item. - * - * @param v - * inserted item. - */ - public void insertBefore(DCLLItem v) { - prev.next = v; - v.prev = prev; - v.next = this; - prev = v; - } - - /** - * Add v as the previous of the current list item. - * - * @param v - * inserted item. 
- */ - public void insertAfter(DCLLItem v) { - next.prev = v; - v.next = next; - v.prev = this; - next = v; - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java index 9d5233d..61545b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java @@ -18,26 +18,14 @@ package org.apache.hadoop.hive.ql.exec.persistence; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.Serializable; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; -import java.text.NumberFormat; import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; /** @@ -47,26 +35,17 @@ * hash table. */ -public class HashMapWrapper implements Serializable { +public class HashMapWrapper extends AbstractMapJoinTableContainer implements Serializable { private static final long serialVersionUID = 1L; - protected Log LOG = LogFactory.getLog(this.getClass().getName()); + protected static final Log LOG = LogFactory.getLog(HashMapWrapper.class); // default threshold for using main memory based HashMap - + private static final String THESHOLD_NAME = "threshold"; + private static final String LOAD_NAME = "load"; private static final int THRESHOLD = 1000000; private static final float LOADFACTOR = 0.75f; - private static final float MEMORYUSAGE = 1; - - private float maxMemoryUsage; - private HashMap mHash; // main memory HashMap - protected transient LogHelper console; - - private File dumpFile; - public static MemoryMXBean memoryMXBean; - private long maxMemory; - private long currentMemory; - private NumberFormat num; + private HashMap mHash; // main memory HashMap /** * Constructor. @@ -74,163 +53,53 @@ * @param threshold * User specified threshold to store new values into persistent storage. 
*/ - public HashMapWrapper(int threshold, float loadFactor, float memoryUsage) { - maxMemoryUsage = memoryUsage; - mHash = new HashMap(threshold, loadFactor); - memoryMXBean = ManagementFactory.getMemoryMXBean(); - maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); - LOG.info("maximum memory: " + maxMemory); - num = NumberFormat.getInstance(); - num.setMinimumFractionDigits(2); + public HashMapWrapper(int threshold, float loadFactor) { + super(createConstructorMetaData(threshold, loadFactor)); + mHash = new HashMap(threshold, loadFactor); + + } + + public HashMapWrapper(Map metaData) { + super(metaData); + int threshold = Integer.parseInt(metaData.get(THESHOLD_NAME)); + float loadFactor = Float.parseFloat(metaData.get(LOAD_NAME)); + mHash = new HashMap(threshold, loadFactor); } public HashMapWrapper(int threshold) { - this(threshold, LOADFACTOR, MEMORYUSAGE); + this(threshold, LOADFACTOR); } public HashMapWrapper() { - this(THRESHOLD, LOADFACTOR, MEMORYUSAGE); + this(THRESHOLD, LOADFACTOR); } - public V get(K key) { + @Override + public MapJoinRowContainer get(MapJoinKey key) { return mHash.get(key); } - public boolean put(K key, V value) throws HiveException { - // isAbort(); + @Override + public void put(MapJoinKey key, MapJoinRowContainer value) { mHash.put(key, value); - return false; - } - - - public void remove(K key) { - mHash.remove(key); - } - - /** - * Flush the main memory hash table into the persistent cache file - * - * @return persistent cache file - */ - public long flushMemoryCacheToPersistent(File file) throws IOException { - ObjectOutputStream outputStream = null; - outputStream = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file), 4096)); - outputStream.writeObject(mHash); - outputStream.flush(); - outputStream.close(); - - return file.length(); - } - - public void initilizePersistentHash(String fileName) throws IOException, ClassNotFoundException { - ObjectInputStream inputStream = null; - inputStream = new ObjectInputStream(new BufferedInputStream(new FileInputStream(fileName), 4096)); - HashMap hashtable = (HashMap) inputStream.readObject(); - this.setMHash(hashtable); - - inputStream.close(); } + @Override public int size() { return mHash.size(); } - - public Set keySet() { - return mHash.keySet(); - } - - - /** - * Close the persistent hash table and clean it up. 
- * - * @throws HiveException - */ - public void close() throws HiveException { - mHash.clear(); + @Override + public Set> entrySet() { + return mHash.entrySet(); } - - public void clear() throws HiveException { + @Override + public void clear() { mHash.clear(); } - - public int getKeySize() { - return mHash.size(); - } - - public boolean isAbort(long numRows,LogHelper console) { - int size = mHash.size(); - long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); - double rate = (double) usedMemory / (double) maxMemory; - console.printInfo(Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t" - + size + "\tMemory usage:\t" + usedMemory + "\trate:\t" + num.format(rate)); - if (rate > (double) maxMemoryUsage) { - return true; - } - return false; - } - - public void setLOG(Log log) { - LOG = log; + private static Map createConstructorMetaData(int threshold, float loadFactor) { + Map metaData = new HashMap(); + metaData.put(THESHOLD_NAME, String.valueOf(threshold)); + metaData.put(LOAD_NAME, String.valueOf(loadFactor)); + return metaData; } - - public HashMap getMHash() { - return mHash; - } - - public void setMHash(HashMap hash) { - mHash = hash; - } - - public LogHelper getConsole() { - return console; - } - - public void setConsole(LogHelper console) { - this.console = console; - } - - public File getDumpFile() { - return dumpFile; - } - - public void setDumpFile(File dumpFile) { - this.dumpFile = dumpFile; - } - - public static MemoryMXBean getMemoryMXBean() { - return memoryMXBean; - } - - public static void setMemoryMXBean(MemoryMXBean memoryMXBean) { - HashMapWrapper.memoryMXBean = memoryMXBean; - } - - public long getMaxMemory() { - return maxMemory; - } - - public void setMaxMemory(long maxMemory) { - this.maxMemory = maxMemory; - } - - public long getCurrentMemory() { - return currentMemory; - } - - public void setCurrentMemory(long currentMemory) { - this.currentMemory = currentMemory; - } - - public NumberFormat getNum() { - return num; - } - - public void setNum(NumberFormat num) { - this.num = num; - } - - public static int getTHRESHOLD() { - return THRESHOLD; - } - } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MRU.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MRU.java deleted file mode 100644 index ee644f7..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MRU.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.persistence; - -/** - * An MRU (Most Recently Used) cache implementation. This implementation - * maintains a doubly circular linked list and it can be used with an auxiliary - * data structure such as a HashMap to locate the item quickly. 
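With the spill-to-disk files, MemoryMXBean tracking, and assorted getters stripped out, HashMapWrapper becomes a plain in-memory table container keyed by MapJoinKey. A minimal usage sketch of the reworked API (illustrative only, not part of the patch; checked exceptions elided):

import org.apache.hadoop.hive.ql.exec.persistence.*;
import org.apache.hadoop.io.Text;

// build a tiny small-table hash map: one key mapped to one value row
MapJoinTableContainer container = new HashMapWrapper();               // default threshold / load factor
MapJoinRowContainer rows = new MapJoinRowContainer();
rows.add(new Object[] { new Text("value0") });                        // one value row
container.put(new MapJoinKey(new Object[] { new Text("key0") }), rows);

// probe with an equal key; equality/hashCode are Arrays-based over the key columns
MapJoinRowContainer hit = container.get(new MapJoinKey(new Object[] { new Text("key0") }));
assert hit != null && container.size() == 1;
container.clear();                                                    // drops all entries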
- */ -public class MRU { - - T head; // head of the linked list -- MRU; tail (head.prev) will be the LRU - - public MRU() { - head = null; - } - - /** - * Insert a value into the MRU. It will appear as the head. - */ - public T put(T item) { - addToHead(item); - return item; - } - - /** - * Remove a item from the MRU list. - * - * @param v - * linked list item. - */ - public void remove(T v) { - if (v == null) { - return; - } - if (v == head) { - if (head != head.getNext()) { - head = (T) head.getNext(); - } else { - head = null; - } - } - v.remove(); - } - - /** - * Get the most recently used. - * - * @return the most recently used item. - */ - public T head() { - return head; - } - - /** - * Get the least recently used. - * - * @return the least recently used item. - */ - public T tail() { - return (T) head.getPrev(); - } - - /** - * Insert a new item as the head. - * - * @param v - * the new linked list item to be added to the head. - */ - private void addToHead(T v) { - if (head == null) { - head = v; - } else { - head.insertBefore(v); - head = v; - } - } - - /** - * Move an existing item to the head. - * - * @param v - * the linked list item to be moved to the head. - */ - public void moveToHead(T v) { - assert (head != null); - if (head != v) { - v.remove(); - head.insertBefore(v); - head = v; - } - } - - /** - * Clear all elements in the MRU list. This is not very efficient (linear) - * since it will call remove() to every item in the list. - */ - public void clear() { - while (head.getNext() != head) { - head.getNext().remove(); - } - head.remove(); - head = null; - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java deleted file mode 100644 index fc4780e..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.exec.persistence; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; - -import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator; -import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx; -import org.apache.hadoop.hive.ql.exec.MapJoinMetaData; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; -import org.apache.hadoop.io.Writable; - -public class MapJoinDoubleKeys extends AbstractMapJoinKey { - - protected transient Object obj1; - protected transient Object obj2; - - - public MapJoinDoubleKeys() { - } - - /** - * @param obj1 - * @param obj2 - */ - public MapJoinDoubleKeys(Object obj1, Object obj2) { - this.obj1 = obj1; - this.obj2 = obj2; - } - - @Override - public boolean equals(Object o) { - if (o instanceof MapJoinDoubleKeys) { - MapJoinDoubleKeys mObj = (MapJoinDoubleKeys) o; - Object key1 = mObj.getObj1(); - Object key2 = mObj.getObj2(); - - if ((obj1 == null) && (key1 == null)) { - if ((obj2 == null) && (key2 == null)) { - return true; - } - } - if ((obj1 != null) && (key1 != null)) { - if (obj1.equals(key1)) { - if ((obj2 != null) && (key2 != null)) { - if (obj2.equals(key2)) { - return true; - } - } - } - } - } - return false; - } - - @Override - public int hashCode() { - int hashCode = 1; - if (obj1 == null) { - hashCode = metadataTag; - } else { - hashCode += (31 + obj1.hashCode()); - } - if (obj2 == null) { - hashCode += metadataTag; - } else { - hashCode += (31 + obj2.hashCode()); - } - return hashCode; - } - - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - try { - // get the tableDesc from the map stored in the mapjoin operator - HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get(Integer.valueOf(metadataTag)); - - Writable val = ctx.getSerDe().getSerializedClass().newInstance(); - val.readFields(in); - - - - ArrayList list = (ArrayList) ObjectInspectorUtils.copyToStandardObject(ctx - .getSerDe().deserialize(val), ctx.getSerDe().getObjectInspector(), - ObjectInspectorCopyOption.WRITABLE); - - if (list == null) { - obj1 = null; - obj2 = null; - - } else { - obj1 = list.get(0); - obj2 = list.get(1); - } - - } catch (Exception e) { - throw new IOException(e); - } - - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - try { - // out.writeInt(metadataTag); - // get the tableDesc from the map stored in the mapjoin operator - HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get( - Integer.valueOf(metadataTag)); - - ArrayList list = MapJoinMetaData.getList(); - list.add(obj1); - list.add(obj2); - // Different processing for key and value - Writable outVal = ctx.getSerDe().serialize(list, ctx.getStandardOI()); - outVal.write(out); - - } catch (SerDeException e) { - throw new IOException(e); - } - } - - - - /** - * @return the obj - */ - public Object getObj1() { - return obj1; - } - - /** - * @param obj1 - * the obj to set - */ - public void setObj1(Object obj1) { - this.obj1 = obj1; - } - - /** - * @return the obj - */ - public Object getObj2() { - return obj2; - } - - /** - * @param obj2 - * the obj to set - */ - public void setObj2(Object obj2) { - this.obj2 = obj2; - } - - - @Override - public 
boolean hasAnyNulls(boolean[] nullsafes) { - if (obj1 == null && (nullsafes == null || !nullsafes[0])) { - return true; - } - if (obj2 == null && (nullsafes == null || !nullsafes[1])) { - return true; - } - return false; - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java new file mode 100644 index 0000000..b2c5ddc --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.persistence; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.io.Writable; + +@SuppressWarnings("deprecation") +public class MapJoinKey { + private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0]; + + private Object[] key; + + public MapJoinKey(Object[] key) { + this.key = key; + } + public MapJoinKey() { + this(EMPTY_OBJECT_ARRAY); + } + + public Object[] getKey() { + return key; + } + public boolean hasAnyNulls(boolean[] nullsafes){ + if (key != null && key.length > 0) { + for (int i = 0; i < key.length; i++) { + if (key[i] == null && (nullsafes == null || !nullsafes[i])) { + return true; + } + } + } + return false; + } + + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(key); + return result; + } + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + MapJoinKey other = (MapJoinKey) obj; + if (!Arrays.equals(key, other.key)) + return false; + return true; + } + @SuppressWarnings("unchecked") + public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) + throws IOException, SerDeException { + SerDe serde = context.getSerDe(); + container.readFields(in); + List value = (List)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container), + serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE); + if(value == null) { + key = EMPTY_OBJECT_ARRAY; + } else { + key = value.toArray(); + } + } + + public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) + throws IOException, 
SerDeException { + SerDe serde = context.getSerDe(); + ObjectInspector objectInspector = context.getStandardOI(); + Writable container = serde.serialize(key, objectInspector); + container.write(out); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java deleted file mode 100644 index 50aa9e9..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.persistence; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; - -import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator; -import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; -import org.apache.hadoop.io.Writable; - -/** - * Map Join Object used for both key. - */ -public class MapJoinObjectKey extends AbstractMapJoinKey { - - - protected transient Object[] obj; - - public MapJoinObjectKey() { - } - - /** - * @param obj - */ - public MapJoinObjectKey(Object[] obj) { - this.obj = obj; - } - - @Override - public boolean equals(Object o) { - if (o instanceof MapJoinObjectKey) { - MapJoinObjectKey mObj = (MapJoinObjectKey) o; - Object[] mObjArray = mObj.getObj(); - if ((obj == null) && (mObjArray == null)) { - return true; - } - if ((obj != null) && (mObjArray != null)) { - if (obj.length == mObjArray.length) { - for (int i = 0; i < obj.length; i++) { - if (obj[i] == null) { - return mObjArray[i] == null; - } - if (!obj[i].equals(mObjArray[i])) { - return false; - } - } - return true; - } - } - } - return false; - } - - @Override - public int hashCode() { - int hashCode; - if (obj == null) { - hashCode = metadataTag; - } else { - hashCode = 1; - - for (int i = 0; i < obj.length; i++) { - Object o = obj[i]; - hashCode = 31 * hashCode + (o == null ? 
0 : o.hashCode()); - } - - } - return hashCode; - } - - @Override - public void readExternal(ObjectInput in) throws IOException, - ClassNotFoundException { - try { - // get the tableDesc from the map stored in the mapjoin operator - HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get( - Integer.valueOf(metadataTag)); - - Writable val = ctx.getSerDe().getSerializedClass().newInstance(); - val.readFields(in); - ArrayList list = (ArrayList) ObjectInspectorUtils.copyToStandardObject(ctx - .getSerDe().deserialize(val), ctx.getSerDe().getObjectInspector(), - ObjectInspectorCopyOption.WRITABLE); - if(list == null){ - obj = new ArrayList(0).toArray(); - }else{ - obj = list.toArray(); - } - - } catch (Exception e) { - throw new IOException(e); - } - - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - try { - // get the tableDesc from the map stored in the mapjoin operator - HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get( - Integer.valueOf(metadataTag)); - - // Different processing for key and value - Writable outVal = ctx.getSerDe().serialize(obj, ctx.getStandardOI()); - outVal.write(out); - } catch (SerDeException e) { - throw new IOException(e); - } - } - - - /** - * @return the obj - */ - public Object[] getObj() { - return obj; - } - - /** - * @param obj - * the obj to set - */ - public void setObj(Object[] obj) { - this.obj = obj; - } - - @Override - public boolean hasAnyNulls(boolean[] nullsafes){ - if (obj != null && obj.length> 0) { - for (int i = 0; i < obj.length; i++) { - if (obj[i] == null && (nullsafes == null || !nullsafes[i])) { - return true; - } - } - } - return false; - - } - -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectSerDeContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectSerDeContext.java new file mode 100644 index 0000000..f47d481 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectSerDeContext.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.persistence; + +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; + +@SuppressWarnings("deprecation") +public class MapJoinObjectSerDeContext { + private final ObjectInspector standardOI; + private final SerDe serde; + private final boolean hasFilter; + + public MapJoinObjectSerDeContext(SerDe serde, boolean hasFilter) + throws SerDeException { + this.serde = serde; + this.hasFilter = hasFilter; + this.standardOI = ObjectInspectorUtils.getStandardObjectInspector(serde.getObjectInspector(), + ObjectInspectorCopyOption.WRITABLE); + } + + /** + * @return the standardOI + */ + public ObjectInspector getStandardOI() { + return standardOI; + } + + /** + * @return the serde + */ + public SerDe getSerDe() { + return serde; + } + + public boolean hasFilterTag() { + return hasFilter; + } + + @Override + public String toString() { + return "MapJoinObjectSerDeContext [standardOI=" + standardOI + ", serde=" + serde + + ", hasFilter=" + hasFilter + "]"; + } + +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java deleted file mode 100644 index e9e7d5c..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java +++ /dev/null @@ -1,192 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.persistence; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; - -import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator; -import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx; -import org.apache.hadoop.hive.ql.exec.MapJoinMetaData; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.io.ShortWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; -import org.apache.hadoop.io.Writable; - -/** - * Map Join Object used for both key and value. 
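MapJoinObjectSerDeContext bundles a SerDe, its standard (WRITABLE-copied) ObjectInspector, and the filter-tag flag; MapJoinKey.write/read then push a key through that SerDe. A hedged round-trip sketch, assuming keySerDe is an already-initialized SerDe for the key columns and objectOutputStream/objectInputStream are placeholder open streams (checked exceptions elided):

MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerDe, false /* no filter tag */);

// write side
MapJoinKey key = new MapJoinKey(new Object[] { new Text("k") });
key.write(keyContext, objectOutputStream);

// read side
Writable container = keyContext.getSerDe().getSerializedClass().newInstance();
MapJoinKey restored = new MapJoinKey();
restored.read(keyContext, objectInputStream, container);
// restored.equals(key) holds via Arrays.equals over the copied standard objects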
- */ -public class MapJoinObjectValue implements Externalizable { - - protected transient int metadataTag; - protected transient MapJoinRowContainer obj; - - protected transient byte aliasFilter = (byte) 0xff; - - public MapJoinObjectValue() { - - } - - /** - * @param metadataTag - * @param obj - */ - public MapJoinObjectValue(int metadataTag, MapJoinRowContainer obj) { - this.metadataTag = metadataTag; - this.obj = obj; - } - - public byte getAliasFilter() { - return aliasFilter; - } - - @Override - public boolean equals(Object o) { - if (o instanceof MapJoinObjectValue) { - MapJoinObjectValue mObj = (MapJoinObjectValue) o; - - if (mObj.getMetadataTag() == metadataTag) { - if ((obj == null) && (mObj.getObj() == null)) { - return true; - } - if ((obj != null) && (mObj.getObj() != null) && (mObj.getObj().equals(obj))) { - return true; - } - } - } - return false; - } - - @Override - public int hashCode() { - return (obj == null) ? 0 : obj.hashCode(); - } - - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - try { - - metadataTag = in.readInt(); - - // get the tableDesc from the map stored in the mapjoin operator - HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get( - Integer.valueOf(metadataTag)); - int sz = in.readInt(); - MapJoinRowContainer res = new MapJoinRowContainer(); - if (sz > 0) { - int numCols = in.readInt(); - if (numCols > 0) { - for (int pos = 0; pos < sz; pos++) { - Writable val = ctx.getSerDe().getSerializedClass().newInstance(); - val.readFields(in); - - ArrayList memObj = (ArrayList) ObjectInspectorUtils - .copyToStandardObject(ctx.getSerDe().deserialize(val), ctx.getSerDe() - .getObjectInspector(), ObjectInspectorCopyOption.WRITABLE); - - if (memObj == null) { - res.add(new ArrayList(0).toArray()); - } else { - Object[] array = memObj.toArray(); - res.add(array); - if (ctx.hasFilterTag()) { - aliasFilter &= ((ShortWritable)array[array.length - 1]).get(); - } - } - } - } else { - for (int i = 0; i < sz; i++) { - res.add(new ArrayList(0).toArray()); - } - } - } - obj = res; - } catch (Exception e) { - throw new IOException(e); - } - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - try { - - out.writeInt(metadataTag); - - // get the tableDesc from the map stored in the mapjoin operator - HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get( - Integer.valueOf(metadataTag)); - - // Different processing for key and value - MapJoinRowContainer v = obj; - out.writeInt(v.size()); - if (v.size() > 0) { - Object[] row = v.first(); - out.writeInt(row.length); - - if (row.length > 0) { - for (; row != null; row = v.next()) { - Writable outVal = ctx.getSerDe().serialize(row, ctx.getStandardOI()); - outVal.write(out); - } - } - } - } catch (SerDeException e) { - throw new IOException(e); - } catch (HiveException e) { - throw new IOException(e); - } - } - - /** - * @return the metadataTag - */ - public int getMetadataTag() { - return metadataTag; - } - - /** - * @param metadataTag - * the metadataTag to set - */ - public void setMetadataTag(int metadataTag) { - this.metadataTag = metadataTag; - } - - /** - * @return the obj - */ - public MapJoinRowContainer getObj() { - return obj; - } - - /** - * @param obj - * the obj to set - */ - public void setObj(MapJoinRowContainer obj) { - this.obj = obj; - } - -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java index 67aa108..971d168 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java @@ -18,30 +18,46 @@ package org.apache.hadoop.hive.ql.exec.persistence; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.AbstractList; import java.util.ArrayList; +import java.util.ConcurrentModificationException; import java.util.List; -import org.apache.hadoop.hive.ql.metadata.HiveException; - -public class MapJoinRowContainer extends AbstractRowContainer { - - private List list; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.io.Writable; +@SuppressWarnings("deprecation") +public class MapJoinRowContainer extends AbstractRowContainer> { + private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0]; + + private final List> list; private int index; + private byte aliasFilter = (byte) 0xff; public MapJoinRowContainer() { index = 0; - list = new ArrayList(1); - } + list = new ArrayList>(1); + } @Override - public void add(Row t) throws HiveException { + public void add(List t) { list.add(t); } + public void add(Object[] t) { + add(toList(t)); + } @Override - public Row first() throws HiveException { + public List first() { index = 0; if (index < list.size()) { return list.get(index); @@ -50,13 +66,12 @@ public Row first() throws HiveException { } @Override - public Row next() throws HiveException { + public List next() { index++; if (index < list.size()) { return list.get(index); } return null; - } /** @@ -73,28 +88,88 @@ public int size() { * Remove all elements in the RowContainer. 
*/ @Override - public void clear() throws HiveException { + public void clear() { list.clear(); index = 0; } - - public List getList() { - return list; + + public byte getAliasFilter() { + return aliasFilter; } - - public void setList(List list) { - this.list = list; + + public MapJoinRowContainer copy() { + MapJoinRowContainer result = new MapJoinRowContainer(); + for(List item : list) { + result.add(item); + } + return result; } - - public void reset(MapJoinRowContainer other) throws HiveException { - list.clear(); - Object[] obj; - for (obj = other.first(); obj != null; obj = other.next()) { - ArrayList ele = new ArrayList(obj.length); - for (int i = 0; i < obj.length; i++) { - ele.add(obj[i]); + + @SuppressWarnings({"unchecked"}) + public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) + throws IOException, SerDeException { + clear(); + SerDe serde = context.getSerDe(); + int numRows = in.readInt(); + for (int rowIndex = 0; rowIndex < numRows; rowIndex++) { + container.readFields(in); + List value = (List)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container), + serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE); + if(value == null) { + add(toList(EMPTY_OBJECT_ARRAY)); + } else { + Object[] valuesArray = value.toArray(); + if (context.hasFilterTag()) { + aliasFilter &= ((ShortWritable)valuesArray[valuesArray.length - 1]).get(); + } + add(toList(valuesArray)); } - list.add((Row) ele); } } + + public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) + throws IOException, SerDeException { + SerDe serde = context.getSerDe(); + ObjectInspector valueObjectInspector = context.getStandardOI(); + int numRows = size(); + int numRowsWritten = 0; + out.writeInt(numRows); + for (List row = first(); row != null; row = next()) { + serde.serialize(row.toArray(), valueObjectInspector).write(out); + ++numRowsWritten; + } + if(numRows != size()) { + throw new ConcurrentModificationException("Values was modifified while persisting"); + } + if(numRowsWritten != numRows) { + throw new IllegalStateException("Expected to write " + numRows + " but wrote " + numRowsWritten); + } + } + + private List toList(Object[] array) { + return new NoCopyingArrayList(array); + } + /** + * In this use case our objects will not be modified + * so we don't care about copying in and out. + */ + private static class NoCopyingArrayList extends AbstractList { + private Object[] array; + public NoCopyingArrayList(Object[] array) { + this.array = array; + } + @Override + public Object get(int index) { + return array[index]; + } + + @Override + public int size() { + return array.length; + } + + public Object[] toArray() { + return array; + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java deleted file mode 100644 index 4bff936..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
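The reworked MapJoinRowContainer stores rows as List objects (wrapping Object[] without copying via NoCopyingArrayList) and is walked with first()/next(); when the value SerDe carries a filter tag, read() folds the last column into the alias filter byte. A small hedged sketch of the in-memory side:

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.io.Text;

MapJoinRowContainer rows = new MapJoinRowContainer();
rows.add(new Object[] { new Text("a"), new Text("b") });              // wrapped, not copied
rows.add(Arrays.asList((Object) new Text("c"), new Text("d")));       // List form is accepted too

for (List<Object> row = rows.first(); row != null; row = rows.next()) {
  Object firstColumn = row.get(0);                                    // join code reads columns positionally
}
MapJoinRowContainer snapshot = rows.copy();                           // shallow copy of the row list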
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.persistence; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; - -import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator; -import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx; -import org.apache.hadoop.hive.ql.exec.MapJoinMetaData; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; -import org.apache.hadoop.io.Writable; - -public class MapJoinSingleKey extends AbstractMapJoinKey { - - - protected transient Object obj; - - public MapJoinSingleKey() { - } - - /** - * @param obj - */ - public MapJoinSingleKey(Object obj) { - this.obj = obj; - } - - @Override - public boolean equals(Object o) { - if (o instanceof MapJoinSingleKey) { - MapJoinSingleKey mObj = (MapJoinSingleKey) o; - Object key = mObj.getObj(); - if ((obj == null) && (key == null)) { - return true; - } - if ((obj != null) && (key != null)) { - if (obj.equals(key)) { - return true; - } - } - } - return false; - } - - @Override - public int hashCode() { - int hashCode; - if (obj == null) { - hashCode = metadataTag; - } else { - hashCode = 31 + obj.hashCode(); - } - return hashCode; - } - - @Override - public void readExternal(ObjectInput in) - throws IOException, ClassNotFoundException { - try { - // get the tableDesc from the map stored in the mapjoin operator - HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get( - Integer.valueOf(metadataTag)); - - Writable val = ctx.getSerDe().getSerializedClass().newInstance(); - val.readFields(in); - - - - ArrayList list = (ArrayList) ObjectInspectorUtils.copyToStandardObject(ctx - .getSerDe().deserialize(val), ctx.getSerDe().getObjectInspector(), - ObjectInspectorCopyOption.WRITABLE); - - if (list == null) { - obj = null; - System.out.println("read empty back"); - } else { - obj = list.get(0); - } - - } catch (Exception e) { - throw new IOException(e); - } - - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - try { - // out.writeInt(metadataTag); - // get the tableDesc from the map stored in the mapjoin operator - HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get( - Integer.valueOf(metadataTag)); - - ArrayList list = MapJoinMetaData.getList(); - list.add(obj); - - // Different processing for key and value - Writable outVal = ctx.getSerDe().serialize(list, ctx.getStandardOI()); - outVal.write(out); - - } catch (SerDeException e) { - throw new IOException(e); - } - } - - - - /** - * @return the obj - */ - public Object getObj() { - return obj; - } - - /** - * @param obj - * the obj to set - */ - public void setObj(Object obj) { - this.obj = obj; - } - - @Override - public boolean hasAnyNulls(boolean[] nullsafes) { - if (obj == null && (nullsafes == null || !nullsafes[0])) { - return true; - } - return false; - } - - - -} diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java new file mode 100644 index 0000000..9ce0ae6 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.persistence; + +import java.util.Map; +import java.util.Set; + +public interface MapJoinTableContainer { + + public int size(); + + public MapJoinRowContainer get(MapJoinKey key); + + public void put(MapJoinKey key, MapJoinRowContainer value); + + public Set> entrySet(); + + public Map getMetaData(); + + public void clear(); + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java new file mode 100644 index 0000000..06151d5 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
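The companion MapJoinTableContainerSerDe below reinstantiates a container reflectively from its persisted class name plus the String-to-String metadata map, so every MapJoinTableContainer implementation needs a constructor taking that map, as HashMapWrapper provides. A hedged illustration of what load() effectively does for the default container (reflective checked exceptions elided; the metadata keys and defaults come from HashMapWrapper):

import java.util.HashMap;
import java.util.Map;

Map<String, String> metaData = new HashMap<String, String>();
metaData.put("threshold", "1000000");   // THESHOLD_NAME / THRESHOLD default
metaData.put("load", "0.75");           // LOAD_NAME / LOADFACTOR default
MapJoinTableContainer container = (MapJoinTableContainer)
    Class.forName("org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper")
        .getDeclaredConstructor(Map.class)
        .newInstance(metaData);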
+ */ + +package org.apache.hadoop.hive.ql.exec.persistence; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.reflect.Constructor; +import java.util.ConcurrentModificationException; +import java.util.Map; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.io.Writable; + +@SuppressWarnings("deprecation") +public class MapJoinTableContainerSerDe { + + private final MapJoinObjectSerDeContext keyContext; + private final MapJoinObjectSerDeContext valueContext; + public MapJoinTableContainerSerDe(MapJoinObjectSerDeContext keyContext, + MapJoinObjectSerDeContext valueContext) { + this.keyContext = keyContext; + this.valueContext = valueContext; + } + @SuppressWarnings({"unchecked"}) + public MapJoinTableContainer load(ObjectInputStream in) + throws HiveException { + SerDe keySerDe = keyContext.getSerDe(); + SerDe valueSerDe = valueContext.getSerDe(); + MapJoinTableContainer tableContainer; + try { + String name = in.readUTF(); + Map metaData = (Map) in.readObject(); + tableContainer = create(name, metaData); + } catch (IOException e) { + throw new HiveException("IO error while trying to create table container", e); + } catch (ClassNotFoundException e) { + throw new HiveException("Class Initialization error while trying to create table container", e); + } + try { + Writable keyContainer = keySerDe.getSerializedClass().newInstance(); + Writable valueContainer = valueSerDe.getSerializedClass().newInstance(); + int numKeys = in.readInt(); + for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) { + MapJoinKey key = new MapJoinKey(); + key.read(keyContext, in, keyContainer); + MapJoinRowContainer values = new MapJoinRowContainer(); + values.read(valueContext, in, valueContainer); + tableContainer.put(key, values); + } + return tableContainer; + } catch (IOException e) { + throw new HiveException("IO error while trying to create table container", e); + } catch(Exception e) { + throw new HiveException("Error while trying to create table container", e); + } + } + public void persist(ObjectOutputStream out, MapJoinTableContainer tableContainer) + throws HiveException { + int numKeys = tableContainer.size(); + try { + out.writeUTF(tableContainer.getClass().getName()); + out.writeObject(tableContainer.getMetaData()); + out.writeInt(numKeys); + for(Map.Entry entry : tableContainer.entrySet()) { + entry.getKey().write(keyContext, out); + entry.getValue().write(valueContext, out); + } + } catch (SerDeException e) { + String msg = "SerDe error while attempting to persist table container"; + throw new HiveException(msg, e); + } catch(IOException e) { + String msg = "IO error while attempting to persist table container"; + throw new HiveException(msg, e); + } + if(numKeys != tableContainer.size()) { + throw new ConcurrentModificationException("TableContainer was modified while persisting: " + tableContainer); + } + } + + public static void persistDummyTable(ObjectOutputStream out) throws IOException { + MapJoinTableContainer tableContainer = new HashMapWrapper(); + out.writeUTF(tableContainer.getClass().getName()); + out.writeObject(tableContainer.getMetaData()); + out.writeInt(tableContainer.size()); + } + + private MapJoinTableContainer create(String name, Map metaData) throws HiveException { + try { + @SuppressWarnings("unchecked") + Class clazz = (Class) Class.forName(name); + Constructor constructor = 
clazz.getDeclaredConstructor(Map.class); + return constructor.newInstance(metaData); + } catch (Exception e) { + String msg = "Error while attemping to create table container" + + " of type: " + name + ", with metaData: " + metaData; + throw new HiveException(msg, e); + } + } +} \ No newline at end of file diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHashMapWrapper.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHashMapWrapper.java deleted file mode 100644 index 60967fc..0000000 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHashMapWrapper.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec; - -import java.util.HashMap; -import java.util.Random; - -import junit.framework.TestCase; - -import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; -import org.apache.hadoop.hive.ql.metadata.HiveException; - -/** - * TestHashMapWrapper. - * - */ -public class TestHashMapWrapper extends TestCase { - - public void testHashMapWrapper() throws Exception { - - HashMap mem_map = new HashMap(); - mem_map.put("k1", "v1"); - mem_map.put("k2", "v2"); - mem_map.put("k3", "v3"); - mem_map.put("k4", "v4"); - - try { - // NO cache - HashMapWrapper wrapper = new HashMapWrapper(0); - insertAll(wrapper, mem_map); - checkAll(wrapper, mem_map); - wrapper.close(); // clean up temporary files - - // cache size = 1 - wrapper = new HashMapWrapper(1); - insertAll(wrapper, mem_map); - checkAll(wrapper, mem_map); - wrapper.close(); // clean up temporary files - - // cache size = 2 - wrapper = new HashMapWrapper(2); - insertAll(wrapper, mem_map); - checkAll(wrapper, mem_map); - wrapper.close(); // clean up temporary files - - // cache size = 4 - wrapper = new HashMapWrapper(4); - insertAll(wrapper, mem_map); - checkAll(wrapper, mem_map); - wrapper.close(); // clean up temporary files - - // default cache size (25000) - wrapper = new HashMapWrapper(); - insertAll(wrapper, mem_map); - checkAll(wrapper, mem_map); - wrapper.close(); // clean up temporary files - - // check mixed put/remove/get functions - wrapper = new HashMapWrapper(2); - insertAll(wrapper, mem_map); - wrapper.remove("k3"); // k3 is in HTree - mem_map.remove("k3"); - assertTrue(mem_map.size() == 3); - checkAll(wrapper, mem_map); - - wrapper.remove("k1"); - mem_map.remove("k1"); - checkAll(wrapper, mem_map); - - String v4 = wrapper.get("k4"); - assertTrue(v4 != null); - assert (v4.equals("v4")); - - wrapper.remove("k4"); - mem_map.remove("k4"); - checkAll(wrapper, mem_map); - - wrapper.put("k5", "v5"); - mem_map.put("k5", "v5"); - checkAll(wrapper, mem_map); - - wrapper.put("k6", "v6"); - mem_map.put("k6", "v6"); - checkAll(wrapper, mem_map); - - wrapper.put("k6", "v61"); - mem_map.put("k6", "v61"); 
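Putting the pieces together, the persisted small-table format is: container class name (UTF), metadata map, key count, then one serialized MapJoinKey/MapJoinRowContainer pair per entry. A hedged end-to-end sketch, assuming keySerDe/valueSerDe are initialized SerDes and fileOut/fileIn are placeholder streams for the dump file (checked exceptions elided):

MapJoinTableContainerSerDe tableSerDe = new MapJoinTableContainerSerDe(
    new MapJoinObjectSerDeContext(keySerDe, false),
    new MapJoinObjectSerDeContext(valueSerDe, true));   // value side carries the filter tag

ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(fileOut));
tableSerDe.persist(out, container);                     // container is a populated HashMapWrapper
out.close();

ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(fileIn));
MapJoinTableContainer loaded = tableSerDe.load(in);     // reflectively recreated, then repopulated
in.close();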
- checkAll(wrapper, mem_map); - - wrapper.remove("k6"); - mem_map.remove("k6"); - checkAll(wrapper, mem_map); - - // get k1, k2 to main memory - wrapper.get("k1"); - wrapper.get("k2"); - // delete k1 so that cache is half empty - wrapper.remove("k1"); - mem_map.remove("k1"); - // put new pair (k6, v7) so that it will be in persistent hash - wrapper.put("k6", "v7"); - mem_map.put("k6", "v7"); - checkAll(wrapper, mem_map); - - // test clear - wrapper.clear(); - mem_map.clear(); - checkAll(wrapper, mem_map); - wrapper.close(); // clean up temporary files - - // insert 3,000 pairs random testing - wrapper = new HashMapWrapper(1000); - for (int i = 0; i < 3000; ++i) { - String k = "k" + i; - String v = "v" + i; - wrapper.put(k, v); - mem_map.put(k, v); - } - checkAll(wrapper, mem_map); - System.out.println("Finished inserting 3000 pairs."); - - // do 10,000 random get/remove operations - Random rand = new Random(12345678); - for (int i = 0; i < 10000; ++i) { - int j = rand.nextInt(3000); - String k = "k" + j; - String v; - - int command = rand.nextInt(3); - switch (command) { - case 0: // remove - // System.out.println("removing " + k);// uncomment this for debugging - wrapper.remove(k); - mem_map.remove(k); - break; - case 1: // get - // System.out.println("getting " + k);// uncomment this for debugging - v = wrapper.get(k); - String v2 = mem_map.get(k); - assertTrue( - "one of them doesn't exists or different values from two hash tables", - v == null && v2 == null || v.equals(v2)); - break; - case 2: // put - v = "v" + rand.nextInt(3000); - // System.out.println("putting (" + k + ", " + v);// uncomment this - // for debugging - wrapper.put(k, v); - mem_map.put(k, v); - break; - } - // checkAll(wrapper, mem_map); // uncomment this for debugging - } - checkAll(wrapper, mem_map); - wrapper.close(); // clean up temporary files - } catch (Exception e) { - e.printStackTrace(); - System.out.println(e.toString()); - assertTrue("Exception should not be thrown.", false); - } - System.out.println("TestHashMapWrapper successful"); - } - - private void insertAll(HashMapWrapper hashTable, - HashMap map) throws HiveException { - - for (String k : map.keySet()) { - String v = map.get(k); - hashTable.put(k, v); - } - } - - private void checkAll(HashMapWrapper hashTable, - HashMap map) throws HiveException { - - // check each item in the HashMapWrapper was actually inserted - for (String k : hashTable.keySet()) { - String map_val = hashTable.get(k); - String val = map.get(k); - assertTrue( - "some HashMapWrapper value is not in main memory HashMap: map_val = " - + map_val + "; val = " + val, map_val != null && val != null); - assertTrue( - "value in HashMapWrapper is not the same as MM HashMap: map_val = " - + map_val + "; val = " + val, val.equals(map_val)); - } - - // check all inserted elements are in HashMapWrapper - for (String k : map.keySet()) { - String map_val = hashTable.get(k); - String val = map.get(k); - assertTrue("Some MM HashMap key is not in HashMapWrapper: map_val = " - + map_val + "; val = " + val, map_val != null && val != null); - assertTrue("Value in MM HashMap is not in HashMapWrapper: map_val = " - + map_val + "; val = " + val, val.equals(map_val)); - } - } -} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java new file mode 100644 index 0000000..595ffa6 --- /dev/null +++ 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.mapjoin; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.junit.Before; +import org.junit.Test; + +public class TestMapJoinMemoryExhaustionHandler { + private static final Log LOG = LogFactory.getLog(TestMapJoinMemoryExhaustionHandler.class); + + private LogHelper logHelper; + + @Before + public void setup() { + logHelper = new LogHelper(LOG); + } + @Test(expected=MapJoinMemoryExhaustionException.class) + public void testAbort() throws MapJoinMemoryExhaustionException { + MapJoinMemoryExhaustionHandler handler = new MapJoinMemoryExhaustionHandler(logHelper, 0.01d); + List memoryConsumer = new ArrayList(); + while(true) { + handler.checkMemoryStatus(1, 1); + memoryConsumer.add(new byte[5 * 1024 * 1024]); + } + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinEqualityTableContainer.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinEqualityTableContainer.java new file mode 100644 index 0000000..c541ad2 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinEqualityTableContainer.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
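The test above drives MapJoinMemoryExhaustionHandler into its failure case. In normal use the handler is built with a console LogHelper and a maximum heap-usage fraction, and is polled while the small table is loaded so the task can fail fast with MapJoinMemoryExhaustionException instead of an OutOfMemoryError. A hedged sketch of that pattern; the two checkMemoryStatus arguments are taken here to be the current table size and rows processed, mirroring the test's checkMemoryStatus(1, 1) call, and the producer methods are hypothetical:

MapJoinMemoryExhaustionHandler handler =
    new MapJoinMemoryExhaustionHandler(console, 0.90d);    // console is a LogHelper; abort past ~90% of max heap
long rowCount = 0;
while (hasMoreSmallTableRows()) {                          // hypothetical row source
  putNextRowIntoContainer();                               // hypothetical load step
  handler.checkMemoryStatus(container.size(), ++rowCount); // propagates MapJoinMemoryExhaustionException when over budget
}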
+ */ +package org.apache.hadoop.hive.ql.exec.persistence; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import junit.framework.Assert; + +import org.apache.hadoop.io.Text; +import org.junit.Before; +import org.junit.Test; + +public class TestMapJoinEqualityTableContainer { + + private static final MapJoinKey KEY1 = new MapJoinKey(new Object[] {new Text("key1")}); + private static final MapJoinKey KEY2 = new MapJoinKey(new Object[] {new Text("key2")}); + private static final MapJoinKey KEY3 = new MapJoinKey(new Object[] {new Text("key3")}); + private static final MapJoinKey KEY4 = new MapJoinKey(new Object[] {new Text("key4")}); + private static final Object[] VALUE = new Object[] {new Text("value")}; + private MapJoinTableContainer container; + private MapJoinRowContainer rowContainer; + @Before + public void setup() throws Exception { + rowContainer = new MapJoinRowContainer(); + rowContainer.add(VALUE); + container = new HashMapWrapper(); + } + @Test + public void testContainerBasics() throws Exception { + container.put(KEY1, rowContainer); + container.put(KEY2, rowContainer); + container.put(KEY3, rowContainer); + container.put(KEY4, rowContainer); + Assert.assertEquals(4, container.size()); + Map localContainer = new HashMap(); + for(Entry entry : container.entrySet()) { + localContainer.put(entry.getKey(), entry.getValue()); + } + Utilities.testEquality(container.get(KEY1), localContainer.get(KEY1)); + Utilities.testEquality(container.get(KEY2), localContainer.get(KEY2)); + Utilities.testEquality(container.get(KEY3), localContainer.get(KEY3)); + Utilities.testEquality(container.get(KEY4), localContainer.get(KEY4)); + container.clear(); + Assert.assertEquals(0, container.size()); + Assert.assertTrue(container.entrySet().isEmpty()); + } +} \ No newline at end of file diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java new file mode 100644 index 0000000..a103a51 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java
new file mode 100644
index 0000000..a103a51
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinKey.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class TestMapJoinKey {
+
+  @Test
+  public void testEqualityHashCode() throws Exception {
+    MapJoinKey key1 = new MapJoinKey(new String[] {"key"});
+    MapJoinKey key2 = new MapJoinKey(new String[] {"key"});
+    Utilities.testEquality(key1, key2);
+    key1 = new MapJoinKey(new Object[] {148, null});
+    key2 = new MapJoinKey(new Object[] {148, null});
+    Utilities.testEquality(key1, key2);
+    key1 = new MapJoinKey(new Object[] {null, "key1"});
+    key2 = new MapJoinKey(new Object[] {null, "key2"});
+    Assert.assertFalse(key1.equals(key2));
+  }
+  @Test
+  public void testHasAnyNulls() throws Exception {
+    MapJoinKey key = new MapJoinKey(new String[] {"key", null});
+    Assert.assertTrue(key.hasAnyNulls(null));
+    // field 1 is not null safe
+    Assert.assertTrue(key.hasAnyNulls(new boolean[] { false, false }));
+    // field 1 is null safe
+    Assert.assertFalse(key.hasAnyNulls(new boolean[] { false, true }));
+    Assert.assertFalse(key.hasAnyNulls(new boolean[] { true, true }));
+  }
+  @Test
+  public void testSerialization() throws Exception {
+    MapJoinKey key1 = new MapJoinKey(new Object[] {new Text("field0"), null, new Text("field2")});
+    MapJoinKey key2 = Utilities.serde(key1, "f0,f1,f2", "string,string,string");
+    Utilities.testEquality(key1, key2);
+  }
+}
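The assertions above pin down the MapJoinKey contract this patch relies on: keys wrapping element-wise equal arrays must agree on equals and hashCode, and hasAnyNulls reports a null field unless the corresponding null-safe flag covers it (a null nullsafes array covers nothing). A minimal sketch of logic that satisfies exactly these assertions, using an invented class name since MapJoinKey's actual implementation is not shown in this excerpt:

    import java.util.Arrays;

    class ArrayKeySketch {
      private final Object[] key;

      ArrayKeySketch(Object[] key) {
        this.key = key;
      }

      @Override
      public int hashCode() {
        return Arrays.hashCode(key);
      }

      @Override
      public boolean equals(Object obj) {
        return obj instanceof ArrayKeySketch && Arrays.equals(key, ((ArrayKeySketch) obj).key);
      }

      // True if some field is null and its position is not marked null-safe.
      boolean hasAnyNulls(boolean[] nullsafes) {
        for (int i = 0; i < key.length; i++) {
          if (key[i] == null && (nullsafes == null || !nullsafes[i])) {
            return true;
          }
        }
        return false;
      }
    }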
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinRowContainer.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinRowContainer.java
new file mode 100644
index 0000000..21de0f5
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinRowContainer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.util.Arrays;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class TestMapJoinRowContainer {
+
+  @Test
+  public void testSerialization() throws Exception {
+    MapJoinRowContainer container1 = new MapJoinRowContainer();
+    container1.add(new Object[]{ new Text("f0"), null, new ShortWritable((short)0xf)});
+    container1.add(Arrays.asList(new Object[]{ null, new Text("f1"), new ShortWritable((short)0xf)}));
+    container1.add(new Object[]{ null, null, new ShortWritable((short)0xf)});
+    container1.add(Arrays.asList(new Object[]{ new Text("f0"), new Text("f1"), new ShortWritable((short)0x1)}));
+    MapJoinRowContainer container2 = Utilities.serde(container1, "f0,f1,filter", "string,string,smallint");
+    Utilities.testEquality(container1, container2);
+    Assert.assertEquals(4, container1.size());
+    Assert.assertEquals(1, container2.getAliasFilter());
+  }
+
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinTableContainer.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinTableContainer.java
new file mode 100644
index 0000000..61c5741
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestMapJoinTableContainer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Properties;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMapJoinTableContainer {
+
+  private static final Object[] KEY = new Object[] {new Text("key")};
+  private static final Object[] VALUE = new Object[] {new Text("value")};
+  private ByteArrayOutputStream baos;
+  private ObjectOutputStream out;
+  private ObjectInputStream in;
+  private MapJoinTableContainer container;
+  private MapJoinTableContainerSerDe containerSerde;
+  private MapJoinKey key;
+  private MapJoinRowContainer rowContainer;
+  @Before
+  public void setup() throws Exception {
+    key = new MapJoinKey(KEY);
+    rowContainer = new MapJoinRowContainer();
+    rowContainer.add(VALUE);
+    baos = new ByteArrayOutputStream();
+    out = new ObjectOutputStream(baos);
+
+    LazyBinarySerDe keySerde = new LazyBinarySerDe();
+    Properties keyProps = new Properties();
+    keyProps.put(serdeConstants.LIST_COLUMNS, "v1");
+    keyProps.put(serdeConstants.LIST_COLUMN_TYPES, "string");
+    keySerde.initialize(null, keyProps);
+    LazyBinarySerDe valueSerde = new LazyBinarySerDe();
+    Properties valueProps = new Properties();
+    valueProps.put(serdeConstants.LIST_COLUMNS, "v1");
+    valueProps.put(serdeConstants.LIST_COLUMN_TYPES, "string");
+    valueSerde.initialize(null, valueProps);
+    containerSerde = new MapJoinTableContainerSerDe(
+        new MapJoinObjectSerDeContext(keySerde, false),
+        new MapJoinObjectSerDeContext(valueSerde, false));
+    container = new HashMapWrapper();
+  }
+
+  @Test
+  public void testSerialization() throws Exception {
+    container.put(key, rowContainer);
+    containerSerde.persist(out, container);
+    out.close();
+    in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    container = containerSerde.load(in);
+    Utilities.testEquality(rowContainer, container.get(key));
+  }
+  @Test
+  public void testDummyContainer() throws Exception {
+    MapJoinTableContainerSerDe.persistDummyTable(out);
+    out.close();
+    in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    container = containerSerde.load(in);
+    Assert.assertEquals(0, container.size());
+    Assert.assertTrue(container.entrySet().isEmpty());
+  }
+}
\ No newline at end of file
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/Utilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/Utilities.java
new file mode 100644
index 0000000..2cb1ac3
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/Utilities.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.io.BytesWritable;
+
+class Utilities {
+
+  static void testEquality(MapJoinKey key1, MapJoinKey key2) {
+    Assert.assertEquals(key1.hashCode(), key2.hashCode());
+    Assert.assertEquals(key1, key2);
+    Assert.assertEquals(key1.getKey().length, key2.getKey().length);
+    int length = key1.getKey().length;
+    for (int i = 0; i < length; i++) {
+      Assert.assertEquals(key1.getKey()[i], key2.getKey()[i]);
+    }
+  }
+
+  static MapJoinKey serde(MapJoinKey key, String columns, String types)
+      throws Exception {
+    MapJoinKey result = new MapJoinKey();
+    ByteArrayInputStream bais;
+    ObjectInputStream in;
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ObjectOutputStream out = new ObjectOutputStream(baos);
+    LazyBinarySerDe serde = new LazyBinarySerDe();
+    Properties props = new Properties();
+    props.put(serdeConstants.LIST_COLUMNS, columns);
+    props.put(serdeConstants.LIST_COLUMN_TYPES, types);
+    serde.initialize(null, props);
+    MapJoinObjectSerDeContext context = new MapJoinObjectSerDeContext(serde, false);
+    key.write(context, out);
+    out.close();
+    bais = new ByteArrayInputStream(baos.toByteArray());
+    in = new ObjectInputStream(bais);
+    result.read(context, in, new BytesWritable());
+    return result;
+  }
+
+  static void testEquality(MapJoinRowContainer container1, MapJoinRowContainer container2) {
+    Assert.assertEquals(container1.size(), container2.size());
+    Assert.assertEquals(container1.getAliasFilter(), container2.getAliasFilter());
+    List<Object> row1 = container1.first();
+    List<Object> row2 = container2.first();
+    for (; row1 != null && row2 != null; row1 = container1.next(), row2 = container2.next()) {
+      Assert.assertEquals(row1, row2);
+    }
+  }
+
+  static MapJoinRowContainer serde(MapJoinRowContainer container, String columns, String types)
+      throws Exception {
+    MapJoinRowContainer result = new MapJoinRowContainer();
+    ByteArrayInputStream bais;
+    ObjectInputStream in;
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ObjectOutputStream out = new ObjectOutputStream(baos);
+    LazyBinarySerDe serde = new LazyBinarySerDe();
+    Properties props = new Properties();
+    props.put(serdeConstants.LIST_COLUMNS, columns);
+    props.put(serdeConstants.LIST_COLUMN_TYPES, types);
+    serde.initialize(null, props);
+    MapJoinObjectSerDeContext context = new MapJoinObjectSerDeContext(serde, true);
+    container.write(context, out);
+    out.close();
+    bais = new ByteArrayInputStream(baos.toByteArray());
+    in = new ObjectInputStream(bais);
+    result.read(context, in, new BytesWritable());
+    return result;
+  }
+}
diff --git a/ql/src/test/results/clientpositive/join_nullsafe.q.out b/ql/src/test/results/clientpositive/join_nullsafe.q.out
index 2844571..1831b87 100644
--- a/ql/src/test/results/clientpositive/join_nullsafe.q.out
+++ b/ql/src/test/results/clientpositive/join_nullsafe.q.out
@@ -1032,7 +1032,15 @@ NULL NULL NULL NULL
 NULL NULL NULL NULL
 NULL NULL NULL NULL
 NULL NULL NULL NULL
+NULL 10 NULL 10
+NULL 35 NULL 35
+NULL 110 NULL 110
+NULL 135 NULL 135
+10 NULL 10 NULL
+48 NULL 48 NULL
 100 100 100 100
+110 NULL 110 NULL
+148 NULL 148 NULL
 200 200 200 200
 PREHOOK: query: SELECT /*+ MAPJOIN(a) */ * FROM smb_input1 a RIGHT OUTER JOIN smb_input1 b ON a.key <=> b.key ORDER BY a.key, a.value, b.key, b.value
 PREHOOK: type: QUERY
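The join_nullsafe.q.out hunk above appears to be the behavioral side of the patch: with null-safe (<=>) join keys handled correctly in the map-join path, rows involving NULLs that the null-safe condition should match now appear in the output alongside the previously returned non-NULL matches. Null-safe equality differs from ordinary equality only in how NULL compares, roughly:

    // Sketch of null-safe comparison: NULL <=> NULL is a match, whereas NULL = NULL is not.
    static boolean nullSafeEquals(Object a, Object b) {
      return (a == null) ? (b == null) : a.equals(b);
    }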