diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 3d05db2949..7528f27f8c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -1237,9 +1237,8 @@ public Boolean apply(CallerArguments args) {
     assertEquals(0, replica.getForeignKeyList(replicatedDbName, "t2").size());
 
     // Retry with different dump should fail.
-    CommandProcessorResponse ret = replica.runCommand("REPL LOAD " + replicatedDbName +
-            " FROM '" + tuple2.dumpLocation + "'");
-    Assert.assertEquals(ret.getResponseCode(), ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
+    replica.loadFailure(replicatedDbName, tuple2.dumpLocation, null,
+        ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
 
     // Verify if create table is not called on table t1 but called for t2 and t3.
     // Also, allow constraint creation only on t1 and t3. Foreign key creation on t2 fails.
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 83f38fa6a9..a5d1032231 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -548,6 +549,35 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable {
     );
   }
 
+  @Test
+  public void retryIncBootstrapExternalTablesFromDifferentDumpWithoutCleanTablesConfig() throws Throwable {
+    List<String> dumpWithClause = Collections.singletonList(
+        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"
+    );
+    List<String> loadWithClause = externalTableBasePathWithClause();
+
+    WarehouseInstance.Tuple tupleBootstrapWithoutExternal = primary
+        .dump(primaryDbName, null, dumpWithClause);
+
+    replica.load(replicatedDbName, tupleBootstrapWithoutExternal.dumpLocation, loadWithClause);
+
+    dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+    WarehouseInstance.Tuple tupleIncWithExternalBootstrap = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("create table t2 as select * from t1")
+        .dump(primaryDbName, tupleBootstrapWithoutExternal.lastReplicationId, dumpWithClause);
+    WarehouseInstance.Tuple tupleNewIncWithExternalBootstrap
+        = primary.dump(primaryDbName, tupleBootstrapWithoutExternal.lastReplicationId, dumpWithClause);
+
+    replica.load(replicatedDbName, tupleIncWithExternalBootstrap.dumpLocation, loadWithClause);
+
+    // Re-bootstrapping from a different bootstrap dump without the clean tables config should fail.
+    replica.loadFailure(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause,
+        ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
+  }
+
   private void assertExternalFileInfo(List<String> expected, Path externalTableInfoFile)
       throws IOException {
     DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem();
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 56eae91546..c76d30c884 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -243,6 +243,18 @@ WarehouseInstance runFailure(String command) throws Throwable {
     return this;
   }
 
+  WarehouseInstance runFailure(String command, int errorCode) throws Throwable {
+    CommandProcessorResponse ret = driver.run(command);
+    if (ret.getException() == null) {
+      throw new RuntimeException("command execution passed for an invalid command: " + command);
+    }
+    if (ret.getResponseCode() != errorCode) {
+      throw new RuntimeException("Command: " + command + " returned incorrect error code: "
+          + ret.getResponseCode() + " instead of " + errorCode);
+    }
+    return this;
+  }
+
   Tuple dump(String dbName, String lastReplicationId, List<String> withClauseOptions)
       throws Throwable {
     String dumpCommand =
@@ -288,7 +300,7 @@ WarehouseInstance loadWithoutExplain(String replicatedDbName, String dumpLocatio
   WarehouseInstance load(String replicatedDbName, String dumpLocation, List<String> withClauseOptions)
       throws Throwable {
     String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
-    if (!withClauseOptions.isEmpty()) {
+    if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
       replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
     }
     run("EXPLAIN " + replLoadCmd);
@@ -303,26 +315,35 @@ WarehouseInstance status(String replicatedDbName) throws Throwable {
 
   WarehouseInstance status(String replicatedDbName, List<String> withClauseOptions) throws Throwable {
     String replStatusCmd = "REPL STATUS " + replicatedDbName;
-    if (!withClauseOptions.isEmpty()) {
+    if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
       replStatusCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
     }
     return run(replStatusCmd);
   }
 
   WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation) throws Throwable {
-    runFailure("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+    loadFailure(replicatedDbName, dumpLocation, null);
     return this;
   }
 
   WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation, List<String> withClauseOptions)
       throws Throwable {
     String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
-    if (!withClauseOptions.isEmpty()) {
+    if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
       replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
     }
     return runFailure(replLoadCmd);
   }
 
+  WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation, List<String> withClauseOptions,
+      int errorCode) throws Throwable {
+    String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
+    if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
+      replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
+    }
+    return runFailure(replLoadCmd, errorCode);
+  }
+
   WarehouseInstance verifyResult(String data) throws IOException {
     verifyResults(data == null ? new String[] {} : new String[] { data });
     return this;
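The WarehouseInstance changes above do two things: the with-clause handling becomes null-safe (passing null now behaves like an empty list), and runFailure/loadFailure gain overloads that assert the exact error code instead of accepting any failure. A minimal usage sketch, assuming the primary/replica fixtures and the Tuple dump flow of the surrounding replication tests; the variable names here are illustrative, not from the patch:

    // Two dumps of the same database produce two distinct dump directories.
    WarehouseInstance.Tuple first = primary.dump(primaryDbName, null, dumpWithClause);
    WarehouseInstance.Tuple second = primary.dump(primaryDbName, null, dumpWithClause);

    // Load from the first dump, then retry from the second: the retry must fail
    // with the exact bootstrap-path error code, not just any non-zero response.
    replica.load(replicatedDbName, first.dumpLocation, loadWithClause);
    replica.loadFailure(replicatedDbName, second.dumpLocation, loadWithClause,
        ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
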
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
index c7828db1df..c892b40224 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -60,30 +60,26 @@ public LoadDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn,
     isTableLevelLoad = tblNameToLoadIn != null && !tblNameToLoadIn.isEmpty();
   }
 
-  public TaskTracker tasks() throws SemanticException {
-    try {
-      Database dbInMetadata = readDbMetadata();
-      String dbName = dbInMetadata.getName();
-      Task<? extends Serializable> dbRootTask = null;
-      ReplLoadOpType loadDbType = getLoadDbType(dbName);
-      switch (loadDbType) {
-        case LOAD_NEW:
-          dbRootTask = createDbTask(dbInMetadata);
-          break;
-        case LOAD_REPLACE:
-          dbRootTask = alterDbTask(dbInMetadata);
-          break;
-        default:
-          break;
-      }
-      if (dbRootTask != null) {
-        dbRootTask.addDependentTask(setOwnerInfoTask(dbInMetadata));
-        tracker.addTask(dbRootTask);
-      }
-      return tracker;
-    } catch (Exception e) {
-      throw new SemanticException(e.getMessage(), e);
+  public TaskTracker tasks() throws Exception {
+    Database dbInMetadata = readDbMetadata();
+    String dbName = dbInMetadata.getName();
+    Task<? extends Serializable> dbRootTask = null;
+    ReplLoadOpType loadDbType = getLoadDbType(dbName);
+    switch (loadDbType) {
+      case LOAD_NEW:
+        dbRootTask = createDbTask(dbInMetadata);
+        break;
+      case LOAD_REPLACE:
+        dbRootTask = alterDbTask(dbInMetadata);
+        break;
+      default:
+        break;
+    }
+    if (dbRootTask != null) {
+      dbRootTask.addDependentTask(setOwnerInfoTask(dbInMetadata));
+      tracker.addTask(dbRootTask);
     }
+    return tracker;
   }
 
   Database readDbMetadata() throws SemanticException {
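Widening tasks() from throws SemanticException to throws Exception removes the catch-and-rewrap at this layer: wrapping the cause in a fresh SemanticException discarded the specific error the original exception carried, so callers could only observe a generic failure code. A standalone sketch of that effect, not Hive code; the error-code values and the responseCode mapping are assumptions for illustration:

    // Java: how re-wrapping an exception can lose the specific error code.
    class CodedException extends Exception {
      final int errorCode;
      CodedException(String msg, int errorCode) { super(msg); this.errorCode = errorCode; }
    }

    public class WrapDemo {
      static final int REPL_PATH_NOT_VALID = 10415; // hypothetical code
      static final int GENERIC_ERROR = 40000;       // hypothetical code

      // Stand-in for a driver that derives its response code from the caught exception.
      static int responseCode(Exception e) {
        return (e instanceof CodedException) ? ((CodedException) e).errorCode : GENERIC_ERROR;
      }

      static void work() throws CodedException {
        throw new CodedException("bootstrap load path not valid", REPL_PATH_NOT_VALID);
      }

      public static void main(String[] args) {
        try { // old pattern: catch and re-wrap
          try {
            work();
          } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
          }
        } catch (Exception e) {
          System.out.println("wrapped    -> " + responseCode(e)); // 40000: code lost
        }
        try { // new pattern: let the original exception propagate
          work();
        } catch (Exception e) {
          System.out.println("propagated -> " + responseCode(e)); // 10415: code preserved
        }
      }
    }

This is what lets the new runFailure(command, errorCode) assertion in WarehouseInstance observe REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID rather than a generic response code.
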
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index fa72527de2..c1773c93cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -101,22 +101,35 @@ public LoadPartitions(Context context, ReplLogger replLogger, TableContext table
     this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
   }
 
-  public TaskTracker tasks() throws SemanticException {
-    try {
-      /*
-      We are doing this both in load table and load partitions
-       */
-      Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
-      LoadTable.TableLocationTuple tableLocationTuple =
-          LoadTable.tableLocation(tableDesc, parentDb, tableContext, context);
-      tableDesc.setLocation(tableLocationTuple.location);
-
-      if (table == null) {
-        //new table
-        table = tableDesc.toTable(context.hiveConf);
-        if (isPartitioned(tableDesc)) {
+  public TaskTracker tasks() throws Exception {
+    /*
+    We are doing this both in load table and load partitions
+     */
+    Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+    LoadTable.TableLocationTuple tableLocationTuple =
+        LoadTable.tableLocation(tableDesc, parentDb, tableContext, context);
+    tableDesc.setLocation(tableLocationTuple.location);
+
+    if (table == null) {
+      //new table
+      table = tableDesc.toTable(context.hiveConf);
+      if (isPartitioned(tableDesc)) {
+        updateReplicationState(initialReplicationState());
+        if (!forNewTable().hasReplicationState()) {
+          // Add ReplStateLogTask only if no pending table load tasks left for next cycle
+          Task<? extends Serializable> replLogTask
+              = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
+          tracker.addDependentTask(replLogTask);
+        }
+        return tracker;
+      }
+    } else {
+      // existing
+      if (table.isPartitioned()) {
+        List<AddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc);
+        if (!event.replicationSpec().isMetadataOnly() && !partitionDescs.isEmpty()) {
           updateReplicationState(initialReplicationState());
-          if (!forNewTable().hasReplicationState()) {
+          if (!forExistingTable(lastReplicatedPartition).hasReplicationState()) {
             // Add ReplStateLogTask only if no pending table load tasks left for next cycle
             Task<? extends Serializable> replLogTask
                 = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
@@ -124,26 +137,9 @@ public TaskTracker tasks() throws SemanticException {
           }
           return tracker;
         }
-      } else {
-        // existing
-        if (table.isPartitioned()) {
-          List<AddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc);
-          if (!event.replicationSpec().isMetadataOnly() && !partitionDescs.isEmpty()) {
-            updateReplicationState(initialReplicationState());
-            if (!forExistingTable(lastReplicatedPartition).hasReplicationState()) {
-              // Add ReplStateLogTask only if no pending table load tasks left for next cycle
-              Task<? extends Serializable> replLogTask
-                  = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
-              tracker.addDependentTask(replLogTask);
-            }
-            return tracker;
-          }
-        }
       }
-      return tracker;
-    } catch (Exception e) {
-      throw new SemanticException(e);
     }
+    return tracker;
   }
 
   private void updateReplicationState(ReplicationState replicationState) {
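In LoadPartitions the de-nesting preserves the cycle logic: tasks() plans a bounded batch, records a ReplicationState when partitions remain for a later cycle, and adds the table-level repl-state log task only once nothing is pending. A standalone sketch of that pattern, not Hive code, with all names illustrative:

    // Java: bounded planning per cycle with a resume marker, analogous to
    // forNewTable()/forExistingTable() returning a tracker that may carry state.
    import java.util.ArrayDeque;
    import java.util.Queue;

    public class BoundedPlanner {
      private final int maxTasksPerCycle;
      private final Queue<String> pendingPartitions;
      private String resumeMarker; // analogous to ReplicationState

      BoundedPlanner(int maxTasksPerCycle, Queue<String> pendingPartitions) {
        this.maxTasksPerCycle = maxTasksPerCycle;
        this.pendingPartitions = pendingPartitions;
      }

      boolean hasReplicationState() { return resumeMarker != null; }

      void planOneCycle() {
        int planned = 0;
        while (!pendingPartitions.isEmpty() && planned < maxTasksPerCycle) {
          System.out.println("plan load task for partition " + pendingPartitions.poll());
          planned++;
        }
        resumeMarker = pendingPartitions.peek(); // non-null: resume next cycle
        if (!hasReplicationState()) {
          System.out.println("plan repl-state log task: table fully loaded");
        }
      }

      public static void main(String[] args) {
        Queue<String> parts = new ArrayDeque<>();
        for (int i = 0; i < 5; i++) {
          parts.add("p=" + i);
        }
        BoundedPlanner planner = new BoundedPlanner(2, parts);
        do {
          planner.planOneCycle();
        } while (planner.hasReplicationState());
      }
    }
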
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 0d1a88c609..3b0b67aeff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -84,83 +84,79 @@ public LoadTable(TableEvent event, Context context, ReplLogger replLogger,
     this.tracker = new TaskTracker(limiter);
   }
 
-  public TaskTracker tasks() throws SemanticException {
+  public TaskTracker tasks() throws Exception {
     // Path being passed to us is a table dump location. We go ahead and load it in as needed.
     // If tblName is null, then we default to the table name specified in _metadata, which is good.
     // or are both specified, in which case, that's what we are intended to create the new table as.
-    try {
-      if (event.shouldNotReplicate()) {
-        return tracker;
-      }
-      String dbName = tableContext.dbNameToLoadIn; //this can never be null or empty;
-      // Create table associated with the import
-      // Executed if relevant, and used to contain all the other details about the table if not.
-      ImportTableDesc tableDesc = tableContext.overrideProperties(event.tableDesc(dbName));
-      Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
+    if (event.shouldNotReplicate()) {
+      return tracker;
+    }
+    String dbName = tableContext.dbNameToLoadIn; //this can never be null or empty;
+    // Create table associated with the import
+    // Executed if relevant, and used to contain all the other details about the table if not.
+    ImportTableDesc tableDesc = tableContext.overrideProperties(event.tableDesc(dbName));
+    Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
 
-      // Normally, on import, trying to create a table or a partition in a db that does not yet exist
-      // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying
-      // to create tasks to create a table inside a db that as-of-now does not exist, but there is
-      // a precursor Task waiting that will create it before this is encountered. Thus, we instantiate
-      // defaults and do not error out in that case.
-      // the above will change now since we are going to split replication load in multiple execution
-      // tasks and hence we could have created the database earlier in which case the waitOnPrecursor will
-      // be false and hence if db Not found we should error out.
-      Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
-      if (parentDb == null) {
-        if (!tableContext.waitOnPrecursor()) {
-          throw new SemanticException(
-              ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tableDesc.getDatabaseName()));
-        }
+    // Normally, on import, trying to create a table or a partition in a db that does not yet exist
+    // is an error condition. However, in the case of a REPL LOAD, it is possible that we are trying
+    // to create tasks to create a table inside a db that as-of-now does not exist, but there is
+    // a precursor Task waiting that will create it before this is encountered. Thus, we instantiate
+    // defaults and do not error out in that case.
+    // the above will change now since we are going to split replication load in multiple execution
+    // tasks and hence we could have created the database earlier in which case the waitOnPrecursor will
+    // be false and hence if db Not found we should error out.
+    Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+    if (parentDb == null) {
+      if (!tableContext.waitOnPrecursor()) {
+        throw new SemanticException(
+            ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tableDesc.getDatabaseName()));
       }
+    }
 
-      Task<? extends Serializable> tblRootTask = null;
-      ReplLoadOpType loadTblType = getLoadTableType(table);
-      switch (loadTblType) {
-        case LOAD_NEW:
-          break;
-        case LOAD_REPLACE:
-          tblRootTask = dropTableTask(table);
-          break;
-        case LOAD_SKIP:
-          return tracker;
-        default:
-          break;
-      }
+    Task<? extends Serializable> tblRootTask = null;
+    ReplLoadOpType loadTblType = getLoadTableType(table);
+    switch (loadTblType) {
+      case LOAD_NEW:
+        break;
+      case LOAD_REPLACE:
+        tblRootTask = dropTableTask(table);
+        break;
+      case LOAD_SKIP:
+        return tracker;
+      default:
+        break;
+    }
 
-      TableLocationTuple
-          tableLocationTuple = tableLocation(tableDesc, parentDb, tableContext, context);
-      tableDesc.setLocation(tableLocationTuple.location);
+    TableLocationTuple
+        tableLocationTuple = tableLocation(tableDesc, parentDb, tableContext, context);
+    tableDesc.setLocation(tableLocationTuple.location);
 
-      /* Note: In the following section, Metadata-only import handling logic is
-         interleaved with regular repl-import logic. The rule of thumb being
-         followed here is that MD-only imports are essentially ALTERs. They do
-         not load data, and should not be "creating" any metadata - they should
-         be replacing instead. The only place it makes sense for a MD-only import
-         to create is in the case of a table that's been dropped and recreated,
-         or in the case of an unpartitioned table. In all other cases, it should
-         behave like a noop or a pure MD alter.
-      */
-      newTableTasks(tableDesc, tblRootTask, tableLocationTuple);
+    /* Note: In the following section, Metadata-only import handling logic is
+       interleaved with regular repl-import logic. The rule of thumb being
+       followed here is that MD-only imports are essentially ALTERs. They do
+       not load data, and should not be "creating" any metadata - they should
+       be replacing instead. The only place it makes sense for a MD-only import
+       to create is in the case of a table that's been dropped and recreated,
+       or in the case of an unpartitioned table. In all other cases, it should
+       behave like a noop or a pure MD alter.
+    */
+    newTableTasks(tableDesc, tblRootTask, tableLocationTuple);
 
-      // Set Checkpoint task as dependant to create table task. So, if same dump is retried for
-      // bootstrap, we skip current table update.
-      Task<? extends Serializable> ckptTask = ReplUtils.getTableCheckpointTask(
-          tableDesc,
-          null,
-          context.dumpDirectory,
-          context.hiveConf
-      );
-      if (!isPartitioned(tableDesc)) {
-        Task<? extends Serializable> replLogTask
-            = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
-        ckptTask.addDependentTask(replLogTask);
-      }
-      tracker.addDependentTask(ckptTask);
-      return tracker;
-    } catch (Exception e) {
-      throw new SemanticException(e);
+    // Set Checkpoint task as dependent to create table task. So, if same dump is retried for
+    // bootstrap, we skip current table update.
+    Task<? extends Serializable> ckptTask = ReplUtils.getTableCheckpointTask(
+        tableDesc,
+        null,
+        context.dumpDirectory,
+        context.hiveConf
+    );
+    if (!isPartitioned(tableDesc)) {
+      Task<? extends Serializable> replLogTask
+          = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
+      ckptTask.addDependentTask(replLogTask);
     }
+    tracker.addDependentTask(ckptTask);
+    return tracker;
   }
 
   private ReplLoadOpType getLoadTableType(Table table) throws InvalidOperationException, HiveException {
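The checkpoint wiring at the end of LoadTable.tasks() is what makes a bootstrap retry from the same dump idempotent, while a retry from a different dump is rejected upstream with REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID. A standalone sketch of that idea, not Hive code; the property key and method names are invented for illustration:

    // Java: stamp the table with the dump location after a successful load, so a
    // retry from the same dump can skip the table instead of redoing work.
    import java.util.HashMap;
    import java.util.Map;

    public class CheckpointDemo {
      static final String CKPT_KEY = "repl.bootstrap.dump.ckpt"; // hypothetical key

      static void loadTable(Map<String, String> tableProps, String dumpDir) {
        if (dumpDir.equals(tableProps.get(CKPT_KEY))) {
          System.out.println("checkpoint matches " + dumpDir + ": skip reload");
          return;
        }
        System.out.println("load table data from " + dumpDir);
        tableProps.put(CKPT_KEY, dumpDir); // checkpoint only after the load succeeds
      }

      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        loadTable(props, "/dumps/d1"); // first attempt: loads and checkpoints
        loadTable(props, "/dumps/d1"); // retry of the same dump: skipped
      }
    }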