diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java index 2bf9871956..a2cedc6b88 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java @@ -20,6 +20,8 @@ import java.io.File; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; import java.util.List; import org.apache.hadoop.fs.FileUtil; @@ -33,14 +35,14 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.txn.compactor.Cleaner; -import org.apache.hadoop.hive.ql.txn.compactor.Worker; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class resides in itests to facilitate running query using Tez engine, since the jars are @@ -260,4 +262,323 @@ private void setupMapJoin(HiveConf conf) { driver.getResults(rs); return rs; } + static List stringifyValues(int[][] rowsIn) { + assert rowsIn.length > 0; + int[][] rows = rowsIn.clone(); + Arrays.sort(rows, new RowComp()); + List rs = new ArrayList(); + for(int[] row : rows) { + assert row.length > 0; + StringBuilder sb = new StringBuilder(); + for(int value : row) { + sb.append(value).append("\t"); + } + sb.setLength(sb.length() - 1); + rs.add(sb.toString()); + } + return rs; + } + private static final class RowComp implements Comparator { + @Override + public int compare(int[] row1, int[] row2) { + assert row1 != null && row2 != null && row1.length == row2.length; + for(int i = 0; i < row1.length; i++) { + int comp = Integer.compare(row1[i], row2[i]); + if(comp != 0) { + return comp; + } + } + return 0; + } + } + static final private Logger LOG = LoggerFactory.getLogger(TestAcidOnTez.class); + + private void logResuts(List r, String header, String prefix) { + LOG.info(prefix + " " + header); + StringBuilder sb = new StringBuilder(); + int numLines = 0; + for(String line : r) { + numLines++; + sb.append(prefix).append(line).append("\n"); + } + LOG.info(sb.toString()); + LOG.info(prefix + " Printed " + numLines + " lines"); + } + @Test + public void testUnion() throws Exception { + runStatementOnDriver("drop table if exists srcpart2"); + runStatementOnDriver("create table if not exists srcpart2 (a int, b int, c int) " + + "partitioned by (z int) clustered by (a) into 3 buckets " + + "stored as orc"); + runStatementOnDriver("create temporary table if not exists data1 (x int)"); + runStatementOnDriver("create temporary table if not exists data2 (x int)"); + + runStatementOnDriver("insert into data1 values (1),(2),(3)"); + runStatementOnDriver("insert into data2 values (4),(5),(6)"); + d.destroy(); + HiveConf hc = new HiveConf(hiveConf); + setupTez(hc); + hc.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false); + d = new Driver(hc); + List r = runStatementOnDriver("explain insert into srcpart2 partition(z=1) select x, 1, 2 from data1 union all select x, 3, 4 from data2"); + logResuts(r, "explain3", ""); +// r = runStatementOnDriver("insert into srcpart2 partition(z=1) select x, 1, 2 from data1 union all select x, 3, 4 from data2"); + r = 
runStatementOnDriver("create table srcpart3 stored as orc as select x, 1, 2 from data1 union all select x, 3, 4 from data2"); + //see https://issues.apache.org/jira/browse/HIVE-15899 for dir structure that this produces + } + /** + * https://hortonworks.jira.com/browse/BUG-66580 + */ + @Test + public void testMultiInsert1() throws Exception { + runStatementOnDriver("drop table if exists srcpart"); + runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " + + "partitioned by (z int) clustered by (a) into 3 buckets " + + "stored as orc tblproperties('transactional'='true')"); + runStatementOnDriver("create temporary table if not exists data1 (x int)"); + runStatementOnDriver("create temporary table if not exists data2 (x int)"); + + runStatementOnDriver("insert into data1 values (1),(2),(3)"); + runStatementOnDriver("insert into data2 values (4),(5),(6)"); + d.destroy(); + HiveConf hc = new HiveConf(hiveConf); + setupTez(hc); + hc.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false); + d = new Driver(hc); + String query = " from data1 " + //send everything to bucket 0 + "insert into srcpart partition(z) select 0,0,x,x " + + "insert into srcpart partition(z=1) select 0,0,17"; + List r = runStatementOnDriver("explain " + query); + logResuts(r, "explain1", ""); + runStatementOnDriver(query); + r = runStatementOnDriver("select * from srcpart order by c,z"); + //this is what we should get but we end up loosing 1 row +// int[][] rExpected = {{0,0,1,1},{0,0,2,2},{0,0,3,3},{0,0,17,1},{0,0,17,1},{0,0,17,1}}; + int[][] rExpected = {{0,0,1,1},{0,0,2,2},{0,0,3,3},{0,0,17,1},{0,0,17,1}}; +// Assert.assertEquals(stringifyValues(rExpected), r); + + r = runStatementOnDriver("explain insert into srcpart partition(z) select x, 1, 2, x as z from data1 union all select x, 3, 4, x as z from data2"); + logResuts(r, "explain2", ""); + /* + 2017-03-09T13:53:16,148 DEBUG [main] ql.Driver: Shutting down query explain insert into srcpart partition(z=1) select x, 1, 2 from data1 union all select x, 3, 4 from data2 +2017-03-09T13:53:16,149 INFO [main] ql.TestAcidOnTez: explain2 +2017-03-09T13:53:16,149 INFO [main] ql.TestAcidOnTez: STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Tez + DagId: ekoifman_20170309135305_d6258cb6-9b75-4b4a-ba67-a666d919d5e0:4 + Edges: + Map 1 <- Union 2 (CONTAINS) + Map 4 <- Union 2 (CONTAINS) + Reducer 3 <- Union 2 (CUSTOM_SIMPLE_EDGE) + DagName: + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: data1 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: x (type: int), 1 (type: int), 2 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 2 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int) + Map 4 + Map Operator Tree: + TableScan + alias: data2 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: x (type: int), 3 (type: int), 4 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE 
+ Reduce Output Operator + sort order: + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 2 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int) + Reducer 3 + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), VALUE._col1 (type: int), VALUE._col2 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 2 Data size: 12 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 2 Data size: 12 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.srcpart + Write Type: INSERT + Union 2 + Vertex: Union 2 + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + partition: + z 1 + replace: false +*/ + /* + So this is what we get. All the data is in there. 1 leg of multi-insert has statementId=0, the other statementId=1 + I don't know why we have an empty bucket for the Static Partition case. + Partition z=1 has 2 rows with ROW_ID(1,0,0). + + This wouldn't happen in multi-statement txn case since statements are sequential, and N+1st statement "looks back" to + find the rowId from Nth statement. + + ekoifman:apache-hive-2.2.0-SNAPSHOT-bin ekoifman$ ./bin/hive --orcfiledump -j -p -d /Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart +SLF4J: Class path contains multiple SLF4J bindings. +SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hiverwgit/packaging/target/apache-hive-2.2.0-SNAPSHOT-bin/apache-hive-2.2.0-SNAPSHOT-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] +SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hwxhadoop/hadoop-dist/target/hadoop-2.7.3.2.6.0.0-SNAPSHOT/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] +SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
+SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0000/bucket_00000 [length: 625] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}} +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":1,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}} +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":2,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0000/bucket_00001 [length: 204] +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0001/bucket_00000 [length: 606] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":1}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=2/delta_0000001_0000001_0001/bucket_00000 [length: 619] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":2}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=3/delta_0000001_0000001_0001/bucket_00000 [length: 619] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":3}} +________________________________________________________________________________________________________________________ + + + +2017-03-09T12:06:28,292 DEBUG [main] ql.Driver: Shutting down query explain from data1 insert into srcpart partition(z) select 0,0,x,x insert into srcpart partition(z=1) select 0,0,17 +2017-03-09T12:06:28,294 INFO [main] ql.TestAcidOnTez: explain1 +2017-03-09T12:06:28,294 INFO [main] ql.TestAcidOnTez: STAGE DEPENDENCIES: + Stage-2 is a root stage + Stage-3 depends on stages: Stage-2 + Stage-0 depends on stages: Stage-3 + Stage-4 depends on stages: Stage-0 + Stage-1 depends on stages: Stage-3 + Stage-5 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-2 + Tez + DagId: ekoifman_20170309120619_f21fdc51-b8ba-4c30-ac1d-b7aa62cec6b1:1 + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) + Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE) + DagName: + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: data1 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: x (type: int) + outputColumnNames: _col2 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort 
order: + Map-reduce partition columns: 0 (type: int) + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + value expressions: _col2 (type: int) + Select Operator + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Map-reduce partition columns: 0 (type: int) + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Select Operator + expressions: 0 (type: int), 0 (type: int), VALUE._col1 (type: int), VALUE._col1 (type: int) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.srcpart + Write Type: INSERT + Reducer 3 + Reduce Operator Tree: + Select Operator + expressions: 0 (type: int), 0 (type: int), 17 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.srcpart + Write Type: INSERT + + Stage: Stage-3 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + partition: + z + replace: false + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.srcpart + + Stage: Stage-4 + Stats-Aggr Operator + + Stage: Stage-1 + Move Operator + tables: + partition: + z 1 + replace: false + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.srcpart + + Stage: Stage-5 + Stats-Aggr Operator + +2017-03-09T12:06:28,295 INFO [main] ql.TestAcidOnTez: Printed 100 lines + +*/ + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index ebf13442f4..a53da97b13 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1205,7 +1205,7 @@ public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws I * Group 6: copy [copy keyword] * Group 8: 2 [copy file index] */ - private static final String COPY_KEYWORD = "_copy_"; // copy keyword + public static final String COPY_KEYWORD = "_copy_"; // copy keyword private static final Pattern COPY_FILE_NAME_TO_TASK_ID_REGEX = Pattern.compile("^.*?"+ // any prefix "([0-9]+)"+ // taskId diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java index 7c7074d156..0edef65828 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java @@ -116,6 +116,9 @@ public DeltaMetaData() { this(0,0,new 
ArrayList()); } + /** + * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition + */ DeltaMetaData(long minTxnId, long maxTxnId, List stmtIds) { this.minTxnId = minTxnId; this.maxTxnId = maxTxnId; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index b85b827424..405cfde503 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.WritableComparable; @@ -51,6 +52,11 @@ private long minimumTransactionId; private long maximumTransactionId; private int bucket; + /** + * Based on {@link org.apache.hadoop.hive.ql.metadata.Hive#mvFile(HiveConf, FileSystem, Path, FileSystem, Path, boolean, boolean)} + * _copy_N starts with 1. + */ + private int copyNumber = 0; private PrintStream dummyStream = null; private boolean oldStyle = false; private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id @@ -180,6 +186,18 @@ public Options bucket(int bucket) { } /** + * Multiple inserts into legacy (pre-acid) tables can generate multiple copies of each bucket + * file. + * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD + * @param copyNumber the number of the copy ( > 0) + * @return this + */ + public Options copyNumber(int copyNumber) { + this.copyNumber = copyNumber; + return this; + } + + /** * Whether it should use the old style (0000000_0) filenames. * @param value should use the old style names * @return this @@ -293,6 +311,9 @@ boolean getOldStyle() { public int getStatementId() { return statementId; } + public int getCopyNumber() { + return copyNumber; + } public Path getFinalDestination() { return finalDestination; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index da00bb3363..3d3efe65cb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -48,6 +48,8 @@ import com.google.common.annotations.VisibleForTesting; +import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD; + /** * Utilities that are shared by all of the ACID input and output formats. They * are used by the compactor and cleaner and thus must be format agnostic. 
@@ -99,6 +101,10 @@ public boolean accept(Path path) { public static final int MAX_STATEMENTS_PER_TXN = 10000; public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$"); public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}"); + /** + * This does not need to use ORIGINAL_PATTERN_COPY because it's used to read + * a "delta" dir written by a real Acid write - cannot have any copies + */ public static final PathFilter originalBucketFilter = new PathFilter() { @Override public boolean accept(Path path) { @@ -113,6 +119,11 @@ private AcidUtils() { private static final Pattern ORIGINAL_PATTERN = Pattern.compile("[0-9]+_[0-9]+"); + /** + * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD + */ + private static final Pattern ORIGINAL_PATTERN_COPY = + Pattern.compile("[0-9]+_[0-9]+" + COPY_KEYWORD + "[0-9]+"); public static final PathFilter hiddenFileFilter = new PathFilter(){ @Override @@ -243,7 +254,22 @@ static long parseBase(Path path) { .maximumTransactionId(0) .bucket(bucket) .writingBase(true); - } else if (filename.startsWith(BUCKET_PREFIX)) { + } + else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) { + //todo: define groups in regex and use parseInt(Matcher.group(2)).... + //arguably this should throw since we don't know how to handle this in OrcRawRecordMerger + int bucket = + Integer.parseInt(filename.substring(0, filename.indexOf('_'))); + int copyNumber = Integer.parseInt(filename.substring(filename.lastIndexOf('_') + 1)); + result + .setOldStyle(true) + .minimumTransactionId(0) + .maximumTransactionId(0) + .bucket(bucket) + .copyNumber(copyNumber) + .writingBase(true); + } + else if (filename.startsWith(BUCKET_PREFIX)) { int bucket = Integer.parseInt(filename.substring(filename.indexOf('_') + 1)); if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) { @@ -269,6 +295,7 @@ static long parseBase(Path path) { .bucket(bucket); } } else { + //why is this useful? what are we going to do with bucketId = -1? result.setOldStyle(true).bucket(-1).minimumTransactionId(0) .maximumTransactionId(0); } @@ -482,7 +509,7 @@ public String toString() { Path getBaseDirectory(); /** - * Get the list of original files. Not {@code null}. + * Get the list of original files. Not {@code null}. Must be sorted. * @return the list of original files (eg. 000000_0) */ List getOriginalFiles(); @@ -825,7 +852,7 @@ public static Directory getAcidState(Path directory, // Okay, we're going to need these originals. Recurse through them and figure out what we // really need. for (FileStatus origDir : originalDirectories) { - findOriginals(fs, origDir, original, useFileIds); + findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles); } } @@ -893,7 +920,10 @@ else if (prev != null && next.maxTransaction == prev.maxTransaction final Path base = bestBase.status == null ? 
null : bestBase.status.getPath(); LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " + deltas.size()); - + /** + * See {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger#OriginalReaderPair} + */ + Collections.sort(original); return new Directory(){ @Override @@ -1001,6 +1031,10 @@ public FileStatus getFileStatus() { public Long getFileId() { return null; } + @Override + public int compareTo(HdfsFileStatusWithId o) { + return getFileStatus().compareTo(o.getFileStatus()); + } } /** @@ -1011,7 +1045,7 @@ public Long getFileId() { * @throws IOException */ private static void findOriginals(FileSystem fs, FileStatus stat, - List original, Ref useFileIds) throws IOException { + List original, Ref useFileIds, boolean ignoreEmptyFiles) throws IOException { assert stat.isDir(); List childrenWithId = null; Boolean val = useFileIds.value; @@ -1031,18 +1065,22 @@ private static void findOriginals(FileSystem fs, FileStatus stat, if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { if (child.getFileStatus().isDir()) { - findOriginals(fs, child.getFileStatus(), original, useFileIds); + findOriginals(fs, child.getFileStatus(), original, useFileIds, ignoreEmptyFiles); } else { - original.add(child); + if(!ignoreEmptyFiles || child.getFileStatus().getLen() > 0) { + original.add(child); + } } } } else { List children = HdfsUtils.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter); for (FileStatus child : children) { if (child.isDir()) { - findOriginals(fs, child, original, useFileIds); + findOriginals(fs, child, original, useFileIds, ignoreEmptyFiles); } else { - original.add(createOriginalObj(null, child)); + if(!ignoreEmptyFiles || child.getLen() > 0) { + original.add(createOriginalObj(null, child)); + } } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java index 9299306ff1..aafb72c19e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java @@ -220,6 +220,10 @@ public FileStatus getFileStatus() { public Long getFileId() { return fileId; } + @Override + public int compareTo(HdfsFileStatusWithId o) { + return getFileStatus().compareTo(o.getFileStatus()); + } }); return fileId; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 8fb7211bac..b3c38a4523 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -1678,7 +1678,7 @@ private long computeProjectionSize(List fileTypes, // independent split strategies for them. There is a global flag 'isOriginal' that is set // on a per split strategy basis and it has to be same for all the files in that strategy. 
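+      // Note: pre-acid files carrying a copy suffix (e.g. 000001_0_copy_1, see Utilities.COPY_KEYWORD)
+      // are still non-acid-schema files, so they land in the isOriginal=true strategy below; the
+      // copy number itself is only recovered later, in getReader(), via
+      // AcidUtils.parseBaseOrDeltaBucketFilename().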
List> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs, - adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi, + adi.splitPath, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); for (SplitStrategy splitStrategy : splitStrategies) { @@ -1928,7 +1928,7 @@ public float getProgress() throws IOException { } else { root = path.getParent().getParent(); } - } else { + } else {//todo: so here path is a delta/ but above it's a partition/ root = path; } @@ -1948,7 +1948,23 @@ public float getProgress() throws IOException { final Configuration conf = options.getConfiguration(); final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split); - final int bucket = OrcInputFormat.getBucketForSplit(conf, split); + OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false); + mergerOptions.rootPath(root); + final int bucket; + if (split.hasBase()) { + AcidOutputFormat.Options acidIOOptions = + AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf); + if(acidIOOptions.getBucket() < 0) { + LOG.warn("Can't determine bucket ID for " + split.getPath() + "; ignoring"); + } + bucket = acidIOOptions.getBucket(); + if(split.isOriginal()) { + mergerOptions.copyIndex(acidIOOptions.getCopyNumber()).bucketPath(split.getPath()); + } + } else { + bucket = (int) split.getStart(); + } + final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf); readOptions.range(split.getStart(), split.getLength()); @@ -1957,7 +1973,7 @@ public float getProgress() throws IOException { new ValidReadTxnList(txnString); final OrcRawRecordMerger records = new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket, - validTxnList, readOptions, deltas); + validTxnList, readOptions, deltas, mergerOptions); return new RowReader() { OrcStruct innerRecord = records.createValue(); @@ -2010,19 +2026,21 @@ public float getProgress() throws IOException { }; } - static Path findOriginalBucket(FileSystem fs, + private static Path findOriginalBucket(FileSystem fs, Path directory, int bucket) throws IOException { for(FileStatus stat: fs.listStatus(directory)) { - String name = stat.getPath().getName(); + if(stat.getLen() <= 0) { + continue; + } + String name = stat.getPath().getName();//todo: use AcidUtils.parseBaseOrDeltaBucketFilename() String numberPart = name.substring(0, name.indexOf('_')); if (org.apache.commons.lang3.StringUtils.isNumeric(numberPart) && Integer.parseInt(numberPart) == bucket) { return stat.getPath(); } } - throw new IllegalArgumentException("Can't find bucket " + bucket + " in " + - directory); + throw new IllegalArgumentException("Can't find bucket " + bucket + " in " + directory); } static Reader.Options createOptionsForReader(Configuration conf) { @@ -2055,14 +2073,6 @@ static Reader createOrcReaderForSplit(Configuration conf, OrcSplit orcSplit) thr return reader; } - static int getBucketForSplit(Configuration conf, OrcSplit orcSplit) { - if (orcSplit.hasBase()) { - return AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); - } else { - return (int) orcSplit.getStart(); - } - } - public static boolean[] pickStripesViaTranslatedSarg(SearchArgument sarg, OrcFile.WriterVersion writerVersion, List types, List stripeStats, int stripeCount) { @@ -2135,7 +2145,7 @@ private static boolean isStripeSatisfyPredicate( @VisibleForTesting static List> determineSplitStrategies(CombinedCtx combinedCtx, Context context, - FileSystem fs, Path dir, 
AcidUtils.Directory dirInfo, + FileSystem fs, Path dir, List baseFiles, List parsedDeltas, List readerTypes, @@ -2146,7 +2156,7 @@ private static boolean isStripeSatisfyPredicate( // When no baseFiles, we will just generate a single split strategy and return. List acidSchemaFiles = new ArrayList(); if (baseFiles.isEmpty()) { - splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo, + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); if (splitStrategy != null) { splitStrategies.add(splitStrategy); @@ -2166,7 +2176,7 @@ private static boolean isStripeSatisfyPredicate( // Generate split strategy for non-acid schema original files, if any. if (!originalSchemaFiles.isEmpty()) { - splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo, + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, originalSchemaFiles, true, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); if (splitStrategy != null) { splitStrategies.add(splitStrategy); @@ -2175,7 +2185,7 @@ private static boolean isStripeSatisfyPredicate( // Generate split strategy for acid schema files, if any. if (!acidSchemaFiles.isEmpty()) { - splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo, + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); if (splitStrategy != null) { splitStrategies.add(splitStrategy); @@ -2187,7 +2197,7 @@ private static boolean isStripeSatisfyPredicate( @VisibleForTesting static SplitStrategy determineSplitStrategy(CombinedCtx combinedCtx, Context context, - FileSystem fs, Path dir, AcidUtils.Directory dirInfo, + FileSystem fs, Path dir, List baseFiles, boolean isOriginal, List parsedDeltas, @@ -2262,8 +2272,12 @@ private static boolean isStripeSatisfyPredicate( } reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf)); } + OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options() + .isCompacting(true) + .rootPath(baseDirectory); + //todo: should this bother to create a 'reader'? Alternatively, should it make sure, it's the 1st one? 
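+    //For 'original' files the merger does not rely on the single 'reader' created above:
+    //OriginalReaderPair uses rootPath to re-list the partition via AcidUtils.getAcidState() and
+    //walks every file of the logical bucket (e.g. 000001_0, 000001_0_copy_1, ...) in sorted
+    //order, so ROW__ID.rowid is assigned contiguously across all copy_N files during compaction.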
return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal, - bucket, validTxnList, new Reader.Options(), deltaDirectory); + bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 95b8806e70..df47d1674f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -18,11 +18,15 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.orc.OrcUtils; import org.apache.orc.StripeInformation; import org.apache.orc.TypeDescription; @@ -156,7 +160,7 @@ private boolean isSameRow(ReaderKey other) { return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId; } - public long getCurrentTransactionId() { + long getCurrentTransactionId() { return currentTransactionId; } @@ -165,7 +169,7 @@ public long getCurrentTransactionId() { * @param other the value to compare to * @return -1, 0, +1 */ - public int compareRow(RecordIdentifier other) { + int compareRow(RecordIdentifier other) { return compareToInternal(other); } @@ -188,7 +192,8 @@ public String toString() { final Reader reader; final RecordReader recordReader; final ReaderKey key; - final RecordIdentifier maxKey; + RecordIdentifier minKey; + RecordIdentifier maxKey; final int bucket; private final int statementId; @@ -210,16 +215,23 @@ public String toString() { ReaderImpl.Options options, int statementId) throws IOException { this.reader = reader; this.key = key; + this.minKey = minKey; this.maxKey = maxKey; this.bucket = bucket; // TODO use stripe statistics to jump over stripes recordReader = reader.rowsOptions(options); this.statementId = statementId; + } + /** + * This must be called right after the constructor but not in the constructor to make sure + * sub-classes are fully initialized before their {@link #next(OrcStruct)} is called + */ + void advnaceToMinKey() throws IOException { // advance the reader until we reach the minimum key do { next(nextRecord); } while (nextRecord != null && - (minKey != null && key.compareRow(minKey) <= 0)); + (minKey != null && key.compareRow(minKey) <= 0)); } void next(OrcStruct next) throws IOException { @@ -253,64 +265,238 @@ int getColumns() { * A reader that pretends an original base file is a new version base file. * It wraps the underlying reader's row with an ACID event object and * makes the relevant translations. + * + * Running multiple Insert statements on the same partition (of non acid table) creates files + * like so: 00000_0, 00000_0_copy1, 00000_0_copy2, etc. So the OriginalReaderPair must treat all + * of these files as part of a single logical file. + * + * For Compaction, where each split includes the whole bucket, this means reading over all the + * files in order to assign ROW__ID.rowid in one sequence for the entire logical bucket. + * + * For a read after the table is marked transactional but before it's rewritten into a base/ + * by compaction, each of the original files may be split into many pieces. 
For each split we + * must make sure to include only the relevant part of each delta file. + * {@link OrcRawRecordMerger#minKey} and {@link OrcRawRecordMerger#maxKey} are computed for each + * split of the original file and used to filter all the deltas. The ROW__ID.rowid for the rows + * of the 'original' file of course, must be assigned from the beginning of logical bucket. */ static final class OriginalReaderPair extends ReaderPair { + private final Options mergerOptions; + /** + * Sum total of all rows in all the files before the 'current' one in {@link #originalFiles} list + */ + private long rowIdOffset = 0; + /** + * See {@link AcidUtils.Directory#getOriginalFiles()}. This list has a fixed sort order. This + * is the full list when compacting and empty when reading. + */ + private final List originalFiles; + /** + * index into {@link #originalFiles} + */ + private int nextFileIndex = 0; + private long numRowsInCurrentFile = 0; + private RecordReader originalFileRecordReader = null; + private final Configuration conf; + private final Reader.Options options; OriginalReaderPair(ReaderKey key, Reader reader, int bucket, RecordIdentifier minKey, RecordIdentifier maxKey, - Reader.Options options) throws IOException { + Reader.Options options, Options mergerOptions, Configuration conf, + ValidTxnList validTxnList) throws IOException { super(key, reader, bucket, minKey, maxKey, options, 0); + this.mergerOptions = mergerOptions; + this.conf = conf; + this.options = options; + assert mergerOptions.getRootPath() != null : "Since we have original files"; + assert bucket >= 0 : "don't support non-bucketed tables yet"; + if(mergerOptions.isCompacting()) { + { + //when compacting each split needs to process the whole logical bucket + assert options.getOffset() == 0; + assert options.getMaxOffset() == Long.MAX_VALUE; + assert minKey == null; + assert maxKey == null; + } + AcidUtils.Directory directoryState = AcidUtils.getAcidState( + mergerOptions.getRootPath(), conf, validTxnList, false, true); + originalFiles = directoryState.getOriginalFiles(); + assert originalFiles.size() > 0; + recordReader.close();//todo: should not even pass it in if compacting originals.... + reader = advanceToNextFile();//in case of Compaction, this is the 1st of the current bucket + if(reader == null) { + //Compactor generated a split for a bucket that has no data? 
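+        //CompactorMR creates one split per bucket id it found among the original files, so by the
+        //time we get here there should be at least one file for this bucket; if there isn't, the
+        //directory contents presumably changed between split generation and this read.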
+ throw new IllegalStateException("No 'original' files found for bucketId=" + bucket + + " in " + mergerOptions.getRootPath()); + } + numRowsInCurrentFile = reader.getNumberOfRows(); + originalFileRecordReader = reader.rowsOptions(options); + } + else { + boolean isLastFileForThisBucket = false; + if (mergerOptions.getCopyIndex() > 0) { + //the split is from something other than the 1st file of the logical bucket - compute offset + originalFiles = Collections.emptyList(); + + AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(), + conf, validTxnList, false, true); + for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { + AcidOutputFormat.Options bucketOptions = + AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf); + if (bucketOptions.getBucket() != bucket) { + continue; + } + if(isLastFileForThisBucket) { + //if here we already saw current file and now found another file for the same bucket + //so the current file is not the last file of the logical bucket + isLastFileForThisBucket = false; + break; + } + if(f.getFileStatus().getPath().equals(mergerOptions.getBucketPath())) { + //found the file whence the current split is from so we're done counting offset + isLastFileForThisBucket = true;//to indicate we saw current file + continue; + } + Reader copyReader = OrcFile.createReader(f.getFileStatus().getPath(), OrcFile.readerOptions(conf)); + rowIdOffset += copyReader.getNumberOfRows(); + } + if (rowIdOffset > 0) { + //rowIdOffset could be 0 if all files before current one are empty + /** + * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, Reader.Options)} + * need to fix min/max key since these are used by {@link #next(OrcStruct)} which uses + * {@link #rowIdOffset} to generate rowId for the key. Clear? + * todo: move discovery of min/max key to ReaderPair c'tor instead? 
- less convoluted but doing this makes some UTs difficult*/ + if (minKey != null) { + minKey.setRowId(minKey.getRowId() + rowIdOffset); + } + else { + /* If this is not the 1st file, set minKey 1 less than the start of current file + * (Would not need this if we knew that there are o delta files)*/ + this.minKey = new RecordIdentifier(0, bucket, rowIdOffset - 1); + } + if (maxKey != null) { + maxKey.setRowId(maxKey.getRowId() + rowIdOffset); + } + } + } else { + isLastFileForThisBucket = true; + originalFiles = Collections.emptyList(); + AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validTxnList, false, true); + int numFilesInBucket= 0; + for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { + AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf); + if (bucketOptions.getBucket() == bucket) { + numFilesInBucket++; + if(numFilesInBucket > 1) { + isLastFileForThisBucket = false; + break; + } + } + } + } + originalFileRecordReader = recordReader; + if(!isLastFileForThisBucket && this.maxKey == null) { + /** + * If this is the last file for this bucket, maxKey == null means the split is the tail + * of the file so we want to leave it blank to make sure any insert events in delta + * files are included; Conversely, if it's not the last file, set the maxKey so that + * events from deltas that don't modify anything in the current split are excluded*/ + this.maxKey = new RecordIdentifier(0, bucket, + rowIdOffset + reader.getNumberOfRows() - 1); + } + } + } + @Override + void advnaceToMinKey() throws IOException { + super.advnaceToMinKey(); } @Override void next(OrcStruct next) throws IOException { - if (recordReader.hasNext()) { - long nextRowId = recordReader.getRowNumber(); - // have to do initialization here, because the super's constructor - // calls next and thus we need to initialize before our constructor - // runs - if (next == null) { - nextRecord = new OrcStruct(OrcRecordUpdater.FIELDS); - IntWritable operation = + while(true) { + if (originalFileRecordReader.hasNext()) { + //RecordReader.getRowNumber() produces a file-global row number even with PPD + long nextRowId = originalFileRecordReader.getRowNumber() + rowIdOffset; + // have to do initialization here, because the super's constructor + // calls next and thus we need to initialize before our constructor + // runs todo: does this comment still make sense? 
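+          //A sketch of the intended numbering (actual counts depend on the files): if 000001_0
+          //holds 2 rows and 000001_0_copy_1 holds 3, the reader over the copy file starts with
+          //rowIdOffset=2, so its rows get rowid 2,3,4 and the whole logical bucket is numbered 0..4.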
+ if (next == null) { + nextRecord = new OrcStruct(OrcRecordUpdater.FIELDS); + IntWritable operation = new IntWritable(OrcRecordUpdater.INSERT_OPERATION); - nextRecord.setFieldValue(OrcRecordUpdater.OPERATION, operation); - nextRecord.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION, + nextRecord.setFieldValue(OrcRecordUpdater.OPERATION, operation); + nextRecord.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION, new LongWritable(0)); - nextRecord.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION, + nextRecord.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION, new LongWritable(0)); - nextRecord.setFieldValue(OrcRecordUpdater.BUCKET, + nextRecord.setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(bucket)); - nextRecord.setFieldValue(OrcRecordUpdater.ROW_ID, + nextRecord.setFieldValue(OrcRecordUpdater.ROW_ID, new LongWritable(nextRowId)); - nextRecord.setFieldValue(OrcRecordUpdater.ROW, - recordReader.next(null)); - } else { - nextRecord = next; - ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION)) + nextRecord.setFieldValue(OrcRecordUpdater.ROW, + originalFileRecordReader.next(null)); + } else { + nextRecord = next; + ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION)) .set(OrcRecordUpdater.INSERT_OPERATION); - ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)) + ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)) .set(0); - ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET)) + ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET)) .set(bucket); - ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION)) + ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION)) .set(0); - ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID)) + ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID)) .set(nextRowId); - nextRecord.setFieldValue(OrcRecordUpdater.ROW, - recordReader.next(OrcRecordUpdater.getRow(next))); - } - key.setValues(0L, bucket, nextRowId, 0L, 0); - if (maxKey != null && key.compareRow(maxKey) > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("key " + key + " > maxkey " + maxKey); + nextRecord.setFieldValue(OrcRecordUpdater.ROW, + originalFileRecordReader.next(OrcRecordUpdater.getRow(next))); + } + key.setValues(0L, bucket, nextRowId, 0L, 0); + if (maxKey != null && key.compareRow(maxKey) > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("key " + key + " > maxkey " + maxKey); + } + nextRecord = null; + originalFileRecordReader.close(); + } + return; + } else { + if (originalFiles.size() <= nextFileIndex) { + //no more original files + nextRecord = null; + originalFileRecordReader.close(); + return; + } else { + assert mergerOptions.isCompacting() : "originalFiles.size() should be 0 when not compacting"; + rowIdOffset += numRowsInCurrentFile; + originalFileRecordReader.close(); + Reader reader = advanceToNextFile(); + if(reader == null) { + nextRecord = null; + return; + } + numRowsInCurrentFile = reader.getNumberOfRows(); + originalFileRecordReader = reader.rowsOptions(options); } - nextRecord = null; - recordReader.close(); } - } else { - nextRecord = null; - recordReader.close(); } } + /** + * Finds the next file of the logical bucket + * @return {@code null} if there are no more files + */ + private Reader advanceToNextFile() throws IOException { + while(nextFileIndex < originalFiles.size()) { + AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(originalFiles.get(nextFileIndex).getFileStatus().getPath(), conf); + if 
(bucketOptions.getBucket() == bucket) { + break; + } + nextFileIndex++; + } + if(originalFiles.size() <= nextFileIndex) { + return null;//no more files for current bucket + } + return OrcFile.createReader(originalFiles.get(nextFileIndex++).getFileStatus().getPath(), OrcFile.readerOptions(conf)); + } @Override int getColumns() { @@ -339,9 +525,16 @@ private void discoverOriginalKeyBounds(Reader reader, int bucket, ) throws IOException { long rowLength = 0; long rowOffset = 0; - long offset = options.getOffset(); - long maxOffset = options.getMaxOffset(); + long offset = options.getOffset();//this would usually be at block boundary + long maxOffset = options.getMaxOffset();//this would usually be at block boundary boolean isTail = true; + /** + * options.getOffset() and getMaxOffset() would usually be at block boundary which doesn't + * necessarily match stripe boundary. So we want to come up with minKey to be one before the 1st + * row of the first stripe that starts after getOffset() and maxKey to be the last row of the + * stripe that contains getMaxOffset(). This breaks if getOffset() and getMaxOffset() are inside + * the sames tripe + */ for(StripeInformation stripe: reader.getStripes()) { if (offset > stripe.getOffset()) { rowOffset += stripe.getNumberOfRows(); @@ -416,6 +609,45 @@ private void discoverKeyBounds(Reader reader, return result; } + static class Options { + private int copyIndex = 0; + private boolean isCompacting = false; + private Path bucketPath; + /** + * Partition folder (Table folder if not partitioned) + */ + private Path rootPath; + Options copyIndex(int copyIndex) { + assert copyIndex >= 0;//??? + this.copyIndex = copyIndex; + return this; + } + Options isCompacting(boolean isCompacting) { + this.isCompacting = isCompacting; + return this; + } + Options bucketPath(Path bucketPath) { + this.bucketPath = bucketPath; + return this; + } + Options rootPath(Path rootPath) { + this.rootPath = rootPath; + return this; + } + /** + * 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix + */ + int getCopyIndex() { + return copyIndex; + } + boolean isCompacting() { + return isCompacting; + } + Path getBucketPath() { + return bucketPath; + } + Path getRootPath() { return rootPath; } + } /** * Create a reader that merge sorts the ACID events together. * @param conf the configuration @@ -433,7 +665,7 @@ private void discoverKeyBounds(Reader reader, int bucket, ValidTxnList validTxnList, Reader.Options options, - Path[] deltaDirectory) throws IOException { + Path[] deltaDirectory, Options mergerOptions) throws IOException { this.conf = conf; this.collapse = collapseEvents; this.offset = options.getOffset(); @@ -462,20 +694,32 @@ private void discoverKeyBounds(Reader reader, // use the min/max instead of the byte range ReaderPair pair; ReaderKey key = new ReaderKey(); + /* TODO: + 1. move discoverKeyBounds into ReaderPair - this makes testing difficult + make min/maxKey in OrcRawRecordMerger final and set here after reading from ReaderPair + Add a test version of c'tor to pass them in? + + maybe then 'advanceToMinKey() can be pushed back into ReaderPair... NO + * 4. note that OrcInputFormat.getReader() assumes that root is at certain distance from file + * which may not be the case - need to look at OrcInputFormat - pass it through OrcSplit? 
where can we access the metastore?*/ if (isOriginal) { options = options.clone(); pair = new OriginalReaderPair(key, reader, bucket, minKey, maxKey, - options); + options, mergerOptions, conf, validTxnList); + baseReader = ((OriginalReaderPair)pair).originalFileRecordReader; + minKey = pair.minKey; + maxKey = pair.maxKey; } else { pair = new ReaderPair(key, reader, bucket, minKey, maxKey, eventOptions, 0); + baseReader = pair.recordReader; } - + pair.advnaceToMinKey(); // if there is at least one record, put it in the map if (pair.nextRecord != null) { readers.put(key, pair); } - baseReader = pair.recordReader; +// baseReader = pair.recordReader; } // we always want to read all of the deltas @@ -504,6 +748,7 @@ private void discoverKeyBounds(Reader reader, ReaderPair deltaPair; deltaPair = new ReaderPair(key, deltaReader, bucket, minKey, maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId()); + deltaPair.advnaceToMinKey(); if (deltaPair.nextRecord != null) { readers.put(key, deltaPair); } @@ -648,6 +893,7 @@ public void close() throws IOException { @Override public float getProgress() throws IOException { + //this is not likely to do the right thing for Compaction of "original" files when there are copyN files return baseReader == null ? 1 : baseReader.getProgress(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 75c7680e26..0cdb4cec47 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -360,8 +360,11 @@ public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Opt int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString); + //todo: mergerOptions are incomplete here - maybe not. I think only applies when we no longer have isOriginal files + OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false); this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket, - validTxnList, readerOptions, deleteDeltas); + validTxnList, readerOptions, deleteDeltas, + mergerOptions); this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey(); this.deleteRecordValue = this.deleteRecords.createValue(); // Initialize the first value in the delete reader. diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 5b908e8453..9ecb37f63e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -3103,7 +3103,7 @@ private static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath, * I'll leave the below loop for now until a better approach is found. */ for (int counter = 1; destFs.exists(destFilePath); counter++) { - destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : "")); + destFilePath = new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) + (!type.isEmpty() ? "." 
+ type : "")); } if (isRenameAllowed) { diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index f83b6db8bf..bafed9e0e9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -251,6 +251,7 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, // There are original format files for (HdfsFileStatusWithId stat : originalFiles) { Path path = stat.getFileStatus().getPath(); + //note that originalFiles are all original files recursively not dirs dirsToSearch.add(path); LOG.debug("Adding original file " + path + " to dirs to search"); } @@ -275,6 +276,12 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, su.gatherStats(); } + + /** + * @param baseDir if not null, it's either table/partition root folder or base_xxxx. + * If it's base_xxxx, it's in dirsToSearch, else the actual original files + * (all leaves recursively) are in the dirsToSearch list + */ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType, StringableList dirsToSearch, List parsedDeltas, @@ -363,7 +370,7 @@ public CompactorInputSplit() { * @param hadoopConf * @param bucket bucket to be processed by this split * @param files actual files this split should process. It is assumed the caller has already - * parsed out the files in base and deltas to populate this list. + * parsed out the files in base and deltas to populate this list. Includes copy_N * @param base directory of the base, or the partition/table location if the files are in old * style. Can be null. * @param deltas directories of the delta files. diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 7c66955e14..c0800bf484 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -17,8 +17,8 @@ */ package org.apache.hadoop.hive.ql; +import javafx.scene.control.Tab; import org.apache.commons.io.FileUtils; -import org.apache.commons.io.output.StringBuilderWriter; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -105,6 +106,7 @@ public void setUp() throws Exception { hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); + hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); hiveConf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); @@ -880,4 +882,103 @@ public void testMergeCase() throws Exception { runStatementOnDriver("create table if not exists e011_02 (c1 float, c2 double, c3 float)"); runStatementOnDriver("merge into merge_test using e011_02 on (merge_test.c1 = e011_02.c1) when not matched 
then insert values (case when e011_02.c1 > 0 then e011_02.c1 + 1 else e011_02.c1 end, e011_02.c2 + e011_02.c3, coalesce(e011_02.c3, 1))"); } + + /** + * This is to demonstrate the issue in HIVE-16177 - ROW__IDs are not unique. + */ + @Test + public void testNonAcidToAcidConversion0() throws Exception { + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,3)"); + //we should now have non-empty bucket files 000001_0 and 000001_0_copy_1 + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); + + //check that the files are named as expected and that Acid Reader produces unique ROW__ID.rowId + // across all files making up a logical bucket + List rs = runStatementOnDriver("select ROW__ID, INPUT__FILE__NAME, a, b from " + Table.NONACIDORCTBL + " order by b"); + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\tfile")); + Assert.assertTrue(rs.get(0), rs.get(0).contains("target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("/warehouse/nonacidorctbl/000001_0\t1\t2")); + Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\tfile")); + Assert.assertTrue(rs.get(1), rs.get(1).contains("target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("/warehouse/nonacidorctbl/000001_0_copy_1\t1\t3")); + + //run Compaction + runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, INPUT__FILE__NAME, a, b from " + Table.NONACIDORCTBL + " order by b"); + //more BS - we had 2 rows in the table and now we have just 1 after compaction.... + //Compactor doesn't know about copy_N files either - would have to take all of them together for a given bucket + //and read as a single stream in order... but OrcRawRecordMerge takes a Reader as input which is reading 1 file + //so we'd have to implement a new Reader that keeps track of a collection of inner readers and chains them + //together.... + //Worse yet, is that there is no guarantee in Hive that all data files in a partition are 1st level children + //some may be in subdirs, according to Sergey: + //[5:17 PM] Sergey Shelukhin: 1) list bucketing + //[5:18 PM] Sergey Shelukhin: 2) any time, if the MR recursive-whatever setting is enabled + //[5:18 PM] Sergey Shelukhin: 3) iirc Hive can produce it sometimes from unions but I'm not sure + //try ctas and insert with union in the query on Tez + Assert.assertEquals("wrong RS cardinality",2, rs.size()); + Assert.assertTrue(rs.get(1), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\tfile")); + Assert.assertTrue(rs.get(0), rs.get(0).contains("target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("warehouse/nonacidorctbl/base_-9223372036854775808/bucket_00001\t1\t2")); + Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\tfile")); + Assert.assertTrue(rs.get(1), rs.get(1).contains("target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("warehouse/nonacidorctbl/base_-9223372036854775808/bucket_00001\t1\t3")); + } + + /** + * Is this test needed? 
+
+  /**
+   * Is this test needed?
+   * see HIVE-16177
+   * Not sure it's a good idea to lock in the original file names. On Tez, for example, we should not be
+   * creating empty files...
+   */
+  @Test
+  public void testNonAcidToAcidConversion01() throws Exception {
+    //create 1 row in a file 000001_0 (and an empty 000000_0)
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+    //create 1 row in a file 000000_0_copy_1 and 1 row in a file 000001_0_copy_1
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,12),(1,5)");
+
+    //convert the table to Acid
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    //create a delta directory
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,17)");
+
+    //make sure we assign correct Ids
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID");
+    LOG.warn("before compact");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("", 4, rs.size());
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000000_0_copy_1"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":1,\"rowid\":0}\t1\t17"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/000001_0_copy_1"));
+    //run Compaction
+    runStatementOnDriver("alter table " + TestTxnCommands2.Table.NONACIDORCTBL + " compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID");
+    LOG.warn("after compact");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("", 4, rs.size());
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000014/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":1,\"rowid\":0}\t1\t17"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
+
+    //make sure they are the same before and after compaction
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 5786c4f659..fb06f17ec4 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -1499,25 +1499,127 @@ public void testMerge3() throws Exception {
   }
   /**
    * https://hortonworks.jira.com/browse/BUG-66580
-   * @throws Exception
    */
-  @Ignore
   @Test
-  public void testMultiInsert() throws Exception {
+  public void testMultiInsert1() throws Exception {
+    runStatementOnDriver("drop table if exists srcpart");
     runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " +
         "partitioned by (z int) clustered by (a) into 2 buckets " +
         "stored as orc tblproperties('transactional'='true')");
     runStatementOnDriver("create temporary table if not exists data1 (x int)");
-//    runStatementOnDriver("create temporary table if not exists data2 (x int)");
+    runStatementOnDriver("create temporary table if not exists data2 (x int)");
 
     runStatementOnDriver("insert into data1 values (1),(2),(3)");
 //    runStatementOnDriver("insert into data2 values (4),(5),(6)");
     d.destroy();
     hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
     d = new Driver(hiveConf);
-    List<String> r = runStatementOnDriver(" from data1 " +
-        "insert into srcpart partition(z) select 0,0,1,x " +
-        "insert into srcpart partition(z=1) select 0,0,1");
+    //supposedly this gives non-unique IDs
+    List<String> r = runStatementOnDriver(" from data1 " + //send everything to bucket 0
+        "insert into srcpart partition(z) select 0,0,x,x " +
+        "insert into srcpart partition(z=1) select 0,0,17");
+    r = runStatementOnDriver("select * from srcpart order by c,z");
+    //this is what we should get but we end up losing 1 row
+//    int[][] rExpected = {{0,0,1,1},{0,0,2,2},{0,0,3,3},{0,0,17,1},{0,0,17,1},{0,0,17,1}};
+    int[][] rExpected = {{0,0,1,1},{0,0,2,2},{0,0,3,3},{0,0,17,1},{0,0,17,1}};
+    Assert.assertEquals(stringifyValues(rExpected), r);
+    /*
+    So this is what we get. All the data is in there. One leg of the multi-insert has statementId=0, the other statementId=1.
+    I don't know why we have an empty bucket for the Static Partition case.
+    Partition z=1 has 2 rows with ROW_ID(1,0,0).
+
+    This wouldn't happen in the multi-statement txn case since statements are sequential, and the N+1st statement "looks back" to
+    find the rowId from the Nth statement.
+
+    ekoifman:apache-hive-2.2.0-SNAPSHOT-bin ekoifman$ ./bin/hive --orcfiledump -j -p -d /Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart
+SLF4J: Class path contains multiple SLF4J bindings.
+SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hiverwgit/packaging/target/apache-hive-2.2.0-SNAPSHOT-bin/apache-hive-2.2.0-SNAPSHOT-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
+SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hwxhadoop/hadoop-dist/target/hadoop-2.7.3.2.6.0.0-SNAPSHOT/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
+SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
+SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
+Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0000/bucket_00000 [length: 625]
+{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}}
+{"operation":0,"originalTransaction":1,"bucket":0,"rowId":1,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}}
+{"operation":0,"originalTransaction":1,"bucket":0,"rowId":2,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}}
+________________________________________________________________________________________________________________________
+
+Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0000/bucket_00001 [length: 204]
+________________________________________________________________________________________________________________________
+
+Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0001/bucket_00000 [length: 606]
+{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":1}}
+________________________________________________________________________________________________________________________
+
+Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=2/delta_0000001_0000001_0001/bucket_00000 [length: 619]
+{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":2}}
+________________________________________________________________________________________________________________________
+
+Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=3/delta_0000001_0000001_0001/bucket_00000 [length: 619]
+{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":3}}
+________________________________________________________________________________________________________________________
+*/
+  }
+  @Test
+  public void testMultiInsert2() throws Exception {
+    runStatementOnDriver("drop table if exists srcpart");
+    runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " +
+        "partitioned by (z int) clustered by (a) into 2 buckets " +
+        "stored as orc tblproperties('transactional'='true')");
+    runStatementOnDriver("create temporary table if not exists data1 (x int)");
+    runStatementOnDriver("create temporary table if not exists data2 (x int)");
+
+    runStatementOnDriver("insert into data1 values (1),(2),(3)");
+//    runStatementOnDriver("insert into data2 values (4),(5),(6)");
+    d.destroy();
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    d = new Driver(hiveConf);
+    List<String> r = runStatementOnDriver("from data1 " +
+        "insert into srcpart partition(z) select 0,0,x,1 where x % 2 = 0 " +
+        "insert into srcpart partition(z=1) select 2,0,x where x % 2 = 1");
+    r = runStatementOnDriver("select * from srcpart order by a,b,c,z");
+    int[][] rExpected = {{0,0,2,1},{2,0,1,1},{2,0,3,1}};
+    Assert.assertEquals(stringifyValues(rExpected), r);
+    /*
+    We get correct results and the ROW_IDs look correct - but this is a fluke. (I guess the optimizer figures out that
+    the DP branch is actually also SP with z=1; change the 1 -> x and you end up with duplicate ROW_IDs.)
+    (Maybe for some other reason: either way OrcRecordUpdater.findRowIdOffsetForInsert() ends up doing the right thing.)
+    Once again, there is an empty bucket in the static partition branch.
+
+    ekoifman:apache-hive-2.2.0-SNAPSHOT-bin ekoifman$ ./bin/hive --orcfiledump -j -p -d /Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489026761787/warehouse/srcpart/z=1
+SLF4J: Class path contains multiple SLF4J bindings.
+SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hiverwgit/packaging/target/apache-hive-2.2.0-SNAPSHOT-bin/apache-hive-2.2.0-SNAPSHOT-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
+SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hwxhadoop/hadoop-dist/target/hadoop-2.7.3.2.6.0.0-SNAPSHOT/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
+SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
+SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
+Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489026761787/warehouse/srcpart/z=1/delta_0000001_0000001_0000/bucket_00000 [length: 619]
+{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":2}}
+________________________________________________________________________________________________________________________
+
+Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489026761787/warehouse/srcpart/z=1/delta_0000001_0000001_0001/bucket_00000 [length: 625]
+{"operation":0,"originalTransaction":1,"bucket":0,"rowId":1,"currentTransaction":1,"row":{"_col0":2,"_col1":0,"_col2":3}}
+{"operation":0,"originalTransaction":1,"bucket":0,"rowId":2,"currentTransaction":1,"row":{"_col0":2,"_col1":0,"_col2":1}}
+________________________________________________________________________________________________________________________
+
+Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489026761787/warehouse/srcpart/z=1/delta_0000001_0000001_0001/bucket_00001 [length: 204]
+________________________________________________________________________________________________________________________
+*/
+
+  }
+  @Test
+  public void testMultiInsert3() throws Exception {
+    runStatementOnDriver("drop table if exists srcpart");
+    runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " +
+        "partitioned by (z int) clustered by (a) into 2 buckets " +
+        "stored as orc tblproperties('transactional'='true')");
+    runStatementOnDriver("create temporary table if not exists data1 (x int)");
+    runStatementOnDriver("create temporary table if not exists data2 (x int)");
+
+    runStatementOnDriver("insert into data1 values (1),(2),(3)");
+//    runStatementOnDriver("insert into data2 values (4),(5),(6)");
+    d.destroy();
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    d = new Driver(hiveConf);
+    //this produces 3 rows (as it should, I think)
+    List<String> r = runStatementOnDriver("select 0,0,1,1 from data1 where x % 2 = 0 UNION ALL select 2,0.0,1,1 from data1 where x % 2 = 1");
   }
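// Editor's aside - the dumps above show both legs of the multi-insert writing delta_0000001_0000001_0000
// and ..._0001 for the same partition, with (originalTransaction, bucket, rowId) triples that can repeat
// across the two statement branches. The toy class below only illustrates why the statement id has to take
// part in row identity (or in the encoded bucket id) for such rows to stay distinct; it is not Hive's
// RecordIdentifier and the field layout is an assumption made for this sketch.
import java.util.Objects;

final class ToyRowId {
  final long txnId;
  final int bucket;
  final int statementId;
  final long rowId;

  ToyRowId(long txnId, int bucket, int statementId, long rowId) {
    this.txnId = txnId; this.bucket = bucket; this.statementId = statementId; this.rowId = rowId;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof ToyRowId)) {
      return false;
    }
    ToyRowId other = (ToyRowId) o;
    return txnId == other.txnId && bucket == other.bucket
        && statementId == other.statementId && rowId == other.rowId;
  }

  @Override
  public int hashCode() {
    return Objects.hash(txnId, bucket, statementId, rowId);
  }

  public static void main(String[] args) {
    // same txn, bucket and rowId produced by two different statement branches of one multi-insert
    ToyRowId leg0 = new ToyRowId(1, 0, 0, 0);
    ToyRowId leg1 = new ToyRowId(1, 0, 1, 0);
    // distinct only because statementId participates; drop it and the two rows collide
    System.out.println(leg0.equals(leg1));  // prints false
  }
}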
   /**
    * Investigating DP and WriteEntity, etc
@@ -1623,6 +1725,66 @@ public void testMergeAliasedTarget() throws Exception {
     int[][] rExpected = {{2,0},{4,0},{5,0},{7,8},{11,11}};
     Assert.assertEquals(stringifyValues(rExpected), r);
   }
+  /**
+   * see HIVE-16177
+   */
+  @Test
+  public void testNonAcidToAcidConversion02() throws Exception {
+    //create 2 rows in a file 000001_0 (and an empty 000000_0)
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2),(1,3)");
+    //create 2 rows in a file 000000_0_copy_1 and 2 rows in a file 000001_0_copy_1
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,12),(0,13),(1,4),(1,5)");
+    //create 1 row in a file 000001_0_copy_2 (and an empty 000000_0_copy_2?)
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,6)");
+
+    //convert the table to Acid
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    List<String> rs1 = runStatementOnDriver("describe " + Table.NONACIDORCTBL);
+    //create some delta directories
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,15),(1,16)");
+    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b = 120 where a = 0 and b = 12");
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,17)");
+    runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 1 and b = 3");
+
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by a,b");
+    LOG.warn("before compact");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    /*
+     * All ROW__IDs are unique on read after conversion to acid.
+     * ROW__IDs are exactly the same before and after compaction.
+     * Also check the file name after compaction for completeness.
+     */
+    String[][] expected = {
+        {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t13", "bucket_00000"},
+        {"{\"transactionid\":18,\"bucketid\":0,\"rowid\":0}\t0\t15", "bucket_00000"},
+        {"{\"transactionid\":20,\"bucketid\":0,\"rowid\":0}\t0\t17", "bucket_00000"},
+        {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":1}\t0\t120", "bucket_00000"},
+        {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t2", "bucket_00001"},
+        {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":3}\t1\t4", "bucket_00001"},
+        {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":2}\t1\t5", "bucket_00001"},
+        {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":4}\t1\t6", "bucket_00001"},
+        {"{\"transactionid\":18,\"bucketid\":1,\"rowid\":0}\t1\t16", "bucket_00001"}
+    };
+    Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+    }
+    //run Compaction
+    runStatementOnDriver("alter table " + TestTxnCommands2.Table.NONACIDORCTBL + " compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by a,b");
+    LOG.warn("after compact");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("Unexpected row count after compaction", expected.length, rs.size());
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(bucket) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+    }
+    //make sure they are the same before and after compaction
+  }
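// Editor's aside - the expected ROW__IDs above assume that the bucket id of an original file comes from its
// name (000001_0_copy_2 -> bucket 1, copy 2) and that copies of a bucket are read in copy order. A minimal
// name parser, with the regex and class name assumed purely for illustration:
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class OriginalFileName {
  private static final Pattern NAME = Pattern.compile("(\\d{6})_\\d+(?:_copy_(\\d+))?");

  final int bucketId;
  final int copyNumber;  // 0 for the primary file, N for _copy_N

  private OriginalFileName(int bucketId, int copyNumber) {
    this.bucketId = bucketId;
    this.copyNumber = copyNumber;
  }

  static OriginalFileName parse(String fileName) {
    Matcher m = NAME.matcher(fileName);
    if (!m.matches()) {
      throw new IllegalArgumentException("not an original bucket file: " + fileName);
    }
    int bucket = Integer.parseInt(m.group(1));
    int copy = m.group(2) == null ? 0 : Integer.parseInt(m.group(2));
    return new OriginalFileName(bucket, copy);
  }

  public static void main(String[] args) {
    OriginalFileName f = parse("000001_0_copy_2");
    System.out.println(f.bucketId + " " + f.copyNumber);  // prints "1 2"
  }
}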
   @Test
   @Ignore("Values clause with table constructor not yet supported")
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index a7ff9a3749..c928732e07 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
@@ -142,6 +143,10 @@ public void testOriginal() throws Exception {
     Configuration conf = new Configuration();
     MockFileSystem fs = new MockFileSystem(conf,
         new MockFile("mock:/tbl/part1/000000_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "1",
+            500, new byte[0]),
+        new MockFile("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "2",
+            500, new byte[0]),
         new MockFile("mock:/tbl/part1/000001_1", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/000002_0", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/random", 500, new byte[0]),
@@ -154,13 +159,17 @@ public void testOriginal() throws Exception {
     assertEquals(0, dir.getCurrentDirectories().size());
     assertEquals(0, dir.getObsolete().size());
     List<HdfsFileStatusWithId> result = dir.getOriginalFiles();
-    assertEquals(5, result.size());
+    assertEquals(7, result.size());
     assertEquals("mock:/tbl/part1/000000_0", result.get(0).getFileStatus().getPath().toString());
-    assertEquals("mock:/tbl/part1/000001_1", result.get(1).getFileStatus().getPath().toString());
-    assertEquals("mock:/tbl/part1/000002_0", result.get(2).getFileStatus().getPath().toString());
-    assertEquals("mock:/tbl/part1/random", result.get(3).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "1",
+        result.get(1).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "2",
+        result.get(2).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/000001_1", result.get(3).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/000002_0", result.get(4).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/random", result.get(5).getFileStatus().getPath().toString());
     assertEquals("mock:/tbl/part1/subdir/000000_0",
-        result.get(4).getFileStatus().getPath().toString());
+        result.get(6).getFileStatus().getPath().toString());
   }
 
   @Test
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index bb7985711f..41649a7d40 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -910,7 +910,7 @@ public void testEtlCombinedStrategy() throws Exception {
       MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException {
     OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path);
     return OrcInputFormat.determineSplitStrategies(combineCtx, context,
-        adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+        adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas,
         null, null, true);
   }
 
@@ -924,7 +924,7 @@ public void testEtlCombinedStrategy() throws Exception {
       OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException {
     OrcInputFormat.AcidDirInfo adi = gen.call();
     return OrcInputFormat.determineSplitStrategies(
-        null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+        null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas,
         null, null, true);
   }
 
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 1ce1bfb1dd..ec415a58a0 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.MemoryManager;
 import org.apache.orc.StripeInformation;
@@ -62,6 +63,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -189,6 +191,7 @@ public void testReaderPair() throws Exception {
     RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
     ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
         new Reader.Options(), 0);
+    pair.advnaceToMinKey();
     RecordReader recordReader = pair.recordReader;
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketId());
@@ -215,6 +218,7 @@ public void testReaderPairNoMin() throws Exception {
     ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
         new Reader.Options(), 0);
+    pair.advnaceToMinKey();
     RecordReader recordReader = pair.recordReader;
     assertEquals(10, key.getTransactionId());
     assertEquals(20, key.getBucketId());
@@ -290,8 +294,14 @@ public void testOriginalReaderPair() throws Exception {
     RecordIdentifier minKey = new RecordIdentifier(0, 10, 1);
     RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3);
     boolean[] includes = new boolean[]{true, true};
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path root = new Path(tmpDir, "testOriginalReaderPair");
+    fs.makeQualified(root);
+    fs.create(root);
     ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey,
-        new Reader.Options().include(includes));
+        new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
+    pair.advnaceToMinKey();
     RecordReader recordReader = pair.recordReader;
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketId());
@@ -319,8 +329,14 @@ private static ValidTxnList createMaximalTxnList() {
   public void testOriginalReaderPairNoMin() throws Exception {
     ReaderKey key = new ReaderKey();
     Reader reader = createMockOriginalReader();
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path root = new Path(tmpDir, "testOriginalReaderPairNoMin");
+    fs.makeQualified(root);
+    fs.create(root);
     ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null,
-        new Reader.Options());
+        new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
+    pair.advnaceToMinKey();
     assertEquals("first", value(pair.nextRecord));
     assertEquals(0, key.getTransactionId());
     assertEquals(10, key.getBucketId());
@@ -423,7 +439,7 @@ public void testNewBase() throws Exception {
     OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader, false, 10,
         createMaximalTxnList(),
-        new Reader.Options().range(1000, 1000), null);
+        new Reader.Options().range(1000, 1000), null, new OrcRawRecordMerger.Options());
     RecordReader rr = merger.getCurrentReader().recordReader;
     assertEquals(0, merger.getOtherReaders().size());
@@ -531,7 +547,7 @@ public void testEmpty() throws Exception {
     OrcRawRecordMerger merger =
         new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
             createMaximalTxnList(), new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()));
+            AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false));
     RecordIdentifier key = merger.createKey();
     OrcStruct value = merger.createValue();
     assertEquals(false, merger.next(key, value));
@@ -606,7 +622,7 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception {
     OrcRawRecordMerger merger =
         new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
             createMaximalTxnList(), new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()));
+            AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false));
     assertEquals(null, merger.getMinKey());
     assertEquals(null, merger.getMaxKey());
     RecordIdentifier id = merger.createKey();
@@ -681,7 +697,7 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception {
     // make a merger that doesn't collapse events
     merger = new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
             createMaximalTxnList(), new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()));
+            AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(true));
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
@@ -780,7 +796,7 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception {
     merger =
         new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
             txns, new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()));
+            AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false));
     for(int i=0; i < values.length; ++i) {
       assertEquals(true, merger.next(id, event));
       LOG.info("id = " + id + "event = " + event);
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 6bf13129b8..73bc1ab852 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -182,7 +182,7 @@ public void setup() throws Exception {
     OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, root, false, null);
     OrcInputFormat.AcidDirInfo adi = gen.call();
     List<OrcInputFormat.SplitStrategy<?>> splitStrategies = OrcInputFormat.determineSplitStrategies(
-        null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+        null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas,
         null, null, true);
     assertEquals(1, splitStrategies.size());
     List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 0483e91c4b..8d7eb51cdb 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -735,6 +735,10 @@ public FileStatus getFileStatus() {
     public Long getFileId() {
       return fileId;
     }
+    @Override
+    public int compareTo(HdfsFileStatusWithId o) {
+      return getFileStatus().compareTo(o.getFileStatus());
+    }
   }
 
   @Override
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index c280d49227..31586ab51c 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -269,9 +269,17 @@ RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter repor
    */
   public void hflush(FSDataOutputStream stream) throws IOException;
 
-  public interface HdfsFileStatusWithId {
+  public interface HdfsFileStatusWithId extends Comparable<HdfsFileStatusWithId> {
     public FileStatus getFileStatus();
     public Long getFileId();
+
+    /**
+     * If this sort order is changed and there are tables that have been converted to transactional
+     * and have had any update/delete/merge operations performed but not yet fully compacted, it
+     * will result in data loss.
+     */
+    @Override
+    public int compareTo(HdfsFileStatusWithId o);
   }
 
   public HCatHadoopShims getHCatShim();
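// Editor's aside - a sketch of why the compareTo javadoc above warns about changing the sort order:
// synthetic rowIds for original files are effectively offsets into the concatenation of those files in
// sort order, so reordering the files reassigns every offset after the first file and any delta written
// against the old ids no longer lines up. The helper below is a generic illustration of that offset
// computation; the names and the use of a path-sorted TreeMap are assumptions, not Hive API.
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

final class RowIdOffsets {
  /** rowCountsByPath must already be sorted by the ordering that assigns ROW__IDs. */
  static Map<String, Long> firstRowIdPerFile(TreeMap<String, Long> rowCountsByPath) {
    Map<String, Long> offsets = new LinkedHashMap<>();
    long nextRowId = 0;
    for (Map.Entry<String, Long> e : rowCountsByPath.entrySet()) {
      offsets.put(e.getKey(), nextRowId);   // first ROW__ID.rowId used by this file
      nextRowId += e.getValue();            // the following file starts after this file's rows
    }
    return offsets;
  }

  public static void main(String[] args) {
    TreeMap<String, Long> counts = new TreeMap<>();
    counts.put("000001_0", 2L);
    counts.put("000001_0_copy_1", 3L);
    // offsets: 000001_0 -> 0, 000001_0_copy_1 -> 2; change the ordering and both assignments change
    System.out.println(firstRowIdPerFile(counts));
  }
}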