diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 200a1ad..dc610f1 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1480,6 +1480,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVEQUERYNAME ("hive.query.name", null, "This named is used by Tez to set the dag name. This name in turn will appear on \n" + "the Tez UI representing the work that was done."), + HIVE_KRYO_TRACE_ENABLE("hive.kryo.trace.enable", false, "Enable kryo serialization trace. Trace log messages will" + + " go to System.out by default."), HIVEOPTIMIZEBUCKETINGSORTING("hive.optimize.bucketingsorting", true, "Don't create a reducer for enforcing \n" + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index b77948b..225ab5e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -126,6 +126,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.esotericsoftware.minlog.Log; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; @@ -349,6 +350,11 @@ public Driver(QueryState queryState, String userName) { isParallelEnabled = (conf != null) && HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION); this.userName = userName; + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_KRYO_TRACE_ENABLE)) { + Log.TRACE(); + } else { + Log.ERROR(); + } } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java index 7be628e..6335521 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java @@ -70,6 +70,7 @@ import com.esotericsoftware.kryo.pool.KryoFactory; import com.esotericsoftware.kryo.pool.KryoPool; import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.esotericsoftware.minlog.Log; /** * Utilities related to serialization and deserialization. @@ -217,6 +218,10 @@ public Registration readClass(Input input) { } } + // NOTE: Do not add any removeFields here. Add it to removeUnwantedFields() method instead. Since the same kryo + // factory is used for cloning of operator tree and serialization of plan, cloning should retain all fields and + // serialization does not need some fields. Fields added to removeUnwantedFields() will get invoked only during plan + // serialization/deserialization. private static KryoFactory factory = new KryoFactory() { public Kryo create() { KryoWithHooks kryo = new KryoWithHooks(); @@ -228,8 +233,6 @@ public Kryo create() { ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()) .setFallbackInstantiatorStrategy( new StdInstantiatorStrategy()); - removeField(kryo, Operator.class, "colExprMap"); - removeField(kryo, AbstractOperatorDesc.class, "statistics"); kryo.register(MapWork.class); kryo.register(ReduceWork.class); kryo.register(TableDesc.class); @@ -279,6 +282,13 @@ public static void releaseKryo(Kryo kryo) { kryoPool.release(kryo); } + private static void removeUnwantedFields(Kryo kryo) { + // these fields are required for cloning of operator during optimization but are not required during serialization + // and deserialization of plan for task execution + removeField(kryo, Operator.class, "colExprMap"); + removeField(kryo, AbstractOperatorDesc.class, "statistics"); + } + private static void removeField(Kryo kryo, Class type, String fieldName) { FieldSerializer fld = new FieldSerializer(kryo, type); fld.removeField(fieldName); @@ -452,6 +462,7 @@ private static void serializePlan(Kryo kryo, Object plan, OutputStream out, bool if (cloningPlan) { serializeObjectByKryo(kryo, plan, out); } else { + removeUnwantedFields(kryo); serializeObjectByKryo(kryo, plan, out); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SERIALIZE_PLAN); @@ -492,6 +503,7 @@ private static void serializePlan(Kryo kryo, Object plan, OutputStream out, bool if (cloningPlan) { plan = deserializeObjectByKryo(kryo, in, planClass); } else { + removeUnwantedFields(kryo); plan = deserializeObjectByKryo(kryo, in, planClass); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN);