diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 183fae5b9d..7b351af064 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.hadoop.conf.Configurable;
@@ -44,6 +45,7 @@
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveSequenceFileInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveRecordReader;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
@@ -273,6 +275,10 @@ private boolean getNextPath() throws Exception {
     if (isNonNativeTable) {
       return true;
     }
+    if (!this.getWork().isSourceTable() && this.getWork().getFilesToFetch() != null
+        && !this.getWork().getFilesToFetch().isEmpty()) {
+      return true;
+    }
     FileSystem fs = currPath.getFileSystem(job);
     if (fs.exists(currPath)) {
       if (extractValidWriteIdList() != null &&
@@ -367,55 +373,69 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException
   }
 
   protected FetchInputFormatSplit[] getNextSplits() throws Exception {
     while (getNextPath()) {
       // not using FileInputFormat.setInputPaths() here because it forces a connection to the
       // default file system - which may or may not be online during pure metadata operations
       job.set("mapred.input.dir", StringUtils.escapeString(currPath.toString()));
       // Fetch operator is not vectorized and as such turn vectorization flag off so that
       // non-vectorized record reader is created below.
       HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
       Class formatter = currDesc.getInputFileFormatClass();
       Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job);
       InputFormat inputFormat = getInputFormatFromCache(formatter, job);
       List<Path> dirs = new ArrayList<>(), dirsWithOriginals = new ArrayList<>();
       processCurrPathForMmWriteIds(inputFormat, dirs, dirsWithOriginals);
       if (dirs.isEmpty() && dirsWithOriginals.isEmpty()) {
         LOG.debug("No valid directories for " + currPath);
         continue;
       }
       List<FetchInputFormatSplit> inputSplits = new ArrayList<>();
-      if (!dirs.isEmpty()) {
-        String inputs = makeInputString(dirs);
-        Utilities.FILE_OP_LOGGER.trace("Setting fetch inputs to {}", inputs);
-        job.set("mapred.input.dir", inputs);
-
-        generateWrappedSplits(inputFormat, inputSplits, job);
-      }
-
-      if (!dirsWithOriginals.isEmpty()) {
-        String inputs = makeInputString(dirsWithOriginals);
-        Utilities.FILE_OP_LOGGER.trace("Setting originals fetch inputs to {}", inputs);
-        JobConf jobNoRec = HiveInputFormat.createConfForMmOriginalsSplit(job, dirsWithOriginals);
-        jobNoRec.set("mapred.input.dir", inputs);
-        generateWrappedSplits(inputFormat, inputSplits, jobNoRec);
-      }
+      if (inputFormat instanceof HiveSequenceFileInputFormat && this.getWork().getFilesToFetch() != null
+          && !this.getWork().getFilesToFetch().isEmpty() && !this.getWork().isSourceTable()) {
+        // The FileSinkOperator has already recorded the exact result files for this query,
+        // so read them directly instead of listing the directories under currPath.
+        //TODO: is numSplits 1 right?
+        HiveSequenceFileInputFormat fileFormat = (HiveSequenceFileInputFormat) inputFormat;
+        fileFormat.setListsToFetch(this.getWork().getFilesToFetch());
+        InputSplit[] splits = inputFormat.getSplits(job, 1);
+        for (int i = 0; i < splits.length; i++) {
+          inputSplits.add(new FetchInputFormatSplit(splits[i], inputFormat));
+        }
+      } else {
+        if (!dirs.isEmpty()) {
+          String inputs = makeInputString(dirs);
+          Utilities.FILE_OP_LOGGER.trace("Setting fetch inputs to {}", inputs);
+          job.set("mapred.input.dir", inputs);
+
+          generateWrappedSplits(inputFormat, inputSplits, job);
+        }
+
+        if (!dirsWithOriginals.isEmpty()) {
+          String inputs = makeInputString(dirsWithOriginals);
+          Utilities.FILE_OP_LOGGER.trace("Setting originals fetch inputs to {}", inputs);
+          JobConf jobNoRec = HiveInputFormat.createConfForMmOriginalsSplit(job, dirsWithOriginals);
+          jobNoRec.set("mapred.input.dir", inputs);
+          generateWrappedSplits(inputFormat, inputSplits, jobNoRec);
+        }
+      }
 
       if (work.getSplitSample() != null) {
         inputSplits = splitSampling(work.getSplitSample(), inputSplits);
       }
 
       if (inputSplits.isEmpty()) {
         LOG.debug("No splits for " + currPath);
         continue;
       }
       if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_IN_TEST)) {
         Collections.sort(inputSplits, new FetchInputFormatSplitComparator());
       }
       return inputSplits.toArray(new FetchInputFormatSplit[inputSplits.size()]);
     }
     return null;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 61e34308bc..c9e38b96ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -134,6 +134,7 @@
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;
@@ -1204,6 +1205,18 @@ private static void moveSpecifiedFiles(FileSystem fs, Path src, Path dst, Set<Path> filesToMove)
+  private static void moveSpecifiedFiles(FileSystem fs, Path dst, Set<Path> filesToMove)
+      throws IOException, HiveException {
+    if (!fs.exists(dst)) {
+      fs.mkdirs(dst);
+    }
+
+    for (Path path : filesToMove) {
+      FileStatus fsStatus = fs.getFileStatus(path);
+      Utilities.moveFile(fs, fsStatus, dst);
+    }
+  }
+
   private static void moveFile(FileSystem fs, FileStatus file, Path dst)
       throws IOException, HiveException {
     Path srcFilePath = file.getPath();
@@ -1463,6 +1476,19 @@ private static String replaceTaskIdFromFilename(String filename, String oldTaskI
     return snew.toString();
   }
 
+  public static boolean shouldAvoidRename(FileSinkDesc conf, Configuration hConf) {
+    // Rename/move is avoided only when all of the following hold:
+    // * the execution engine is Tez
+    // * the query results cache is disabled
+    // * this is a plain SELECT query (the FileSinkDesc tracks the files to fetch)
+    return conf != null && conf.getIsQuery() && conf.getFilesToFetch() != null
+        && HiveConf.getVar(hConf, ConfVars.HIVE_EXECUTION_ENGINE).equalsIgnoreCase("tez")
+        && !HiveConf.getBoolVar(hConf, ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED);
+  }
+
   /**
    * returns null if path is not exist
    */
@@ -1503,11 +1529,11 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf,
     // (each directory move would result in a file-by-file move of the files in the directory)
     //
     FileSystem fs = specPath.getFileSystem(hconf);
-    boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
     if (success) {
-      if (!isBlobStorage && fs.exists(tmpPath)) {
+      if (!shouldAvoidRename(conf, hconf) && fs.exists(tmpPath)) {
+        Utilities.FILE_OP_LOGGER.trace("Renaming tmpPath {} before moving its contents", tmpPath);
         // 1) Rename tmpPath to a new directory name to prevent additional files
         // from being added by runaway processes.
         Path tmpPathOriginal = tmpPath;
@@ -1538,17 +1564,27 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf,
       // move to the file destination
       Utilities.FILE_OP_LOGGER.trace("Moving tmp dir: {} to: {}", tmpPath, specPath);
 
       perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles");
-      if (isBlobStorage) {
-        // HIVE-17113 - avoid copying files that may have been written to the temp dir by runaway tasks,
-        // by moving just the files we've tracked from removeTempOrDuplicateFiles().
-        Utilities.moveSpecifiedFiles(fs, tmpPath, specPath, filesKept);
+      if (shouldAvoidRename(conf, hconf)) {
+        // The FetchTask will read the query results directly from these files, so no
+        // rename/move is needed; just record which files are to be fetched.
+        Utilities.FILE_OP_LOGGER.trace("Skipping rename/move files. Files to be kept are: " + filesKept.toString());
+        conf.getFilesToFetch().addAll(filesKept);
       } else {
-        // For non-blobstore case, can just move the directory - the initial directory rename
-        // at the start of this method should prevent files written by runaway tasks.
-        Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+        // HIVE-17113 - avoid copying files that may have been written to the temp dir by runaway
+        // tasks, by moving just the files tracked from removeTempOrDuplicateFiles().
+        Utilities.moveSpecifiedFiles(fs, tmpPath, specPath, filesKept);
       }
       perfLogger.PerfLogEnd("FileSinkOperator", "RenameOrMoveFiles");
     }
   } else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileInputFormat.java
new file mode 100644
index 0000000000..cfb071e798
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileInputFormat.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+/**
+ * HiveSequenceFileInputFormat.
+ *
+ * A SequenceFileInputFormat that can be restricted to an explicit set of files:
+ * when a set of files to fetch is supplied, only those files are listed instead
+ * of scanning the input directories.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class HiveSequenceFileInputFormat<K extends LongWritable, V extends BytesRefArrayWritable>
+    extends SequenceFileInputFormat<K, V> {
+
+  public HiveSequenceFileInputFormat() {
+    setMinSplitSize(SequenceFile.SYNC_INTERVAL);
+  }
+
+  private Set<Path> listsToFetch = null;
+
+  public void setListsToFetch(Set<Path> listsToFetch) {
+    this.listsToFetch = listsToFetch;
+  }
+
+  @Override
+  protected FileStatus[] listStatus(JobConf job) throws IOException {
+    if (listsToFetch == null) {
+      return super.listStatus(job);
+    }
+    // Only the explicitly requested files are listed; getSplits() will then
+    // generate splits for exactly these files.
+    List<FileStatus> fsStatusList = new ArrayList<>();
+    for (Path path : listsToFetch) {
+      FileSystem fs = path.getFileSystem(job);
+      FileStatus fsStatus = fs.getFileStatus(path);
+      fsStatusList.add(fsStatus);
+    }
+    FileStatus[] fsStatusArray = new FileStatus[fsStatusList.size()];
+    return fsStatusList.toArray(fsStatusArray);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 94879c9529..9a5d6a120a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -299,6 +299,11 @@ public static void removeUnionOperators(GenTezProcContext context, BaseWork work
 
     Set<Operator<?>> seen = new HashSet<Operator<?>>();
 
+    Set<Path> filesToFetch = null;
+    if (context.parseContext.getFetchTask() != null) {
+      filesToFetch = context.parseContext.getFetchTask().getWork().getFilesToFetch();
+    }
+
     while(!operators.isEmpty()) {
       Operator<?> current = operators.pop();
       seen.add(current);
@@ -325,6 +330,7 @@ public static void removeUnionOperators(GenTezProcContext context, BaseWork work
             + desc.getDirName() + "; parent " + path);
         desc.setLinkedFileSink(true);
         desc.setLinkedFileSinkDesc(linked);
+        desc.setFilesToFetch(filesToFetch);
       }
 
       if (current instanceof AppMasterEventOperator) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 8dc5b34a34..8d1309dc30 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7862,7 +7862,7 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc,
     FileSinkDesc fileSinkDesc = new FileSinkDesc(queryTmpdir, table_desc,
         conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(),
         canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx,
-        dest_path, mmWriteId, isMmCtas, isInsertOverwrite);
+        dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery());
 
     boolean isHiveServerQuery = SessionState.get().isHiveServerQuery();
     fileSinkDesc.setHiveServerQuery(isHiveServerQuery);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index cc676c55f8..612edae6d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -20,6 +20,7 @@
 
 import com.google.common.collect.Interner;
 import com.google.common.collect.Interners;
+import com.google.common.collect.Lists;
 
 import org.apache.commons.collections.*;
 import org.apache.hadoop.fs.Path;
@@ -32,8 +33,11 @@
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MaterializedViewDesc;
 import org.apache.hadoop.hive.ql.exec.MoveTask;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -62,6 +66,7 @@
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -81,6 +86,7 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -177,9 +183,16 @@ public void compile(final ParseContext pCtx,
       // read formatted thrift objects from the output SequenceFile written by Tasks.
       conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName());
     } else {
-      resultTab =
-          PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat,
-              LazySimpleSerDe.class);
+      if (resFileFormat.equalsIgnoreCase("SequenceFile")) {
+        // Use the fetch-aware variant so the FetchTask can read result files in place.
+        resultTab =
+            PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, "HiveSequenceFile",
+                LazySimpleSerDe.class);
+      } else {
+        resultTab =
+            PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat,
+                LazySimpleSerDe.class);
+      }
     }
   } else {
     if (resultTab.getProperties().getProperty(serdeConstants.SERIALIZATION_LIB)
@@ -204,6 +217,16 @@ public void compile(final ParseContext pCtx,
       fetch.setIsUsingThriftJDBCBinarySerDe(false);
     }
 
+    Collection<Operator<? extends OperatorDesc>> tableScanOps =
+        Lists.<Operator<? extends OperatorDesc>>newArrayList(pCtx.getTopOps().values());
+    Set<FileSinkOperator> fsOps = OperatorUtils.findOperators(tableScanOps, FileSinkOperator.class);
+    if (fsOps != null && fsOps.size() == 1) {
+      FileSinkOperator op = fsOps.iterator().next();
+      Set<Path> filesToFetch = new HashSet<>();
+      op.getConf().setFilesToFetch(filesToFetch);
+      fetch.setFilesToFetch(filesToFetch);
+    }
+
     pCtx.setFetchTask((FetchTask) TaskFactory.get(fetch));
 
     // For the FetchTask, the limit optimization requires we fetch all the rows
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
index 1f139c8020..b85db1531a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
@@ -21,6 +21,7 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeMap;
@@ -30,8 +31,8 @@
 import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
-import org.apache.hadoop.hive.ql.plan.BaseWork.BaseExplainVectorization;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -80,6 +81,8 @@
    */
   private boolean isCachedResult = false;
 
+  private Set<Path> filesToFetch = null;
+
   public boolean isHiveServerQuery() {
     return isHiveServerQuery;
   }
@@ -132,6 +135,7 @@ public FetchWork(List<Path> partDir, List<PartitionDesc> partDesc,
     this.partDir = new ArrayList<Path>(partDir);
     this.partDesc = new ArrayList<PartitionDesc>(partDesc);
     this.limit = limit;
+    this.filesToFetch = new HashSet<>();
   }
 
   public void initializeForFetch(CompilationOpContext ctx) {
@@ -293,6 +297,13 @@ public void setLeastNumRows(int leastNumRows) {
     return source;
   }
 
+  public boolean isSourceTable() {
+    return this.source instanceof TableScanOperator;
+  }
+
   public void setSource(Operator<?> source) {
     this.source = source;
   }
@@ -377,4 +388,12 @@ public boolean isCachedResult() {
   public void setCachedResult(boolean isCachedResult) {
     this.isCachedResult = isCachedResult;
   }
+
+  public void setFilesToFetch(Set<Path> filesToFetch) {
+    this.filesToFetch = filesToFetch;
+  }
+
+  public Set<Path> getFilesToFetch() {
+    return filesToFetch;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 42b8f40fc8..6ce95092e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -101,6 +102,8 @@
   private boolean isMerge;
   private boolean isMmCtas;
 
+  private Set<Path> filesToFetch = null;
+
   /**
    * Whether is a HiveServer query, and the destination table is
    * indeed written using a row batching SerDe
@@ -109,6 +112,8 @@
 
   private boolean isInsertOverwrite = false;
 
+  private boolean isQuery = false;
+
   public FileSinkDesc() {
   }
 
@@ -119,7 +124,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
       final boolean compressed, final int destTableId, final boolean multiFileSpray,
       final boolean canBeMerged, final int numFiles, final int totalFiles,
       final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath,
-      Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite) {
+      Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery) {
 
     this.dirName = dirName;
     this.tableInfo = tableInfo;
@@ -136,6 +141,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
     this.mmWriteId = mmWriteId;
     this.isMmCtas = isMmCtas;
     this.isInsertOverwrite = isInsertOverwrite;
+    this.isQuery = isQuery;
   }
 
   public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -157,7 +163,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
   public Object clone() throws CloneNotSupportedException {
     FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
         destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
-        partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite);
+        partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery);
     ret.setCompressCodec(compressCodec);
     ret.setCompressType(compressType);
     ret.setGatherStats(gatherStats);
@@ -172,9 +178,23 @@ public Object clone() throws CloneNotSupportedException {
     ret.setStatementId(statementId);
     ret.setStatsTmpDir(statsTmpDir);
     ret.setIsMerge(isMerge);
+    ret.setFilesToFetch(filesToFetch);
+    ret.setIsQuery(isQuery);
     return ret;
   }
 
+  public void setFilesToFetch(Set<Path> filesToFetch) {
+    this.filesToFetch = filesToFetch;
+  }
+
+  public void setIsQuery(boolean isQuery) {
+    this.isQuery = isQuery;
+  }
+
+  public boolean getIsQuery() {
+    return this.isQuery;
+  }
+
+  public Set<Path> getFilesToFetch() {
+    return filesToFetch;
+  }
+
   public boolean isHiveServerQuery() {
     return this.isHiveServerQuery;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 5229700dbd..76cf54ec3f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.HiveSequenceFileInputFormat;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
@@ -280,7 +281,10 @@ public static TableDesc getTableDesc(
     Class inputFormat, outputFormat;
     // get the input & output file formats
-    if ("SequenceFile".equalsIgnoreCase(fileFormat)) {
+    if ("HiveSequenceFile".equalsIgnoreCase(fileFormat)) {
+      inputFormat = HiveSequenceFileInputFormat.class;
+      outputFormat = SequenceFileOutputFormat.class;
+    } else if ("SequenceFile".equalsIgnoreCase(fileFormat)) {
       inputFormat = SequenceFileInputFormat.class;
       outputFormat = SequenceFileOutputFormat.class;
     } else if ("RCFile".equalsIgnoreCase(fileFormat)) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index b369c9633c..a75103d60d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -286,7 +286,7 @@ private FileSinkOperator getFileSink(AcidUtils.Operation writeType,
       DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap, "Sunday", 100);
       //todo: does this need the finalDestination?
       desc = new FileSinkDesc(basePath, tableDesc, false, 1, false,
-          false, 1, 1, partCols, dpCtx, null, null, false, false);
+          false, 1, 1, partCols, dpCtx, null, null, false, false, false);
     } else {
       desc = new FileSinkDesc(basePath, tableDesc, false);
     }
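
Reviewer note (not part of the patch): the core trick the patch relies on is an InputFormat whose listStatus() can be pinned to the exact result files the FileSinkOperator produced, so the FetchTask can read query results from the scratch directory without any rename/move. The sketch below shows that trick in isolation against the stock Hadoop mapred API; the class name RestrictedFileListInputFormat and its setter are illustrative only and do not exist in Hive.

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

/**
 * Illustrative sketch: an InputFormat whose listing can be pinned to an explicit
 * set of files, mirroring the idea behind HiveSequenceFileInputFormat above.
 */
public class RestrictedFileListInputFormat extends TextInputFormat {

  // When null or empty, behave exactly like the parent class.
  private Set<Path> filesToRead = null;

  public void setFilesToRead(Set<Path> filesToRead) {
    this.filesToRead = filesToRead == null ? null : new LinkedHashSet<>(filesToRead);
  }

  @Override
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    if (filesToRead == null || filesToRead.isEmpty()) {
      return super.listStatus(job); // fall back to normal directory listing
    }
    List<FileStatus> statuses = new ArrayList<>(filesToRead.size());
    for (Path path : filesToRead) {
      FileSystem fs = path.getFileSystem(job);
      statuses.add(fs.getFileStatus(path)); // resolve each pinned file directly
    }
    return statuses.toArray(new FileStatus[0]);
  }
}

A caller would populate the set with the files recorded at write time and then call getSplits(), which is essentially what FetchOperator does in this patch via HiveSequenceFileInputFormat.setListsToFetch() before wrapping the resulting splits.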