Index: ql/src/test/results/clientpositive/insert_nothing.q.out
===================================================================
--- ql/src/test/results/clientpositive/insert_nothing.q.out  (revision 0)
+++ ql/src/test/results/clientpositive/insert_nothing.q.out  (revision 0)
@@ -0,0 +1,148 @@
+PREHOOK: query: drop table insert_nothing
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table insert_nothing
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table insert_nothing1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table insert_nothing1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table insert_nothing (key int, value string) stored as rcfile
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table insert_nothing (key int, value string) stored as rcfile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@insert_nothing
+PREHOOK: query: insert overwrite table insert_nothing select * from src where false
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@insert_nothing
+POSTHOOK: query: insert overwrite table insert_nothing select * from src where false
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@insert_nothing
+POSTHOOK: Lineage: insert_nothing.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: desc formatted insert_nothing
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: desc formatted insert_nothing
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Lineage: insert_nothing.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+# col_name  data_type  comment
+
+key  int  from deserializer
+value  string  from deserializer
+
+# Detailed Table Information
+Database:  default
+Owner:  franklin
+CreateTime:  Fri Aug 05 15:17:02 PDT 2011
+LastAccessTime:  UNKNOWN
+Protect Mode:  None
+Retention:  0
+Location:  pfile:/data/users/franklin/hive-clean/build/ql/test/data/warehouse/insert_nothing
+Table Type:  MANAGED_TABLE
+Table Parameters:
+  numFiles  0
+  numPartitions  0
+  numRows  0
+  rawDataSize  0
+  totalSize  0
+  transient_lastDdlTime  1312582629
+
+# Storage Information
+SerDe Library:  org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
+InputFormat:  org.apache.hadoop.hive.ql.io.RCFileInputFormat
+OutputFormat:  org.apache.hadoop.hive.ql.io.RCFileOutputFormat
+Compressed:  No
+Num Buckets:  -1
+Bucket Columns:  []
+Sort Columns:  []
+Storage Desc Params:
+  serialization.format  1
+PREHOOK: query: create table insert_nothing1 (key int, value string)
+  clustered by (key) sorted by (value) into 100 buckets
+  stored as rcfile
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table insert_nothing1 (key int, value string)
+  clustered by (key) sorted by (value) into 100 buckets
+  stored as rcfile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@insert_nothing1
+POSTHOOK: Lineage: insert_nothing.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: insert overwrite table insert_nothing1 select * from src where false
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@insert_nothing1
+POSTHOOK: query: insert overwrite table insert_nothing1 select * from src where false
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@insert_nothing1
+POSTHOOK: Lineage: insert_nothing.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: desc formatted insert_nothing1
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: desc formatted insert_nothing1
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Lineage: insert_nothing.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+# col_name  data_type  comment
+
+key  int  from deserializer
+value  string  from deserializer
+
+# Detailed Table Information
+Database:  default
+Owner:  franklin
+CreateTime:  Fri Aug 05 15:17:09 PDT 2011
+LastAccessTime:  UNKNOWN
+Protect Mode:  None
+Retention:  0
+Location:  pfile:/data/users/franklin/hive-clean/build/ql/test/data/warehouse/insert_nothing1
+Table Type:  MANAGED_TABLE
+Table Parameters:
+  numFiles  0
+  numPartitions  0
+  numRows  0
+  rawDataSize  0
+  totalSize  0
+  transient_lastDdlTime  1312582633
+
+# Storage Information
+SerDe Library:  org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
+InputFormat:  org.apache.hadoop.hive.ql.io.RCFileInputFormat
+OutputFormat:  org.apache.hadoop.hive.ql.io.RCFileOutputFormat
+Compressed:  No
+Num Buckets:  100
+Bucket Columns:  [key]
+Sort Columns:  [Order(col:value, order:1)]
+Storage Desc Params:
+  serialization.format  1
+PREHOOK: query: drop table insert_nothing
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@insert_nothing
+PREHOOK: Output: default@insert_nothing
+POSTHOOK: query: drop table insert_nothing
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@insert_nothing
+POSTHOOK: Output: default@insert_nothing
+POSTHOOK: Lineage: insert_nothing.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: drop table insert_nothing1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@insert_nothing1
+PREHOOK: Output: default@insert_nothing1
+POSTHOOK: query: drop table insert_nothing1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@insert_nothing1
+POSTHOOK: Output: default@insert_nothing1
+POSTHOOK: Lineage: insert_nothing.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
Index: ql/src/test/results/clientpositive/insert_nothing_enforced.q.out
===================================================================
--- ql/src/test/results/clientpositive/insert_nothing_enforced.q.out  (revision 0)
+++ ql/src/test/results/clientpositive/insert_nothing_enforced.q.out  (revision 0)
@@ -0,0 +1,148 @@
+PREHOOK: query: drop table insert_nothing_enforced
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table insert_nothing_enforced
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table insert_nothing_enforced1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table insert_nothing_enforced1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table insert_nothing_enforced (key int, value string) stored as rcfile
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table insert_nothing_enforced (key int, value string) stored as rcfile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@insert_nothing_enforced
+PREHOOK: query: insert overwrite table insert_nothing_enforced select * from src where false
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@insert_nothing_enforced
+POSTHOOK: query: insert overwrite table insert_nothing_enforced select * from src where false
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@insert_nothing_enforced
+POSTHOOK: Lineage: insert_nothing_enforced.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing_enforced.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: desc formatted insert_nothing_enforced
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: desc formatted insert_nothing_enforced
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Lineage: insert_nothing_enforced.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing_enforced.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+# col_name  data_type  comment
+
+key  int  from deserializer
+value  string  from deserializer
+
+# Detailed Table Information
+Database:  default
+Owner:  franklin
+CreateTime:  Fri Aug 05 14:26:46 PDT 2011
+LastAccessTime:  UNKNOWN
+Protect Mode:  None
+Retention:  0
+Location:  pfile:/data/users/franklin/hive-clean/build/ql/test/data/warehouse/insert_nothing_enforced
+Table Type:  MANAGED_TABLE
+Table Parameters:
+  numFiles  0
+  numPartitions  0
+  numRows  0
+  rawDataSize  0
+  totalSize  0
+  transient_lastDdlTime  1312579612
+
+# Storage Information
+SerDe Library:  org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
+InputFormat:  org.apache.hadoop.hive.ql.io.RCFileInputFormat
+OutputFormat:  org.apache.hadoop.hive.ql.io.RCFileOutputFormat
+Compressed:  No
+Num Buckets:  -1
+Bucket Columns:  []
+Sort Columns:  []
+Storage Desc Params:
+  serialization.format  1
+PREHOOK: query: create table insert_nothing_enforced1 (key int, value string)
+  clustered by (key) sorted by (value) into 100 buckets
+  stored as rcfile
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table insert_nothing_enforced1 (key int, value string)
+  clustered by (key) sorted by (value) into 100 buckets
+  stored as rcfile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@insert_nothing_enforced1
+POSTHOOK: Lineage: insert_nothing_enforced.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing_enforced.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: insert overwrite table insert_nothing_enforced1 select * from src where false
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@insert_nothing_enforced1
+POSTHOOK: query: insert overwrite table insert_nothing_enforced1 select * from src where false
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@insert_nothing_enforced1
+POSTHOOK: Lineage: insert_nothing_enforced.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing_enforced.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing_enforced1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing_enforced1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: desc formatted insert_nothing_enforced1
+PREHOOK: type: DESCTABLE
+POSTHOOK: query: desc formatted insert_nothing_enforced1
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Lineage: insert_nothing_enforced.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing_enforced.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing_enforced1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing_enforced1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+# col_name  data_type  comment
+
+key  int  from deserializer
+value  string  from deserializer
+
+# Detailed Table Information
+Database:  default
+Owner:  franklin
+CreateTime:  Fri Aug 05 14:26:52 PDT 2011
+LastAccessTime:  UNKNOWN
+Protect Mode:  None
+Retention:  0
+Location:  pfile:/data/users/franklin/hive-clean/build/ql/test/data/warehouse/insert_nothing_enforced1
+Table Type:  MANAGED_TABLE
+Table Parameters:
+  numFiles  100
+  numPartitions  0
+  numRows  0
+  rawDataSize  0
+  totalSize  15100
+  transient_lastDdlTime  1312579624
+
+# Storage Information
+SerDe Library:  org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
+InputFormat:  org.apache.hadoop.hive.ql.io.RCFileInputFormat
+OutputFormat:  org.apache.hadoop.hive.ql.io.RCFileOutputFormat
+Compressed:  No
+Num Buckets:  100
+Bucket Columns:  [key]
+Sort Columns:  [Order(col:value, order:1)]
+Storage Desc Params:
+  serialization.format  1
+PREHOOK: query: drop table insert_nothing_enforced
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@insert_nothing_enforced
+PREHOOK: Output: default@insert_nothing_enforced
+POSTHOOK: query: drop table insert_nothing_enforced
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@insert_nothing_enforced
+POSTHOOK: Output: default@insert_nothing_enforced
+POSTHOOK: Lineage: insert_nothing_enforced.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing_enforced.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing_enforced1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing_enforced1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: drop table insert_nothing_enforced1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@insert_nothing_enforced1
+PREHOOK: Output: default@insert_nothing_enforced1
+POSTHOOK: query: drop table insert_nothing_enforced1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@insert_nothing_enforced1
+POSTHOOK: Output: default@insert_nothing_enforced1
+POSTHOOK: Lineage: insert_nothing_enforced.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing_enforced.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing_enforced1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: insert_nothing_enforced1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
Index: ql/src/test/queries/clientpositive/insert_nothing_enforced.q
===================================================================
--- ql/src/test/queries/clientpositive/insert_nothing_enforced.q  (revision 0)
+++ ql/src/test/queries/clientpositive/insert_nothing_enforced.q  (revision 0)
@@ -0,0 +1,17 @@
+set hive.stats.autogather=true;
+set hive.enforce.bucketing=true;
+drop table insert_nothing_enforced;
+drop table insert_nothing_enforced1;
+
+create table insert_nothing_enforced (key int, value string) stored as rcfile;
+insert overwrite table insert_nothing_enforced select * from src where false;
+desc formatted insert_nothing_enforced;
+
+create table insert_nothing_enforced1 (key int, value string)
+  clustered by (key) sorted by (value) into 100 buckets
+  stored as rcfile;
+insert overwrite table insert_nothing_enforced1 select * from src where false;
+desc formatted insert_nothing_enforced1;
+
+drop table insert_nothing_enforced;
+drop table insert_nothing_enforced1;
Index: ql/src/test/queries/clientpositive/insert_nothing.q
===================================================================
--- ql/src/test/queries/clientpositive/insert_nothing.q  (revision 0)
+++ ql/src/test/queries/clientpositive/insert_nothing.q  (revision 0)
@@ -0,0 +1,17 @@
+set hive.stats.autogather=true;
+set hive.enforce.bucketing=false;
+drop table insert_nothing;
+drop table insert_nothing1;
+
+create table insert_nothing (key int, value string) stored as rcfile;
+insert overwrite table insert_nothing select * from src where false;
+desc formatted insert_nothing;
+
+create table insert_nothing1 (key int, value string)
+  clustered by (key) sorted by (value) into 100 buckets
+  stored as rcfile;
+insert overwrite table insert_nothing1 select * from src where false;
+desc formatted insert_nothing1;
+
+drop table insert_nothing;
+drop table insert_nothing1;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java  (revision 1154349)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java  (working copy)
@@ -34,11 +34,13 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Constants;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HivePartitioner;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -69,7 +71,7 @@
 public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     Serializable {
-  protected transient HashMap<String, FSPaths> valToPaths;
+  protected transient Map<String, FSPaths> valToPaths;
   protected transient int numDynParts;
   protected transient List<String> dpColNames;
   protected transient DynamicPartitionCtx dpCtx;
@@ -82,6 +84,7 @@
   protected transient List<Object> dpWritables;
   protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters
   protected transient int maxPartitions;
+  protected transient int numBuckets;
 
   private static final transient String[] FATAL_ERR_MSG = {
     null, // counter value 0 means no error
@@ -300,6 +303,14 @@
     isCompressed = conf.getCompressed();
     parent = Utilities.toTempPath(conf.getDirName());
 
+    String buckets = (String) conf.getTableInfo().getProperties()
+        .get(Constants.BUCKET_COUNT);
+    if (buckets != null) {
+      numBuckets = Integer.valueOf(buckets);
+    } else {
+      numBuckets = -1;
+    }
+
     serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
     serializer.initialize(null, conf.getTableInfo().getProperties());
     outputClass = serializer.getSerializedClass();
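(For orientation: the hunk above derives numBuckets from the table's metastore properties. The following standalone sketch shows that lookup in isolation; it assumes Constants.BUCKET_COUNT resolves to the "bucket_count" property key, and the BucketCountProbe name and the bare java.util.Properties standing in for Hive's table descriptor are illustrative only.)

    import java.util.Properties;

    public class BucketCountProbe {
      // Mirrors the patched initializeOp(): a missing "bucket_count" property
      // yields the -1 "not bucketed" sentinel. Hive itself also stores -1 for
      // non-bucketed tables, which parses to the same sentinel.
      static int bucketCount(Properties tableProps) {
        String buckets = tableProps.getProperty("bucket_count");
        return buckets != null ? Integer.parseInt(buckets) : -1;
      }

      public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(bucketCount(props));   // -1: not bucketed
        props.setProperty("bucket_count", "100");
        System.out.println(bucketCount(props));   // 100
      }
    }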
@@ -435,55 +446,7 @@
       bucketMap.put(bucketNum, filesIdx);
       taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
     }
-    if (isNativeTable) {
-      fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId);
-      LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
-      fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
-      LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
-    } else {
-      fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath;
-    }
-    try {
-      // The reason to keep these instead of using
-      // OutputFormat.getRecordWriter() is that
-      // getRecordWriter does not give us enough control over the file name that
-      // we create.
-      if (!bDynParts) {
-        fsp.finalPaths[filesIdx] = HiveFileFormatUtils.getOutputFormatFinalPath(
-            parent, taskId, jc, hiveOutputFormat, isCompressed, fsp.finalPaths[filesIdx]);
-      } else {
-        String extension = null;
-        if (hiveOutputFormat instanceof HiveIgnoreKeyTextOutputFormat) {
-          extension = Utilities.getFileExtension(jc, isCompressed);
-        }
-        fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, extension);
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new HiveException(e);
-    }
-    LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
-
-    if (isNativeTable) {
-      try {
-        // in recent hadoop versions, use deleteOnExit to clean tmp files.
-        autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(
-            fs, fsp.outPaths[filesIdx]);
-      } catch (IOException e) {
-        throw new HiveException(e);
-      }
-    }
-
-    Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
-    // only create bucket files only if no dynamic partitions,
-    // buckets of dynamic partitions will be created for each newly created partition
-    fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
-        jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx]);
-    // increment the CREATED_FILES counter
-    if (reporter != null) {
-      reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
-    }
+    resolveAndCreateFile(fsp, filesIdx);
     filesIdx++;
   }
   assert filesIdx == numFiles;
@@ -501,7 +464,75 @@
     filesCreated = true;
   }
 
+  /**
+   * Create one empty output file per bucket so that an insert that selects no
+   * rows still leaves a bucketed table with the expected number of files.
+   */
+  private void createEmptyBucketFiles() throws HiveException {
+    totalFiles = numBuckets;
+    numFiles = numBuckets;
+    if (!bDynParts) {
+      fsp = new FSPaths(specPath);
+      valToPaths.put("", fsp);
+    }
+    int filesIdx = 0;
+    for (int idx = 0; idx < totalFiles; idx++) {
+      taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), idx);
+      LOG.info("Creating empty bucket file " + filesIdx);
+      resolveAndCreateFile(fsp, filesIdx);
+      filesIdx++;
+    }
+  }
+
+  private void resolveAndCreateFile(FSPaths fsp, int filesIdx) throws HiveException {
+    if (isNativeTable) {
+      fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId);
+      LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
+      fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
+      LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
+    } else {
+      fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath;
+    }
+    try {
+      // The reason to keep these instead of using
+      // OutputFormat.getRecordWriter() is that
+      // getRecordWriter does not give us enough control over the file name that
+      // we create.
+      if (!bDynParts) {
+        fsp.finalPaths[filesIdx] = HiveFileFormatUtils.getOutputFormatFinalPath(
+            parent, taskId, jc, hiveOutputFormat, isCompressed, fsp.finalPaths[filesIdx]);
+      } else {
+        String extension = null;
+        if (hiveOutputFormat instanceof HiveIgnoreKeyTextOutputFormat) {
+          extension = Utilities.getFileExtension(jc, isCompressed);
+        }
+        fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, extension);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new HiveException(e);
+    }
+    LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
+
+    if (isNativeTable) {
+      try {
+        // in recent hadoop versions, use deleteOnExit to clean tmp files.
+        autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(
+            fs, fsp.outPaths[filesIdx]);
+      } catch (IOException e) {
+        throw new HiveException(e);
+      }
+    }
+
+    Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
+    // only create bucket files if no dynamic partitions,
+    // buckets of dynamic partitions will be created for each newly created partition
+    fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
+        jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx]);
+    // increment the CREATED_FILES counter
+    if (reporter != null) {
+      reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
+    }
+  }
+
   /**
    * Report status to JT so that JT won't kill this task if closing takes too long
    * due to too many files to close and the NN is overloaded.
@@ -543,7 +574,8 @@
     updateProgress();
 
     // if DP is enabled, get the final output writers and prepare the real output row
-    assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT : "input object inspector is not struct";
+    assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT :
+        "input object inspector is not struct";
 
     FSPaths fpaths;
 
@@ -710,7 +742,14 @@
   public void closeOp(boolean abort) throws HiveException {
 
     if (!bDynParts && !filesCreated) {
-      createBucketFiles(fsp);
+      if (numBuckets == -1 ||
+          !HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEENFORCEBUCKETING)) {
+        // No rows were encountered, and either the table isn't bucketed
+        // or it is bucketed but bucketing is not enforced.
+        return;
+      }
+      // The table is bucketed and bucketing is enforced.
+      createEmptyBucketFiles();
     }
 
     lastProgressReport = System.currentTimeMillis();
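(Taken together, the closeOp() guard reduces to the decision below, shown as a self-contained sketch. The EmptyBucketPolicy class name is hypothetical, and plain java.io stands in for Hive's record writers and filesystem shims.)

    import java.io.File;
    import java.io.IOException;

    public class EmptyBucketPolicy {
      // Mirrors the patched closeOp() guard: when no rows were written, files
      // are only materialized if the target table is bucketed AND
      // hive.enforce.bucketing=true; otherwise nothing is created.
      static int onNoRows(int numBuckets, boolean enforceBucketing, File dir)
          throws IOException {
        if (numBuckets == -1 || !enforceBucketing) {
          return 0; // write nothing; the insert leaves the table empty
        }
        for (int i = 0; i < numBuckets; i++) {
          // one zero-row file per bucket, named by bucket index in the style
          // that Utilities.replaceTaskIdFromFilename() produces ("000000_0")
          new File(dir, String.format("%06d_0", i)).createNewFile();
        }
        return numBuckets;
      }

      public static void main(String[] args) throws IOException {
        File dir = new File("empty_bucket_demo");
        dir.mkdirs();
        System.out.println(onNoRows(-1, true, dir));   // 0: not bucketed
        System.out.println(onNoRows(100, false, dir)); // 0: not enforced
        System.out.println(onNoRows(100, true, dir));  // 100 empty bucket files
      }
    }

This is the behavior the golden files above capture: with hive.enforce.bucketing=false, the empty insert into insert_nothing1 leaves numFiles 0, while insert_nothing_enforced1 ends up with numFiles 100 and totalSize 15100, i.e. one empty RCFile header per bucket.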