diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java index 2bf9871956..a2cedc6b88 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java @@ -20,6 +20,8 @@ import java.io.File; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; import java.util.List; import org.apache.hadoop.fs.FileUtil; @@ -33,14 +35,14 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.txn.compactor.Cleaner; -import org.apache.hadoop.hive.ql.txn.compactor.Worker; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class resides in itests to facilitate running query using Tez engine, since the jars are @@ -260,4 +262,323 @@ private void setupMapJoin(HiveConf conf) { driver.getResults(rs); return rs; } + static List stringifyValues(int[][] rowsIn) { + assert rowsIn.length > 0; + int[][] rows = rowsIn.clone(); + Arrays.sort(rows, new RowComp()); + List rs = new ArrayList(); + for(int[] row : rows) { + assert row.length > 0; + StringBuilder sb = new StringBuilder(); + for(int value : row) { + sb.append(value).append("\t"); + } + sb.setLength(sb.length() - 1); + rs.add(sb.toString()); + } + return rs; + } + private static final class RowComp implements Comparator { + @Override + public int compare(int[] row1, int[] row2) { + assert row1 != null && row2 != null && row1.length == row2.length; + for(int i = 0; i < row1.length; i++) { + int comp = Integer.compare(row1[i], row2[i]); + if(comp != 0) { + return comp; + } + } + return 0; + } + } + static final private Logger LOG = LoggerFactory.getLogger(TestAcidOnTez.class); + + private void logResuts(List r, String header, String prefix) { + LOG.info(prefix + " " + header); + StringBuilder sb = new StringBuilder(); + int numLines = 0; + for(String line : r) { + numLines++; + sb.append(prefix).append(line).append("\n"); + } + LOG.info(sb.toString()); + LOG.info(prefix + " Printed " + numLines + " lines"); + } + @Test + public void testUnion() throws Exception { + runStatementOnDriver("drop table if exists srcpart2"); + runStatementOnDriver("create table if not exists srcpart2 (a int, b int, c int) " + + "partitioned by (z int) clustered by (a) into 3 buckets " + + "stored as orc"); + runStatementOnDriver("create temporary table if not exists data1 (x int)"); + runStatementOnDriver("create temporary table if not exists data2 (x int)"); + + runStatementOnDriver("insert into data1 values (1),(2),(3)"); + runStatementOnDriver("insert into data2 values (4),(5),(6)"); + d.destroy(); + HiveConf hc = new HiveConf(hiveConf); + setupTez(hc); + hc.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false); + d = new Driver(hc); + List r = runStatementOnDriver("explain insert into srcpart2 partition(z=1) select x, 1, 2 from data1 union all select x, 3, 4 from data2"); + logResuts(r, "explain3", ""); +// r = runStatementOnDriver("insert into srcpart2 partition(z=1) select x, 1, 2 from data1 union all select x, 3, 4 from data2"); + r = 
runStatementOnDriver("create table srcpart3 stored as orc as select x, 1, 2 from data1 union all select x, 3, 4 from data2"); + //see https://issues.apache.org/jira/browse/HIVE-15899 for dir structure that this produces + } + /** + * https://hortonworks.jira.com/browse/BUG-66580 + */ + @Test + public void testMultiInsert1() throws Exception { + runStatementOnDriver("drop table if exists srcpart"); + runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " + + "partitioned by (z int) clustered by (a) into 3 buckets " + + "stored as orc tblproperties('transactional'='true')"); + runStatementOnDriver("create temporary table if not exists data1 (x int)"); + runStatementOnDriver("create temporary table if not exists data2 (x int)"); + + runStatementOnDriver("insert into data1 values (1),(2),(3)"); + runStatementOnDriver("insert into data2 values (4),(5),(6)"); + d.destroy(); + HiveConf hc = new HiveConf(hiveConf); + setupTez(hc); + hc.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false); + d = new Driver(hc); + String query = " from data1 " + //send everything to bucket 0 + "insert into srcpart partition(z) select 0,0,x,x " + + "insert into srcpart partition(z=1) select 0,0,17"; + List r = runStatementOnDriver("explain " + query); + logResuts(r, "explain1", ""); + runStatementOnDriver(query); + r = runStatementOnDriver("select * from srcpart order by c,z"); + //this is what we should get but we end up loosing 1 row +// int[][] rExpected = {{0,0,1,1},{0,0,2,2},{0,0,3,3},{0,0,17,1},{0,0,17,1},{0,0,17,1}}; + int[][] rExpected = {{0,0,1,1},{0,0,2,2},{0,0,3,3},{0,0,17,1},{0,0,17,1}}; +// Assert.assertEquals(stringifyValues(rExpected), r); + + r = runStatementOnDriver("explain insert into srcpart partition(z) select x, 1, 2, x as z from data1 union all select x, 3, 4, x as z from data2"); + logResuts(r, "explain2", ""); + /* + 2017-03-09T13:53:16,148 DEBUG [main] ql.Driver: Shutting down query explain insert into srcpart partition(z=1) select x, 1, 2 from data1 union all select x, 3, 4 from data2 +2017-03-09T13:53:16,149 INFO [main] ql.TestAcidOnTez: explain2 +2017-03-09T13:53:16,149 INFO [main] ql.TestAcidOnTez: STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Tez + DagId: ekoifman_20170309135305_d6258cb6-9b75-4b4a-ba67-a666d919d5e0:4 + Edges: + Map 1 <- Union 2 (CONTAINS) + Map 4 <- Union 2 (CONTAINS) + Reducer 3 <- Union 2 (CUSTOM_SIMPLE_EDGE) + DagName: + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: data1 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: x (type: int), 1 (type: int), 2 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 2 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int) + Map 4 + Map Operator Tree: + TableScan + alias: data2 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: x (type: int), 3 (type: int), 4 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE 
+ Reduce Output Operator + sort order: + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 2 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: int) + Reducer 3 + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), VALUE._col1 (type: int), VALUE._col2 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 2 Data size: 12 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 2 Data size: 12 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.srcpart + Write Type: INSERT + Union 2 + Vertex: Union 2 + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + partition: + z 1 + replace: false +*/ + /* + So this is what we get. All the data is in there. 1 leg of multi-insert has statementId=0, the other statementId=1 + I don't know why we have an empty bucket for the Static Partition case. + Partition z=1 has 2 rows with ROW_ID(1,0,0). + + This wouldn't happen in multi-statement txn case since statements are sequential, and N+1st statement "looks back" to + find the rowId from Nth statement. + + ekoifman:apache-hive-2.2.0-SNAPSHOT-bin ekoifman$ ./bin/hive --orcfiledump -j -p -d /Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart +SLF4J: Class path contains multiple SLF4J bindings. +SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hiverwgit/packaging/target/apache-hive-2.2.0-SNAPSHOT-bin/apache-hive-2.2.0-SNAPSHOT-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] +SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hwxhadoop/hadoop-dist/target/hadoop-2.7.3.2.6.0.0-SNAPSHOT/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] +SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
+SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0000/bucket_00000 [length: 625] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}} +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":1,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}} +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":2,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0000/bucket_00001 [length: 204] +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0001/bucket_00000 [length: 606] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":1}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=2/delta_0000001_0000001_0001/bucket_00000 [length: 619] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":2}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=3/delta_0000001_0000001_0001/bucket_00000 [length: 619] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":3}} +________________________________________________________________________________________________________________________ + + + +2017-03-09T12:06:28,292 DEBUG [main] ql.Driver: Shutting down query explain from data1 insert into srcpart partition(z) select 0,0,x,x insert into srcpart partition(z=1) select 0,0,17 +2017-03-09T12:06:28,294 INFO [main] ql.TestAcidOnTez: explain1 +2017-03-09T12:06:28,294 INFO [main] ql.TestAcidOnTez: STAGE DEPENDENCIES: + Stage-2 is a root stage + Stage-3 depends on stages: Stage-2 + Stage-0 depends on stages: Stage-3 + Stage-4 depends on stages: Stage-0 + Stage-1 depends on stages: Stage-3 + Stage-5 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-2 + Tez + DagId: ekoifman_20170309120619_f21fdc51-b8ba-4c30-ac1d-b7aa62cec6b1:1 + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) + Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE) + DagName: + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: data1 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: x (type: int) + outputColumnNames: _col2 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort 
order: + Map-reduce partition columns: 0 (type: int) + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + value expressions: _col2 (type: int) + Select Operator + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Map-reduce partition columns: 0 (type: int) + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Select Operator + expressions: 0 (type: int), 0 (type: int), VALUE._col1 (type: int), VALUE._col1 (type: int) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.srcpart + Write Type: INSERT + Reducer 3 + Reduce Operator Tree: + Select Operator + expressions: 0 (type: int), 0 (type: int), 17 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 6 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.srcpart + Write Type: INSERT + + Stage: Stage-3 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + partition: + z + replace: false + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.srcpart + + Stage: Stage-4 + Stats-Aggr Operator + + Stage: Stage-1 + Move Operator + tables: + partition: + z 1 + replace: false + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.srcpart + + Stage: Stage-5 + Stats-Aggr Operator + +2017-03-09T12:06:28,295 INFO [main] ql.TestAcidOnTez: Printed 100 lines + +*/ + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index ebf13442f4..a53da97b13 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1205,7 +1205,7 @@ public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws I * Group 6: copy [copy keyword] * Group 8: 2 [copy file index] */ - private static final String COPY_KEYWORD = "_copy_"; // copy keyword + public static final String COPY_KEYWORD = "_copy_"; // copy keyword private static final Pattern COPY_FILE_NAME_TO_TASK_ID_REGEX = Pattern.compile("^.*?"+ // any prefix "([0-9]+)"+ // taskId diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java index 7c7074d156..0edef65828 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java @@ -116,6 +116,9 @@ public DeltaMetaData() { this(0,0,new 
ArrayList()); } + /** + * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition + */ DeltaMetaData(long minTxnId, long maxTxnId, List stmtIds) { this.minTxnId = minTxnId; this.maxTxnId = maxTxnId; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index b85b827424..405cfde503 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.WritableComparable; @@ -51,6 +52,11 @@ private long minimumTransactionId; private long maximumTransactionId; private int bucket; + /** + * Based on {@link org.apache.hadoop.hive.ql.metadata.Hive#mvFile(HiveConf, FileSystem, Path, FileSystem, Path, boolean, boolean)} + * _copy_N starts with 1. + */ + private int copyNumber = 0; private PrintStream dummyStream = null; private boolean oldStyle = false; private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id @@ -180,6 +186,18 @@ public Options bucket(int bucket) { } /** + * Multiple inserts into legacy (pre-acid) tables can generate multiple copies of each bucket + * file. + * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD + * @param copyNumber the number of the copy ( > 0) + * @return this + */ + public Options copyNumber(int copyNumber) { + this.copyNumber = copyNumber; + return this; + } + + /** * Whether it should use the old style (0000000_0) filenames. * @param value should use the old style names * @return this @@ -293,6 +311,9 @@ boolean getOldStyle() { public int getStatementId() { return statementId; } + public int getCopyNumber() { + return copyNumber; + } public Path getFinalDestination() { return finalDestination; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index da00bb3363..2ca5c1cc1e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -48,6 +48,8 @@ import com.google.common.annotations.VisibleForTesting; +import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD; + /** * Utilities that are shared by all of the ACID input and output formats. They * are used by the compactor and cleaner and thus must be format agnostic. 
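As an aside on the COPY_KEYWORD / copyNumber plumbing introduced above, here is a minimal, self-contained sketch (not part of the patch) of the naming convention being modeled. Hive#mvFile appends "_copy_" plus a 1-based counter when the destination bucket file already exists, so 000001_0, 000001_0_copy_1 and 000001_0_copy_2 are all pieces of logical bucket 1. The class and method names below are invented for illustration, and the group-based parsing follows the todo left in parseBaseOrDeltaBucketFilename further down rather than the substring version the patch currently uses.

import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CopyFileNameDemo {
  // Same literal as Utilities.COPY_KEYWORD, repeated here only to keep the sketch standalone.
  private static final String COPY_KEYWORD = "_copy_";
  private static final Pattern ORIGINAL = Pattern.compile("([0-9]+)_([0-9]+)");
  private static final Pattern ORIGINAL_COPY =
      Pattern.compile("([0-9]+)_([0-9]+)" + COPY_KEYWORD + "([0-9]+)");

  /** Returns {bucketId, copyNumber}; copyNumber is 0 for the plain 00000N_M file. */
  static int[] parse(String fileName) {
    Matcher m = ORIGINAL_COPY.matcher(fileName);
    if (m.matches()) {
      return new int[] {Integer.parseInt(m.group(1)), Integer.parseInt(m.group(3))};
    }
    m = ORIGINAL.matcher(fileName);
    if (m.matches()) {
      return new int[] {Integer.parseInt(m.group(1)), 0};
    }
    throw new IllegalArgumentException("not a legacy bucket file name: " + fileName);
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(parse("000001_0")));        // [1, 0]
    System.out.println(Arrays.toString(parse("000001_0_copy_2"))); // [1, 2]
  }
}

Real callers would go through AcidUtils.parseBaseOrDeltaBucketFilename(), which after this patch also populates Options.copyNumber for such files.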
@@ -98,7 +100,12 @@ public boolean accept(Path path) { */ public static final int MAX_STATEMENTS_PER_TXN = 10000; public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$"); + //this should match 000000_0_copy_1 - todo: add unit test for this - since we rely on this public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}"); + /** + * This does not need to use ORIGINAL_PATTERN_COPY because it's used to read + * a "delta" dir written by a real Acid write - cannot have any copies + */ public static final PathFilter originalBucketFilter = new PathFilter() { @Override public boolean accept(Path path) { @@ -113,6 +120,18 @@ private AcidUtils() { private static final Pattern ORIGINAL_PATTERN = Pattern.compile("[0-9]+_[0-9]+"); + /** + * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD + */ + private static final Pattern ORIGINAL_PATTERN_COPY = + Pattern.compile("[0-9]+_[0-9]+" + COPY_KEYWORD + "[0-9]+"); + + public static final PathFilter copyFileFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + return ORIGINAL_PATTERN_COPY.matcher(path.getName()).matches(); + } + }; public static final PathFilter hiddenFileFilter = new PathFilter(){ @Override @@ -243,7 +262,22 @@ static long parseBase(Path path) { .maximumTransactionId(0) .bucket(bucket) .writingBase(true); - } else if (filename.startsWith(BUCKET_PREFIX)) { + } + else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) { + //todo: define groups in regex and use parseInt(Matcher.group(2)).... + //arguably this should throw since we don't know how to handle this in OrcRawRecordMerger + int bucket = + Integer.parseInt(filename.substring(0, filename.indexOf('_'))); + int copyNumber = Integer.parseInt(filename.substring(filename.lastIndexOf('_') + 1)); + result + .setOldStyle(true) + .minimumTransactionId(0) + .maximumTransactionId(0) + .bucket(bucket) + .copyNumber(copyNumber) + .writingBase(true); + } + else if (filename.startsWith(BUCKET_PREFIX)) { int bucket = Integer.parseInt(filename.substring(filename.indexOf('_') + 1)); if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) { @@ -269,6 +303,7 @@ static long parseBase(Path path) { .bucket(bucket); } } else { + //why is this useful? what are we going to do with bucketId = -1? result.setOldStyle(true).bucket(-1).minimumTransactionId(0) .maximumTransactionId(0); } @@ -767,6 +802,11 @@ public static Directory getAcidState(Path directory, return getAcidState(directory, conf, txnList, Ref.from(useFileIds), ignoreEmptyFiles); } + /** + * TODO: this should probably look at all Original files found and check that they match some pattern we + * understand and raise some error if there are any that do not - otherwise we may skip some file + * that has data and thus end up with data loss - file a ticket + */ public static Directory getAcidState(Path directory, Configuration conf, ValidTxnList txnList, @@ -893,7 +933,10 @@ else if (prev != null && next.maxTransaction == prev.maxTransaction final Path base = bestBase.status == null ? 
null : bestBase.status.getPath(); LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " + deltas.size()); - + /** + * {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger#OriginalReaderPair} + */ + Collections.sort(original); return new Directory(){ @Override @@ -1001,9 +1044,32 @@ public FileStatus getFileStatus() { public Long getFileId() { return null; } + @Override + public int compareTo(HdfsFileStatusWithId o) { + return getFileStatus().compareTo(o.getFileStatus()); + } } /** + * See https://issues.apache.org/jira/browse/HIVE-15899 + * There is logic in various places in Acid that cannot handle with arbitrary layout on disk. + * It can handle bucket files under base_xxx or delta_xxx/delete_delta_xxx as created by a write + * to an acid table. It can handle bucket files under the table or partition directory as would + * usually be created by a write to a non acid table (which is relevant here if the table is then + * converted to acid). If you have multiple levels of nesting, the rest of the system is not + * smart enough to figure out that parts of the same logical bucket may be a "cousin" files. + * This is a suboptimal solution since the logic is only triggered by some acid operation on a + * partition but the conversion to acid table is done by setting transactional=true table property. + * To enforce this properly, {@link TransactionalValidationListener} would have to scan all partitions + * of a table to look for subdirectories. + * + * Alternatively, make all downstream processing look for all parts of a logical bucket file, i.e. + * include all siblings/cousins regardless of how fare removed they are. + */ + private static void assertValidDirectoryLayout(FileStatus dir) throws IllegalStateException { + throw new IllegalStateException(""); + } + /** * Find the original files (non-ACID layout) recursively under the partition directory. * @param fs the file system * @param stat the directory to add @@ -1031,18 +1097,20 @@ private static void findOriginals(FileSystem fs, FileStatus stat, if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { if (child.getFileStatus().isDir()) { + assertValidDirectoryLayout(child.getFileStatus()); findOriginals(fs, child.getFileStatus(), original, useFileIds); } else { - original.add(child); + original.add(child);//todo: skip empty ones } } } else { List children = HdfsUtils.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter); for (FileStatus child : children) { if (child.isDir()) { + assertValidDirectoryLayout(child); findOriginals(fs, child, original, useFileIds); } else { - original.add(createOriginalObj(null, child)); + original.add(createOriginalObj(null, child));//todo: skip empty ones - or not - how's cleaner going to clean? 
} } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java index 9299306ff1..aafb72c19e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java @@ -220,6 +220,10 @@ public FileStatus getFileStatus() { public Long getFileId() { return fileId; } + @Override + public int compareTo(HdfsFileStatusWithId o) { + return getFileStatus().compareTo(o.getFileStatus()); + } }); return fileId; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 8fb7211bac..652369a89c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -108,6 +108,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.Ref; import org.apache.orc.ColumnStatistics; +import org.apache.orc.FileFormatException; import org.apache.orc.OrcProto; import org.apache.orc.OrcUtils; import org.apache.orc.StripeInformation; @@ -1678,7 +1679,7 @@ private long computeProjectionSize(List fileTypes, // independent split strategies for them. There is a global flag 'isOriginal' that is set // on a per split strategy basis and it has to be same for all the files in that strategy. List> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs, - adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi, + adi.splitPath, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); for (SplitStrategy splitStrategy : splitStrategies) { @@ -1924,11 +1925,12 @@ public float getProgress() throws IOException { Path root; if (split.hasBase()) { if (split.isOriginal()) { + //todo: this assumes that all bucket files are immediate children which is not always the case in Hive root = path.getParent(); } else { root = path.getParent().getParent(); } - } else { + } else {//todo: so here path is a delta/ but above it's a partition/ root = path; } @@ -1948,7 +1950,22 @@ public float getProgress() throws IOException { final Configuration conf = options.getConfiguration(); final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split); - final int bucket = OrcInputFormat.getBucketForSplit(conf, split); + OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false); + mergerOptions.rootPath(root); + final int bucket; + if (split.hasBase()) { + AcidOutputFormat.Options acidIOOptions = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf); + if(acidIOOptions.getBucket() < 0) { + //todo: now what? 
+ } + bucket = acidIOOptions.getBucket(); + if(split.isOriginal()) { + mergerOptions.copyIndex(acidIOOptions.getCopyNumber()).bucketPath(split.getPath()); + } + } else { + bucket = (int) split.getStart(); + } + final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf); readOptions.range(split.getStart(), split.getLength()); @@ -1957,7 +1974,7 @@ public float getProgress() throws IOException { new ValidReadTxnList(txnString); final OrcRawRecordMerger records = new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket, - validTxnList, readOptions, deltas); + validTxnList, readOptions, deltas, mergerOptions); return new RowReader() { OrcStruct innerRecord = records.createValue(); @@ -2010,10 +2027,16 @@ public float getProgress() throws IOException { }; } - static Path findOriginalBucket(FileSystem fs, + private static Path findOriginalBucket(FileSystem fs, Path directory, int bucket) throws IOException { + //todo: are all buckets necessarily 1 level deep? - Sergey says NO; CTAS + Tez + Union All generates 1/ and 2/ dirs for each leg of union + //is CTAST supported with ACID? probably not but converting such table to Acid is probably not blocked for(FileStatus stat: fs.listStatus(directory)) { + if(stat.getLen() <= 0) { + continue;//inserting into non-acid bucketed table definitely can create empty bucket files (not even a header) - why wasn't this needed before? + }//trying to create Reader over an empty file makes ORC barf - naturally + //see Utilities#COPY_KEYWORD String name = stat.getPath().getName(); String numberPart = name.substring(0, name.indexOf('_')); if (org.apache.commons.lang3.StringUtils.isNumeric(numberPart) && @@ -2055,14 +2078,6 @@ static Reader createOrcReaderForSplit(Configuration conf, OrcSplit orcSplit) thr return reader; } - static int getBucketForSplit(Configuration conf, OrcSplit orcSplit) { - if (orcSplit.hasBase()) { - return AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); - } else { - return (int) orcSplit.getStart(); - } - } - public static boolean[] pickStripesViaTranslatedSarg(SearchArgument sarg, OrcFile.WriterVersion writerVersion, List types, List stripeStats, int stripeCount) { @@ -2133,9 +2148,12 @@ private static boolean isStripeSatisfyPredicate( return sarg.evaluate(truthValues).isNeeded(); } + /** + * @param dir - what is this? run debugger + */ @VisibleForTesting static List> determineSplitStrategies(CombinedCtx combinedCtx, Context context, - FileSystem fs, Path dir, AcidUtils.Directory dirInfo, + FileSystem fs, Path dir, List baseFiles, List parsedDeltas, List readerTypes, @@ -2146,7 +2164,7 @@ private static boolean isStripeSatisfyPredicate( // When no baseFiles, we will just generate a single split strategy and return. List acidSchemaFiles = new ArrayList(); if (baseFiles.isEmpty()) { - splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo, + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); if (splitStrategy != null) { splitStrategies.add(splitStrategy); @@ -2166,7 +2184,7 @@ private static boolean isStripeSatisfyPredicate( // Generate split strategy for non-acid schema original files, if any. 
if (!originalSchemaFiles.isEmpty()) { - splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo, + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, originalSchemaFiles, true, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); if (splitStrategy != null) { splitStrategies.add(splitStrategy); @@ -2175,7 +2193,7 @@ private static boolean isStripeSatisfyPredicate( // Generate split strategy for acid schema files, if any. if (!acidSchemaFiles.isEmpty()) { - splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo, + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); if (splitStrategy != null) { splitStrategies.add(splitStrategy); @@ -2187,7 +2205,7 @@ private static boolean isStripeSatisfyPredicate( @VisibleForTesting static SplitStrategy determineSplitStrategy(CombinedCtx combinedCtx, Context context, - FileSystem fs, Path dir, AcidUtils.Directory dirInfo, + FileSystem fs, Path dir, List baseFiles, boolean isOriginal, List parsedDeltas, @@ -2257,13 +2275,26 @@ private static boolean isStripeSatisfyPredicate( bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket); } else { isOriginal = true; + /*this is clearly wrong - in presence of copy_N files and subdirs we can have several + * files with the same bucket id representing 1 logical bucket*/ bucketFile = findOriginalBucket(baseDirectory.getFileSystem(conf), baseDirectory, bucket); } reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf)); } + //this is for Compactor + //todo: if isOriginal, find N in copyN and pass it in.... + //except that it's just picking 1 file, rather than all copies... WTF + //or perhaps + //hmm so now the Merger has to know if it's doing compaction and treat all copy_N files for the + //same bucket as one logical file but for regular reads, we want to look up all copy_M, M originalFiles; + private int nextFileIndex = 0; + private long numRowsInCurrentFile = 0; + private RecordReader originalFileRecordReader = null; + private final Configuration conf; + private final Reader.Options options; OriginalReaderPair(ReaderKey key, Reader reader, int bucket, RecordIdentifier minKey, RecordIdentifier maxKey, - Reader.Options options) throws IOException { + Reader.Options options, Options mergerOptions, Configuration conf, ValidTxnList validTxnList) throws IOException { super(key, reader, bucket, minKey, maxKey, options, 0); + this.mergerOptions = mergerOptions; + this.conf = conf; + this.options = options; + assert mergerOptions.getRootPath() != null : "Since we have original files"; + assert bucket >= 0 : "don't support non-bucketed tables yet"; + if(mergerOptions.isCompacting()) { + AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validTxnList, false, true); + originalFiles = directoryState.getOriginalFiles(); + assert originalFiles.size() > 0; + if(originalFiles.size() == 1) { + originalFileRecordReader = recordReader; + } + else { + recordReader.close(); + while(nextFileIndex < originalFiles.size() && originalFiles.get(nextFileIndex).getFileStatus().getLen() <= 0) { + //sometimes we get empty files which ORC can't read - should not need this + nextFileIndex++; + } + reader = OrcFile.createReader(originalFiles.get(nextFileIndex++).getFileStatus().getPath(), OrcFile.readerOptions(conf)); + numRowsInCurrentFile = reader.getNumberOfRows(); + originalFileRecordReader = reader.rowsOptions(options); + 
} + } + else { + if (mergerOptions.getCopyIndex() > 0) { + originalFiles = Collections.emptyList(); + boolean isLastFileForThisBucket = false; + + AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validTxnList, false, true); + for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { + AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf); + if (bucketOptions.getBucket() != bucket) { + continue; + } + if(isLastFileForThisBucket) { + //if here we already saw current file and now found another file for the same bucket + //so the current file is not the last file of the logical bucket + isLastFileForThisBucket = false; + break; + } + if (f.getFileStatus().getPath().equals(mergerOptions.getBucketPath())) { + //found the file whence the current split is from so we're done counting offset + isLastFileForThisBucket = true;//to indicate we saw current file + continue; + } + Reader copyReader = OrcFile.createReader(f.getFileStatus().getPath(), OrcFile.readerOptions(conf)); + rowIdOffset += copyReader.getNumberOfRows(); + } + if (rowIdOffset > 0) { + /**fix min/max key since these are used by {@link #next(OrcStruct)} which uses + * {@link #rowIdOffset} + * to generate rowId for the key. Clear? + * todo: move discovery of min/max key to ReaderPair c'tor instead? - less convoluted + * doing this makes some UTs difficult*/ + if (minKey != null) { + minKey.setRowId(minKey.getRowId() + rowIdOffset); + } + else { + /*if this is not the 1st file, set minKey 1 less than the start of current file (or last key of previous file) + * This is only needed if there are delta files*/ + this.minKey = new RecordIdentifier(0, bucket, rowIdOffset - 1); + } + if (maxKey != null) { + maxKey.setRowId(maxKey.getRowId() + rowIdOffset); + } + else { + if(!isLastFileForThisBucket) { + /*if this is not the last file, set the key to last key of file + * This is only needed if there are delta files; todo this should be done even for 1st file, i.e. when rowIdOffset == 0; no need if we know this is the last file*/ + this.maxKey = new RecordIdentifier(0, bucket, rowIdOffset + reader.getNumberOfRows() - 1); + } + } + } + else { + if (maxKey == null) { + //cur file may be the 1st in the logical bucket but may not be the last. So we need maxKey (unless we know it's the last file) + //if this is the last, then this maxKey is the last key of the file and functionally the same as setting it to null (but less efficient) + if(!isLastFileForThisBucket) { + this.maxKey = new RecordIdentifier(0, bucket, rowIdOffset + reader.getNumberOfRows() - 1); + //DOH if this IS the last file, have to leave it a null + } + } + } + } else { + originalFiles = Collections.emptyList();//todo: should set maxKey here if we know there are more files (how do we know ?) 
+ AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validTxnList, false, true); + int numFilesInBucket= 0; + for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { + AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf); + if (bucketOptions.getBucket() == bucket) { + numFilesInBucket++; + if(numFilesInBucket > 1) { + //if here it means current file (reader) is not the last file of the logical buckret + //since copyIndex of the current file is 0 + this.maxKey = new RecordIdentifier(0, bucket, reader.getNumberOfRows() - 1); + break; + } + } + } + } + originalFileRecordReader = recordReader; + } + //todo: is it ever OK for mergerOptions to be null? + //why isCompacting() check? + if(false && mergerOptions != null && !mergerOptions.isCompacting() && mergerOptions.getCopyIndex() > 0) { + //if copyIndex=0 that means rowIdOffset should be 0 so we can skip all this when + //there is just a single copy of each bucket file (should be most of the time) + FileSystem fs = mergerOptions.getBucketPath().getFileSystem(conf); + //is globStatus more efficient? does it really matter + FileStatus[] copies = fs.listStatus(mergerOptions.getBucketPath().getParent(), AcidUtils.copyFileFilter); + /*todo: above call won't handle any nested subdirectories tbl/part/1/ + but getAcidState() does. So in such a table, we could have a split from tbl/part/1/000_01 and tbl/part/2/000_01 that won't be treated as 1 logical bucket + We could do getAcidState.getOriginalFiles() and sort the paths to get a canonical order + Then pick out those that relate to the current bucket but preserve the order. + Then count the rows in all the files before the one whence the current split is from + Calling getAcidState for each split may get expensive for NN? Alternatively, getAcidState can be called + once on the client and each logical Bucket file set can be passed via Configuration or Split + */ + for(FileStatus f : copies) { + AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(f.getPath(), conf); + if(bucketOptions.getBucket() != bucket) { + continue; + } + if(bucketOptions.getCopyNumber() < mergerOptions.getCopyIndex()) { + //this is a ...copy_M file with M < the copy we are processing in this OriginalReaderPair + /*open file, get number of rows in it and add to rowIdOffset*/ + Reader copyReader = OrcFile.createReader(f.getPath(), OrcFile.readerOptions(conf)); + rowIdOffset += copyReader.getNumberOfRows(); + //no close() since this just reads the footer to get metadata and closes underlying file + } + } + String zerothCopyName = mergerOptions.getBucketPath().getName().substring(0, mergerOptions.getBucketPath().getName().indexOf(Utilities.COPY_KEYWORD)); + Reader zerothFile = OrcFile.createReader(new Path(mergerOptions.getBucketPath().getParent(), zerothCopyName), OrcFile.readerOptions(conf)); + rowIdOffset += zerothFile.getNumberOfRows(); + if(rowIdOffset > 0) { + /**fix min/max key since these are used by {@link #next(OrcStruct)} which uses + * {@link #rowIdOffset} + * to generate rowId for the key. Clear? + * todo: move discovery of min/max key to ReaderPair c'tor instead? 
- less convoluted*/ + if(minKey != null) { + minKey.setRowId(minKey.getRowId() + rowIdOffset); + } + if(maxKey != null) { + maxKey.setRowId(maxKey.getRowId() + rowIdOffset); + } + } + } + } + @Override + void advnaceToMinKey() throws IOException { + super.advnaceToMinKey(); } @Override void next(OrcStruct next) throws IOException { - if (recordReader.hasNext()) { - long nextRowId = recordReader.getRowNumber(); - // have to do initialization here, because the super's constructor - // calls next and thus we need to initialize before our constructor - // runs - if (next == null) { - nextRecord = new OrcStruct(OrcRecordUpdater.FIELDS); - IntWritable operation = + while(true) { + if (originalFileRecordReader.hasNext()) { + /*this produces a file-global row number even with PPD*/ + long nextRowId = originalFileRecordReader.getRowNumber() + rowIdOffset; + // have to do initialization here, because the super's constructor + // calls next and thus we need to initialize before our constructor + // runs + if (next == null) { + nextRecord = new OrcStruct(OrcRecordUpdater.FIELDS); + IntWritable operation = new IntWritable(OrcRecordUpdater.INSERT_OPERATION); - nextRecord.setFieldValue(OrcRecordUpdater.OPERATION, operation); - nextRecord.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION, + nextRecord.setFieldValue(OrcRecordUpdater.OPERATION, operation); + nextRecord.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION, new LongWritable(0)); - nextRecord.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION, + nextRecord.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION, new LongWritable(0)); - nextRecord.setFieldValue(OrcRecordUpdater.BUCKET, + nextRecord.setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(bucket)); - nextRecord.setFieldValue(OrcRecordUpdater.ROW_ID, + nextRecord.setFieldValue(OrcRecordUpdater.ROW_ID, new LongWritable(nextRowId)); - nextRecord.setFieldValue(OrcRecordUpdater.ROW, - recordReader.next(null)); - } else { - nextRecord = next; - ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION)) + nextRecord.setFieldValue(OrcRecordUpdater.ROW, + originalFileRecordReader.next(null)); + } else { + nextRecord = next; + ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION)) .set(OrcRecordUpdater.INSERT_OPERATION); - ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)) + ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)) .set(0); - ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET)) + ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET)) .set(bucket); - ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION)) + ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION)) .set(0); - ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID)) + ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID)) .set(nextRowId); - nextRecord.setFieldValue(OrcRecordUpdater.ROW, - recordReader.next(OrcRecordUpdater.getRow(next))); - } - key.setValues(0L, bucket, nextRowId, 0L, 0); - if (maxKey != null && key.compareRow(maxKey) > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("key " + key + " > maxkey " + maxKey); + nextRecord.setFieldValue(OrcRecordUpdater.ROW, + originalFileRecordReader.next(OrcRecordUpdater.getRow(next))); + } + key.setValues(0L, bucket, nextRowId, 0L, 0); + if (maxKey != null && key.compareRow(maxKey) > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("key " + key + " > maxkey " + maxKey); + } + nextRecord = null; + originalFileRecordReader.close(); + } + return; + } else { +// 
nextRecord = null; +// originalFileRecordReader.close(); + + if (originalFiles.size() <= nextFileIndex) { + //no more original files + nextRecord = null; + originalFileRecordReader.close(); + return; + } else { + rowIdOffset += numRowsInCurrentFile; + originalFileRecordReader.close(); + while(originalFiles.get(nextFileIndex).getFileStatus().getLen() <= 0) { + //sometimes we get empty files which ORC can't read + nextFileIndex++; + } + Reader reader = OrcFile.createReader(originalFiles.get(nextFileIndex++).getFileStatus().getPath(), OrcFile.readerOptions(conf)); + numRowsInCurrentFile = reader.getNumberOfRows(); + originalFileRecordReader = reader.rowsOptions(options); } - nextRecord = null; - recordReader.close(); } - } else { - nextRecord = null; - recordReader.close(); } } @@ -339,9 +531,16 @@ private void discoverOriginalKeyBounds(Reader reader, int bucket, ) throws IOException { long rowLength = 0; long rowOffset = 0; + //this would usually be at block boundary long offset = options.getOffset(); + //this would usually be at block boundary long maxOffset = options.getMaxOffset(); boolean isTail = true; + /*options.getOffset() and getMaxOffset() would usually be at block boundary which doesn't + * necessarily match stripe boundary. So we want to come up with minKey to be the 1st row of the + * first stripe that starts after getOffset() and maxKey to be the last row of the stripe that + * contains getMaxOffset(). This breaks if getOffset() and getMaxOffset() are inside the same + * stripe*/ for(StripeInformation stripe: reader.getStripes()) { if (offset > stripe.getOffset()) { rowOffset += stripe.getNumberOfRows(); @@ -354,7 +553,7 @@ private void discoverOriginalKeyBounds(Reader reader, int bucket, } if (rowOffset > 0) { minKey = new RecordIdentifier(0, bucket, rowOffset - 1); - } + }//TODO: DOH! shouldn't this consider copy_N files? - done in OriginalReaderPair() but need to refactor if (!isTail) { maxKey = new RecordIdentifier(0, bucket, rowOffset + rowLength - 1); } @@ -416,6 +615,54 @@ private void discoverKeyBounds(Reader reader, return result; } + static class Options { + private int copyIndex = 0; + private boolean isCompacting = false; + private Path bucketPath; + /** + * Partition folder (Table folder if not partitioned) + */ + private Path rootPath; + Options copyIndex(int copyIndex) { + assert copyIndex >= 0;//??? + this.copyIndex = copyIndex; + return this; + } + Options isCompacting(boolean isCompacting) { + //todo:maybe this is not needed since we have collapseEvents - actually NO + //minor compaction doesn't collapse; but Minor doesn't use a base file + this.isCompacting = isCompacting; + return this; + } + Options bucketPath(Path bucketPath) { + this.bucketPath = bucketPath; + return this; + } + Options rootPath(Path rootPath) { + this.rootPath = rootPath; + return this; + } + int getCopyIndex() { + return copyIndex; + } + boolean isCompacting() { + return isCompacting; + } + Path getBucketPath() { + return bucketPath; + } + Path getRootPath() { return rootPath; } + } +/* OrcRawRecordMerger(Configuration conf, + boolean collapseEvents, + Reader reader, + boolean isOriginal, + int bucket, + ValidTxnList validTxnList, + Reader.Options options, + Path[] deltaDirectory) throws IOException { + this(conf, collapseEvents, reader, isOriginal, bucket, validTxnList, options, deltaDirectory, null); + }*/ /** * Create a reader that merge sorts the ACID events together. 
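* The extra {@link Options} argument added in this patch carries what the merger cannot infer from the
* split alone: whether this read is part of compaction, the copy index of the bucket file the split came
* from, the path of that bucket file, and the partition/table root path, so that a bucket file and its
* _copy_N siblings can be treated as one logical bucket with non-overlapping ROW__IDs.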
* @param conf the configuration @@ -433,9 +680,9 @@ private void discoverKeyBounds(Reader reader, int bucket, ValidTxnList validTxnList, Reader.Options options, - Path[] deltaDirectory) throws IOException { + Path[] deltaDirectory, Options mergerOptions) throws IOException { this.conf = conf; - this.collapse = collapseEvents; + this.collapse = collapseEvents;//todo: this already tells us if Compacting - NO - minor compaction doesn't collapse this.offset = options.getOffset(); this.length = options.getLength(); this.validTxnList = validTxnList; @@ -462,20 +709,29 @@ private void discoverKeyBounds(Reader reader, // use the min/max instead of the byte range ReaderPair pair; ReaderKey key = new ReaderKey(); + /*1. move discoverKeyBounds into ReaderPair - this makes testing difficult + * 2. make sure you have access to root partition path + * 3. inside OriginalReaderPair do the work for compaction or for read + * 4. note that OrcInputFormat.getReader() assumes that root is at certain distance from file + * which may not be the case - need to look at OrcInputFormat - pass it through OrcSplit? where can we access the metastore?*/ if (isOriginal) { options = options.clone(); pair = new OriginalReaderPair(key, reader, bucket, minKey, maxKey, - options); + options, mergerOptions, conf, validTxnList); + baseReader = ((OriginalReaderPair)pair).originalFileRecordReader; + minKey = pair.minKey; + maxKey = pair.maxKey; } else { pair = new ReaderPair(key, reader, bucket, minKey, maxKey, eventOptions, 0); + baseReader = pair.recordReader; } - + pair.advnaceToMinKey(); // if there is at least one record, put it in the map if (pair.nextRecord != null) { readers.put(key, pair); } - baseReader = pair.recordReader; +// baseReader = pair.recordReader; } // we always want to read all of the deltas @@ -504,6 +760,7 @@ private void discoverKeyBounds(Reader reader, ReaderPair deltaPair; deltaPair = new ReaderPair(key, deltaReader, bucket, minKey, maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId()); + deltaPair.advnaceToMinKey(); if (deltaPair.nextRecord != null) { readers.put(key, deltaPair); } @@ -648,6 +905,7 @@ public void close() throws IOException { @Override public float getProgress() throws IOException { + //this is not likely to do the right thing for Compaction of "original" files when there are copyN files return baseReader == null ? 1 : baseReader.getProgress(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 75c7680e26..0cdb4cec47 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -360,8 +360,11 @@ public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Opt int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket(); String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString); + //todo: mergerOptions are incomplete here - maybe not. 
I think only applies when we no longer have isOriginal files + OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false); this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket, - validTxnList, readerOptions, deleteDeltas); + validTxnList, readerOptions, deleteDeltas, + mergerOptions); this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey(); this.deleteRecordValue = this.deleteRecords.createValue(); // Initialize the first value in the delete reader. diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 5b908e8453..9ecb37f63e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -3103,7 +3103,7 @@ private static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath, * I'll leave the below loop for now until a better approach is found. */ for (int counter = 1; destFs.exists(destFilePath); counter++) { - destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : "")); + destFilePath = new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) + (!type.isEmpty() ? "." + type : "")); } if (isRenameAllowed) { diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index f83b6db8bf..3aaa548770 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -251,7 +251,7 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, // There are original format files for (HdfsFileStatusWithId stat : originalFiles) { Path path = stat.getFileStatus().getPath(); - dirsToSearch.add(path); + dirsToSearch.add(path);//todo: so this (based on getAcidState()) will contain all files including those in subdirs rather than dirs as the name suggests LOG.debug("Adding original file " + path + " to dirs to search"); } // Set base to the location so that the input format reads the original files. @@ -275,6 +275,12 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, su.gatherStats(); } + + /** + * @param baseDir if not null, it's either table/partition root folder or base_xxxx. + * If it's base_xxxx, it's in dirsToSearch, else the actual original files + * (all leaves recursively) are in the dirsToSearch list + */ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType, StringableList dirsToSearch, List parsedDeltas, @@ -363,7 +369,7 @@ public CompactorInputSplit() { * @param hadoopConf * @param bucket bucket to be processed by this split * @param files actual files this split should process. It is assumed the caller has already - * parsed out the files in base and deltas to populate this list. + * parsed out the files in base and deltas to populate this list. Includes copy_N * @param base directory of the base, or the partition/table location if the files are in old * style. Can be null. * @param deltas directories of the delta files. 
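Since these compactor splits now carry both 000001_0 and its _copy_N siblings, the reader has to hand out non-overlapping row ids across them. Below is a simplified, hypothetical sketch of that bookkeeping (the real logic lives in OrcRawRecordMerger's OriginalReaderPair): it assumes all original files sit directly under the partition directory, ignores the empty-file and subdirectory complications discussed in the comments above, and the class and method names are invented for illustration.

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;

class RowIdOffsetSketch {
  /**
   * Maps each original/copy_N file to the ROW__ID.rowId its first row should get, i.e. the
   * total number of rows in the files of the same bucket that come before it in sort order.
   */
  static Map<Path, Long> computeOffsets(List<Path> originalsInSortedOrder, Configuration conf)
      throws IOException {
    Map<Integer, Long> rowsSeenPerBucket = new HashMap<Integer, Long>();
    Map<Path, Long> offsets = new HashMap<Path, Long>();
    for (Path file : originalsInSortedOrder) {   // e.g. 000001_0, 000001_0_copy_1, ...
      AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename(file, conf);
      int bucket = opts.getBucket();
      Long seenSoFar = rowsSeenPerBucket.get(bucket);
      if (seenSoFar == null) {
        seenSoFar = 0L;
      }
      offsets.put(file, seenSoFar);              // first rowId to use in this file
      // reads only the ORC footer to get the row count, no data scan
      Reader reader = OrcFile.createReader(file, OrcFile.readerOptions(conf));
      rowsSeenPerBucket.put(bucket, seenSoFar + reader.getNumberOfRows());
    }
    return offsets;
  }
}

OriginalReaderPair does essentially this lazily, only summing the copies of the one bucket the current split belongs to, and then shifts minKey/maxKey by the same offset.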
@@ -377,7 +383,7 @@ public CompactorInputSplit() { this.deltas = deltas; locations = new ArrayList(); - for (Path path : files) { + for (Path path : files) {//todo: here we still have both 000001_0 and 000001_0_copy_1 but this is not propagated as part of the split FileSystem fs = path.getFileSystem(hadoopConf); FileStatus stat = fs.getFileStatus(path); length += stat.getLen(); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 7c66955e14..b64525af5e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -17,8 +17,8 @@ */ package org.apache.hadoop.hive.ql; +import javafx.scene.control.Tab; import org.apache.commons.io.FileUtils; -import org.apache.commons.io.output.StringBuilderWriter; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -105,6 +106,7 @@ public void setUp() throws Exception { hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); + hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); hiveConf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); @@ -880,4 +882,149 @@ public void testMergeCase() throws Exception { runStatementOnDriver("create table if not exists e011_02 (c1 float, c2 double, c3 float)"); runStatementOnDriver("merge into merge_test using e011_02 on (merge_test.c1 = e011_02.c1) when not matched then insert values (case when e011_02.c1 > 0 then e011_02.c1 + 1 else e011_02.c1 end, e011_02.c2 + e011_02.c3, coalesce(e011_02.c3, 1))"); } + + /** + * This is to demonstrate the issue in HIVE-16177 - ROW__IDs are not unique. 
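+ * Before this patch both 000001_0 and 000001_0_copy_1 of the same logical bucket number their rows
+ * from 0, so two different physical rows can both surface as {"transactionid":0,"bucketid":1,"rowid":0},
+ * defeating anything that relies on ROW__ID being unique (e.g. update/delete/merge on the converted
+ * table). The assertions below expect the reader to keep counting across the copy: rowid 0 for the row
+ * in 000001_0 and rowid 1 for the row in 000001_0_copy_1.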
+ */ + @Test + public void testNonAcidToAcidConversion0() throws Exception { + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,3)"); + //we should now have non-empty bucket files 000001_0 and 000001_0_copy_1 + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); + + //check that the files are named as expected and that Acid Reader produces unique ROW__ID.rowId + // across all files making up a logical bucket + List rs = runStatementOnDriver("select ROW__ID, INPUT__FILE__NAME, a, b from " + Table.NONACIDORCTBL + " order by b"); + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\tfile")); + Assert.assertTrue(rs.get(0), rs.get(0).contains("target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands")); + Assert.assertTrue(rs.get(0), rs.get(0).endsWith("/warehouse/nonacidorctbl/000001_0\t1\t2")); + Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\tfile")); + Assert.assertTrue(rs.get(1), rs.get(1).contains("target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands")); + Assert.assertTrue(rs.get(1), rs.get(1).endsWith("/warehouse/nonacidorctbl/000001_0_copy_1\t1\t3")); + + //run Compaction + runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, INPUT__FILE__NAME, a, b from " + Table.NONACIDORCTBL + " order by b"); + //more BS - we had 2 rows in the table and now we have just 1 after compaction.... + //Compactor doesn't know about copy_N files either - would have to take all of them together for a given bucket + //and read as a single stream in order... but OrcRawRecordMerge takes a Reader as input which is reading 1 file + //so we'd have to implement a new Reader that keeps track of a collection of inner readers and chains them + //together.... 
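+ //(that chained reader is what OriginalReaderPair now does on the compaction path: when the current
+ // original file is exhausted it adds its row count to rowIdOffset, closes the reader and opens the
+ // next non-empty original file, so row ids keep increasing across 000001_0 and its copy_N siblings)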
+    //Worse yet, there is no guarantee in Hive that all data files in a partition are 1st level children;
+    //some may be in subdirs, according to Sergey:
+    //[5:17 PM] Sergey Shelukhin: 1) list bucketing
+    //[5:18 PM] Sergey Shelukhin: 2) any time, if the MR recursive-whatever setting is enabled
+    //[5:18 PM] Sergey Shelukhin: 3) iirc Hive can produce it sometimes from unions but I'm not sure
+    //try ctas and insert with union in the query on Tez
+    Assert.assertEquals("wrong RS cardinality", 2, rs.size());
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\tfile"));
+    Assert.assertTrue(rs.get(0), rs.get(0).contains("target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("warehouse/nonacidorctbl/base_-9223372036854775808/bucket_00001\t1\t2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\tfile"));
+    Assert.assertTrue(rs.get(1), rs.get(1).contains("target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("warehouse/nonacidorctbl/base_-9223372036854775808/bucket_00001\t1\t3"));
+  }
+  @Test
+  public void testNonAcidToAcidConversion01() throws Exception {
+    //create 2 rows in a file 000001_0 (and an empty 000000_0)
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2),(1,3)");
+    //create 2 rows in a file 000000_0_copy_1 and 2 rows in a file 000001_0_copy_1
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,12),(0,13),(1,4),(1,5)");
+    //create 1 row in a file 000001_0_copy_2 (and an empty 000000_0_copy_2?)
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,6)");
+
+    //convert the table to Acid
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+    List<String> rs1 = runStatementOnDriver("describe "+ Table.NONACIDORCTBL);
+    //create a couple of delta directories
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,15),(1,16)");
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,17)");
+
+    //make sure we assign correct Ids
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID");
+//    Assert.assertEquals("", 10, rs.size());
+    LOG.warn("before compact");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+
+    //run Compaction
+    runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID");
+//    Assert.assertEquals("", 10, rs.size());
+/*
+ {"transactionid":0,"bucketid":0,"rowid":0} 0 13 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495134475362/warehouse/nonacidorctbl/000000_0_copy_1
+ {"transactionid":0,"bucketid":0,"rowid":1} 0 12 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495134475362/warehouse/nonacidorctbl/000000_0_copy_1
+ {"transactionid":0,"bucketid":1,"rowid":0} 1 3 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495134475362/warehouse/nonacidorctbl/000001_0
+ {"transactionid":0,"bucketid":1,"rowid":1} 1 2
file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495134475362/warehouse/nonacidorctbl/000001_0 + {"transactionid":0,"bucketid":1,"rowid":2} 1 5 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495134475362/warehouse/nonacidorctbl/000001_0_copy_1 + {"transactionid":0,"bucketid":1,"rowid":3} 1 4 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495134475362/warehouse/nonacidorctbl/000001_0_copy_1 + {"transactionid":0,"bucketid":1,"rowid":4} 1 6 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495134475362/warehouse/nonacidorctbl/000001_0_copy_2 + {"transactionid":15,"bucketid":0,"rowid":0} 0 15 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495134475362/warehouse/nonacidorctbl/000000_0_copy_1 + {"transactionid":15,"bucketid":1,"rowid":0} 1 16 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495134475362/warehouse/nonacidorctbl/000001_0_copy_2 + {"transactionid":15,"bucketid":1,"rowid":0} 1 16 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495134475362/warehouse/nonacidorctbl/000001_0_copy_1 + {"transactionid":15,"bucketid":1,"rowid":0} 1 16 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495134475362/warehouse/nonacidorctbl/000001_0 + {"transactionid":16,"bucketid":0,"rowid":0} 0 17 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495134475362/warehouse/nonacidorctbl/000000_0_copy_1 + */ + //check we have the right IDs + LOG.warn("after compact"); + for(String s : rs) { + LOG.warn(s); + } + + //make sure they are the same before and after compaction + + } + @Test + public void testNonAcidToAcidConversion02() throws Exception { + //create 1 rows in a file 000001_0 (and an empty 000000_0) + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); + //create 1 rows in a file 000000_0_copy1 and 1 rows in a file 000001_0_copy1 + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,12),(1,5)"); + + //convert the table to Acid + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); + //create a couple of delta directories + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,17)"); + + //make sure we assign correct Ids + List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID"); +// Assert.assertEquals("", 10, rs.size()); + LOG.warn("before compact"); + for(String s : rs) { + LOG.warn(s); + } +/* +{"transactionid":0,"bucketid":0,"rowid":0} 0 12 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495152543428/warehouse/nonacidorctbl/000000_0_copy_1 +{"transactionid":0,"bucketid":1,"rowid":0} 1 2 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495152543428/warehouse/nonacidorctbl/000001_0 +{"transactionid":0,"bucketid":1,"rowid":1} 1 5 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495152543428/warehouse/nonacidorctbl/000001_0_copy_1 +{"transactionid":15,"bucketid":1,"rowid":0} 1 17 
file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495152543428/warehouse/nonacidorctbl/000001_0_copy_1 +{"transactionid":15,"bucketid":1,"rowid":0} 1 17 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495152543428/warehouse/nonacidorctbl/000001_0 + */ + + //run Compaction + runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID"); +// Assert.assertEquals("", 10, rs.size()); +/* +{"transactionid":0,"bucketid":0,"rowid":1} 1 2 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495152543428/warehouse/nonacidorctbl/base_0000015/bucket_00000 +{"transactionid":0,"bucketid":0,"rowid":2} 1 5 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495152543428/warehouse/nonacidorctbl/base_0000015/bucket_00000 +{"transactionid":0,"bucketid":1,"rowid":0} 0 12 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495152543428/warehouse/nonacidorctbl/base_0000015/bucket_00001 +{"transactionid":0,"bucketid":1,"rowid":1} 1 2 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495152543428/warehouse/nonacidorctbl/base_0000015/bucket_00001 +{"transactionid":0,"bucketid":1,"rowid":2} 1 5 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495152543428/warehouse/nonacidorctbl/base_0000015/bucket_00001 +{"transactionid":15,"bucketid":1,"rowid":0} 1 17 file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1495152543428/warehouse/nonacidorctbl/base_0000015/bucket_00001 + */ + //check we have the right IDs + LOG.warn("after compact"); + for(String s : rs) { + LOG.warn(s); + } + + //make sure they are the same before and after compaction + + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 5786c4f659..1f0587cff7 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -1499,25 +1499,127 @@ public void testMerge3() throws Exception { } /** * https://hortonworks.jira.com/browse/BUG-66580 - * @throws Exception */ - @Ignore @Test - public void testMultiInsert() throws Exception { + public void testMultiInsert1() throws Exception { + runStatementOnDriver("drop table if exists srcpart"); + runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " + + "partitioned by (z int) clustered by (a) into 2 buckets " + + "stored as orc tblproperties('transactional'='true')"); + runStatementOnDriver("create temporary table if not exists data1 (x int)"); + runStatementOnDriver("create temporary table if not exists data2 (x int)"); + + runStatementOnDriver("insert into data1 values (1),(2),(3)"); +// runStatementOnDriver("insert into data2 values (4),(5),(6)"); + d.destroy(); + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + d = new Driver(hiveConf); + //supposedly this gives non-unique IDs + List r = runStatementOnDriver(" from data1 " + //send everything to bucket 0 + "insert into srcpart partition(z) select 0,0,x,x " + + "insert into srcpart partition(z=1) select 0,0,17"); + r = 
runStatementOnDriver("select * from srcpart order by c,z"); + //this is what we should get but we end up loosing 1 row +// int[][] rExpected = {{0,0,1,1},{0,0,2,2},{0,0,3,3},{0,0,17,1},{0,0,17,1},{0,0,17,1}}; + int[][] rExpected = {{0,0,1,1},{0,0,2,2},{0,0,3,3},{0,0,17,1},{0,0,17,1}}; + Assert.assertEquals(stringifyValues(rExpected), r); + /* + So this is what we get. All the data is in there. 1 leg of multi-insert has statementId=0, the other statementId=1 + I don't know why we have an empty bucket for the Static Partition case. + Partition z=1 has 2 rows with ROW_ID(1,0,0). + + This wouldn't happen in multi-statement txn case since statements are sequential, and N+1st statement "looks back" to + find the rowId from Nth statement. + + ekoifman:apache-hive-2.2.0-SNAPSHOT-bin ekoifman$ ./bin/hive --orcfiledump -j -p -d /Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart +SLF4J: Class path contains multiple SLF4J bindings. +SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hiverwgit/packaging/target/apache-hive-2.2.0-SNAPSHOT-bin/apache-hive-2.2.0-SNAPSHOT-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] +SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hwxhadoop/hadoop-dist/target/hadoop-2.7.3.2.6.0.0-SNAPSHOT/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] +SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. +SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0000/bucket_00000 [length: 625] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}} +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":1,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}} +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":2,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":17}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0000/bucket_00001 [length: 204] +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=1/delta_0000001_0000001_0001/bucket_00000 [length: 606] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":1}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=2/delta_0000001_0000001_0001/bucket_00000 [length: 619] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":2}} +________________________________________________________________________________________________________________________ + +Processing data file 
file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489022258756/warehouse/srcpart/z=3/delta_0000001_0000001_0001/bucket_00000 [length: 619]
+{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":3}}
+________________________________________________________________________________________________________________________
+*/
+  }
+  @Test
+  public void testMultiInsert2() throws Exception {
+    runStatementOnDriver("drop table if exists srcpart");
+    runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " +
+      "partitioned by (z int) clustered by (a) into 2 buckets " +
+      "stored as orc tblproperties('transactional'='true')");
+    runStatementOnDriver("create temporary table if not exists data1 (x int)");
+    runStatementOnDriver("create temporary table if not exists data2 (x int)");
+
+    runStatementOnDriver("insert into data1 values (1),(2),(3)");
+//    runStatementOnDriver("insert into data2 values (4),(5),(6)");
+    d.destroy();
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    d = new Driver(hiveConf);
+    List<String> r = runStatementOnDriver("from data1 " +
+      "insert into srcpart partition(z) select 0,0,x,1 where x % 2 = 0 " +
+      "insert into srcpart partition(z=1) select 2,0,x where x % 2 = 1");
+    r = runStatementOnDriver("select * from srcpart order by a,b,c,z");
+    int[][] rExpected = {{0,0,2,1},{2,0,1,1},{2,0,3,1}};
+    Assert.assertEquals(stringifyValues(rExpected), r);
+    /*
+    We get correct results and the ROW_IDs look correct - but this is a fluke: I guess the optimizer figures out that the DP branch is actually also SP with z=1 (change 1 -> x and you end up with dup ROW_IDs),
+    or maybe some other reason - either way OrcRecordUpdater.findRowIdOffsetForInsert() ends up doing the right thing.
+    Once again, there is an empty bucket in the static partition branch
+
+    ekoifman:apache-hive-2.2.0-SNAPSHOT-bin ekoifman$ ./bin/hive --orcfiledump -j -p -d /Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489026761787/warehouse/srcpart/z=1
+SLF4J: Class path contains multiple SLF4J bindings.
+SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hiverwgit/packaging/target/apache-hive-2.2.0-SNAPSHOT-bin/apache-hive-2.2.0-SNAPSHOT-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
+SLF4J: Found binding in [jar:file:/Users/ekoifman/dev/hwxhadoop/hadoop-dist/target/hadoop-2.7.3.2.6.0.0-SNAPSHOT/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
+SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
+SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489026761787/warehouse/srcpart/z=1/delta_0000001_0000001_0000/bucket_00000 [length: 619] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":0,"currentTransaction":1,"row":{"_col0":0,"_col1":0,"_col2":2}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489026761787/warehouse/srcpart/z=1/delta_0000001_0000001_0001/bucket_00000 [length: 625] +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":1,"currentTransaction":1,"row":{"_col0":2,"_col1":0,"_col2":3}} +{"operation":0,"originalTransaction":1,"bucket":0,"rowId":2,"currentTransaction":1,"row":{"_col0":2,"_col1":0,"_col2":1}} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands2-1489026761787/warehouse/srcpart/z=1/delta_0000001_0000001_0001/bucket_00001 [length: 204] +________________________________________________________________________________________________________________________ +*/ + + } + @Test + public void testMultiInsert3() throws Exception { + runStatementOnDriver("drop table if exists srcpart"); runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " + "partitioned by (z int) clustered by (a) into 2 buckets " + "stored as orc tblproperties('transactional'='true')"); runStatementOnDriver("create temporary table if not exists data1 (x int)"); -// runStatementOnDriver("create temporary table if not exists data2 (x int)"); + runStatementOnDriver("create temporary table if not exists data2 (x int)"); runStatementOnDriver("insert into data1 values (1),(2),(3)"); // runStatementOnDriver("insert into data2 values (4),(5),(6)"); d.destroy(); hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); d = new Driver(hiveConf); - List r = runStatementOnDriver(" from data1 " + - "insert into srcpart partition(z) select 0,0,1,x " + - "insert into srcpart partition(z=1) select 0,0,1"); + //this produces 3 rows (as it should I'd think) + List r = runStatementOnDriver("select 0,0,1,1 from data1 where x % 2 = 0 UNION ALL select 2,0.0,1,1 from data1 where x % 2 = 1"); } /** * Investigating DP and WriteEntity, etc diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index bb7985711f..41649a7d40 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -910,7 +910,7 @@ public void testEtlCombinedStrategy() throws Exception { MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException { OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path); return OrcInputFormat.determineSplitStrategies(combineCtx, context, - adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas, + adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas, null, null, true); } @@ -924,7 +924,7 @@ public void testEtlCombinedStrategy() throws Exception { OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) 
throws IOException { OrcInputFormat.AcidDirInfo adi = gen.call(); return OrcInputFormat.determineSplitStrategies( - null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas, + null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas, null, null, true); } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index 1ce1bfb1dd..f82163991d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -189,6 +189,7 @@ public void testReaderPair() throws Exception { RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60); ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey, new Reader.Options(), 0); + pair.advnaceToMinKey(); RecordReader recordReader = pair.recordReader; assertEquals(10, key.getTransactionId()); assertEquals(20, key.getBucketId()); @@ -215,6 +216,7 @@ public void testReaderPairNoMin() throws Exception { ReaderPair pair = new ReaderPair(key, reader, 20, null, null, new Reader.Options(), 0); + pair.advnaceToMinKey(); RecordReader recordReader = pair.recordReader; assertEquals(10, key.getTransactionId()); assertEquals(20, key.getBucketId()); @@ -291,7 +293,8 @@ public void testOriginalReaderPair() throws Exception { RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3); boolean[] includes = new boolean[]{true, true}; ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey, - new Reader.Options().include(includes)); + new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(new Path("db/table/part")), null, new ValidReadTxnList()); + pair.advnaceToMinKey(); RecordReader recordReader = pair.recordReader; assertEquals(0, key.getTransactionId()); assertEquals(10, key.getBucketId()); @@ -320,7 +323,8 @@ public void testOriginalReaderPairNoMin() throws Exception { ReaderKey key = new ReaderKey(); Reader reader = createMockOriginalReader(); ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null, - new Reader.Options()); + new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(new Path("db/table/part")), null, new ValidReadTxnList()); + pair.advnaceToMinKey(); assertEquals("first", value(pair.nextRecord)); assertEquals(0, key.getTransactionId()); assertEquals(10, key.getBucketId()); @@ -423,7 +427,7 @@ public void testNewBase() throws Exception { OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader, false, 10, createMaximalTxnList(), - new Reader.Options().range(1000, 1000), null); + new Reader.Options().range(1000, 1000), null, new OrcRawRecordMerger.Options()); RecordReader rr = merger.getCurrentReader().recordReader; assertEquals(0, merger.getOtherReaders().size()); @@ -531,7 +535,7 @@ public void testEmpty() throws Exception { OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET, createMaximalTxnList(), new Reader.Options(), - AcidUtils.getPaths(directory.getCurrentDirectories())); + AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false)); RecordIdentifier key = merger.createKey(); OrcStruct value = merger.createValue(); assertEquals(false, merger.next(key, value)); @@ -606,7 +610,7 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET, 
createMaximalTxnList(), new Reader.Options(), - AcidUtils.getPaths(directory.getCurrentDirectories())); + AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false)); assertEquals(null, merger.getMinKey()); assertEquals(null, merger.getMaxKey()); RecordIdentifier id = merger.createKey(); @@ -681,7 +685,7 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { // make a merger that doesn't collapse events merger = new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET, createMaximalTxnList(), new Reader.Options(), - AcidUtils.getPaths(directory.getCurrentDirectories())); + AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(true)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.UPDATE_OPERATION, @@ -780,7 +784,7 @@ private void testNewBaseAndDelta(boolean use130Format) throws Exception { merger = new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET, txns, new Reader.Options(), - AcidUtils.getPaths(directory.getCurrentDirectories())); + AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false)); for(int i=0; i < values.length; ++i) { assertEquals(true, merger.next(id, event)); LOG.info("id = " + id + "event = " + event); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java index 6bf13129b8..73bc1ab852 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java @@ -182,7 +182,7 @@ public void setup() throws Exception { OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, root, false, null); OrcInputFormat.AcidDirInfo adi = gen.call(); List> splitStrategies = OrcInputFormat.determineSplitStrategies( - null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas, + null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas, null, null, true); assertEquals(1, splitStrategies.size()); List splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits(); diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 0483e91c4b..48a6017373 100644 --- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -37,6 +37,7 @@ import java.util.Set; import java.util.TreeMap; import javax.security.auth.Subject; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.KeyProvider; @@ -735,6 +736,10 @@ public FileStatus getFileStatus() { public Long getFileId() { return fileId; } + @Override + public int compareTo(HdfsFileStatusWithId o) { + return getFileStatus().compareTo(o.getFileStatus()); + } } @Override diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index c280d49227..dbbb9d0c98 100644 --- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -269,7 +269,7 @@ RecordReader getRecordReader(JobConf job, 
CombineFileSplit split, Reporter repor
    */
   public void hflush(FSDataOutputStream stream) throws IOException;
 
-  public interface HdfsFileStatusWithId {
+  public interface HdfsFileStatusWithId extends Comparable<HdfsFileStatusWithId> {
     public FileStatus getFileStatus();
     public Long getFileId();
   }
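One reading of the HdfsFileStatusWithId/Comparable change above is that the original bucket files (000000_0, 000000_0_copy_1, ...) need a stable ordering; given such an ordering plus each file's row count, a unique rowid for every row of a converted table can be derived by offsetting with the row counts of the files that precede it in the same logical bucket. The sketch below only illustrates that bookkeeping - RowIdOffsets and firstRowIdPerFile are made-up names and this is not the patch's actual code.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: derive the first synthetic rowid for each original file of one bucket. */
class RowIdOffsets {
  /**
   * @param rowCountsByFile row count per original file of one logical bucket
   * @return first rowid to use for each file, assuming files are read in sorted-name order
   */
  static Map<String, Long> firstRowIdPerFile(Map<String, Long> rowCountsByFile) {
    Map<String, Long> sorted = new TreeMap<>(rowCountsByFile);   // deterministic, name-based order
    Map<String, Long> offsets = new LinkedHashMap<>();
    long nextRowId = 0;
    for (Map.Entry<String, Long> e : sorted.entrySet()) {
      offsets.put(e.getKey(), nextRowId);   // this file's rows get ids [nextRowId, nextRowId + count)
      nextRowId += e.getValue();
    }
    return offsets;
  }

  public static void main(String[] args) {
    Map<String, Long> counts = new LinkedHashMap<>();
    counts.put("000001_0", 2L);
    counts.put("000001_0_copy_1", 2L);
    counts.put("000001_0_copy_2", 1L);
    // Prints {000001_0=0, 000001_0_copy_1=2, 000001_0_copy_2=4}: rowids 0..4 are unique in the bucket.
    System.out.println(firstRowIdPerFile(counts));
  }
}

Whether the real fix computes offsets exactly this way is not shown in this patch; the point is only that a deterministic file order plus per-file row counts is enough to make {transactionid:0, bucketid, rowid} unique for tables converted from non-ACID to ACID.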