diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 4d46d65227..6647c80711 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -750,15 +750,17 @@ public void process(Object row, int tag) throws HiveException {
         LOG.info(toString() + ": records written - " + numRows);
       }
 
-      // This should always be 0 for the final result file
-      int writerOffset = findWriterOffset(row);
       // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same
       // for a given operator branch prediction should work quite nicely on it.
       // RecordUpdateer expects to get the actual row, not a serialized version of it.  Thus we
       // pass the row rather than recordValue.
       if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+        // This should always be 0 for the final result file
+        int writerOffset = findWriterOffset(row);
         rowOutWriters[writerOffset].write(recordValue);
       } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
+        // This should always be 0 for the final result file
+        int writerOffset = findWriterOffset(row);
         fpaths.updaters[writerOffset].insert(conf.getTransactionId(), row);
       } else {
         // TODO I suspect we could skip much of the stuff above this in the function in the case
@@ -772,7 +774,21 @@ public void process(Object row, int tag) throws HiveException {
             bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField));
         int bucketNum =
           BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
-        if (fpaths.acidLastBucket != bucketNum) {
+        /**
+         * If this is writing multiple files, then it must be a bucketed table and bucketMap must be
+         * populated.
+         */
+        int updaterOffset = !multiFileSpray ? 0 : bucketMap.get(bucketNum);
+        if (fpaths.updaters[updaterOffset] == null) {
+          fpaths.updaters[updaterOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
+            jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[updaterOffset],
+            rowInspector, reporter, 0);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
+              fpaths.outPaths[updaterOffset]);
+          }
+        }
+        if (false && fpaths.acidLastBucket != bucketNum) {
           fpaths.acidLastBucket = bucketNum;
           // Switch files
           fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 : ++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
@@ -785,9 +801,11 @@ public void process(Object row, int tag) throws HiveException {
         }
 
         if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
-          fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset].update(conf.getTransactionId(), row);
+          fpaths.updaters[updaterOffset].update(conf.getTransactionId(), row);
+          //fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset].update(conf.getTransactionId(), row);
         } else if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
-          fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset].delete(conf.getTransactionId(), row);
+          //fpaths.updaters[conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED) ? 0 :fpaths.acidFileOffset].delete(conf.getTransactionId(), row);
+          fpaths.updaters[updaterOffset].delete(conf.getTransactionId(), row);
         } else {
           throw new HiveException("Unknown write type " + conf.getWriteType().toString());
         }
@@ -817,6 +835,10 @@ private int findWriterOffset(Object row) throws HiveException {
     if (!multiFileSpray) {
       return 0;
     } else {
+      //todo: this doesn't do the right thing for Acid Update/Delete because partitionEval is not set up
+      //correctly.  It's set up to "bucket" on the 0th column (i.e. ROW__ID), but this column is not
+      //wrapped with UDFToInteger(), so the hash computation treats it like any struct rather than
+      //the way UDFToInteger(ROW__ID) does it.
       Object[] bucketFieldValues = new Object[partitionEval.length];
       for(int i = 0; i < partitionEval.length; i++) {
         bucketFieldValues[i] = partitionEval[i].evaluate(row);
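// Illustration only, not part of the patch: a minimal, self-contained sketch of the
// writer-offset selection the hunks above introduce. The plain Map stands in for
// FileSinkOperator.bucketMap and the int for the bucket id decoded from ROW__ID's bucket
// property; no Hive classes are used and the class/method names here are made up.
import java.util.HashMap;
import java.util.Map;

public class UpdaterOffsetSketch {

  // With a single writer (multiFileSpray == false) every row goes to offset 0.
  // With multiple writers the table is bucketed, so bucketMap maps a bucket number
  // to the index of the updater/output path that owns that bucket.
  static int updaterOffset(boolean multiFileSpray, Map<Integer, Integer> bucketMap, int bucketNum) {
    return !multiFileSpray ? 0 : bucketMap.get(bucketNum);
  }

  public static void main(String[] args) {
    Map<Integer, Integer> bucketMap = new HashMap<>();
    bucketMap.put(1, 0);  // this writer owns bucket 1 at offset 0
    bucketMap.put(3, 1);  // and bucket 3 at offset 1
    System.out.println(updaterOffset(true, bucketMap, 3));   // 1
    System.out.println(updaterOffset(false, bucketMap, 99)); // 0
  }
}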
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index c531aeb8d2..efe558f1f5 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -888,4 +888,84 @@ public void testNonAcidToAcidConversion01() throws Exception {
     //make sure they are the same before and after compaction
   }
 
+  @Test
+  public void testMoreBucketsThanReducers() throws Exception {
+    //see bucket_num_reducers.q bucket_num_reducers2.q
+    d.destroy();
+    HiveConf hc = new HiveConf(hiveConf);
+    hc.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 1);
+    //this is used in multiple places, SemanticAnalyzer.getBucketingSortingDest() among others
+    hc.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 1);
+    hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
+    d = new Driver(hc);
+    d.setMaxRows(10000);
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,1)");//txn X writes to bucket1
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(0,0),(3,3)");//txn X + 1 writes to bucket0 + bucket1
+
+    /* So now the FileSinkOperator for this update should have totalFiles=2, numFiles=2 and multiFileSpray=true.
+    FileSinkOperator.process() has "if (fpaths.acidLastBucket != bucketNum) {" - this assumes that
+    rows seen by process() are grouped by bucketNum when numBuckets > numReducers.  There is nothing
+    that guarantees this.  This test demonstrates it - ReduceSinkOperator sorts by ROW__ID, thus the
+    single FileSinkOperator here in process()
+    should get (1,1),(0,0),(3,3), i.e. rows from b1,b0,b1, and get an ArrayIndexOutOfBoundsException:
+    2017-07-18T14:48:58,771 ERROR [pool-23-thread-1] ExecReducer: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"reducesinkkey0":{"transactionid":12,"bucketid":536936448,"rowid":0}},"value":{"_col0":3}}
+      at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:243)
+      at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
+      at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
+      at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:346)
+      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
+      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
+      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
+      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
+      at java.lang.Thread.run(Thread.java:745)
+    Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
+      at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:779)
+      at org.apache.hadoop.hive.ql.exec.Operator.baseForward(Operator.java:952)
+      at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:900)
+      at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:891)
+      at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
+      at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:234)
+      ... 8 more
+    */
+//    CommandProcessorResponse cpr = runStatementOnDriverNegative("update " + Table.ACIDTBL + " set b = -1");
+//    Assert.assertEquals("", 2, cpr.getResponseCode());
+    /* This is not the only possible error: we could just corrupt the data:
+     * say we have a single FS that should write 4 buckets and we see rows in this order: b1,b0,b3,b1.
+     * The 2nd row for b1 will cause "++fpaths.acidFileOffset" and a 2nd writer for b1 will be created
+     * in fpaths.updaters[3] (but with the same file name as updaters[0] - I don't know what will happen when
+     * file names collide - maybe we get bucket0 and bucket0_copy1 - maybe it will be clobbered). */
+    runStatementOnDriver("update " + Table.ACIDTBL + " set b = -1");
+    List r = runStatementOnDriver("select * from " + Table.ACIDTBL + " order by a, b");
+    int[][] expected = {{0, -1}, {1, -1}, {3, -1}};
+    Assert.assertEquals(stringifyValues(expected), r);
+  }
+  @Test
+  public void testMoreBucketsThanReducers2() throws Exception {
+    //see bucket_num_reducers.q bucket_num_reducers2.q
+    d.destroy();
+    HiveConf hc = new HiveConf(hiveConf);
+    hc.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 2);
+    //this is used in multiple places, SemanticAnalyzer.getBucketingSortingDest() among others
+    hc.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 2);
+    d = new Driver(hc);
+    d.setMaxRows(10000);
+    runStatementOnDriver("create table fourbuckets (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    //below, the value of a is the bucket id and b is the txn id (logically)
+    runStatementOnDriver("insert into fourbuckets values(0,1),(1,1)");//txn X writes to b0 + b1
+    runStatementOnDriver("insert into fourbuckets values(2,2),(3,2)");//txn X + 1 writes to b2 + b3
+    runStatementOnDriver("insert into fourbuckets values(0,3),(1,3)");//txn X + 2 writes to b0 + b1
+    runStatementOnDriver("insert into fourbuckets values(2,4),(3,4)");//txn X + 3 writes to b2 + b3
+    //so with 2 FileSinks and 4 buckets, FS1 should see (0,1),(2,2),(0,3),(2,4) since data is sorted by ROW__ID, where txnid is the first component;
+    //FS2 should see (1,1),(3,2),(1,3),(3,4)
+
+    /* This is not the only possible error: we could just corrupt the data:
+     * say we have a single FS that should write 4 buckets and we see rows in this order: b1,b0,b3,b1.
+     * The 2nd row for b1 will cause "++fpaths.acidFileOffset" and a 2nd writer for b1 will be created
+     * in fpaths.updaters[3] (but with the same file name as updaters[0] - I don't know what will happen when
+     * file names collide - maybe we get bucket0 and bucket0_copy1 - maybe it will be clobbered). */
+    runStatementOnDriver("update fourbuckets set b = -1");
+    List r = runStatementOnDriver("select * from fourbuckets order by a, b");
+    int[][] expected = {{0, -1}, {0, -1}, {1, -1}, {1, -1}, {2, -1}, {2, -1}, {3, -1}, {3, -1}};
+    Assert.assertEquals(stringifyValues(expected), r);
+  }
 }
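// Illustration only, not part of the patch: a minimal sketch of why the old
// "switch files when the bucket changes" logic in FileSinkOperator.process() fails when a
// reducer sees rows ordered by ROW__ID (transaction id first) rather than grouped by bucket,
// as in testMoreBucketsThanReducers above. The variables mirror fpaths.acidLastBucket and
// fpaths.acidFileOffset in name only; running this throws ArrayIndexOutOfBoundsException: 2,
// matching the stack trace quoted in the test.
public class AcidFileOffsetSketch {
  public static void main(String[] args) {
    Object[] updaters = new Object[2]; // this FileSinkOperator handles 2 of the table's buckets
    int acidFileOffset = -1;
    int acidLastBucket = -1;

    // Bucket ids of the rows (1,1),(0,0),(3,3) as they reach the single FileSinkOperator:
    // bucket 1, bucket 0, bucket 1 again.
    int[] bucketPerRow = {1, 0, 1};

    for (int bucketNum : bucketPerRow) {
      if (acidLastBucket != bucketNum) {
        acidLastBucket = bucketNum;
        // Old logic: every bucket change claims the next slot, even for a bucket that
        // already has a writer. The third row asks for updaters[2] and overflows.
        updaters[++acidFileOffset] = new Object();
      }
    }
  }
}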