diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 948fc6eb04..6d0c287d29 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.hcatalog.listener.DbNotificationListener;
 import org.apache.hadoop.hive.metastore.*;
@@ -68,10 +69,13 @@
 import org.apache.hadoop.hive.ql.exec.MoveTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector;
@@ -111,6 +115,7 @@
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER;
+import static org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData.DUMP_METADATA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -2166,6 +2171,65 @@ public void testConfiguredDeleteOfPrevDumpDir() throws IOException {
     verifySetupSteps = verifySetupOriginal;
   }
 
+  @Test
+  public void testDumpMetadataBackwardCompatibility() throws IOException, SemanticException {
+    boolean verifySetupOriginal = verifySetupSteps;
+    verifySetupSteps = true;
+    String nameOfTest = testName.getMethodName();
+    String dbName = createDB(nameOfTest, driver);
+    String replDbName = dbName + "_dupe";
+
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+
+    //ensure bootstrap load runs with earlier format of dumpmetadata
+    Tuple bootstrapDump = replDumpDb(dbName);
+    deleteNewMetadataFields(bootstrapDump);
+    loadAndVerify(replDbName, dbName, bootstrapDump.lastReplId);
+
+    String[] unptnData = new String[] {"eleven", "twelve"};
+    String[] ptnData1 = new String[] {"thirteen", "fourteen", "fifteen"};
+    String[] ptnData2 = new String[] {"fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[] {};
+
+    String unptnLocn = new Path(TEST_PATH, nameOfTest + "_unptn").toUri().getPath();
+    String ptnLocn1 = new Path(TEST_PATH, nameOfTest + "_ptn1").toUri().getPath();
+    String ptnLocn2 = new Path(TEST_PATH, nameOfTest + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptnLocn, unptnData);
+    createTestDataFile(ptnLocn1, ptnData1);
+    createTestDataFile(ptnLocn2, ptnData2);
+
run("LOAD DATA LOCAL INPATH '" + unptnLocn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned", unptnData, driver); + run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned", driver); + run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned_late", unptnData, driver); + + //ensure first incremental load runs with earlier format of dumpmetadata + Tuple incDump = replDumpDb(dbName); + deleteNewMetadataFields(incDump); + loadAndVerify(replDbName, dbName, incDump.lastReplId); + verifyRun("SELECT * from " + replDbName + ".unptned", unptnData, driverMirror); + verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData, driverMirror); + + run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=1)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1, driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=2)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2, driver); + + //verify 2nd incremental load + incDump = replDumpDb(dbName); + deleteNewMetadataFields(incDump); + loadAndVerify(replDbName, dbName, incDump.lastReplId); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptnData1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror); + verifySetupSteps = verifySetupOriginal; + } + @Test public void testReplConfiguredCleanupOfNotificationEvents() throws Exception { @@ -4532,4 +4596,33 @@ public static Path getNonRecoverablePath(Path dumpDir, String dbName, HiveConf c } return null; } + + private void deleteNewMetadataFields(Tuple dump) throws SemanticException { + Path dumpHiveDir = new Path(dump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + DumpMetaData dmd = new DumpMetaData(dumpHiveDir, hconf); + Path dumpMetaPath = new Path(dumpHiveDir, DUMP_METADATA); + + List> listValues = new ArrayList<>(); + DumpType dumpType = dmd.getDumpType(); + Long eventFrom = dmd.getEventFrom(); + Long eventTo = dmd.getEventTo(); + String cmRoot = "testCmRoot"; + String payload = dmd.getPayload(); + Long dumpExecutionId = dmd.getDumpExecutionId(); + ReplScope replScope = dmd.getReplScope(); + listValues.add( + Arrays.asList( + dumpType.toString(), + eventFrom.toString(), + eventTo.toString(), + cmRoot, + dumpExecutionId.toString(), + payload) + ); + if (replScope != null) { + listValues.add(dmd.prepareReplScopeValues()); + } + org.apache.hadoop.hive.ql.parse.repl.dump + .Utils.writeOutput(listValues, dumpMetaPath, hconf, true); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java index eb87bb925d..d4e8e3dfa9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java @@ -38,7 +38,7 @@ public class DumpMetaData { // wrapper class for reading and writing metadata about a dump // responsible for _dumpmetadata files - private static final String DUMP_METADATA = "_dumpmetadata"; + public static final String DUMP_METADATA = "_dumpmetadata"; private static final Logger LOG = LoggerFactory.getLogger(DumpMetaData.class); private DumpType 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
index eb87bb925d..d4e8e3dfa9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
@@ -38,7 +38,7 @@ public class DumpMetaData {
   // wrapper class for reading and writing metadata about a dump
   // responsible for _dumpmetadata files
-  private static final String DUMP_METADATA = "_dumpmetadata";
+  public static final String DUMP_METADATA = "_dumpmetadata";
   private static final Logger LOG = LoggerFactory.getLogger(DumpMetaData.class);
 
   private DumpType dumpType;
@@ -130,7 +130,8 @@ private void loadDumpFromFile() throws SemanticException {
           lineContents[2].equals(Utilities.nullStringOutput) ? null : Long.valueOf(lineContents[2]),
           lineContents[3].equals(Utilities.nullStringOutput) ? null : new Path(lineContents[3]),
           lineContents[4].equals(Utilities.nullStringOutput) ? null : Long.valueOf(lineContents[4]),
-          Boolean.valueOf(lineContents[6]));
+          (lineContents.length < 7 || lineContents[6].equals(Utilities.nullStringOutput)) ?
+              Boolean.valueOf(false) : Boolean.valueOf(lineContents[6]));
       setPayload(lineContents[5].equals(Utilities.nullStringOutput) ? null : lineContents[5]);
     } else {
       throw new IOException(
@@ -203,7 +204,7 @@ private void initializeIfNot() throws SemanticException {
     }
   }
 
-  private List<String> prepareReplScopeValues() {
+  public List<String> prepareReplScopeValues() {
     assert(replScope != null);
     List<String> values = new ArrayList<>();
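The reader change is the other half of the compatibility fix: loadDumpFromFile previously indexed lineContents[6] unconditionally, which throws ArrayIndexOutOfBoundsException on a six-column row produced by an older Hive. The patched ternary instead defaults the boolean to false when the column is missing or null-marked. A self-contained sketch of that guard, under the same assumed tab-separated layout as above:

// Sketch of the guarded read added to loadDumpFromFile(): an old six-column
// row must yield false for the new boolean column instead of throwing
// ArrayIndexOutOfBoundsException. "null" stands in for Utilities.nullStringOutput.
public class DumpMetadataParseSketch {

  private static final String NULL_MARKER = "null";

  static boolean seventhColumnOrFalse(String line) {
    String[] cols = line.split("\t");
    // Mirrors the patched condition: a missing or null-marked column reads as false.
    if (cols.length < 7 || cols[6].equals(NULL_MARKER)) {
      return false;
    }
    return Boolean.parseBoolean(cols[6]);
  }

  public static void main(String[] args) {
    String oldRow = "INCREMENTAL\t100\t110\ttestCmRoot\t1\tnull";       // pre-upgrade dump
    String newRow = "INCREMENTAL\t100\t110\ttestCmRoot\t1\tnull\ttrue"; // current dump
    System.out.println(seventhColumnOrFalse(oldRow)); // false: column absent
    System.out.println(seventhColumnOrFalse(newRow)); // true: column present
  }
}

The accompanying visibility changes (DUMP_METADATA and prepareReplScopeValues made public) exist only so the test helper can locate the metadata file and re-emit the repl-scope row.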