diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 6250ad6..0e9cd26 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; -import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule; @@ -309,13 +308,6 @@ public void testBasicWithCM() throws Exception { run("DROP TABLE " + dbName + ".unptned"); // Partition droppped after "repl dump" run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)"); - // File changed after "repl dump" - Partition p = metaStoreClient.getPartition(dbName, "ptned", "b=2"); - Path loc = new Path(p.getSd().getLocation()); - FileSystem fs = loc.getFileSystem(hconf); - Path file = fs.listStatus(loc)[0].getPath(); - fs.delete(file, false); - fs.copyFromLocalFile(new Path(ptn_locn_2_later), file); run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); printOutput(); @@ -328,10 +320,8 @@ public void testBasicWithCM() throws Exception { verifyResults(unptn_data); run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1"); verifyResults(ptn_data_1); - // Since partition(b=2) changed manually, Hive cannot find - // it in original location and cmroot, thus empty run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2"); - verifyResults(empty); + verifyResults(ptn_data_2); run("SELECT a from " + dbName + ".ptned_empty"); verifyResults(empty); run("SELECT * from " + dbName + ".unptned_empty"); @@ -2224,8 +2214,8 @@ public void testTruncateWithCM() throws IOException { } @Test - public void testIncrementalRepeatEventOnExistingObject() throws IOException { - String testName = "incrementalRepeatEventOnExistingObject"; + public void testIncrementalEventRepeatOnExistingObject() throws IOException { + String testName = "incrementalEventRepeatOnExistingObject"; String dbName = createDB(testName); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".ptned(a string) PARTITIONED BY (b int) STORED AS TEXTFILE"); @@ -2301,8 +2291,8 @@ public void testIncrementalRepeatEventOnExistingObject() throws IOException { } @Test - public void testIncrementalRepeatEventOnMissingObject() throws IOException { - String testName = "incrementalRepeatEventOnMissingObject"; + public void testIncrementalEventRepeatOnMissingObject() throws IOException { + String testName = "incrementalEventRepeatOnMissingObject"; String dbName = createDB(testName); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".ptned(a string) PARTITIONED BY (b int) STORED AS TEXTFILE"); @@ -2460,32 +2450,58 @@ public void testConcatenatePartitionedTable() throws IOException { // Replicate all the events happened so far Tuple incrDump = incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName); - verifySetup("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1); - verifySetup("SELECT a from " + replDbName + 
".ptned where (b=2) ORDER BY a", ptn_data_2); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2); } @Test - public void testStatus() throws IOException { - // first test ReplStateMap functionality - Map cmap = new ReplStateMap(); + public void testIncrementalLoadFailAndRetry() throws IOException { + String testName = "incrementalLoadFailAndRetry"; + String dbName = createDB(testName); + run("CREATE TABLE " + dbName + ".ptned(a string) PARTITIONED BY (b int) STORED AS TEXTFILE"); - Long oldV; - oldV = cmap.put("a",1L); - assertEquals(1L,cmap.get("a").longValue()); - assertEquals(null,oldV); + // Bootstrap dump/load + String replDbName = dbName + "_dupe"; + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); - cmap.put("b",2L); - oldV = cmap.put("b",-2L); - assertEquals(2L, cmap.get("b").longValue()); - assertEquals(2L, oldV.longValue()); + // Prefixed with incrementalLoadFailAndRetry to avoid finding entry in cmpath + String[] ptn_data_1 = new String[] { "incrementalLoadFailAndRetry_fifteen" }; + String[] empty = new String[] {}; - cmap.put("c",3L); - oldV = cmap.put("c",33L); - assertEquals(33L, cmap.get("c").longValue()); - assertEquals(3L, oldV.longValue()); + run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=1) values('" + ptn_data_1[0] + "')"); + run("CREATE TABLE " + dbName + ".ptned_tmp AS SELECT * FROM " + dbName + ".ptned"); - // Now, to actually testing status - first, we bootstrap. + // Move the data files of this newly created partition to a temp location + Partition ptn = null; + try { + ptn = metaStoreClient.getPartition(dbName, "ptned", new ArrayList<>(Arrays.asList("1"))); + } catch (Exception e) { + assert(false); + } + + Path ptnLoc = new Path(ptn.getSd().getLocation()); + Path tmpLoc = new Path(TEST_PATH + "/incrementalLoadFailAndRetry"); + FileSystem dataFs = ptnLoc.getFileSystem(hconf); + assert(dataFs.rename(ptnLoc, tmpLoc)); + // Replicate all the events happened so far. It should fail as the data files missing in + // original path and not available in CM as well. 
+ Tuple incrDump = replDumpDb(dbName, bootstrapDump.lastReplId, null, null); + verifyFail("REPL LOAD " + replDbName + " FROM '" + incrDump.dumpLocation + "'"); + + verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty); + verifyFail("SELECT a from " + replDbName + ".ptned_tmp where (b=1) ORDER BY a"); + + // Move the files back to original data location + assert(dataFs.rename(tmpLoc, ptnLoc)); + loadAndVerify(replDbName, incrDump.dumpLocation, incrDump.lastReplId); + + verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1); + verifyRun("SELECT a from " + replDbName + ".ptned_tmp where (b=1) ORDER BY a", ptn_data_1); + } + + @Test + public void testStatus() throws IOException { String name = testName.getMethodName(); String dbName = createDB(name); advanceDumpDir(); @@ -2762,7 +2778,6 @@ private void verifyIfTableNotExist(String dbName, String tableName){ } private void verifyIfTableExist(String dbName, String tableName){ - Exception e = null; try { Table tbl = metaStoreClient.getTable(dbName, tableName); assertNotNull(tbl); @@ -2784,7 +2799,6 @@ private void verifyIfPartitionNotExist(String dbName, String tableName, List partValues){ - Exception e = null; try { Partition ptn = metaStoreClient.getPartition(dbName, tableName, partValues); assertNotNull(ptn); @@ -2816,8 +2830,9 @@ private void verifyFail(String cmd) throws RuntimeException { } catch (AssertionError ae){ LOG.warn("AssertionError:",ae); throw new RuntimeException(ae); + } catch (Exception e) { + success = false; } - assertFalse(success); } @@ -2850,7 +2865,7 @@ private static boolean run(String cmd, boolean errorOnFail) throws RuntimeExcept boolean success = false; try { CommandProcessorResponse ret = driver.run(cmd); - success = (ret.getException() == null); + success = ((ret.getException() == null) && (ret.getErrorMessage() == null)); if (!success){ LOG.warn("Error {} : {} running [{}].", ret.getErrorCode(), ret.getErrorMessage(), cmd); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 97bf839..3f61283 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -3627,7 +3627,7 @@ private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException { if (allPartitions == null) { db.alterTable(alterTbl.getOldName(), tbl, alterTbl.getIsCascade(), alterTbl.getEnvironmentContext()); } else { - db.alterPartitions(tbl.getTableName(), allPartitions, alterTbl.getEnvironmentContext()); + db.alterPartitions(alterTbl.getOldName(), allPartitions, alterTbl.getEnvironmentContext()); } // Add constraints if necessary addConstraints(db, alterTbl); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 7330f56..296d003 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -187,9 +187,8 @@ protected int execute(DriverContext driverContext) { // If the srcMap is not empty which means we made the list of files for distCp. // If there are files from different filesystems, then the map will have multiple entries. 
      if (!srcMap.isEmpty()) {
-       for (final HashMap.Entry<FileSystem, List<Path>> entry : srcMap.entrySet()) {
-         FileSystem actualSrcFs = entry.getKey();
-         List<Path> srcPaths = entry.getValue();
+       for (final FileSystem actualSrcFs : srcMap.keySet()) {
+         List<Path> srcPaths = srcMap.get(actualSrcFs);
          if (!doCopy(toPath, dstFs, srcPaths, actualSrcFs, conf)) {
            console.printError("Failed to copy: " + srcPaths.size() + " files to: '"
                + toPath.toString() + "'");
@@ -257,8 +256,9 @@ private static boolean isLocalFileSystem(FileSystem fs) {
            fileWithChksum[1], conf);
        ret.add(f);
      } catch (MetaException e) {
-       // skip and issue warning for missing file
+       // issue warning for missing file and throw exception
        LOG.warn("Cannot find " + fileWithChksum[0] + " in source repo or cmroot");
+       throw new IOException(e.getMessage());
      }
      // Note - we need srcFs rather than fs, because it is possible that the _files lists files
      // which are from a different filesystem than the fs where the _files file itself was loaded
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index f9bdff8..d48da0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -274,14 +274,13 @@ private void dumpTable(String dbName, String tblName, Path dbRoot) throws Except
  }
  private ReplicationSpec getNewReplicationSpec() throws TException {
-   ReplicationSpec rspec = getNewReplicationSpec("replv2", "will-be-set");
-   rspec.setCurrentReplicationState(String.valueOf(getHive().getMSC()
-       .getCurrentNotificationEventId().getEventId()));
-   return rspec;
+   return getNewReplicationSpec(null,
+       String.valueOf(getHive().getMSC().getCurrentNotificationEventId().getEventId()));
  }
  private ReplicationSpec getNewReplicationSpec(String evState, String objState) {
-   return new ReplicationSpec(true, false, evState, objState, false, true, true);
+   return new ReplicationSpec(true, false, evState, objState,
+       false, true, true);
  }
  private String getNextDumpDir() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index f3f206b..143d730 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -59,10 +59,10 @@ import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
-import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -142,7 +142,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
          isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor,
          parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
          new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx),
-         null, null);
+         null);
     } catch (SemanticException e) {
       throw e;
@@ -179,7 +179,7 @@ public static boolean prepareImport(
      String parsedLocation, String parsedTableName,
String parsedDbName, LinkedHashMap parsedPartSpec, String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x, - Map dbsUpdated, Map tablesUpdated + UpdatedMetaDataTracker updatedMetadata ) throws IOException, MetaException, HiveException, URISyntaxException { // initialize load path @@ -199,6 +199,8 @@ public static boolean prepareImport( ReplicationSpec replicationSpec = rv.getReplicationSpec(); if (replicationSpec.isNoop()){ // nothing to do here, silently return. + x.getLOG().debug("Current update with ID:{} is noop", + replicationSpec.getCurrentReplicationState()); return false; } @@ -207,11 +209,6 @@ public static boolean prepareImport( // If the parsed statement contained a db.tablename specification, prefer that. dbname = parsedDbName; } - if (dbsUpdated != null){ - dbsUpdated.put( - dbname, - Long.valueOf(replicationSpec.get(ReplicationSpec.KEY.EVENT_ID))); - } // Create table associated with the import // Executed if relevant, and used to contain all the other details about the table if not. @@ -222,7 +219,7 @@ public static boolean prepareImport( throw new HiveException(e); } - if ((replicationSpec!= null) && replicationSpec.isInReplicationScope()){ + if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ tblDesc.setReplicationSpec(replicationSpec); } @@ -241,11 +238,6 @@ public static boolean prepareImport( if ((parsedTableName!= null) && (!parsedTableName.isEmpty())){ tblDesc.setTableName(parsedTableName); } - if (tablesUpdated != null){ - tablesUpdated.put( - dbname + "." + tblDesc.getTableName(), - Long.valueOf(replicationSpec.get(ReplicationSpec.KEY.EVENT_ID))); - } List partitionDescs = new ArrayList(); Iterable partitions = rv.getPartitions(); @@ -304,8 +296,8 @@ public static boolean prepareImport( } else { createReplImportTasks( tblDesc, partitionDescs, - isPartSpecSet, replicationSpec, waitOnPrecursor, table, - fromURI, fs, wh, x); + replicationSpec, waitOnPrecursor, table, + fromURI, fs, wh, x, updatedMetadata); } return tableExists; } @@ -356,14 +348,6 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, return tableDesc.getCreateTableTask(x); } - private static Task dropTableTask(Table table, EximUtil.SemanticAnalyzerWrapperContext x){ - return TaskFactory.get(new DDLWork( - x.getInputs(), - x.getOutputs(), - new DropTableDesc(table.getTableName(), null, true, true, null) - ), x.getConf()); - } - private static Task alterTableTask(ImportTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x, ReplicationSpec replicationSpec) { tableDesc.setReplaceMode(true); @@ -788,12 +772,12 @@ private static void createRegularImportTasks( private static void createReplImportTasks( ImportTableDesc tblDesc, List partitionDescs, - boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnPrecursor, + ReplicationSpec replicationSpec, boolean waitOnPrecursor, Table table, URI fromURI, FileSystem fs, Warehouse wh, - EximUtil.SemanticAnalyzerWrapperContext x) + EximUtil.SemanticAnalyzerWrapperContext x, + UpdatedMetaDataTracker updatedMetadata) throws HiveException, URISyntaxException, IOException, MetaException { - Task dr = null; WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK; // Normally, on import, trying to create a table or a partition in a db that does not yet exist @@ -811,16 +795,27 @@ private static void createReplImportTasks( if (table != null) { if (!replicationSpec.allowReplacementInto(table.getParameters())) { // If the target table exists and is newer or same as current update based on 
repl.last.id, then just noop it. + x.getLOG().info("Table {}.{} is not replaced as it is newer than the update", + tblDesc.getDatabaseName(), tblDesc.getTableName()); return; } } else { // If table doesn't exist, allow creating a new one only if the database state is older than the update. if ((parentDb != null) && (!replicationSpec.allowReplacementInto(parentDb.getParameters()))) { // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it. + x.getLOG().info("Table {}.{} is not created as the database is newer than the update", + tblDesc.getDatabaseName(), tblDesc.getTableName()); return; } } + if (updatedMetadata != null) { + updatedMetadata.set(replicationSpec.getReplicationState(), + tblDesc.getDatabaseName(), + tblDesc.getTableName(), + null); + } + if (tblDesc.getLocation() == null) { if (!waitOnPrecursor){ tblDesc.setLocation(wh.getDefaultTablePath(parentDb, tblDesc.getTableName()).toString()); @@ -845,8 +840,6 @@ private static void createReplImportTasks( behave like a noop or a pure MD alter. */ if (table == null) { - // Either we're dropping and re-creating, or the table didn't exist, and we're creating. - if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){ lockType = WriteEntity.WriteType.DDL_SHARED; } @@ -860,20 +853,17 @@ private static void createReplImportTasks( addPartitionDesc.setReplicationSpec(replicationSpec); t.addDependentTask( addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x)); + if (updatedMetadata != null) { + updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); + } } } else { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()),replicationSpec, x)); } } - if (dr == null){ - // Simply create - x.getTasks().add(t); - } else { - // Drop and recreate - dr.addDependentTask(t); - x.getTasks().add(dr); - } + // Simply create + x.getTasks().add(t); } else { // Table existed, and is okay to replicate into, not dropping and re-creating. if (table.isPartitioned()) { @@ -887,6 +877,9 @@ private static void createReplImportTasks( if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x)); + if (updatedMetadata != null) { + updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); + } } } else { // If replicating, then the partition already existing means we need to replace, maybe, if @@ -899,14 +892,14 @@ private static void createReplImportTasks( x.getTasks().add(alterSinglePartition( fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x)); } + if (updatedMetadata != null) { + updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); + } if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){ lockType = WriteEntity.WriteType.DDL_SHARED; } - } else { - // ignore this ptn, do nothing, not an error. } } - } if (replicationSpec.isMetadataOnly() && partitionDescs.isEmpty()){ // MD-ONLY table alter @@ -917,9 +910,6 @@ private static void createReplImportTasks( } } else { x.getLOG().debug("table non-partitioned"); - if (!replicationSpec.allowReplacementInto(table.getParameters())){ - return; // silently return, table is newer than our replacement. 
- } if (!replicationSpec.isMetadataOnly()) { // repl-imports are replace-into unless the event is insert-into loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI), replicationSpec, x); @@ -932,7 +922,6 @@ private static void createReplImportTasks( } } x.getOutputs().add(new WriteEntity(table,lockType)); - } private static boolean isPartitioned(ImportTableDesc tblDesc) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 48d9c94..932d53f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -20,6 +20,7 @@ Licensed to the Apache Software Foundation (ASF) under one import com.google.common.base.Predicate; import com.google.common.collect.Collections2; import org.antlr.runtime.tree.Tree; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -38,6 +39,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; +import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; @@ -281,7 +283,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) { // not an event dump, and table name pattern specified, this has to be a tbl-level dump - rootTasks.addAll(analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null, null, null)); + rootTasks.addAll(analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null)); return; } @@ -323,9 +325,6 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { int evstage = 0; int evIter = 0; - Long lastEvid = null; - Map dbsUpdated = new ReplicationSpec.ReplStateMap(); - Map tablesUpdated = new ReplicationSpec.ReplStateMap(); REPL_STATE_LOG.info("Repl Load: Started analyzing Repl load for DB: {} from path {}, Dump Type: INCREMENTAL", (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?", @@ -355,8 +354,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { String locn = dir.getPath().toUri().toString(); DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf); List> evTasks = analyzeEventLoad( - dbNameOrPattern, tblNameOrPattern, locn, taskChainTail, - dbsUpdated, tablesUpdated, eventDmd); + dbNameOrPattern, tblNameOrPattern, locn, taskChainTail, eventDmd); evIter++; REPL_STATE_LOG.info("Repl Load: Analyzed load for event {}/{} " + "with ID: {}, Type: {}, Path: {}", @@ -375,80 +373,9 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId()); taskChainTail = barrierTask; evstage++; - lastEvid = dmd.getEventTo(); } } - // Now, we need to update repl.last.id for the various parent objects that were updated. - // This update logic will work differently based on what "level" REPL LOAD was run on. 
- // a) If this was a REPL LOAD at a table level, i.e. both dbNameOrPattern and - // tblNameOrPattern were specified, then the table is the only thing we should - // update the repl.last.id for. - // b) If this was a db-level REPL LOAD, then we should update the db, as well as any - // tables affected by partition level operations. (any table level ops will - // automatically be updated as the table gets updated. Note - renames will need - // careful handling. - // c) If this was a wh-level REPL LOAD, then we should update every db for which there - // were events occurring, as well as tables for which there were ptn-level ops - // happened. Again, renames must be taken care of. - // - // So, what we're going to do is have each event load update dbsUpdated and tablesUpdated - // accordingly, but ignore updates to tablesUpdated & dbsUpdated in the case of a - // table-level REPL LOAD, using only the table itself. In the case of a db-level REPL - // LOAD, we ignore dbsUpdated, but inject our own, and do not ignore tblsUpdated. - // And for wh-level, we do no special processing, and use all of dbsUpdated and - // tblsUpdated as-is. - - // Additional Note - although this var says "dbNameOrPattern", on REPL LOAD side, - // we do not support a pattern It can be null or empty, in which case - // we re-use the existing name from the dump, or it can be specified, - // in which case we honour it. However, having this be a pattern is an error. - // Ditto for tblNameOrPattern. - - - if (evstage > 0){ - if ((tblNameOrPattern != null) && (!tblNameOrPattern.isEmpty())){ - // if tblNameOrPattern is specified, then dbNameOrPattern will be too, and - // thus, this is a table-level REPL LOAD - only table needs updating. - // If any of the individual events logged any other dbs as having changed, - // null them out. - dbsUpdated.clear(); - tablesUpdated.clear(); - tablesUpdated.put(dbNameOrPattern + "." + tblNameOrPattern, lastEvid); - } else if ((dbNameOrPattern != null) && (!dbNameOrPattern.isEmpty())){ - // if dbNameOrPattern is specified and tblNameOrPattern isn't, this is a - // db-level update, and thus, the database needs updating. In addition. - dbsUpdated.clear(); - dbsUpdated.put(dbNameOrPattern, lastEvid); - } - } - - for (String tableName : tablesUpdated.keySet()){ - // weird - AlterTableDesc requires a HashMap to update props instead of a Map. 
- HashMap mapProp = new HashMap(); - String eventId = tablesUpdated.get(tableName).toString(); - - mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId); - AlterTableDesc alterTblDesc = new AlterTableDesc( - AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(eventId, eventId)); - alterTblDesc.setProps(mapProp); - alterTblDesc.setOldName(tableName); - Task updateReplIdTask = TaskFactory.get( - new DDLWork(inputs, outputs, alterTblDesc), conf); - taskChainTail.addDependentTask(updateReplIdTask); - taskChainTail = updateReplIdTask; - } - for (String dbName : dbsUpdated.keySet()){ - Map mapProp = new HashMap(); - String eventId = dbsUpdated.get(dbName).toString(); - - mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId); - AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, mapProp, new ReplicationSpec(eventId, eventId)); - Task updateReplIdTask = TaskFactory.get( - new DDLWork(inputs, outputs, alterDbDesc), conf); - taskChainTail.addDependentTask(updateReplIdTask); - taskChainTail = updateReplIdTask; - } rootTasks.add(evTaskRoot); REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl load for DB: {} from path {} and created import " + "(DDL/COPY/MOVE) tasks", @@ -460,12 +387,13 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { // TODO : simple wrap & rethrow for now, clean up with error codes throw new SemanticException(e); } - } private List> analyzeEventLoad( - String dbName, String tblName, String location, Task precursor, - Map dbsUpdated, Map tablesUpdated, DumpMetaData dmd) + String dbName, String tblName, + String location, + Task precursor, + DumpMetaData dmd) throws SemanticException { MessageHandler.Context context = new MessageHandler.Context(dbName, tblName, location, precursor, dmd, conf, db, ctx, LOG); @@ -479,10 +407,110 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { precursor.getClass(), precursor.getId(), t.getClass(), t.getId()); } } - dbsUpdated.putAll(messageHandler.databasesUpdated()); - tablesUpdated.putAll(messageHandler.tablesUpdated()); inputs.addAll(messageHandler.readEntities()); outputs.addAll(messageHandler.writeEntities()); + return addUpdateReplStateTasks(StringUtils.isEmpty(tblName), + messageHandler.getUpdatedMetadata(), tasks); + } + + private Task tableUpdateReplStateTask( + String dbName, + String tableName, + Map partSpec, + String replState, + Task preCursor) { + HashMap mapProp = new HashMap<>(); + mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState); + + AlterTableDesc alterTblDesc = new AlterTableDesc( + AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState)); + alterTblDesc.setProps(mapProp); + alterTblDesc.setOldName(dbName + "." 
+       tableName);
+   alterTblDesc.setPartSpec((HashMap<String, String>)partSpec);
+
+   Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
+       new DDLWork(inputs, outputs, alterTblDesc), conf);
+
+   // Link the update repl state task with dependency collection task
+   if (preCursor != null) {
+     preCursor.addDependentTask(updateReplIdTask);
+     LOG.debug("Added {}:{} as a precursor of {}:{}",
+         preCursor.getClass(), preCursor.getId(),
+         updateReplIdTask.getClass(), updateReplIdTask.getId());
+   }
+   return updateReplIdTask;
+ }
+
+ private Task<? extends Serializable> dbUpdateReplStateTask(
+     String dbName,
+     String replState,
+     Task<? extends Serializable> preCursor) {
+   HashMap<String, String> mapProp = new HashMap<>();
+   mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
+
+   AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(
+       dbName, mapProp, new ReplicationSpec(replState, replState));
+   Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
+       new DDLWork(inputs, outputs, alterDbDesc), conf);
+
+   // Link the update repl state task with dependency collection task
+   if (preCursor != null) {
+     preCursor.addDependentTask(updateReplIdTask);
+     LOG.debug("Added {}:{} as a precursor of {}:{}",
+         preCursor.getClass(), preCursor.getId(),
+         updateReplIdTask.getClass(), updateReplIdTask.getId());
+   }
+   return updateReplIdTask;
+ }
+
+ private List<Task<? extends Serializable>> addUpdateReplStateTasks(
+     boolean isDatabaseLoad,
+     UpdatedMetaDataTracker updatedMetadata,
+     List<Task<? extends Serializable>> importTasks) {
+   String replState = updatedMetadata.getReplicationState();
+   String dbName = updatedMetadata.getDatabase();
+   String tableName = updatedMetadata.getTable();
+
+   // If the event generated no import tasks, or this is a table-level load with no table
+   // updated, then no object needs a repl state update.
+   if (importTasks.isEmpty() || (!isDatabaseLoad && (tableName == null))) {
+     LOG.debug("No objects need update of repl state: Either 0 import tasks or table level load");
+     return importTasks;
+   }
+
+   // Create a barrier task for dependency collection of import tasks
+   Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
+
+   // Link import tasks to the barrier task, which will in turn be linked with the repl state update tasks
+   for (Task<? extends Serializable> t : importTasks){
+     t.addDependentTask(barrierTask);
+     LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
+         t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
+   }
+
+   List<Task<? extends Serializable>> tasks = new ArrayList<>();
+   Task<? extends Serializable> updateReplIdTask;
+
+   // If any partition is updated, then update repl state in partition object
+   for (final Map<String, String> partSpec : updatedMetadata.getPartitions()) {
+     updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask);
+     tasks.add(updateReplIdTask);
+   }
+
+   if (tableName != null) {
+     // If any table/partition is updated, then update repl state in table object
+     updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask);
+     tasks.add(updateReplIdTask);
+   }
+
+   // For table level load, need not update replication state for the database
+   if (isDatabaseLoad) {
+     // If any table/partition is updated, then update repl state in db object
+     updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask);
+     tasks.add(updateReplIdTask);
+   }
+
+   // At least one task would have been added to update the repl state
    return tasks;
  }
@@ -500,10 +528,8 @@ private boolean existEmptyDb(String dbName) throws InvalidOperationException, Hi
      throw new InvalidOperationException(
          "Database " + db.getName() + " is not empty.
One or more functions exist."); } - return true; } - return false; } @@ -565,7 +591,7 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir) for (FileStatus tableDir : Collections2.filter(Arrays.asList(dirsInDbPath), new TableDirPredicate())) { analyzeTableLoad( - dbName, null, tableDir.getPath().toUri().toString(), dbRootTask, null, null); + dbName, null, tableDir.getPath().toUri().toString(), dbRootTask); REPL_STATE_LOG.info("Repl Load: Analyzed table/view/partition load from path {}", tableDir.getPath().toUri().toString()); } @@ -625,8 +651,7 @@ private void analyzeFunctionLoad(String dbName, FileStatus functionDir, private List> analyzeTableLoad( String dbName, String tblName, String locn, - Task precursor, - Map dbsUpdated, Map tablesUpdated) throws SemanticException { + Task precursor) throws SemanticException { // Path being passed to us is a table dump location. We go ahead and load it in as needed. // If tblName is null, then we default to the table name specified in _metadata, which is good. // or are both specified, in which case, that's what we are intended to create the new table as. @@ -651,7 +676,7 @@ private void analyzeFunctionLoad(String dbName, FileStatus functionDir, ctx); ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet, (precursor != null), parsedLocation, tblName, dbName, parsedPartSpec, locn, x, - dbsUpdated, tablesUpdated); + null); if (precursor != null) { for (Task t : importTasks) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 4badea6..5e0d97f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -24,7 +24,6 @@ import javax.annotation.Nullable; import java.text.Collator; -import java.util.HashMap; import java.util.Map; /** @@ -71,34 +70,6 @@ public String toString(){ static private Collator collator = Collator.getInstance(); /** - * Class that extends HashMap with a slightly different put semantic, where - * put behaves as follows: - * a) If the key does not already exist, then retains existing HashMap.put behaviour - * b) If the map already contains an entry for the given key, then will replace only - * if the new value is "greater" than the old value. - * - * The primary goal for this is to track repl updates for dbs and tables, to replace state - * only if the state is newer. - */ - public static class ReplStateMap extends HashMap { - @Override - public V put(K k, V v){ - if (!containsKey(k)){ - return super.put(k,v); - } - V oldValue = get(k); - if (v.compareTo(oldValue) > 0){ - return super.put(k,v); - } - // we did no replacement, but return the old value anyway. This - // seems most consistent with HashMap behaviour, becuse the "put" - // was effectively processed and consumed, although we threw away - // the enw value. 
- return oldValue; - } - } - - /** * Constructor to construct spec based on either the ASTNode that * corresponds to the replication clause itself, or corresponds to * the parent node, and will scan through the children to instantiate diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java index 077d39b..4e8e9bd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java @@ -42,9 +42,13 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); try { if (additionalPropertiesProvider.isInReplicationScope()) { - partition.putToParameters( - ReplicationSpec.KEY.CURR_STATE_ID.toString(), - additionalPropertiesProvider.getCurrentReplicationState()); + // Current replication state must be set on the Partition object only for bootstrap dump. + // Event replication State will be null in case of bootstrap dump. + if (null == additionalPropertiesProvider.getReplicationState()) { + partition.putToParameters( + ReplicationSpec.KEY.CURR_STATE_ID.toString(), + additionalPropertiesProvider.getCurrentReplicationState()); + } if (isPartitionExternal()) { // Replication destination will not be external partition.putToParameters("EXTERNAL", "FALSE"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java index 948cb39..dcbce29 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java @@ -68,17 +68,21 @@ private boolean cannotReplicateTable(ReplicationSpec additionalPropertiesProvide private Table addPropertiesToTable(Table table, ReplicationSpec additionalPropertiesProvider) throws SemanticException, IOException { if (additionalPropertiesProvider.isInReplicationScope()) { - table.putToParameters( - ReplicationSpec.KEY.CURR_STATE_ID.toString(), - additionalPropertiesProvider.getCurrentReplicationState()); + // Current replication state must be set on the Table object only for bootstrap dump. + // Event replication State will be null in case of bootstrap dump. 
+ if (null == additionalPropertiesProvider.getReplicationState()) { + table.putToParameters( + ReplicationSpec.KEY.CURR_STATE_ID.toString(), + additionalPropertiesProvider.getCurrentReplicationState()); + } if (isExternalTable(table)) { // Replication destination will not be external - override if set table.putToParameters("EXTERNAL", "FALSE"); - } + } if (isExternalTableType(table)) { // Replication dest will not be external - override if set table.setTableType(TableType.MANAGED_TABLE.toString()); - } + } } else { // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE; // write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\""); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java new file mode 100644 index 0000000..5714b21 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Utility class to track and return the metadata updated by REPL LOAD.
+ */
+public class UpdatedMetaDataTracker {
+  private String replState;
+  private String dbName;
+  private String tableName;
+  private List<Map<String, String>> partitionsList;
+
+  public UpdatedMetaDataTracker() {
+    this.replState = null;
+    this.dbName = null;
+    this.tableName = null;
+    this.partitionsList = new ArrayList<>();
+  }
+
+  public void copyUpdatedMetadata(UpdatedMetaDataTracker other) {
+    this.replState = other.replState;
+    this.dbName = other.dbName;
+    this.tableName = other.tableName;
+    this.partitionsList = other.getPartitions();
+  }
+
+  public void set(String replState, String dbName, String tableName, Map<String, String> partSpec) {
+    this.replState = replState;
+    this.dbName = dbName;
+    this.tableName = tableName;
+    if (partSpec != null) {
+      addPartition(partSpec);
+    }
+  }
+
+  public void addPartition(Map<String, String> partSpec) {
+    partitionsList.add(partSpec);
+  }
+
+  public String getReplicationState() {
+    return replState;
+  }
+
+  public String getDatabase() {
+    return dbName;
+  }
+
+  public String getTable() {
+    return tableName;
+  }
+
+  public List<Map<String, String>> getPartitions() {
+    return partitionsList;
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
index d6a95bf..01eaba3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
@@ -21,19 +21,14 @@
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 abstract class AbstractMessageHandler implements MessageHandler {
   final HashSet<ReadEntity> readEntitySet = new HashSet<>();
   final HashSet<WriteEntity> writeEntitySet = new HashSet<>();
-  final Map<String, Long> tablesUpdated = new HashMap<>(),
-      databasesUpdated = new HashMap<>();
+  final UpdatedMetaDataTracker updatedMetadata = new UpdatedMetaDataTracker();
   final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer();
   @Override
@@ -47,13 +42,6 @@
   }
   @Override
-  public Map<String, Long> tablesUpdated() {
-    return tablesUpdated;
-  }
-
-  @Override
-  public Map<String, Long> databasesUpdated() {
-    return databasesUpdated;
-  }
+  public UpdatedMetaDataTracker getUpdatedMetadata() { return updatedMetadata; }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
index a6d35cf..3f176aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
@@ -67,7 +67,8 @@
     // bootstrap.There should be a better way to do this but might required a lot of changes across
     // different handlers, unless this is a common pattern that is seen, leaving this here.
if (context.dmd != null) { - databasesUpdated.put(builder.destinationDbName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), builder.destinationDbName, + null, null); } readEntitySet.add(toReadEntity(new Path(context.location), context.hiveConf)); if (builder.replCopyTasks.isEmpty()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java index dae300f..fee2bb5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java @@ -43,7 +43,7 @@ context.log.debug( "Added drop function task : {}:{}", dropFunctionTask.getId(), desc.getFunctionName() ); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, null, null); return Collections.singletonList(dropFunctionTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java index 771400e..5456416 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java @@ -58,8 +58,7 @@ ); context.log.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(), dropPtnDesc.getTableName(), msg.getPartitions()); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); - tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); return Collections.singletonList(dropPtnTask); } else { throw new SemanticException( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java index 3ee3949..d2f5248 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java @@ -46,7 +46,7 @@ context.log.debug( "Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName() ); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, null, null); return Collections.singletonList(dropTableTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java index 40ed0b2..d412fd7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java @@ -40,8 +40,7 @@ List> tasks = tableHandler.handle(currentContext); readEntitySet.addAll(tableHandler.readEntities()); writeEntitySet.addAll(tableHandler.writeEntities()); - databasesUpdated.putAll(tableHandler.databasesUpdated); - tablesUpdated.putAll(tableHandler.tablesUpdated); + getUpdatedMetadata().copyUpdatedMetadata(tableHandler.getUpdatedMetadata()); return tasks; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java index 33c716f..8daff6d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java @@ -25,11 +25,11 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; import org.slf4j.Logger; import java.io.Serializable; import java.util.List; -import java.util.Map; import java.util.Set; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; @@ -42,9 +42,7 @@ Set writeEntities(); - Map tablesUpdated(); - - Map databasesUpdated(); + UpdatedMetaDataTracker getUpdatedMetadata(); class Context { final String dbName, tableName, location; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java index 5bd0532..43f2cbc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java @@ -65,8 +65,7 @@ new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf); context.log.debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec, newPartSpec); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); - tablesUpdated.put(tableName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, newPartSpec); return Collections.singletonList(renamePtnTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java index 4785e55..e30abad 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java @@ -65,9 +65,8 @@ renameTableTask.getId(), oldName, newName); // oldDbName and newDbName *will* be the same if we're here - databasesUpdated.put(newDbName, context.dmd.getEventTo()); - tablesUpdated.remove(oldName); - tablesUpdated.put(newName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), newDbName, + msg.getTableObjAfter().getTableName(), null); // Note : edge-case here in interaction with table-level REPL LOAD, where that nukes out // tablesUpdated. However, we explicitly don't support repl of that sort, and error out above diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java index 65e1d6a..2c5c2d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java @@ -47,8 +47,7 @@ // Also, REPL LOAD doesn't support external table and hence no location set as well. 
ImportSemanticAnalyzer.prepareImport(false, false, false, (context.precursor != null), null, context.tableName, context.dbName, - null, context.location, x, - databasesUpdated, tablesUpdated); + null, context.location, x, updatedMetadata); return importTasks; } catch (Exception e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java index 3a8990a..b983f95 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java @@ -62,8 +62,7 @@ context.hiveConf); context.log.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(), truncateTableDesc.getTableName()); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); - tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, partSpec); return Collections.singletonList(truncatePtnTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java index 93ffa29..c6d7739 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java @@ -44,8 +44,7 @@ context.log.debug("Added truncate tbl task : {}:{}", truncateTableTask.getId(), truncateTableDesc.getTableName()); - databasesUpdated.put(actualDbName, context.dmd.getEventTo()); - tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); return Collections.singletonList(truncateTableTask); } }
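
Reviewer note (not part of the patch): the sketch below illustrates how the new UpdatedMetaDataTracker is meant to be used by this change. A message handler records the replayed event id together with the database, table and partitions it touched, and addUpdateReplStateTasks then bumps repl.last.id bottom-up (touched partitions, then the table if any, then the database for a db-level REPL LOAD), chained behind a DependencyCollectionWork barrier so the bump happens only after the import tasks succeed. The tracker API is taken from the patch; the standalone main() driver, the sample values, and the printed pseudo-DDL are illustrative only and assume the patched classes are on the classpath.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;

    public class UpdatedMetaDataTrackerSketch {
      public static void main(String[] args) {
        UpdatedMetaDataTracker updatedMetadata = new UpdatedMetaDataTracker();

        // What a handler (e.g. a partition-level event handler) does: record the
        // replayed event id (as a string) plus the objects it modified.
        Map<String, String> partSpec = new HashMap<>();
        partSpec.put("b", "1");
        updatedMetadata.set("101", "repl_db", "ptned", partSpec);

        // What addUpdateReplStateTasks does with that information, expressed as
        // pseudo-DDL instead of DDLWork tasks: partitions first, then the table,
        // then (for a db-level load) the database.
        String replState = updatedMetadata.getReplicationState();
        for (Map<String, String> ps : updatedMetadata.getPartitions()) {
          System.out.println("ALTER " + updatedMetadata.getDatabase() + "."
              + updatedMetadata.getTable() + " PARTITION " + ps
              + " SET repl.last.id = " + replState);
        }
        if (updatedMetadata.getTable() != null) {
          System.out.println("ALTER TABLE " + updatedMetadata.getDatabase() + "."
              + updatedMetadata.getTable() + " SET repl.last.id = " + replState);
        }
        System.out.println("ALTER DATABASE " + updatedMetadata.getDatabase()
            + " SET repl.last.id = " + replState);
      }
    }

In the patch itself these updates are emitted as AlterTableDesc/AlterDatabaseDesc tasks carrying ReplicationSpec.KEY.CURR_STATE_ID, so a repeated or retried event becomes a no-op on load because the stored repl.last.id on the target object is no longer older than the event being replayed.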