diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 1d6a93a..07bca36 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -35,6 +35,8 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
@@ -818,6 +820,46 @@ public Path read(Kryo kryo, Input input, Class<Path> type) {
     }
   }
 
+  /** Names of classes already reported as lacking a dedicated kryo serializer. */
+  private static final Set<String> WARNED_CLASS =
+      java.util.Collections.synchronizedSet(new HashSet<String>());
+
+  /**
+   * Fallback kryo serializer that delegates to standard Java serialization
+   * ({@link ObjectOutputStream} / {@link ObjectInputStream}).
+   */
+  private static class SerializableSerializer
+      extends com.esotericsoftware.kryo.Serializer<Serializable> {
+
+    @Override
+    public void write(Kryo kryo, Output output, Serializable object) {
+      // Set.add returns true only on first insertion, so warn once per class.
+      if (WARNED_CLASS.add(object.getClass().getName())) {
+        LOG.warn(object.getClass() + " is not supported by kryo.. Needs custom serializer for it");
+      }
+      try {
+        ObjectOutputStream oo = new ObjectOutputStream(output);
+        oo.writeObject(object);
+        // Flush instead of close: closing would also close kryo's Output,
+        // which is still needed for the rest of the object graph.
+        oo.flush();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public Serializable read(Kryo kryo, Input input, Class<Serializable> type) {
+      try {
+        // Intentionally not closed: closing would also close kryo's Input.
+        ObjectInputStream oi = new ObjectInputStream(input);
+        return (Serializable) oi.readObject();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
   public static Set<Operator<?>> cloneOperatorTree(Configuration conf, Set<Operator<?>> roots) {
     ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
     serializePlan(roots, baos, conf, true);
@@ -971,6 +1013,7 @@ protected synchronized Kryo initialValue() {
       kryo.register(java.sql.Date.class, new SqlDateSerializer());
       kryo.register(java.sql.Timestamp.class, new TimestampSerializer());
       kryo.register(Path.class, new PathSerializer());
+      kryo.register(Serializable.class, new SerializableSerializer());
       kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
       removeField(kryo, Operator.class, "colExprMap");
       removeField(kryo, ColumnInfo.class, "objectInspector");
@@ -994,6 +1037,7 @@ protected synchronized Kryo initialValue() {
       kryo.register(java.sql.Date.class, new SqlDateSerializer());
       kryo.register(java.sql.Timestamp.class, new TimestampSerializer());
       kryo.register(Path.class, new PathSerializer());
+      kryo.register(Serializable.class, new SerializableSerializer());
       kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
       return kryo;
     };