diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 6693134..04dcb3b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1201,7 +1201,7 @@ public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws I
    * Group 6: copy [copy keyword]
    * Group 8: 2 [copy file index]
    */
-  private static final String COPY_KEYWORD = "_copy_"; // copy keyword
+  public static final String COPY_KEYWORD = "_copy_"; // copy keyword
   private static final Pattern COPY_FILE_NAME_TO_TASK_ID_REGEX =
       Pattern.compile("^.*?"+ // any prefix
           "([0-9]+)"+ // taskId
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
index b85b827..7ec55ac 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
@@ -51,6 +51,10 @@
     private long minimumTransactionId;
     private long maximumTransactionId;
     private int bucket;
+    /**
+     * Judging by {@link org.apache.hadoop.hive.ql.metadata.Hive#mvFile}, _copy_N numbering starts at 1.
+     */
+    private int copyNumber;
     private PrintStream dummyStream = null;
     private boolean oldStyle = false;
     private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id
@@ -180,6 +184,18 @@ public Options bucket(int bucket) {
     }
 
     /**
+     * Multiple inserts into legacy (pre-acid) tables can generate multiple copies of each bucket
+     * file.
+     * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD
+     * @param copyNumber the number of the copy (> 0)
+     * @return this
+     */
+    public Options copyNumber(int copyNumber) {
+      this.copyNumber = copyNumber;
+      return this;
+    }
+
+    /**
      * Whether it should use the old style (0000000_0) filenames.
      * @param value should use the old style names
      * @return this
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index da00bb3..d9bc829 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -48,6 +48,8 @@
 import com.google.common.annotations.VisibleForTesting;
 
+import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
+
 /**
  * Utilities that are shared by all of the ACID input and output formats. They
  * are used by the compactor and cleaner and thus must be format agnostic.
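For context on the constant made public above: the _copy_N suffix is produced by the collision-avoidance loop in Hive.mvFile(), patched further down. Below is a minimal standalone sketch of that naming scheme, not part of the patch; CopyNameSketch and nextAvailableName are hypothetical names, and a plain Set stands in for the destination FileSystem that mvFile() probes.

import java.util.HashSet;
import java.util.Set;

// Standalone sketch - NOT part of the patch. The Set stands in for the
// destination FileSystem that Hive.mvFile() probes via exists()/rename().
public class CopyNameSketch {
  private static final String COPY_KEYWORD = "_copy_"; // mirrors Utilities.COPY_KEYWORD

  static String nextAvailableName(String name, Set<String> existing) {
    if (!existing.contains(name)) {
      return name; // the first writer keeps the plain bucket file name
    }
    int counter = 1; // _copy_N numbering starts at 1, matching the mvFile() loop
    String candidate = name + COPY_KEYWORD + counter;
    while (existing.contains(candidate)) {
      candidate = name + COPY_KEYWORD + (++counter);
    }
    return candidate;
  }

  public static void main(String[] args) {
    Set<String> dest = new HashSet<>();
    for (int i = 0; i < 3; i++) {
      String chosen = nextAvailableName("000001_0", dest);
      dest.add(chosen);
      System.out.println(chosen); // 000001_0, then 000001_0_copy_1, then 000001_0_copy_2
    }
  }
}

Since numbering starts at 1, a second insert into a non-acid table leaves 000001_0 and 000001_0_copy_1 side by side, which is exactly the layout the new test at the bottom of this patch relies on.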
@@ -99,6 +101,11 @@ public boolean accept(Path path) {
   public static final int MAX_STATEMENTS_PER_TXN = 10000;
   public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
   public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
+  /**
+   * This does not need to use ORIGINAL_PATTERN_COPY because it's used to read
+   * a "delta" dir written by a real ACID write, which cannot produce any copies.
+   */
   public static final PathFilter originalBucketFilter = new PathFilter() {
     @Override
     public boolean accept(Path path) {
@@ -113,6 +120,11 @@ private AcidUtils() {
 
   private static final Pattern ORIGINAL_PATTERN =
       Pattern.compile("[0-9]+_[0-9]+");
+  /**
+   * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD
+   */
+  private static final Pattern ORIGINAL_PATTERN_COPY =
+      Pattern.compile("[0-9]+_[0-9]+" + COPY_KEYWORD + "[0-9]+");
 
   public static final PathFilter hiddenFileFilter = new PathFilter(){
     @Override
@@ -243,7 +255,21 @@ static long parseBase(Path path) {
           .maximumTransactionId(0)
           .bucket(bucket)
           .writingBase(true);
-    } else if (filename.startsWith(BUCKET_PREFIX)) {
+    }
+    else if (ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
+      //arguably this should throw since we don't know how to handle this in OrcRawRecordMerger
+      int bucket =
+          Integer.parseInt(filename.substring(0, filename.indexOf('_')));
+      int copyNumber = Integer.parseInt(filename.substring(filename.lastIndexOf('_') + 1));
+      result
+          .setOldStyle(true)
+          .minimumTransactionId(0)
+          .maximumTransactionId(0)
+          .bucket(bucket)
+          .copyNumber(copyNumber)
+          .writingBase(true);
+    }
+    else if (filename.startsWith(BUCKET_PREFIX)) {
       int bucket =
           Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
       if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) {
@@ -269,6 +295,7 @@
           .bucket(bucket);
       }
     } else {
+      //why is this useful? what are we going to do with bucketId = -1?
       result.setOldStyle(true).bucket(-1).minimumTransactionId(0)
           .maximumTransactionId(0);
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index ed854bf..aeb6019 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -3035,14 +3035,14 @@ private static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath,
     int counter = 1;
     if (!isRenameAllowed || isBlobStoragePath) {
       while (destFs.exists(destFilePath)) {
-        destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : ""));
+        destFilePath = new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) + (!type.isEmpty() ? "." + type : ""));
         counter++;
       }
     }
 
     if (isRenameAllowed) {
       while (!destFs.rename(sourcePath, destFilePath)) {
-        destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : ""));
+        destFilePath = new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) + (!type.isEmpty() ? "." + type : ""));
         counter++;
       }
     } else if (isSrcLocal) {
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index b9df674..42f0d66 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -898,4 +898,20 @@ public void run() {
     //> 2 seconds pass, i.e. that the command in Driver actually blocks before cancel is fired
     Assert.assertTrue(System.currentTimeMillis() > start + 2);
   }
+  @Test
+  public void testNonAcidToAcidConversion0() throws Exception {
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,3)");
+    //we should now have bucket files 000001_0 and 000001_0_copy_1
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+
+    //why does this throw in OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(): ACID_TABLES_MUST_BE_READ_WITH_HIVEINPUTFORMAT?
+    //somehow it gets CombineHiveInputFormat with "order by"; it never reaches raiseAcidTablesMustBeReadWithAcidReaderException() without "order by"
+//    List<String> rs = runStatementOnDriver("select ROW__ID, INPUT__FILE__NAME, a, b from " + Table.NONACIDORCTBL + " order by b");
+    List<String> rs = runStatementOnDriver("select ROW__ID, INPUT__FILE__NAME, a, b from " + Table.NONACIDORCTBL);
+    Assert.assertTrue(rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t"));
+    Assert.assertTrue(rs.get(0).endsWith("/warehouse/nonacidorctbl/000001_0\t1\t2"));
+    Assert.assertTrue(rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t"));
+    Assert.assertTrue(rs.get(1).endsWith("/warehouse/nonacidorctbl/000001_0_copy_1\t1\t3"));
+  }
 }
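To make the new AcidUtils branch concrete: given a legacy file name such as 000001_0_copy_1, the bucket id comes from the digits before the first underscore and the copy number from the digits after the last one. A standalone sketch under those assumptions follows; ParseCopySketch is a hypothetical name, and the pattern and substring arithmetic mirror the AcidUtils hunk above.

import java.util.regex.Pattern;

// Standalone sketch - NOT part of the patch. Mirrors the new
// ORIGINAL_PATTERN_COPY branch added to AcidUtils in this patch.
public class ParseCopySketch {
  private static final String COPY_KEYWORD = "_copy_";
  private static final Pattern ORIGINAL_PATTERN_COPY =
      Pattern.compile("[0-9]+_[0-9]+" + COPY_KEYWORD + "[0-9]+");

  public static void main(String[] args) {
    // the first duplicate of bucket file 000001_0, as in the test above
    String filename = "000001_0_copy_1";
    if (ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
      // the pattern guarantees digits before the first '_' and after the last '_'
      int bucket = Integer.parseInt(filename.substring(0, filename.indexOf('_')));
      int copyNumber = Integer.parseInt(filename.substring(filename.lastIndexOf('_') + 1));
      System.out.println("bucket=" + bucket + ", copyNumber=" + copyNumber); // bucket=1, copyNumber=1
    }
  }
}

Leaning on indexOf('_')/lastIndexOf('_') is safe here only because ORIGINAL_PATTERN_COPY has already guaranteed the digits_digits_copy_digits shape of the name.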