diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index e205c08d84..6e46d387ac 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -38,9 +38,11 @@
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.CopyOnFirstWriteProperties;
 import org.apache.hadoop.hive.common.type.TimestampTZ;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
@@ -126,6 +128,7 @@ public static void setGlobalHook(Hook hook) {
    */
   private static class KryoWithHooks extends Kryo {
     private Hook globalHook;
+    private Configuration conf;
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     private static final class SerializerWithHook extends com.esotericsoftware.kryo.Serializer {
@@ -219,6 +222,15 @@ public Registration readClass(Input input) {
       T result = super.readObject(input, type, serializer);
       return ponderGlobalPostReadHook(hook, result);
     }
+
+    public Kryo withConfig(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public Configuration getConf() {
+      return conf;
+    }
   }
 
   private static final Object FAKE_REFERENCE = new Object();
@@ -277,7 +289,11 @@ public Kryo create() {
    * @return kryo instance
    */
   public static Kryo borrowKryo() {
-    Kryo kryo = kryoPool.borrow();
+    return borrowKryo(null);
+  }
+
+  public static Kryo borrowKryo(Configuration conf) {
+    Kryo kryo = ((KryoWithHooks) kryoPool.borrow()).withConfig(conf);
     kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
     return kryo;
   }
@@ -288,6 +304,7 @@ public static Kryo borrowKryo() {
    * @param kryo - kryo instance to be released
    */
   public static void releaseKryo(Kryo kryo) {
+    ((KryoWithHooks) kryo).withConfig(null); // cleanup: drop the borrowed Configuration reference before re-pooling
     kryoPool.release(kryo);
   }
 
@@ -579,10 +596,12 @@ public Map read(Kryo kryo, Input input, Class type) {
     @Override
     public MapWork read(Kryo kryo, Input input, Class type) {
       MapWork mapWork = super.read(kryo, input, type);
-      // The set methods in MapWork intern the any duplicate strings which is why we call them
-      // during de-serialization
-      mapWork.setPathToPartitionInfo(mapWork.getPathToPartitionInfo());
-      mapWork.setPathToAliases(mapWork.getPathToAliases());
+
+      Configuration conf = ((KryoWithHooks) kryo).getConf();
+      if (conf != null && conf.get(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "").equalsIgnoreCase("spark")) {
+        mapWork.internFields();
+      }
+
       return mapWork;
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index aa62e90c0e..e6ff610651 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -444,7 +444,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
     }
 
     InputStream in = null;
-    Kryo kryo = SerializationUtilities.borrowKryo();
+    Kryo kryo = SerializationUtilities.borrowKryo(conf);
     try {
       String engine = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE);
       if (engine.equals("spark")) {
@@ -595,7 +595,7 @@ public static Path setReduceWork(Configuration conf, ReduceWork w, Path hiveScra
   }
 
   private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, String name, boolean useCache) {
-    Kryo kryo = SerializationUtilities.borrowKryo();
+    Kryo kryo = SerializationUtilities.borrowKryo(conf);
     try {
       setPlanPath(conf, hiveScratchDir);
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index ef7e956fc7..9e4efc525f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -913,4 +913,16 @@ public MapExplainVectorization getMapExplainVectorization() {
     }
     return new MapExplainVectorization(this);
   }
+
+  public void internFields() {
+    LOG.info("Interning MapWork fields to reduce memory consumption");
+    // The Path keys in pathToPartitionInfo and pathToAliases frequently contain duplicate
+    // URI strings, so intern them during de-serialization.
+    for (Path p : pathToPartitionInfo.keySet()) {
+      StringInternUtils.internUriStringsInPath(p);
+    }
+    for (Path p : pathToAliases.keySet()) {
+      StringInternUtils.internUriStringsInPath(p);
+    }
+  }
 }
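
Illustrative usage sketch (not part of the patch): callers that have the job Configuration at hand are expected to pass it to borrowKryo so that engine-specific de-serialization behavior, such as the Spark-only MapWork interning above, can key off it; the no-argument borrowKryo() keeps the previous behavior. A minimal borrow/release pattern under that assumption:

    Configuration conf = new Configuration();
    conf.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "spark");
    Kryo kryo = SerializationUtilities.borrowKryo(conf);  // records conf on the pooled KryoWithHooks instance
    try {
      // de-serialize plan objects here; MapWorkSerializer.read() sees conf via getConf()
    } finally {
      SerializationUtilities.releaseKryo(kryo);           // drops the conf reference before re-pooling
    }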