diff --git packaging/pom.xml packaging/pom.xml
index cae1fa9822..b58338fc60 100644
--- packaging/pom.xml
+++ packaging/pom.xml
@@ -284,7 +284,12 @@
<groupId>org.apache.hive</groupId>
- <artifactId>hive-upgrade-acid</artifactId>
+ <artifactId>hive-pre-upgrade</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-post-upgrade</artifactId>
<version>${project.version}</version>
diff --git pom.xml pom.xml
index 4ca963e247..16481ad40d 100644
--- pom.xml
+++ pom.xml
@@ -997,11 +997,6 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-upgrade-acid</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 7fce67fc3e..bf4e74d3dd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -209,6 +209,9 @@ private static Path createBucketFile(Path subdir, int bucket, boolean isAcidSche
BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket));
}
else {
+ /**
+ * Note: this doesn't seem to match {@link #ORIGINAL_PATTERN}.
+ */
return new Path(subdir,
String.format(BUCKET_DIGITS, bucket));
}
@@ -300,6 +303,49 @@ public static Path createFilename(Path directory,
return createBucketFile(new Path(directory, subdir), options.getBucketId());
}
+ /**
+ * Represents bucketId and copy_N suffix
+ */
+ public static final class BucketMetaData {
+ private static final BucketMetaData INVALID = new BucketMetaData(-1, 0);
+ /**
+ * @param bucketFileName {@link #ORIGINAL_PATTERN} or {@link #ORIGINAL_PATTERN_COPY}
+ */
+ public static BucketMetaData parse(String bucketFileName) {
+ if (ORIGINAL_PATTERN.matcher(bucketFileName).matches()) {
+ int bucketId = Integer
+ .parseInt(bucketFileName.substring(0, bucketFileName.indexOf('_')));
+ return new BucketMetaData(bucketId, 0);
+ }
+ else if(ORIGINAL_PATTERN_COPY.matcher(bucketFileName).matches()) {
+ int copyNumber = Integer.parseInt(
+ bucketFileName.substring(bucketFileName.lastIndexOf('_') + 1));
+ int bucketId = Integer
+ .parseInt(bucketFileName.substring(0, bucketFileName.indexOf('_')));
+ return new BucketMetaData(bucketId, copyNumber);
+ }
+ else if (bucketFileName.startsWith(BUCKET_PREFIX)) {
+ return new BucketMetaData(Integer
+ .parseInt(bucketFileName.substring(bucketFileName.indexOf('_') + 1)), 0);
+ }
+ return INVALID;
+ }
+ public static BucketMetaData parse(Path bucketFile) {
+ return parse(bucketFile.getName());
+ }
+ /**
+ * -1 if non-standard file name
+ */
+ public final int bucketId;
+ /**
+ * 0 means no copy_N suffix
+ */
+ public final int copyNumber;
+ private BucketMetaData(int bucketId, int copyNumber) {
+ this.bucketId = bucketId;
+ this.copyNumber = copyNumber;
+ }
+ }
/**
* Get the write id from a base directory name.
* @param path the base directory name
@@ -320,13 +366,7 @@ public static long parseBase(Path path) {
* @return - bucket id
*/
public static int parseBucketId(Path bucketFile) {
- String filename = bucketFile.getName();
- if (ORIGINAL_PATTERN.matcher(filename).matches() || ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
- return Integer.parseInt(filename.substring(0, filename.indexOf('_')));
- } else if (filename.startsWith(BUCKET_PREFIX)) {
- return Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
- }
- return -1;
+ return BucketMetaData.parse(bucketFile).bucketId;
}
/**
@@ -341,7 +381,7 @@ public static int parseBucketId(Path bucketFile) {
Configuration conf) throws IOException {
AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf);
String filename = bucketFile.getName();
- int bucket = parseBucketId(bucketFile);
+ int bucket = parseBucketId(bucketFile);//todo: use BucketMetaData
if (ORIGINAL_PATTERN.matcher(filename).matches()) {
result
.setOldStyle(true)
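
For illustration, the new helper parses the file-name shapes seen in the tests in this patch as follows (a sketch, assuming the usual 000000_0 / 000000_0_copy_N / bucket_00000 naming):

    AcidUtils.BucketMetaData m = AcidUtils.BucketMetaData.parse("000001_0");  // pre-acid original file
    // m.bucketId == 1, m.copyNumber == 0
    m = AcidUtils.BucketMetaData.parse("000001_0_copy_2");                    // copy_N original file
    // m.bucketId == 1, m.copyNumber == 2
    m = AcidUtils.BucketMetaData.parse("bucket_00001");                       // acid bucket file
    // m.bucketId == 1, m.copyNumber == 0
    m = AcidUtils.BucketMetaData.parse("data.orc");                           // non-standard name
    // m.bucketId == -1 (INVALID); for bucketed tables UpgradeTool below treats this as an error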
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java
index dfd4452ad4..2c969f5cc8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java
@@ -105,8 +105,8 @@ public int encode(AcidOutputFormat.Options options) {
private static final int NUM_BUCKET_ID_BITS = 12;
private static final int NUM_STATEMENT_ID_BITS = 12;
private static final int MAX_VERSION = (1 << NUM_VERSION_BITS) - 1;
- private static final int MAX_BUCKET_ID = (1 << NUM_BUCKET_ID_BITS) - 1;
- private static final int MAX_STATEMENT_ID = (1 << NUM_STATEMENT_ID_BITS) - 1;
+ public static final int MAX_BUCKET_ID = (1 << NUM_BUCKET_ID_BITS) - 1;
+ public static final int MAX_STATEMENT_ID = (1 << NUM_STATEMENT_ID_BITS) - 1;
public static BucketCodec determineVersion(int bucket) {
assert 7 << 29 == BucketCodec.TOP3BITS_MASK;
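
Making these limits public lets UpgradeTool range-check bucket ids parsed from file names. With 12 bucket-id bits,

    assert BucketCodec.MAX_BUCKET_ID == (1 << 12) - 1;   // 4095

and since the codec version occupies the top 3 bits and the bucket id field sits in the bits above the statement id, bucket 0 encodes as 536870912 (1 << 29) and bucket 1 as 536936448 (536870912 + (1 << 16)), which is consistent with the ROW__ID bucketid values asserted in the tests below.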
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index a4d34a7513..c8a768bf01 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -897,12 +897,14 @@ public void testNonAcidToAcidConversion01() throws Exception {
rs.get(2).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t0\t12"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
Assert.assertTrue(rs.get(3),
- rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000001_0000001_0000/bucket_00001"));
+ rs.get(3).startsWith("{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
+ Assert.assertTrue(rs.get(3), rs.get(3)
+ .endsWith("nonacidorctbl/delta_10000001_10000001_0000/bucket_00001"));
//run Compaction
runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
- rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID");
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " +
+ Table.NONACIDORCTBL + " order by ROW__ID");
LOG.warn("after compact");
for(String s : rs) {
LOG.warn(s);
@@ -918,8 +920,9 @@ public void testNonAcidToAcidConversion01() throws Exception {
rs.get(2).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t0\t12"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000001/bucket_00001"));
Assert.assertTrue(rs.get(3),
- rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000001/bucket_00001"));
+ rs.get(3)
+ .startsWith("{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_10000001/bucket_00001"));
//make sure they are the same before and after compaction
}
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index a547a84da9..7c201b6b34 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -379,14 +379,14 @@ public void testNonAcidToAcidConversion02() throws Exception {
*/
String[][] expected = {
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":4}\t0\t13", "bucket_00001"},
- {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t0\t15", "bucket_00001"},
- {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t0\t17", "bucket_00001"},
- {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t0\t120", "bucket_00001"},
+ {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":1}\t0\t15", "bucket_00001"},
+ {"{\"writeid\":10000003,\"bucketid\":536936448,\"rowid\":0}\t0\t17", "bucket_00001"},
+ {"{\"writeid\":10000002,\"bucketid\":536936448,\"rowid\":0}\t0\t120", "bucket_00001"},
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t2", "bucket_00001"},
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":3}\t1\t4", "bucket_00001"},
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "bucket_00001"},
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":6}\t1\t6", "bucket_00001"},
- {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t16", "bucket_00001"}
+ {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t16", "bucket_00001"}
};
Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
for(int i = 0; i < expected.length; i++) {
@@ -773,11 +773,11 @@ public void testNonAcidToAcidConversion3() throws Exception {
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
if (numDelta == 1) {
- Assert.assertEquals("delta_0000001_0000001_0000", status[i].getPath().getName());
+ Assert.assertEquals("delta_10000001_10000001_0000", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numDelta == 2) {
- Assert.assertEquals("delta_0000002_0000002_0000", status[i].getPath().getName());
+ Assert.assertEquals("delta_10000002_10000002_0000", status[i].getPath().getName());
Assert.assertEquals(1, buckets.length);
Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
}
@@ -786,7 +786,7 @@ public void testNonAcidToAcidConversion3() throws Exception {
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
if (numDeleteDelta == 1) {
- Assert.assertEquals("delete_delta_0000001_0000001_0000", status[i].getPath().getName());
+ Assert.assertEquals("delete_delta_10000001_10000001_0000", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
}
@@ -833,7 +833,7 @@ public void testNonAcidToAcidConversion3() throws Exception {
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numBase == 2) {
// The new base dir now has two bucket files, since the delta dir has two bucket files
- Assert.assertEquals("base_0000002", status[i].getPath().getName());
+ Assert.assertEquals("base_10000002", status[i].getPath().getName());
Assert.assertEquals(2, buckets.length);
Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
}
@@ -859,7 +859,7 @@ public void testNonAcidToAcidConversion3() throws Exception {
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(1, status.length);
- Assert.assertEquals("base_0000002", status[0].getPath().getName());
+ Assert.assertEquals("base_10000002", status[0].getPath().getName());
FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
Assert.assertEquals(2, buckets.length);
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
index 11c59309fb..fb88f25285 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -263,12 +263,18 @@ private void loadDataNonAcid2AcidConversion(boolean isVectorized) throws Excepti
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/000000_0"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/000000_0"},
//from Load Data into acid converted table
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"},
- {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t2", "t/delta_0000001_0000001_0000/000001_0"},
- {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3", "t/delta_0000001_0000001_0000/000001_0"},
- {"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t4\t4", "t/delta_0000001_0000001_0000/000002_0"},
- {"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t5\t5", "t/delta_0000001_0000001_0000/000002_0"},
+ {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
+ "t/delta_10000001_10000001_0000/000000_0"},
+ {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":1}\t3\t4",
+ "t/delta_10000001_10000001_0000/000000_0"},
+ {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t2\t2",
+ "t/delta_10000001_10000001_0000/000001_0"},
+ {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":1}\t3\t3",
+ "t/delta_10000001_10000001_0000/000001_0"},
+ {"{\"writeid\":10000001,\"bucketid\":537001984,\"rowid\":0}\t4\t4",
+ "t/delta_10000001_10000001_0000/000002_0"},
+ {"{\"writeid\":10000001,\"bucketid\":537001984,\"rowid\":1}\t5\t5",
+ "t/delta_10000001_10000001_0000/000002_0"},
};
checkResult(expected, testQuery, isVectorized, "load data inpath");
@@ -279,9 +285,12 @@ private void loadDataNonAcid2AcidConversion(boolean isVectorized) throws Excepti
runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/2/data' overwrite into table T");
String[][] expected2 = new String[][] {
- {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000002/000000_0"},
- {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000002/000000_0"},
- {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000002/000001_0"}
+ {"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
+ "t/base_10000002/000000_0"},
+ {"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_10000002/000000_0"},
+ {"{\"writeid\":10000002,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+
+ "t/base_10000002/000001_0"}
};
checkResult(expected2, testQuery, isVectorized, "load data inpath overwrite");
@@ -291,10 +300,14 @@ private void loadDataNonAcid2AcidConversion(boolean isVectorized) throws Excepti
TestTxnCommands2.runWorker(hiveConf);
String[][] expected3 = new String[][] {
- {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000003/bucket_00000"},
- {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000003/bucket_00000"},
- {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000003/bucket_00001"},
- {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t9\t9", "t/base_0000003/bucket_00000"}
+ {"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
+ "t/base_10000003/bucket_00000"},
+ {"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":1}\t7\t8",
+ "t/base_10000003/bucket_00000"},
+ {"{\"writeid\":10000002,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+ "t/base_10000003/bucket_00001"},
+ {"{\"writeid\":10000003,\"bucketid\":536870912,\"rowid\":0}\t9\t9",
+ "t/base_10000003/bucket_00000"}
};
checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite (major)");
}
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index c15c5a6220..f071531cec 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -375,7 +375,7 @@ logical bucket (tranche)
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t12\t12", "warehouse/t/000000_0_copy_1"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/delta_0000001_0000001_0000/bucket_00000"},
+ {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/delta_10000001_10000001_0000/bucket_00000"},
};
rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME");
checkExpected(rs, expected3,"after converting to acid (no compaction with updates)");
@@ -387,15 +387,24 @@ logical bucket (tranche)
/*Compaction preserves location of rows wrt buckets/tranches (for now)*/
String expected4[][] = {
- {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/base_0000002/bucket_00002"},
- {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/base_0000002/bucket_00002"},
- {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t5\t6", "warehouse/t/base_0000002/bucket_00000"},
- {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t9\t10", "warehouse/t/base_0000002/bucket_00000"},
- {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t10\t20", "warehouse/t/base_0000002/bucket_00000"},
- {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t12\t12", "warehouse/t/base_0000002/bucket_00000"},
- {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t20\t40", "warehouse/t/base_0000002/bucket_00000"},
- {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60", "warehouse/t/base_0000002/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/base_0000002/bucket_00000"},
+ {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2",
+ "warehouse/t/base_10000002/bucket_00002"},
+ {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4",
+ "warehouse/t/base_10000002/bucket_00002"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t5\t6",
+ "warehouse/t/base_10000002/bucket_00000"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t9\t10",
+ "warehouse/t/base_10000002/bucket_00000"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t10\t20",
+ "warehouse/t/base_10000002/bucket_00000"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t12\t12",
+ "warehouse/t/base_10000002/bucket_00000"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t20\t40",
+ "warehouse/t/base_10000002/bucket_00000"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60",
+ "warehouse/t/base_10000002/bucket_00000"},
+ {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t60\t88",
+ "warehouse/t/base_10000002/bucket_00000"},
};
checkExpected(rs, expected4,"after major compact");
}
@@ -467,15 +476,24 @@ public void testToAcidConversion02() throws Exception {
* Also check the file name (only) after compaction for completeness
*/
String[][] expected = {
- {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t0\t13", "bucket_00000", "000000_0_copy_1"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000", "bucket_00000"},
- {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000", "bucket_00000"},
- {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000", "bucket_00000"},
- {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "bucket_00000", "000000_0"},
- {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t1\t4", "bucket_00000", "000000_0_copy_1"},
- {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t1\t5", "bucket_00000", "000000_0_copy_1"},
- {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":6}\t1\t6", "bucket_00000", "000000_0_copy_2"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t16", "bucket_00000", "bucket_00000"}
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t0\t13",
+ "bucket_00000", "000000_0_copy_1"},
+ {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t0\t15",
+ "bucket_00000", "bucket_00000"},
+ {"{\"writeid\":10000003,\"bucketid\":536870912,\"rowid\":0}\t0\t17",
+ "bucket_00000", "bucket_00000"},
+ {"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":0}\t0\t120",
+ "bucket_00000", "bucket_00000"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
+ "bucket_00000", "000000_0"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t1\t4",
+ "bucket_00000", "000000_0_copy_1"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t1\t5",
+ "bucket_00000", "000000_0_copy_1"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":6}\t1\t6",
+ "bucket_00000", "000000_0_copy_2"},
+ {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":1}\t1\t16",
+ "bucket_00000", "bucket_00000"}
};
Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
for(int i = 0; i < expected.length; i++) {
@@ -620,7 +638,7 @@ public void testNonAcidToAcidVectorzied() throws Exception {
query = "select ROW__ID, b from T where b > 0 order by a";
rs = runStatementOnDriver(query);
String[][] expected4 = {
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}","17"},
+ {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}","17"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}","4"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}","6"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}","8"},
@@ -641,11 +659,16 @@ public void testNonAcidToAcidVectorzied() throws Exception {
query = "select ROW__ID, a, b, INPUT__FILE__NAME from T where b > 0 order by a, b";
rs = runStatementOnDriver(query);
String[][] expected5 = {//the row__ids are the same after compaction
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "warehouse/t/base_0000001/bucket_00000"},
- {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t2\t4", "warehouse/t/base_0000001/bucket_00000"},
- {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "warehouse/t/base_0000001/bucket_00000"},
- {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t6\t8", "warehouse/t/base_0000001/bucket_00000"},
- {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10", "warehouse/t/base_0000001/bucket_00000"}
+ {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t1\t17",
+ "warehouse/t/base_10000001/bucket_00000"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t2\t4",
+ "warehouse/t/base_10000001/bucket_00000"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6",
+ "warehouse/t/base_10000001/bucket_00000"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t6\t8",
+ "warehouse/t/base_10000001/bucket_00000"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10",
+ "warehouse/t/base_10000001/bucket_00000"}
};
checkExpected(rs, expected5, "After major compaction");
//vectorized because there is INPUT__FILE__NAME
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index c3d99c3926..56da1151cc 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.metastore.api.InitializeTableWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -39,6 +40,7 @@
import org.apache.hadoop.hive.metastore.events.PreAlterTableEvent;
import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent;
import org.apache.hadoop.hive.metastore.events.PreEventContext;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.slf4j.Logger;
@@ -190,6 +192,15 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw
}
}
checkSorted(newTable);
+ if(TxnUtils.isAcidTable(newTable) && !TxnUtils.isAcidTable(oldTable)) {
+ /* We just converted an existing non-acid table to full acid and it passed all the checks.
+ Initialize the Write ID sequence so that we can handle assigning ROW__IDs to the 'original'
+ files already present in the table. */
+ TxnStore t = TxnUtils.getTxnStore(getConf());
+ //For now assume no partition has more than 10M files. It may be better to count them.
+ t.seedWriteIdOnAcidConversion(new InitializeTableWriteIdsRequest(newTable.getDbName(),
+ newTable.getTableName(), 10000000));
+ }
}
private void checkSorted(Table newTable) throws MetaException {
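
The 10,000,000 seed is what shifts the expected write ids in the tests earlier in this patch from 1/2/3 to 10000001/10000002/10000003. A minimal sketch of the arithmetic, assuming NEXT_WRITE_ID holds the next id to allocate (as the TxnHandler change below implies):

    long seed = 10_000_000L;                    // value passed by this listener
    long nwiNext = seed + 1;                    // stored in NEXT_WRITE_ID by seedWriteIdOnAcidConversion
    long firstPostConversionWriteId = nwiNext;  // 10,000,001 -> delta_10000001_10000001_0000
    // write ids 1..10,000,000 stay reserved for assigning ROW__IDs to pre-existing 'original' files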
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/api/InitializeTableWriteIdsRequest.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/api/InitializeTableWriteIdsRequest.java
new file mode 100644
index 0000000000..d56b66a8f6
--- /dev/null
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/api/InitializeTableWriteIdsRequest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.api;
+
+public class InitializeTableWriteIdsRequest {
+ private final String dbName;
+ private final String tblName;
+ private final long seedWriteId;
+ public InitializeTableWriteIdsRequest(String dbName, String tblName, long seedWriteId) {
+ assert dbName != null;
+ assert tblName != null;
+ assert seedWriteId > 1;
+ this.dbName = dbName;
+ this.tblName = tblName;
+ this.seedWriteId = seedWriteId;
+ }
+ public String getDbName() {
+ return dbName;
+ }
+ public String getTblName() {
+ return tblName;
+ }
+
+ public long getSeedWriteId() {
+ return seedWriteId;
+ }
+}
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index d1b0d32614..f25e77acd7 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -67,54 +67,7 @@
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener;
-import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
-import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
-import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
-import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
-import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
-import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
-import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionResponse;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
-import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
-import org.apache.hadoop.hive.metastore.api.HiveObjectType;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
-import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest;
-import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
-import org.apache.hadoop.hive.metastore.api.TxnInfo;
-import org.apache.hadoop.hive.metastore.api.TxnOpenException;
-import org.apache.hadoop.hive.metastore.api.TxnState;
-import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
@@ -1537,7 +1490,50 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds
return allocateTableWriteIds(rqst);
}
}
+ @Override
+ public void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst)
+ throws MetaException {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ TxnStore.MutexAPI.LockHandle handle = null;
+ try {
+ lockInternal();
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+
+ handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
+ //since this is done on conversion from non-acid to acid, NEXT_WRITE_ID should not yet have an
+ //entry for this table. It also has a unique index in case that assumption is violated.
+
+ // Seeding adds the table to the NEXT_WRITE_ID meta table. NWI_NEXT holds the next write id to
+ // allocate, so we store seedWriteId + 1; ids 1..seedWriteId stay reserved for assigning ROW__IDs
+ // to pre-existing 'original' files.
+ String s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
+ + quoteString(rqst.getDbName()) + "," + quoteString(rqst.getTblName()) + "," +
+ Long.toString(rqst.getSeedWriteId() + 1) + ")";
+ LOG.debug("Going to execute insert <" + s + ">");
+ stmt.execute(s);
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "seedWriteIdOnAcidConversion(" + rqst + ")");
+ throw new MetaException("Unable to update transaction database "
+ + StringUtils.stringifyException(e));
+ } finally {
+ close(null, stmt, dbConn);
+ if(handle != null) {
+ handle.releaseLocks();
+ }
+ unlockInternal();
+ }
+ } catch (RetryException e) {
+ seedWriteIdOnAcidConversion(rqst);
+ }
+ }
@Override
@RetrySemantics.SafeToRetry
public void performWriteSetGC() {
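
For a concrete picture (database and table names below are placeholders), with the 10,000,000 seed passed by TransactionalValidationListener the statement built above amounts to:

    insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ('mydb','mytbl',10000001)

so the next write id handed out for the converted table is seed + 1.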
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 4695f0deef..ef447e1c99 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -153,6 +153,12 @@ GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst)
AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest rqst)
throws NoSuchTxnException, TxnAbortedException, MetaException;
+ /**
+ * Called on conversion of existing table to full acid. Sets initial write ID to a high
+ * enough value so that we can assign unique ROW__IDs to data in existing files.
+ */
+ void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst) throws MetaException;
+
/**
* Obtain a lock.
* @param rqst information on the lock to obtain. If the requester is part of a transaction
diff --git upgrade-acid/pom.xml upgrade-acid/pom.xml
index 77cd24094d..b5443032fd 100644
--- upgrade-acid/pom.xml
+++ upgrade-acid/pom.xml
@@ -17,7 +17,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
+
org.apache
apache
18
@@ -29,7 +32,7 @@
<version>4.0.0-SNAPSHOT</version>
<artifactId>hive-upgrade-acid</artifactId>
<name>Hive Upgrade Acid</name>
- <packaging>jar</packaging>
+ <packaging>pom</packaging>
@@ -47,250 +50,10 @@
${basedir}/checkstyle/
2.17
2.20.1
-
-
- ${project.build.directory}/testconf
- file://
- ${project.basedir}/src/test/resources
- ${project.build.directory}/tmp
- ${project.build.directory}/warehouse
- file://
- 1
- true
-
-
-
- commons-cli
- commons-cli
- 1.2
- provided
-
-
- org.apache.hive
- hive-metastore
- 2.3.3
- provided
-
-
- org.apache.hive
- hive-exec
- 2.3.3
- provided
-
-
- org.apache.hadoop
- hadoop-common
- 2.7.2
- provided
-
-
-
- org.apache.hadoop
- hadoop-mapreduce-client-common
- 2.7.2
- provided
-
-
- org.apache.orc
- orc-core
- 1.3.3
- provided
-
-
-
-
-
-
-
- ${basedir}/src/main/resources
-
- package.jdo
-
-
-
+ <modules>
+ <module>pre-upgrade</module>
+ <module>post-upgrade</module>
+ </modules>
-
-
-
- org.apache.maven.plugins
- maven-antrun-plugin
- ${maven.antrun.plugin.version}
-
-
- ant-contrib
- ant-contrib
- ${ant.contrib.version}
-
-
- ant
- ant
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-checkstyle-plugin
- ${maven.checkstyle.plugin.version}
-
-
- org.codehaus.mojo
- exec-maven-plugin
- ${maven.exec.plugin.version}
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-antrun-plugin
-
-
- setup-test-dirs
- process-test-resources
-
- run
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- setup-metastore-scripts
- process-test-resources
-
- run
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-failsafe-plugin
- 2.20.1
-
-
-
- integration-test
- verify
-
-
-
-
- true
- false
- -Xmx2048m
- false
-
- true
- ${test.tmp.dir}
- ${test.tmp.dir}
- true
-
-
- ${log4j.conf.dir}
-
- ${skipITests}
-
-
-
- org.apache.maven.plugins
- maven-surefire-plugin
- ${maven.surefire.version}
-
- true
- false
- ${test.forkcount}
- -Xmx2048m
- false
-
- ${project.build.directory}
- true
- ${derby.version}
- ${test.tmp.dir}/derby.log
-
- ${test.log4j.scheme}${test.conf.dir}/hive-log4j2.properties
- true
- ${test.tmp.dir}
-
- jdbc:derby:${test.tmp.dir}/junit_metastore_db;create=true
- false
- ${test.tmp.dir}
- ${test.warehouse.scheme}${test.warehouse.dir}
-
-
-
- ${log4j.conf.dir}
- ${test.conf.dir}
-
- ${test.conf.dir}/conf
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
-
-
- test-jar
-
-
-
-
-
-
\ No newline at end of file
diff --git upgrade-acid/post-upgrade/pom.xml upgrade-acid/post-upgrade/pom.xml
new file mode 100644
index 0000000000..c0a901a4a5
--- /dev/null
+++ upgrade-acid/post-upgrade/pom.xml
@@ -0,0 +1,289 @@
+
+
+
+
+
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-upgrade-acid</artifactId>
+ <version>4.0.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+
+
+
+ <modelVersion>4.0.0</modelVersion>
+ <version>4.0.0-SNAPSHOT</version>
+ <artifactId>hive-post-upgrade</artifactId>
+ <name>Hive Post Upgrade Acid</name>
+ <packaging>jar</packaging>
+
+
+
+ 3.1.0
+ 1.2
+ 1.5.0
+
+ ../..
+
+
+ ${project.build.directory}/testconf
+ file://
+ ${project.basedir}/src/test/resources
+ ${project.build.directory}/tmp
+ ${project.build.directory}/warehouse
+ file://
+ 1
+ true
+
+
+
+
+ commons-cli
+ commons-cli
+ ${commons-cli.version}
+ provided
+
+
+ org.apache.hive
+ hive-metastore
+ ${project.version}
+ provided
+
+
+ org.apache.hive
+ hive-exec
+ ${project.version}
+ provided
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+ provided
+
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-common
+ ${hadoop.version}
+ provided
+
+
+ org.apache.orc
+ orc-core
+ ${orc.version}
+ provided
+
+
+
+
+
+
+
+ ${basedir}/src/main/resources
+
+ package.jdo
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+ ${maven.antrun.plugin.version}
+
+
+ ant-contrib
+ ant-contrib
+ ${ant.contrib.version}
+
+
+ ant
+ ant
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ ${maven.checkstyle.plugin.version}
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ ${maven.exec.plugin.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+
+
+ setup-test-dirs
+ process-test-resources
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ setup-metastore-scripts
+ process-test-resources
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+ 2.20.1
+
+
+
+ integration-test
+ verify
+
+
+
+
+ true
+ false
+ -Xmx2048m
+ false
+
+ true
+ ${test.tmp.dir}
+ ${test.tmp.dir}
+ true
+
+
+ ${log4j.conf.dir}
+
+ ${skipITests}
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ ${maven.surefire.version}
+
+ true
+ false
+ ${test.forkcount}
+ -Xmx2048m
+ false
+
+ ${project.build.directory}
+ true
+ ${derby.version}
+ ${test.tmp.dir}/derby.log
+
+ ${test.log4j.scheme}${test.conf.dir}/hive-log4j2.properties
+ true
+ ${test.tmp.dir}
+
+ jdbc:derby:${test.tmp.dir}/junit_metastore_db;create=true
+ false
+ ${test.tmp.dir}
+ ${test.warehouse.scheme}${test.warehouse.dir}
+
+
+
+ ${log4j.conf.dir}
+ ${test.conf.dir}
+
+ ${test.conf.dir}/conf
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+
+
+
\ No newline at end of file
diff --git upgrade-acid/post-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java upgrade-acid/post-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java
new file mode 100644
index 0000000000..ba107651fd
--- /dev/null
+++ upgrade-acid/post-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hive.common.util.HiveVersionInfo;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+
+/**
+ * This utility is designed to help with upgrading to Hive 3.0. On-disk layout for transactional
+ * tables has changed in 3.0 and requires pre-processing before upgrade to ensure the tables are readable
+ * by Hive 3.0. Some transactional tables (identified by this utility) require Major compaction
+ * to be run on them before upgrading to 3.0. Once this compaction starts, no more
+ * update/delete/merge statements may be executed on these tables until upgrade is finished.
+ *
+ * Additionally, a new type of transactional table was added in 3.0 - insert-only tables. These
+ * tables support ACID semantics and work with any Input/OutputFormat. Any managed table may
+ * be made an insert-only transactional table. These tables don't support Update/Delete/Merge commands.
+ *
+ * This utility works in 2 modes: preUpgrade and postUpgrade.
+ * In preUpgrade mode it has to have 2.x Hive jars on the classpath. It will perform analysis on
+ * existing transactional tables, determine which require compaction and generate a set of SQL
+ * commands to launch all of these compactions.
+ *
+ * Note that depending on the number of tables/partitions and amount of data in them compactions
+ * may take a significant amount of time and resources. The script output by this utility includes
+ * some heuristics that may help estimate the time required. If no script is produced, no action
+ * is needed. For compactions to run an instance of standalone Hive Metastore must be running.
+ * Please make sure hive.compactor.worker.threads is sufficiently high - this specifies the limit
+ * of concurrent compactions that may be run. Each compaction job is a Map-Reduce job.
+ * hive.compactor.job.queue may be used to set a Yarn queue name where all compaction jobs will be
+ * submitted.
+ *
+ * In postUpgrade mode, Hive 3.0 jars/hive-site.xml should be on the classpath. This utility will
+ * find all the tables that may be made transactional (with full CRUD support) and generate
+ * Alter Table commands to do so. It will also find all tables that may not support full CRUD
+ * but can be made insert-only transactional tables and generate corresponding Alter Table commands.
+ *
+ * TODO: rename files
+ *
+ * "execute" option may be supplied in both modes to have the utility automatically execute the
+ * equivalent of the generated commands
+ *
+ * "location" option may be supplied followed by a path to set the location for the generated
+ * scripts.
+ */
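+// Example invocation (illustrative; classpath and paths are hypothetical): with Hive 3.x jars and
+// hive-site.xml on the classpath,
+//   java org.apache.hadoop.hive.upgrade.acid.UpgradeTool -location /tmp/upgrade-scripts -execute
+// Without -execute the tool only writes convertToAcid_<timestamp>.sql / convertToMM_<timestamp>.sql
+// scripts to the given location.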
+public class UpgradeTool {
+ private static final Logger LOG = LoggerFactory.getLogger(UpgradeTool.class);
+ private static final int PARTITION_BATCH_SIZE = 10000;
+ private final Options cmdLineOptions = new Options();
+
+ public static void main(String[] args) throws Exception {
+ UpgradeTool tool = new UpgradeTool();
+ tool.init();
+ CommandLineParser parser = new GnuParser();
+ CommandLine line;
+ String outputDir = ".";
+ boolean execute = false;
+ try {
+ line = parser.parse(tool.cmdLineOptions, args);
+ } catch (ParseException e) {
+ System.err.println("UpgradeTool: Parsing failed. Reason: " + e.getLocalizedMessage());
+ printAndExit(tool);
+ return;
+ }
+ if (line.hasOption("help")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("upgrade-acid", tool.cmdLineOptions);
+ return;
+ }
+ if(line.hasOption("location")) {
+ outputDir = line.getOptionValue("location");
+ }
+ if(line.hasOption("execute")) {
+ execute = true;
+ }
+ LOG.info("Starting with execute=" + execute + ", location=" + outputDir);
+
+ try {
+ String hiveVer = HiveVersionInfo.getShortVersion();
+ if(!hiveVer.startsWith("3.")) {
+ throw new IllegalStateException("postUpgrade w/execute requires Hive 3.x. Actual: " +
+ hiveVer);
+ }
+ tool.performUpgradeInternal(outputDir, execute);
+ }
+ catch(Exception ex) {
+ LOG.error("UpgradeTool failed", ex);
+ throw ex;
+ }
+ }
+ private static void printAndExit(UpgradeTool tool) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("upgrade-acid", tool.cmdLineOptions);
+ System.exit(1);
+ }
+
+ private void init() {
+ try {
+ cmdLineOptions.addOption(new Option("help", "Generates a script to execute on 3.x " +
+ "cluster. This requires 3.x binaries on the classpath and hive-site.xml."));
+ Option exec = new Option("execute",
+ "Executes commands equivalent to the generated scripts");
+ exec.setOptionalArg(true);
+ cmdLineOptions.addOption(exec);
+ cmdLineOptions.addOption(new Option("location", true,
+ "Location to write scripts to. Default is CWD."));
+ }
+ catch(Exception ex) {
+ LOG.error("init()", ex);
+ throw ex;
+ }
+ }
+ /**
+ * todo: this should accept a file of table names to exclude from non-acid to acid conversion
+ * todo: change script comments to a preamble instead of a footer
+ *
+ * how does the rename script work? "hadoop fs -mv oldname newname" - and what about S3?
+ * How does this actually get executed?
+ * all other actions are done via embedded JDBC
+ */
+ private void performUpgradeInternal(String scriptLocation, boolean execute)
+ throws HiveException, TException, IOException {
+ HiveConf conf = hiveConf != null ? hiveConf : new HiveConf();
+ boolean isAcidEnabled = isAcidEnabled(conf);
+ HiveMetaStoreClient hms = new HiveMetaStoreClient(conf);//MetaException
+ LOG.debug("Looking for databases");
+ List<String> databases = hms.getAllDatabases();//TException
+ LOG.debug("Found " + databases.size() + " databases to process");
+ List<String> convertToAcid = new ArrayList<>();
+ List<String> convertToMM = new ArrayList<>();
+ Hive db = null;
+ if(execute) {
+ db = Hive.get(conf);
+ }
+
+ for(String dbName : databases) {
+ List<String> tables = hms.getAllTables(dbName);
+ LOG.debug("found " + tables.size() + " tables in " + dbName);
+ for(String tableName : tables) {
+ Table t = hms.getTable(dbName, tableName);
+ LOG.debug("processing table " + Warehouse.getQualifiedName(t));
+ if(isAcidEnabled) {
+ //if acid is off post upgrade, you can't make any tables acid - will throw
+ processConversion(t, convertToAcid, convertToMM, hms, db, execute);
+ }
+ /*todo: handle renaming files somewhere*/
+ }
+ }
+ makeConvertTableScript(convertToAcid, convertToMM, scriptLocation);
+ makeRenameFileScript(scriptLocation);//todo: is this pre or post upgrade?
+ //todo: can different tables be in different FileSystems?
+ }
+
+ /**
+ * Actually makes the table transactional
+ */
+ private static void alterTable(Table t, Hive db, boolean isMM)
+ throws HiveException, InvalidOperationException {
+ org.apache.hadoop.hive.ql.metadata.Table metaTable =
+ //clone to make sure new prop doesn't leak
+ new org.apache.hadoop.hive.ql.metadata.Table(t.deepCopy());
+ metaTable.getParameters().put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+ if(isMM) {
+ metaTable.getParameters()
+ .put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "insert_only");
+ }
+ db.alterTable(Warehouse.getQualifiedName(t), metaTable, false, null);
+ }
+
+ /**
+ * assumes https://issues.apache.org/jira/browse/HIVE-19750 is in
+ * How does this work with Storage Based Auth?
+ * @param p partition root or table root if not partitioned
+ */
+ private static void handleRenameFiles(Table t, Path p, boolean execute, HiveConf conf, boolean isBucketed)
+ throws IOException {
+ if (isBucketed) {
+ /* For bucketed tables we assume that Hive wrote them, so 0000M_0 and 0000M_0_copy_N
+ are the only possibilities. Since we can't move files across buckets, the only thing we
+ can do is put 0000M_0_copy_N into delta_N_N as 0000M_0.
+ *
+ * If M > 4096 we should error out; better yet, make the table external - but can external tables be bucketed? Probably not.
+ */
+ //Known deltas
+ Map<Integer, List<Path>> deltaToFileMap = new HashMap<>();
+ FileSystem fs = FileSystem.get(conf);
+ RemoteIterator<LocatedFileStatus> iter = fs.listFiles(p, true);
+ Function<Integer, List<Path>> makeList = new Function<Integer, List<Path>>() {//lambda?
+ @Override
+ public List<Path> apply(Integer aVoid) {
+ return new ArrayList<>();
+ }
+ };
+ while (iter.hasNext()) {
+ LocatedFileStatus lfs = iter.next();
+ if (lfs.isDirectory()) {
+ String msg = Warehouse.getQualifiedName(t) + " is bucketed and has a subdirectory: " +
+ lfs.getPath();
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+ }
+ AcidUtils.BucketMetaData bmd = AcidUtils.BucketMetaData.parse(lfs.getPath());
+ if (bmd.bucketId < 0) {
+ //non-standard file name - we don't know which bucket the rows belong to, and we can't
+ //rename the file since it might then be treated as belonging to a different bucket id
+ String msg = "Bucketed table " + Warehouse.getQualifiedName(t) + " contains file " +
+ lfs.getPath() + " with non-standard name";
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ } else {
+ if (bmd.bucketId > BucketCodec.MAX_BUCKET_ID) {
+ String msg = "Bucketed table " + Warehouse.getQualifiedName(t) + " contains file " +
+ lfs.getPath() + " with bucketId=" + bmd.bucketId + " that is out of range";
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ if (bmd.copyNumber > 0) {
+ deltaToFileMap.computeIfAbsent(bmd.copyNumber, makeList).add(lfs.getPath());
+ }
+ }
+ }
+ for (Map.Entry<Integer, List<Path>> ent : deltaToFileMap.entrySet()) {
+ /* create a delta dir and move each file into it. HIVE-19750 ensures we have reserved
+ * enough write IDs to do this.*/
+ Path deltaDir = new Path(p, AcidUtils.deltaSubdir(ent.getKey(), ent.getKey()));
+ for (Path file : ent.getValue()) {
+ Path newFile = new Path(deltaDir, stripCopySuffix(file.getName()));
+ LOG.debug("need to rename: " + file + " to " + newFile);
+ if (fs.exists(newFile)) {
+ String msg = Warehouse.getQualifiedName(t) + ": " + newFile + " already exists?!";
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+ }
+ if (execute) {
+ if (!fs.rename(file, newFile)) {
+ String msg = Warehouse.getQualifiedName(t) + ": " + newFile + ": failed to rename";
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+ }
+ } else {
+ //todo: generate a rename command; what if FS is not hdfs?
+ }
+ }
+ }
+ return;
+ }
+ List<RenamePair> renames = new ArrayList<>();
+ FileSystem fs = FileSystem.get(conf);
+ RemoteIterator<LocatedFileStatus> iter = fs.listFiles(p, true);
+ /**
+ * count some heuristics - bad file is something not in {@link AcidUtils#ORIGINAL_PATTERN} or
+ * {@link AcidUtils#ORIGINAL_PATTERN_COPY} format. This has to be renamed for acid to work.
+ */
+ int numBadFileNames = 0;
+ /**
+ * count some heuristics - num files in {@link AcidUtils#ORIGINAL_PATTERN_COPY} format. These
+ * are supported but if there are a lot of them there will be a perf hit on read until
+ * major compaction
+ */
+ int numCopyNFiles = 0;
+ int fileId = 0;//ordinal of the file in the iterator
+ long numBytesInPartition = getDataSize(p, conf);
+ int numBuckets = guessNumBuckets(numBytesInPartition);
+ while (iter.hasNext()) {
+ LocatedFileStatus lfs = iter.next();
+ if(lfs.isDirectory()) {
+ continue;
+ }
+ AcidUtils.BucketMetaData bmd = AcidUtils.BucketMetaData.parse(lfs.getPath());
+ if(bmd.bucketId < 0) {
+ numBadFileNames++;
+ }
+ if(bmd.copyNumber > 0) {
+ //todo: what about same file name in subdir like Union All? ROW_ID generation will handle it
+ //but will have to look at ORC footers - treat these as copyN files?
+ numCopyNFiles++;
+ }
+ int writeId = fileId / numBuckets + 1;//start with delta_1 (not delta_0)
+ Path deltaDir = new Path(p, AcidUtils.deltaSubdir(writeId, writeId));
+ Path newPath = new Path(deltaDir, String.format(AcidUtils.BUCKET_DIGITS, fileId % numBuckets) + "_0");
+ /*we could track reason for rename in RenamePair so that the decision can be made later to
+ rename or not. For example, if we need to minimize renames (say we are on S3), then we'd
+ only rename if it's absolutely required, i.e. if it's a 'bad file name'*/
+ renames.add(new RenamePair(lfs.getPath(), newPath));
+ fileId++;
+ }
+ if(numBadFileNames <= 0 && numCopyNFiles <=0) {
+ //if here, the only reason we'd want to rename is to spread the data into logical buckets to
+ //help 3.0 Compactor generated more balanced splits
+ return;
+ }
+ for(RenamePair renamePair : renames) {
+ LOG.debug("need to rename: " + renamePair.getOldPath() + " to " + renamePair.getNewPath());
+ if (fs.exists(renamePair.getNewPath())) {
+ String msg = Warehouse.getQualifiedName(t) + ": " + renamePair.getNewPath() + " already exists?!";
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+ }
+ if (execute) {
+ if (!fs.rename(renamePair.getOldPath(), renamePair.getNewPath())) {
+ String msg = Warehouse.getQualifiedName(t) + ": " + renamePair.getNewPath() + ": failed to rename";
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+ }
+ } else {
+ //todo: generate a rename command; what if FS is not hdfs?
+ }
+
+ }
+ }
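+ // Illustrative example (hypothetical file names): in a bucketed table, a file
+ //   warehouse/t/000001_0_copy_2
+ // parses to bucketId=1, copyNumber=2, so the bucketed branch above moves it to
+ //   warehouse/t/delta_0000002_0000002/000001_0
+ // while the original 000001_0 stays in place and is still read as 'original' data (write id 0).
+ // The write ids consumed by these deltas are covered by the 10M ids reserved at conversion time.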
+
+ /**
+ * Need a better impl to be more memory efficient - there could be a lot of these objects.
+ * For example, remember the partition root Path elsewhere,
+ * and have this object remember the relative path of the old file and the bucket id/delta id of the new one
+ */
+ private static final class RenamePair {
+ private Path oldPath;
+ private Path newPath;
+ private RenamePair(Path old, Path newPath) {
+ this.oldPath = old;
+ this.newPath = newPath;
+ }
+ private Path getOldPath() {
+ return oldPath;
+ }
+ private Path getNewPath() {
+ return newPath;
+ }
+ }
+ /**
+ * @param location - path to a partition (or table if not partitioned) dir
+ */
+ private static long getDataSize(Path location, HiveConf conf) throws IOException {
+ FileSystem fs = location.getFileSystem(conf);
+ ContentSummary cs = fs.getContentSummary(location);
+ return cs.getLength();
+ }
+
+ /**
+ * @param fileName - matching {@link AcidUtils#ORIGINAL_PATTERN_COPY}
+ */
+ private static String stripCopySuffix(String fileName) {
+ //0000_0_copy_N -> 0000_0
+ return fileName.substring(0, fileName.indexOf('_', 1 + fileName.indexOf('_', 0)));
+ }
+
+ /**
+ * Since current compactor derives its parallelism from file names, we need to name files in
+ * a way to control this parallelism. This should be a function of data size.
+ * @param partitionSizeInBytes
+ * @return cannot exceed 4096
+ */
+ public static int guessNumBuckets(long partitionSizeInBytes) {
+ long OneGB = 1000000000;
+ if(partitionSizeInBytes <= OneGB) {
+ return 1;//1 bucket
+ }
+ if(partitionSizeInBytes <= 100 * OneGB) {
+ return 8;
+ }
+ if(partitionSizeInBytes <= 1000 * OneGB) {//1 TB
+ return 16;
+ }
+ if(partitionSizeInBytes <= 10 * 1000 * OneGB) {//10 TB
+ return 32;
+ }
+ if(partitionSizeInBytes <= 100 * 1000 * OneGB) {//100 TB
+ return 64;
+ }
+ if(partitionSizeInBytes <= 1000 * 1000 * OneGB) {//1 PB
+ return 128;
+ }
+ if(partitionSizeInBytes <= 10 * 1000 * 1000 * OneGB) {//10 PB
+ return 256;
+ }
+ if(partitionSizeInBytes <= 100 * 1000 * 1000 * OneGB) {//100 PB
+ return 512;
+ }
+ if(partitionSizeInBytes <= 1000 * 1000 * 1000 * OneGB) {//1 EB
+ return 1024;
+ }
+ return 2048;
+ }
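+ // For example, a 50GB partition falls into the "<= 100 * OneGB" branch above, so
+ //   guessNumBuckets(50L * 1000 * 1000 * 1000) == 8
+ // i.e. its files get spread over 8 logical buckets, giving the 3.x compactor up to 8-way parallelism.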
+ /**
+ * todo: handle exclusion list
+ * Figures out which tables to make Acid, MM and (optionally, performs the operation)
+ */
+ private static void processConversion(Table t, List<String> convertToAcid,
+ List<String> convertToMM, HiveMetaStoreClient hms, Hive db, boolean execute)
+ throws TException, HiveException, IOException {
+ if(isFullAcidTable(t)) {
+ return;
+ }
+ if(!TableType.MANAGED_TABLE.name().equalsIgnoreCase(t.getTableType())) {
+ return;
+ }
+ //todo: are HBase, Druid tables managed in 2.x? 3.0?
+ String fullTableName = Warehouse.getQualifiedName(t);
+ /*
+ * ORC uses table props for settings so things like bucketing, I/O Format, etc should
+ * be the same for each partition.
+ */
+ boolean canBeMadeAcid = canBeMadeAcid(fullTableName, t.getSd());
+ if(t.getPartitionKeysSize() <= 0) {
+ if(canBeMadeAcid) {
+ convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+ "'transactional'='true')");
+ //do this before alterTable in case files need to be renamed, else
+ // the transactional metastore listener will squawk
+ handleRenameFiles(t, new Path(t.getSd().getLocation()), execute, db.getConf(),
+ t.getSd().getBucketColsSize() > 0);
+ if(execute) {
+ alterTable(t, db, false);
+ }
+ }
+ else {
+ convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+ "'transactional'='true', 'transactional_properties'='insert_only')");
+ if(execute) {
+ alterTable(t, db, true);
+ }
+ }
+ }
+ else {
+ if(!canBeMadeAcid) {
+ convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+ "'transactional'='true', 'transactional_properties'='insert_only')");
+ if(execute) {
+ alterTable(t, db, true);
+ }
+ return;
+ }
+ //now that we know it can be made acid, rename files as needed
+ //process in batches in case there is a huge # of partitions
+ List partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
+ int batchSize = PARTITION_BATCH_SIZE;
+ int numWholeBatches = partNames.size()/batchSize;
+ for(int i = 0; i < numWholeBatches; i++) {
+ List partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+ partNames.subList(i * batchSize, (i + 1) * batchSize));
+ for(Partition part : partitionList) {
+ handleRenameFiles(t, new Path(part.getSd().getLocation()), execute, db.getConf(),
+ t.getSd().getBucketColsSize() > 0);
+ }
+ }
+ if(numWholeBatches * batchSize < partNames.size()) {
+ //last partial batch
+ List partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+ partNames.subList(numWholeBatches * batchSize, partNames.size()));
+ for(Partition part : partitionList) {
+ handleRenameFiles(t, new Path(part.getSd().getLocation()), execute, db.getConf(),
+ t.getSd().getBucketColsSize() > 0);
+ }
+ }
+ //if we get here, all partitions have been handled and are now Acid compatible - make the table Acid
+ convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+ "'transactional'='true')");
+ if(execute) {
+ alterTable(t, db, false);
+ }
+ }
+ }
+ private static boolean canBeMadeAcid(String fullTableName, StorageDescriptor sd) {
+ return isAcidInputOutputFormat(fullTableName, sd) && sd.getSortColsSize() <= 0;
+ }
+ private static boolean isAcidInputOutputFormat(String fullTableName, StorageDescriptor sd) {
+ try {
+ Class inputFormatClass = sd.getInputFormat() == null ? null :
+ Class.forName(sd.getInputFormat());
+ Class outputFormatClass = sd.getOutputFormat() == null ? null :
+ Class.forName(sd.getOutputFormat());
+
+ if (inputFormatClass != null && outputFormatClass != null &&
+ Class.forName("org.apache.hadoop.hive.ql.io.AcidInputFormat")
+ .isAssignableFrom(inputFormatClass) &&
+ Class.forName("org.apache.hadoop.hive.ql.io.AcidOutputFormat")
+ .isAssignableFrom(outputFormatClass)) {
+ return true;
+ }
+ } catch (ClassNotFoundException e) {
+ //if a table is using some custom I/O format and it's not in the classpath, we won't mark
+ //the table for Acid, but today (Hive 3.1 and earlier) OrcInput/OutputFormat is the only
+ //Acid format
+ LOG.error("Could not determine if " + fullTableName +
+ " can be made Acid due to: " + e.getMessage(), e);
+ return false;
+ }
+ return false;
+ }
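+ //for example, org.apache.hadoop.hive.ql.io.orc.OrcInputFormat/OrcOutputFormat (currently the
+ //only Acid-capable formats) satisfy the check above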
+ private static void makeConvertTableScript(List alterTableAcid, List alterTableMm,
+ String scriptLocation) throws IOException {
+ if (alterTableAcid.isEmpty()) {
+ LOG.info("No acid conversion is necessary");
+ } else {
+ String fileName = "convertToAcid_" + System.currentTimeMillis() + ".sql";
+ LOG.debug("Writing CRUD conversion commands to " + fileName);
+ try(PrintWriter pw = createScript(alterTableAcid, fileName, scriptLocation)) {
+ //todo: fix this - it has to run in 3.0 since tables may be unbucketed
+ pw.println("-- These commands may be executed by Hive 1.x later");
+ }
+ }
+
+ if (alterTableMm.isEmpty()) {
+ LOG.info("No managed table conversion is necessary");
+ } else {
+ String fileName = "convertToMM_" + System.currentTimeMillis() + ".sql";
+ LOG.debug("Writing managed table conversion commands to " + fileName);
+ try(PrintWriter pw = createScript(alterTableMm, fileName, scriptLocation)) {
+ pw.println("-- These commands must be executed by Hive 3.0 or later");
+ }
+ }
+ }
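+ //example output (illustrative; <ts> stands for the timestamp): convertToAcid_<ts>.sql with lines like
+ //  ALTER TABLE default.tflat SET TBLPROPERTIES ('transactional'='true');
+ //and convertToMM_<ts>.sql with lines like
+ //  ALTER TABLE default.tflattext SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only');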
+
+ private static PrintWriter createScript(List commands, String fileName,
+ String scriptLocation) throws IOException {
+ FileWriter fw = new FileWriter(scriptLocation + "/" + fileName);
+ PrintWriter pw = new PrintWriter(fw);
+ for(String cmd : commands) {
+ pw.println(cmd + ";");
+ }
+ return pw;
+ }
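+ /**
+ * Currently effectively a no-op: the command list below is always empty, so only the
+ * "No file renaming is necessary" branch runs; actual renames are performed by
+ * handleRenameFiles() when -execute is given.
+ */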
+ private static void makeRenameFileScript(String scriptLocation) throws IOException {
+ List commands = Collections.emptyList();
+ if (commands.isEmpty()) {
+ LOG.info("No file renaming is necessary");
+ } else {
+ String fileName = "normalizeFileNames_" + System.currentTimeMillis() + ".sh";
+ LOG.debug("Writing file renaming commands to " + fileName);
+ PrintWriter pw = createScript(commands, fileName, scriptLocation);
+ pw.close();
+ }
+ }
+ private static boolean isFullAcidTable(Table t) {
+ if (t.getParametersSize() <= 0) {
+ //cannot be acid
+ return false;
+ }
+ String transactionalValue = t.getParameters()
+ .get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+ if (transactionalValue != null && "true".equalsIgnoreCase(transactionalValue)) {
+ System.out.println("Found Acid table: " + Warehouse.getQualifiedName(t));
+ return true;
+ }
+ return false;
+ }
+ private static boolean isAcidEnabled(HiveConf hiveConf) {
+ String txnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
+ boolean concurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+ String dbTxnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+ return txnMgr.equals(dbTxnMgr) && concurrency;
+ }
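+ //for reference, the settings that make the check above pass (illustrative snippet):
+ //  conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+ //  conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);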
+ /**
+ * Can be set from tests when the config needs something other than the default values,
+ * for example, when acid must be enabled.
+ */
+ @VisibleForTesting
+ static HiveConf hiveConf = null;
+}
diff --git upgrade-acid/post-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java upgrade-acid/post-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java
new file mode 100644
index 0000000000..c8fca1ca5a
--- /dev/null
+++ upgrade-acid/post-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class TestUpgradeTool extends TxnCommandsBaseForTests {
+ private static final Logger LOG = LoggerFactory.getLogger(TestUpgradeTool.class);
+ private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+ File.separator + TestUpgradeTool.class.getCanonicalName() + "-" + System.currentTimeMillis()
+ ).getPath().replaceAll("\\\\", "/");
+ @Override
+ String getTestDataDir() {
+ return TEST_DATA_DIR;
+ }
+
+ /**
+ * Runs the post-upgrade conversion with the 'execute' option, i.e. changes are actually applied.
+ */
+ @Test
+ public void testPostUpgrade() throws Exception {
+ int[][] data = {{1, 2}, {3, 4}, {5, 6}};
+ int[][] dataPart = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}};
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "dynamic");
+ runStatementOnDriver("drop table if exists TAcid");
+ runStatementOnDriver("drop table if exists TAcidPart");
+ runStatementOnDriver("drop table if exists TFlat");
+ runStatementOnDriver("drop table if exists TFlatText");
+
+ //should be converted to Acid
+ runStatementOnDriver("create table TAcid (a int, b int) clustered by (b) into 2 buckets" +
+ " stored as orc TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("insert into TAcid" + makeValuesClause(data));
+ runStatementOnDriver("insert into TAcid" + makeValuesClause(data));//should now be copy_1
+ runStatementOnDriver("insert into TAcid" + makeValuesClause(data));//should now be copy_2
+
+ //should be converted to Acid
+ runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p int)" +
+ " clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+ //to create some partitions
+ runStatementOnDriver("insert into TAcidPart partition(p)" + makeValuesClause(dataPart));
+ //and copy_1 files
+ runStatementOnDriver("insert into TAcidPart partition(p)" + makeValuesClause(dataPart));
+
+ //should be converted to Acid
+ //todo add some files with non-standard names
+ runStatementOnDriver("create table TFlat (a int, b int) stored as orc " +
+ "tblproperties('transactional'='false')");
+ runStatementOnDriver("insert into TFlat values(1,2)");//create 0000_0
+ runStatementOnDriver("insert into TFlat values(2,3)");//create 0000_0_copy_1
+ runStatementOnDriver("insert into TFlat values(3,4)");//create 0000_0_copy_2
+ runStatementOnDriver("insert into TFlat values(4,5)");//create 0000_0_copy_3
+ runStatementOnDriver("insert into TFlat values(5,6)");//create 0000_0_copy_4
+
+ /*
+ ├── 000000_0
+ ├── 000000_0_copy_1
+ ├── 000000_0_copy_2
+ ├── 000000_0_copy_3
+ └── 000000_0_copy_4
+
+ to
+
+ ├── 000000_0
+ ├── 000000_0_copy_2
+ ├── 1
+ │ └── 000000_0
+ ├── 2
+ │ └── 000000_0
+ └── subdir
+ └── part-0001
+
+*/
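+ //the renames below produce the "subdir", "1" and "2" entries shown in the tree above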
+ FileSystem fs = FileSystem.get(hiveConf);
+ //simulate Spark (non-Hive) write
+ fs.rename(new Path(getWarehouseDir() + "/tflat/000000_0_copy_1"),
+ new Path(getWarehouseDir() + "/tflat/subdir/part-0001"));
+ //simulate Insert ... Select ... Union All...
+ fs.rename(new Path(getWarehouseDir() + "/tflat/000000_0_copy_3"),
+ new Path(getWarehouseDir() + "/tflat/1/000000_0"));
+ fs.rename(new Path(getWarehouseDir() + "/tflat/000000_0_copy_4"),
+ new Path(getWarehouseDir() + "/tflat/2/000000_0"));
+ String testQuery0 = "select a, b from TFlat order by a";
+ String[][] expected0 = new String[][] {
+ {"1\t2",""},
+ {"2\t3",""},
+ {"3\t4",""},
+ {"4\t5",""},
+ {"5\t6",""},
+ };
+ checkResult(expected0, testQuery0, true, "TFlat pre-check", LOG);
+
+
+ //should be converted to MM
+ runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile " +
+ "tblproperties('transactional'='false')");
+
+ Hive db = Hive.get(hiveConf);
+ org.apache.hadoop.hive.ql.metadata.Table tacid = db.getTable("default", "tacid");
+ Assert.assertEquals("Expected TAcid to not be full acid", false,
+ AcidUtils.isFullAcidTable(tacid));
+ org.apache.hadoop.hive.ql.metadata.Table tacidpart = db.getTable("default", "tacidpart");
+ Assert.assertEquals("Expected TAcidPart to not be full acid", false,
+ AcidUtils.isFullAcidTable(tacidpart));
+
+ org.apache.hadoop.hive.ql.metadata.Table t = db.getTable("default", "tflat");
+ Assert.assertEquals("Expected TFlat to not be full acid", false,
+ AcidUtils.isFullAcidTable(t));
+ t = db.getTable("default", "tflattext");
+ Assert.assertEquals("Expected TFlatText to not be MM", false,
+ AcidUtils.isInsertOnlyTable(t));
+
+
+ String[] args2 = {"-location", getTestDataDir(), "-execute"};
+ UpgradeTool.hiveConf = hiveConf;
+ UpgradeTool.main(args2);
+
+ tacid = db.getTable("default", "tacid");
+ Assert.assertEquals("Expected TAcid to become full acid", true,
+ AcidUtils.isFullAcidTable(tacid));
+ tacidpart = db.getTable("default", "tacidpart");
+ Assert.assertEquals("Expected TAcidPart to become full acid", true,
+ AcidUtils.isFullAcidTable(tacidpart));
+
+ t = db.getTable("default", "tflat");
+ Assert.assertEquals("Expected TFlat to become full acid", true, AcidUtils.isFullAcidTable(t));
+ t = db.getTable("default", "tflattext");
+ Assert.assertEquals("Expected TFlatText to become MM", true,
+ AcidUtils.isInsertOnlyTable(t));
+
+ /*make sure we still get the same data and row_ids are assigned and deltas are as expected:
+ * each set of copy_N goes into matching delta_N_N.*/
+ String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from TAcid order by a, b, ROW__ID";
+ String[][] expected = new String[][] {
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2",
+ "tacid/000000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t2",
+ "tacid/delta_0000001_0000001/000000_0"},
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t1\t2",
+ "tacid/delta_0000002_0000002/000000_0"},
+ {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t3\t4",
+ "tacid/000001_0"},
+ {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t4",
+ "tacid/delta_0000001_0000001/000001_0"},
+ {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t3\t4",
+ "tacid/delta_0000002_0000002/000001_0"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
+ "tacid/000000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
+ "tacid/delta_0000001_0000001/000000_0"},
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
+ "tacid/delta_0000002_0000002/000000_0"}
+ };
+ checkResult(expected, testQuery, false, "TAcid post-check", LOG);
+
+
+ testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from TAcidPart order by a, b, p, ROW__ID";
+ String[][] expected2 = new String[][] {
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
+ "warehouse/tacidpart/p=10/000000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
+ "tacidpart/p=10/delta_0000001_0000001/000000_0"},
+ {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t3\t4",
+ "tacidpart/p=11/000001_0"},
+ {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t4",
+ "tacidpart/p=11/delta_0000001_0000001/000001_0"},
+ {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
+ "tacidpart/p=12/000000_0"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
+ "tacidpart/p=12/delta_0000001_0000001/000000_0"}
+ };
+ checkResult(expected2, testQuery, false, "TAcidPart post-check", LOG);
+
+ /* Verify that we re-arranged/renamed files so that the names follow the hive naming convention
+ and are spread among deltas/buckets.
+ The order of files returned by RemoteIterator iter = fs.listFiles(p, true)
+ determines which delta/file each original file ends up in */
+ testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from TFlat order by a";
+ String[][] expected3 = new String[][] {
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
+ "tflat/delta_0000001_0000001/00000_0"},
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t2\t3",
+ "tflat/delta_0000002_0000002/00000_0"},
+ {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t3\t4",
+ "tflat/delta_0000004_0000004/00000_0"},
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t4\t5",
+ "tflat/delta_0000003_0000003/00000_0"},
+ {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
+ "tflat/delta_0000005_0000005/00000_0"}
+ };
+ checkResult(expected3, testQuery, false, "TFlat post-check", LOG);
+
+ }
+ @Test
+ public void testGuessNumBuckets() {
+ Assert.assertEquals(1, UpgradeTool.guessNumBuckets(123));
+ Assert.assertEquals(1, UpgradeTool.guessNumBuckets(30393930));
+ Assert.assertEquals(1, UpgradeTool.guessNumBuckets((long) Math.pow(10, 9)));
+ Assert.assertEquals(32, UpgradeTool.guessNumBuckets((long) Math.pow(10, 13)));//10 TB
+ Assert.assertEquals(128, UpgradeTool.guessNumBuckets((long) Math.pow(10, 15)));//PB
+ }
+}
diff --git upgrade-acid/post-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TxnCommandsBaseForTests.java upgrade-acid/post-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TxnCommandsBaseForTests.java
new file mode 100644
index 0000000000..376d58549f
--- /dev/null
+++ upgrade-acid/post-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TxnCommandsBaseForTests.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is a hack: copied from ql package
+ */
+public abstract class TxnCommandsBaseForTests {
+ private static final Logger LOG = LoggerFactory.getLogger(TxnCommandsBaseForTests.class);
+ //bucket count for test tables; set it to 1 for easier debugging
+ final static int BUCKET_COUNT = 2;
+ @Rule
+ public TestName testName = new TestName();
+ HiveConf hiveConf;
+ Driver d;
+ enum Table {
+ ACIDTBL("acidTbl"),
+ ACIDTBLPART("acidTblPart"),
+ ACIDTBL2("acidTbl2"),
+ NONACIDORCTBL("nonAcidOrcTbl"),
+ NONACIDORCTBL2("nonAcidOrcTbl2"),
+ NONACIDNONBUCKET("nonAcidNonBucket");
+
+ final String name;
+ @Override
+ public String toString() {
+ return name;
+ }
+ Table(String name) {
+ this.name = name;
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ setUpInternal();
+ }
+ void initHiveConf() {
+ hiveConf = new HiveConf(this.getClass());
+ }
+ void setUpInternal() throws Exception {
+ initHiveConf();
+ Path workDir = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp"));
+ hiveConf.set("mapred.local.dir", workDir + File.separator + this.getClass().getSimpleName()
+ + File.separator + "mapred" + File.separator + "local");
+ hiveConf.set("mapred.system.dir", workDir + File.separator + this.getClass().getSimpleName()
+ + File.separator + "mapred" + File.separator + "system");
+ hiveConf.set("mapreduce.jobtracker.staging.root.dir", workDir + File.separator + this.getClass().getSimpleName()
+ + File.separator + "mapred" + File.separator + "staging");
+ hiveConf.set("mapred.temp.dir", workDir + File.separator + this.getClass().getSimpleName()
+ + File.separator + "mapred" + File.separator + "temp");
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir());
+ hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+ hiveConf
+ .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+ "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+ hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+ hiveConf.setBoolean("mapred.input.dir.recursive", true);
+ TxnDbUtil.setConfValues(hiveConf);
+ TxnDbUtil.prepDb(hiveConf);
+ File f = new File(getWarehouseDir());
+ if (f.exists()) {
+ FileUtil.fullyDelete(f);
+ }
+ if (!(new File(getWarehouseDir()).mkdirs())) {
+ throw new RuntimeException("Could not create " + getWarehouseDir());
+ }
+ SessionState ss = SessionState.start(hiveConf);
+ ss.applyAuthorizationPolicy();
+ d = new Driver(new QueryState.Builder().withHiveConf(hiveConf).nonIsolated().build(), null);
+ d.setMaxRows(10000);
+ dropTables();
+ runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create temporary table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='false')");
+ }
+ protected void dropTables() throws Exception {
+ for(TxnCommandsBaseForTests.Table t : TxnCommandsBaseForTests.Table.values()) {
+ runStatementOnDriver("drop table if exists " + t);
+ }
+ }
+ @After
+ public void tearDown() throws Exception {
+ try {
+ if (d != null) {
+ dropTables();
+ d.close();
+ d.destroy();
+ d = null;
+ }
+ } finally {
+ TxnDbUtil.cleanDb(hiveConf);
+// FileUtils.deleteDirectory(new File(getTestDataDir()));
+ }
+ }
+ String getWarehouseDir() {
+ return getTestDataDir() + "/warehouse";
+ }
+ abstract String getTestDataDir();
+ /**
+ * Takes raw data and turns it into strings as if they came from Driver.getResults();
+ * sorts rows in dictionary order.
+ */
+ List stringifyValues(int[][] rowsIn) {
+ assert rowsIn.length > 0;
+ int[][] rows = rowsIn.clone();
+ Arrays.sort(rows, new RowComp());
+ List rs = new ArrayList();
+ for(int[] row : rows) {
+ assert row.length > 0;
+ StringBuilder sb = new StringBuilder();
+ for(int value : row) {
+ sb.append(value).append("\t");
+ }
+ sb.setLength(sb.length() - 1);
+ rs.add(sb.toString());
+ }
+ return rs;
+ }
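+ //for example, stringifyValues(new int[][] {{3, 4}, {1, 2}}) returns ["1\t2", "3\t4"]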
+ String makeValuesClause(int[][] rows) {
+ assert rows.length > 0;
+ StringBuilder sb = new StringBuilder(" values");
+ for(int[] row : rows) {
+ assert row.length > 0;
+ if(row.length > 1) {
+ sb.append("(");
+ }
+ for(int value : row) {
+ sb.append(value).append(",");
+ }
+ sb.setLength(sb.length() - 1);//remove trailing comma
+ if(row.length > 1) {
+ sb.append(")");
+ }
+ sb.append(",");
+ }
+ sb.setLength(sb.length() - 1);//remove trailing comma
+ return sb.toString();
+ }
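+ //for example, makeValuesClause(new int[][] {{1, 2}, {3, 4}}) returns " values(1,2),(3,4)"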
+
+ static class RowComp implements Comparator {
+ @Override
+ public int compare(int[] row1, int[] row2) {
+ assert row1 != null && row2 != null && row1.length == row2.length;
+ for(int i = 0; i < row1.length; i++) {
+ int comp = Integer.compare(row1[i], row2[i]);
+ if(comp != 0) {
+ return comp;
+ }
+ }
+ return 0;
+ }
+ }
+
+ List runStatementOnDriver(String stmt) throws Exception {
+ LOG.info("Running the query: " + stmt);
+ CommandProcessorResponse cpr = d.run(stmt);
+ if(cpr.getResponseCode() != 0) {
+ throw new RuntimeException(stmt + " failed: " + cpr);
+ }
+ List rs = new ArrayList();
+ d.getResults(rs);
+ return rs;
+ }
+ CommandProcessorResponse runStatementOnDriverNegative(String stmt) throws Exception {
+ CommandProcessorResponse cpr = d.run(stmt);
+ if(cpr.getResponseCode() != 0) {
+ return cpr;
+ }
+ throw new RuntimeException("Didn't get expected failure!");
+ }
+
+ /**
+ * Runs Vectorized Explain on the query and checks if the plan is vectorized as expected
+ * @param vectorized {@code true} - assert that it's vectorized
+ */
+ void assertVectorized(boolean vectorized, String query) throws Exception {
+ List rs = runStatementOnDriver("EXPLAIN VECTORIZATION DETAIL " + query);
+ for(String line : rs) {
+ if(line != null && line.contains("Execution mode: vectorized")) {
+ Assert.assertTrue("Was vectorized when it wasn't expected", vectorized);
+ return;
+ }
+ }
+ Assert.assertTrue("Din't find expected 'vectorized' in plan", !vectorized);
+ }
+ /**
+ * Asserts that the actual files match the expected ones.
+ * @param expectedFiles - suffixes of expected Paths; all entries must have the same length
+ * @param rootPath - table or partition root under which to look for actual files, recursively
+ */
+ void assertExpectedFileSet(Set expectedFiles, String rootPath) throws Exception {
+ int suffixLength = 0;
+ for(String s : expectedFiles) {
+ if(suffixLength > 0) {
+ assert suffixLength == s.length() : "all entries must be the same length. current: " + s;
+ }
+ suffixLength = s.length();
+ }
+ FileSystem fs = FileSystem.get(hiveConf);
+ Set actualFiles = new HashSet<>();
+ RemoteIterator remoteIterator = fs.listFiles(new Path(rootPath), true);
+ while (remoteIterator.hasNext()) {
+ LocatedFileStatus lfs = remoteIterator.next();
+ if(!lfs.isDirectory() && org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER.accept(lfs.getPath())) {
+ String p = lfs.getPath().toString();
+ actualFiles.add(p.substring(p.length() - suffixLength, p.length()));
+ }
+ }
+ Assert.assertEquals("Unexpected file list", expectedFiles, actualFiles);
+ }
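+ //illustrative usage (paths hypothetical; note all suffixes have the same length):
+ //  assertExpectedFileSet(new HashSet<>(Arrays.asList(
+ //      "warehouse/t/000000_0", "warehouse/t/000001_0")), getWarehouseDir() + "/t");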
+ void checkExpected(List rs, String[][] expected, String msg, Logger LOG, boolean checkFileName) {
+ LOG.warn(testName.getMethodName() + ": read data(" + msg + "): ");
+ logResult(LOG, rs);
+ Assert.assertEquals(testName.getMethodName() + ": " + msg + "; " + rs,
+ expected.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertTrue("Actual line (data) " + i + " data: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+ if(checkFileName) {
+ Assert.assertTrue("Actual line(file) " + i + " file: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+ }
+ }
+ }
+ void logResult(Logger LOG, List rs) {
+ StringBuilder sb = new StringBuilder();
+ for(String s : rs) {
+ sb.append(s).append('\n');
+ }
+ LOG.info(sb.toString());
+ }
+ /**
+ * We have to use a different query to check results for Vectorized tests because to get the
+ * file name info we need to use {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#FILENAME}
+ * which currently makes the query non-vectorizable. This means we can't check the file name
+ * for the vectorized version of the test.
+ */
+ void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg, Logger LOG) throws Exception{
+ List rs = runStatementOnDriver(query);
+ checkExpected(rs, expectedResult, msg + (isVectorized ? " vect" : ""), LOG, !isVectorized);
+ assertVectorized(isVectorized, query);
+ }
+}
diff --git upgrade-acid/pre-upgrade/pom.xml upgrade-acid/pre-upgrade/pom.xml
new file mode 100644
index 0000000000..b5ef824ef3
--- /dev/null
+++ upgrade-acid/pre-upgrade/pom.xml
@@ -0,0 +1,282 @@
+
+
+
+
+
+ org.apache.hive
+ hive-upgrade-acid
+ 4.0.0-SNAPSHOT
+ ../pom.xml
+
+
+
+ 4.0.0
+
+ 4.0.0-SNAPSHOT
+ hive-pre-upgrade
+ Hive Pre Upgrade Acid
+ jar
+
+
+ ../..
+
+
+ ${project.build.directory}/testconf
+ file://
+ ${project.basedir}/src/test/resources
+ ${project.build.directory}/tmp
+ ${project.build.directory}/warehouse
+ file://
+ 1
+ true
+
+
+
+
+ commons-cli
+ commons-cli
+ 1.2
+ provided
+
+
+ org.apache.hive
+ hive-metastore
+ 2.3.3
+ provided
+
+
+ org.apache.hive
+ hive-exec
+ 2.3.3
+ provided
+
+
+ org.apache.hadoop
+ hadoop-common
+ 2.7.2
+ provided
+
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-common
+ 2.7.2
+ provided
+
+
+ org.apache.orc
+ orc-core
+ 1.3.3
+ provided
+
+
+
+
+
+
+
+ ${basedir}/src/main/resources
+
+ package.jdo
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+ ${maven.antrun.plugin.version}
+
+
+ ant-contrib
+ ant-contrib
+ ${ant.contrib.version}
+
+
+ ant
+ ant
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ ${maven.checkstyle.plugin.version}
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ ${maven.exec.plugin.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+
+
+ setup-test-dirs
+ process-test-resources
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ setup-metastore-scripts
+ process-test-resources
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+ 2.20.1
+
+
+
+ integration-test
+ verify
+
+
+
+
+ true
+ false
+ -Xmx2048m
+ false
+
+ true
+ ${test.tmp.dir}
+ ${test.tmp.dir}
+ true
+
+
+ ${log4j.conf.dir}
+
+ ${skipITests}
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ ${maven.surefire.version}
+
+ true
+ false
+ ${test.forkcount}
+ -Xmx2048m
+ false
+
+ ${project.build.directory}
+ true
+ ${derby.version}
+ ${test.tmp.dir}/derby.log
+
+ ${test.log4j.scheme}${test.conf.dir}/hive-log4j2.properties
+ true
+ ${test.tmp.dir}
+
+ jdbc:derby:${test.tmp.dir}/junit_metastore_db;create=true
+ false
+ ${test.tmp.dir}
+ ${test.warehouse.scheme}${test.warehouse.dir}
+
+
+
+ ${log4j.conf.dir}
+ ${test.conf.dir}
+
+ ${test.conf.dir}/conf
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+
+
+
\ No newline at end of file
diff --git upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java
similarity index 75%
rename from upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java
rename to upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java
index 78c084392d..f3562c7d72 100644
--- upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java
+++ upgrade-acid/pre-upgrade/src/main/java/org/apache/hadoop/hive/upgrade/acid/PreUpgradeTool.java
@@ -33,7 +33,6 @@
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -41,7 +40,6 @@
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -109,22 +107,22 @@
* "location" option may be supplied followed by a path to set the location for the generated
* scripts.
*/
-public class UpgradeTool {
- private static final Logger LOG = LoggerFactory.getLogger(UpgradeTool.class);
+public class PreUpgradeTool {
+ private static final Logger LOG = LoggerFactory.getLogger(PreUpgradeTool.class);
private static final int PARTITION_BATCH_SIZE = 10000;
private final Options cmdLineOptions = new Options();
public static void main(String[] args) throws Exception {
- UpgradeTool tool = new UpgradeTool();
+ PreUpgradeTool tool = new PreUpgradeTool();
tool.init();
CommandLineParser parser = new GnuParser();
CommandLine line ;
String outputDir = ".";
- boolean preUpgrade = false, postUpgrade = false, execute = false, nonBlocking = false;
+ boolean execute = false;
try {
line = parser.parse(tool.cmdLineOptions, args);
} catch (ParseException e) {
- System.err.println("UpgradeTool: Parsing failed. Reason: " + e.getLocalizedMessage());
+ System.err.println("PreUpgradeTool: Parsing failed. Reason: " + e.getLocalizedMessage());
printAndExit(tool);
return;
}
@@ -139,39 +137,21 @@ public static void main(String[] args) throws Exception {
if(line.hasOption("execute")) {
execute = true;
}
- if(line.hasOption("preUpgrade")) {
- preUpgrade = true;
- }
- if(line.hasOption("postUpgrade")) {
- postUpgrade = true;
- }
- LOG.info("Starting with preUpgrade=" + preUpgrade + ", postUpgrade=" + postUpgrade +
- ", execute=" + execute + ", location=" + outputDir);
- if(preUpgrade && postUpgrade) {
- throw new IllegalArgumentException("Cannot specify both preUpgrade and postUpgrade");
- }
+ LOG.info("Starting with execute=" + execute + ", location=" + outputDir);
try {
String hiveVer = HiveVersionInfo.getShortVersion();
- if(preUpgrade) {
- if(!hiveVer.startsWith("2.")) {
- throw new IllegalStateException("preUpgrade requires Hive 2.x. Actual: " + hiveVer);
- }
+ if(!hiveVer.startsWith("2.")) {
+ throw new IllegalStateException("preUpgrade requires Hive 2.x. Actual: " + hiveVer);
}
- if(postUpgrade && execute && !isTestMode) {
- if(!hiveVer.startsWith("3.")) {
- throw new IllegalStateException("postUpgrade w/execute requires Hive 3.x. Actual: " +
- hiveVer);
- }
- }
- tool.prepareAcidUpgradeInternal(outputDir, preUpgrade, postUpgrade, execute);
+ tool.prepareAcidUpgradeInternal(outputDir, execute);
}
catch(Exception ex) {
- LOG.error("UpgradeTool failed", ex);
+ LOG.error("PreUpgradeTool failed", ex);
throw ex;
}
}
- private static void printAndExit(UpgradeTool tool) {
+ private static void printAndExit(PreUpgradeTool tool) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("upgrade-acid", tool.cmdLineOptions);
System.exit(1);
@@ -179,13 +159,8 @@ private static void printAndExit(UpgradeTool tool) {
private void init() {
try {
- cmdLineOptions.addOption(new Option("help", "print this message"));
- cmdLineOptions.addOption(new Option("preUpgrade",
- "Generates a script to execute on 2.x cluster. This requires 2.x binaries" +
- " on the classpath and hive-site.xml."));
- cmdLineOptions.addOption(new Option("postUpgrade",
- "Generates a script to execute on 3.x cluster. This requires 3.x binaries" +
- " on the classpath and hive-site.xml."));
+ cmdLineOptions.addOption(new Option("help", "Generates a script to execute on 2.x" +
+ " cluster. This requires 2.x binaries on the classpath and hive-site.xml."));
Option exec = new Option("execute",
"Executes commands equivalent to generated scrips");
exec.setOptionalArg(true);
@@ -208,8 +183,8 @@ private void init() {
*
*
*/
- private void prepareAcidUpgradeInternal(String scriptLocation, boolean preUpgrade,
- boolean postUpgrade, boolean execute) throws HiveException, TException, IOException {
+ private void prepareAcidUpgradeInternal(String scriptLocation, boolean execute)
+ throws HiveException, TException, IOException {
HiveConf conf = hiveConf != null ? hiveConf : new HiveConf();
boolean isAcidEnabled = isAcidEnabled(conf);
HiveMetaStoreClient hms = new HiveMetaStoreClient(conf);//MetaException
@@ -232,9 +207,9 @@ private void prepareAcidUpgradeInternal(String scriptLocation, boolean preUpgrad
for(String tableName : tables) {
Table t = hms.getTable(dbName, tableName);
LOG.debug("processing table " + Warehouse.getQualifiedName(t));
- if(preUpgrade && isAcidEnabled) {
+ if(isAcidEnabled) {
//if acid is off, there can't be any acid tables - nothing to compact
- if(execute && txns == null) {
+ if(txns == null) {
/*
This API changed from 2.x to 3.0. so this won't even compile with 3.0
but it doesn't need to since we only run this preUpgrade
@@ -246,18 +221,12 @@ private void prepareAcidUpgradeInternal(String scriptLocation, boolean preUpgrad
getCompactionCommands(t, conf, hms, compactionMetaInfo, execute, db, txns);
compactions.addAll(compactionCommands);
}
- if(postUpgrade && isAcidEnabled) {
- //if acid is off post upgrade, you can't make any tables acid - will throw
- processConversion(t, convertToAcid, convertToMM, hms, db, execute);
- }
/*todo: handle renaming files somewhere*/
}
}
makeCompactionScript(compactions, scriptLocation, compactionMetaInfo);
- makeConvertTableScript(convertToAcid, convertToMM, scriptLocation);
- makeRenameFileScript(scriptLocation);//todo: is this pre or post upgrade?
- //todo: can different tables be in different FileSystems?
- if(preUpgrade && execute) {
+
+ if(execute) {
while(compactionMetaInfo.compactionIds.size() > 0) {
LOG.debug("Will wait for " + compactionMetaInfo.compactionIds.size() +
" compactions to complete");
@@ -321,113 +290,6 @@ private static void alterTable(Table t, Hive db, boolean isMM)
db.alterTable(Warehouse.getQualifiedName(t), metaTable, false, null);
}
- /**
- * todo: handle exclusion list
- * Figures out which tables to make Acid, MM and (optionally, performs the operation)
- */
- private static void processConversion(Table t, List convertToAcid,
- List convertToMM, HiveMetaStoreClient hms, Hive db, boolean execute)
- throws TException, HiveException {
- if(isFullAcidTable(t)) {
- return;
- }
- if(!TableType.MANAGED_TABLE.name().equalsIgnoreCase(t.getTableType())) {
- return;
- }
- String fullTableName = Warehouse.getQualifiedName(t);
- if(t.getPartitionKeysSize() <= 0) {
- if(canBeMadeAcid(fullTableName, t.getSd())) {
- convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
- "'transactional'='true')");
- if(execute) {
- alterTable(t, db, false);
- }
- }
- else {
- convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
- "'transactional'='true', 'transactional_properties'='insert_only')");
- if(execute) {
- alterTable(t, db, true);
- }
- }
- }
- else {
- /*
- each Partition may have different I/O Format so have to check them all before deciding to
- make a full CRUD table.
- Run in batches to prevent OOM
- */
- List partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
- int batchSize = PARTITION_BATCH_SIZE;
- int numWholeBatches = partNames.size()/batchSize;
- for(int i = 0; i < numWholeBatches; i++) {
- List partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
- partNames.subList(i * batchSize, (i + 1) * batchSize));
- if(alterTable(fullTableName, partitionList, convertToMM, t, db, execute)) {
- return;
- }
- }
- if(numWholeBatches * batchSize < partNames.size()) {
- //last partial batch
- List partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
- partNames.subList(numWholeBatches * batchSize, partNames.size()));
- if(alterTable(fullTableName, partitionList, convertToMM, t, db, execute)) {
- return;
- }
- }
- //if here checked all parts and they are Acid compatible - make it acid
- convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
- "'transactional'='true')");
- if(execute) {
- alterTable(t, db, false);
- }
- }
- }
- /**
- * @return true if table was converted/command generated
- */
- private static boolean alterTable(String fullTableName, List partitionList,
- List convertToMM, Table t, Hive db, boolean execute)
- throws InvalidOperationException, HiveException {
- for(Partition p : partitionList) {
- if(!canBeMadeAcid(fullTableName, p.getSd())) {
- convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
- "'transactional'='true', 'transactional_properties'='insert_only')");
- if(execute) {
- alterTable(t, db, true);
- }
- return true;
- }
- }
- return false;
- }
- private static boolean canBeMadeAcid(String fullTableName, StorageDescriptor sd) {
- return isAcidInputOutputFormat(fullTableName, sd) && sd.getSortColsSize() <= 0;
- }
- private static boolean isAcidInputOutputFormat(String fullTableName, StorageDescriptor sd) {
- try {
- Class inputFormatClass = sd.getInputFormat() == null ? null :
- Class.forName(sd.getInputFormat());
- Class outputFormatClass = sd.getOutputFormat() == null ? null :
- Class.forName(sd.getOutputFormat());
-
- if (inputFormatClass != null && outputFormatClass != null &&
- Class.forName("org.apache.hadoop.hive.ql.io.AcidInputFormat")
- .isAssignableFrom(inputFormatClass) &&
- Class.forName("org.apache.hadoop.hive.ql.io.AcidOutputFormat")
- .isAssignableFrom(outputFormatClass)) {
- return true;
- }
- } catch (ClassNotFoundException e) {
- //if a table is using some custom I/O format and it's not in the classpath, we won't mark
- //the table for Acid, but today (Hive 3.1 and earlier) OrcInput/OutputFormat is the only
- //Acid format
- LOG.error("Could not determine if " + fullTableName +
- " can be made Acid due to: " + e.getMessage(), e);
- return false;
- }
- return false;
- }
/**
* Generates a set compaction commands to run on pre Hive 3 cluster
*/
@@ -464,29 +326,6 @@ private static void makeCompactionScript(List commands, String scriptLoc
"-- capacity of this queue appropriately");
}
}
- private static void makeConvertTableScript(List alterTableAcid, List alterTableMm,
- String scriptLocation) throws IOException {
- if (alterTableAcid.isEmpty()) {
- LOG.info("No acid conversion is necessary");
- } else {
- String fileName = "convertToAcid_" + System.currentTimeMillis() + ".sql";
- LOG.debug("Writing CRUD conversion commands to " + fileName);
- try(PrintWriter pw = createScript(alterTableAcid, fileName, scriptLocation)) {
- //todo: fix this - it has to run in 3.0 since tables may be unbucketed
- pw.println("-- These commands may be executed by Hive 1.x later");
- }
- }
-
- if (alterTableMm.isEmpty()) {
- LOG.info("No managed table conversion is necessary");
- } else {
- String fileName = "convertToMM_" + System.currentTimeMillis() + ".sql";
- LOG.debug("Writing managed table conversion commands to " + fileName);
- try(PrintWriter pw = createScript(alterTableMm, fileName, scriptLocation)) {
- pw.println("-- These commands must be executed by Hive 3.0 or later");
- }
- }
- }
private static PrintWriter createScript(List commands, String fileName,
String scriptLocation) throws IOException {
@@ -497,17 +336,6 @@ private static PrintWriter createScript(List commands, String fileName,
}
return pw;
}
- private static void makeRenameFileScript(String scriptLocation) throws IOException {
- List commands = Collections.emptyList();
- if (commands.isEmpty()) {
- LOG.info("No file renaming is necessary");
- } else {
- String fileName = "normalizeFileNames_" + System.currentTimeMillis() + ".sh";
- LOG.debug("Writing file renaming commands to " + fileName);
- PrintWriter pw = createScript(commands, fileName, scriptLocation);
- pw.close();
- }
- }
/**
* @return any compaction commands to run for {@code Table t}
*/
@@ -795,11 +623,6 @@ void onWaitForCompaction() throws MetaException {}
static Callback callback;
@VisibleForTesting
static int pollIntervalMs = 1000*30;
- /**
- * Also to enable testing until I set up Maven profiles to be able to run with 3.0 jars
- */
- @VisibleForTesting
- static boolean isTestMode = false;
/**
* can set it from tests to test when config needs something other than default values
*/
diff --git upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java
similarity index 72%
rename from upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java
rename to upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java
index c8964a4a4c..4fe7007c96 100644
--- upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java
+++ upgrade-acid/pre-upgrade/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestPreUpgradeTool.java
@@ -52,10 +52,10 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-public class TestUpgradeTool {
- private static final Logger LOG = LoggerFactory.getLogger(TestUpgradeTool.class);
+public class TestPreUpgradeTool {
+ private static final Logger LOG = LoggerFactory.getLogger(TestPreUpgradeTool.class);
private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
- File.separator + TestUpgradeTool.class.getCanonicalName() + "-" + System.currentTimeMillis()
+ File.separator + TestPreUpgradeTool.class.getCanonicalName() + "-" + System.currentTimeMillis()
).getPath().replaceAll("\\\\", "/");
private String getTestDataDir() {
@@ -78,6 +78,7 @@ public void testUpgrade() throws Exception {
runStatementOnDriver("create table TAcid (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p tinyint) clustered by (b) into 2 buckets stored" +
" as orc TBLPROPERTIES ('transactional'='true')");
+ //on 2.x these are guaranteed to not be acid
runStatementOnDriver("create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')");
runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')");
@@ -99,19 +100,19 @@ public void testUpgrade() throws Exception {
//todo: add partitioned table that needs conversion to MM/Acid
//todo: rename files case
- String[] args = {"-location", getTestDataDir(), "-preUpgrade", "-execute"};
- UpgradeTool.callback = new UpgradeTool.Callback() {
+ String[] args = {"-location", getTestDataDir(), "-execute"};
+ PreUpgradeTool.callback = new PreUpgradeTool.Callback() {
@Override
void onWaitForCompaction() throws MetaException {
runWorker(hiveConf);
}
};
- UpgradeTool.pollIntervalMs = 1;
- UpgradeTool.hiveConf = hiveConf;
- UpgradeTool.main(args);
+ PreUpgradeTool.pollIntervalMs = 1;
+ PreUpgradeTool.hiveConf = hiveConf;
+ PreUpgradeTool.main(args);
/*
todo: parse
- target/tmp/org.apache.hadoop.hive.upgrade.acid.TestUpgradeTool-1527286256834/compacts_1527286277624.sql
+ target/tmp/org.apache.hadoop.hive.upgrade.acid.TestPreUpgradeTool-1527286256834/compacts_1527286277624.sql
make sure it's the only 'compacts' file and contains
ALTER TABLE default.tacid COMPACT 'major';
ALTER TABLE default.tacidpart PARTITION(p=10Y) COMPACT 'major';
@@ -125,68 +126,13 @@ void onWaitForCompaction() throws MetaException {
Assert.assertEquals(e.toString(), TxnStore.CLEANING_RESPONSE, e.getState());
}
- String[] args2 = {"-location", getTestDataDir(), "-postUpgrade"};
- UpgradeTool.main(args2);
+ String[] args2 = {"-location", getTestDataDir()};
+ PreUpgradeTool.main(args2);
/*
- * todo: parse
- * convertToAcid_1527286288784.sql make sure it has
- * ALTER TABLE default.tflat SET TBLPROPERTIES ('transactional'='true');
- * convertToMM_1527286288784.sql make sure it has
- * ALTER TABLE default.tflattext SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only');
+ * todo: parse compacts script - make sure there is nothing in it
* */
}
- /**
- * includes 'execute' for postUpgrade
- * @throws Exception
- */
- @Test
- public void testPostUpgrade() throws Exception {
- int[][] dataPart = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}};
- hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "dynamic");
- runStatementOnDriver("drop table if exists TAcid");
- runStatementOnDriver("drop table if exists TAcidPart");
- runStatementOnDriver("drop table if exists TFlat");
- runStatementOnDriver("drop table if exists TFlatText");
-
- runStatementOnDriver("create table TAcid (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')");
- runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p int) clustered by (b) into 2 buckets stored" +
- " as orc TBLPROPERTIES ('transactional'='false')");
- //to create some partitions
- runStatementOnDriver("insert into TAcidPart partition(p)" + makeValuesClause(dataPart));
-
-
- //todo: to test these need to link against 3.x libs - maven profiles?
- //runStatementOnDriver("create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')");
- //runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')");
-
- Hive db = Hive.get(hiveConf);
- Table tacid = db.getTable("default", "tacid");
- Assert.assertEquals("Expected TAcid to become full acid", false, AcidUtils.isAcidTable(tacid));
- Table tacidpart = db.getTable("default", "tacidpart");
- Assert.assertEquals("Expected TAcidPart to become full acid", false,
- AcidUtils.isAcidTable(tacidpart));
-
-
- String[] args2 = {"-location", getTestDataDir(), "-postUpgrade", "-execute"};
- UpgradeTool.isTestMode = true;
- UpgradeTool.hiveConf = hiveConf;
- UpgradeTool.main(args2);
-
- tacid = db.getTable("default", "tacid");
- Assert.assertEquals("Expected TAcid to become full acid", true, AcidUtils.isAcidTable(tacid));
- tacidpart = db.getTable("default", "tacidpart");
- Assert.assertEquals("Expected TAcidPart to become full acid", true,
- AcidUtils.isAcidTable(tacidpart));
-
- /**
- todo: parse
- target/tmp/org.apache.hadoop.hive.upgrade.acid.TestUpgradeTool-1527286026461/convertToAcid_1527286063065.sql
- make sure it has:
- ALTER TABLE default.tacid SET TBLPROPERTIES ('transactional'='true');
- ALTER TABLE default.tacidpart SET TBLPROPERTIES ('transactional'='true');
- */
- }
private static void runWorker(HiveConf hiveConf) throws MetaException {
AtomicBoolean stop = new AtomicBoolean(true);
Worker t = new Worker();