diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 0e4e7064eb..39963fd015 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -576,6 +576,7 @@ minillaplocal.query.files=\
   mapjoin_hint.q,\
   mapjoin_emit_interval.q,\
   mergejoin_3way.q,\
+  mm_bhif.q,\
   mm_conversions.q,\
   mm_exim.q,\
   mm_loaddata.q,\
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
index e09c6ecac0..58f0480059 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -50,8 +52,7 @@ public class BucketizedHiveInputFormat extends HiveInputFormat {
 
-  public static final Logger LOG = LoggerFactory
-      .getLogger("org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat");
+  public static final Logger LOG = LoggerFactory.getLogger(BucketizedHiveInputFormat.class);
 
   @Override
   public RecordReader getRecordReader(InputSplit split, JobConf job,
@@ -123,25 +124,31 @@ public RecordReader getRecordReader(InputSplit split, JobConf job,
     // for each dir, get all files under the dir, do getSplits to each
     // individual file,
     // and then create a BucketizedHiveInputSplit on it
     for (Path dir : dirs) {
       PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
-      // create a new InputFormat instance if this is the first time to see this
-      // class
+      // create a new InputFormat instance if this is the first time to see this class
       Class inputFormatClass = part.getInputFileFormatClass();
       InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
       newjob.setInputFormat(inputFormat.getClass());
 
-      FileStatus[] listStatus = listStatus(newjob, dir);
-
-      for (FileStatus status : listStatus) {
-        LOG.info("block size: " + status.getBlockSize());
-        LOG.info("file length: " + status.getLen());
-        FileInputFormat.setInputPaths(newjob, status.getPath());
-        InputSplit[] iss = inputFormat.getSplits(newjob, 0);
-        if (iss != null && iss.length > 0) {
-          numOrigSplits += iss.length;
-          result.add(new BucketizedHiveInputSplit(iss, inputFormatClass
-              .getName()));
+      ValidWriteIdList mmIds = null;
+      if (part.getTableDesc() != null) {
+        // This can happen for truncate table case for non-MM tables.
+        mmIds = getMmValidWriteIds(newjob, part.getTableDesc(), null);
+      }
+      // TODO: should this also handle ACID operation, etc.? seems to miss a lot of stuff from HIF.
+      Path[] finalDirs = (mmIds == null) ? new Path[] { dir }
+          : processPathsForMmRead(Lists.newArrayList(dir), newjob, mmIds);
+      if (finalDirs == null) {
+        continue; // No valid inputs - possible in MM case.
+      }
+
+      for (Path finalDir : finalDirs) {
+        FileStatus[] listStatus = listStatus(newjob, finalDir);
+
+        for (FileStatus status : listStatus) {
+          numOrigSplits = addBHISplit(
+              status, inputFormat, inputFormatClass, numOrigSplits, newjob, result);
         }
       }
     }
@@ -149,4 +159,18 @@ public RecordReader getRecordReader(InputSplit split, JobConf job,
         + numOrigSplits + " original splits.");
     return result.toArray(new BucketizedHiveInputSplit[result.size()]);
   }
+
+  private int addBHISplit(FileStatus status, InputFormat inputFormat, Class inputFormatClass,
+      int numOrigSplits, JobConf newjob, ArrayList result) throws IOException {
+    LOG.info("block size: " + status.getBlockSize());
+    LOG.info("file length: " + status.getLen());
+    FileInputFormat.setInputPaths(newjob, status.getPath());
+    InputSplit[] iss = inputFormat.getSplits(newjob, 0);
+    if (iss != null && iss.length > 0) {
+      numOrigSplits += iss.length;
+      result.add(new BucketizedHiveInputSplit(iss, inputFormatClass
+          .getName()));
+    }
+    return numOrigSplits;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index b25bb1de49..655d10b643 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -478,18 +478,9 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job
       InputFormat inputFormat, Class inputFormatClass, int splits,
       TableDesc table, List result) throws IOException {
-    ValidWriteIdList validWriteIdList = AcidUtils.getTableValidWriteIdList(conf, table.getTableName());
-    ValidWriteIdList validMmWriteIdList;
-    if (AcidUtils.isInsertOnlyTable(table.getProperties())) {
-      if (validWriteIdList == null) {
-        throw new IOException("Insert-Only table: " + table.getTableName()
-            + " is missing from the ValidWriteIdList config: "
-            + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
-      }
-      validMmWriteIdList = validWriteIdList;
-    } else {
-      validMmWriteIdList = null; // for non-MM case
-    }
+    ValidWriteIdList validWriteIdList = AcidUtils.getTableValidWriteIdList(
+        conf, table.getTableName());
+    ValidWriteIdList validMmWriteIdList = getMmValidWriteIds(conf, table, validWriteIdList);
 
     try {
       Utilities.copyTablePropertiesToConf(table, conf);
@@ -555,6 +546,20 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job
     }
   }
 
+  protected ValidWriteIdList getMmValidWriteIds(
+      JobConf conf, TableDesc table, ValidWriteIdList validWriteIdList) throws IOException {
+    if (!AcidUtils.isInsertOnlyTable(table.getProperties())) return null;
+    if (validWriteIdList == null) {
+      validWriteIdList = AcidUtils.getTableValidWriteIdList(conf, table.getTableName());
+      if (validWriteIdList == null) {
+        throw new IOException("Insert-Only table: " + table.getTableName()
+            + " is missing from the ValidWriteIdList config: "
+            + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
+      }
+    }
+    return validWriteIdList;
+  }
+
   public static Path[] processPathsForMmRead(List dirs, JobConf conf,
       ValidWriteIdList validWriteIdList) throws IOException {
     if (validWriteIdList == null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
index 591c4b8c26..c112978ff7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
@@ -236,7 +236,9 @@ public static void jobClose(Path outputPath, boolean success, JobConf job,
     Path backupPath = backupOutputPath(fs, outputPath, job);
     Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null,
         reporter);
-    fs.delete(backupPath, true);
+    if (backupPath != null) {
+      fs.delete(backupPath, true);
+    }
   }
 }
diff --git ql/src/test/queries/clientpositive/mm_bhif.q ql/src/test/queries/clientpositive/mm_bhif.q
new file mode 100644
index 0000000000..f9c7f8ab84
--- /dev/null
+++ ql/src/test/queries/clientpositive/mm_bhif.q
@@ -0,0 +1,27 @@
+SET hive.vectorized.execution.enabled=true;
+SET hive.vectorized.execution.reduce.enabled=true;
+set hive.mapred.mode=nonstrict;
+set hive.exec.reducers.max = 10;
+set hive.map.groupby.sorted=true;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+-- SORT_QUERY_RESULTS
+
+CREATE TABLE T1_mm(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE tblproperties ("transactional"="true", "transactional_properties"="insert_only");
+
+LOAD DATA LOCAL INPATH '../../data/files/bucket_files/000000_0' INTO TABLE T1_mm PARTITION (ds='1');
+
+INSERT OVERWRITE TABLE T1_mm PARTITION (ds='1') select key, val from T1_mm where ds = '1';
+
+
+set hive.fetch.task.conversion=none;
+
+select * from T1_mm;
+
+explain
+select count(distinct key) from T1_mm;
+select count(distinct key) from T1_mm;
+
+DROP TABLE T1_mm;
diff --git ql/src/test/results/clientpositive/mm_bhif.q.out ql/src/test/results/clientpositive/mm_bhif.q.out
new file mode 100644
index 0000000000..4774007660
--- /dev/null
+++ ql/src/test/results/clientpositive/mm_bhif.q.out
@@ -0,0 +1,146 @@
+PREHOOK: query: CREATE TABLE T1_mm(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE tblproperties ("transactional"="true", "transactional_properties"="insert_only")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@T1_mm
+POSTHOOK: query: CREATE TABLE T1_mm(key STRING, val STRING) PARTITIONED BY (ds string)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE tblproperties ("transactional"="true", "transactional_properties"="insert_only")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@T1_mm
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/bucket_files/000000_0' INTO TABLE T1_mm PARTITION (ds='1')
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t1_mm
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/bucket_files/000000_0' INTO TABLE T1_mm PARTITION (ds='1')
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t1_mm
+POSTHOOK: Output: default@t1_mm@ds=1
+PREHOOK: query: INSERT OVERWRITE TABLE T1_mm PARTITION (ds='1') select key, val from T1_mm where ds = '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1_mm
+PREHOOK: Input: default@t1_mm@ds=1
+PREHOOK: Output: default@t1_mm@ds=1
+POSTHOOK: query: INSERT OVERWRITE TABLE T1_mm PARTITION (ds='1') select key, val from T1_mm where ds = '1'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1_mm
+POSTHOOK: Input: default@t1_mm@ds=1
+POSTHOOK: Output: default@t1_mm@ds=1
+POSTHOOK: Lineage: t1_mm PARTITION(ds=1).key SIMPLE [(t1_mm)t1_mm.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: t1_mm PARTITION(ds=1).val SIMPLE [(t1_mm)t1_mm.FieldSchema(name:val, type:string, comment:null), ]
+PREHOOK: query: select * from T1_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1_mm
+PREHOOK: Input: default@t1_mm@ds=1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from T1_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1_mm
+POSTHOOK: Input: default@t1_mm@ds=1
+#### A masked pattern was here ####
+1	11	1
+2	12	1
+3	13	1
+7	17	1
+8	18	1
+8	28	1
+PREHOOK: query: explain
+select count(distinct key) from T1_mm
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(distinct key) from T1_mm
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: t1_mm
+            Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: key (type: string)
+              outputColumnNames: key
+              Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+              Group By Operator
+                keys: key (type: string)
+                mode: hash
+                outputColumnNames: _col0
+                Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+      Execution mode: vectorized
+      Reduce Operator Tree:
+        Group By Operator
+          keys: KEY._col0 (type: string)
+          mode: partial2
+          outputColumnNames: _col0
+          Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+          Group By Operator
+            aggregations: count(_col0)
+            mode: partial2
+            outputColumnNames: _col0
+            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-2
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            Reduce Output Operator
+              sort order: 
+              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+              value expressions: _col0 (type: bigint)
+      Execution mode: vectorized
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: count(VALUE._col0)
+          mode: mergepartial
+          outputColumnNames: _col0
+          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+          File Output Operator
+            compressed: false
+            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+            table:
+                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(distinct key) from T1_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1_mm
+PREHOOK: Input: default@t1_mm@ds=1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(distinct key) from T1_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1_mm
+POSTHOOK: Input: default@t1_mm@ds=1
+#### A masked pattern was here ####
+5
+PREHOOK: query: DROP TABLE T1_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1_mm
+PREHOOK: Output: default@t1_mm
+POSTHOOK: query: DROP TABLE T1_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1_mm
+POSTHOOK: Output: default@t1_mm