diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
index 607abfdecb..ea7ba53a3a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
@@ -226,6 +226,7 @@ public String toString() {
     return "{originalWriteId: " + writeId + ", " + bucketToString() +
         ", row: " + getRowId() +"}";
   }
   protected String bucketToString() {
+    if (bucketId == -1) return ("bucket: " + bucketId);
     BucketCodec codec = BucketCodec.determineVersion(bucketId);
     return "bucket: " + bucketId + "(" + codec.getVersion() + "." +
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 31338d761e..a789dd218f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -2076,9 +2076,11 @@ public float getProgress() throws IOException {
     String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
     ValidWriteIdList validWriteIdList
         = (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString);
-    LOG.debug("getReader:: Read ValidWriteIdList: " + validWriteIdList.toString()
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getReader:: Read ValidWriteIdList: " + validWriteIdList.toString()
             + " isTransactionalTable: " + HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN));
-
+      LOG.debug("Creating merger for {} and {}", split.getPath(), Arrays.toString(deltas));
+    }
     final OrcRawRecordMerger records =
         new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket,
             validWriteIdList, readOptions, deltas, mergerOptions);
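Note on the OrcInputFormat hunk above: the new LOG.isDebugEnabled() guard is worth the extra lines because both debug statements build expensive arguments, and slf4j's {} placeholders only defer message formatting, not argument construction (Arrays.toString(deltas) would still run eagerly on every call). A minimal self-contained sketch of the pattern, assuming an slf4j logger; the class and method names are illustrative, not part of the patch:

    import java.util.Arrays;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class GuardedDebugLogging {
      private static final Logger LOG = LoggerFactory.getLogger(GuardedDebugLogging.class);

      void logMergerInputs(String path, Object[] deltas) {
        // The {} placeholders defer formatting, but Arrays.toString(deltas) is
        // evaluated before debug() is even called; the explicit guard skips
        // that work entirely when DEBUG logging is disabled.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Creating merger for {} and {}", path, Arrays.toString(deltas));
        }
      }
    }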
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 8c7c72e056..9d954cad18 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -249,6 +249,10 @@ public String toString() {
       } while (nextRecord() != null &&
           (minKey != null && key.compareRow(getMinKey()) <= 0));
     }
+    @Override
+    public String toString() {
+      return "[key=" + key + ", nextRecord=" + nextRecord + ", reader=" + reader + "]";
+    }
     @Override
     public final OrcStruct nextRecord() {
       return nextRecord;
@@ -281,7 +285,6 @@ public void next(OrcStruct next) throws IOException {
           OrcRecordUpdater.getRowId(nextRecord()),
           OrcRecordUpdater.getCurrentTransaction(nextRecord()),
           OrcRecordUpdater.getOperation(nextRecord()) == OrcRecordUpdater.DELETE_OPERATION);
-
       // if this record is larger than maxKey, we need to stop
       if (getMaxKey() != null && getKey().compareRow(getMaxKey()) > 0) {
         LOG.debug("key " + getKey() + " > maxkey " + getMaxKey());
@@ -999,7 +1002,7 @@ public Options clone() {
     LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
     // use the min/max instead of the byte range
     ReaderPair pair = null;
-    ReaderKey key = new ReaderKey();
+    ReaderKey baseKey = new ReaderKey();
     if (isOriginal) {
       options = options.clone();
       if(mergerOptions.isCompacting()) {
@@ -1009,7 +1012,7 @@ public Options clone() {
        readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
            AcidUtils.parseBase(mergerOptions.getBaseDir()), mergerOptions.getBaseDir());
       }
-      pair = new OriginalReaderPairToCompact(key, bucket, options, readerPairOptions,
+      pair = new OriginalReaderPairToCompact(baseKey, bucket, options, readerPairOptions,
           conf, validWriteIdList, 0);//0 since base_x doesn't have a suffix (neither does pre acid write)
     } else {
@@ -1024,7 +1027,7 @@ public Options clone() {
          readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
              tfp.syntheticWriteId, tfp.folder);
        }
-       pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(),
+       pair = new OriginalReaderPairToRead(baseKey, reader, bucket, keyInterval.getMinKey(),
            keyInterval.getMaxKey(), options, readerPairOptions, conf, validWriteIdList, tfp.statementId);
       }
     } else {
@@ -1039,7 +1042,7 @@ public Options clone() {
         //doing major compaction - it's possible where full compliment of bucket files is not
         //required (on Tez) that base_x/ doesn't have a file for 'bucket'
         reader = OrcFile.createReader(bucketPath, OrcFile.readerOptions(conf));
-        pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+        pair = new ReaderPairAcid(baseKey, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
           eventOptions);
       }
       else {
@@ -1049,7 +1052,7 @@ public Options clone() {
       }
       else {
         assert reader != null : "no reader? " + mergerOptions.getRootPath();
-        pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+        pair = new ReaderPairAcid(baseKey, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
           eventOptions);
       }
     }
@@ -1058,7 +1061,8 @@ public Options clone() {
     LOG.info("updated min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
     // if there is at least one record, put it in the map
     if (pair.nextRecord() != null) {
-      readers.put(key, pair);
+      ensurePutReader(baseKey, pair);
+      baseKey = null;
     }
     baseReader = pair.getRecordReader();
   }
@@ -1088,7 +1092,8 @@ public Options clone() {
        ReaderPair deltaPair = new OriginalReaderPairToCompact(key, bucket, options,
            rawCompactOptions, conf, validWriteIdList, deltaDir.getStatementId());
        if (deltaPair.nextRecord() != null) {
-         readers.put(key, deltaPair);
+         ensurePutReader(key, deltaPair);
+         key = new ReaderKey();
        }
        continue;
      }
@@ -1101,6 +1106,7 @@ public Options clone() {
         */
        continue;
      }
+     LOG.debug("Looking at delta file {}", deltaFile);
      if(deltaDir.isDeleteDelta()) {
        //if here it maybe compaction or regular read or Delete event sorter
        //in the later 2 cases we should do:
@@ -1109,7 +1115,8 @@ public Options clone() {
        ReaderPair deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
            deltaEventOptions);
        if (deltaPair.nextRecord() != null) {
-         readers.put(key, deltaPair);
+         ensurePutReader(key, deltaPair);
+         key = new ReaderKey();
        }
        continue;
      }
@@ -1123,13 +1130,15 @@ public Options clone() {
          //must get statementId from file name since Acid 1.0 doesn't write it into bucketProperty
          ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
              deltaEventOptions);
          if (deltaPair.nextRecord() != null) {
-           readers.put(key, deltaPair);
+           ensurePutReader(key, deltaPair);
+           key = new ReaderKey();
          }
        }
      }
    }
    // get the first record
+   LOG.debug("Final reader map {}", readers);
    Map.Entry<ReaderKey, ReaderPair> entry = readers.pollFirstEntry();
    if (entry == null) {
      columns = 0;
@@ -1146,6 +1155,14 @@ public Options clone() {
     }
   }
 
+  private void ensurePutReader(ReaderKey key, ReaderPair deltaPair) throws IOException {
+    ReaderPair oldPair = readers.put(key, deltaPair);
+    if (oldPair == null) return;
+    String error = "Two readers for " + key + ": new " + deltaPair + ", old " + oldPair;
+    LOG.error(error);
+    throw new IOException(error);
+  }
+
   /**
    * For use with Load Data statement which places {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
    * type files into a base_x/ or delta_x_x.  The data in these are then assigned ROW_IDs at read
@@ -1352,6 +1369,7 @@ public boolean next(RecordIdentifier recordIdentifier,
       boolean isSameRow = prevKey.isSameRow((ReaderKey)recordIdentifier);
       // if we are collapsing, figure out if this is a new row
       if (collapse || isSameRow) {
+        // Note: for collapse == false, this just sets keysSame.
         keysSame = (collapse && prevKey.compareRow(recordIdentifier) == 0) || (isSameRow);
         if (!keysSame) {
           prevKey.set(recordIdentifier);
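Note on ensurePutReader: it relies on TreeMap.put returning the previous value mapped to an equal key, which is how the "two readers for the same key" condition becomes detectable, and each successful insertion in the hunks above is followed by a fresh ReaderKey allocation (or baseKey = null) so that no two ReaderPairs share and mutate the same live key object inside the sorted map. A minimal sketch of the same fail-fast guard over a plain TreeMap, with illustrative names that are not part of the patch:

    import java.io.IOException;
    import java.util.TreeMap;

    public class EnsurePutSketch {
      private final TreeMap<String, String> readers = new TreeMap<>();

      // Mirrors ensurePutReader: a non-null return from put() means an equal
      // key was already present, i.e. two readers claimed the same position,
      // so fail fast instead of silently replacing one of them.
      void ensurePut(String key, String pair) throws IOException {
        String oldPair = readers.put(key, pair);
        if (oldPair != null) {
          throw new IOException("Two readers for " + key + ": new " + pair + ", old " + oldPair);
        }
      }
    }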