diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index b020351..146fb37 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
@@ -298,7 +297,6 @@ public void testBasicWithCM() throws Exception {
     String[] unptn_data = new String[]{ "eleven" , "twelve" };
     String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
     String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
-    String[] ptn_data_2_later = new String[]{ "eighteen", "nineteen", "twenty"};
     String[] empty = new String[]{};

     String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
@@ -309,7 +307,6 @@ public void testBasicWithCM() throws Exception {
     createTestDataFile(unptn_locn, unptn_data);
     createTestDataFile(ptn_locn_1, ptn_data_1);
     createTestDataFile(ptn_locn_2, ptn_data_2);
-    createTestDataFile(ptn_locn_2_later, ptn_data_2_later);

     run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver);
     run("SELECT * from " + dbName + ".unptned", driver);
@@ -332,15 +329,9 @@ public void testBasicWithCM() throws Exception {
     // Table dropped after "repl dump"
     run("DROP TABLE " + dbName + ".unptned", driver);

+    // Partition dropped after "repl dump"
     run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)", driver);

-    // File changed after "repl dump"
-    Partition p = metaStoreClient.getPartition(dbName, "ptned", "b=2");
-    Path loc = new Path(p.getSd().getLocation());
-    FileSystem fs = loc.getFileSystem(hconf);
-    Path file = fs.listStatus(loc)[0].getPath();
-    fs.delete(file, false);
-    fs.copyFromLocalFile(new Path(ptn_locn_2_later), file);

     run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
     printOutput(driverMirror);
@@ -353,10 +344,8 @@ public void testBasicWithCM() throws Exception {
     verifyResults(unptn_data, driverMirror);
     run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", driverMirror);
     verifyResults(ptn_data_1, driverMirror);
-    // Since partition(b=2) changed manually, Hive cannot find
-    // it in original location and cmroot, thus empty
     run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", driverMirror);
-    verifyResults(empty, driverMirror);
+    verifyResults(ptn_data_2, driverMirror);
     run("SELECT a from " + dbName + ".ptned_empty", driverMirror);
     verifyResults(empty, driverMirror);
     run("SELECT * from " + dbName + ".unptned_empty", driverMirror);
@@ -1281,6 +1270,7 @@ public void testEventTypesForDynamicAddPartitionByInsert() throws IOException {

     String[] ptn_data = new String[]{ "ten"};
     run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data[0] + "')", driver);
+    run("DROP TABLE " + dbName + ".ptned", driver);

     // Inject a behaviour where it throws exception if an INSERT event is found
     // As we dynamically add a partition through INSERT INTO cmd, it should just add ADD_PARTITION
@@ -1300,6 +1290,7 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event
         if (event.getDbName().equalsIgnoreCase(dbName)) {
           if (event.getEventType() == "INSERT") {
             // If an insert event is found, then return null hence no event is dumped.
+            LOG.error("Encountered INSERT event when it was not expected");
             return null;
           }
         }
@@ -1316,7 +1307,7 @@ public NotificationEventResponse apply(@Nullable NotificationEventResponse event
     eventTypeValidator.assertInjectionsPerformed(true,false);
     InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour

-    verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data, driverMirror);
+    verifyIfTableNotExist(replDbName , "ptned", metaStoreClientMirror);
   }

   @Test
@@ -2546,32 +2537,58 @@ public void testConcatenatePartitionedTable() throws IOException {

     // Replicate all the events happened so far
     Tuple incrDump = incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName);
-    verifySetup("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
-    verifySetup("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
   }

   @Test
-  public void testStatus() throws IOException {
-    // first test ReplStateMap functionality
-    Map<String, Long> cmap = new ReplStateMap<String, Long>();
+  public void testIncrementalLoadFailAndRetry() throws IOException {
+    String testName = "incrementalLoadFailAndRetry";
+    String dbName = createDB(testName, driver);
+    run("CREATE TABLE " + dbName + ".ptned(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver);

-    Long oldV;
-    oldV = cmap.put("a",1L);
-    assertEquals(1L,cmap.get("a").longValue());
-    assertEquals(null,oldV);
+    // Bootstrap dump/load
+    String replDbName = dbName + "_dupe";
+    Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);

-    cmap.put("b",2L);
-    oldV = cmap.put("b",-2L);
-    assertEquals(2L, cmap.get("b").longValue());
-    assertEquals(2L, oldV.longValue());
+    // Prefixed with incrementalLoadFailAndRetry to avoid finding entry in cmpath
+    String[] ptn_data_1 = new String[] { "incrementalLoadFailAndRetry_fifteen" };
+    String[] empty = new String[] {};

-    cmap.put("c",3L);
-    oldV = cmap.put("c",33L);
-    assertEquals(33L, cmap.get("c").longValue());
-    assertEquals(3L, oldV.longValue());
+    run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=1) values('" + ptn_data_1[0] + "')", driver);
+    run("CREATE TABLE " + dbName + ".ptned_tmp AS SELECT * FROM " + dbName + ".ptned", driver);

-    // Now, to actually testing status - first, we bootstrap.
+    // Move the data files of this newly created partition to a temp location
+    Partition ptn = null;
+    try {
+      ptn = metaStoreClient.getPartition(dbName, "ptned", new ArrayList<>(Arrays.asList("1")));
+    } catch (Exception e) {
+      assert(false);
+    }
+
+    Path ptnLoc = new Path(ptn.getSd().getLocation());
+    Path tmpLoc = new Path(TEST_PATH + "/incrementalLoadFailAndRetry");
+    FileSystem dataFs = ptnLoc.getFileSystem(hconf);
+    assert(dataFs.rename(ptnLoc, tmpLoc));
+
+    // Replicate all the events that have happened so far. It should fail as the data files are
+    // missing from the original path and not available in CM as well.
+    Tuple incrDump = replDumpDb(dbName, bootstrapDump.lastReplId, null, null);
+    verifyFail("REPL LOAD " + replDbName + " FROM '" + incrDump.dumpLocation + "'", driverMirror);
+
+    verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty, driverMirror);
+    verifyFail("SELECT a from " + replDbName + ".ptned_tmp where (b=1) ORDER BY a", driverMirror);
+    // Move the files back to original data location
+    assert(dataFs.rename(tmpLoc, ptnLoc));
+    loadAndVerify(replDbName, incrDump.dumpLocation, incrDump.lastReplId);
+
+    verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned_tmp where (b=1) ORDER BY a", ptn_data_1, driverMirror);
+  }
+
+  @Test
+  public void testStatus() throws IOException {
     String name = testName.getMethodName();
     String dbName = createDB(name, driver);
     advanceDumpDir();
@@ -3000,8 +3017,9 @@ private void verifyFail(String cmd, Driver myDriver) throws RuntimeException {
     } catch (AssertionError ae){
       LOG.warn("AssertionError:",ae);
       throw new RuntimeException(ae);
+    } catch (Exception e) {
+      success = false;
     }
-
     assertFalse(success);
   }

@@ -3034,7 +3052,7 @@ private static boolean run(String cmd, boolean errorOnFail, Driver myDriver) thr
     boolean success = false;
     try {
       CommandProcessorResponse ret = myDriver.run(cmd);
-      success = (ret.getException() == null);
+      success = ((ret.getException() == null) && (ret.getErrorMessage() == null));
       if (!success){
         LOG.warn("Error {} : {} running [{}].", ret.getErrorCode(), ret.getErrorMessage(), cmd);
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 185ac1d..a0653b0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -3626,7 +3626,7 @@ private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException {
     if (allPartitions == null) {
       db.alterTable(alterTbl.getOldName(), tbl, alterTbl.getIsCascade(), alterTbl.getEnvironmentContext());
     } else {
-      db.alterPartitions(tbl.getTableName(), allPartitions, alterTbl.getEnvironmentContext());
+      db.alterPartitions(alterTbl.getOldName(), allPartitions, alterTbl.getEnvironmentContext());
     }
     // Add constraints if necessary
     addConstraints(db, alterTbl);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 7330f56..296d003 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -187,9 +187,8 @@ protected int execute(DriverContext driverContext) {
       // If the srcMap is not empty which means we made the list of files for distCp.
       // If there are files from different filesystems, then the map will have multiple entries.
      if (!srcMap.isEmpty()) {
-        for (final HashMap.Entry<FileSystem, List<Path>> entry : srcMap.entrySet()) {
-          FileSystem actualSrcFs = entry.getKey();
-          List<Path> srcPaths = entry.getValue();
+        for (final FileSystem actualSrcFs : srcMap.keySet()) {
+          List<Path> srcPaths = srcMap.get(actualSrcFs);
           if (!doCopy(toPath, dstFs, srcPaths, actualSrcFs, conf)) {
             console.printError("Failed to copy: " + srcPaths.size() + " files to: '"
                 + toPath.toString() + "'");
@@ -257,8 +256,9 @@ private static boolean isLocalFileSystem(FileSystem fs) {
             fileWithChksum[1], conf);
         ret.add(f);
       } catch (MetaException e) {
-        // skip and issue warning for missing file
+        // issue warning for missing file and throw exception
         LOG.warn("Cannot find " + fileWithChksum[0] + " in source repo or cmroot");
+        throw new IOException(e.getMessage());
       }
       // Note - we need srcFs rather than fs, because it is possible that the _files lists files
       // which are from a different filesystem than the fs where the _files file itself was loaded
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 34b6737..e91f054 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -166,7 +166,9 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Ex
   }

   private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException {
-    return getNewReplicationSpec(eventId.toString(), eventId.toString());
+    ReplicationSpec rspec = getNewReplicationSpec(eventId.toString(), eventId.toString());
+    rspec.setIsIncrementalDump(true);
+    return rspec;
   }

   private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
@@ -250,12 +252,13 @@ private void dumpTable(String dbName, String tblName, Path dbRoot) throws Except
   private ReplicationSpec getNewReplicationSpec() throws TException {
     ReplicationSpec rspec = getNewReplicationSpec("replv2", "will-be-set");
     rspec.setCurrentReplicationState(String.valueOf(getHive().getMSC()
-        .getCurrentNotificationEventId().getEventId()));
+            .getCurrentNotificationEventId().getEventId()));
     return rspec;
   }

   private ReplicationSpec getNewReplicationSpec(String evState, String objState) {
-    return new ReplicationSpec(true, false, evState, objState, false, true, true);
+    return new ReplicationSpec(true, false, false, evState, objState,
+        false, true, true);
   }

   private String getNextDumpDir() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 37edd5c..606a414 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -59,10 +59,10 @@
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
-import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -143,7 +143,7 @@ public void analyzeInternal(ASTNode ast)
throws SemanticException { isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor, parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(), new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx), - null, null); + null); } catch (SemanticException e) { throw e; @@ -180,7 +180,7 @@ public static boolean prepareImport( String parsedLocation, String parsedTableName, String parsedDbName, LinkedHashMap parsedPartSpec, String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x, - Map dbsUpdated, Map tablesUpdated + UpdatedMetaDataTracker updatedMetadata ) throws IOException, MetaException, HiveException, URISyntaxException { // initialize load path @@ -200,6 +200,8 @@ public static boolean prepareImport( ReplicationSpec replicationSpec = rv.getReplicationSpec(); if (replicationSpec.isNoop()){ // nothing to do here, silently return. + x.getLOG().debug("Current update with ID:{} is noop", + replicationSpec.getCurrentReplicationState()); return false; } @@ -208,11 +210,6 @@ public static boolean prepareImport( // If the parsed statement contained a db.tablename specification, prefer that. dbname = parsedDbName; } - if (dbsUpdated != null){ - dbsUpdated.put( - dbname, - Long.valueOf(replicationSpec.get(ReplicationSpec.KEY.EVENT_ID))); - } // Create table associated with the import // Executed if relevant, and used to contain all the other details about the table if not. @@ -223,7 +220,7 @@ public static boolean prepareImport( throw new HiveException(e); } - if ((replicationSpec!= null) && replicationSpec.isInReplicationScope()){ + if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ tblDesc.setReplicationSpec(replicationSpec); } @@ -242,11 +239,6 @@ public static boolean prepareImport( if ((parsedTableName!= null) && (!parsedTableName.isEmpty())){ tblDesc.setTableName(parsedTableName); } - if (tablesUpdated != null){ - tablesUpdated.put( - dbname + "." 
+ tblDesc.getTableName(), - Long.valueOf(replicationSpec.get(ReplicationSpec.KEY.EVENT_ID))); - } List partitionDescs = new ArrayList(); Iterable partitions = rv.getPartitions(); @@ -305,8 +297,8 @@ public static boolean prepareImport( } else { createReplImportTasks( tblDesc, partitionDescs, - isPartSpecSet, replicationSpec, waitOnPrecursor, table, - fromURI, fs, wh, x); + replicationSpec, waitOnPrecursor, table, + fromURI, fs, wh, x, updatedMetadata); } return tableExists; } @@ -357,14 +349,6 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf()); } - private static Task dropTableTask(Table table, EximUtil.SemanticAnalyzerWrapperContext x){ - return TaskFactory.get(new DDLWork( - x.getInputs(), - x.getOutputs(), - new DropTableDesc(table.getTableName(), null, true, true, null) - ), x.getConf()); - } - private static Task alterTableTask(ImportTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x, ReplicationSpec replicationSpec) { tableDesc.setReplaceMode(true); @@ -790,12 +774,12 @@ private static void createRegularImportTasks( private static void createReplImportTasks( ImportTableDesc tblDesc, List partitionDescs, - boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnPrecursor, + ReplicationSpec replicationSpec, boolean waitOnPrecursor, Table table, URI fromURI, FileSystem fs, Warehouse wh, - EximUtil.SemanticAnalyzerWrapperContext x) + EximUtil.SemanticAnalyzerWrapperContext x, + UpdatedMetaDataTracker updatedMetadata) throws HiveException, URISyntaxException, IOException, MetaException { - Task dr = null; WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK; // Normally, on import, trying to create a table or a partition in a db that does not yet exist @@ -813,16 +797,27 @@ private static void createReplImportTasks( if (table != null) { if (!replicationSpec.allowReplacementInto(table.getParameters())) { // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it. + x.getLOG().info("Table {}.{} is not replaced as it is newer than the update", + tblDesc.getDatabaseName(), tblDesc.getTableName()); return; } } else { // If table doesn't exist, allow creating a new one only if the database state is older than the update. if ((parentDb != null) && (!replicationSpec.allowReplacementInto(parentDb.getParameters()))) { // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it. + x.getLOG().info("Table {}.{} is not created as the database is newer than the update", + tblDesc.getDatabaseName(), tblDesc.getTableName()); return; } } + if (updatedMetadata != null) { + updatedMetadata.set(replicationSpec.getReplicationState(), + tblDesc.getDatabaseName(), + tblDesc.getTableName(), + null); + } + if (tblDesc.getLocation() == null) { if (!waitOnPrecursor){ tblDesc.setLocation(wh.getDefaultTablePath(parentDb, tblDesc.getTableName()).toString()); @@ -847,8 +842,6 @@ private static void createReplImportTasks( behave like a noop or a pure MD alter. */ if (table == null) { - // Either we're dropping and re-creating, or the table didn't exist, and we're creating. 
- if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){ lockType = WriteEntity.WriteType.DDL_SHARED; } @@ -862,20 +855,17 @@ private static void createReplImportTasks( addPartitionDesc.setReplicationSpec(replicationSpec); t.addDependentTask( addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x)); + if (updatedMetadata != null) { + updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); + } } } else { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()),replicationSpec, x)); } } - if (dr == null){ - // Simply create - x.getTasks().add(t); - } else { - // Drop and recreate - dr.addDependentTask(t); - x.getTasks().add(dr); - } + // Simply create + x.getTasks().add(t); } else { // Table existed, and is okay to replicate into, not dropping and re-creating. if (table.isPartitioned()) { @@ -889,6 +879,9 @@ private static void createReplImportTasks( if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x)); + if (updatedMetadata != null) { + updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); + } } } else { // If replicating, then the partition already existing means we need to replace, maybe, if @@ -901,14 +894,14 @@ private static void createReplImportTasks( x.getTasks().add(alterSinglePartition( fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x)); } + if (updatedMetadata != null) { + updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); + } if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){ lockType = WriteEntity.WriteType.DDL_SHARED; } - } else { - // ignore this ptn, do nothing, not an error. } } - } if (replicationSpec.isMetadataOnly() && partitionDescs.isEmpty()){ // MD-ONLY table alter @@ -919,9 +912,6 @@ private static void createReplImportTasks( } } else { x.getLOG().debug("table non-partitioned"); - if (!replicationSpec.allowReplacementInto(table.getParameters())){ - return; // silently return, table is newer than our replacement. 
- } if (!replicationSpec.isMetadataOnly()) { // repl-imports are replace-into unless the event is insert-into loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI), replicationSpec, x); @@ -934,7 +924,6 @@ private static void createReplImportTasks( } } x.getOutputs().add(new WriteEntity(table,lockType)); - } public static boolean isPartitioned(ImportTableDesc tblDesc) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index d4fc340..3e2c513 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one package org.apache.hadoop.hive.ql.parse; import org.antlr.runtime.tree.Tree; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -34,6 +35,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; +import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; @@ -45,6 +47,7 @@ Licensed to the Apache Software Foundation (ASF) under one import java.io.FileNotFoundException; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -328,9 +331,6 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { int evstage = 0; int evIter = 0; - Long lastEvid = null; - Map dbsUpdated = new ReplicationSpec.ReplStateMap(); - Map tablesUpdated = new ReplicationSpec.ReplStateMap(); REPL_STATE_LOG.info("Repl Load: Started analyzing Repl load for DB: {} from path {}, Dump Type: INCREMENTAL", (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?", @@ -360,8 +360,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { String locn = dir.getPath().toUri().toString(); DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf); List> evTasks = analyzeEventLoad( - dbNameOrPattern, tblNameOrPattern, locn, taskChainTail, - dbsUpdated, tablesUpdated, eventDmd); + dbNameOrPattern, tblNameOrPattern, locn, taskChainTail, eventDmd); evIter++; REPL_STATE_LOG.info("Repl Load: Analyzed load for event {}/{} " + "with ID: {}, Type: {}, Path: {}", @@ -380,80 +379,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId()); taskChainTail = barrierTask; evstage++; - lastEvid = dmd.getEventTo(); } } - - // Now, we need to update repl.last.id for the various parent objects that were updated. - // This update logic will work differently based on what "level" REPL LOAD was run on. - // a) If this was a REPL LOAD at a table level, i.e. both dbNameOrPattern and - // tblNameOrPattern were specified, then the table is the only thing we should - // update the repl.last.id for. 
- // b) If this was a db-level REPL LOAD, then we should update the db, as well as any - // tables affected by partition level operations. (any table level ops will - // automatically be updated as the table gets updated. Note - renames will need - // careful handling. - // c) If this was a wh-level REPL LOAD, then we should update every db for which there - // were events occurring, as well as tables for which there were ptn-level ops - // happened. Again, renames must be taken care of. - // - // So, what we're going to do is have each event load update dbsUpdated and tablesUpdated - // accordingly, but ignore updates to tablesUpdated & dbsUpdated in the case of a - // table-level REPL LOAD, using only the table itself. In the case of a db-level REPL - // LOAD, we ignore dbsUpdated, but inject our own, and do not ignore tblsUpdated. - // And for wh-level, we do no special processing, and use all of dbsUpdated and - // tblsUpdated as-is. - - // Additional Note - although this var says "dbNameOrPattern", on REPL LOAD side, - // we do not support a pattern It can be null or empty, in which case - // we re-use the existing name from the dump, or it can be specified, - // in which case we honour it. However, having this be a pattern is an error. - // Ditto for tblNameOrPattern. - - - if (evstage > 0){ - if ((tblNameOrPattern != null) && (!tblNameOrPattern.isEmpty())){ - // if tblNameOrPattern is specified, then dbNameOrPattern will be too, and - // thus, this is a table-level REPL LOAD - only table needs updating. - // If any of the individual events logged any other dbs as having changed, - // null them out. - dbsUpdated.clear(); - tablesUpdated.clear(); - tablesUpdated.put(dbNameOrPattern + "." + tblNameOrPattern, lastEvid); - } else if ((dbNameOrPattern != null) && (!dbNameOrPattern.isEmpty())){ - // if dbNameOrPattern is specified and tblNameOrPattern isn't, this is a - // db-level update, and thus, the database needs updating. In addition. - dbsUpdated.clear(); - dbsUpdated.put(dbNameOrPattern, lastEvid); - } - } - - for (String tableName : tablesUpdated.keySet()){ - // weird - AlterTableDesc requires a HashMap to update props instead of a Map. 
-      HashMap<String, String> mapProp = new HashMap<>();
-      String eventId = tablesUpdated.get(tableName).toString();
-
-      mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId);
-      AlterTableDesc alterTblDesc = new AlterTableDesc(
-          AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(eventId, eventId));
-      alterTblDesc.setProps(mapProp);
-      alterTblDesc.setOldName(tableName);
-      Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
-          new DDLWork(inputs, outputs, alterTblDesc), conf);
-      taskChainTail.addDependentTask(updateReplIdTask);
-      taskChainTail = updateReplIdTask;
-    }
-    for (String dbName : dbsUpdated.keySet()){
-      Map<String, String> mapProp = new HashMap<>();
-      String eventId = dbsUpdated.get(dbName).toString();
-
-      mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId);
-      AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, mapProp, new ReplicationSpec(eventId, eventId));
-      Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
-          new DDLWork(inputs, outputs, alterDbDesc), conf);
-      taskChainTail.addDependentTask(updateReplIdTask);
-      taskChainTail = updateReplIdTask;
-    }
     rootTasks.add(evTaskRoot);
     REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl load for DB: {} from path {} and created import " +
         "(DDL/COPY/MOVE) tasks",
