diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index b707d85..b4e8996 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -22,6 +22,7 @@ import java.beans.Encoder; import java.beans.ExceptionListener; import java.beans.Expression; +import java.beans.PersistenceDelegate; import java.beans.Statement; import java.beans.XMLDecoder; import java.beans.XMLEncoder; @@ -50,12 +51,14 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.SQLTransientException; +import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -376,6 +379,41 @@ protected void initialize(Class type, Object oldInstance, Object newInstance, } + /** + * DatePersistenceDelegate. Needed to serialize java.util.Date + * since it is not serialization friendly. + * Also works for java.sql.Date since it derives from java.util.Date. + */ + public static class DatePersistenceDelegate extends PersistenceDelegate { + + @Override + protected Expression instantiate(Object oldInstance, Encoder out) { + Date dateVal = (Date)oldInstance; + Object[] args = { dateVal.getTime() }; + return new Expression(dateVal, dateVal.getClass(), "new", args); + } + + protected boolean mutatesTo(Object oldInstance, Object newInstance) { + if (oldInstance == null || newInstance == null) { + return false; + } + return oldInstance.getClass() == newInstance.getClass(); + } + } + + /** + * TimestampPersistenceDelegate. Needed to serialize java.sql.Timestamp since + * it is not serialization friendly. + */ + public static class TimestampPersistenceDelegate extends DatePersistenceDelegate { + protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) { + Timestamp ts = (Timestamp)oldInstance; + Object[] args = { ts.getNanos() }; + Statement stmt = new Statement(oldInstance, "setNanos", args); + out.writeStatement(stmt); + } + } + public static void setMapRedWork(Configuration job, MapredWork w, String hiveScratchDir) { try { @@ -424,6 +462,8 @@ public static String getHiveJobID(Configuration job) { public static String serializeExpression(ExprNodeDesc expr) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); XMLEncoder encoder = new XMLEncoder(baos); + encoder.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate()); + encoder.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate()); try { encoder.writeObject(expr); } finally { @@ -466,7 +506,8 @@ public static void serializeTasks(Task t, OutputStream o e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate()); e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate()); e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate()); - + e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate()); + e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate()); e.writeObject(t); } finally { if (null != e) { @@ -505,6 +546,8 @@ public void exceptionThrown(Exception e) { e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate()); e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate()); e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate()); + e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate()); + e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate()); e.setPersistenceDelegate(org.datanucleus.sco.backed.Map.class, new MapDelegate()); e.setPersistenceDelegate(org.datanucleus.sco.backed.List.class, new ListDelegate()); @@ -540,6 +583,8 @@ public static void serializeMapRedWork(MapredWork w, OutputStream out) { // workaround for java 1.5 e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate()); e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate()); + e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate()); + e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate()); e.writeObject(w); } finally { if (null != e) { @@ -573,6 +618,8 @@ public static void serializeMapRedLocalWork(MapredLocalWork w, OutputStream out) // workaround for java 1.5 e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate()); e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate()); + e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate()); + e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate()); e.writeObject(w); } finally { if (null != e) { diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java index df31181..732788b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java @@ -18,11 +18,18 @@ package org.apache.hadoop.hive.ql.exec; +import java.sql.Date; +import java.sql.Timestamp; + import static org.apache.hadoop.hive.ql.exec.Utilities.getFileExtension; import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.JobConf; public class TestUtilities extends TestCase { @@ -53,4 +60,14 @@ public void testGetFileExtension() { assertEquals("Custom extension for uncompressed text format", extension, getFileExtension(jc, true, new HiveIgnoreKeyTextOutputFormat())); } + + public void testSerializeTimestamp() { + Timestamp ts = new Timestamp(1374554702000L); + ts.setNanos(123456); + ExprNodeConstantDesc constant = new ExprNodeConstantDesc( + TypeInfoFactory.timestampTypeInfo, ts); + String serialized = Utilities.serializeExpression(constant); + ExprNodeDesc deserialized = Utilities.deserializeExpression(serialized, new Configuration()); + assertEquals(constant.getExprString(), deserialized.getExprString()); + } }