commit 544634c35fc1e799cfc1025b8be873e442f69b09
Author: Sahil Takiar
Date:   Mon Jun 18 15:38:13 2018 -0500

    HIVE-19937: Intern JobConf objects in Spark tasks

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java
index 0d1c6888f4..c7af991932 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java
@@ -96,7 +96,7 @@ private Path normalizePath(Path onefile, boolean schemaless) {
     return path;
   }
 
-  protected String getNominalPath(Path fpath) {
+  protected Path getNominalPath(Path fpath) {
     Path nominal = null;
     boolean schemaless = fpath.toUri().getScheme() == null;
     for (Path onefile : conf.getPathToAliases().keySet()) {
@@ -119,7 +119,7 @@ protected String getNominalPath(Path fpath) {
     if (nominal == null) {
       throw new IllegalStateException("Invalid input path " + fpath);
     }
-    return nominal.toString();
+    return nominal;
   }
 
   public abstract void initEmptyInputChildren(List<Operator<?>> children, Configuration hconf)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index 16d7c519fa..06c68575d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -79,8 +79,7 @@ protected transient long logEveryNRows = 0;
 
   // input path --> {operator --> context}
-  private final Map<String, Map<Operator<?>, MapOpCtx>> opCtxMap =
-      new HashMap<String, Map<Operator<?>, MapOpCtx>>();
+  private final Map<Path, Map<Operator<?>, MapOpCtx>> opCtxMap = new HashMap<>();
 
   // child operator --> object inspector (converted OI if it's needed)
   private final Map<Operator<?>, StructObjectInspector> childrenOpToOI =
       new HashMap<Operator<?>, StructObjectInspector>();
@@ -440,10 +439,8 @@ public void setChildren(Configuration hconf) throws Exception {
         LOG.debug("Adding alias " + alias + " to work list for file " + onefile);
       }
 
-      Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(onefile.toString());
-      if (contexts == null) {
-        opCtxMap.put(onefile.toString(), contexts = new LinkedHashMap<Operator<?>, MapOpCtx>());
-      }
+      Map<Operator<?>, MapOpCtx> contexts = opCtxMap.computeIfAbsent(onefile,
+          k -> new LinkedHashMap<>());
       if (contexts.containsKey(op)) {
         continue;
       }
@@ -515,7 +512,7 @@ public void initializeMapOperator(Configuration hconf) throws HiveException {
   public void cleanUpInputFileChangedOp() throws HiveException {
     super.cleanUpInputFileChangedOp();
     Path fpath = getExecContext().getCurrentInputPath();
-    String nominalPath = getNominalPath(fpath);
+    Path nominalPath = getNominalPath(fpath);
     Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(nominalPath);
     if (LOG.isInfoEnabled()) {
       StringBuilder builder = new StringBuilder();
@@ -697,7 +694,7 @@ public OperatorType getType() {
 
   public void initializeContexts() {
     Path fpath = getExecContext().getCurrentInputPath();
-    String nominalPath = getNominalPath(fpath);
+    Path nominalPath = getNominalPath(fpath);
     Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(nominalPath);
     currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]);
   }
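The hunks above switch these lookup tables from String keys (built with Path.toString()) to Path keys, so per-file strings no longer have to be duplicated and the interned URI strings inside each Path can be shared across the plan. As a hedged, self-contained illustration of why a Path works as a map key — the class name PathKeyDemo and the example paths are invented for this sketch, not taken from the patch — org.apache.hadoop.fs.Path defines equals() and hashCode() in terms of its URI, so a structurally equal Path built later still finds the entry:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;

// Hypothetical demo (not part of the patch): Path works as a HashMap key because its
// equals()/hashCode() are defined by the underlying URI, so a structurally equal Path
// built later (e.g. from the current input split) still finds the entry.
public class PathKeyDemo {
  public static void main(String[] args) {
    Map<Path, String> pathToAlias = new HashMap<>();
    pathToAlias.put(new Path("hdfs://nn:8020/warehouse/t1/part=1"), "t1");

    Path probe = new Path("hdfs://nn:8020/warehouse/t1/part=1");
    System.out.println(pathToAlias.get(probe)); // prints "t1"
  }
}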
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index e03429bc37..d2eeba4612 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
@@ -233,13 +234,14 @@ public Kryo create() {
       kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
       kryo.register(new java.util.ArrayList().subList(0,0).getClass(), new ArrayListSubListSerializer());
       kryo.register(CopyOnFirstWriteProperties.class, new CopyOnFirstWritePropertiesSerializer());
+      kryo.register(MapWork.class, new MapWorkSerializer(kryo, MapWork.class));
+      kryo.register(PartitionDesc.class, new PartitionDescSerializer(kryo, PartitionDesc.class));
 
       ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
           .setFallbackInstantiatorStrategy(
               new StdInstantiatorStrategy());
       removeField(kryo, AbstractOperatorDesc.class, "colExprMap");
       removeField(kryo, AbstractOperatorDesc.class, "statistics");
-      kryo.register(MapWork.class);
       kryo.register(ReduceWork.class);
       kryo.register(TableDesc.class);
       kryo.register(UnionOperator.class);
@@ -540,6 +542,50 @@ public Map read(Kryo kryo, Input input, Class type) {
     }
   }
 
+  /**
+   * We use a custom {@link com.esotericsoftware.kryo.Serializer} for {@link MapWork} objects in
+   * order to invoke any string interning code present in the "setter" methods. The fields in {@link
+   * MapWork} often store paths that contain duplicate strings, so interning them can decrease
+   * memory significantly.
+   */
+  private static class MapWorkSerializer extends FieldSerializer<MapWork> {
+
+    MapWorkSerializer(Kryo kryo, Class type) {
+      super(kryo, type);
+    }
+
+    @Override
+    public MapWork read(Kryo kryo, Input input, Class<MapWork> type) {
+      MapWork mapWork = super.read(kryo, input, type);
+      mapWork.setPathToPartitionInfo(mapWork.getPathToPartitionInfo());
+      mapWork.setPathToAliases(mapWork.getPathToAliases());
+      return mapWork;
+    }
+  }
+
+  /**
+   * We use a custom {@link com.esotericsoftware.kryo.Serializer} for {@link PartitionDesc} objects
+   * in order to invoke any string interning code present in the "setter" methods. {@link
+   * PartitionDesc} objects are usually stored by {@link MapWork} objects and contain duplicate info
+   * like input format class names, partition specs, etc.
+   */
+  private static class PartitionDescSerializer extends FieldSerializer<PartitionDesc> {
+
+    PartitionDescSerializer(Kryo kryo, Class type) {
+      super(kryo, type);
+    }
+
+    @Override
+    public PartitionDesc read(Kryo kryo, Input input, Class<PartitionDesc> type) {
+      PartitionDesc partitionDesc = super.read(kryo, input, type);
+      partitionDesc.setBaseFileName(partitionDesc.getBaseFileName());
+      partitionDesc.setPartSpec(partitionDesc.getPartSpec());
+      partitionDesc.setInputFileFormatClass(partitionDesc.getInputFileFormatClass());
+      partitionDesc.setOutputFileFormatClass(partitionDesc.getOutputFileFormatClass());
+      return partitionDesc;
+    }
+  }
+
   /**
    * Serializes the plan.
    *
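The two serializers above are needed because Kryo's FieldSerializer reads field values directly and never calls the setters, so the intern() logic in the MapWork and PartitionDesc setters would otherwise be skipped when a Spark task deserializes its plan; re-invoking each setter with its own getter's value after super.read() runs that interning exactly once per deserialized object. Below is a hedged, standalone sketch of the same pattern — the Desc bean, its field, and the main() driver are invented for illustration, and the FieldSerializer/Input/Output signatures assume the Kryo 3.x line that Hive bundles here:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.FieldSerializer;

public class InterningSerializerSketch {

  // Toy bean whose setter interns its value, mirroring the PartitionDesc/MapWork setters.
  public static class Desc {
    private String inputFormatClassName;

    public String getInputFormatClassName() {
      return inputFormatClassName;
    }

    public void setInputFormatClassName(String name) {
      this.inputFormatClassName = name == null ? null : name.intern();
    }
  }

  // FieldSerializer writes fields directly and skips setters, so this subclass re-invokes
  // the interning setter after each read, collapsing duplicate strings to one instance.
  public static class DescSerializer extends FieldSerializer<Desc> {
    public DescSerializer(Kryo kryo, Class type) {
      super(kryo, type);
    }

    @Override
    public Desc read(Kryo kryo, Input input, Class<Desc> type) {
      Desc desc = super.read(kryo, input, type);
      desc.setInputFormatClassName(desc.getInputFormatClassName());
      return desc;
    }
  }

  public static void main(String[] args) {
    Kryo kryo = new Kryo();
    kryo.register(Desc.class, new DescSerializer(kryo, Desc.class));

    Desc original = new Desc();
    original.setInputFormatClassName("org.apache.hadoop.mapred.TextInputFormat");

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    Output output = new Output(bytes);
    kryo.writeObject(output, original);
    output.close();

    Desc copy = kryo.readObject(
        new Input(new ByteArrayInputStream(bytes.toByteArray())), Desc.class);
    // Because the setter ran again during read(), the deserialized value is the interned
    // instance; without DescSerializer this comparison would print false.
    System.out.println(copy.getInputFormatClassName() == "org.apache.hadoop.mapred.TextInputFormat");
  }
}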
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
index bd70991803..5a903d3617 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
@@ -92,7 +92,7 @@
   /*
    * Overall information on this vectorized Map operation.
    */
-  private transient HashMap<String, VectorPartitionContext> fileToPartitionContextMap;
+  private transient HashMap<Path, VectorPartitionContext> fileToPartitionContextMap;
 
   private transient Operator<? extends OperatorDesc> oneRootOperator;
 
@@ -555,7 +555,7 @@ private void internalSetChildren(Configuration hconf) throws Exception {
      * The Vectorizer class enforces that there is only one TableScanOperator, so
      * we don't need the more complicated multiple root operator mapping that MapOperator has.
      */
-    fileToPartitionContextMap = new HashMap<String, VectorPartitionContext>();
+    fileToPartitionContextMap = new HashMap<>();
 
     // Temporary map so we only create one partition context entry.
     HashMap<PartitionDesc, VectorPartitionContext> partitionContextMap =
@@ -573,7 +573,7 @@ private void internalSetChildren(Configuration hconf) throws Exception {
         vectorPartitionContext = partitionContextMap.get(partDesc);
       }
 
-      fileToPartitionContextMap.put(path.toString(), vectorPartitionContext);
+      fileToPartitionContextMap.put(path, vectorPartitionContext);
     }
 
     // Create list of one.
@@ -593,7 +593,7 @@ public void initializeMapOperator(Configuration hconf) throws HiveException {
 
   public void initializeContexts() throws HiveException {
     Path fpath = getExecContext().getCurrentInputPath();
-    String nominalPath = getNominalPath(fpath);
+    Path nominalPath = getNominalPath(fpath);
     setupPartitionContextVars(nominalPath);
   }
 
@@ -602,7 +602,7 @@ public void initializeContexts() throws HiveException {
   public void cleanUpInputFileChangedOp() throws HiveException {
     super.cleanUpInputFileChangedOp();
     Path fpath = getExecContext().getCurrentInputPath();
-    String nominalPath = getNominalPath(fpath);
+    Path nominalPath = getNominalPath(fpath);
 
     setupPartitionContextVars(nominalPath);
 
@@ -641,7 +641,7 @@ private boolean flushDeserializerBatch() throws HiveException {
   /*
    * Setup the context for reading from the next partition file.
    */
-  private void setupPartitionContextVars(String nominalPath) throws HiveException {
+  private void setupPartitionContextVars(Path nominalPath) throws HiveException {
 
     currentVectorPartContext = fileToPartitionContextMap.get(nominalPath);
     if (currentVectorPartContext == null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
index fd6cc867c8..0444562ce7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
@@ -25,6 +25,7 @@
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.hadoop.hive.common.StringInternUtils;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,7 +69,8 @@ private void updateMrWork(final JobConf job) {
       pathToPartitionInfo.clear();
       for (final Map.Entry<Path, PartitionDesc> entry : mapWork.getPathToPartitionInfo().entrySet()) {
         // key contains scheme (such as pfile://) and we want only the path portion fix in HIVE-6366
-        pathToPartitionInfo.put(Path.getPathWithoutSchemeAndAuthority(entry.getKey()), entry.getValue());
+        pathToPartitionInfo.put(StringInternUtils.internUriStringsInPath(
+            Path.getPathWithoutSchemeAndAuthority(entry.getKey())), entry.getValue());
       }
     }
   }
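Both call styles above rely on StringInternUtils.internUriStringsInPath, which — judging by the two call sites in this patch — interns the string components of the Path's URI in place and returns the Path it was given, so it can be used either as a standalone statement (MapWork, below) or wrapped around the expression that builds the key (ProjectionPusher). A small hedged usage sketch, with the class name and example path invented here, follows:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StringInternUtils;

// Hypothetical usage sketch (class name and path are examples): interning the URI strings
// of a Path before it is stored as a map key, mirroring the two call styles in the patch.
public class InternPathUsage {
  public static void main(String[] args) {
    Path p = new Path("hdfs://nn:8020/warehouse/db.db/t/ds=2018-06-18");

    // Statement style, as in MapWork.addPathToAlias(): intern in place, then store the Path.
    StringInternUtils.internUriStringsInPath(p);

    // Expression style, as in ProjectionPusher.updateMrWork(): intern the freshly built key.
    Path key = StringInternUtils.internUriStringsInPath(
        Path.getPathWithoutSchemeAndAuthority(p));
    System.out.println(key); // /warehouse/db.db/t/ds=2018-06-18
  }
}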
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 5bf0625c09..5a8507004c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -194,6 +194,7 @@ public void setPathToAliases(final LinkedHashMap<Path, ArrayList<String>> pathToAliases) {
   }
 
   public void addPathToAlias(Path path, ArrayList<String> aliases){
+    StringInternUtils.internUriStringsInPath(path);
     pathToAliases.put(path, aliases);
   }
 
@@ -201,6 +202,7 @@ public void addPathToAlias(Path path, String newAlias){
     ArrayList<String> aliases = pathToAliases.get(path);
     if (aliases == null) {
       aliases = new ArrayList<>();
+      StringInternUtils.internUriStringsInPath(path);
       pathToAliases.put(path, aliases);
     }
     aliases.add(newAlias.intern());
@@ -243,6 +245,9 @@ public void removePathToAlias(Path path){
   }
 
   public void setPathToPartitionInfo(final LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo) {
+    for (Path p : pathToPartitionInfo.keySet()) {
+      StringInternUtils.internUriStringsInPath(p);
+    }
     this.pathToPartitionInfo = pathToPartitionInfo;
   }
 
@@ -693,6 +698,10 @@ public void setEventSourceColumnNameMap(Map<String, List<String>> map) {
     return eventSourceColumnTypeMap;
   }
 
+  public void setEventSourceColumnTypeMap(Map<String, List<String>> eventSourceColumnTypeMap) {
+    this.eventSourceColumnTypeMap = eventSourceColumnTypeMap;
+  }
+
   public Map<String, List<ExprNodeDesc>> getEventSourcePartKeyExprMap() {
     return eventSourcePartKeyExprMap;
   }
@@ -739,7 +748,7 @@ public BitSet getIncludedBuckets() {
 
   public void setIncludedBuckets(BitSet includedBuckets) {
     // see comment next to the field
-    this.includedBuckets = includedBuckets.toByteArray();
+    this.includedBuckets = includedBuckets == null ? null : includedBuckets.toByteArray();
   }
 
   public void setVectorizedRowBatch(VectorizedRowBatch vectorizedRowBatch) {
@@ -824,7 +833,8 @@ public void setVectorPartitionDescList(List<VectorPartitionDesc> vectorPartitionDescList) {
   }
 
   public void setVectorizationEnabledConditionsMet(ArrayList<String> vectorizationEnabledConditionsMet) {
-    this.vectorizationEnabledConditionsMet = VectorizationCondition.addBooleans(vectorizationEnabledConditionsMet, true);
+    this.vectorizationEnabledConditionsMet = vectorizationEnabledConditionsMet == null ? null : VectorizationCondition.addBooleans(
+        vectorizationEnabledConditionsMet, true);
   }
 
   public List<String> getVectorizationEnabledConditionsMet() {
@@ -832,7 +842,8 @@ public void setVectorizationEnabledConditionsMet(ArrayList<String> vectorizationEnabledConditionsMet) {
   public void setVectorizationEnabledConditionsNotMet(List<String> vectorizationEnabledConditionsNotMet) {
-    this.vectorizationEnabledConditionsNotMet = VectorizationCondition.addBooleans(vectorizationEnabledConditionsNotMet, false);
+    this.vectorizationEnabledConditionsNotMet = vectorizationEnabledConditionsNotMet == null ? null : VectorizationCondition.addBooleans(
+        vectorizationEnabledConditionsNotMet, false);
   }
 
   public List<String> getVectorizationEnabledConditionsNotMet() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index 821e428eca..b226ab7b24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -67,7 +67,7 @@
   private VectorPartitionDesc vectorPartitionDesc;
 
   public void setBaseFileName(String baseFileName) {
-    this.baseFileName = baseFileName.intern();
+    this.baseFileName = StringInternUtils.internIfNotNull(baseFileName);
   }
 
   public PartitionDesc() {
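The internIfNotNull change above exists because the new PartitionDescSerializer replays setBaseFileName on deserialized objects whose field may legitimately be null, where the old baseFileName.intern() would throw; the added null checks in the MapWork setters are similar defensive guards. The payoff of the interning itself is that a MapWork can reference thousands of PartitionDesc entries whose fields repeat the same text (input format class names, partition spec values, base file names), and every Spark task deserializes its own copy of that plan. A hedged, self-contained sketch of the effect — the class name and the 10,000 count are invented for illustration — is below:

import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration (not from the patch): deserialization yields a distinct String
// per object, while intern() collapses the duplicates to one canonical instance per JVM.
public class InternSavingsSketch {
  public static void main(String[] args) {
    List<String> inputFormats = new ArrayList<>();
    for (int i = 0; i < 10_000; i++) {
      // Simulates what a deserializer produces: a fresh String with the same contents.
      String raw = new String("org.apache.hadoop.mapred.TextInputFormat".toCharArray());
      // Interning (what the setters now do) makes every descriptor share one instance.
      inputFormats.add(raw.intern());
    }
    // All 10,000 entries reference the same String object.
    System.out.println(inputFormats.get(0) == inputFormats.get(9_999)); // true
  }
}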