diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 183515a0ed..a755463af9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1050,8 +1050,15 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId
      * {@link RecordIdentifier#rowId} for read (that have happened) and compaction (yet to happen).
      */
     Collections.sort(original, (HdfsFileStatusWithId o1, HdfsFileStatusWithId o2) -> {
-      //this does "Path.uri.compareTo(that.uri)"
-      return o1.getFileStatus().compareTo(o2.getFileStatus());
+      //sort by file size (largest first), then by file name
+      if (o1.getFileStatus().getLen() == o2.getFileStatus().getLen()) {
+        //this does "Path.uri.compareTo(that.uri)"
+        return o1.getFileStatus().compareTo(o2.getFileStatus());
+      }
+      else {
+        long diff = o1.getFileStatus().getLen() - o2.getFileStatus().getLen();
+        return diff > 0 ? -1 : +1;
+      }
     });
     // Note: isRawFormat is invalid for non-ORC tables. It will always return true, so we're good.
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 8caa265e8b..b74b0c9202 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -26,7 +26,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -222,8 +224,9 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporte
     rootPath = orcSplit.getRootDir();
     //why even compute syntheticProps if !isOriginal???
     syntheticProps = computeOffsetAndBucket(orcSplit, conf, validWriteIdList);
+    this.conf = conf;
   }
-
+  private final Configuration conf;
   /**
    * Used for generating synthetic ROW__IDs for reading "original" files
    */
@@ -237,6 +240,22 @@ private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty, long synth
       this.syntheticWriteId = syntheticWriteId;
     }
   }
+
+  /**
+   * Counts the files under the table; when converting a non-acid table to acid this determines
+   * what to seed NEXT_WRITE_ID with.
+   * @return number of files under {@code tablePath}, recursively
+   * @throws IOException
+   */
+  private int countFiles(Path tablePath) throws IOException {
+    int count = 0;
+    FileSystem fs = tablePath.getFileSystem(conf);
+    RemoteIterator<LocatedFileStatus> ri = fs.listFiles(tablePath, true);
+    while (ri.hasNext()) {
+      ri.next();
+      count++;
+    }
+    return count;
+  }
   /**
    * See {@link #next(NullWritable, VectorizedRowBatch)} first and
    * {@link OrcRawRecordMerger.OriginalReaderPair}.
@@ -265,6 +284,90 @@ private OffsetAndBucketProperty computeOffsetAndBucket(
       }
       return null;
     }
+    /**
+     * Design notes on assigning synthetic ROW__IDs to "original" files.
+     *
+     * For original data in a delta/base we have to take the statementId from the dir suffix
+     * (Load Data may generate > 1 delta with the same txnid) and, for copy_N files, look back
+     * at the footers. We really only need to look at footers for deltas with the same txn id
+     * but a smaller suffix. Also, Load Data into an Acid table in 3.0 renames input files
+     * using the copy_N suffix, so even a single delta may have copy_N files.
+     *
+     * Now, assume that on conversion we seed NEXT_WRITE_ID to 10M (or just count the files in
+     * the table during Alter Table). For any data in txnid:0, list all originals and assign a
+     * writeId by position.
+     *
+     * Also note that we can end up with any file name: a flat table does Load Data, which just
+     * copies files, and then gets converted to Acid. It's unclear what the compactor will do
+     * with such a file - it will probably fail. We can't rename files here since there may be
+     * many concurrent readers.
+     *
+     * So for unbucketed tables, sort all true originals by (size, name); the name breaks ties
+     * for empty files. (This changes the sort order, so upgrading to 3.0 requires a major
+     * compaction if there are any updates/deletes.)
+     *
+     * Each file gets the writeId equal to its position in the sort order (< 10M or whatever).
+     * We take the bucketId from the file name if it's a standard name and use copy_N as the
+     * statementId. If the file name is non-standard, hash it to get a bucketId (this needs a
+     * good hash).
+     *
+     * Maybe while counting the number of files we should also estimate the total table size;
+     * that will take more time.
+     *
+     * Alternatively, maybe we can just hash(filename) for all original files to get the bucket
+     * and not bother with the stmt id. Or, instead of hashing, use writeId % N for some N. We
+     * can figure out N per partition: since we sort by size, we also know the total partition
+     * size, say X GB, so take lg(X) to guess the number of buckets. Then allocate files to
+     * buckets so that the total bucket size is fixed: in the sorted array of files, go left to
+     * right, placing each file in the 'next' bucket that has 'room', looping around the bucket
+     * list. The Compactor has to do the same. This is exactly the same algorithm as renaming
+     * files on upgrade, which we no longer have to do if this is done. (A standalone sketch of
+     * this packing loop is appended after this diff.)
+     *
+     * We could fix Load Data to always rename files; then, if we also rename files during
+     * upgrade, everything can be file based. But someone can always create a flat table post
+     * upgrade, or opt a table out of the upgrade, and then Alter Table it to Acid, so we can't
+     * guarantee how files are named. The same applies if someone imports an archive into a
+     * flat table and then converts it to Acid.
+     *
+     * If the table is bucketed, we must take the bucketId from the file name and we can take
+     * copy_N as the statementId. We don't really have a choice: bucketed tables don't support
+     * the Union All optimization, and Load Data is disabled by default in 2.x and enforces
+     * file names in 3.0.
+     *
+     * In order to assign ROW_IDs properly we cannot rely on renaming files during the upgrade
+     * to 3.0. Post upgrade, a user may create a flat table and use a Load Data statement or
+     * Import an archive. They may also do Insert + Union All, which creates files with the
+     * same name in multiple subdirectories. Alter Table converting this to Acid has to handle
+     * all of these situations.
+     *
+     * For Load Data into an Acid table we may have > 1 delta in 1 txn, each with copy_N files.
+     * This can only happen with multi-stmt txns. There is no way to handle this in the current
+     * impl except to look back at the file footers. We could get rid of statementId and use
+     * writeId instead; then Load Data is not special any more, i.e. each delta has unique
+     * ROW_IDs, and we also don't need BucketCodec. There is no time to do this in 3.0: keep
+     * things as is for 3.0 with the slow footer lookups; in 3.1, add acidVersion as a property
+     * of the table.
+     * Old tables will continue to do the current thing, new ones can ...
+     * If Load Data produces copy_N files, we still need the stmt id.
+     *
+     * We don't have multi-stmt txns in 3.0, so we could make the Load Data case use copy_N as
+     * the stmt id, but we still can't do that for a Union All insert.
+     *
+     * =========================
+     * Any time we Alter Table to Acid, allocate 10M writeIds and give each file a writeId.
+     * Then we can assign files to buckets/statements as we wish.
+     *
+     * For Load Data, use writeId + copy_N as stmtId. This only works for single-stmt txns;
+     * once we get multi-stmt txns, make each stmt use its own writeId - then this works as
+     * well.
+     *
+     * For non-standard file names we need to fix the Compactor - actually we may need to fix
+     * many other places: any time we parse a bucket file name we'll get bucketId = -1, which
+     * will break something.
+     *
+     * The issue of making sure all files don't fall into 1 bucket is separate. We can start
+     * with what it does now (get the bucketId from the file name) and improve it (and handle
+     * non-standard file names) if there is time.
+     */
     long rowIdOffset = 0;
     OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo =
         OrcRawRecordMerger.TransactionMetaData.findWriteIDForSynthetcRowIDs(split.getPath(), split.getRootDir(), conf);
@@ -291,6 +394,40 @@ private OffsetAndBucketProperty computeOffsetAndBucket(
     return new OffsetAndBucketProperty(rowIdOffset, bucketProperty,
         syntheticTxnInfo.syntheticWriteId);
   }
+  /* Represent this as long[] where each cell is a bucket that stores its current size.
+   * Run over the file list in order; for each item add its size to the current bucket
+   * if the bucket still has room. If we know the size of the partition X and the
+   * number of buckets N, then each bucket is ceil(X/N). */
+  private long[] getBuckets(long tableSizeInGB) {
+    int numBuckets = 10;//(int) Math.log10(tableSizeInGB);//todo: is base 2 better?
+    return new long[numBuckets];
+  }
+  //todo: should go to AcidUtils
+  private int[] distributeToBuckets(AcidUtils.Directory directoryState) throws IOException {
+    long bucketSize = 100;//todo: compute it from the partition size
+    long[] buckets = getBuckets(1000);
+    int[] bucketMap = new int[directoryState.getOriginalFiles().size()];
+    int fileIdx = 0;
+    int bucketIdx = 0;
+    for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
+      AcidOutputFormat.Options bucketOptions =
+          //we just need the bucket id here - make a lighter-weight API for that - don't read the footer
+          AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
+      int attempts = 0;
+      while (true) {
+        //safety check: after one full pass over the buckets, place the file even if it doesn't fit
+        if (f.getFileStatus().getLen() + buckets[bucketIdx] <= bucketSize || attempts++ >= buckets.length) {
+          buckets[bucketIdx] += f.getFileStatus().getLen();
+          bucketMap[fileIdx] = bucketIdx;//record which bucket this file is in
+          fileIdx++;
+          //don't advance the bucket - keep filling it until it runs out of room
+          break;//done with this file, go to the next one
+        }
+        //if here, the file doesn't fit - try the next bucket, wrapping around
+        bucketIdx = (bucketIdx + 1) % buckets.length;
+      }
+    }
+    return bucketMap;
+  }
   /**
    * {@link VectorizedOrcAcidRowBatchReader} is always used for vectorized reads of acid tables.
    * In some cases this cannot be used from LLAP IO elevator because
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
index 11c59309fb..ce8a3dadc1 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -385,6 +385,8 @@ public void testMMOrcTable() throws Exception {
   /**
    * Make sure Load Data assigns ROW_IDs correctly when there is statementId suffix on delta dir
    * For example, delta_x_x_0001.
+   *
+   * todo: add test with 2 load data in 1 txn
    */
   private void testMultiStatement(boolean isVectorized) throws Exception {
     runStatementOnDriver("drop table if exists T");
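=========================
Appendix (illustrative sketch, not part of the patch): the design notes in computeOffsetAndBucket() and the distributeToBuckets() prototype describe packing the size-sorted original files into a fixed number of buckets by walking the bucket list and placing each file into the next bucket that still has room, wrapping around. The self-contained class below shows that loop without any Hive types; the class name, the capacity heuristic and the sample sizes are made up for the example.

import java.util.Arrays;

public class BucketPackingSketch {

  /** Returns bucketMap[i] = index of the bucket assigned to fileSizes[i]. */
  static int[] packBySize(long[] fileSizes, int numBuckets) {
    long total = Arrays.stream(fileSizes).sum();
    //each bucket gets roughly an equal share of the total bytes
    long capacity = (total + numBuckets - 1) / numBuckets;
    long[] used = new long[numBuckets];
    int[] bucketMap = new int[fileSizes.length];
    int bucketIdx = 0;
    for (int fileIdx = 0; fileIdx < fileSizes.length; fileIdx++) {
      int attempts = 0;
      //look for the next bucket with room, wrapping around; give up after one full pass
      while (attempts < numBuckets && used[bucketIdx] + fileSizes[fileIdx] > capacity) {
        bucketIdx = (bucketIdx + 1) % numBuckets;
        attempts++;
      }
      //if no bucket has room (e.g. a single huge file), fall back to the least-full bucket
      if (attempts == numBuckets) {
        bucketIdx = leastFull(used);
      }
      used[bucketIdx] += fileSizes[fileIdx];
      bucketMap[fileIdx] = bucketIdx;
    }
    return bucketMap;
  }

  private static int leastFull(long[] used) {
    int min = 0;
    for (int i = 1; i < used.length; i++) {
      if (used[i] < used[min]) {
        min = i;
      }
    }
    return min;
  }

  public static void main(String[] args) {
    //files already sorted largest-first, mirroring the new sort in AcidUtils
    long[] sizes = {900, 500, 400, 300, 100, 50};
    System.out.println(Arrays.toString(packBySize(sizes, 3))); //prints [0, 1, 2, 2, 1, 1]
  }
}

The fallback to the least-full bucket is one way to provide the safety check the todo in distributeToBuckets() asks for: a file larger than the per-bucket capacity gets placed somewhere instead of looping forever.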