diff --git a/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java b/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java new file mode 100644 index 0000000..36ae56f --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common; + +/** + * Interface that can be used to provide size estimates for an object instance based on the data structures it holds in memory. + */ +public interface MemoryEstimate { + /** + * Returns the estimated memory size based on {@link org.apache.hadoop.hive.ql.util.JavaDataModel} + * + * @return estimated memory size in bytes + */ + long getEstimatedMemorySize(); +} diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3400560..705478e 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3158,6 +3158,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "hive.llap.mapjoin.memory.oversubscribe.factor amount of memory can be borrowed based on which mapjoin\n" + "conversion decision will be made). This is only an upper bound. Lower bound is determined by number of\n" + "executors and configured max concurrency."), + LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL("hive.llap.mapjoin.memory.monitor.check.interval", 100000L, + "Check memory usage of mapjoin hash tables after every interval of this many rows. If map join hash table\n" + + "memory usage exceeds (hive.auto.convert.join.noconditionaltask.size * hive.hash.table.inflation.factor)\n" + + "when running in LLAP, tasks will get killed and not retried. Set the value to 0 to disable this feature."), LLAP_DAEMON_AM_REPORTER_MAX_THREADS("hive.llap.daemon.am-reporter.max.threads", 4, "Maximum number of threads to be used for AM reporter. 
If this is lower than number of\n" + "executors in llap daemon, it would be set to number of executors at runtime.", diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java index ff6e7ce..6cf8dbb 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java @@ -116,7 +116,7 @@ public static void createEstimators(Object rootObj, HashMap, ObjectEsti addToProcessing(byType, stack, fieldObj, fieldClass); } } - estimator.directSize = JavaDataModel.alignUp( + estimator.directSize = (int) JavaDataModel.alignUp( estimator.directSize, memoryModel.memoryAlign()); } } @@ -454,7 +454,7 @@ protected int estimate(Object obj, HashMap, ObjectEstimator> parent, if (len != 0) { int elementSize = getPrimitiveSize(e.field.getType().getComponentType()); arraySize += elementSize * len; - arraySize = JavaDataModel.alignUp(arraySize, memoryModel.memoryAlign()); + arraySize = (int) JavaDataModel.alignUp(arraySize, memoryModel.memoryAlign()); } referencedSize += arraySize; break; diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt index 4393c3b..46cbb5b 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt @@ -471,7 +471,7 @@ public class extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt index 7468c2f..2261e1b 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt @@ -442,7 +442,7 @@ public class extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt index 57b7ea5..58d2d22 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt @@ -458,7 +458,7 @@ public class extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt index 749e97e..515692e 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt @@ -441,7 +441,7 @@ public class extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + 
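Review note: the int-to-long widenings of getAggregationBufferFixedSize() in the template hunks above (and in the remaining UDAF templates below) exist because these sizes feed JavaDataModel.alignUp, whose rounding can overflow an int once buffer estimates get large. A minimal standalone sketch of the rounding arithmetic, assuming alignUp rounds a size up to the next multiple of a power-of-two alignment (which matches how the templates use it); this is an illustration, not code from the patch:

public class AlignUpSketch {
  // Round value up to the next multiple of align; align must be a power of two.
  static long alignUp(long value, long align) {
    return (value + align - 1) & ~(align - 1);
  }

  public static void main(String[] args) {
    System.out.println(alignUp(13, 8));  // 16
    System.out.println(alignUp(16, 8));  // 16
    // Near Integer.MAX_VALUE an int version would wrap to a negative size;
    // the widened long version keeps the estimate correct.
    System.out.println(alignUp((long) Integer.MAX_VALUE + 1, 8)); // 2147483648
  }
}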
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt index 9dfc147..c210e4c 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt @@ -81,7 +81,7 @@ public class extends VectorAggregateExpression { @Override public int getVariableSize() { JavaDataModel model = JavaDataModel.get(); - return model.lengthForByteArrayOfSize(bytes.length); + return (int) model.lengthForByteArrayOfSize(bytes.length); } @Override @@ -388,7 +388,7 @@ public class extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt index 32ecb34..074aefd 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt @@ -443,7 +443,7 @@ public class extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt index bd0f14d..a89ae0a 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt @@ -433,7 +433,7 @@ public class extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object(), diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt index dc9d4b1..1e3516b 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt @@ -513,7 +513,7 @@ public class extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt index 01062a9..b3ec7e9 100644 --- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt +++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt @@ -467,7 +467,7 @@ public class extends VectorAggregateExpression { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + @@ -488,4 +488,4 @@ public class extends VectorAggregateExpression { public void setInputExpression(VectorExpression inputExpression) { this.inputExpression = inputExpression; } -} \ No newline at end of file +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java new file 
mode 100644 index 0000000..4ad4f98 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.mapjoin; + +/** + * When this Error is thrown, the task should not be retried. + */ +public class MapJoinMemoryExhaustionError extends Error { + private static final long serialVersionUID = 3678353959830506881L; + public MapJoinMemoryExhaustionError(String msg) { + super(msg); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java deleted file mode 100644 index dbe00b6..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.exec.mapjoin; - -import org.apache.hadoop.hive.ql.metadata.HiveException; - - - -public class MapJoinMemoryExhaustionException extends HiveException { - private static final long serialVersionUID = 3678353959830506881L; - public MapJoinMemoryExhaustionException(String msg) { - super(msg); - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java index 7fc3226..d5e81e1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java @@ -86,17 +86,17 @@ private static long getMaxHeapSize(MemoryMXBean bean) { * * @param tableContainerSize currently table container size * @param numRows number of rows processed - * @throws MapJoinMemoryExhaustionException + * @throws MapJoinMemoryExhaustionError */ public void checkMemoryStatus(long tableContainerSize, long numRows) - throws MapJoinMemoryExhaustionException { + throws MapJoinMemoryExhaustionError { long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); double percentage = (double) usedMemory / (double) maxHeapSize; String msg = Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t" + tableContainerSize + "\tMemory usage:\t" + usedMemory + "\tpercentage:\t" + percentageNumberFormat.format(percentage); console.printInfo(msg); if(percentage > maxMemoryUsage) { - throw new MapJoinMemoryExhaustionException(msg); + throw new MapJoinMemoryExhaustionError(msg); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 595d1bd..c5d4f9a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -60,7 +60,7 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -69,7 +69,6 @@ import org.apache.hadoop.hive.ql.plan.MapredLocalWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.session.OperationLog; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; @@ -385,7 +384,7 @@ public int executeInProcess(DriverContext driverContext) { + Utilities.showTime(elapsed) + " sec."); } catch (Throwable throwable) { if (throwable instanceof OutOfMemoryError - || (throwable instanceof MapJoinMemoryExhaustionException)) { + || (throwable instanceof MapJoinMemoryExhaustionError)) { l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable); return 3; } else { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java index 04e24bd..360b639 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java @@ -24,6 +24,8 @@ import java.util.TreeMap; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.common.MemoryEstimate; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -46,7 +48,7 @@ * Initially inspired by HPPC LongLongOpenHashMap; however, the code is almost completely reworked * and there's very little in common left save for quadratic probing (and that with some changes). */ -public final class BytesBytesMultiHashMap { +public final class BytesBytesMultiHashMap implements MemoryEstimate { public static final Logger LOG = LoggerFactory.getLogger(BytesBytesMultiHashMap.class); /* @@ -521,7 +523,18 @@ public int getNumValues() { * @return number of bytes */ public long memorySize() { - return writeBuffers.size() + refs.length * 8 + 100; + return getEstimatedMemorySize(); + } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += writeBuffers.getEstimatedMemorySize(); + size += jdm.lengthForLongArrayOfSize(refs.length); + // 11 primitive1 fields plus the 2 object refs above (2 primitive1 units each): 15 units, aligned + size += JavaDataModel.alignUp(15 * jdm.primitive1(), jdm.memoryAlign()); + return size; + } public void seal() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java index a3bccc6..adf1a90 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java @@ -53,7 +53,7 @@ public class HashMapWrapper extends AbstractMapJoinTableContainer implements Serializable { private static final long serialVersionUID = 1L; protected static final Logger LOG = LoggerFactory.getLogger(HashMapWrapper.class); - + private static final long DEFAULT_HASHMAP_ENTRY_SIZE = 1024L; // default threshold for using main memory based HashMap private static final int THRESHOLD = 1000000; private static final float LOADFACTOR = 0.75f; @@ -140,6 +140,14 @@ public ReusableGetAdaptor createGetter(MapJoinKey keyTypeFromLoader) { return new GetAdaptor(keyTypeFromLoader); } + @Override + public long getEstimatedMemorySize() { + // TODO: Keys and values are Object[] which can be eagerly or lazily deserialized. To accurately + // estimate the entry size, every possible Object in the key and value would have to implement the + // MemoryEstimate interface, which is very intrusive. So we assume a default entry size here. + return size() * DEFAULT_HASHMAP_ENTRY_SIZE; + } + private class GetAdaptor implements ReusableGetAdaptor { private Object[] currentKey; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index 04e89e8..6523f00 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -118,6 +118,11 @@ private final String spillLocalDirs; + @Override + public long getEstimatedMemorySize() { + return memoryUsed; + } + /** * This class encapsulates the triplet together since they are closely related to each other * The triplet: hashmap (either in memory or on disk), small table container, big table container diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java index c86e5f5..014d17a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.JoinUtil; @@ -33,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.ByteStream.Output; import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput; import org.apache.hadoop.hive.serde2.AbstractSerDe; @@ -72,6 +74,11 @@ implements MapJoinTableContainer, MapJoinTableContainerDirectAccess { private static final Logger LOG = LoggerFactory.getLogger(MapJoinTableContainer.class); + // TODO: For object inspector fields, assign 16KB for now. To estimate the memory size more accurately, every + // object inspector would have to implement the MemoryEstimate interface, which is a lot of change for little + // benefit compared to writing an instrumentation agent for object size estimation + public static final long DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE = 16 * 1024L; + private final BytesBytesMultiHashMap hashMap; /** The OI used to deserialize values. We never deserialize keys. */ private LazyBinaryStructObjectInspector internalValueOi; @@ -147,7 +154,7 @@ public void setNotNullMarkers(byte[] notNullMarkers) { this.notNullMarkers = notNullMarkers; } - public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource { + public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource, MemoryEstimate { void setKeyValue(Writable key, Writable val) throws SerDeException; /** Get hash value from the key. */ int getHashFromKey() throws SerDeException; @@ -216,6 +223,22 @@ public byte updateStateByte(Byte previousValue) { public int getHashFromKey() throws SerDeException { throw new UnsupportedOperationException("Not supported for MapJoinBytesTableContainer"); } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += keySerDe == null ? 
0 : jdm.object(); + size += valSerDe == null ? 0 : jdm.object(); + size += keySoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += valSoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += keyOis == null ? 0 : jdm.arrayList() + keyOis.size() * DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += valOis == null ? 0 : jdm.arrayList() + valOis.size() * DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += keyObjs == null ? 0 : jdm.array() + keyObjs.length * jdm.object(); + size += valObjs == null ? 0 : jdm.array() + valObjs.length * jdm.object(); + size += jdm.primitive1(); + return size; + } } static class LazyBinaryKvWriter implements KeyValueHelper { @@ -319,6 +342,15 @@ public byte updateStateByte(Byte previousValue) { aliasFilter &= filterGetter.getShort(); return aliasFilter; } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += (4 * jdm.object()); + size += jdm.primitive1(); + return size; + } } /* @@ -361,6 +393,15 @@ public int getHashFromKey() throws SerDeException { int keyLength = key.getLength(); return HashCodeUtil.murmurHash(keyBytes, 0, keyLength); } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += jdm.object() + (key == null ? 0 : key.getCapacity()); + size += jdm.object() + (val == null ? 0 : val.getCapacity()); + return size; + } } @Override @@ -768,4 +809,19 @@ public boolean hasSpill() { public int size() { return hashMap.size(); } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += hashMap.getEstimatedMemorySize(); + size += directWriteHelper == null ? 0 : directWriteHelper.getEstimatedMemorySize(); + size += writeHelper == null ? 0 : writeHelper.getEstimatedMemorySize(); + size += sortableSortOrders == null ? 0 : jdm.lengthForBooleanArrayOfSize(sortableSortOrders.length); + size += nullMarkers == null ? 0 : jdm.lengthForByteArrayOfSize(nullMarkers.length); + size += notNullMarkers == null ? 0 : jdm.lengthForByteArrayOfSize(notNullMarkers.length); + size += jdm.arrayList(); // empty list + size += DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + return size; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java index 6d71fef..5ca5ff6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.JoinUtil; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -31,7 +32,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Writable; -public interface MapJoinTableContainer { +public interface MapJoinTableContainer extends MemoryEstimate { /** * Retrieve rows from hashtable key by key, one key at a time, w/o copying the structures * for each key. 
"Old" HashMapWrapper will still create/retrieve new objects for java HashMap; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index 7b13e90..7011d23 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -146,7 +147,20 @@ public void load(MapJoinTableContainer[] mapJoinTables, } nwayConf.setNumberOfPartitions(numPartitions); } - + final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR); + final long memoryCheckInterval = HiveConf.getLongVar(hconf, + HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL); + final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); + long numEntries = 0; + long noCondTaskSize = desc.getNoConditionalTaskSize(); + boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0; + if (!doMemCheck) { + LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " + + "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval); + } else { + LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ", + noCondTaskSize, inflationFactor); + } for (int pos = 0; pos < mapJoinTables.length; pos++) { if (pos == desc.getPosBigTable()) { continue; @@ -205,12 +219,32 @@ public void load(MapJoinTableContainer[] mapJoinTables, tableContainer = new HashMapWrapper(hconf, keyCount); } - LOG.info("Using tableContainer " + tableContainer.getClass().getSimpleName()); + LOG.info("Using tableContainer: " + tableContainer.getClass().getSimpleName()); tableContainer.setSerde(keyCtx, valCtx); while (kvReader.next()) { - tableContainer.putRow( - (Writable)kvReader.getCurrentKey(), (Writable)kvReader.getCurrentValue()); + tableContainer.putRow((Writable) kvReader.getCurrentKey(), (Writable) kvReader.getCurrentValue()); + numEntries++; + if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) { + final long estMemUsage = tableContainer.getEstimatedMemorySize(); + final long threshold = (long) (inflationFactor * noCondTaskSize); + // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory + // available for container/executor + final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable()); + if (estMemUsage > effectiveThreshold) { + String msg = "Hash table loading exceeded memory limits." + + " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize + + " inflationFactor: " + inflationFactor + " threshold: " + threshold + + " effectiveThreshold: " + effectiveThreshold; + LOG.error(msg); + throw new MapJoinMemoryExhaustionError(msg); + } else { + if (LOG.isInfoEnabled()) { + LOG.info("Checking hash table loader memory usage.. numEntries: {} estimatedMemoryUsage: {} " + + "effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold); + } + } + } } tableContainer.seal(); LOG.info("Finished loading hashtable using " + tableContainer.getClass() + ". 
Small table position: " + pos); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 486d43a..4242262 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; +import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -41,6 +43,8 @@ import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.api.KeyValueWriter; +import com.google.common.base.Throwables; + /** * Hive processor for Tez that forms the vertices in Tez and processes the data. * Does what ExecMapper and ExecReducer does for hive in MR framework. @@ -189,8 +193,11 @@ protected void initializeAndRunProcessor(Map inputs, } catch (Throwable t) { originalThrowable = t; } finally { - if (originalThrowable != null && originalThrowable instanceof Error) { - LOG.error(StringUtils.stringifyException(originalThrowable)); + if (originalThrowable != null && (originalThrowable instanceof Error || + Throwables.getRootCause(originalThrowable) instanceof Error)) { + LOG.error("Cannot recover from this FATAL error", StringUtils.stringifyException(originalThrowable)); + getContext().reportFailure(TaskFailureType.FATAL, originalThrowable, + "Cannot recover from this error"); throw new RuntimeException(originalThrowable); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java index 630046d..84128e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java @@ -57,7 +57,7 @@ /** * Memory consumed by a set of aggregation buffers */ - private int aggregatorsFixedSize; + private long aggregatorsFixedSize; /** * Array of indexes for aggregators that have variable size @@ -76,7 +76,7 @@ public boolean getHasVariableSize() { * Returns the fixed size consumed by the aggregation buffers * @return */ - public int getAggregatorsFixedSize() { + public long getAggregatorsFixedSize() { return aggregatorsFixedSize; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 5b4c7c3..30916a0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -286,7 +286,7 @@ public void close(boolean aborted) throws HiveException { /** * Total per hashtable entry fixed memory (does not depend on key/agg values). */ - private int fixedHashEntrySize; + private long fixedHashEntrySize; /** * Average per hashtable entry variable size memory (depends on key/agg value). 
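Review note: the memory-check loop added to HashTableLoader above is repeated, modulo the container type, in VectorMapJoinFastHashTableLoader further down. A condensed sketch of the shared logic, built only from names this patch introduces, as a reading aid rather than the committed code:

import org.apache.hadoop.hive.common.MemoryEstimate;
import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;

final class MemoryMonitorSketch {
  /** Called once per loaded row; inspects the table every memoryCheckInterval rows. */
  static void checkMemory(MemoryEstimate table, long numEntries, long memoryCheckInterval,
      float inflationFactor, long noCondTaskSize, long maxMemoryAvailable) {
    if ((numEntries % memoryCheckInterval) != 0) {
      return;
    }
    final long estMemUsage = table.getEstimatedMemorySize();
    final long threshold = (long) (inflationFactor * noCondTaskSize);
    // Guard against a misconfigured noconditionaltask size: the table may grow
    // to at most 2/3 of the memory available to the container/executor.
    final long effectiveThreshold = (long) Math.max(threshold, (2.0 / 3.0) * maxMemoryAvailable);
    if (estMemUsage > effectiveThreshold) {
      // An Error rather than a HiveException, so LLAP kills the task without retrying it.
      throw new MapJoinMemoryExhaustionError("Hash table loading exceeded memory limits."
          + " estimatedMemoryUsage: " + estMemUsage + " effectiveThreshold: " + effectiveThreshold);
    }
  }
}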
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java index 0866f63..7ab4473 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java @@ -52,7 +52,7 @@ public abstract void aggregateInputSelection(VectorAggregationBufferRow[] aggreg public abstract Object evaluateOutput(AggregationBuffer agg) throws HiveException; public abstract ObjectInspector getOutputObjectInspector(); - public abstract int getAggregationBufferFixedSize(); + public abstract long getAggregationBufferFixedSize(); public boolean hasVariableSize() { return false; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java index 74e25ae..4aac9d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java @@ -492,7 +492,7 @@ public ObjectInspector getOutputObjectInspector() { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java index 483d9dc..365dcf6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java @@ -464,7 +464,7 @@ public ObjectInspector getOutputObjectInspector() { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java index 2139eae..52b05ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java @@ -383,7 +383,7 @@ public ObjectInspector getOutputObjectInspector() { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { if (bitSetSize < 0) { // Not pretty, but we need a way to get the size try { @@ -396,7 +396,7 @@ public int getAggregationBufferFixedSize() { // BloomFilter: object(BitSet: object(data: long[]), numBits: int, numHashFunctions: int) JavaDataModel model = JavaDataModel.get(); - int bloomFilterSize = JavaDataModel.alignUp(model.object() + model.lengthForLongArrayOfSize(bitSetSize), + long bloomFilterSize = JavaDataModel.alignUp(model.object() + model.lengthForLongArrayOfSize(bitSetSize), model.memoryAlign()); return JavaDataModel.alignUp( model.object() + bloomFilterSize + 
model.primitive1() + model.primitive1(), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java index d2446d5..b986eb4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java @@ -339,7 +339,7 @@ public ObjectInspector getOutputObjectInspector() { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { if (aggBufferSize < 0) { // Not pretty, but we need a way to get the size try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java index 494febc..cadb6dd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java @@ -259,7 +259,7 @@ public ObjectInspector getOutputObjectInspector() { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java index dec88cb..c489f8f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java @@ -385,7 +385,7 @@ public ObjectInspector getOutputObjectInspector() { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java index 337ba0a..3b66030 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java @@ -142,7 +142,7 @@ public ObjectInspector getOutputObjectInspector() { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java index 8cd3506..5388d37 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java @@ -508,7 +508,7 @@ public ObjectInspector getOutputObjectInspector() { } @Override - public int getAggregationBufferFixedSize() { + public long 
getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java index 61d6977..1769dc0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java @@ -508,7 +508,7 @@ public ObjectInspector getOutputObjectInspector() { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java index b10f66f..a37e3f6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java @@ -431,7 +431,7 @@ public ObjectInspector getOutputObjectInspector() { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object(), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java index 2709b07..61cdeaa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java @@ -508,7 +508,7 @@ public ObjectInspector getOutputObjectInspector() { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java index 03dce1e..c375461 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java @@ -508,7 +508,7 @@ public ObjectInspector getOutputObjectInspector() { } @Override - public int getAggregationBufferFixedSize() { + public long getAggregationBufferFixedSize() { JavaDataModel model = JavaDataModel.get(); return JavaDataModel.alignUp( model.object() + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java index 6242daf..b5eab8b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java @@ -107,4 +107,9 @@ public 
VectorMapJoinFastBytesHashMap( // Share the same write buffers with our value store. keyStore = new VectorMapJoinFastKeyStore(valueStore.writeBuffers()); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize() + valueStore.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java index 1a41961..e779762 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java @@ -97,4 +97,9 @@ public VectorMapJoinFastBytesHashMultiSet( keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java index 331867c..d493319 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java @@ -84,4 +84,9 @@ public VectorMapJoinFastBytesHashSet( keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java index b93e977..10bc902 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashTable; @@ -218,4 +219,9 @@ public VectorMapJoinFastBytesHashTable( super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); allocateBucketArray(); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize() + JavaDataModel.get().lengthForLongArrayOfSize(slotTriples.length); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java index b6db3bc..1f182ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable; @@ -88,4 +89,10 @@ public 
VectorMapJoinFastHashTable( public int size() { return keysAssigned; } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + return JavaDataModel.alignUp(10L * jdm.primitive1() + jdm.primitive2(), jdm.memoryAlign()); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java index 49ecdd1..b015e43 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.Map; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -28,7 +29,6 @@ import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; -import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; import org.apache.hadoop.hive.ql.exec.tez.TezContext; @@ -68,6 +68,21 @@ public void load(MapJoinTableContainer[] mapJoinTables, Map parentToInput = desc.getParentToInput(); Map parentKeyCounts = desc.getParentKeyCounts(); + final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR); + final long memoryCheckInterval = HiveConf.getLongVar(hconf, + HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL); + final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); + long numEntries = 0; + long noCondTaskSize = desc.getNoConditionalTaskSize(); + boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0; + if (!doMemCheck) { + LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " + + "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval); + } else { + LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ", + noCondTaskSize, inflationFactor); + } + for (int pos = 0; pos < mapJoinTables.length; pos++) { if (pos == desc.getPosBigTable()) { continue; @@ -93,15 +108,39 @@ public void load(MapJoinTableContainer[] mapJoinTables, VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer = new VectorMapJoinFastTableContainer(desc, hconf, keyCount); + LOG.info("Using vectorMapJoinFastTableContainer: " + vectorMapJoinFastTableContainer.getClass().getSimpleName()); + vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here. while (kvReader.next()) { vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(), (BytesWritable)kvReader.getCurrentValue()); + numEntries++; + if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) { + final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize(); + final long threshold = (long) (inflationFactor * noCondTaskSize); + // guard against poor configuration of noconditional task size. We let the hash table grow to at most + // 2/3 of the memory available to the container/executor + final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable()); + if (estMemUsage > effectiveThreshold) { + String msg = "VectorMapJoin Hash table loading exceeded memory limits." + + " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize + + " inflationFactor: " + inflationFactor + " threshold: " + threshold + + " effectiveThreshold: " + effectiveThreshold; + LOG.error(msg); + throw new MapJoinMemoryExhaustionError(msg); + } else { + if (LOG.isInfoEnabled()) { + LOG.info("Checking vector mapjoin hash table loader memory usage. numEntries: {} " + + "estimatedMemoryUsage: {} effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold); + } + } + } } vectorMapJoinFastTableContainer.seal(); - mapJoinTables[pos] = (MapJoinTableContainer) vectorMapJoinFastTableContainer; - + mapJoinTables[pos] = vectorMapJoinFastTableContainer; + LOG.info("Finished loading hashtable using " + vectorMapJoinFastTableContainer.getClass() + + ". Small table position: " + pos); } catch (IOException e) { throw new HiveException(e); } catch (SerDeException e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java index be51693..3e9ff84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java @@ -18,13 +18,14 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.serde2.WriteBuffers; // Optimized for sequential key lookup. -public class VectorMapJoinFastKeyStore { +public class VectorMapJoinFastKeyStore implements MemoryEstimate { private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastKeyStore.class.getName()); @@ -165,4 +166,12 @@ public VectorMapJoinFastKeyStore(WriteBuffers writeBuffers) { this.writeBuffers = writeBuffers; unsafeReadPos = new WriteBuffers.Position(); } + + @Override + public long getEstimatedMemorySize() { + long size = 0; + size += writeBuffers == null ? 0 : writeBuffers.getEstimatedMemorySize(); + size += unsafeReadPos == null ? 
0 : unsafeReadPos.getEstimatedMemorySize(); + return size; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java index 6fe98f9..d4847b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java @@ -20,6 +20,8 @@ import java.io.IOException; +import org.apache.hadoop.hive.common.MemoryEstimate; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.JoinUtil; @@ -37,7 +39,7 @@ */ public class VectorMapJoinFastLongHashMap extends VectorMapJoinFastLongHashTable - implements VectorMapJoinLongHashMap { + implements VectorMapJoinLongHashMap, MemoryEstimate { public static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMap.class); @@ -112,4 +114,9 @@ public VectorMapJoinFastLongHashMap( initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); valueStore = new VectorMapJoinFastValueStore(writeBuffersSize); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize() + valueStore.getEstimatedMemorySize(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java index 9140aee..566cfa2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java @@ -100,4 +100,9 @@ public VectorMapJoinFastLongHashMultiSet( super(minMaxEnabled, isOuterJoin, hashTableKeyType, initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java index d3efb11..fb7ae62 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java @@ -96,4 +96,9 @@ public VectorMapJoinFastLongHashSet( super(minMaxEnabled, isOuterJoin, hashTableKeyType, initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java index 8bfa07c..54e667c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.JoinUtil; @@ -280,4 +281,18 @@ public 
VectorMapJoinFastLongHashTable( min = Long.MAX_VALUE; max = Long.MIN_VALUE; } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = super.getEstimatedMemorySize(); + size += slotPairs == null ? 0 : jdm.lengthForLongArrayOfSize(slotPairs.length); + size += (2 * jdm.primitive2()); + size += (2 * jdm.primitive1()); + size += jdm.object(); + // adding 16KB constant memory for keyBinarySortableDeserializeRead as the rabit hole is deep to implement + // MemoryEstimate interface, also it is constant overhead + size += (16 * 1024L); + return size; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java index add4788..eb08aa9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java @@ -53,4 +53,9 @@ public VectorMapJoinFastMultiKeyHashMap( int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java index faefdbb..56964bc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java @@ -52,4 +52,8 @@ public VectorMapJoinFastMultiKeyHashMultiSet( super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java index 5328910..46bafe0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java @@ -52,5 +52,8 @@ public VectorMapJoinFastMultiKeyHashSet( super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } - + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java index f13034f..d04590a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java @@ -43,4 +43,13 @@ public VectorMapJoinFastStringHashMap( super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin); } + + 
@Override + public long getEstimatedMemorySize() { + long size = 0; + // adding 16KB constant memory for stringCommon as the rabbit hole is too deep to implement the + // MemoryEstimate interface, also it is a constant overhead + size += (16 * 1024L); + return super.getEstimatedMemorySize() + size; + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java index 53ad7b4..b24bfdf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java @@ -43,4 +43,12 @@ public VectorMapJoinFastStringHashMultiSet( super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin); } + + @Override + public long getEstimatedMemorySize() { + // adding 16KB constant memory for stringCommon as the rabbit hole is too deep to implement the + // MemoryEstimate interface, also it is a constant overhead + long size = (16 * 1024L); + return super.getEstimatedMemorySize() + size; + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java index 723c729..75fae25 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java @@ -43,4 +43,12 @@ public VectorMapJoinFastStringHashSet( super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin); } + + @Override + public long getEstimatedMemorySize() { + // adding 16KB constant memory for stringCommon as the rabbit hole is too deep to implement the + // MemoryEstimate interface, also it is a constant overhead + long size = (16 * 1024L); + return super.getEstimatedMemorySize() + size; + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java index 05f1cf1..2fe4b93 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java @@ -19,6 +19,7 @@ import java.io.IOException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -26,7 +27,6 @@ import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; -import org.apache.hadoop.hive.ql.exec.tez.HashTableLoader; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTableContainer; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -38,7 +38,6 @@ import org.apache.hadoop.hive.serde2.SerDeException; import
org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; -import org.apache.tez.runtime.library.api.KeyValueReader; /** * HashTableLoader for Tez constructs the hashtable from records read from @@ -46,7 +45,7 @@ */ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContainer { - private static final Logger LOG = LoggerFactory.getLogger(HashTableLoader.class.getName()); + private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastTableContainer.class.getName()); private final MapJoinDesc desc; private final Configuration hconf; @@ -219,6 +218,17 @@ public int size() { } @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += vectorMapJoinFastHashTable.getEstimatedMemorySize(); + size += (4 * jdm.primitive1()); + size += (2 * jdm.object()); + size += (jdm.primitive2()); + return size; + } + + @Override public void setSerde(MapJoinObjectSerDeContext keyCtx, MapJoinObjectSerDeContext valCtx) throws SerDeException { // Do nothing in this case. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java index f9c5b34..3cd06e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult; @@ -30,7 +31,7 @@ // Supports random access. -public class VectorMapJoinFastValueStore { +public class VectorMapJoinFastValueStore implements MemoryEstimate { private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastValueStore.class.getName()); @@ -113,6 +114,11 @@ public WriteBuffers writeBuffers() { return writeBuffers; } + @Override + public long getEstimatedMemorySize() { + return writeBuffers == null ? 0 : writeBuffers.getEstimatedMemorySize(); + } + public static class HashMapResult extends VectorMapJoinHashMapResult { private VectorMapJoinFastValueStore valueStore; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java index c7e585c..9cc9ad4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.io.BytesWritable; @@ -28,7 +29,7 @@ * Root interface for a vector map join hash table (which could be a hash map, hash multi-set, or * hash set). 
*/ -public interface VectorMapJoinHashTable { +public interface VectorMapJoinHashTable extends MemoryEstimate { /* diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java index 93a89d7..1560807 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java @@ -75,4 +75,9 @@ public VectorMapJoinOptimizedHashSet( MapJoinTableContainer originalTableContainer, ReusableGetAdaptor hashMapRowGetter) { super(originalTableContainer, hashMapRowGetter); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java index 5fe7861..5275e1a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.JoinUtil; @@ -96,4 +97,12 @@ public VectorMapJoinOptimizedHashTable( public int size() { return originalTableContainer.size(); } + + @Override + public long getEstimatedMemorySize() { + long size = 0; + size += originalTableContainer == null ? 
0 : originalTableContainer.getEstimatedMemorySize(); + size += (2 * JavaDataModel.get().object()); + return size; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java index f921b9c..4b46ce0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java @@ -60,4 +60,12 @@ public VectorMapJoinOptimizedStringHashSet(boolean isOuterJoin, super(originalTableContainer, hashMapRowGetter); stringCommon = new VectorMapJoinOptimizedStringCommon(isOuterJoin); } + + @Override + public long getEstimatedMemorySize() { + // adding 16KB constant memory for stringCommon as the rabbit hole is too deep to implement the + // MemoryEstimate interface, also it is a constant overhead + long size = (16 * 1024L); + return super.getEstimatedMemorySize() + size; + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index ad77e87..d0fdb52 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -100,6 +100,7 @@ // adjust noconditional task size threshold for LLAP maxSize = getNoConditionalTaskSizeForLlap(maxSize, context.conf); + joinOp.getConf().setNoConditionalTaskSize(maxSize); TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf); if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) { @@ -280,7 +281,7 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont joinOp.getConf().getBaseSrc(), joinOp).getSecond(), null, joinDesc.getExprs(), null, null, joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(), - joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null); + joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null, joinDesc.getNoConditionalTaskSize()); mapJoinDesc.setNullSafes(joinDesc.getNullSafes()); mapJoinDesc.setFilterMap(joinDesc.getFilterMap()); mapJoinDesc.setResidualFilterExprs(joinDesc.getResidualFilterExprs()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index b2893e7..85d46f3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -59,7 +59,6 @@ import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.parse.GenMapRedWalker; -import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -80,8 +79,6 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import com.clearspring.analytics.util.Lists; - /** * Implementation of one of the rule-based map join optimization.
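For context on the ConvertJoinMapJoin change above: recording the adjusted threshold via setNoConditionalTaskSize(maxSize) is what lets the LLAP runtime compare a loaded hash table's getEstimatedMemorySize() against the planner's budget. A rough sketch of the check this plumbing enables; the getters and the error type come from this patch, while the helper name, imports, constructor signature, and inflation-factor handling (per hive.hash.table.inflation.factor) are assumptions, since the actual monitor code is not part of these hunks:

import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;

public class MemoryCheckSketch {
  // Hypothetical helper: compare the estimate against the scaled planner threshold.
  static void checkMemoryStatus(VectorMapJoinFastTableContainer container,
      MapJoinDesc desc, double inflationFactor) {
    long threshold = (long) (desc.getNoConditionalTaskSize() * inflationFactor);
    long estimated = container.getEstimatedMemorySize();
    if (threshold > 0 && estimated > threshold) {
      // thrown as an Error so the LLAP task is killed rather than retried
      throw new MapJoinMemoryExhaustionError("Hash table loading exceeded memory limits:"
          + " estimated " + estimated + " bytes, threshold " + threshold + " bytes");
    }
  }
}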
User passes hints to specify * map-joins and during this optimization, all user specified map joins are converted to MapJoins - @@ -434,7 +431,8 @@ public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf, smbJoinDesc.getValueTblDescs(), smbJoinDesc.getValueTblDescs(), smbJoinDesc.getOutputColumnNames(), bigTablePos, smbJoinDesc.getConds(), - smbJoinDesc.getFilters(), smbJoinDesc.isNoOuterJoin(), smbJoinDesc.getDumpFilePrefix()); + smbJoinDesc.getFilters(), smbJoinDesc.isNoOuterJoin(), smbJoinDesc.getDumpFilePrefix(), + smbJoinDesc.getNoConditionalTaskSize()); mapJoinDesc.setStatistics(smbJoinDesc.getStatistics()); @@ -1187,7 +1185,7 @@ public static MapJoinDesc getMapJoinDesc(HiveConf hconf, MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters, op - .getConf().getNoOuterJoin(), dumpFilePrefix); + .getConf().getNoOuterJoin(), dumpFilePrefix, op.getConf().getNoConditionalTaskSize()); mapJoinDescriptor.setStatistics(op.getConf().getStatistics()); mapJoinDescriptor.setTagOrder(tagOrder); mapJoinDescriptor.setNullSafes(desc.getNullSafes()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java index d375d1b..40c0f3b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java @@ -1005,7 +1005,7 @@ private static JoinOperator genJoin(RelNode join, ExprNodeDesc[][] joinExpressio // 4. We create the join operator with its descriptor JoinDesc desc = new JoinDesc(exprMap, outputColumnNames, noOuterJoin, joinCondns, - filters, joinExpressions); + filters, joinExpressions, 0); desc.setSemiJoinHints(semiJoinHints); desc.setReversedExprs(reversedExprs); desc.setFilterMap(filterMap); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java index 93b8a5d..f78bd7c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java @@ -281,7 +281,8 @@ public static void processSkewJoin(JoinOperator joinOp, MapJoinDesc mapJoinDescriptor = new MapJoinDesc(newJoinKeys, keyTblDesc, newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc,joinDescriptor .getOutputColumnNames(), i, joinDescriptor.getConds(), - joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix); + joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix, + joinDescriptor.getNoConditionalTaskSize()); mapJoinDescriptor.setTagOrder(tags); mapJoinDescriptor.setHandleSkewJoin(false); mapJoinDescriptor.setNullSafes(joinDescriptor.getNullSafes()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java index 405c3ca..c970611 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java @@ -240,7 +240,8 @@ public static void processSkewJoin(JoinOperator joinOp, Task semiJoinHints; + // 
non-transient field, used at runtime to kill a task if it exceeds memory limits when running in LLAP + protected long noConditionalTaskSize; + public JoinDesc() { } public JoinDesc(final Map<Byte, List<ExprNodeDesc>> exprs, List<String> outputColumnNames, final boolean noOuterJoin, final JoinCondDesc[] conds, final Map<Byte, List<ExprNodeDesc>> filters, - ExprNodeDesc[][] joinKeys) { + ExprNodeDesc[][] joinKeys, final long noConditionalTaskSize) { this.exprs = exprs; this.outputColumnNames = outputColumnNames; this.noOuterJoin = noOuterJoin; this.conds = conds; this.filters = filters; this.joinKeys = joinKeys; - + this.noConditionalTaskSize = noConditionalTaskSize; resetOrder(); } @@ -152,6 +155,7 @@ public Object clone() { ret.setHandleSkewJoin(handleSkewJoin); ret.setSkewKeyDefinition(getSkewKeyDefinition()); ret.setTagOrder(getTagOrder().clone()); + ret.setNoConditionalTaskSize(getNoConditionalTaskSize()); if (getKeyTableDesc() != null) { ret.setKeyTableDesc((TableDesc) getKeyTableDesc().clone()); } @@ -203,6 +207,7 @@ public JoinDesc(JoinDesc clone) { this.residualFilterExprs = clone.residualFilterExprs; this.statistics = clone.statistics; this.semiJoinHints = clone.semiJoinHints; + this.noConditionalTaskSize = clone.noConditionalTaskSize; } public Map<Byte, List<ExprNodeDesc>> getExprs() { @@ -700,4 +705,12 @@ public void setSemiJoinHints(Map<String, SemiJoinHint> semiJoinHints) { return semiJoinHints; } + + public long getNoConditionalTaskSize() { + return noConditionalTaskSize; + } + + public void setNoConditionalTaskSize(final long noConditionalTaskSize) { + this.noConditionalTaskSize = noConditionalTaskSize; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java index 940630c..8da85d2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java @@ -109,11 +109,12 @@ public MapJoinDesc(MapJoinDesc clone) { } public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys, - final TableDesc keyTblDesc, final Map<Byte, List<ExprNodeDesc>> values, - final List<TableDesc> valueTblDescs,final List<TableDesc> valueFilteredTblDescs, List<String> outputColumnNames, - final int posBigTable, final JoinCondDesc[] conds, - final Map<Byte, List<ExprNodeDesc>> filters, boolean noOuterJoin, String dumpFilePrefix) { - super(values, outputColumnNames, noOuterJoin, conds, filters, null); + final TableDesc keyTblDesc, final Map<Byte, List<ExprNodeDesc>> values, + final List<TableDesc> valueTblDescs, final List<TableDesc> valueFilteredTblDescs, List<String> outputColumnNames, + final int posBigTable, final JoinCondDesc[] conds, + final Map<Byte, List<ExprNodeDesc>> filters, boolean noOuterJoin, String dumpFilePrefix, + final long noConditionalTaskSize) { + super(values, outputColumnNames, noOuterJoin, conds, filters, null, noConditionalTaskSize); vectorDesc = null; this.keys = keys; this.keyTblDesc = keyTblDesc; @@ -585,5 +586,4 @@ public SMBJoinOperatorExplainVectorization getSMBJoinVectorization() { } return new SMBJoinOperatorExplainVectorization((SMBJoinDesc) this, vectorDesc); } - } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java index ec80641..2ebfcb2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java @@ -191,7 +191,7 @@ public ObjectInspector init(Mode m, ObjectInspector[] parameters) @Override public int estimate() { JavaDataModel model = JavaDataModel.get(); - return model.primitive2() * 3 + model.lengthFor(columnType); + return (int) (model.primitive2() * 3 +
model.lengthFor(columnType)); } }; @@ -433,11 +433,11 @@ public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveExc @Override public int estimate() { JavaDataModel model = JavaDataModel.get(); - return model.lengthFor(columnType) - + model.primitive1() - + model.primitive2() - + ((numDV == null) ? NumDistinctValueEstimator.lengthFor(model, null) : - numDV.lengthFor(model)); + return (int) (model.lengthFor(columnType) + + model.primitive1() + + model.primitive2() + + ((numDV == null) ? NumDistinctValueEstimator.lengthFor(model, null) : + numDV.lengthFor(model))); } protected void initNDVEstimator(int numBitVectors) { @@ -853,10 +853,10 @@ public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveExc @Override public int estimate() { JavaDataModel model = JavaDataModel.get(); - return model.primitive1() * 2 + model.primitive2() * 4 + - model.lengthFor(columnType) + - ((numDV == null) ? NumDistinctValueEstimator.lengthFor(model, null) : - numDV.lengthFor(model)); + return (int) (model.primitive1() * 2 + model.primitive2() * 4 + + model.lengthFor(columnType) + + ((numDV == null) ? NumDistinctValueEstimator.lengthFor(model, null) : + numDV.lengthFor(model))); } }; @@ -1160,7 +1160,7 @@ public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveExc @Override public int estimate() { JavaDataModel model = JavaDataModel.get(); - return model.primitive2() * 4 + model.lengthFor(columnType); + return (int) (model.primitive2() * 4 + model.lengthFor(columnType)); } }; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java index 16b5b17..1f1227e 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/mapjoin/TestMapJoinMemoryExhaustionHandler.java @@ -35,8 +35,8 @@ public void setup() { logHelper = new LogHelper(LOG); } - @Test(expected=MapJoinMemoryExhaustionException.class) - public void testAbort() throws MapJoinMemoryExhaustionException { + @Test(expected=MapJoinMemoryExhaustionError.class) + public void testAbort() throws MapJoinMemoryExhaustionError { MapJoinMemoryExhaustionHandler handler = new MapJoinMemoryExhaustionHandler(logHelper, 0.01d); List memoryConsumer = new ArrayList(); while(true) { diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java index a4ecd9f..ece04e0 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java @@ -21,6 +21,8 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import org.apache.hadoop.hive.common.MemoryEstimate; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils; import org.apache.hadoop.io.WritableUtils; @@ -31,7 +33,7 @@ * The structure storing arbitrary amount of data as a set of fixed-size byte buffers. * Maintains read and write pointers for convenient single-threaded writing/reading. 
*/ -public final class WriteBuffers implements RandomAccessOutput { +public final class WriteBuffers implements RandomAccessOutput, MemoryEstimate { private final ArrayList writeBuffers = new ArrayList(1); /** Buffer size in writeBuffers */ private final int wbSize; @@ -39,7 +41,7 @@ private final long offsetMask; private final long maxSize; - public static class Position { + public static class Position implements MemoryEstimate { private byte[] buffer = null; private int bufferIndex = 0; private int offset = 0; @@ -47,6 +49,14 @@ public void clear() { buffer = null; bufferIndex = offset = -1; } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long memSize = buffer == null ? 0 : jdm.lengthForByteArrayOfSize(buffer.length); + memSize += (2 * jdm.primitive1()); + return memSize; + } } Position writePos = new Position(); // Position where we'd write @@ -610,6 +620,17 @@ public long size() { return writeBuffers.size() * (long) wbSize; } + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += writeBuffers == null ? 0 : jdm.arrayList() + (writeBuffers.size() * jdm.lengthForByteArrayOfSize(wbSize)); + size += (3 * jdm.primitive2()); + size += writePos == null ? 0 : writePos.getEstimatedMemorySize(); + size += unsafeReadPos == null ? 0 : unsafeReadPos.getEstimatedMemorySize(); + return size; + } + /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */ public Position getUnsafeReadPosition() { return unsafeReadPos; diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java b/storage-api/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java index 4a745e4..df952cb 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java +++ b/storage-api/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java @@ -212,7 +212,7 @@ public int memoryAlign() { public abstract int memoryAlign(); // ascii string - public int lengthFor(String string) { + public long lengthFor(String string) { return lengthForStringOfLength(string.length()); } @@ -228,8 +228,8 @@ public int primitive2() { return PRIMITIVES2; } - public static int alignUp(int value, int align) { - return (value + align - 1) & ~(align - 1); + public static long alignUp(long value, long align) { + return (value + align - 1L) & ~(align - 1L); } private static final Logger LOG = LoggerFactory.getLogger(JavaDataModel.class); @@ -282,35 +282,35 @@ public static int round(int size) { return ((size + 8) >> 3) << 3; } - private int lengthForPrimitiveArrayOfSize(int primitiveSize, int length) { + private long lengthForPrimitiveArrayOfSize(int primitiveSize, long length) { return alignUp(array() + primitiveSize*length, memoryAlign()); } - public int lengthForByteArrayOfSize(int length) { + public long lengthForByteArrayOfSize(long length) { return lengthForPrimitiveArrayOfSize(PRIMITIVE_BYTE, length); } - public int lengthForObjectArrayOfSize(int length) { + public long lengthForObjectArrayOfSize(long length) { return lengthForPrimitiveArrayOfSize(ref(), length); } - public int lengthForLongArrayOfSize(int length) { + public long lengthForLongArrayOfSize(long length) { return lengthForPrimitiveArrayOfSize(primitive2(), length); } - public int lengthForDoubleArrayOfSize(int length) { + public long lengthForDoubleArrayOfSize(long length) { return lengthForPrimitiveArrayOfSize(primitive2(), length); } - public int lengthForIntArrayOfSize(int length) { + 
public long lengthForIntArrayOfSize(long length) { return lengthForPrimitiveArrayOfSize(primitive1(), length); } - public int lengthForBooleanArrayOfSize(int length) { + public long lengthForBooleanArrayOfSize(long length) { return lengthForPrimitiveArrayOfSize(PRIMITIVE_BYTE, length); } - public int lengthForTimestampArrayOfSize(int length) { + public long lengthForTimestampArrayOfSize(long length) { return lengthForPrimitiveArrayOfSize(lengthOfTimestamp(), length); } - public int lengthForDateArrayOfSize(int length) { + public long lengthForDateArrayOfSize(long length) { return lengthForPrimitiveArrayOfSize(lengthOfDate(), length); } - public int lengthForDecimalArrayOfSize(int length) { + public long lengthForDecimalArrayOfSize(long length) { return lengthForPrimitiveArrayOfSize(lengthOfDecimal(), length); }
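One closing observation on the JavaDataModel changes: widening alignUp and the lengthFor*ArrayOfSize helpers from int to long is not cosmetic. A map join hash table can back well over 2 GB of buffers, and the old int arithmetic would silently wrap there. A small worked example (the slot count is chosen purely for illustration):

import org.apache.hadoop.hive.ql.util.JavaDataModel;

public class AlignUpWideningDemo {
  public static void main(String[] args) {
    // 300M long-sized slots already exceed Integer.MAX_VALUE bytes (2^31 - 1).
    long bytes = 8L * 300_000_000L;                  // 2,400,000,000 bytes
    long aligned = JavaDataModel.alignUp(bytes, 8);  // already 8-aligned, stays 2400000000
    System.out.println(aligned);                     // 2400000000
    System.out.println((int) aligned);               // -1894967296: what int math would have returned
  }
}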
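To make the WriteBuffers estimate concrete, the sum in its getEstimatedMemorySize() can be reproduced by hand. The buffer size and allocated count below are assumed values for the sketch, not defaults taken from the patch:

import org.apache.hadoop.hive.ql.util.JavaDataModel;

public class WriteBuffersEstimateSketch {
  public static void main(String[] args) {
    JavaDataModel jdm = JavaDataModel.get();
    int wbSize = 8 * 1024 * 1024;  // assumed fixed buffer size
    int allocated = 4;             // assumed number of allocated byte[] buffers
    // ArrayList overhead plus one fixed-size byte[] per allocated buffer, mirroring
    // jdm.arrayList() + writeBuffers.size() * jdm.lengthForByteArrayOfSize(wbSize)
    long size = jdm.arrayList() + allocated * jdm.lengthForByteArrayOfSize(wbSize);
    size += 3 * jdm.primitive2();  // the three primitive2-sized fields counted in the patch
    // writePos and unsafeReadPos would add their Position estimates when non-null
    System.out.println(size);      // roughly 32 MB plus a small constant overhead
  }
}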