diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8a00079..ce376ee 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2251,7 +2251,10 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { HIVE_TEZ_ENABLE_MEMORY_MANAGER("hive.tez.enable.memory.manager", true, "Enable memory manager for tez"), HIVE_HASH_TABLE_INFLATION_FACTOR("hive.hash.table.inflation.factor", (float) 2.0, - "Expected inflation factor between disk/in memory representation of hash tables"); + "Expected inflation factor between disk/in memory representation of hash tables"), + HIVE_CTE_MATERIALIZE_THRESHOLD("hive.cte.materialize.threshold", -1, + "If the number of reference to a cte clause exceeds this threshold, hive will materialize " + + "it as a volatile table before executing main query block. -1 will disable this feature."); public final String varname; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index ca0d487..d8f5cc5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import org.antlr.runtime.TokenRewriteStream; import org.apache.commons.logging.Log; @@ -45,9 +46,9 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.ShimLoader; @@ -107,6 +108,8 @@ private boolean needLockMgr; + private AtomicInteger sequencer = new AtomicInteger(); + // Keep track of the mapping from load table desc to the output and the lock private final Map loadTableOutputMap = new HashMap(); @@ -713,4 +716,17 @@ public void setCboSucceeded(boolean cboSucceeded) { this.cboSucceeded = cboSucceeded; } + private final Map cteTables = new HashMap(); + + public Table getVolatileTable(String cteName) { + return cteTables.get(cteName); + } + + public void addVolatileTable(String cteName, Table table) { + cteTables.put(cteName, table); + } + + public AtomicInteger getSequencer() { + return sequencer; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 4030075..6faa6c5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -422,7 +422,7 @@ public int compile(String command, boolean resetTaskIds) { sem.analyze(tree, ctx); hookCtx.update(sem); for (HiveSemanticAnalyzerHook hook : saHooks) { - hook.postAnalyze(hookCtx, sem.getRootTasks()); + hook.postAnalyze(hookCtx, sem.getAllRootTasks()); } } else { sem.analyze(tree, ctx); @@ -535,7 +535,7 @@ private String getExplainOutput(BaseSemanticAnalyzer sem, QueryPlan plan, ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos); try { - List> rootTasks = sem.getRootTasks(); + List> rootTasks = 
sem.getAllRootTasks(); task.getJSONPlan(ps, astTree, rootTasks, sem.getFetchTask(), false, true, true); ret = baos.toString(); } catch (Exception e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java index b9776ea..c37b7e2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java @@ -119,13 +119,12 @@ public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, S HiveOperation operation, Schema resultSchema) { this.queryString = queryString; - rootTasks = new ArrayList>(); - this.reducerTimeStatsPerJobList = new ArrayList(); - rootTasks.addAll(sem.getRootTasks()); + rootTasks = new ArrayList>(sem.getAllRootTasks()); + reducerTimeStatsPerJobList = new ArrayList(); fetchTask = sem.getFetchTask(); // Note that inputs and outputs can be changed when the query gets executed - inputs = sem.getInputs(); - outputs = sem.getOutputs(); + inputs = sem.getAllInputs(); + outputs = sem.getAllOutputs(); linfo = sem.getLineageInfo(); tableAccessInfo = sem.getTableAccessInfo(); columnAccessInfo = sem.getColumnAccessInfo(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 734742c..fef9f5f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -112,7 +112,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveMetaStoreChecker; -import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -3706,10 +3705,14 @@ private boolean updateModifiedParameters(Map params, HiveConf co return true; } + private void validateSerDe(String serdeName) throws HiveException { + validateSerDe(serdeName, conf); + } + /** * Check if the given serde is valid. */ - private void validateSerDe(String serdeName) throws HiveException { + public static void validateSerDe(String serdeName, HiveConf conf) throws HiveException { try { Deserializer d = ReflectionUtil.newInstance(conf.getClassByName(serdeName). @@ -3815,164 +3818,9 @@ private int switchDatabase(Hive db, SwitchDatabaseDesc switchDb) */ private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException { // create the table - Table tbl; - if (crtTbl.getDatabaseName() == null || (crtTbl.getTableName().contains("."))){ - tbl = db.newTable(crtTbl.getTableName()); - }else { - tbl = new Table(crtTbl.getDatabaseName(),crtTbl.getTableName()); - } - - if (crtTbl.getTblProps() != null) { - tbl.getTTable().getParameters().putAll(crtTbl.getTblProps()); - } - - if (crtTbl.getPartCols() != null) { - tbl.setPartCols(crtTbl.getPartCols()); - } - if (crtTbl.getNumBuckets() != -1) { - tbl.setNumBuckets(crtTbl.getNumBuckets()); - } - - if (crtTbl.getStorageHandler() != null) { - tbl.setProperty( - org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE, - crtTbl.getStorageHandler()); - } - HiveStorageHandler storageHandler = tbl.getStorageHandler(); - - /* - * We use LazySimpleSerDe by default. - * - * If the user didn't specify a SerDe, and any of the columns are not simple - * types, we will have to use DynamicSerDe instead. 
- */ - if (crtTbl.getSerName() == null) { - if (storageHandler == null) { - LOG.info("Default to LazySimpleSerDe for table " + crtTbl.getTableName()); - tbl.setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName()); - } else { - String serDeClassName = storageHandler.getSerDeClass().getName(); - LOG.info("Use StorageHandler-supplied " + serDeClassName - + " for table " + crtTbl.getTableName()); - tbl.setSerializationLib(serDeClassName); - } - } else { - // let's validate that the serde exists - validateSerDe(crtTbl.getSerName()); - tbl.setSerializationLib(crtTbl.getSerName()); - } - - if (crtTbl.getFieldDelim() != null) { - tbl.setSerdeParam(serdeConstants.FIELD_DELIM, crtTbl.getFieldDelim()); - tbl.setSerdeParam(serdeConstants.SERIALIZATION_FORMAT, crtTbl.getFieldDelim()); - } - if (crtTbl.getFieldEscape() != null) { - tbl.setSerdeParam(serdeConstants.ESCAPE_CHAR, crtTbl.getFieldEscape()); - } - - if (crtTbl.getCollItemDelim() != null) { - tbl.setSerdeParam(serdeConstants.COLLECTION_DELIM, crtTbl.getCollItemDelim()); - } - if (crtTbl.getMapKeyDelim() != null) { - tbl.setSerdeParam(serdeConstants.MAPKEY_DELIM, crtTbl.getMapKeyDelim()); - } - if (crtTbl.getLineDelim() != null) { - tbl.setSerdeParam(serdeConstants.LINE_DELIM, crtTbl.getLineDelim()); - } - if (crtTbl.getNullFormat() != null) { - tbl.setSerdeParam(serdeConstants.SERIALIZATION_NULL_FORMAT, crtTbl.getNullFormat()); - } - if (crtTbl.getSerdeProps() != null) { - Iterator> iter = crtTbl.getSerdeProps().entrySet() - .iterator(); - while (iter.hasNext()) { - Entry m = iter.next(); - tbl.setSerdeParam(m.getKey(), m.getValue()); - } - } - - if (crtTbl.getCols() != null) { - tbl.setFields(crtTbl.getCols()); - } - if (crtTbl.getBucketCols() != null) { - tbl.setBucketCols(crtTbl.getBucketCols()); - } - if (crtTbl.getSortCols() != null) { - tbl.setSortCols(crtTbl.getSortCols()); - } - if (crtTbl.getComment() != null) { - tbl.setProperty("comment", crtTbl.getComment()); - } - if (crtTbl.getLocation() != null) { - tbl.setDataLocation(new Path(crtTbl.getLocation())); - } - - if (crtTbl.getSkewedColNames() != null) { - tbl.setSkewedColNames(crtTbl.getSkewedColNames()); - } - if (crtTbl.getSkewedColValues() != null) { - tbl.setSkewedColValues(crtTbl.getSkewedColValues()); - } - - tbl.getTTable().setTemporary(crtTbl.isTemporary()); - - tbl.setStoredAsSubDirectories(crtTbl.isStoredAsSubDirectories()); - - tbl.setInputFormatClass(crtTbl.getInputFormat()); - tbl.setOutputFormatClass(crtTbl.getOutputFormat()); - - // only persist input/output format to metadata when it is explicitly specified. - // Otherwise, load lazily via StorageHandler at query time. - if (crtTbl.getInputFormat() != null && !crtTbl.getInputFormat().isEmpty()) { - tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName()); - } - if (crtTbl.getOutputFormat() != null && !crtTbl.getOutputFormat().isEmpty()) { - tbl.getTTable().getSd().setOutputFormat(tbl.getOutputFormatClass().getName()); - } - - if (!Utilities.isDefaultNameNode(conf) && doesTableNeedLocation(tbl)) { - // If location is specified - ensure that it is a full qualified name - makeLocationQualified(tbl.getDbName(), tbl.getTTable().getSd(), tbl.getTableName()); - } - - if (crtTbl.isExternal()) { - tbl.setProperty("EXTERNAL", "TRUE"); - tbl.setTableType(TableType.EXTERNAL_TABLE); - } - - // If the sorted columns is a superset of bucketed columns, store this fact. - // It can be later used to - // optimize some group-by queries. 
Note that, the order does not matter as - // long as it in the first - // 'n' columns where 'n' is the length of the bucketed columns. - if ((tbl.getBucketCols() != null) && (tbl.getSortCols() != null)) { - List bucketCols = tbl.getBucketCols(); - List sortCols = tbl.getSortCols(); - - if ((sortCols.size() > 0) && (sortCols.size() >= bucketCols.size())) { - boolean found = true; - - Iterator iterBucketCols = bucketCols.iterator(); - while (iterBucketCols.hasNext()) { - String bucketCol = iterBucketCols.next(); - boolean colFound = false; - for (int i = 0; i < bucketCols.size(); i++) { - if (bucketCol.equals(sortCols.get(i).getCol())) { - colFound = true; - break; - } - } - if (colFound == false) { - found = false; - break; - } - } - if (found) { - tbl.setProperty("SORTBUCKETCOLSPREFIX", "TRUE"); - } - } - } - + Table tbl = crtTbl.toTable(conf); + LOG.info("creating table " + tbl.getDbName() + "." + tbl.getTableName() + " on " + + tbl.getDataLocation()); // create the table if (crtTbl.getReplaceMode()){ // replace-mode creates are really alters using CreateTableDesc. @@ -4118,7 +3966,7 @@ private int createTableLike(Hive db, CreateTableLikeDesc crtTbl) throws Exceptio if (!Utilities.isDefaultNameNode(conf)) { // If location is specified - ensure that it is a full qualified name - makeLocationQualified(tbl.getDbName(), tbl.getTTable().getSd(), tbl.getTableName()); + makeLocationQualified(tbl.getDbName(), tbl.getTTable().getSd(), tbl.getTableName(), conf); } // create the table @@ -4315,8 +4163,8 @@ public String getName() { * @param name * Object name. */ - private void makeLocationQualified(String databaseName, StorageDescriptor sd, String name) - throws HiveException { + public static void makeLocationQualified(String databaseName, StorageDescriptor sd, + String name, HiveConf conf) throws HiveException { Path path = null; if (!sd.isSetLocation()) { @@ -4388,7 +4236,7 @@ private void makeLocationQualified(Database database) throws HiveException { } } - private static boolean doesTableNeedLocation(Table tbl) { + public static boolean doesTableNeedLocation(Table tbl) { // If we are ok with breaking compatibility of existing 3rd party StorageHandlers, // this method could be moved to the HiveStorageHandler interface. boolean retval = true; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index e584e6e..a64af6d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -297,6 +297,22 @@ public boolean addDependentTask(Task dependent) { return ret; } + @SuppressWarnings("unchecked") + public static List> + findLeafs(List> rootTasks) { + final List> leafTasks = new ArrayList>(); + + NodeUtils.iterateTask(rootTasks, Task.class, new NodeUtils.Function() { + public void apply(Task task) { + List dependents = task.getDependentTasks(); + if (dependents == null || dependents.isEmpty()) { + leafTasks.add(task); + } + } + }); + return leafTasks; + } + /** * Remove the dependent task. * diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java index 52ed4a3..ff25413 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java @@ -91,6 +91,7 @@ private transient TableSpec tableSpec; + private transient boolean volatileTable; /** * Used only for serialization. 
@@ -335,6 +336,14 @@ public HiveStorageHandler getStorageHandler() { return outputFormatClass; } + public boolean isVolatileTable() { + return volatileTable; + } + + public void setVolatileTable(boolean volatileTable) { + this.volatileTable = volatileTable; + } + /** * Marker SemanticException, so that processing that allows for table validation failures * and appropriately handles them can recover from these types of SemanticExceptions diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 4a325fb..dd041a2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1692,7 +1692,8 @@ public static boolean isMergeRequired(List> mvTasks, HiveConf hco // no need of merging if the move is to a local file system MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTask(mvTasks, fsOp); - if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)) { + if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER) && + !fsOp.getConf().isVolatile()) { GenMapRedUtils.addStatsTask(fsOp, mvTask, currTask, hconf); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index fbe93f9..b7e2605 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -34,7 +34,6 @@ import java.util.Map.Entry; import java.util.Set; -import org.antlr.runtime.tree.CommonTree; import org.antlr.runtime.tree.Tree; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; @@ -43,7 +42,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStore; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -75,7 +73,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import com.google.common.annotations.VisibleForTesting; @@ -108,7 +105,7 @@ public static int HIVE_COLUMN_ORDER_DESC = 0; /** - * ReadEntitites that are passed to the hooks. + * ReadEntities that are passed to the hooks. */ protected HashSet inputs; /** @@ -1471,4 +1468,16 @@ protected Partition getPartition(Table table, Map partSpec, protected String toMessage(ErrorMsg message, Object detail) { return detail == null ? 
message.getMsg() : message.getMsg(detail.toString()); } + + public List> getAllRootTasks() { + return rootTasks; + } + + public HashSet getAllInputs() { + return inputs; + } + + public HashSet getAllOutputs() { + return outputs; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java index 2d365a9..eed606b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java @@ -75,7 +75,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { sem.validate(); ctx.setResFile(ctx.getLocalTmpPath()); - List> tasks = sem.getRootTasks(); + List> tasks = sem.getAllRootTasks(); if (tasks == null) { tasks = Collections.emptyList(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java index 9334c73..391911f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -84,7 +85,7 @@ public Operator parentOfRoot; // sequence number is used to name vertices (e.g.: Map 1, Reduce 14, ...) - private int sequenceNumber = 0; + private AtomicInteger sequenceNumber; // tez task we're currently processing public TezTask currentTask; @@ -200,12 +201,12 @@ public GenTezProcContext(HiveConf conf, ParseContext parseContext, this.opMergeJoinWorkMap = new LinkedHashMap, MergeJoinWork>(); this.currentMergeJoinOperator = null; this.mapJoinToUnprocessedSmallTableReduceSinks = new HashMap>(); + this.sequenceNumber = parseContext.getContext().getSequencer(); rootTasks.add(currentTask); } - /** Not thread-safe. */ public int nextSequenceNumber() { - return ++sequenceNumber; + return sequenceNumber.incrementAndGet(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index f6052e3..f714697 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -290,7 +290,8 @@ /* * Capture the CTE definitions in a Query. */ - private final Map aliasToCTEs; + private final Map aliasToCTEs; + /* * Used to check recursive CTE invocations. Similar to viewsExpanded */ @@ -338,7 +339,7 @@ public SemanticAnalyzer(HiveConf conf) throws SemanticException { HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME); queryProperties = new QueryProperties(); opToPartToSkewedPruner = new HashMap>(); - aliasToCTEs = new HashMap(); + aliasToCTEs = new HashMap(); globalLimitCtx = new GlobalLimitCtx(); viewAliasToInput = new HashMap(); noscan = partialscan = false; @@ -942,7 +943,7 @@ private void processCTE(QB qb, ASTNode ctes) throws SemanticException { if ( aliasToCTEs.containsKey(qName)) { throw new SemanticException(ErrorMsg.AMBIGUOUS_TABLE_ALIAS.getMsg(cte.getChild(1))); } - aliasToCTEs.put(qName, cteQry); + aliasToCTEs.put(qName, new CTEClause(qName, cteQry)); } } @@ -957,7 +958,7 @@ private void processCTE(QB qb, ASTNode ctes) throws SemanticException { * they appear in when adding them to the aliasToCTEs map. 
* */ - private ASTNode findCTEFromName(QB qb, String cteName) { + private CTEClause findCTEFromName(QB qb, String cteName) { StringBuilder qId = new StringBuilder(); if (qb.getId() != null) { qId.append(qb.getId()); @@ -965,8 +966,9 @@ private ASTNode findCTEFromName(QB qb, String cteName) { while (qId.length() > 0) { String nm = qId + ":" + cteName; - if (aliasToCTEs.containsKey(nm)) { - return aliasToCTEs.get(nm); + CTEClause cte = aliasToCTEs.get(nm); + if (cte != null) { + return cte; } int lastIndex = qId.lastIndexOf(":"); lastIndex = lastIndex < 0 ? 0 : lastIndex; @@ -975,6 +977,167 @@ private ASTNode findCTEFromName(QB qb, String cteName) { return aliasToCTEs.get(cteName); } + private final CTEClause rootClause = new CTEClause(null, null); + + @Override + public List> getAllRootTasks() { + return toRealRootTasks(rootClause.asExecutionOrder()); + } + + @Override + public HashSet getAllInputs() { + HashSet readEntities = new HashSet(getInputs()); + for (CTEClause cte : rootClause.asExecutionOrder()) { + if (cte.source != null) { + readEntities.addAll(cte.source.getInputs()); + } + } + return readEntities; + } + + @Override + public HashSet getAllOutputs() { + HashSet writeEntities = new HashSet(getOutputs()); + for (CTEClause cte : rootClause.asExecutionOrder()) { + if (cte.source != null) { + writeEntities.addAll(cte.source.getOutputs()); + } + } + return writeEntities; + } + + private class CTEClause { + private CTEClause(String alias, ASTNode cteNode) { + this.alias = alias; + this.cteNode = cteNode; + } + private String alias; + private ASTNode cteNode; + private int reference; + private QBExpr qbExpr; + private List parents = new ArrayList(); + + // materialized + private Table table; + private SemanticAnalyzer source; + + private List> getTasks() { + return source == null ? null : source.rootTasks; + } + + private List asExecutionOrder() { + List execution = new ArrayList(); + asExecutionOrder(new HashSet(), execution); + return execution; + } + + private void asExecutionOrder(Set visited, List execution) { + for (CTEClause parent : parents) { + if (visited.add(parent)) { + parent.asExecutionOrder(visited, execution); + } + } + execution.add(this); + } + + @Override + public String toString() { + return alias == null ? "" : alias; + } + } + + private List> toRealRootTasks(List execution) { + List> cteRoots = new ArrayList<>(); + List> cteLeafs = new ArrayList<>(); + List> curTopRoots = null; + List> curBottomLeafs = null; + for (int i = 0; i < execution.size(); i++) { + CTEClause current = execution.get(i); + if (current.parents.isEmpty() && curTopRoots != null) { + cteRoots.addAll(curTopRoots); + cteLeafs.addAll(curBottomLeafs); + curTopRoots = curBottomLeafs = null; + } + List> curTasks = current.getTasks(); + if (curTasks == null) { + continue; + } + if (curTopRoots == null) { + curTopRoots = curTasks; + } + if (curBottomLeafs != null) { + for (Task topLeafTask : curBottomLeafs) { + for (Task currentRootTask : curTasks) { + topLeafTask.addDependentTask(currentRootTask); + } + } + } + curBottomLeafs = Task.findLeafs(curTasks); + } + if (curTopRoots != null) { + cteRoots.addAll(curTopRoots); + cteLeafs.addAll(curBottomLeafs); + } + + if (cteRoots.isEmpty()) { + return rootTasks; + } + for (Task cteLeafTask : cteLeafs) { + for (Task mainRootTask : rootTasks) { + cteLeafTask.addDependentTask(mainRootTask); + } + } + return cteRoots; + } + + // used as text value of KW_TEMPORARY token. 
+ private static final String VOLATILE_MARKER = "$VOLATILE"; + + private Table materializeCTE(String cteName, CTEClause cte) throws HiveException { + + ASTNode createTable = new ASTNode(new ClassicToken(HiveParser.TOK_CREATETABLE)); + + ASTNode tableName = new ASTNode(new ClassicToken(HiveParser.TOK_TABNAME)); + tableName.addChild(new ASTNode(new ClassicToken(HiveParser.Identifier, cteName))); + + ASTNode temporary = new ASTNode(new ClassicToken(HiveParser.KW_TEMPORARY, VOLATILE_MARKER)); + + createTable.addChild(tableName); + createTable.addChild(temporary); + createTable.addChild(cte.cteNode); + + SemanticAnalyzer analyzer = new SemanticAnalyzer(conf); + analyzer.initCtx(ctx); + analyzer.init(false); + + // should share cte contexts + analyzer.aliasToCTEs.putAll(aliasToCTEs); + + HiveOperation operation = SessionState.get().getHiveOperation(); + try { + analyzer.analyzeInternal(createTable); + } finally { + SessionState.get().setCommandType(operation); + } + + Table table = analyzer.tableDesc.toTable(conf); + Path location = table.getDataLocation(); + try { + location.getFileSystem(conf).mkdirs(location); + } catch (IOException e) { + throw new HiveException(e); + } + table.setVolatileTable(true); + + LOG.info(cteName + " will be materialized into " + location); + cte.table = table; + cte.source = analyzer; + + ctx.addVolatileTable(cteName, table); + + return table; + } + /* * If a CTE is referenced in a QueryBlock: * - add it as a SubQuery for now. @@ -986,12 +1149,9 @@ private ASTNode findCTEFromName(QB qb, String cteName) { * - trigger phase 1 on new QBExpr. * - update QB data structs: remove this as a table reference, move it to a SQ invocation. */ - private void addCTEAsSubQuery(QB qb, String cteName, String cteAlias) throws SemanticException { - cteAlias = cteAlias == null ? 
cteName : cteAlias; - ASTNode cteQryNode = findCTEFromName(qb, cteName); - QBExpr cteQBExpr = new QBExpr(cteAlias); - doPhase1QBExpr(cteQryNode, cteQBExpr, qb.getId(), cteAlias); - qb.rewriteCTEToSubq(cteAlias, cteName, cteQBExpr); + private void addCTEAsSubQuery(QB qb, String cteName, String cteAlias, QBExpr cteExpr) + throws SemanticException { + qb.rewriteCTEToSubq(cteAlias, cteName, cteExpr); } static boolean isJoinToken(ASTNode node) { @@ -1512,8 +1672,21 @@ private void handleInsertStatementSpecPhase1(ASTNode ast, QBParseInfo qbp, Phase } } - private void getMetaData(QBExpr qbexpr, ReadEntity parentInput) - throws SemanticException { + public void getMetaData(QB qb) throws SemanticException { + try { + getMetaData(qb, null); + } catch (HiveException e) { + // Has to use full name to make sure it does not conflict with + // org.apache.commons.lang.StringUtils + LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); + if (e instanceof SemanticException) { + throw (SemanticException)e; + } + throw new SemanticException(e.getMessage(), e); + } + } + + private void getMetaData(QBExpr qbexpr, ReadEntity parentInput) throws HiveException { if (qbexpr.getOpcode() == QBExpr.Opcode.NULLOP) { getMetaData(qbexpr.getQB(), parentInput); } else { @@ -1522,190 +1695,245 @@ private void getMetaData(QBExpr qbexpr, ReadEntity parentInput) } } - public Table getTable(TableScanOperator ts) { - return topToTable.get(ts); + public void checkCTEReferences(QB qb) throws SemanticException { + try { + checkCTEReferences(qb, rootClause); + } catch (HiveException e) { + // Has to use full name to make sure it does not conflict with + // org.apache.commons.lang.StringUtils + LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); + if (e instanceof SemanticException) { + throw (SemanticException)e; + } + throw new SemanticException(e.getMessage(), e); + } } - public void getMetaData(QB qb) throws SemanticException { - getMetaData(qb, null); + private void checkCTEReferences(QBExpr qbexpr, CTEClause parent) throws HiveException { + if (qbexpr.getOpcode() == QBExpr.Opcode.NULLOP) { + checkCTEReferences(qbexpr.getQB(), parent); + } else { + checkCTEReferences(qbexpr.getQBExpr1(), parent); + checkCTEReferences(qbexpr.getQBExpr2(), parent); + } } - @SuppressWarnings("nls") - public void getMetaData(QB qb, ReadEntity parentInput) throws SemanticException { - try { + // todo: check view references, too + private void checkCTEReferences(QB qb, CTEClause current) throws HiveException { + for (String alias : qb.getTabAliases()) { + String tab_name = qb.getTabNameForAlias(alias); + String cte_name = tab_name.toLowerCase(); + if (cte_name.contains(".")) { + // calcite adds database name to cte alias + cte_name = cte_name.substring(cte_name.lastIndexOf('.') + 1); + } + CTEClause cte = findCTEFromName(qb, cte_name); + if (cte != null) { + if (ctesExpanded.contains(cte_name)) { + throw new SemanticException("Recursive cte " + cte_name + + " detected (cycle: " + StringUtils.join(ctesExpanded, " -> ") + + " -> " + cte_name + ")."); + } + cte.reference++; + current.parents.add(cte); + if (cte.qbExpr != null) { + continue; + } + cte.qbExpr = new QBExpr(cte_name); + doPhase1QBExpr(cte.cteNode, cte.qbExpr, qb.getId(), cte_name); + + ctesExpanded.add(cte_name); + checkCTEReferences(cte.qbExpr, cte); + ctesExpanded.remove(ctesExpanded.size() - 1); + } + } + for (String alias : qb.getSubqAliases()) { + checkCTEReferences(qb.getSubqForAlias(alias), current); + } + } - LOG.info("Get metadata for source tables"); 
+ @SuppressWarnings("nls") + private void getMetaData(QB qb, ReadEntity parentInput) throws HiveException { + LOG.info("Get metadata for source tables"); - // Go over the tables and populate the related structures. - // We have to materialize the table alias list since we might - // modify it in the middle for view rewrite. - List tabAliases = new ArrayList(qb.getTabAliases()); + // Go over the tables and populate the related structures. + // We have to materialize the table alias list since we might + // modify it in the middle for view rewrite. + List tabAliases = new ArrayList(qb.getTabAliases()); - // Keep track of view alias to view name and read entity - // For eg: for a query like 'select * from V3', where V3 -> V2, V2 -> V1, V1 -> T - // keeps track of full view name and read entity corresponding to alias V3, V3:V2, V3:V2:V1. - // This is needed for tracking the dependencies for inputs, along with their parents. - Map> aliasToViewInfo = - new HashMap>(); + // Keep track of view alias to view name and read entity + // For eg: for a query like 'select * from V3', where V3 -> V2, V2 -> V1, V1 -> T + // keeps track of full view name and read entity corresponding to alias V3, V3:V2, V3:V2:V1. + // This is needed for tracking the dependencies for inputs, along with their parents. + Map> aliasToViewInfo = + new HashMap>(); /* * used to capture view to SQ conversions. This is used to check for * recursive CTE invocations. */ - Map sqAliasToCTEName = new HashMap(); + Map sqAliasToCTEName = new HashMap(); + + for (String alias : tabAliases) { + String tab_name = qb.getTabNameForAlias(alias); + String cte_name = tab_name.toLowerCase(); + if (cte_name.contains(".")) { + cte_name = cte_name.substring(cte_name.lastIndexOf('.') + 1); + } - for (String alias : tabAliases) { - String tab_name = qb.getTabNameForAlias(alias); - + Table tab = ctx.getVolatileTable(cte_name); + if (tab == null) { // we first look for this alias from CTE, and then from catalog. /* - * if this s a CTE reference: Add its AST as a SubQuery to this QB. + * if this s a CTE reference: + * Add its AST as a SubQuery to this QB. 
*/ - ASTNode cteNode = findCTEFromName(qb, tab_name.toLowerCase()); - if (cteNode != null) { - String cte_name = tab_name.toLowerCase(); - if (ctesExpanded.contains(cte_name)) { - throw new SemanticException("Recursive cte " + tab_name + " detected (cycle: " - + StringUtils.join(ctesExpanded, " -> ") + " -> " + tab_name + ")."); + CTEClause cte = findCTEFromName(qb, cte_name); + if (cte != null) { + int threshold = HiveConf.getIntVar(conf, ConfVars.HIVE_CTE_MATERIALIZE_THRESHOLD); + if (threshold < 0 || cte.reference < threshold) { + addCTEAsSubQuery(qb, cte_name, alias, cte.qbExpr); + sqAliasToCTEName.put(alias, cte_name); + continue; } - addCTEAsSubQuery(qb, cte_name, alias); - sqAliasToCTEName.put(alias, cte_name); - continue; + tab = materializeCTE(cte_name, cte); + } else { + tab = db.getTable(tab_name, false); } + } - Table tab = db.getTable(tab_name, false); - if (tab == null) { - ASTNode src = qb.getParseInfo().getSrcForAlias(alias); - if (null != src) { - throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(src)); - } else { - throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(alias)); - } + if (tab == null) { + ASTNode src = qb.getParseInfo().getSrcForAlias(alias); + if (null != src) { + throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(src)); + } else { + throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(alias)); } + } - // Disallow INSERT INTO on bucketized tables - boolean isAcid = isAcidTable(tab); - boolean isTableWrittenTo = qb.getParseInfo().isInsertIntoTable(tab.getDbName(), tab.getTableName()); - if (isTableWrittenTo && - tab.getNumBuckets() > 0 && !isAcid) { - throw new SemanticException(ErrorMsg.INSERT_INTO_BUCKETIZED_TABLE. - getMsg("Table: " + tab_name)); - } - // Disallow update and delete on non-acid tables - if ((updating() || deleting()) && !isAcid && isTableWrittenTo) { - //isTableWrittenTo: delete from acidTbl where a in (select id from nonAcidTable) - //so only assert this if we are actually writing to this table - // isAcidTable above also checks for whether we are using an acid compliant - // transaction manager. But that has already been caught in - // UpdateDeleteSemanticAnalyzer, so if we are updating or deleting and getting nonAcid - // here, it means the table itself doesn't support it. - throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, tab_name); - } - - if (tab.isView()) { - if (qb.getParseInfo().isAnalyzeCommand()) { - throw new SemanticException(ErrorMsg.ANALYZE_VIEW.getMsg()); - } - String fullViewName = tab.getDbName() + "." 
+ tab.getTableName(); - // Prevent view cycles - if (viewsExpanded.contains(fullViewName)) { - throw new SemanticException("Recursive view " + fullViewName + - " detected (cycle: " + StringUtils.join(viewsExpanded, " -> ") + - " -> " + fullViewName + ")."); - } - replaceViewReferenceWithDefinition(qb, tab, tab_name, alias); - // This is the last time we'll see the Table objects for views, so add it to the inputs - // now - ReadEntity viewInput = new ReadEntity(tab, parentInput); - viewInput = PlanUtils.addInput(inputs, viewInput); - aliasToViewInfo.put(alias, new ObjectPair(fullViewName, viewInput)); - viewAliasToInput.put(getAliasId(alias, qb), viewInput); - continue; - } + // Disallow INSERT INTO on bucketized tables + boolean isAcid = isAcidTable(tab); + boolean isTableWrittenTo = qb.getParseInfo().isInsertIntoTable(tab.getDbName(), tab.getTableName()); + if (isTableWrittenTo && + tab.getNumBuckets() > 0 && !isAcid) { + throw new SemanticException(ErrorMsg.INSERT_INTO_BUCKETIZED_TABLE. + getMsg("Table: " + tab_name)); + } + // Disallow update and delete on non-acid tables + if ((updating() || deleting()) && !isAcid && isTableWrittenTo) { + //isTableWrittenTo: delete from acidTbl where a in (select id from nonAcidTable) + //so only assert this if we are actually writing to this table + // isAcidTable above also checks for whether we are using an acid compliant + // transaction manager. But that has already been caught in + // UpdateDeleteSemanticAnalyzer, so if we are updating or deleting and getting nonAcid + // here, it means the table itself doesn't support it. + throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, tab_name); + } - if (!InputFormat.class.isAssignableFrom(tab.getInputFormatClass())) { - throw new SemanticException(generateErrorMessage( - qb.getParseInfo().getSrcForAlias(alias), - ErrorMsg.INVALID_INPUT_FORMAT_TYPE.getMsg())); - } + if (tab.isView()) { + if (qb.getParseInfo().isAnalyzeCommand()) { + throw new SemanticException(ErrorMsg.ANALYZE_VIEW.getMsg()); + } + String fullViewName = tab.getDbName() + "." + tab.getTableName(); + // Prevent view cycles + if (viewsExpanded.contains(fullViewName)) { + throw new SemanticException("Recursive view " + fullViewName + + " detected (cycle: " + StringUtils.join(viewsExpanded, " -> ") + + " -> " + fullViewName + ")."); + } + replaceViewReferenceWithDefinition(qb, tab, tab_name, alias); + // This is the last time we'll see the Table objects for views, so add it to the inputs + // now + ReadEntity viewInput = new ReadEntity(tab, parentInput); + viewInput = PlanUtils.addInput(inputs, viewInput); + aliasToViewInfo.put(alias, new ObjectPair(fullViewName, viewInput)); + viewAliasToInput.put(getAliasId(alias, qb), viewInput); + continue; + } + + if (!InputFormat.class.isAssignableFrom(tab.getInputFormatClass())) { + throw new SemanticException(generateErrorMessage( + qb.getParseInfo().getSrcForAlias(alias), + ErrorMsg.INVALID_INPUT_FORMAT_TYPE.getMsg())); + } - qb.getMetaData().setSrcForAlias(alias, tab); + qb.getMetaData().setSrcForAlias(alias, tab); - if (qb.getParseInfo().isAnalyzeCommand()) { - // allow partial partition specification for nonscan since noscan is fast. 
- TableSpec ts = new TableSpec(db, conf, (ASTNode) ast.getChild(0), true, this.noscan); - if (ts.specType == SpecType.DYNAMIC_PARTITION) { // dynamic partitions - try { - ts.partitions = db.getPartitionsByNames(ts.tableHandle, ts.partSpec); - } catch (HiveException e) { - throw new SemanticException(generateErrorMessage( - qb.getParseInfo().getSrcForAlias(alias), - "Cannot get partitions for " + ts.partSpec), e); - } + if (qb.getParseInfo().isAnalyzeCommand()) { + // allow partial partition specification for nonscan since noscan is fast. + TableSpec ts = new TableSpec(db, conf, (ASTNode) ast.getChild(0), true, this.noscan); + if (ts.specType == SpecType.DYNAMIC_PARTITION) { // dynamic partitions + try { + ts.partitions = db.getPartitionsByNames(ts.tableHandle, ts.partSpec); + } catch (HiveException e) { + throw new SemanticException(generateErrorMessage( + qb.getParseInfo().getSrcForAlias(alias), + "Cannot get partitions for " + ts.partSpec), e); } - // validate partial scan command - QBParseInfo qbpi = qb.getParseInfo(); - if (qbpi.isPartialScanAnalyzeCommand()) { - Class inputFormatClass = null; - switch (ts.specType) { - case TABLE_ONLY: - case DYNAMIC_PARTITION: - inputFormatClass = ts.tableHandle.getInputFormatClass(); - break; - case STATIC_PARTITION: - inputFormatClass = ts.partHandle.getInputFormatClass(); - break; - default: - assert false; - } - // throw a HiveException for formats other than rcfile or orcfile. - if (!(inputFormatClass.equals(RCFileInputFormat.class) || inputFormatClass - .equals(OrcInputFormat.class))) { - throw new SemanticException(ErrorMsg.ANALYZE_TABLE_PARTIALSCAN_NON_RCFILE.getMsg()); - } + } + // validate partial scan command + QBParseInfo qbpi = qb.getParseInfo(); + if (qbpi.isPartialScanAnalyzeCommand()) { + Class inputFormatClass = null; + switch (ts.specType) { + case TABLE_ONLY: + case DYNAMIC_PARTITION: + inputFormatClass = ts.tableHandle.getInputFormatClass(); + break; + case STATIC_PARTITION: + inputFormatClass = ts.partHandle.getInputFormatClass(); + break; + default: + assert false; + } + // throw a HiveException for formats other than rcfile or orcfile. 
+ if (!(inputFormatClass.equals(RCFileInputFormat.class) || inputFormatClass + .equals(OrcInputFormat.class))) { + throw new SemanticException(ErrorMsg.ANALYZE_TABLE_PARTIALSCAN_NON_RCFILE.getMsg()); } - - tab.setTableSpec(ts); - qb.getParseInfo().addTableSpec(alias, ts); } - ReadEntity parentViewInfo = PlanUtils.getParentViewInfo(getAliasId(alias, qb), viewAliasToInput); - PlanUtils.addInput(inputs, - new ReadEntity(tab, parentViewInfo, parentViewInfo == null)); + tab.setTableSpec(ts); + qb.getParseInfo().addTableSpec(alias, ts); } - LOG.info("Get metadata for subqueries"); - // Go over the subqueries and getMetaData for these - for (String alias : qb.getSubqAliases()) { - boolean wasView = aliasToViewInfo.containsKey(alias); - boolean wasCTE = sqAliasToCTEName.containsKey(alias); - ReadEntity newParentInput = null; - if (wasView) { - viewsExpanded.add(aliasToViewInfo.get(alias).getFirst()); - newParentInput = aliasToViewInfo.get(alias).getSecond(); - } else if (wasCTE) { - ctesExpanded.add(sqAliasToCTEName.get(alias)); - } - QBExpr qbexpr = qb.getSubqForAlias(alias); - getMetaData(qbexpr, newParentInput); - if (wasView) { - viewsExpanded.remove(viewsExpanded.size() - 1); - } else if (wasCTE) { - ctesExpanded.remove(ctesExpanded.size() - 1); - } + ReadEntity parentViewInfo = PlanUtils.getParentViewInfo(getAliasId(alias, qb), viewAliasToInput); + PlanUtils.addInput(inputs, + new ReadEntity(tab, parentViewInfo, parentViewInfo == null)); + } + + LOG.info("Get metadata for subqueries"); + // Go over the subqueries and getMetaData for these + for (String alias : qb.getSubqAliases()) { + boolean wasView = aliasToViewInfo.containsKey(alias); + boolean wasCTE = sqAliasToCTEName.containsKey(alias); + ReadEntity newParentInput = null; + if (wasView) { + viewsExpanded.add(aliasToViewInfo.get(alias).getFirst()); + newParentInput = aliasToViewInfo.get(alias).getSecond(); + } else if (wasCTE) { + ctesExpanded.add(sqAliasToCTEName.get(alias)); } + QBExpr qbexpr = qb.getSubqForAlias(alias); + getMetaData(qbexpr, newParentInput); + if (wasView) { + viewsExpanded.remove(viewsExpanded.size() - 1); + } else if (wasCTE) { + ctesExpanded.remove(ctesExpanded.size() - 1); + } + } - RowFormatParams rowFormatParams = new RowFormatParams(); - StorageFormat storageFormat = new StorageFormat(conf); + RowFormatParams rowFormatParams = new RowFormatParams(); + StorageFormat storageFormat = new StorageFormat(conf); - LOG.info("Get metadata for destination tables"); - // Go over all the destination structures and populate the related - // metadata - QBParseInfo qbp = qb.getParseInfo(); + LOG.info("Get metadata for destination tables"); + // Go over all the destination structures and populate the related + // metadata + QBParseInfo qbp = qb.getParseInfo(); - for (String name : qbp.getClauseNamesForDest()) { - ASTNode ast = qbp.getDestForClause(name); - switch (ast.getToken().getType()) { + for (String name : qbp.getClauseNamesForDest()) { + ASTNode ast = qbp.getDestForClause(name); + switch (ast.getToken().getType()) { case HiveParser.TOK_TAB: { TableSpec ts = new TableSpec(db, conf, ast); if (ts.tableHandle.isView()) { @@ -1832,13 +2060,7 @@ public void getMetaData(QB qb, ReadEntity parentInput) throws SemanticException default: throw new SemanticException(generateErrorMessage(ast, "Unknown Token Type " + ast.getToken().getType())); - } } - } catch (HiveException e) { - // Has to use full name to make sure it does not conflict with - // org.apache.commons.lang.StringUtils - 
LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); - throw new SemanticException(e.getMessage(), e); } } @@ -6171,6 +6393,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) Table dest_tab = null; // destination table if any boolean destTableIsAcid = false; // should the destination table be written to using ACID boolean destTableIsTemporary = false; + boolean destTableIsVolatile = false; Partition dest_part = null;// destination partition if any Path queryTmpdir = null; // the intermediate destination directory Path dest_path = null; // the final destination directory @@ -6459,6 +6682,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) if (tblDesc != null) { field_schemas = new ArrayList(); destTableIsTemporary = tblDesc.isTemporary(); + destTableIsVolatile = tblDesc.isVolatile(); } boolean first = true; @@ -6612,6 +6836,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) } fileSinkDesc.setTemporary(destTableIsTemporary); + fileSinkDesc.setVolatile(destTableIsVolatile); /* Set List Bucketing context. */ if (lbCtx != null) { @@ -6629,7 +6854,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) // can be changed in the optimizer but the key should not be changed // it should be the same as the MoveWork's sourceDir. fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName().toString()); - if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) { + if (!destTableIsVolatile && + HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) { String statsTmpLoc = ctx.getExtTmpPathRelTo(queryTmpdir).toString(); LOG.info("Set stats collection dir : " + statsTmpLoc); conf.set(StatsSetupConst.STATS_TMP_LOC, statsTmpLoc); @@ -10076,6 +10302,7 @@ else if(ast.getChild(0).getType() == HiveParser.TOK_FALSE) { LOG.info("Completed phase 1 of Semantic Analysis"); // 5. Resolve Parse Tree + checkCTEReferences(qb); getMetaData(qb); LOG.info("Completed getting MetaData in Semantic Analysis"); @@ -10732,6 +10959,7 @@ ASTNode analyzeCreateTable( boolean ifNotExists = false; boolean isExt = false; boolean isTemporary = false; + boolean isVolatile = false; ASTNode selectStmt = null; final int CREATE_TABLE = 0; // regular CREATE TABLE final int CTLT = 1; // CREATE TABLE LIKE ... 
(CTLT) @@ -10771,6 +10999,7 @@ ASTNode analyzeCreateTable( break; case HiveParser.KW_TEMPORARY: isTemporary = true; + isVolatile = VOLATILE_MARKER.equals(child.getText()); break; case HiveParser.TOK_LIKETABLE: if (child.getChildCount() > 0) { @@ -11003,6 +11232,7 @@ ASTNode analyzeCreateTable( storageFormat.getOutputFormat(), location, storageFormat.getSerde(), storageFormat.getStorageHandler(), storageFormat.getSerdeProps(), tblProps, ifNotExists, skewedColNames, skewedValues); + tableDesc.setVolatile(isVolatile); tableDesc.setStoredAsSubDirectories(storedAsDirs); tableDesc.setNullFormat(rowFormatParams.nullFormat); qb.setTableDesc(tableDesc); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index ba11e41..db62dce 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -228,7 +228,7 @@ public void compile(final ParseContext pCtx, final List> skewedColValues; boolean isStoredAsSubDirectories = false; boolean isTemporary = false; + private boolean isVolatile = false; private boolean replaceMode = false; public CreateTableDesc() { @@ -553,6 +561,21 @@ public void setTemporary(boolean isTemporary) { } /** + * @return the isVolatile + */ + @Explain(displayName = "isVolatile", displayOnlyOnTrue = true) + public boolean isVolatile() { + return isVolatile; + } + + /** + * @param isVolatile table is Volatile or not. + */ + public void setVolatile(boolean isVolatile) { + this.isVolatile = isVolatile; + } + + /** * @param replaceMode Determine if this CreateTable should behave like a replace-into alter instead */ public void setReplaceMode(boolean replaceMode) { @@ -565,4 +588,170 @@ public void setReplaceMode(boolean replaceMode) { public boolean getReplaceMode() { return replaceMode; } + + public Table toTable(HiveConf conf) throws HiveException { + String databaseName = getDatabaseName(); + String tableName = getTableName(); + + if (databaseName == null || tableName.contains(".")) { + String[] names = Utilities.getDbTableName(tableName); + databaseName = names[0]; + tableName = names[1]; + } + + Table tbl = new Table(databaseName, tableName); + + if (getTblProps() != null) { + tbl.getTTable().getParameters().putAll(getTblProps()); + } + + if (getPartCols() != null) { + tbl.setPartCols(getPartCols()); + } + if (getNumBuckets() != -1) { + tbl.setNumBuckets(getNumBuckets()); + } + + if (getStorageHandler() != null) { + tbl.setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE, + getStorageHandler()); + } + HiveStorageHandler storageHandler = tbl.getStorageHandler(); + + /* + * We use LazySimpleSerDe by default. + * + * If the user didn't specify a SerDe, and any of the columns are not simple + * types, we will have to use DynamicSerDe instead. 
+ */ + if (getSerName() == null) { + if (storageHandler == null) { + LOG.info("Default to LazySimpleSerDe for table " + tableName); + tbl.setSerializationLib(LazySimpleSerDe.class.getName()); + } else { + String serDeClassName = storageHandler.getSerDeClass().getName(); + LOG.info("Use StorageHandler-supplied " + serDeClassName + + " for table " + tableName); + tbl.setSerializationLib(serDeClassName); + } + } else { + // let's validate that the serde exists + DDLTask.validateSerDe(getSerName(), conf); + tbl.setSerializationLib(getSerName()); + } + + if (getFieldDelim() != null) { + tbl.setSerdeParam(serdeConstants.FIELD_DELIM, getFieldDelim()); + tbl.setSerdeParam(serdeConstants.SERIALIZATION_FORMAT, getFieldDelim()); + } + if (getFieldEscape() != null) { + tbl.setSerdeParam(serdeConstants.ESCAPE_CHAR, getFieldEscape()); + } + + if (getCollItemDelim() != null) { + tbl.setSerdeParam(serdeConstants.COLLECTION_DELIM, getCollItemDelim()); + } + if (getMapKeyDelim() != null) { + tbl.setSerdeParam(serdeConstants.MAPKEY_DELIM, getMapKeyDelim()); + } + if (getLineDelim() != null) { + tbl.setSerdeParam(serdeConstants.LINE_DELIM, getLineDelim()); + } + if (getNullFormat() != null) { + tbl.setSerdeParam(serdeConstants.SERIALIZATION_NULL_FORMAT, getNullFormat()); + } + if (getSerdeProps() != null) { + Iterator> iter = getSerdeProps().entrySet() + .iterator(); + while (iter.hasNext()) { + Map.Entry m = iter.next(); + tbl.setSerdeParam(m.getKey(), m.getValue()); + } + } + + if (getCols() != null) { + tbl.setFields(getCols()); + } + if (getBucketCols() != null) { + tbl.setBucketCols(getBucketCols()); + } + if (getSortCols() != null) { + tbl.setSortCols(getSortCols()); + } + if (getComment() != null) { + tbl.setProperty("comment", getComment()); + } + if (getLocation() != null) { + tbl.setDataLocation(new Path(getLocation())); + } + + if (getSkewedColNames() != null) { + tbl.setSkewedColNames(getSkewedColNames()); + } + if (getSkewedColValues() != null) { + tbl.setSkewedColValues(getSkewedColValues()); + } + + tbl.getTTable().setTemporary(isTemporary()); + + tbl.setStoredAsSubDirectories(isStoredAsSubDirectories()); + + tbl.setInputFormatClass(getInputFormat()); + tbl.setOutputFormatClass(getOutputFormat()); + + // only persist input/output format to metadata when it is explicitly specified. + // Otherwise, load lazily via StorageHandler at query time. + if (getInputFormat() != null && !getInputFormat().isEmpty()) { + tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName()); + } + if (getOutputFormat() != null && !getOutputFormat().isEmpty()) { + tbl.getTTable().getSd().setOutputFormat(tbl.getOutputFormatClass().getName()); + } + + if (!Utilities.isDefaultNameNode(conf) && DDLTask.doesTableNeedLocation(tbl)) { + // If location is specified - ensure that it is a full qualified name + DDLTask.makeLocationQualified(tbl.getDbName(), tbl.getTTable().getSd(), tableName, conf); + } + + if (isExternal()) { + tbl.setProperty("EXTERNAL", "TRUE"); + tbl.setTableType(TableType.EXTERNAL_TABLE); + } + + // If the sorted columns is a superset of bucketed columns, store this fact. + // It can be later used to + // optimize some group-by queries. Note that, the order does not matter as + // long as it in the first + // 'n' columns where 'n' is the length of the bucketed columns. 
+ if ((tbl.getBucketCols() != null) && (tbl.getSortCols() != null)) { + List bucketCols = tbl.getBucketCols(); + List sortCols = tbl.getSortCols(); + + if ((sortCols.size() > 0) && (sortCols.size() >= bucketCols.size())) { + boolean found = true; + + Iterator iterBucketCols = bucketCols.iterator(); + while (iterBucketCols.hasNext()) { + String bucketCol = iterBucketCols.next(); + boolean colFound = false; + for (int i = 0; i < bucketCols.size(); i++) { + if (bucketCol.equals(sortCols.get(i).getCol())) { + colFound = true; + break; + } + } + if (colFound == false) { + found = false; + break; + } + } + if (found) { + tbl.setProperty("SORTBUCKETCOLSPREFIX", "TRUE"); + } + } + } + return tbl; + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index f73b502..7969d39 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -51,6 +51,7 @@ private String compressType; private boolean multiFileSpray; private boolean temporary; + private boolean isVolatile; // Whether the files output by this FileSink can be merged, e.g. if they are to be put into a // bucketed or sorted table/partition they cannot be merged. private boolean canBeMerged; @@ -241,6 +242,14 @@ public void setTemporary(boolean temporary) { this.temporary = temporary; } + public boolean isVolatile() { + return isVolatile; + } + + public void setVolatile(boolean isVolatile) { + this.isVolatile = isVolatile; + } + public boolean canBeMerged() { return canBeMerged; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java index 55aea0e..375c7b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java @@ -66,7 +66,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableBinaryObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableBooleanObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableByteObjectInspector; @@ -91,7 +90,6 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -569,7 +567,7 @@ public static long getFileSizeForTable(HiveConf conf, Table table) { * - hive conf * @param parts * - partition list - * @return sizes of patitions + * @return sizes of partitions */ public static List getFileSizeForPartitions(HiveConf conf, List parts) { List sizes = Lists.newArrayList(); @@ -721,6 +719,10 @@ public static ColStatistics getColStatistics(ColumnStatisticsObj cso, String tab */ public static List getTableColumnStats( Table table, List schema, List neededColumns) { + if (table.isVolatileTable()) { + LOG.debug("Volatile table does not contain table statistics"); + return null; + } String dbName = table.getDbName(); String tabName = table.getTableName(); List neededColsInTable = processNeededColumns(schema, neededColumns);
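
The two pieces of new logic that are easiest to lose in this diff are the inline-vs-materialize decision driven by hive.cte.materialize.threshold (in SemanticAnalyzer.getMetaData) and the parent-first ordering of CTE materialization work (CTEClause.asExecutionOrder / toRealRootTasks). The standalone sketch below is illustrative only and not part of the patch: the names CteMaterializeSketch, CteClause, shouldMaterialize and asExecutionOrder are stand-ins rather than Hive APIs, and the threshold comparison simply mirrors the "threshold < 0 || cte.reference < threshold" test added by the patch.

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class CteMaterializeSketch {

  /** Simplified stand-in for SemanticAnalyzer.CTEClause: alias, use count, and the CTEs it reads from. */
  static class CteClause {
    final String alias;
    int references;                                      // how many query blocks refer to this CTE
    final List<CteClause> parents = new ArrayList<>();   // CTEs this clause depends on

    CteClause(String alias) { this.alias = alias; }
  }

  /**
   * Mirrors the check in getMetaData(): with a negative threshold the feature is off and the CTE
   * is always inlined as a sub-query; otherwise it is materialized once its reference count
   * reaches the configured hive.cte.materialize.threshold.
   */
  static boolean shouldMaterialize(CteClause cte, int threshold) {
    return threshold >= 0 && cte.references >= threshold;
  }

  /**
   * Parent-first (dependency-first) ordering, as in CTEClause.asExecutionOrder(): every CTE a
   * clause reads from is placed before the clause itself, so its table exists when it is scanned.
   */
  static void asExecutionOrder(CteClause cte, Set<CteClause> visited, List<CteClause> order) {
    for (CteClause parent : cte.parents) {
      if (visited.add(parent)) {
        asExecutionOrder(parent, visited, order);
      }
    }
    order.add(cte);
  }

  public static void main(String[] args) {
    // Roughly: WITH base AS (...), derived AS (SELECT ... FROM base)
    //          SELECT ... FROM derived d1 JOIN derived d2 ON ...
    CteClause base = new CteClause("base");
    CteClause derived = new CteClause("derived");
    derived.parents.add(base);
    base.references = 1;      // referenced once, by "derived"
    derived.references = 2;   // referenced twice by the main query block

    int threshold = 2;        // e.g. SET hive.cte.materialize.threshold=2;
    System.out.println("materialize base?    " + shouldMaterialize(base, threshold));     // false -> inlined
    System.out.println("materialize derived? " + shouldMaterialize(derived, threshold));  // true  -> materialized

    CteClause root = new CteClause(null);   // stand-in for rootClause (the main query block)
    root.parents.add(derived);
    List<CteClause> order = new ArrayList<>();
    asExecutionOrder(root, new LinkedHashSet<>(), order);
    // Prints base, derived, <main query>: materialization tasks run before the tasks that read them.
    for (CteClause c : order) {
      System.out.println(c.alias == null ? "<main query>" : c.alias);
    }
  }
}

With the patch's default of -1 nothing changes; a threshold of 1 would materialize every referenced CTE, while the value 2 used above only materializes CTEs referenced at least twice, which is the trade-off the new HiveConf description is pointing at.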