diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 362a79679b..77d12ffca8 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -12,6 +12,8 @@ minimr.query.files=infer_bucket_sort_map_operators.q,\
   index_bitmap_auto.q,\
   scriptfile1.q,\
   bucket_num_reducers2.q,\
+  bucket_num_reducers_acid.q,\
+  bucket_num_reducers_acid2.q,\
   scriptfile1_win.q

 # These tests are disabled for minimr
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 4d46d65227..23069abc87 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -41,6 +42,9 @@ import org.apache.hadoop.hive.ql.io.StreamingOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.QB;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
@@ -48,6 +52,7 @@ import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
@@ -148,8 +153,6 @@
     RecordWriter[] outWriters;
     RecordUpdater[] updaters;
     Stat stat;
-    int acidLastBucket = -1;
-    int acidFileOffset = -1;

     public FSPaths(Path specPath) {
       tmpPath = Utilities.toTempPath(specPath);
@@ -282,8 +285,15 @@ public Stat getStat() {
    * each reducer can write 10 files - this way we effectively get 1000 files.
    */
   private transient ExprNodeEvaluator[] partitionEval;
+  /**
+   * When multiFileSpray = true
+   * total number of files the query needs to produce. In practice this equals the number of buckets.
+   */
   protected transient int totalFiles;
-  private transient int numFiles;
+  /**
+   * When multiFileSpray = true
+   * number of files this particular FS needs to produce
+   */
+  private transient int numFiles;
   protected transient boolean multiFileSpray;
   protected transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
@@ -303,6 +313,7 @@ public Stat getStat() {
   String taskId;

   protected boolean filesCreated = false;
+  private boolean isBucketed = false;

   private void initializeSpecPath() {
     // For a query of the type:
@@ -342,6 +353,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
     try {
       this.hconf = hconf;
+      isBucketed = hconf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0;
       filesCreated = false;
       isNativeTable = !conf.getTableInfo().isNonNative();
       isTemporary = conf.isTemporary();
@@ -749,17 +761,15 @@ public void process(Object row, int tag) throws HiveException {
         }
         LOG.info(toString() + ": records written - " + numRows);
       }
-
-      // This should always be 0 for the final result file
-      int writerOffset = findWriterOffset(row);
+      WriterInfo wi = findWriterOffset(row);
       // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same
       // for a given operator branch prediction should work quite nicely on it.
       // RecordUpdateer expects to get the actual row, not a serialized version of it. Thus we
       // pass the row rather than recordValue.
       if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
-        rowOutWriters[writerOffset].write(recordValue);
+        rowOutWriters[wi.writerOffset].write(recordValue);
       } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
-        fpaths.updaters[writerOffset].insert(conf.getTransactionId(), row);
+        fpaths.updaters[wi.writerOffset].insert(conf.getTransactionId(), row);
       } else {
         // TODO I suspect we could skip much of the stuff above this in the function in the case
        // of update and delete. But I don't understand all of the side effects of the above
@@ -767,27 +777,40 @@ public void process(Object row, int tag) throws HiveException {
         // Find the bucket id, and switch buckets if need to
         ObjectInspector rowInspector = bDynParts ? subSetOI : outputObjInspector;
-        Object recId = ((StructObjectInspector)rowInspector).getStructFieldData(row, recIdField);
-        int bucketProperty =
+        boolean isAssertEnabled = false;
+        assert isAssertEnabled = true;
+        if(isBucketed && isAssertEnabled) {
+          /* If the table is not bucketed then Delete events are randomly distributed to writers,
+          so any of them can see any bucket value. This whole block is there to perform the sanity
+          check. */
+          Object recId = ((StructObjectInspector) rowInspector).getStructFieldData(row, recIdField);
+          int bucketProperty =
             bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField));
-        int bucketNum =
-          BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
-        if (fpaths.acidLastBucket != bucketNum) {
-          fpaths.acidLastBucket = bucketNum;
-          // Switch files
-          fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 : ++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
-            jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset],
-            rowInspector, reporter, 0);
+          int bucketNumData =
+            BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
+          if (wi.bucketId != bucketNumData) {
+            //If here, the row ended up being shuffled to the wrong writer (Reducer)
+            throw new IllegalStateException("row.bucketId=" + bucketNumData +
+              ". task.bucketId=" + wi.bucketId + ". multiFileSpray=" + multiFileSpray);
+          }
+        }
+        if(fpaths.updaters[wi.writerOffset] == null) {
+          /* For un-bucketed tables wi.bucketId is just a writer ID which ensures that each writer
+           * creates a different bucketN file. For bucketed tables data is sorted by ROW__ID, which
+           * has originalTxnId as the most significant component, so for multiFileSpray this can see
+           * rows for the same bucket that are not contiguous. */
+          fpaths.updaters[wi.writerOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
+            jc, conf.getTableInfo(), wi.bucketId, conf, fpaths.outPaths[wi.writerOffset],
+            rowInspector, reporter, 0);
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
-              fpaths.outPaths[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset]);
+            LOG.debug("Created updater for bucket number " + wi.writerOffset + " using file " +
+              fpaths.outPaths[wi.writerOffset]);
           }
         }
-
         if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
-          fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset].update(conf.getTransactionId(), row);
+          fpaths.updaters[wi.writerOffset].update(conf.getTransactionId(), row);
         } else if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
-          fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset].delete(conf.getTransactionId(), row);
+          fpaths.updaters[wi.writerOffset].delete(conf.getTransactionId(), row);
         } else {
           throw new HiveException("Unknown write type " + conf.getWriteType().toString());
         }
@@ -812,11 +835,26 @@ protected boolean areAllTrue(boolean[] statsFromRW) {
     }
     return true;
   }
-
-  private int findWriterOffset(Object row) throws HiveException {
+  private WriterInfo singleFileWriterInfo = null;
+  private WriterInfo findWriterOffset(Object row) throws HiveException {
     if (!multiFileSpray) {
-      return 0;
+      //ok, so partitionEval etc is not set up in this case
+      if(singleFileWriterInfo == null) {
+        singleFileWriterInfo =
+          new WriterInfo(0, Integer.parseInt(Utilities.getTaskIdFromFilename(taskId)));
+      }
+      return singleFileWriterInfo;
     } else {
+      assert isBucketed;
+      /** If here, the table is bucketed.
+       * When a single Reducer is writing multiple files, we compute the bucketId here to route the
+       * row to the appropriate writer/updater. For Insert operations this is based on the columns of
+       * the CLUSTERED BY (...) clause. For update/delete we expect a single partitionEval:
+       * UDFToInteger(ROW__ID), which is always the 0th column when processing an update/delete
+       * statement.
+       * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer#genPartnCols(String, Operator, QB,
+       * TableDesc, Table, SemanticAnalyzer.SortBucketRSCtx)}
+       */
       Object[] bucketFieldValues = new Object[partitionEval.length];
       for(int i = 0; i < partitionEval.length; i++) {
         bucketFieldValues[i] = partitionEval[i].evaluate(row);
@@ -824,9 +862,8 @@ private int findWriterOffset(Object row) throws HiveException {
       int keyHashCode = ObjectInspectorUtils.getBucketHashCode(bucketFieldValues, partitionObjectInspectors);
       key.setHashCode(keyHashCode);
       int bucketNum = prtner.getBucket(key, null, totalFiles);
-      return bucketMap.get(bucketNum);
+      return new WriterInfo(bucketMap.get(bucketNum), bucketNum);
     }
-
   }

   /**
@@ -1312,4 +1349,15 @@ private Configuration unsetNestedColumnPaths(Configuration conf) {
     }
     return conf;
   }
+  private static final class WriterInfo {
+    private final int writerOffset;
+    /**
+     * For un-bucketed tables this is just a writer ID
+     */
+    private final int bucketId;
+    private WriterInfo(int writerOffset, int bucketId) {
+      this.writerOffset = writerOffset;
+      this.bucketId = bucketId;
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index d40b89ae14..c30e8fe75a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -479,6 +479,7 @@ public void close(boolean abort) throws IOException {
       }
     } else {
       if (writer == null) {
+        //so that we create empty bucket files when needed (but see HIVE-17138)
         writer = OrcFile.createWriter(path, writerOptions);
       }
       writer.close(); // normal close.
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 4faec05bb0..9862ababf9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6734,7 +6734,7 @@ private void genPartnCols(String dest, Operator input, QB qb,
     if ((dest_tab.getNumBuckets() > 0)) {
       enforceBucketing = true;
       if (updating(dest) || deleting(dest)) {
-        partnColsNoConvert = getPartitionColsFromBucketColsForUpdateDelete(input, false);
+        partnColsNoConvert = getPartitionColsFromBucketColsForUpdateDelete(input, true);
       } else {
         partnColsNoConvert = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input,
             false);
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index c531aeb8d2..3aff2ce24c 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -888,4 +888,50 @@ public void testNonAcidToAcidConversion01() throws Exception {
     //make sure they are the same before and after compaction
   }
+  @Ignore("see bucket_num_reducers_acid.q")
+  @Test
+  public void testMoreBucketsThanReducers() throws Exception {
+    //see bucket_num_reducers.q bucket_num_reducers2.q
+    //todo: try using set VerifyNumReducersHook.num.reducers=10;
+    d.destroy();
+    HiveConf hc = new HiveConf(hiveConf);
+    hc.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 1);
+    //this is used in multiple places, SemanticAnalyzer.getBucketingSortingDest() among others
+    hc.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 1);
+    hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
+    d = new Driver(hc);
+    d.setMaxRows(10000);
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,1)");//txn X writes to bucket1
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(0,0),(3,3)");//txn X + 1 writes to bucket0 + bucket1
+    runStatementOnDriver("update " + Table.ACIDTBL + " set b = -1");
+    List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBL + " order by a, b");
+    int[][] expected = {{0, -1}, {1, -1}, {3, -1}};
+    Assert.assertEquals(stringifyValues(expected), r);
+  }
+  @Ignore("see bucket_num_reducers_acid2.q")
+  @Test
+  public void testMoreBucketsThanReducers2() throws Exception {
+    //todo: try using set VerifyNumReducersHook.num.reducers=10;
+    //see bucket_num_reducers.q bucket_num_reducers2.q
+    d.destroy();
+    HiveConf hc = new HiveConf(hiveConf);
+    hc.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 2);
+    //this is used in multiple places, SemanticAnalyzer.getBucketingSortingDest() among others
+    hc.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 2);
+    d = new Driver(hc);
+    d.setMaxRows(10000);
+    runStatementOnDriver("create table fourbuckets (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    //below, the value of a is the bucket id and b is (logically) the txn id
+    runStatementOnDriver("insert into fourbuckets values(0,1),(1,1)");//txn X writes to b0 + b1
+    runStatementOnDriver("insert into fourbuckets values(2,2),(3,2)");//txn X + 1 writes to b2 + b3
+    runStatementOnDriver("insert into fourbuckets values(0,3),(1,3)");//txn X + 2 writes to b0 + b1
+    runStatementOnDriver("insert into fourbuckets values(2,4),(3,4)");//txn X + 3 writes to b2 + b3
+    //so with 2 FileSinks and 4 buckets, FS1 should see (0,1),(2,2),(0,3),(2,4) since data is sorted by ROW__ID where txnid is the first component
+    //FS2 should see (1,1),(3,2),(1,3),(3,4)
+
+    runStatementOnDriver("update fourbuckets set b = -1");
+    List<String> r = runStatementOnDriver("select * from fourbuckets order by a, b");
+    int[][] expected = {{0, -1},{0, -1}, {1, -1}, {1, -1}, {2, -1}, {2, -1}, {3, -1}, {3, -1}};
+    Assert.assertEquals(stringifyValues(expected), r);
+  }

 }
diff --git ql/src/test/queries/clientpositive/bucket_num_reducers_acid.q ql/src/test/queries/clientpositive/bucket_num_reducers_acid.q
new file mode 100644
index 0000000000..a44668eb49
--- /dev/null
+++ ql/src/test/queries/clientpositive/bucket_num_reducers_acid.q
@@ -0,0 +1,30 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.exec.mode.local.auto=false;
+
+set mapred.reduce.tasks = 1;
+
+-- This test sets the number of reduce tasks to 1 for a table with 2 buckets,
+-- and uses a post-hook to confirm that only 1 reducer was used
+
+drop table if exists bucket_nr_acid;
+create table bucket_nr_acid (a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');
+set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyNumReducersHook;
+set VerifyNumReducersHook.num.reducers=1;
+
+-- txn X writes to bucket1
+insert into bucket_nr_acid values(1,1);
+-- txn X + 1 writes to bucket0 + bucket1
+insert into bucket_nr_acid values(0,0),(3,3);
+
+update bucket_nr_acid set b = -1;
+set hive.exec.post.hooks=;
+select * from bucket_nr_acid order by a, b;
+
+drop table bucket_nr_acid;
+
+
+
+
+
+
diff --git ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q
new file mode 100644
index 0000000000..2e6aa6116a
--- /dev/null
+++ ql/src/test/queries/clientpositive/bucket_num_reducers_acid2.q
@@ -0,0 +1,33 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.exec.mode.local.auto=false;
+
+set mapred.reduce.tasks = 2;
+
+-- This test sets the number of reduce tasks to 2 for a table with 4 buckets,
+-- and uses a post-hook to confirm that only 2 reducers were used
+
+drop table if exists bucket_nr_acid2;
+create table bucket_nr_acid2 (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true');
+set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyNumReducersHook;
+set VerifyNumReducersHook.num.reducers=2;
+
+-- txn X writes to b0 + b1
+insert into bucket_nr_acid2 values(0,1),(1,1);
+-- txn X + 1 writes to b2 + b3
+insert into bucket_nr_acid2 values(2,2),(3,2);
+-- txn X + 2 writes to b0 + b1
+insert into bucket_nr_acid2 values(0,3),(1,3);
+-- txn X + 3 writes to b2 + b3
+insert into bucket_nr_acid2 values(2,4),(3,4);
+
+-- so with 2 FileSinks and 4 buckets, FS1 should see (0,1),(2,2),(0,3),(2,4) since data is sorted by
+-- ROW__ID where txnid is the first component; FS2 should see (1,1),(3,2),(1,3),(3,4)
+
+
+update bucket_nr_acid2 set b = -1;
+set hive.exec.post.hooks=;
+select * from bucket_nr_acid2 order by a, b;
+
+drop table bucket_nr_acid2;
+
diff --git ql/src/test/results/clientpositive/bucket_num_reducers_acid.q.out ql/src/test/results/clientpositive/bucket_num_reducers_acid.q.out
new file mode 100644
index 0000000000..8ea23d7ce8
--- /dev/null
+++ ql/src/test/results/clientpositive/bucket_num_reducers_acid.q.out
@@ -0,0 +1,33 @@
+PREHOOK: query: drop table if exists bucket_nr_acid
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists bucket_nr_acid
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table bucket_nr_acid (a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@bucket_nr_acid
+POSTHOOK: query: create table bucket_nr_acid (a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@bucket_nr_acid
+PREHOOK: query: insert into bucket_nr_acid values(1,1)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@bucket_nr_acid
+PREHOOK: query: insert into bucket_nr_acid values(0,0),(3,3)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@bucket_nr_acid
+PREHOOK: query: update bucket_nr_acid set b = -1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket_nr_acid
+PREHOOK: Output: default@bucket_nr_acid
+PREHOOK: query: select * from bucket_nr_acid order by a, b
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket_nr_acid
+#### A masked pattern was here ####
+0	-1
+1	-1
+3	-1
+PREHOOK: query: drop table bucket_nr_acid
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@bucket_nr_acid
+PREHOOK: Output: default@bucket_nr_acid
diff --git ql/src/test/results/clientpositive/bucket_num_reducers_acid2.q.out ql/src/test/results/clientpositive/bucket_num_reducers_acid2.q.out
new file mode 100644
index 0000000000..cabb4f7dda
--- /dev/null
+++ ql/src/test/results/clientpositive/bucket_num_reducers_acid2.q.out
@@ -0,0 +1,44 @@
+PREHOOK: query: drop table if exists bucket_nr_acid2
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists bucket_nr_acid2
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table bucket_nr_acid2 (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@bucket_nr_acid2
+POSTHOOK: query: create table bucket_nr_acid2 (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@bucket_nr_acid2
+PREHOOK: query: insert into bucket_nr_acid2 values(0,1),(1,1)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@bucket_nr_acid2
+PREHOOK: query: insert into bucket_nr_acid2 values(2,2),(3,2)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@bucket_nr_acid2
+PREHOOK: query: insert into bucket_nr_acid2 values(0,3),(1,3)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@bucket_nr_acid2
+PREHOOK: query: insert into bucket_nr_acid2 values(2,4),(3,4)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@bucket_nr_acid2
+PREHOOK: query: update bucket_nr_acid2 set b = -1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket_nr_acid2
+PREHOOK: Output: default@bucket_nr_acid2
+PREHOOK: query: select * from bucket_nr_acid2 order by a, b
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket_nr_acid2
+#### A masked pattern was here ####
+0	-1
+0	-1
+1	-1
+1	-1
+2	-1
+2	-1
+3	-1
+3	-1
+PREHOOK: query: drop table bucket_nr_acid2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@bucket_nr_acid2
+PREHOOK: Output: default@bucket_nr_acid2
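
Reviewer note (not part of the patch): the sketch below is only meant to illustrate the routing idea behind the FileSinkOperator changes. Each row is mapped to a WriterInfo that carries both the local writer offset (index into updaters[]/outPaths[]) and the bucket id of the file that writer owns, and a row arriving with a bucket id the task does not own indicates it was shuffled to the wrong writer. The class name WriterRoutingSketch, the explicit taskId/numWriters parameters, and the round-robin bucket-to-writer assignment are assumptions made for this example; in the real code the writer offset comes from bucketMap and the row's bucket id is decoded from the ROW__ID bucketProperty via BucketCodec.

// WriterRoutingSketch.java -- illustrative only, not part of the patch.
import java.util.HashMap;
import java.util.Map;

public class WriterRoutingSketch {

  /** Mirrors the WriterInfo added to FileSinkOperator: local writer slot + bucket file it owns. */
  static final class WriterInfo {
    final int writerOffset; // index into this FileSink's updaters[]/outPaths[]
    final int bucketId;     // bucketN file this writer produces (just a writer ID for un-bucketed tables)
    WriterInfo(int writerOffset, int bucketId) {
      this.writerOffset = writerOffset;
      this.bucketId = bucketId;
    }
  }

  private final boolean multiFileSpray;
  private final int taskId; // this FileSink's task (reducer) id
  // bucketId -> local writer offset, populated only for the buckets this task owns
  private final Map<Integer, Integer> bucketMap = new HashMap<>();

  WriterRoutingSketch(boolean multiFileSpray, int totalBuckets, int numWriters, int taskId) {
    this.multiFileSpray = multiFileSpray;
    this.taskId = taskId;
    // Assumption for the sketch: buckets are spread over writers round-robin, so writer
    // `taskId` owns buckets taskId, taskId + numWriters, ...
    int offset = 0;
    for (int b = taskId; b < totalBuckets; b += numWriters) {
      bucketMap.put(b, offset++);
    }
  }

  /** Route a row (represented here only by its decoded bucket id) to a writer. */
  WriterInfo findWriterOffset(int rowBucketId) {
    if (!multiFileSpray) {
      // One file per task: always writer 0, and the bucket file is named after the task id.
      return new WriterInfo(0, taskId);
    }
    Integer offset = bucketMap.get(rowBucketId);
    if (offset == null) {
      // Analogue of the patch's sanity check: the row was shuffled to the wrong writer.
      throw new IllegalStateException("row.bucketId=" + rowBucketId + ". task.bucketId=" + taskId);
    }
    return new WriterInfo(offset, rowBucketId);
  }

  public static void main(String[] args) {
    // 4 buckets, 2 reducers: reducer 0 owns bucket files {0, 2}, reducer 1 owns {1, 3}.
    WriterRoutingSketch fs0 = new WriterRoutingSketch(true, 4, 2, 0);
    for (int bucket : new int[] {0, 2}) {
      WriterInfo wi = fs0.findWriterOffset(bucket);
      System.out.println("bucket " + bucket + " -> writerOffset " + wi.writerOffset
          + ", bucket file " + wi.bucketId);
    }
  }
}

With 4 buckets and 2 reducers this mirrors the expectation spelled out in bucket_num_reducers_acid2.q: one FileSink handles bucket files 0 and 2, the other handles 1 and 3, and carrying both writerOffset and bucketId is what lets un-bucketed tables keep writing distinct bucketN files even though the rows are distributed arbitrarily.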