diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index 2bf9871..6feceaa 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -20,6 +20,8 @@
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;

 import org.apache.hadoop.fs.FileUtil;
@@ -41,6 +43,8 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * This class resides in itests to facilitate running query using Tez engine, since the jars are
@@ -260,4 +264,301 @@ private void setupMapJoin(HiveConf conf) {
     driver.getResults(rs);
     return rs;
   }
+  static List<String> stringifyValues(int[][] rowsIn) {
+    assert rowsIn.length > 0;
+    int[][] rows = rowsIn.clone();
+    Arrays.sort(rows, new RowComp());
+    List<String> rs = new ArrayList<String>();
+    for(int[] row : rows) {
+      assert row.length > 0;
+      StringBuilder sb = new StringBuilder();
+      for(int value : row) {
+        sb.append(value).append("\t");
+      }
+      sb.setLength(sb.length() - 1);
+      rs.add(sb.toString());
+    }
+    return rs;
+  }
+  private static final class RowComp implements Comparator<int[]> {
+    @Override
+    public int compare(int[] row1, int[] row2) {
+      assert row1 != null && row2 != null && row1.length == row2.length;
+      for(int i = 0; i < row1.length; i++) {
+        int comp = Integer.compare(row1[i], row2[i]);
+        if(comp != 0) {
+          return comp;
+        }
+      }
+      return 0;
+    }
+  }
+  static final private Logger LOG = LoggerFactory.getLogger(TestAcidOnTez.class);
+
+  private void logResuts(List<String> r, String header, String prefix) {
+    LOG.info(prefix + " " + header);
+    StringBuilder sb = new StringBuilder();
+    int numLines = 0;
+    for(String line : r) {
+      numLines++;
+      sb.append(prefix).append(line).append("\n");
+    }
+    LOG.info(sb.toString());
+    LOG.info(prefix + " Printed " + numLines + " lines");
+  }
+  /**
+   * https://hortonworks.jira.com/browse/BUG-66580
+   */
+  @Test
+  public void testMultiInsert1() throws Exception {
+    runStatementOnDriver("drop table if exists srcpart");
+    runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " +
+      "partitioned by (z int) clustered by (a) into 3 buckets " +
+      "stored as orc tblproperties('transactional'='true')");
+    runStatementOnDriver("create temporary table if not exists data1 (x int)");
+    runStatementOnDriver("create temporary table if not exists data2 (x int)");
+
+    runStatementOnDriver("insert into data1 values (1),(2),(3)");
+    runStatementOnDriver("insert into data2 values (4),(5),(6)");
+    d.destroy();
+    HiveConf hc = new HiveConf(hiveConf);
+    setupTez(hc);
+    hc.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
+    d = new Driver(hc);
+    String query = " from data1 " + //send everything to bucket 0
+      "insert into srcpart partition(z) select 0,0,x,x " +
+      "insert into srcpart partition(z=1) select 0,0,17";
+    List<String> r = runStatementOnDriver("explain " + query);
+    logResuts(r, "explain1", "");
+    runStatementOnDriver(query);
+    r = runStatementOnDriver("select * from srcpart order by c,z");
+    //this is what we should get but we end up losing 1 row
+//    int[][] rExpected = {{0,0,1,1},{0,0,2,2},{0,0,3,3},{0,0,17,1},{0,0,17,1},{0,0,17,1}};
+    int[][] rExpected = {{0,0,1,1},{0,0,2,2},{0,0,3,3},{0,0,17,1},{0,0,17,1}};
+//
Assert.assertEquals(stringifyValues(rExpected), r); + + r = runStatementOnDriver("explain insert into srcpart partition(z) select x, 1, 2, x as z from data1 union all select x, 3, 4, x as z from data2"); + logResuts(r, "explain2", ""); + /* + 2017-03-09T13:53:16,148 DEBUG [main] ql.Driver: Shutting down query explain insert into srcpart partition(z=1) select x, 1, 2 from data1 union all select x, 3, 4 from data2 +2017-03-09T13:53:16,149 INFO [main] ql.TestAcidOnTez: explain2 +2017-03-09T13:53:16,149 INFO [main] ql.TestAcidOnTez: STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Tez + DagId: ekoifman_20170309135305_d6258cb6-9b75-4b4a-ba67-a666d919d5e0:4 + Edges: + Map 1 <- Union 2 (CONTAINS) + Map 4 <- Union 2 (CONTAINS) + Reducer 3 <- Union 2 (CUSTOM_SIMPLE_EDGE) + DagName: + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: data1 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: x (type: int), 1 (type: int), 2 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 2 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int) + Map 4 + Map Operator Tree: + TableScan + alias: data2 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: x (type: int), 3 (type: int), 4 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 2 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int) + Reducer 3 + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), VALUE._col1 (type: int), VALUE._col2 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 2 Data size: 12 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 2 Data size: 12 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.srcpart + Write Type: INSERT + Union 2 + Vertex: Union 2 + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + partition: + z 1 + replace: false +*/ + /* + So this is what we get. All the data is in there. 1 leg of multi-insert has statementId=0, the other statementId=1 + I don't know why we have an empty bucket for the Static Partition case. + Partition z=1 has 2 rows with ROW_ID(1,0,0). + + This wouldn't happen in multi-statement txn case since statements are sequential, and N+1st statement "looks back" to + find the rowId from Nth statement. + + ekoifman:apache-hive-2.2.0-SNAPSHOT-bin ekoifman$ ./bin/hive --orcfiledump -j -p -d /Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart +SLF4J: Class path contains multiple SLF4J bindings. 
+SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hiverwgit/packaging/target/apache-hive-2.2.0-SNAPSHOT-bin/apache-hive-2.2.0-SNAPSHOT-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] +SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hwxhadoop/hadoop-dist/target/hadoop-2.7.3.2.6.0.0-SNAPSHOT/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] +SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. +SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0000/bucket_00000 [length: 625] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}} +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":1,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}} +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":2,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0000/bucket_00001 [length: 204] +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0001/bucket_00000 [length: 606] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":1}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=2/delta_0000001_0000001_0001/bucket_00000 [length: 619] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":2}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=3/delta_0000001_0000001_0001/bucket_00000 [length: 619] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":3}} +________________________________________________________________________________________________________________________ + + + +2017-03-09T12:06:28,292 DEBUG [main] ql.Driver: Shutting down query explain from data1 insert into srcpart partition(z) select 0,0,x,x insert into srcpart partition(z=1) select 0,0,17 +2017-03-09T12:06:28,294 INFO [main] ql.TestAcidOnTez: explain1 +2017-03-09T12:06:28,294 INFO [main] ql.TestAcidOnTez: STAGE DEPENDENCIES: + Stage-2 is a root stage + Stage-3 depends on stages: Stage-2 + Stage-0 depends on stages: Stage-3 + Stage-4 depends on stages: Stage-0 + Stage-1 depends on stages: Stage-3 + Stage-5 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-2 + Tez + DagId: 
ekoifman_20170309120619_f21fdc51-b8ba-4c30-ac1d-b7aa62cec6b1:1 + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) + Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE) + DagName: + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: data1 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: x (type: int) + outputColumnNames: _col2 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Map-reduce partition columns: 0 (type: int) + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + value expressions: _col2 (type: int) + Select Operator + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Map-reduce partition columns: 0 (type: int) + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Select Operator + expressions: 0 (type: int), 0 (type: int), VALUE._col1 (type: int), VALUE._col1 (type: int) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.srcpart + Write Type: INSERT + Reducer 3 + Reduce Operator Tree: + Select Operator + expressions: 0 (type: int), 0 (type: int), 17 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.srcpart + Write Type: INSERT + + Stage: Stage-3 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + partition: + z + replace: false + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.srcpart + + Stage: Stage-4 + Stats-Aggr Operator + + Stage: Stage-1 + Move Operator + tables: + partition: + z 1 + replace: false + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.srcpart + + Stage: Stage-5 + Stats-Aggr Operator + +2017-03-09T12:06:28,295 INFO [main] ql.TestAcidOnTez: Printed 100 lines + +*/ + + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 6693134..04dcb3b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1201,7 +1201,7 @@ public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws I * Group 6: copy [copy keyword] * Group 8: 2 [copy file index] */ - private static final String COPY_KEYWORD = "_copy_"; // copy keyword + public static final String COPY_KEYWORD = "_copy_"; // copy 
keyword private static final Pattern COPY_FILE_NAME_TO_TASK_ID_REGEX = Pattern.compile("^.*?"+ // any prefix "([0-9]+)"+ // taskId diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java index 7c7074d..0edef65 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java @@ -116,6 +116,9 @@ public DeltaMetaData() { this(0,0,new ArrayList()); } + /** + * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition + */ DeltaMetaData(long minTxnId, long maxTxnId, List stmtIds) { this.minTxnId = minTxnId; this.maxTxnId = maxTxnId; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index b85b827..8b92da3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -51,6 +51,10 @@ private long minimumTransactionId; private long maximumTransactionId; private int bucket; + /** + * Based on {@link org.apache.hadoop.hive.ql.metadata.Hive.mvFile()} _copy_N starts with 1 + */ + private int copyNumber; private PrintStream dummyStream = null; private boolean oldStyle = false; private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id @@ -180,6 +184,18 @@ public Options bucket(int bucket) { } /** + * Multiple inserts into legacy (pre-acid) tables can generate multiple copies of each bucket + * file. + * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD + * @param copyNumber the number of the copy ( > 0) + * @return this + */ + public Options copyNumber(int copyNumber) { + this.copyNumber = copyNumber; + return this; + } + + /** * Whether it should use the old style (0000000_0) filenames. * @param value should use the old style names * @return this diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index da00bb3..db0282c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -48,6 +48,8 @@ import com.google.common.annotations.VisibleForTesting; +import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD; + /** * Utilities that are shared by all of the ACID input and output formats. They * are used by the compactor and cleaner and thus must be format agnostic. 
@@ -98,7 +100,13 @@ public boolean accept(Path path) { */ public static final int MAX_STATEMENTS_PER_TXN = 10000; public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$"); + //this should match 000000_0_copy_1 public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}"); + /** + * This does not need to use ORIGINAL_PATTERN_COPY because it's used to read + * a "delta" dir written by a real Acid write - cannot have any copies + */ + public static final PathFilter originalBucketFilter = new PathFilter() { @Override public boolean accept(Path path) { @@ -113,6 +121,11 @@ private AcidUtils() { private static final Pattern ORIGINAL_PATTERN = Pattern.compile("[0-9]+_[0-9]+"); + /** + * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD + */ + private static final Pattern ORIGINAL_PATTERN_COPY = + Pattern.compile("[0-9]+_[0-9]+" + COPY_KEYWORD + "[0-9]+"); public static final PathFilter hiddenFileFilter = new PathFilter(){ @Override @@ -243,7 +256,21 @@ static long parseBase(Path path) { .maximumTransactionId(0) .bucket(bucket) .writingBase(true); - } else if (filename.startsWith(BUCKET_PREFIX)) { + } + else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) { + //arguably this should throw since we don't know how to handle this in OrcRawRecordMerger + int bucket = + Integer.parseInt(filename.substring(0, filename.indexOf('_'))); + int copyNumber = Integer.parseInt(filename.substring(filename.lastIndexOf('_') + 1)); + result + .setOldStyle(true) + .minimumTransactionId(0) + .maximumTransactionId(0) + .bucket(bucket) + .copyNumber(copyNumber) + .writingBase(true); + } + else if (filename.startsWith(BUCKET_PREFIX)) { int bucket = Integer.parseInt(filename.substring(filename.indexOf('_') + 1)); if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) { @@ -269,6 +296,7 @@ static long parseBase(Path path) { .bucket(bucket); } } else { + //why is this useful? what are we going to do with bucketId = -1? result.setOldStyle(true).bucket(-1).minimumTransactionId(0) .maximumTransactionId(0); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 59682db..0e1bed7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -1678,7 +1678,7 @@ private long computeProjectionSize(List fileTypes, // independent split strategies for them. There is a global flag 'isOriginal' that is set // on a per split strategy basis and it has to be same for all the files in that strategy. List> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs, - adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi, + adi.splitPath, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); for (SplitStrategy splitStrategy : splitStrategies) { @@ -2014,9 +2014,10 @@ public float getProgress() throws IOException { }; } - static Path findOriginalBucket(FileSystem fs, + private static Path findOriginalBucket(FileSystem fs, Path directory, int bucket) throws IOException { + //todo: are all buckets necessarily 1 level deep? - Sergey says NO for(FileStatus stat: fs.listStatus(directory)) { String name = stat.getPath().getName(); String numberPart = name.substring(0, name.indexOf('_')); @@ -2137,9 +2138,18 @@ private static boolean isStripeSatisfyPredicate( return sarg.evaluate(truthValues).isNeeded(); } + /** + * @param dir - what is this? 
run debugger + * @param baseFiles + * @param parsedDeltas + * @param readerTypes + * @param ugi + * @param allowSyntheticFileIds + * @return + */ @VisibleForTesting static List> determineSplitStrategies(CombinedCtx combinedCtx, Context context, - FileSystem fs, Path dir, AcidUtils.Directory dirInfo, + FileSystem fs, Path dir, List baseFiles, List parsedDeltas, List readerTypes, @@ -2150,7 +2160,7 @@ private static boolean isStripeSatisfyPredicate( // When no baseFiles, we will just generate a single split strategy and return. List acidSchemaFiles = new ArrayList(); if (baseFiles.isEmpty()) { - splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo, + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); if (splitStrategy != null) { splitStrategies.add(splitStrategy); @@ -2170,7 +2180,7 @@ private static boolean isStripeSatisfyPredicate( // Generate split strategy for non-acid schema original files, if any. if (!originalSchemaFiles.isEmpty()) { - splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo, + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, originalSchemaFiles, true, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); if (splitStrategy != null) { splitStrategies.add(splitStrategy); @@ -2179,7 +2189,7 @@ private static boolean isStripeSatisfyPredicate( // Generate split strategy for acid schema files, if any. if (!acidSchemaFiles.isEmpty()) { - splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo, + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); if (splitStrategy != null) { splitStrategies.add(splitStrategy); @@ -2191,7 +2201,7 @@ private static boolean isStripeSatisfyPredicate( @VisibleForTesting static SplitStrategy determineSplitStrategy(CombinedCtx combinedCtx, Context context, - FileSystem fs, Path dir, AcidUtils.Directory dirInfo, + FileSystem fs, Path dir, List baseFiles, boolean isOriginal, List parsedDeltas, @@ -2266,6 +2276,9 @@ private static boolean isStripeSatisfyPredicate( } reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf)); } + //todo: if isOriginal, find N in copyN and pass it in.... + //except that it's just picking 1 file, rather than all copies... 
WTF + //or perhaps return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal, bucket, validTxnList, new Reader.Options(), deltaDirectory); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 95b8806..23b0002 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -264,6 +264,7 @@ int getColumns() { @Override void next(OrcStruct next) throws IOException { if (recordReader.hasNext()) { + /*this produces a global row number even with PPD*/ long nextRowId = recordReader.getRowNumber(); // have to do initialization here, because the super's constructor // calls next and thus we need to initialize before our constructor diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index ed854bf..aeb6019 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -3035,14 +3035,14 @@ private static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath, int counter = 1; if (!isRenameAllowed || isBlobStoragePath) { while (destFs.exists(destFilePath)) { - destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : "")); + destFilePath = new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) + (!type.isEmpty() ? "." + type : "")); counter++; } } if (isRenameAllowed) { while (!destFs.rename(sourcePath, destFilePath)) { - destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : "")); + destFilePath = new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) + (!type.isEmpty() ? "." + type : "")); counter++; } } else if (isSrcLocal) { diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 05b6fc4..53eba4f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -359,7 +359,7 @@ public CompactorInputSplit() { * @param hadoopConf * @param bucket bucket to be processed by this split * @param files actual files this split should process. It is assumed the caller has already - * parsed out the files in base and deltas to populate this list. + * parsed out the files in base and deltas to populate this list. Includes copy_N * @param base directory of the base, or the partition/table location if the files are in old * style. Can be null. * @param deltas directories of the delta files. 
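[Reviewer note, not part of the patch] The Utilities/AcidUtils changes above make the _copy_N suffix (written by Hive.mvFile() when a pre-acid table receives multiple loads of the same bucket file) recognizable to the ACID reader. Below is a minimal, self-contained sketch of that filename handling, assuming the same naming convention as the patch (bucketId_taskAttempt, optionally followed by _copy_N, with N starting at 1):

import java.util.regex.Pattern;

public class CopyFileNameSketch {
  // Mirrors Utilities.COPY_KEYWORD and AcidUtils.ORIGINAL_PATTERN_COPY from the patch.
  private static final String COPY_KEYWORD = "_copy_";
  private static final Pattern ORIGINAL_PATTERN_COPY =
      Pattern.compile("[0-9]+_[0-9]+" + COPY_KEYWORD + "[0-9]+");

  public static void main(String[] args) {
    String filename = "000001_0_copy_2"; // hypothetical legacy bucket file name
    if (ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
      // Same substring logic as the AcidUtils change above: the digits before the first '_'
      // are the bucket id, the digits after the last '_' are the copy number.
      int bucket = Integer.parseInt(filename.substring(0, filename.indexOf('_')));
      int copyNumber = Integer.parseInt(filename.substring(filename.lastIndexOf('_') + 1));
      System.out.println("bucket=" + bucket + ", copyNumber=" + copyNumber); // bucket=1, copyNumber=2
    }
  }
}

(The class name and sample filename are illustrative only; the patch itself parses these names in AcidUtils and carries the value via AcidOutputFormat.Options.copyNumber().)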
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index b9df674..a59b817 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql; import org.apache.commons.io.FileUtils; -import org.apache.commons.io.output.StringBuilderWriter; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -33,6 +32,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -106,6 +106,7 @@ public void setUp() throws Exception { hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); + hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); hiveConf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); @@ -898,4 +899,42 @@ public void run() { //> 2 seconds pass, i.e. that the command in Driver actually blocks before cancel is fired Assert.assertTrue(System.currentTimeMillis() > start + 2); } + + /** + * This is to demonstrate the issue in HIVE-16177 - ROW__IDs are not unique. + */ + //@Ignore("HIVE-16177") + @Test + public void testNonAcidToAcidConversion0() throws Exception { + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,3)"); + //we should now have non-empty bucket files 000001_0 and 000001_0_copy_1 + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); + + List rs = runStatementOnDriver("select ROW__ID, INPUT__FILE__NAME, a, b from " + Table.NONACIDORCTBL + " order by b"); + //this is of course BS since we have duplicate ROW__IDs + Assert.assertTrue(rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\tfile:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands")); + Assert.assertTrue(rs.get(0).endsWith("/warehouse/nonacidorctbl/000001_0\t1\t2")); + Assert.assertTrue(rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\tfile:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands")); + Assert.assertTrue(rs.get(1).endsWith("/warehouse/nonacidorctbl/000001_0_copy_1\t1\t3")); + + //now let's compact it + runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, INPUT__FILE__NAME, a, b from " + Table.NONACIDORCTBL + " order by b"); + //more BS - we had 2 rows in the table and now we have just 1 after compaction.... + //Compactor doesn't know about copy_N files either - would have to take all of them together for a given bucket + //and read as a single stream in order... 
but OrcRawRecordMerger takes a Reader as input which is reading 1 file
+    //so we'd have to implement a new Reader that keeps track of a collection of inner readers and chains them
+    //together....
+    //Worse yet, there is no guarantee in Hive that all data files in a partition are 1st level children
+    //some may be in subdirs, according to Sergey:
+    //[5:17 PM] Sergey Shelukhin: 1) list bucketing
+    //[5:18 PM] Sergey Shelukhin: 2) any time, if the MR recursive-whatever setting is enabled
+    //[5:18 PM] Sergey Shelukhin: 3) iirc Hive can produce it sometimes from unions but I'm not sure
+    //try ctas and insert with union in the query on Tez
+    Assert.assertEquals("wrong RS cardinality",1, rs.size());
+    Assert.assertTrue(rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\tfile:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands"));
+    Assert.assertTrue(rs.get(0).endsWith("warehouse/nonacidorctbl/base_-9223372036854775808/bucket_00001\t1\t2"));
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 6718ae9..c5849d0 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -1501,25 +1501,127 @@ public void testMerge3() throws Exception {
   }
   /**
    * https://hortonworks.jira.com/browse/BUG-66580
-   * @throws Exception
    */
-  @Ignore
   @Test
-  public void testMultiInsert() throws Exception {
+  public void testMultiInsert1() throws Exception {
+    runStatementOnDriver("drop table if exists srcpart");
+    runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " +
+      "partitioned by (z int) clustered by (a) into 2 buckets " +
+      "stored as orc tblproperties('transactional'='true')");
+    runStatementOnDriver("create temporary table if not exists data1 (x int)");
+    runStatementOnDriver("create temporary table if not exists data2 (x int)");
+
+    runStatementOnDriver("insert into data1 values (1),(2),(3)");
+//    runStatementOnDriver("insert into data2 values (4),(5),(6)");
+    d.destroy();
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    d = new Driver(hiveConf);
+    //supposedly this gives non-unique IDs
+    List<String> r = runStatementOnDriver(" from data1 " + //send everything to bucket 0
+      "insert into srcpart partition(z) select 0,0,x,x " +
+      "insert into srcpart partition(z=1) select 0,0,17");
+    r = runStatementOnDriver("select * from srcpart order by c,z");
+    //this is what we should get but we end up losing 1 row
+//    int[][] rExpected = {{0,0,1,1},{0,0,2,2},{0,0,3,3},{0,0,17,1},{0,0,17,1},{0,0,17,1}};
+    int[][] rExpected = {{0,0,1,1},{0,0,2,2},{0,0,3,3},{0,0,17,1},{0,0,17,1}};
+    Assert.assertEquals(stringifyValues(rExpected), r);
+    /*
+    So this is what we get. All the data is in there. 1 leg of multi-insert has statementId=0, the other statementId=1
+    I don't know why we have an empty bucket for the Static Partition case.
+    Partition z=1 has 2 rows with ROW_ID(1,0,0).
+
+    This wouldn't happen in multi-statement txn case since statements are sequential, and N+1st statement "looks back" to
+    find the rowId from Nth statement.
+
+    ekoifman:apache-hive-2.2.0-SNAPSHOT-bin ekoifman$ ./bin/hive --orcfiledump -j -p -d /Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart
+SLF4J: Class path contains multiple SLF4J bindings.
+SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hiverwgit/packaging/target/apache-hive-2.2.0-SNAPSHOT-bin/apache-hive-2.2.0-SNAPSHOT-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] +SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hwxhadoop/hadoop-dist/target/hadoop-2.7.3.2.6.0.0-SNAPSHOT/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] +SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. +SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0000/bucket_00000 [length: 625] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}} +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":1,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}} +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":2,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0000/bucket_00001 [length: 204] +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0001/bucket_00000 [length: 606] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":1}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=2/delta_0000001_0000001_0001/bucket_00000 [length: 619] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":2}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=3/delta_0000001_0000001_0001/bucket_00000 [length: 619] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":3}} +________________________________________________________________________________________________________________________ +*/ + } + @Test + public void testMultiInsert2() throws Exception { + runStatementOnDriver("drop table if exists srcpart"); + runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " + + "partitioned by (z int) clustered by (a) into 2 buckets " + + "stored as orc tblproperties('transactional'='true')"); + runStatementOnDriver("create temporary table if not exists data1 (x int)"); + runStatementOnDriver("create temporary table if not exists data2 (x int)"); + + runStatementOnDriver("insert into data1 values (1),(2),(3)"); +// runStatementOnDriver("insert into 
data2 values (4),(5),(6)"); + d.destroy(); + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + d = new Driver(hiveConf); + Listr = runStatementOnDriver("from data1 " + + "insert into srcpart partition(z) select 0,0,x,1 where x % 2 = 0 " + + "insert into srcpart partition(z=1) select 2,0,x where x % 2 = 1"); + r = runStatementOnDriver("select * from srcpart order by a,b,c,z"); + int[][] rExpected = {{0,0,2,1},{2,0,1,1},{2,0,3,1}}; + Assert.assertEquals(stringifyValues(rExpected), r); + /* + We get correct results and ROW_IDs look correct - but this is a fluke (I guess the optimizer figures out that DP branch is also actually SP with z=1 (change 1 -> x and you end up with dup ROW_IDs) + (maybe some other reason: either way OrcRecordUpdate.findRowIdOffsetForInsert() ends up doing the right thing) + Once again, there is an empty bucket in static part branch + + ekoifman:apache-hive-2.2.0-SNAPSHOT-bin ekoifman$ ./bin/hive --orcfiledump -j -p -d /Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489026761787/warehouse/srcpart/z=1 +SLF4J: Class path contains multiple SLF4J bindings. +SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hiverwgit/packaging/target/apache-hive-2.2.0-SNAPSHOT-bin/apache-hive-2.2.0-SNAPSHOT-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] +SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hwxhadoop/hadoop-dist/target/hadoop-2.7.3.2.6.0.0-SNAPSHOT/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] +SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. +SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489026761787/warehouse/srcpart/z=1/delta_0000001_0000001_0000/bucket_00000 [length: 619] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":2}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489026761787/warehouse/srcpart/z=1/delta_0000001_0000001_0001/bucket_00000 [length: 625] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":1,"currentTransaction":1,"row":{"_col0":2,"_col1":0,"_col2":3}} +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":2,"currentTransaction":1,"row":{"_col0":2,"_col1":0,"_col2":1}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489026761787/warehouse/srcpart/z=1/delta_0000001_0000001_0001/bucket_00001 [length: 204] +________________________________________________________________________________________________________________________ +*/ + + } + @Test + public void testMultiInsert3() throws Exception { + runStatementOnDriver("drop table if exists srcpart"); runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " + "partitioned by (z int) clustered by (a) into 2 buckets " + "stored as orc tblproperties('transactional'='true')"); runStatementOnDriver("create temporary table if not exists data1 (x int)"); -// runStatementOnDriver("create temporary table if not exists 
data2 (x int)");
+    runStatementOnDriver("create temporary table if not exists data2 (x int)");

     runStatementOnDriver("insert into data1 values (1),(2),(3)");
 //    runStatementOnDriver("insert into data2 values (4),(5),(6)");
     d.destroy();
     hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
     d = new Driver(hiveConf);
-    List<String> r = runStatementOnDriver(" from data1 " +
-      "insert into srcpart partition(z) select 0,0,1,x " +
-      "insert into srcpart partition(z=1) select 0,0,1");
+    //this produces 3 rows (as it should, I'd think)
+    List<String> r = runStatementOnDriver("select 0,0,1,1 from data1 where x % 2 = 0 UNION ALL select 2,0.0,1,1 from data1 where x % 2 = 1");
   }
   /**
    * Investigating DP and WriteEntity, etc