Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (working copy) @@ -79,6 +79,9 @@ private Class inputFormatClass; private URI uri; + /** + * Used only for serialization. + */ public Table() { } Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (working copy) @@ -82,6 +82,12 @@ } /** + * Used only for serialization. + */ + public Partition() { + } + + /** * create an empty partition. * SemanticAnalyzer code requires that an empty partition when the table is not partitioned. */ @@ -254,9 +260,13 @@ final public Class getInputFormatClass() throws HiveException { if (inputFormatClass == null) { - String clsName = getSchema().getProperty( - org.apache.hadoop.hive.metastore.api.Constants.FILE_INPUT_FORMAT, - org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName()); + String clsName = null; + if (tPartition != null && tPartition.getSd() != null) { + clsName = tPartition.getSd().getInputFormat(); + } + if (clsName == null) { + clsName = org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName(); + } try { inputFormatClass = ((Class) Class.forName(clsName, true, JavaUtils.getClassLoader())); @@ -270,9 +280,13 @@ final public Class getOutputFormatClass() throws HiveException { if (outputFormatClass == null) { - String clsName = getSchema().getProperty( - org.apache.hadoop.hive.metastore.api.Constants.FILE_OUTPUT_FORMAT, - HiveSequenceFileOutputFormat.class.getName()); + String clsName = null; + if (tPartition != null && tPartition.getSd() != null) { + clsName = tPartition.getSd().getOutputFormat(); + } + if (clsName == null) { + clsName = HiveSequenceFileOutputFormat.class.getName(); + } try { Class c = (Class)(Class.forName(clsName, true, JavaUtils.getClassLoader())); Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (working copy) @@ -144,7 +144,7 @@ joinDescriptor.setSkewKeyDefinition(HiveConf.getIntVar(parseCtx.getConf(), HiveConf.ConfVars.HIVESKEWJOINKEY)); - Map> bigKeysDirToTaskMap = + HashMap> bigKeysDirToTaskMap = new HashMap>(); List listWorks = new ArrayList(); List> listTasks = new ArrayList>(); Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java (working copy) @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.hooks; +import java.io.Serializable; import java.net.URI; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -27,7 +28,8 @@ * This class encapsulates an object that is being written to by the query. This * object may be a table, partition, dfs directory or a local directory. 
*/ -public class WriteEntity { +public class WriteEntity implements Serializable { + private static final long serialVersionUID = 1L; /** * The type of the write entity. @@ -44,19 +46,71 @@ /** * The table. This is null if this is a directory. */ - private final Table t; + private Table t; /** * The partition.This is null if this object is not a partition. */ - private final Partition p; + private Partition p; /** * The directory if this is a directory. */ - private final String d; + private String d; /** + * This is derived from t and p, but we need to serialize this field to make sure + * WriteEntity.hashCode() does not need to recursively read into t and p. + */ + private String name; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Type getTyp() { + return typ; + } + + public void setTyp(Type typ) { + this.typ = typ; + } + + public Table getT() { + return t; + } + + public void setT(Table t) { + this.t = t; + } + + public Partition getP() { + return p; + } + + public void setP(Partition p) { + this.p = p; + } + + public String getD() { + return d; + } + + public void setD(String d) { + this.d = d; + } + + /** + * Only used by serialization. + */ + public WriteEntity() { + } + + /** * Constructor for a table. * * @param t @@ -67,6 +121,7 @@ p = null; this.t = t; typ = Type.TABLE; + name = computeName(); } /** @@ -80,6 +135,7 @@ this.p = p; t = p.getTable(); typ = Type.PARTITION; + name = computeName(); } /** @@ -99,6 +155,7 @@ } else { typ = Type.DFS_DIR; } + name = computeName(); } /** @@ -146,6 +203,10 @@ */ @Override public String toString() { + return name; + } + + private String computeName() { switch (typ) { case TABLE: return t.getDbName() + "@" + t.getTableName(); Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java (working copy) @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.hooks; +import java.io.Serializable; import java.net.URI; import java.util.Map; @@ -28,19 +29,57 @@ * This class encapsulates the information on the partition and tables that are * read by the query. */ -public class ReadEntity { +public class ReadEntity implements Serializable { + private static final long serialVersionUID = 1L; + /** + * The table. + */ + private Table t; + + /** * The partition. This is null for a non partitioned table. */ - private final Partition p; + private Partition p; /** - * The table. + * This is derived from t and p, but we need to serialize this field to make sure + * ReadEntity.hashCode() does not need to recursively read into t and p. */ - private final Table t; + private String name; + + public String getName() { + return name; + } + public void setName(String name) { + this.name = name; + } + + public void setP(Partition p) { + this.p = p; + } + + public void setT(Table t) { + this.t = t; + } + + public Partition getP() { + return p; + } + + public Table getT() { + return t; + } + /** + * For serialization only. + */ + public ReadEntity() { + } + + /** * Constructor. 
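Note: the WriteEntity and ReadEntity changes above follow one pattern used throughout this patch: a public no-argument constructor "for serialization only", a getter/setter pair for every field, and a name string computed once in the real constructors so that toString()/hashCode() never have to walk back into the Table or Partition objects. A minimal sketch of an analogous entity bean (hypothetical class, not a Hive API):

    import java.io.Serializable;

    // Hypothetical entity in the same style: bean-friendly and cheap to hash.
    public class EntitySketch implements Serializable {
      private static final long serialVersionUID = 1L;

      private String dbName;
      private String tableName;
      private String name;        // derived, but stored so hashCode() stays flat

      public EntitySketch() { }   // used only by deserialization

      public EntitySketch(String dbName, String tableName) {
        this.dbName = dbName;
        this.tableName = tableName;
        this.name = dbName + "@" + tableName;
      }

      public String getName() { return name; }
      public void setName(String name) { this.name = name; }
      public String getDbName() { return dbName; }
      public void setDbName(String dbName) { this.dbName = dbName; }
      public String getTableName() { return tableName; }
      public void setTableName(String tableName) { this.tableName = tableName; }

      @Override public String toString() { return name; }
      @Override public int hashCode() { return name == null ? 0 : name.hashCode(); }
    }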
* * @param t @@ -49,6 +88,7 @@ public ReadEntity(Table t) { this.t = t; p = null; + name = computeName(); } /** @@ -60,8 +100,18 @@ public ReadEntity(Partition p) { t = p.getTable(); this.p = p; + name = computeName(); } + private String computeName() { + if (p != null) { + return p.getTable().getDbName() + "@" + p.getTable().getTableName() + "@" + + p.getName(); + } else { + return t.getDbName() + "@" + t.getTableName(); + } + } + /** * Enum that tells what time of a read entity this is. */ @@ -117,12 +167,7 @@ */ @Override public String toString() { - if (p != null) { - return p.getTable().getDbName() + "@" + p.getTable().getTableName() + "@" - + p.getName(); - } else { - return t.getDbName() + "@" + t.getTableName(); - } + return name; } /** Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (working copy) @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -56,45 +57,83 @@ /** * FetchTask implementation. **/ -public class FetchOperator { +public class FetchOperator implements Serializable { - protected transient Log LOG; - protected transient LogHelper console; + static Log LOG = LogFactory.getLog(FetchOperator.class.getName()); + static LogHelper console = new LogHelper(LOG); + public FetchOperator() { + } + public FetchOperator(FetchWork work, JobConf job) { - LOG = LogFactory.getLog(this.getClass().getName()); - console = new LogHelper(LOG); - this.work = work; + initialize(job); + } + + public void initialize(JobConf job) { this.job = job; - - currRecReader = null; - currPath = null; - currTbl = null; - currPart = null; - iterPath = null; - iterPartDesc = null; tblDataDone = false; rowWithPart = new Object[2]; } - private final FetchWork work; + public FetchWork getWork() { + return work; + } + + public void setWork(FetchWork work) { + this.work = work; + } + + public int getSplitNum() { + return splitNum; + } + + public void setSplitNum(int splitNum) { + this.splitNum = splitNum; + } + + public PartitionDesc getCurrPart() { + return currPart; + } + + public void setCurrPart(PartitionDesc currPart) { + this.currPart = currPart; + } + + public TableDesc getCurrTbl() { + return currTbl; + } + + public void setCurrTbl(TableDesc currTbl) { + this.currTbl = currTbl; + } + + public boolean isTblDataDone() { + return tblDataDone; + } + + public void setTblDataDone(boolean tblDataDone) { + this.tblDataDone = tblDataDone; + } + + private FetchWork work; private int splitNum; - private RecordReader currRecReader; - private InputSplit[] inputSplits; - private InputFormat inputFormat; - private final JobConf job; - private WritableComparable key; - private Writable value; - private Deserializer serde; - private Iterator iterPath; - private Iterator iterPartDesc; - private Path currPath; private PartitionDesc currPart; private TableDesc currTbl; private boolean tblDataDone; - private StructObjectInspector rowObjectInspector; - private final Object[] rowWithPart; + + private transient RecordReader currRecReader; + private transient InputSplit[] inputSplits; + private transient InputFormat inputFormat; + private transient JobConf job; + private transient WritableComparable key; + private transient Writable value; + 
private transient Deserializer serde; + private transient Iterator iterPath; + private transient Iterator iterPartDesc; + private transient Path currPath; + private transient StructObjectInspector rowObjectInspector; + private transient Object[] rowWithPart; /** * A cache of InputFormat instances. Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy) @@ -20,7 +20,9 @@ import java.beans.DefaultPersistenceDelegate; import java.beans.Encoder; +import java.beans.ExceptionListener; import java.beans.Expression; +import java.beans.Statement; import java.beans.XMLDecoder; import java.beans.XMLEncoder; import java.io.BufferedReader; @@ -40,10 +42,12 @@ import java.net.URLClassLoader; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -65,6 +69,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -197,6 +202,80 @@ } } + public static class MapDelegate extends DefaultPersistenceDelegate { + @Override + protected Expression instantiate(Object oldInstance, Encoder out) { + Map oldMap = (Map)oldInstance; + HashMap newMap = new HashMap(oldMap); + return new Expression(newMap, HashMap.class, "new", new Object[] {}); + } + @Override + protected boolean mutatesTo(Object oldInstance, Object newInstance) { + return false; + } + protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) { + java.util.Collection oldO = (java.util.Collection)oldInstance; + java.util.Collection newO = (java.util.Collection)newInstance; + + if (newO.size() != 0) { + out.writeStatement(new Statement(oldInstance, "clear", new Object[]{})); + } + for (Iterator i = oldO.iterator(); i.hasNext();) { + out.writeStatement(new Statement(oldInstance, "add", new Object[]{i.next()})); + } + } + } + + public static class SetDelegate extends DefaultPersistenceDelegate { + @Override + protected Expression instantiate(Object oldInstance, Encoder out) { + Set oldSet = (Set)oldInstance; + HashSet newSet = new HashSet(oldSet); + return new Expression(newSet, HashSet.class, "new", new Object[] {}); + } + @Override + protected boolean mutatesTo(Object oldInstance, Object newInstance) { + return false; + } + protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) { + java.util.Collection oldO = (java.util.Collection)oldInstance; + java.util.Collection newO = (java.util.Collection)newInstance; + + if (newO.size() != 0) { + out.writeStatement(new Statement(oldInstance, "clear", new Object[]{})); + } + for (Iterator i = oldO.iterator(); i.hasNext();) { + out.writeStatement(new Statement(oldInstance, "add", new Object[]{i.next()})); + } + } + + } + + public static class ListDelegate extends DefaultPersistenceDelegate { + @Override + protected Expression instantiate(Object oldInstance, Encoder out) 
{ + List oldList = (List)oldInstance; + ArrayList newList = new ArrayList(oldList); + return new Expression(newList, ArrayList.class, "new", new Object[] {}); + } + @Override + protected boolean mutatesTo(Object oldInstance, Object newInstance) { + return false; + } + protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) { + java.util.Collection oldO = (java.util.Collection)oldInstance; + java.util.Collection newO = (java.util.Collection)newInstance; + + if (newO.size() != 0) { + out.writeStatement(new Statement(oldInstance, "clear", new Object[]{})); + } + for (Iterator i = oldO.iterator(); i.hasNext();) { + out.writeStatement(new Statement(oldInstance, "add", new Object[]{i.next()})); + } + } + + } + public static void setMapRedWork(Configuration job, MapredWork w) { try { // use the default file system of the job @@ -243,25 +322,80 @@ return s.hashCode(); } + /** + * Serialize a single Task. + */ public static void serializeTasks(Task t, OutputStream out) { XMLEncoder e = new XMLEncoder(out); // workaround for java 1.5 e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate()); e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate()); - e - .setPersistenceDelegate(Operator.ProgressCounter.class, + e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate()); e.writeObject(t); e.close(); } + public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate { + protected Expression instantiate(Object oldInstance, Encoder out) { + return new Expression(oldInstance, + oldInstance.getClass(), + "new", + null); + } + + protected void initialize(Class type, Object oldInstance, Object newInstance, + Encoder out) { + Iterator ite = ((Collection) oldInstance).iterator(); + while (ite.hasNext()) { + out.writeStatement(new Statement(oldInstance, "add", + new Object[] { ite.next() })); + } + } + } + /** - * Serialize the plan object to an output stream. DO NOT use this to write to - * standard output since it closes the output stream DO USE mapredWork.toXML() - * instead + * Serialize the whole query plan. */ + public static void serializeQueryPlan(QueryPlan plan, OutputStream out) { + XMLEncoder e = new XMLEncoder(out); + e.setExceptionListener(new ExceptionListener() { + public void exceptionThrown(Exception e) { + LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e)); + throw new RuntimeException("Cannot serialize the query plan", e); + } + }); + // workaround for java 1.5 + e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate()); + e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate()); + e.setPersistenceDelegate(Operator.ProgressCounter.class, + new EnumDelegate()); + + e.setPersistenceDelegate(org.datanucleus.sco.backed.Map.class, new MapDelegate()); + e.setPersistenceDelegate(org.datanucleus.sco.backed.List.class, new ListDelegate()); + + e.writeObject(plan); + e.close(); + } + + /** + * Deserialize the whole query plan. + */ + public static QueryPlan deserializeQueryPlan(InputStream in, + Configuration conf) { + XMLDecoder d = new XMLDecoder(in, null, null, conf.getClassLoader()); + QueryPlan ret = (QueryPlan) d.readObject(); + d.close(); + return (ret); + } + + /** + * Serialize the mapredWork object to an output stream. DO NOT use this to + * write to standard output since it closes the output stream. + * DO USE mapredWork.toXML() instead. 
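Note: serializeQueryPlan above does two things beyond a plain encoder: it installs an ExceptionListener so a plan that cannot be encoded fails loudly instead of silently dropping properties, and it registers persistence delegates so that non-bean collection implementations (the org.datanucleus.sco.backed.Map/List classes that can show up in objects fetched through the metastore's JDO layer) are written out as plain HashMap/ArrayList. A minimal, non-Hive sketch of the same idea, re-expressing an awkward List implementation as an ArrayList copy during encoding:

    import java.beans.DefaultPersistenceDelegate;
    import java.beans.Encoder;
    import java.beans.ExceptionListener;
    import java.beans.Expression;
    import java.beans.XMLEncoder;
    import java.io.ByteArrayOutputStream;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;

    public class DelegateSketch {
      public static void main(String[] args) {
        List<String> src = Arrays.asList("a", "b");   // not a bean-friendly List type
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        XMLEncoder enc = new XMLEncoder(bos);
        // Surface encoding problems instead of losing parts of the object graph.
        enc.setExceptionListener(new ExceptionListener() {
          public void exceptionThrown(Exception e) {
            throw new RuntimeException("cannot serialize", e);
          }
        });
        // Tell the encoder to persist this List type as "new ArrayList(copy)".
        enc.setPersistenceDelegate(src.getClass(), new DefaultPersistenceDelegate() {
          @Override
          protected Expression instantiate(Object oldInstance, Encoder out) {
            return new Expression(oldInstance, ArrayList.class, "new",
                new Object[] { new ArrayList<Object>((Collection<?>) oldInstance) });
          }
        });
        enc.writeObject(src);
        enc.close();
        System.out.println(bos.toString());
      }
    }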
+ */ public static void serializeMapRedWork(MapredWork w, OutputStream out) { XMLEncoder e = new XMLEncoder(out); // workaround for java 1.5 Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (working copy) @@ -283,9 +283,10 @@ } finally { BufferedWriter resultOut = null; try { - FileSystem fs = msckDesc.getResFile().getFileSystem(conf); + Path resFile = new Path(msckDesc.getResFile()); + FileSystem fs = resFile.getFileSystem(conf); resultOut = new BufferedWriter(new OutputStreamWriter(fs - .create(msckDesc.getResFile()))); + .create(resFile))); boolean firstWritten = false; firstWritten |= writeMsckResult(result.getTablesNotInMs(), @@ -385,8 +386,9 @@ // write the results in the file try { - FileSystem fs = showParts.getResFile().getFileSystem(conf); - DataOutput outStream = (DataOutput) fs.create(showParts.getResFile()); + Path resFile = new Path(showParts.getResFile()); + FileSystem fs = resFile.getFileSystem(conf); + DataOutput outStream = (DataOutput) fs.create(resFile); Iterator iterParts = parts.iterator(); while (iterParts.hasNext()) { @@ -432,8 +434,9 @@ // write the results in the file try { - FileSystem fs = showTbls.getResFile().getFileSystem(conf); - DataOutput outStream = (DataOutput) fs.create(showTbls.getResFile()); + Path resFile = new Path(showTbls.getResFile()); + FileSystem fs = resFile.getFileSystem(conf); + DataOutput outStream = (DataOutput) fs.create(resFile); SortedSet sortedTbls = new TreeSet(tbls); Iterator iterTbls = sortedTbls.iterator(); @@ -477,8 +480,9 @@ // write the results in the file try { - FileSystem fs = showFuncs.getResFile().getFileSystem(conf); - DataOutput outStream = (DataOutput) fs.create(showFuncs.getResFile()); + Path resFile = new Path(showFuncs.getResFile()); + FileSystem fs = resFile.getFileSystem(conf); + DataOutput outStream = (DataOutput) fs.create(resFile); SortedSet sortedFuncs = new TreeSet(funcs); Iterator iterFuncs = sortedFuncs.iterator(); @@ -512,8 +516,9 @@ // write the results in the file try { - FileSystem fs = descFunc.getResFile().getFileSystem(conf); - DataOutput outStream = (DataOutput) fs.create(descFunc.getResFile()); + Path resFile = new Path(descFunc.getResFile()); + FileSystem fs = resFile.getFileSystem(conf); + DataOutput outStream = (DataOutput) fs.create(resFile); // get the function documentation Description desc = null; @@ -601,8 +606,9 @@ // write the results in the file try { - FileSystem fs = showTblStatus.getResFile().getFileSystem(conf); - DataOutput outStream = (DataOutput) fs.create(showTblStatus.getResFile()); + Path resFile = new Path(showTblStatus.getResFile()); + FileSystem fs = resFile.getFileSystem(conf); + DataOutput outStream = (DataOutput) fs.create(resFile); Iterator iterTables = tbls.iterator(); while (iterTables.hasNext()) { @@ -677,7 +683,7 @@ LOG.info("show table status: " + stringifyException(e)); return 1; } catch (Exception e) { - throw new HiveException(e.toString()); + throw new HiveException(e); } return 0; } @@ -703,9 +709,10 @@ false); Partition part = null; try { + Path resFile = new Path(descTbl.getResFile()); if (tbl == null) { - FileSystem fs = descTbl.getResFile().getFileSystem(conf); - DataOutput outStream = (DataOutput) fs.open(descTbl.getResFile()); + FileSystem fs = resFile.getFileSystem(conf); + DataOutput outStream = (DataOutput) fs.open(resFile); String errMsg 
= "Table " + tableName + " does not exist"; outStream.write(errMsg.getBytes("UTF-8")); ((FSDataOutputStream) outStream).close(); @@ -714,8 +721,8 @@ if (descTbl.getPartSpec() != null) { part = db.getPartition(tbl, descTbl.getPartSpec(), false); if (part == null) { - FileSystem fs = descTbl.getResFile().getFileSystem(conf); - DataOutput outStream = (DataOutput) fs.open(descTbl.getResFile()); + FileSystem fs = resFile.getFileSystem(conf); + DataOutput outStream = (DataOutput) fs.open(resFile); String errMsg = "Partition " + descTbl.getPartSpec() + " for table " + tableName + " does not exist"; outStream.write(errMsg.getBytes("UTF-8")); @@ -745,8 +752,9 @@ } else { cols = Hive.getFieldsFromDeserializer(colPath, tbl.getDeserializer()); } - FileSystem fs = descTbl.getResFile().getFileSystem(conf); - DataOutput outStream = (DataOutput) fs.create(descTbl.getResFile()); + Path resFile = new Path(descTbl.getResFile()); + FileSystem fs = resFile.getFileSystem(conf); + DataOutput outStream = (DataOutput) fs.create(resFile); Iterator iterCols = cols.iterator(); while (iterCols.hasNext()) { // create a row per column @@ -808,7 +816,7 @@ LOG.info("describe table: " + stringifyException(e)); return 1; } catch (Exception e) { - throw new HiveException(e.toString()); + throw new HiveException(e); } return 0; Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java (working copy) @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -48,20 +49,26 @@ // tables into corresponding different dirs (one dir per table). // this map stores mapping from "big key dir" to its corresponding mapjoin // task. - Map> dirToTaskMap; + HashMap> dirToTaskMap; + /** + * For serialization use only. + */ + public ConditionalResolverSkewJoinCtx() { + } + public ConditionalResolverSkewJoinCtx( - Map> dirToTaskMap) { + HashMap> dirToTaskMap) { super(); this.dirToTaskMap = dirToTaskMap; } - public Map> getDirToTaskMap() { + public HashMap> getDirToTaskMap() { return dirToTaskMap; } public void setDirToTaskMap( - Map> dirToTaskMap) { + HashMap> dirToTaskMap) { this.dirToTaskMap = dirToTaskMap; } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java (working copy) @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; +import java.util.HashSet; import java.util.Set; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -48,16 +49,16 @@ /** * ReadEntitites that are passed to the hooks. */ - protected Set inputs; + protected HashSet inputs; /** * List of WriteEntities that are passed to the hooks. 
*/ - protected Set outputs; + protected HashSet outputs; public DDLWork() { } - public DDLWork(Set inputs, Set outputs) { + public DDLWork(HashSet inputs, HashSet outputs) { this.inputs = inputs; this.outputs = outputs; } @@ -66,7 +67,7 @@ * @param alterTblDesc * alter table descriptor */ - public DDLWork(Set inputs, Set outputs, + public DDLWork(HashSet inputs, HashSet outputs, AlterTableDesc alterTblDesc) { this(inputs, outputs); this.alterTblDesc = alterTblDesc; @@ -76,7 +77,7 @@ * @param createTblDesc * create table descriptor */ - public DDLWork(Set inputs, Set outputs, + public DDLWork(HashSet inputs, HashSet outputs, CreateTableDesc createTblDesc) { this(inputs, outputs); @@ -87,7 +88,7 @@ * @param createTblLikeDesc * create table like descriptor */ - public DDLWork(Set inputs, Set outputs, + public DDLWork(HashSet inputs, HashSet outputs, CreateTableLikeDesc createTblLikeDesc) { this(inputs, outputs); @@ -98,7 +99,7 @@ * @param createVwDesc * create view descriptor */ - public DDLWork(Set inputs, Set outputs, + public DDLWork(HashSet inputs, HashSet outputs, CreateViewDesc createVwDesc) { this(inputs, outputs); @@ -109,7 +110,7 @@ * @param dropTblDesc * drop table descriptor */ - public DDLWork(Set inputs, Set outputs, + public DDLWork(HashSet inputs, HashSet outputs, DropTableDesc dropTblDesc) { this(inputs, outputs); @@ -119,7 +120,7 @@ /** * @param descTblDesc */ - public DDLWork(Set inputs, Set outputs, + public DDLWork(HashSet inputs, HashSet outputs, DescTableDesc descTblDesc) { this(inputs, outputs); @@ -129,7 +130,7 @@ /** * @param showTblsDesc */ - public DDLWork(Set inputs, Set outputs, + public DDLWork(HashSet inputs, HashSet outputs, ShowTablesDesc showTblsDesc) { this(inputs, outputs); @@ -139,7 +140,7 @@ /** * @param showFuncsDesc */ - public DDLWork(Set inputs, Set outputs, + public DDLWork(HashSet inputs, HashSet outputs, ShowFunctionsDesc showFuncsDesc) { this(inputs, outputs); @@ -149,7 +150,7 @@ /** * @param descFuncDesc */ - public DDLWork(Set inputs, Set outputs, + public DDLWork(HashSet inputs, HashSet outputs, DescFunctionDesc descFuncDesc) { this(inputs, outputs); @@ -159,7 +160,7 @@ /** * @param showPartsDesc */ - public DDLWork(Set inputs, Set outputs, + public DDLWork(HashSet inputs, HashSet outputs, ShowPartitionsDesc showPartsDesc) { this(inputs, outputs); @@ -170,14 +171,14 @@ * @param addPartitionDesc * information about the partitions we want to add. 
*/ - public DDLWork(Set inputs, Set outputs, + public DDLWork(HashSet inputs, HashSet outputs, AddPartitionDesc addPartitionDesc) { this(inputs, outputs); this.addPartitionDesc = addPartitionDesc; } - public DDLWork(Set inputs, Set outputs, + public DDLWork(HashSet inputs, HashSet outputs, MsckDesc checkDesc) { this(inputs, outputs); @@ -188,7 +189,7 @@ * @param showTblStatusDesc * show table status descriptor */ - public DDLWork(Set inputs, Set outputs, + public DDLWork(HashSet inputs, HashSet outputs, ShowTableStatusDesc showTblStatusDesc) { this(inputs, outputs); @@ -412,19 +413,19 @@ this.descFunctionDesc = descFunctionDesc; } - public Set getInputs() { + public HashSet getInputs() { return inputs; } - public Set getOutputs() { + public HashSet getOutputs() { return outputs; } - public void setInputs(Set inputs) { + public void setInputs(HashSet inputs) { this.inputs = inputs; } - public void setOutputs(Set outputs) { + public void setOutputs(HashSet outputs) { this.outputs = outputs; } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java (working copy) @@ -38,15 +38,20 @@ @Explain(displayName = "Partition") public class PartitionDesc implements Serializable, Cloneable { private static final long serialVersionUID = 2L; - private TableDesc table; + private TableDesc tableDesc; private java.util.LinkedHashMap partSpec; private java.lang.Class deserializerClass; private Class inputFileFormatClass; private Class outputFileFormatClass; private java.util.Properties properties; private String serdeClassName; + private transient String baseFileName; + public void setBaseFileName(String baseFileName) { + this.baseFileName = baseFileName; + } + public PartitionDesc() { } @@ -61,7 +66,7 @@ final Class inputFileFormatClass, final Class outputFormat, final java.util.Properties properties, final String serdeClassName) { - this.table = table; + this.tableDesc = table; this.partSpec = partSpec; deserializerClass = serdeClass; this.inputFileFormatClass = inputFileFormatClass; @@ -78,7 +83,7 @@ public PartitionDesc(final org.apache.hadoop.hive.ql.metadata.Partition part) throws HiveException { - table = Utilities.getTableDesc(part.getTable()); + tableDesc = Utilities.getTableDesc(part.getTable()); partSpec = part.getSpec(); deserializerClass = part.getDeserializer().getClass(); inputFileFormatClass = part.getInputFormatClass(); @@ -91,11 +96,11 @@ @Explain(displayName = "") public TableDesc getTableDesc() { - return table; + return tableDesc; } - public void setTableDesc(final TableDesc table) { - this.table = table; + public void setTableDesc(TableDesc tableDesc) { + this.tableDesc = tableDesc; } @Explain(displayName = "partition values") @@ -108,8 +113,8 @@ } public java.lang.Class getDeserializerClass() { - if (deserializerClass == null && table != null) { - setDeserializerClass(table.getDeserializerClass()); + if (deserializerClass == null && tableDesc != null) { + setDeserializerClass(tableDesc.getDeserializerClass()); } return deserializerClass; } @@ -120,8 +125,8 @@ } public Class getInputFileFormatClass() { - if (inputFileFormatClass == null && table != null) { - setInputFileFormatClass(table.getInputFileFormatClass()); + if (inputFileFormatClass == null && tableDesc != null) { + setInputFileFormatClass(tableDesc.getInputFileFormatClass()); } return 
inputFileFormatClass; } @@ -141,8 +146,8 @@ } public Class getOutputFileFormatClass() { - if (outputFileFormatClass == null && table != null) { - setOutputFileFormatClass(table.getOutputFileFormatClass()); + if (outputFileFormatClass == null && tableDesc != null) { + setOutputFileFormatClass(tableDesc.getOutputFileFormatClass()); } return outputFileFormatClass; } @@ -154,8 +159,8 @@ @Explain(displayName = "properties", normalExplain = false) public java.util.Properties getProperties() { - if (table != null) { - return table.getProperties(); + if (tableDesc != null) { + return tableDesc.getProperties(); } return properties; } @@ -169,8 +174,8 @@ */ @Explain(displayName = "serde") public String getSerdeClassName() { - if (serdeClassName == null && table != null) { - setSerdeClassName(table.getSerdeClassName()); + if (serdeClassName == null && tableDesc != null) { + setSerdeClassName(tableDesc.getSerdeClassName()); } return serdeClassName; } @@ -221,7 +226,7 @@ } ret.setProperties(newProp); } - ret.table = (TableDesc) table.clone(); + ret.tableDesc = (TableDesc) tableDesc.clone(); // The partition spec is not present if (partSpec != null) { ret.partSpec = new java.util.LinkedHashMap(); Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CreateFunctionDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CreateFunctionDesc.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CreateFunctionDesc.java (working copy) @@ -31,6 +31,12 @@ private String functionName; private String className; + /** + * For serialization only. + */ + public CreateFunctionDesc() { + } + public CreateFunctionDesc(String functionName, String className) { this.functionName = functionName; this.className = className; Index: ql/src/java/org/apache/hadoop/hive/ql/plan/DropFunctionDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/DropFunctionDesc.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/DropFunctionDesc.java (working copy) @@ -30,6 +30,12 @@ private String functionName; + /** + * For serialization only. + */ + public DropFunctionDesc() { + } + public DropFunctionDesc(String functionName) { this.functionName = functionName; } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java (working copy) @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; +import java.util.HashSet; import java.util.Set; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -39,21 +40,21 @@ /** * ReadEntitites that are passed to the hooks. */ - protected Set inputs; + protected HashSet inputs; /** * List of WriteEntities that are passed to the hooks. 
*/ - protected Set outputs; + protected HashSet outputs; public MoveWork() { } - public MoveWork(Set inputs, Set outputs) { + public MoveWork(HashSet inputs, HashSet outputs) { this.inputs = inputs; this.outputs = outputs; } - public MoveWork(Set inputs, Set outputs, + public MoveWork(HashSet inputs, HashSet outputs, final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, boolean checkFileFormat) { this(inputs, outputs); @@ -88,19 +89,19 @@ this.checkFileFormat = checkFileFormat; } - public Set getInputs() { + public HashSet getInputs() { return inputs; } - public Set getOutputs() { + public HashSet getOutputs() { return outputs; } - public void setInputs(Set inputs) { + public void setInputs(HashSet inputs) { this.inputs = inputs; } - public void setOutputs(Set outputs) { + public void setOutputs(HashSet outputs) { this.outputs = outputs; } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java (working copy) @@ -17,20 +17,31 @@ */ package org.apache.hadoop.hive.ql.plan; +import java.io.Serializable; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; /** * Contains the information needed to add a partition. */ -public class AddPartitionDesc { +public class AddPartitionDesc extends DDLDesc implements Serializable { + private static final long serialVersionUID = 1L; + String tableName; String dbName; String location; boolean ifNotExists; - Map partSpec; + LinkedHashMap partSpec; /** + * For serialization only. + */ + public AddPartitionDesc() { + } + + /** * @param dbName * database to add to. * @param tableName @@ -47,7 +58,7 @@ super(); this.dbName = dbName; this.tableName = tableName; - this.partSpec = partSpec; + this.partSpec = new LinkedHashMap(partSpec); this.location = location; this.ifNotExists = ifNotExists; } @@ -100,7 +111,7 @@ /** * @return partition specification. */ - public Map getPartSpec() { + public LinkedHashMap getPartSpec() { return partSpec; } @@ -108,7 +119,7 @@ * @param partSpec * partition specification */ - public void setPartSpec(Map partSpec) { + public void setPartSpec(LinkedHashMap partSpec) { this.partSpec = partSpec; } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/FunctionWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/FunctionWork.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/FunctionWork.java (working copy) @@ -29,6 +29,12 @@ private CreateFunctionDesc createFunctionDesc; private DropFunctionDesc dropFunctionDesc; + /** + * For serialization only. 
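Note: the same motive appears to drive the field-type changes in DDLWork, MoveWork, ConditionalResolverSkewJoinCtx and AddPartitionDesc: the bean properties now expose concrete, instantiable collection classes (HashSet, HashMap, LinkedHashMap) rather than the Set/Map interfaces, presumably so the XML decoder always has a concrete class to build when it replays the plan. A hypothetical work object in the same style:

    import java.io.Serializable;
    import java.util.HashSet;

    // Hypothetical: properties typed as a concrete collection class.
    public class WorkSketch implements Serializable {
      private static final long serialVersionUID = 1L;

      private HashSet<String> inputs = new HashSet<String>();

      public WorkSketch() { }     // for serialization only

      public HashSet<String> getInputs() { return inputs; }
      public void setInputs(HashSet<String> inputs) { this.inputs = inputs; }
    }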
+ */ + public FunctionWork() { + } + public FunctionWork(CreateFunctionDesc createFunctionDesc) { this.createFunctionDesc = createFunctionDesc; } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ShowTableStatusDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ShowTableStatusDesc.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ShowTableStatusDesc.java (working copy) @@ -31,7 +31,7 @@ public class ShowTableStatusDesc extends DDLDesc implements Serializable { private static final long serialVersionUID = 1L; String pattern; - Path resFile; + String resFile; String dbName; HashMap partSpec; @@ -53,10 +53,16 @@ } /** + * For serialization use only. + */ + public ShowTableStatusDesc() { + } + + /** * @param pattern * names of tables to show */ - public ShowTableStatusDesc(Path resFile, String dbName, String pattern) { + public ShowTableStatusDesc(String resFile, String dbName, String pattern) { this.dbName = dbName; this.resFile = resFile; this.pattern = pattern; @@ -71,7 +77,7 @@ * @param part * partition specification */ - public ShowTableStatusDesc(Path resFile, String dbName, String pattern, + public ShowTableStatusDesc(String resFile, String dbName, String pattern, HashMap partSpec) { this.dbName = dbName; this.resFile = resFile; @@ -98,20 +104,20 @@ /** * @return the resFile */ - public Path getResFile() { + public String getResFile() { return resFile; } @Explain(displayName = "result file", normalExplain = false) public String getResFileString() { - return getResFile().getName(); + return getResFile(); } /** * @param resFile * the resFile to set */ - public void setResFile(Path resFile) { + public void setResFile(String resFile) { this.resFile = resFile; } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MsckDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MsckDesc.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MsckDesc.java (working copy) @@ -16,10 +16,16 @@ private String tableName; private ArrayList> partSpecs; - private Path resFile; + private String resFile; private boolean repairPartitions; /** + * For serialization use only. + */ + public MsckDesc() { + } + + /** * Description of a msck command. * * @param tableName @@ -39,7 +45,7 @@ for (int i = 0; i < partSpecs.size(); i++) { this.partSpecs.add(new LinkedHashMap(partSpecs.get(i))); } - this.resFile = resFile; + this.resFile = resFile.toString(); this.repairPartitions = repairPartitions; } @@ -76,7 +82,7 @@ /** * @return file to save command output to */ - public Path getResFile() { + public String getResFile() { return resFile; } @@ -84,7 +90,7 @@ * @param resFile * file to save command output to */ - public void setResFile(Path resFile) { + public void setResFile(String resFile) { this.resFile = resFile; } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ShowPartitionsDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ShowPartitionsDesc.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ShowPartitionsDesc.java (working copy) @@ -30,7 +30,7 @@ public class ShowPartitionsDesc extends DDLDesc implements Serializable { private static final long serialVersionUID = 1L; String tabName; - Path resFile; + String resFile; /** * table name for the result of show tables. 
*/ @@ -59,7 +59,7 @@ */ public ShowPartitionsDesc(String tabName, Path resFile) { this.tabName = tabName; - this.resFile = resFile; + this.resFile = resFile.toString(); } /** @@ -81,20 +81,16 @@ /** * @return the results file */ - public Path getResFile() { + @Explain(displayName = "result file", normalExplain = false) + public String getResFile() { return resFile; } - @Explain(displayName = "result file", normalExplain = false) - public String getResFileString() { - return getResFile().getName(); - } - /** * @param resFile * the results file to be used to return the results */ - public void setResFile(Path resFile) { + public void setResFile(String resFile) { this.resFile = resFile; } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ShowFunctionsDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ShowFunctionsDesc.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ShowFunctionsDesc.java (working copy) @@ -30,7 +30,7 @@ public class ShowFunctionsDesc extends DDLDesc implements Serializable { private static final long serialVersionUID = 1L; String pattern; - Path resFile; + String resFile; /** * table name for the result of show tables. */ @@ -55,7 +55,7 @@ * @param resFile */ public ShowFunctionsDesc(Path resFile) { - this.resFile = resFile; + this.resFile = resFile.toString(); pattern = null; } @@ -64,7 +64,7 @@ * names of tables to show */ public ShowFunctionsDesc(Path resFile, String pattern) { - this.resFile = resFile; + this.resFile = resFile.toString(); this.pattern = pattern; } @@ -87,20 +87,16 @@ /** * @return the resFile */ - public Path getResFile() { + @Explain(displayName = "result file", normalExplain = false) + public String getResFile() { return resFile; } - @Explain(displayName = "result file", normalExplain = false) - public String getResFileString() { - return getResFile().getName(); - } - /** * @param resFile * the resFile to set */ - public void setResFile(Path resFile) { + public void setResFile(String resFile) { this.resFile = resFile; } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ShowTablesDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ShowTablesDesc.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ShowTablesDesc.java (working copy) @@ -30,7 +30,7 @@ public class ShowTablesDesc extends DDLDesc implements Serializable { private static final long serialVersionUID = 1L; String pattern; - Path resFile; + String resFile; /** * table name for the result of show tables. 
*/ @@ -55,7 +55,7 @@ * @param resFile */ public ShowTablesDesc(Path resFile) { - this.resFile = resFile; + this.resFile = resFile.toString(); pattern = null; } @@ -64,7 +64,7 @@ * names of tables to show */ public ShowTablesDesc(Path resFile, String pattern) { - this.resFile = resFile; + this.resFile = resFile.toString(); this.pattern = pattern; } @@ -87,20 +87,16 @@ /** * @return the resFile */ - public Path getResFile() { + @Explain(displayName = "result file", normalExplain = false) + public String getResFile() { return resFile; } - @Explain(displayName = "result file", normalExplain = false) - public String getResFileString() { - return getResFile().getName(); - } - /** * @param resFile * the resFile to set */ - public void setResFile(Path resFile) { + public void setResFile(String resFile) { this.resFile = resFile; } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/DescFunctionDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/DescFunctionDesc.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/DescFunctionDesc.java (working copy) @@ -30,7 +30,7 @@ public class DescFunctionDesc extends DDLDesc implements Serializable { private static final long serialVersionUID = 1L; String name; - Path resFile; + String resFile; boolean isExtended; public boolean isExtended() { @@ -65,7 +65,7 @@ * @param resFile */ public DescFunctionDesc(Path resFile) { - this.resFile = resFile; + this.resFile = resFile.toString(); name = null; } @@ -75,7 +75,7 @@ */ public DescFunctionDesc(Path resFile, String name, boolean isExtended) { this.isExtended = isExtended; - this.resFile = resFile; + this.resFile = resFile.toString(); this.name = name; } @@ -98,20 +98,16 @@ /** * @return the resFile */ - public Path getResFile() { + @Explain(displayName = "result file", normalExplain = false) + public String getResFile() { return resFile; } - @Explain(displayName = "result file", normalExplain = false) - public String getResFileString() { - return getResFile().getName(); - } - /** * @param resFile * the resFile to set */ - public void setResFile(Path resFile) { + public void setResFile(String resFile) { this.resFile = resFile; } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/DescTableDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/DescTableDesc.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/DescTableDesc.java (working copy) @@ -37,7 +37,7 @@ String tableName; HashMap partSpec; - Path resFile; + String resFile; boolean isExt; /** * table name for the result of describe table. 
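Note: the resFile fields in the descriptor classes above switch from org.apache.hadoop.fs.Path to String, presumably because a plain string round-trips through the XML plan trivially while Path has no bean-friendly default representation; DDLTask simply wraps the string back into a Path before touching the file system, as in the hunks further up. A minimal sketch of that consumer side (class and method names are illustrative, not Hive APIs; conf stands in for the task's Hadoop Configuration):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative only: rebuild the Path from the serialized string when the
    // task actually needs to write its result file.
    public final class ResFileSketch {
      public static void writeResult(String resFile, Configuration conf, byte[] payload)
          throws IOException {
        Path path = new Path(resFile);
        FileSystem fs = path.getFileSystem(conf);
        FSDataOutputStream out = fs.create(path);
        try {
          out.write(payload);
        } finally {
          out.close();
        }
      }
    }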
@@ -61,7 +61,7 @@ HashMap partSpec, boolean isExt) { this.isExt = isExt; this.partSpec = partSpec; - this.resFile = resFile; + this.resFile = resFile.toString(); this.tableName = tableName; } @@ -123,20 +123,16 @@ /** * @return the resFile */ - public Path getResFile() { + @Explain(displayName = "result file", normalExplain = false) + public String getResFile() { return resFile; } - @Explain(displayName = "result file", normalExplain = false) - public String getResFileString() { - return getResFile().getName(); - } - /** * @param resFile * the resFile to set */ - public void setResFile(Path resFile) { + public void setResFile(String resFile) { this.resFile = resFile; } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/CreateViewDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/CreateViewDesc.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CreateViewDesc.java (working copy) @@ -29,7 +29,7 @@ * */ @Explain(displayName = "Create View") -public class CreateViewDesc implements Serializable { +public class CreateViewDesc extends DDLDesc implements Serializable { private static final long serialVersionUID = 1L; private String viewName; @@ -39,6 +39,28 @@ private String comment; private boolean ifNotExists; + public String getOriginalText() { + return originalText; + } + + public void setOriginalText(String originalText) { + this.originalText = originalText; + } + + public String getExpandedText() { + return expandedText; + } + + public void setExpandedText(String expandedText) { + this.expandedText = expandedText; + } + + /** + * For serialization only. + */ + public CreateViewDesc() { + } + public CreateViewDesc(String viewName, List schema, String comment, boolean ifNotExists) { this.viewName = viewName; Index: ql/src/java/org/apache/hadoop/hive/ql/Context.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Context.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/Context.java (working copy) @@ -193,7 +193,7 @@ /** * Create a map-reduce scratch directory on demand and return it. */ - private String getMRScratchDir() { + public String getMRScratchDir() { try { if (MRScratchDir == null) { MRScratchDir = makeMRScratchDir(conf, executionId, !explain); @@ -210,7 +210,7 @@ /** * Create a local scratch directory on demand and return it. 
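Note: getMRScratchDir above and getLocalScratchDir just below are made public so the Driver (later in this patch) can place queryplan.xml in the local scratch directory. The scratch dir name may carry a file: URI scheme, which is why the Driver strips it with toUri().getPath() before handing the name to FileOutputStream; a tiny illustration of that step (the path value is hypothetical):

    import org.apache.hadoop.fs.Path;

    public class ScratchPathSketch {
      public static void main(String[] args) {
        // A file: URI is not directly usable as a local file name.
        String planFile = "file:/tmp/hive-scratch/queryplan.xml";
        String localName = new Path(planFile).toUri().getPath();
        System.out.println(localName);   // prints /tmp/hive-scratch/queryplan.xml
      }
    }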
*/ - private String getLocalScratchDir() { + public String getLocalScratchDir() { try { if (localScratchDir == null) { localScratchDir = makeLocalScratchDir(conf, executionId, true); Index: ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (working copy) @@ -382,7 +382,7 @@ } } } - showTblStatusDesc = new ShowTableStatusDesc(ctx.getResFile(), dbName, + showTblStatusDesc = new ShowTableStatusDesc(ctx.getResFile().toString(), dbName, tableNames, partSpec); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), showTblStatusDesc), conf)); Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 910409) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -19,30 +19,27 @@ package org.apache.hadoop.hive.ql; import java.io.DataInput; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.Serializable; -import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.Random; import java.util.Set; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.ExecDriver; @@ -53,7 +50,6 @@ import org.apache.hadoop.hive.ql.exec.TaskResult; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.Utilities.EnumDelegate; import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.hooks.PostExecute; import org.apache.hadoop.hive.ql.hooks.PreExecute; @@ -65,16 +61,12 @@ import org.apache.hadoop.hive.ql.parse.ParseUtils; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.GroupByDesc; -import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.TableDesc; -import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes; import org.apache.hadoop.hive.ql.processors.CommandProcessor; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.serde2.ByteStream; -import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; @@ -302,7 +294,7 @@ try { ctx = new Context(conf); - + ParseDriver 
pd = new ParseDriver(); ASTNode tree = pd.parse(command, ctx); tree = ParseUtils.findRootNonNullToken(tree); @@ -317,13 +309,38 @@ plan = new QueryPlan(command, sem); // initialize FetchTask right here - if (sem.getFetchTask() != null) { - sem.getFetchTask().initialize(conf, plan, null); + if (plan.getFetchTask() != null) { + plan.getFetchTask().initialize(conf, plan, null); } // get the output schema schema = getSchema(sem, conf); + // Serialize the query plan + // get temp file name and remove file: + String queryPlanFileName = ctx.getLocalScratchDir() + Path.SEPARATOR_CHAR + + "queryplan.xml"; + LOG.info("query plan = " + queryPlanFileName); + queryPlanFileName = new Path(queryPlanFileName).toUri().getPath(); + + // serialize the queryPlan + FileOutputStream fos = new FileOutputStream(queryPlanFileName); + Utilities.serializeQueryPlan(plan, fos); + fos.close(); + + // deserialize the queryPlan + FileInputStream fis = new FileInputStream(queryPlanFileName); + QueryPlan newPlan = Utilities.deserializeQueryPlan(fis, conf); + fis.close(); + + // Use the deserialized plan + plan = newPlan; + + // initialize FetchTask right here + if (plan.getFetchTask() != null) { + plan.getFetchTask().initialize(conf, plan, null); + } + return (0); } catch (SemanticException e) { errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage();
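Note: the Driver change above is effectively a built-in self test: every compiled plan is written to queryplan.xml under the local scratch directory, immediately read back, and the deserialized copy is the plan that actually runs, so any object reachable from the plan that is not bean-serializable fails during query compilation rather than at execution time. A minimal round-trip helper in the same spirit (names are illustrative, not Hive APIs):

    import java.beans.XMLDecoder;
    import java.beans.XMLEncoder;
    import java.io.BufferedInputStream;
    import java.io.BufferedOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    public final class PlanRoundTrip {
      // Serialize, immediately deserialize, and return the copy; fails early if
      // anything reachable from the plan cannot be written as a bean.
      @SuppressWarnings("unchecked")
      public static <T> T roundTrip(T plan, File scratchDir) throws IOException {
        File f = new File(scratchDir, "queryplan.xml");
        XMLEncoder enc = new XMLEncoder(new BufferedOutputStream(new FileOutputStream(f)));
        try {
          enc.writeObject(plan);
        } finally {
          enc.close();
        }
        XMLDecoder dec = new XMLDecoder(new BufferedInputStream(new FileInputStream(f)));
        try {
          return (T) dec.readObject();
        } finally {
          dec.close();
        }
      }
    }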