Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (revision 4726)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (working copy)
@@ -56,7 +56,7 @@
   transient protected LogHelper console;
   transient protected QueryPlan queryPlan;
   transient protected TaskHandle taskHandle;
-  transient protected Map<String, Long> taskCounters;
+  transient protected HashMap<String, Long> taskCounters;
   transient protected DriverContext driverContext;
 
   // Bean methods
@@ -279,7 +279,7 @@
     // default, do nothing
   }
 
-  public Map<String, Long> getCounters() {
+  public HashMap<String, Long> getCounters() {
     return taskCounters;
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (revision 4726)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (working copy)
@@ -811,7 +811,7 @@
   /**
    * populated at runtime from hadoop counters at run time in the client
   */
-  transient protected Map<String, Long> counters;
+  transient protected HashMap<String, Long> counters;
 
   /**
    * keeps track of unique ProgressCounter enums used this value is used at
@@ -893,7 +893,7 @@
     this.operatorId = operatorId;
   }
 
-  public Map<String, Long> getCounters() {
+  public HashMap<String, Long> getCounters() {
     return counters;
  }
Index: ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (revision 4726)
+++ ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (working copy)
@@ -36,8 +36,11 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.plan.api.AdjacencyType;
 import org.apache.hadoop.hive.ql.plan.api.NodeType;
@@ -47,27 +50,47 @@
 import org.apache.thrift.protocol.TJSONProtocol;
 import org.apache.thrift.transport.TMemoryBuffer;
 
+/**
+ * QueryPlan can be serialized to disk so that we can restart/resume the
+ * progress of it in the future, either within or outside of the current
+ * jvm.
+ */
 public class QueryPlan implements Serializable {
   private static final long serialVersionUID = 1L;
 
   static final private Log LOG = LogFactory.getLog(QueryPlan.class.getName());
 
   private final String queryString;
-  private final BaseSemanticAnalyzer plan;
+
+  private ArrayList<Task<? extends Serializable>> rootTasks;
+  private FetchTask fetchTask;
+  private HashSet<ReadEntity> inputs;
+  private HashSet<WriteEntity> outputs;
+
+  private HashMap<String, String> idToTableNameMap;
+
   private final String queryId;
   private final org.apache.hadoop.hive.ql.plan.api.Query query;
-  private final Map<String, Map<String, Long>> counters;
-  private final Set<String> done;
-  private final Set<String> started;
+  private final HashMap<String, HashMap<String, Long>> counters;
+  private final HashSet<String> done;
+  private final HashSet<String> started;
 
-  public QueryPlan(String queryString, BaseSemanticAnalyzer plan) {
+  public QueryPlan(String queryString, BaseSemanticAnalyzer sem) {
     this.queryString = queryString;
-    this.plan = plan;
+
+    rootTasks = new ArrayList<Task<? extends Serializable>>();
+    rootTasks.addAll(sem.getRootTasks());
+    fetchTask = sem.getFetchTask();
+    // Note that inputs and outputs can be changed when the query gets executed
+    inputs = sem.getInputs();
+    outputs = sem.getOutputs();
+    idToTableNameMap = new HashMap<String, String>(sem.getIdToTableNameMap());
+
     queryId = makeQueryId();
     query = new org.apache.hadoop.hive.ql.plan.api.Query();
     query.setQueryId(queryId);
     query.putToQueryAttributes("queryString", this.queryString);
-    counters = new HashMap<String, Map<String, Long>>();
+    counters = new HashMap<String, HashMap<String, Long>>();
     done = new HashSet<String>();
     started = new HashSet<String>();
   }
@@ -76,10 +99,6 @@
     return queryString;
   }
 
-  public BaseSemanticAnalyzer getPlan() {
-    return plan;
-  }
-
   public String getQueryId() {
     return queryId;
   }
@@ -152,7 +171,7 @@
     Queue<Task<? extends Serializable>> tasksToVisit = new LinkedList<Task<? extends Serializable>>();
     Set<Task<? extends Serializable>> tasksVisited = new HashSet<Task<? extends Serializable>>();
-    tasksToVisit.addAll(plan.getRootTasks());
+    tasksToVisit.addAll(rootTasks);
     while (tasksToVisit.size() != 0) {
       Task<? extends Serializable> task = tasksToVisit.remove();
       tasksVisited.add(task);
@@ -273,7 +292,7 @@
   private void extractCounters() throws IOException {
     Queue<Task<? extends Serializable>> tasksToVisit = new LinkedList<Task<? extends Serializable>>();
     Set<Task<? extends Serializable>> tasksVisited = new HashSet<Task<? extends Serializable>>();
-    tasksToVisit.addAll(plan.getRootTasks());
+    tasksToVisit.addAll(rootTasks);
     while (tasksToVisit.peek() != null) {
       Task<? extends Serializable> task = tasksToVisit.remove();
       tasksVisited.add(task);
@@ -601,4 +620,44 @@
     return done;
   }
 
+  public ArrayList<Task<? extends Serializable>> getRootTasks() {
+    return rootTasks;
+  }
+
+  public void setRootTasks(ArrayList<Task<? extends Serializable>> rootTasks) {
+    this.rootTasks = rootTasks;
+  }
+
+  public FetchTask getFetchTask() {
+    return fetchTask;
+  }
+
+  public void setFetchTask(FetchTask fetchTask) {
+    this.fetchTask = fetchTask;
+  }
+
+  public HashSet<ReadEntity> getInputs() {
+    return inputs;
+  }
+
+  public void setInputs(HashSet<ReadEntity> inputs) {
+    this.inputs = inputs;
+  }
+
+  public HashSet<WriteEntity> getOutputs() {
+    return outputs;
+  }
+
+  public void setOutputs(HashSet<WriteEntity> outputs) {
+    this.outputs = outputs;
+  }
+
+  public HashMap<String, String> getIdToTableNameMap() {
+    return idToTableNameMap;
+  }
+
+  public void setIdToTableNameMap(HashMap<String, String> idToTableNameMap) {
+    this.idToTableNameMap = idToTableNameMap;
+  }
+
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (revision 4726)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (working copy)
@@ -22,6 +22,7 @@
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -34,6 +35,7 @@
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -56,8 +58,7 @@
   protected final Hive db;
   protected final HiveConf conf;
   protected List<Task<? extends Serializable>> rootTasks;
-  protected Task<? extends Serializable> fetchTask;
-  protected boolean fetchTaskInit;
+  protected FetchTask fetchTask;
   protected final Log LOG;
   protected final LogHelper console;
 
@@ -67,11 +68,11 @@
   /**
    * ReadEntitites that are passed to the hooks.
   */
-  protected Set<ReadEntity> inputs;
+  protected HashSet<ReadEntity> inputs;
   /**
    * List of WriteEntities that are passed to the hooks.
   */
-  protected Set<WriteEntity> outputs;
+  protected HashSet<WriteEntity> outputs;
 
   protected static final String TEXTFILE_INPUT = TextInputFormat.class
       .getName();
@@ -124,7 +125,7 @@
   /**
    * @return the fetchTask
   */
-  public Task<? extends Serializable> getFetchTask() {
+  public FetchTask getFetchTask() {
     return fetchTask;
   }
 
@@ -132,18 +133,10 @@
    * @param fetchTask
    *          the fetchTask to set
   */
-  public void setFetchTask(Task<? extends Serializable> fetchTask) {
+  public void setFetchTask(FetchTask fetchTask) {
     this.fetchTask = fetchTask;
   }
 
-  public boolean getFetchTaskInit() {
-    return fetchTaskInit;
-  }
-
-  public void setFetchTaskInit(boolean fetchTaskInit) {
-    this.fetchTaskInit = fetchTaskInit;
-  }
-
   protected void reset() {
     rootTasks = new ArrayList<Task<? extends Serializable>>();
   }
@@ -293,11 +286,11 @@
     return sb.toString();
   }
 
-  public Set<ReadEntity> getInputs() {
+  public HashSet<ReadEntity> getInputs() {
     return inputs;
   }
 
-  public Set<WriteEntity> getOutputs() {
+  public HashSet<WriteEntity> getOutputs() {
     return outputs;
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 4726)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -44,6 +44,7 @@
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FunctionInfo;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
@@ -5164,7 +5165,7 @@
   private void genMapRedTasks(QB qb) throws SemanticException {
     FetchWork fetch = null;
     List<Task<? extends Serializable>> mvTask = new ArrayList<Task<? extends Serializable>>();
-    Task<? extends Serializable> fetchTask = null;
+    FetchTask fetchTask = null;
 
     QBParseInfo qbParseInfo = qb.getParseInfo();
@@ -5235,7 +5236,7 @@
       }
 
       if (noMapRed) {
-        fetchTask = TaskFactory.get(fetch, conf);
+        fetchTask = (FetchTask)TaskFactory.get(fetch, conf);
         setFetchTask(fetchTask);
 
         // remove root tasks if any
@@ -5262,7 +5263,7 @@
           org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes)),
           qb.getParseInfo().getOuterQueryLimit());
 
-      fetchTask = TaskFactory.get(fetch, conf);
+      fetchTask = (FetchTask)TaskFactory.get(fetch, conf);
       setFetchTask(fetchTask);
     } else {
       new ArrayList();
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (revision 4726)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (working copy)
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
@@ -290,7 +291,7 @@
    * @param schema
    *          thrift ddl
   */
-  private Task<? extends Serializable> createFetchTask(String schema) {
+  private FetchTask createFetchTask(String schema) {
     Properties prop = new Properties();
 
     prop.setProperty(Constants.SERIALIZATION_FORMAT, "9");
@@ -303,7 +304,7 @@
         LazySimpleSerDe.class, TextInputFormat.class,
         IgnoreKeyTextOutputFormat.class, prop), -1);
     fetch.setSerializationNullFormat(" ");
-    return TaskFactory.get(fetch, conf);
+    return (FetchTask)TaskFactory.get(fetch, conf);
   }
 
   private void analyzeDescribeTable(ASTNode ast) throws SemanticException {
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 4726)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy)
@@ -18,26 +18,38 @@
 
 package org.apache.hadoop.hive.ql;
 
+import java.beans.XMLDecoder;
+import java.beans.XMLEncoder;
 import java.io.DataInput;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Random;
 import java.util.Set;
 import java.util.Vector;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExecDriver;
@@ -48,6 +60,7 @@
 import org.apache.hadoop.hive.ql.exec.TaskResult;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.Utilities.EnumDelegate;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.hooks.PostExecute;
 import org.apache.hadoop.hive.ql.hooks.PreExecute;
@@ -59,11 +72,15 @@
 import org.apache.hadoop.hive.ql.parse.ParseUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
 import org.apache.hadoop.hive.ql.processors.CommandProcessor;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -81,6 +98,8 @@
   private DataInput resStream;
   private Context ctx;
   private QueryPlan plan;
+  private Schema schema;
+
   private String errorMessage;
   private String SQLState;
 
@@ -137,59 +156,60 @@
     return cs;
   }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
   /**
    * Get a Schema with fields represented with native Hive types
   */
-  public Schema getSchema() throws Exception {
+  public static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
     Schema schema = null;
-    try {
-      // If we have a plan, prefer its logical result schema if it's
-      // available; otherwise, try digging out a fetch task; failing that,
-      // give up.
-      if (plan == null) {
-        // can't get any info without a plan
-      } else if (plan.getPlan().getResultSchema() != null) {
-        List<FieldSchema> lst = plan.getPlan().getResultSchema();
-        schema = new Schema(lst, null);
-      } else if (plan.getPlan().getFetchTask() != null) {
-        BaseSemanticAnalyzer sem = plan.getPlan();
-
-        if (!sem.getFetchTaskInit()) {
-          sem.setFetchTaskInit(true);
-          sem.getFetchTask().initialize(conf, plan, null);
+
+    // If we have a plan, prefer its logical result schema if it's
+    // available; otherwise, try digging out a fetch task; failing that,
+    // give up.
+    if (sem == null) {
+      // can't get any info without a plan
+    } else if (sem.getResultSchema() != null) {
+      List<FieldSchema> lst = sem.getResultSchema();
+      schema = new Schema(lst, null);
+    } else if (sem.getFetchTask() != null) {
+      FetchTask ft = (FetchTask) sem.getFetchTask();
+      TableDesc td = ft.getTblDesc();
+      // partitioned tables don't have tableDesc set on the FetchTask. Instead
+      // they have a list of PartitionDesc objects, each with a table desc.
+      // Let's
+      // try to fetch the desc for the first partition and use it's
+      // deserializer.
+      if (td == null && ft.getWork() != null
+          && ft.getWork().getPartDesc() != null) {
+        if (ft.getWork().getPartDesc().size() > 0) {
+          td = ft.getWork().getPartDesc().get(0).getTableDesc();
        }
-        FetchTask ft = (FetchTask) sem.getFetchTask();
+      }
 
-        TableDesc td = ft.getTblDesc();
-        // partitioned tables don't have tableDesc set on the FetchTask. Instead
-        // they have a list of PartitionDesc objects, each with a table desc.
-        // Let's
-        // try to fetch the desc for the first partition and use it's
-        // deserializer.
-        if (td == null && ft.getWork() != null
-            && ft.getWork().getPartDesc() != null) {
-          if (ft.getWork().getPartDesc().size() > 0) {
-            td = ft.getWork().getPartDesc().get(0).getTableDesc();
-          }
+      if (td == null) {
+        LOG.info("No returning schema.");
+      } else {
+        String tableName = "result";
+        List<FieldSchema> lst = null;
+        try {
+          lst = MetaStoreUtils.getFieldsFromDeserializer(
+              tableName, td.getDeserializer());
+        } catch (Exception e) {
+          LOG.warn("Error getting schema: "
+              + org.apache.hadoop.util.StringUtils.stringifyException(e));
        }
-
-        if (td == null) {
-          throw new Exception("No table description found for fetch task: "
-              + ft);
+        if (lst != null) {
+          schema = new Schema(lst, null);
        }
-
-        String tableName = "result";
-        List<FieldSchema> lst = MetaStoreUtils.getFieldsFromDeserializer(
-            tableName, td.getDeserializer());
-        schema = new Schema(lst, null);
      }
-      if (schema == null) {
-        schema = new Schema();
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw e;
    }
+    if (schema == null) {
+      schema = new Schema();
+    }
     LOG.info("Returning Hive schema: " + schema);
     return schema;
   }
@@ -302,7 +322,14 @@
       sem.validate();
 
       plan = new QueryPlan(command, sem);
-
+      // initialize FetchTask right here
+      if (sem.getFetchTask() != null) {
+        sem.getFetchTask().initialize(conf, plan, null);
+      }
+
+      // get the output schema
+      schema = getSchema(sem, conf);
+
       return (0);
     } catch (SemanticException e) {
       errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage();
@@ -458,16 +485,14 @@
       }
       resStream = null;
 
-      BaseSemanticAnalyzer sem = plan.getPlan();
-
       // Get all the pre execution hooks and execute them.
       for (PreExecute peh : getPreExecHooks()) {
-        peh.run(SessionState.get(), sem.getInputs(), sem.getOutputs(),
+        peh.run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
            UnixUserGroupInformation.readFromConf(conf,
                UnixUserGroupInformation.UGI_PROPERTY_NAME));
       }
 
-      int jobs = countJobs(sem.getRootTasks());
+      int jobs = countJobs(plan.getRootTasks());
       if (jobs > 0) {
         console.printInfo("Total MapReduce jobs = " + jobs);
       }
@@ -475,7 +500,7 @@
         SessionState.get().getHiveHistory().setQueryProperty(queryId,
            Keys.QUERY_NUM_TASKS, String.valueOf(jobs));
         SessionState.get().getHiveHistory().setIdToTableMap(
-            sem.getIdToTableNameMap());
+            plan.getIdToTableNameMap());
       }
       String jobname = Utilities.abbreviate(queryStr, maxlen - 6);
@@ -492,7 +517,7 @@
 
       // Add root Tasks to runnable
 
-      for (Task<? extends Serializable> tsk : sem.getRootTasks()) {
+      for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
         driverCxt.addToRunnable(tsk);
       }
@@ -541,7 +566,7 @@
 
       // Get all the post execution hooks and execute them.
       for (PostExecute peh : getPostExecHooks()) {
-        peh.run(SessionState.get(), sem.getInputs(), sem.getOutputs(),
+        peh.run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
            UnixUserGroupInformation.readFromConf(conf,
                UnixUserGroupInformation.UGI_PROPERTY_NAME));
       }
@@ -579,6 +604,7 @@
       }
     }
     console.printInfo("OK");
+
     return (0);
   }
@@ -676,13 +702,8 @@
   }
 
   public boolean getResults(Vector<String> res) throws IOException {
-    if (plan != null && plan.getPlan().getFetchTask() != null) {
-      BaseSemanticAnalyzer sem = plan.getPlan();
-      if (!sem.getFetchTaskInit()) {
-        sem.setFetchTaskInit(true);
-        sem.getFetchTask().initialize(conf, plan, null);
-      }
-      FetchTask ft = (FetchTask) sem.getFetchTask();
+    if (plan != null && plan.getFetchTask() != null) {
+      FetchTask ft = (FetchTask) plan.getFetchTask();
       ft.setMaxRows(maxRows);
       return ft.fetch(res);
    }
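
Note on how the pieces fit together: the patch moves the executable state (root tasks, fetch task, inputs/outputs, id-to-table-name map) from BaseSemanticAnalyzer into QueryPlan and narrows the declared types to concrete, bean-friendly classes (ArrayList, HashMap, HashSet, FetchTask) with public getters and setters, so a compiled plan can be written out and read back with java.beans.XMLEncoder/XMLDecoder, both of which are newly imported in Driver.java. The sketch below is illustrative only and not part of the patch: the class name, method names and stream handling are made up, and the enum persistence delegates simply mirror the pattern Hive already uses when serializing MapredWork; the actual save/resume wiring in Driver may look different.

// Hypothetical sketch, assuming the bean-style QueryPlan from this patch.
package org.apache.hadoop.hive.ql;

import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.hive.ql.exec.Utilities.EnumDelegate;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;

public class QueryPlanPersistence {

  // Serialize the plan as XML. This only works because QueryPlan now exposes
  // concrete ArrayList/HashMap/HashSet fields through public getters/setters.
  public static void save(QueryPlan plan, OutputStream out) {
    XMLEncoder e = new XMLEncoder(out);
    try {
      // Enums need explicit persistence delegates for XMLEncoder; EnumDelegate
      // is the helper Driver.java now imports from Utilities.
      e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
      e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
      e.writeObject(plan);
    } finally {
      e.close();
    }
  }

  // Deserialize a previously saved plan, e.g. to resume the query in a new JVM.
  public static QueryPlan load(InputStream in) {
    XMLDecoder d = new XMLDecoder(in);
    try {
      return (QueryPlan) d.readObject();
    } finally {
      d.close();
    }
  }
}

A caller could save the plan right after Driver.compile() returns and later reload it in a fresh process to restart or resume execution, which is the scenario the new QueryPlan class comment describes.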