@@ -465,12 +392,13 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e);
     }
-
   }

   private List<Task<? extends Serializable>> analyzeEventLoad(
-      String dbName, String tblName, String location, Task<? extends Serializable> precursor,
-      Map<String, Long> dbsUpdated, Map<String, Long> tablesUpdated, DumpMetaData dmd)
+      String dbName, String tblName,
+      String location,
+      Task<? extends Serializable> precursor,
+      DumpMetaData dmd)
       throws SemanticException {
     MessageHandler.Context context =
         new MessageHandler.Context(dbName, tblName, location, precursor, dmd, conf, db, ctx, LOG);
@@ -484,10 +412,110 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
             precursor.getClass(), precursor.getId(), t.getClass(), t.getId());
       }
     }
-    dbsUpdated.putAll(messageHandler.databasesUpdated());
-    tablesUpdated.putAll(messageHandler.tablesUpdated());
     inputs.addAll(messageHandler.readEntities());
     outputs.addAll(messageHandler.writeEntities());
+    return addUpdateReplStateTasks(StringUtils.isEmpty(tblName),
+        messageHandler.getUpdatedMetadata(), tasks);
+  }
+
+  private Task<? extends Serializable> tableUpdateReplStateTask(
+      String dbName,
+      String tableName,
+      Map<String, String> partSpec,
+      String replState,
+      Task<? extends Serializable> preCursor) {
+    HashMap<String, String> mapProp = new HashMap<>();
+    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
+
+    AlterTableDesc alterTblDesc = new AlterTableDesc(
+        AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState));
+    alterTblDesc.setProps(mapProp);
+    alterTblDesc.setOldName(dbName + "." + tableName);
+    alterTblDesc.setPartSpec((HashMap<String, String>) partSpec);
+
+    Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
+        new DDLWork(inputs, outputs, alterTblDesc), conf);
+
+    // Link the update repl state task with the dependency collection task
+    if (preCursor != null) {
+      preCursor.addDependentTask(updateReplIdTask);
+      LOG.debug("Added {}:{} as a precursor of {}:{}",
+          preCursor.getClass(), preCursor.getId(),
+          updateReplIdTask.getClass(), updateReplIdTask.getId());
+    }
+    return updateReplIdTask;
+  }
+
+  private Task<? extends Serializable> dbUpdateReplStateTask(
+      String dbName,
+      String replState,
+      Task<? extends Serializable> preCursor) {
+    HashMap<String, String> mapProp = new HashMap<>();
+    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
+
+    AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(
+        dbName, mapProp, new ReplicationSpec(replState, replState));
+    Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
+        new DDLWork(inputs, outputs, alterDbDesc), conf);
+
+    // Link the update repl state task with the dependency collection task
+    if (preCursor != null) {
+      preCursor.addDependentTask(updateReplIdTask);
+      LOG.debug("Added {}:{} as a precursor of {}:{}",
+          preCursor.getClass(), preCursor.getId(),
+          updateReplIdTask.getClass(), updateReplIdTask.getId());
+    }
+    return updateReplIdTask;
+  }
+
+  private List<Task<? extends Serializable>> addUpdateReplStateTasks(
+      boolean isDatabaseLoad,
+      UpdatedMetaDataTracker updatedMetadata,
+      List<Task<? extends Serializable>> importTasks) {
+    String replState = updatedMetadata.getReplicationState();
+    String dbName = updatedMetadata.getDatabase();
+    String tableName = updatedMetadata.getTable();
+
+    // If no import tasks are generated by the event, or no table is updated for a table-level load,
+    // then there is no need to update the repl state of any object.
+    if (importTasks.isEmpty() || (!isDatabaseLoad && (tableName == null))) {
+      LOG.debug("No objects need update of repl state: Either 0 import tasks or table level load");
+      return importTasks;
+    }
+
+    // Create a barrier task for dependency collection of import tasks
+    Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
+
+    // Link import tasks to the barrier task which will in turn be linked with repl state update tasks
+    for (Task<? extends Serializable> t : importTasks){
+      t.addDependentTask(barrierTask);
+      LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
+          t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
+    }
+
+    List<Task<? extends Serializable>> tasks = new ArrayList<>();
+    Task<? extends Serializable> updateReplIdTask;
+
+    // If any partition is updated, then update repl state in partition object
+    for (final Map<String, String> partSpec : updatedMetadata.getPartitions()) {
+      updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask);
+      tasks.add(updateReplIdTask);
+    }
+
+    if (tableName != null) {
+      // If any table/partition is updated, then update repl state in table object
+      updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask);
+      tasks.add(updateReplIdTask);
+    }
+
+    // For a table-level load, there is no need to update the replication state of the database
+    if (isDatabaseLoad) {
+      // If any table/partition is updated, then update repl state in db object
+      updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask);
+      tasks.add(updateReplIdTask);
+    }
+
+    // At least one task would have been added to update the repl state

     return tasks;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 4badea6..1c54d29 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -24,7 +24,6 @@ import javax.annotation.Nullable; import java.text.Collator; -import java.util.HashMap; import java.util.Map; /** @@ -39,6 +38,7 @@ private boolean isInReplicationScope = false; // default is that it's not in a repl scope private boolean isMetadataOnly = false; // default is full export/import, not metadata-only + private boolean isIncrementalDump = false; // default is replv2 bootstrap dump or replv1 export or import/load. private String eventId = null; private String currStateId = null; private boolean isNoop = false; @@ -71,34 +71,6 @@ public String toString(){ static private Collator collator = Collator.getInstance(); /** - * Class that extends HashMap with a slightly different put semantic, where - * put behaves as follows: - * a) If the key does not already exist, then retains existing HashMap.put behaviour - * b) If the map already contains an entry for the given key, then will replace only - * if the new value is "greater" than the old value. - * - * The primary goal for this is to track repl updates for dbs and tables, to replace state - * only if the state is newer. - */ - public static class ReplStateMap extends HashMap { - @Override - public V put(K k, V v){ - if (!containsKey(k)){ - return super.put(k,v); - } - V oldValue = get(k); - if (v.compareTo(oldValue) > 0){ - return super.put(k,v); - } - // we did no replacement, but return the old value anyway. This - // seems most consistent with HashMap behaviour, becuse the "put" - // was effectively processed and consumed, although we threw away - // the enw value. - return oldValue; - } - } - - /** * Constructor to construct spec based on either the ASTNode that * corresponds to the replication clause itself, or corresponds to * the parent node, and will scan through the children to instantiate @@ -134,14 +106,16 @@ public ReplicationSpec(){ } public ReplicationSpec(String fromId, String toId) { - this(true, false, fromId, toId, false, true, false); + this(true, false, false, fromId, toId, false, true, false); } public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly, + boolean isIncrementalDump, String eventReplicationState, String currentReplicationState, boolean isNoop, boolean isLazy, boolean isReplace) { this.isInReplicationScope = isInReplicationScope; this.isMetadataOnly = isMetadataOnly; + this.isIncrementalDump = isIncrementalDump; this.eventId = eventReplicationState; this.currStateId = currentReplicationState; this.isNoop = isNoop; @@ -151,8 +125,9 @@ public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly, public ReplicationSpec(Function keyFetcher) { String scope = keyFetcher.apply(ReplicationSpec.KEY.REPL_SCOPE.toString()); - this.isMetadataOnly = false; this.isInReplicationScope = false; + this.isMetadataOnly = false; + this.isIncrementalDump = false; if (scope != null) { if (scope.equalsIgnoreCase("metadata")) { this.isMetadataOnly = true; @@ -258,6 +233,17 @@ public boolean isInReplicationScope(){ } /** + * @return true if this statement refers to incremental dump operation. + */ + public boolean isIncrementalDump(){ + return isIncrementalDump; + } + + public void setIsIncrementalDump(boolean isIncrementalDump){ + this.isIncrementalDump = isIncrementalDump; + } + + /** * @return true if this statement refers to metadata-only operation. 
*/ public boolean isMetadataOnly(){ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java index ae37c73..6a05ea4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java @@ -32,18 +32,20 @@ @Override public ReplicationSpec fromMetaStore() throws HiveException { try { + long currentNotificationId = db.getMSC() + .getCurrentNotificationEventId().getEventId(); ReplicationSpec replicationSpec = new ReplicationSpec( true, false, + false, "replv2", "will-be-set", false, true, false ); - long currentNotificationId = db.getMSC() - .getCurrentNotificationEventId().getEventId(); + replicationSpec.setCurrentReplicationState(String.valueOf(currentNotificationId)); return replicationSpec; } catch (Exception e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java index 077d39b..2c7414f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java @@ -42,9 +42,13 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); try { if (additionalPropertiesProvider.isInReplicationScope()) { - partition.putToParameters( - ReplicationSpec.KEY.CURR_STATE_ID.toString(), - additionalPropertiesProvider.getCurrentReplicationState()); + // Current replication state must be set on the Partition object only for bootstrap dump. + // Event replication State will be null in case of bootstrap dump. + if (!additionalPropertiesProvider.isIncrementalDump()) { + partition.putToParameters( + ReplicationSpec.KEY.CURR_STATE_ID.toString(), + additionalPropertiesProvider.getCurrentReplicationState()); + } if (isPartitionExternal()) { // Replication destination will not be external partition.putToParameters("EXTERNAL", "FALSE"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java index 948cb39..c443e53 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java @@ -68,17 +68,21 @@ private boolean cannotReplicateTable(ReplicationSpec additionalPropertiesProvide private Table addPropertiesToTable(Table table, ReplicationSpec additionalPropertiesProvider) throws SemanticException, IOException { if (additionalPropertiesProvider.isInReplicationScope()) { - table.putToParameters( - ReplicationSpec.KEY.CURR_STATE_ID.toString(), - additionalPropertiesProvider.getCurrentReplicationState()); + // Current replication state must be set on the Table object only for bootstrap dump. + // Event replication State will be null in case of bootstrap dump. 
+ if (!additionalPropertiesProvider.isIncrementalDump()) { + table.putToParameters( + ReplicationSpec.KEY.CURR_STATE_ID.toString(), + additionalPropertiesProvider.getCurrentReplicationState()); + } if (isExternalTable(table)) { // Replication destination will not be external - override if set table.putToParameters("EXTERNAL", "FALSE"); - } + } if (isExternalTableType(table)) { // Replication dest will not be external - override if set table.setTableType(TableType.MANAGED_TABLE.toString()); - } + } } else { // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE; // write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\""); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java new file mode 100644 index 0000000..5714b21 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Utility class to help track and return the metadata that is updated by repl load
+ */
+public class UpdatedMetaDataTracker {
+  private String replState;
+  private String dbName;
+  private String tableName;
+  private List<Map<String, String>> partitionsList;
+
+  public UpdatedMetaDataTracker() {
+    this.replState = null;
+    this.dbName = null;
+    this.tableName = null;
+    this.partitionsList = new ArrayList<>();
+  }
+
+  public void copyUpdatedMetadata(UpdatedMetaDataTracker other) {
+    this.replState = other.replState;
+    this.dbName = other.dbName;
+    this.tableName = other.tableName;
+    this.partitionsList = other.getPartitions();
+  }
+
+  public void set(String replState, String dbName, String tableName, Map<String, String> partSpec) {
+    this.replState = replState;
+    this.dbName = dbName;
+    this.tableName = tableName;
+    if (partSpec != null) {
+      addPartition(partSpec);
+    }
+  }
+
+  public void addPartition(Map<String, String> partSpec) {
+    partitionsList.add(partSpec);
+  }
+
+  public String getReplicationState() {
+    return replState;
+  }
+
+  public String getDatabase() {
+    return dbName;
+  }
+
+  public String getTable() {
+    return tableName;
+  }
+
+  public List<Map<String, String>> getPartitions() {
+    return partitionsList;
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
index d6a95bf..5b26681 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
@@ -21,19 +21,15 @@
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;

-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;

 abstract class AbstractMessageHandler implements MessageHandler {
   final HashSet<ReadEntity> readEntitySet = new HashSet<>();
   final HashSet<WriteEntity> writeEntitySet = new HashSet<>();
-  final Map<String, Long> tablesUpdated = new HashMap<>(),
-      databasesUpdated = new HashMap<>();
+  final UpdatedMetaDataTracker updatedMetadata = new UpdatedMetaDataTracker();
   final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer();

   @Override
@@ -47,13 +43,6 @@
   }

   @Override
-  public Map<String, Long> tablesUpdated() {
-    return tablesUpdated;
-  }
-
-  @Override
-  public Map<String, Long> databasesUpdated() {
-    return databasesUpdated;
-  }
+  public UpdatedMetaDataTracker getUpdatedMetadata() { return updatedMetadata; }

 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
index 39697bb..0873c1c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
@@ -64,8 +64,7 @@
     List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
     tasks.add(addConstraintsTask);
     context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
-    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
-
tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); return Collections.singletonList(addConstraintsTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java index e2c1d1d..76cbe5a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java @@ -65,8 +65,7 @@ List> tasks = new ArrayList>(); tasks.add(addConstraintsTask); context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); - tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); return Collections.singletonList(addConstraintsTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java index 7babb6a..aee46da 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java @@ -62,8 +62,7 @@ List> tasks = new ArrayList>(); tasks.add(addConstraintsTask); context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); - tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); return Collections.singletonList(addConstraintsTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java index e7b404a..f0cb11e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java @@ -63,8 +63,7 @@ List> tasks = new ArrayList>(); tasks.add(addConstraintsTask); context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); - tablesUpdated.put(actualDbName + "." 
+ actualTblName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); return Collections.singletonList(addConstraintsTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java index a6d35cf..3f176aa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java @@ -67,7 +67,8 @@ // bootstrap.There should be a better way to do this but might required a lot of changes across // different handlers, unless this is a common pattern that is seen, leaving this here. if (context.dmd != null) { - databasesUpdated.put(builder.destinationDbName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), builder.destinationDbName, + null, null); } readEntitySet.add(toReadEntity(new Path(context.location), context.hiveConf)); if (builder.replCopyTasks.isEmpty()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java index 58aa214..459fac5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java @@ -44,8 +44,7 @@ List> tasks = new ArrayList>(); tasks.add(dropConstraintsTask); context.log.debug("Added drop constrain task : {}:{}", dropConstraintsTask.getId(), actualTblName); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); - tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); return Collections.singletonList(dropConstraintsTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java index dae300f..fee2bb5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java @@ -43,7 +43,7 @@ context.log.debug( "Added drop function task : {}:{}", dropFunctionTask.getId(), desc.getFunctionName() ); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, null, null); return Collections.singletonList(dropFunctionTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java index 771400e..5456416 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java @@ -58,8 +58,7 @@ ); context.log.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(), dropPtnDesc.getTableName(), msg.getPartitions()); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); - tablesUpdated.put(actualDbName + "." 
+ actualTblName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); return Collections.singletonList(dropPtnTask); } else { throw new SemanticException( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java index 3ee3949..d2f5248 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java @@ -46,7 +46,7 @@ context.log.debug( "Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName() ); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, null, null); return Collections.singletonList(dropTableTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java index 40ed0b2..d412fd7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java @@ -40,8 +40,7 @@ List> tasks = tableHandler.handle(currentContext); readEntitySet.addAll(tableHandler.readEntities()); writeEntitySet.addAll(tableHandler.writeEntities()); - databasesUpdated.putAll(tableHandler.databasesUpdated); - tablesUpdated.putAll(tableHandler.tablesUpdated); + getUpdatedMetadata().copyUpdatedMetadata(tableHandler.getUpdatedMetadata()); return tasks; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java index 33c716f..8daff6d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java @@ -25,11 +25,11 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; import org.slf4j.Logger; import java.io.Serializable; import java.util.List; -import java.util.Map; import java.util.Set; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; @@ -42,9 +42,7 @@ Set writeEntities(); - Map tablesUpdated(); - - Map databasesUpdated(); + UpdatedMetaDataTracker getUpdatedMetadata(); class Context { final String dbName, tableName, location; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java index 5bd0532..43f2cbc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java @@ -65,8 +65,7 @@ new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf); context.log.debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec, newPartSpec); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); - tablesUpdated.put(tableName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, 
newPartSpec); return Collections.singletonList(renamePtnTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java index 4785e55..e30abad 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java @@ -65,9 +65,8 @@ renameTableTask.getId(), oldName, newName); // oldDbName and newDbName *will* be the same if we're here - databasesUpdated.put(newDbName, context.dmd.getEventTo()); - tablesUpdated.remove(oldName); - tablesUpdated.put(newName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), newDbName, + msg.getTableObjAfter().getTableName(), null); // Note : edge-case here in interaction with table-level REPL LOAD, where that nukes out // tablesUpdated. However, we explicitly don't support repl of that sort, and error out above diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java index 65e1d6a..2c5c2d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java @@ -47,8 +47,7 @@ // Also, REPL LOAD doesn't support external table and hence no location set as well. ImportSemanticAnalyzer.prepareImport(false, false, false, (context.precursor != null), null, context.tableName, context.dbName, - null, context.location, x, - databasesUpdated, tablesUpdated); + null, context.location, x, updatedMetadata); return importTasks; } catch (Exception e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java index 3a8990a..b983f95 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java @@ -62,8 +62,7 @@ context.hiveConf); context.log.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(), truncateTableDesc.getTableName()); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); - tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, partSpec); return Collections.singletonList(truncatePtnTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java index 93ffa29..c6d7739 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java @@ -44,8 +44,7 @@ context.log.debug("Added truncate tbl task : {}:{}", truncateTableTask.getId(), truncateTableDesc.getTableName()); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); - tablesUpdated.put(actualDbName + "." 
+ actualTblName, context.dmd.getEventTo());
+    updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
     return Collections.singletonList(truncateTableTask);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java
index 368db0f..ce63b38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java
@@ -28,7 +28,7 @@
  * AlterDatabaseDesc.
  *
  */
-@Explain(displayName = "Create Database", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+@Explain(displayName = "Alter Database", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
 public class AlterDatabaseDesc extends DDLDesc implements Serializable {

   private static final long serialVersionUID = 1L;