diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 13b918d..689ca76 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -524,4 +524,100 @@ public void testIncrementalDumpOfWarehouse() throws Throwable {
     */
   }
+
+  @Test
+  public void testReplLoadFromSourceUsingWithClause() throws Throwable {
+    HiveConf replicaConf = replica.getConf();
+    List<String> withConfigs = Arrays.asList(
+        "'hive.metastore.warehouse.dir'='" + replicaConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE) + "'",
+        "'hive.metastore.uris'='" + replicaConf.getVar(HiveConf.ConfVars.METASTOREURIS) + "'",
+        "'hive.repl.replica.functions.root.dir'='" + replicaConf.getVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR) + "'");
+
+    //////////// Bootstrap ////////////
+    WarehouseInstance.Tuple bootstrapTuple = primary
+        .run("use " + primaryDbName)
+        .run("create table table1 (i int)")
+        .run("create table table2 (id int) partitioned by (country string)")
+        .run("insert into table1 values (1)")
+        .dump(primaryDbName, null);
+
+    // Run load on primary itself
+    primary.load(replicatedDbName, bootstrapTuple.dumpLocation, withConfigs)
+        .status(replicatedDbName, withConfigs)
+        .verifyResult(bootstrapTuple.lastReplicationId);
+
+    replica.run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] { "table1", "table2" })
+        .run("select * from table1")
+        .verifyResults(new String[]{ "1" });
+
+    //////////// First Incremental ////////////
+    WarehouseInstance.Tuple incrementalOneTuple = primary
+        .run("use " + primaryDbName)
+        .run("alter table table1 rename to renamed_table1")
+        .run("insert into table2 partition(country='india') values (1) ")
+        .run("insert into table2 partition(country='usa') values (2) ")
+        .run("create table table3 (i int)")
+        .run("insert into table3 values(10)")
+        .run("create function " + primaryDbName
+            + ".testFunctionOne as 'hivemall.tools.string.StopwordUDF' "
+            + "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'")
+        .dump(primaryDbName, bootstrapTuple.lastReplicationId);
+
+    // Run load on primary itself
+    primary.load(replicatedDbName, incrementalOneTuple.dumpLocation, withConfigs)
+        .status(replicatedDbName, withConfigs)
+        .verifyResult(incrementalOneTuple.lastReplicationId);
+
+    replica.run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] { "renamed_table1", "table2", "table3" })
+        .run("select * from renamed_table1")
+        .verifyResults(new String[] { "1" })
+        .run("select id from table2 order by id")
+        .verifyResults(new String[] { "1", "2" })
+        .run("select * from table3")
+        .verifyResults(new String[] { "10" })
+        .run("show functions like '" + replicatedDbName + "*'")
+        .verifyResult(replicatedDbName + ".testFunctionOne");
+
+    //////////// Second Incremental ////////////
+    WarehouseInstance.Tuple secondIncremental = primary
+        .run("use " + primaryDbName)
+        .run("alter table table2 add columns (zipcode int)")
+        .run("alter table table3 set tblproperties('custom.property'='custom.value')")
+        .run("drop table renamed_table1")
+        .run("alter table table2 drop partition(country='usa')")
+        .run("truncate table table3")
+        .run("drop function " + primaryDbName + ".testFunctionOne ")
+        .dump(primaryDbName, incrementalOneTuple.lastReplicationId);
+
+    // Run load on primary itself
+    primary.load(replicatedDbName, secondIncremental.dumpLocation, withConfigs)
+        .status(replicatedDbName, withConfigs)
+        .verifyResult(secondIncremental.lastReplicationId);
+
+    replica.run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] { "table2", "table3"})
+        .run("desc table2")
+        .verifyResults(new String[] {
+            "id \tint \t ",
+            "country \tstring \t ",
+            "zipcode \tint \t ",
+            "\t \t ",
+            "# Partition Information\t \t ",
+            "# col_name \tdata_type \tcomment ",
+            "country \tstring \t ",
+        })
+        .run("show tblproperties table3('custom.property')")
+        .verifyResults(new String[] { "custom.value\t " })
+        .run("select id from table2 order by id")
+        .verifyResults(new String[] { "1" })
+        .run("select * from table3")
+        .verifyResults(Collections.emptyList())
+        .run("show functions like '" + replicatedDbName + "*'")
+        .verifyResult(null);
+  }
 
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index feb1191..481992d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -147,6 +147,10 @@ private Path mkDir(DistributedFileSystem fs, String pathString)
     return PathBuilder.fullyQualifiedHDFSUri(path, fs);
   }
 
+  public HiveConf getConf() {
+    return hiveConf;
+  }
+
   private int next = 0;
 
   private void advanceDumpDir() {
@@ -202,6 +206,25 @@ WarehouseInstance load(String replicatedDbName, String dumpLocation) throws Throwable {
     return this;
   }
 
+  WarehouseInstance load(String replicatedDbName, String dumpLocation, List<String> withClauseOptions)
+      throws Throwable {
+    String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
+    if (!withClauseOptions.isEmpty()) {
+      replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
+    }
+    run("EXPLAIN " + replLoadCmd);
+    printOutput();
+    return run(replLoadCmd);
+  }
+
+  WarehouseInstance status(String replicatedDbName, List<String> withClauseOptions) throws Throwable {
+    String replStatusCmd = "REPL STATUS " + replicatedDbName;
+    if (!withClauseOptions.isEmpty()) {
+      replStatusCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
+    }
+    return run(replStatusCmd);
+  }
+
   WarehouseInstance verifyResult(String data) throws IOException {
     verifyResults(data == null ? new String[] {} : new String[] { data });
     return this;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 9a91e3f..e862589 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -424,7 +424,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
         loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId);
     loadTableWork.setStmtId(stmtId);
     MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false);
-    Task loadTableTask = TaskFactory.get(mv);
+    Task loadTableTask = TaskFactory.get(mv, x.getConf());
     copyTask.addDependentTask(loadTableTask);
     x.getTasks().add(copyTask);
     return loadTableTask;
@@ -466,11 +466,7 @@ private static boolean isAcid(Long writeId) {
     } else {
       partSpec.setLocation(ptn.getLocation()); // use existing location
     }
-    return TaskFactory.get(new DDLWork(
-        x.getInputs(),
-        x.getOutputs(),
-        addPartitionDesc
-    ));
+    return TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf());
   }
 
   private static Task addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc,
@@ -484,8 +480,8 @@ private static boolean isAcid(Long writeId) {
           + partSpecToString(partSpec.getPartSpec()));
       // addPartitionDesc already has the right partition location
       @SuppressWarnings("unchecked")
-      Task addPartTask = TaskFactory.get(new DDLWork(x.getInputs(),
-          x.getOutputs(), addPartitionDesc));
+      Task addPartTask = TaskFactory.get(
+          new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf());
       return addPartTask;
     } else {
       String srcLocation = partSpec.getLocation();
@@ -518,8 +514,9 @@ private static boolean isAcid(Long writeId) {
        copyTask = TaskFactory.get(cw);
      }
 
-      Task addPartTask = TaskFactory.get(new DDLWork(x.getInputs(),
-          x.getOutputs(), addPartitionDesc));
+      Task addPartTask = TaskFactory.get(
+          new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf());
+
       // Note: this sets LoadFileType incorrectly for ACID; is that relevant for import?
       // See setLoadFileType and setIsAcidIow calls elsewhere for an example.
       LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table),
@@ -528,8 +525,9 @@
          writeId);
      loadTableWork.setStmtId(stmtId);
      loadTableWork.setInheritTableSpecs(false);
-      Task loadPartTask = TaskFactory.get(new MoveWork(
-          x.getInputs(), x.getOutputs(), loadTableWork, null, false));
+      Task loadPartTask = TaskFactory.get(
+          new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false),
+          x.getConf());
      copyTask.addDependentTask(loadPartTask);
      addPartTask.addDependentTask(loadPartTask);
      x.getTasks().add(copyTask);
@@ -830,7 +828,7 @@ private static void createRegularImportTasks(
     if (table != null) {
       if (table.isPartitioned()) {
         x.getLOG().debug("table partitioned");
-        Task ict = createImportCommitTask(
+        Task ict = createImportCommitTask(x.getConf(),
             table.getDbName(), table.getTableName(), writeId, stmtId,
             AcidUtils.isInsertOnlyTable(table.getParameters()));
 
@@ -867,7 +865,7 @@ private static void createRegularImportTasks(
       x.getOutputs().add(new WriteEntity(parentDb, WriteEntity.WriteType.DDL_SHARED));
 
       if (isPartitioned(tblDesc)) {
-        Task ict = createImportCommitTask(
+        Task ict = createImportCommitTask(x.getConf(),
             tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, stmtId,
             AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()));
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
@@ -903,10 +901,10 @@
   }
 
   private static Task createImportCommitTask(
-      String dbName, String tblName, Long writeId, int stmtId, boolean isMmTable) {
+      HiveConf conf, String dbName, String tblName, Long writeId, int stmtId, boolean isMmTable) {
     // TODO: noop, remove?
     Task ict = (!isMmTable) ? null : TaskFactory.get(
-        new ImportCommitWork(dbName, tblName, writeId, stmtId));
+        new ImportCommitWork(dbName, tblName, writeId, stmtId), conf);
     return ict;
   }
 
@@ -994,7 +992,7 @@ private static void createReplImportTasks(
 
     if (!replicationSpec.isMetadataOnly()) {
       if (isPartitioned(tblDesc)) {
-        Task ict = createImportCommitTask(
+        Task ict = createImportCommitTask(x.getConf(),
             tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, stmtId,
             AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()));
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
@@ -1021,8 +1019,8 @@
         Map partSpec = addPartitionDesc.getPartition(0).getPartSpec();
         org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
         Task ict = replicationSpec.isMetadataOnly() ? null : createImportCommitTask(
-            tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, stmtId,
-            AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()));
+            x.getConf(), tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, stmtId,
+            AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()));
         if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
           if (!replicationSpec.isMetadataOnly()){
             x.getTasks().add(addSinglePartition(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 753f039..9f18efd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -82,8 +82,9 @@
   // of any other queries running in the session
   private HiveConf conf;
 
-  // This will be set to true only if repl-status is fired with "WITH" keyword
-  private boolean needNewdb;
+  // By default, this is the same Hive object as in the superclass BaseSemanticAnalyzer. It must be obtained
+  // again if Hive configs are overridden through the WITH clause of REPL LOAD or REPL STATUS commands.
+  private Hive db;
 
   private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
@@ -93,6 +94,7 @@
 
   ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
+    this.db = super.db;
     this.conf = new HiveConf(super.conf);
   }
 
@@ -226,6 +228,13 @@ private void initReplLoad(ASTNode ast) throws SemanticException {
        for (Map.Entry config : replConfigs.entrySet()) {
          conf.set(config.getKey(), config.getValue());
        }
+
+        // The Hive conf has changed, so fetch the Hive DB object again with the updated conf.
+        try {
+          db = Hive.get(conf);
+        } catch (HiveException e) {
+          throw new SemanticException(e);
+        }
      }
      break;
    default:
@@ -420,7 +429,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       Map dbProps = new HashMap<>();
       dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(dmd.getEventTo()));
       ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps);
-      Task barrierTask = TaskFactory.get(replStateLogWork);
+      Task barrierTask = TaskFactory.get(replStateLogWork, conf);
       taskChainTail.addDependentTask(barrierTask);
       LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
           taskChainTail.getClass(), taskChainTail.getId(),
@@ -560,7 +569,6 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
 
   // REPL STATUS
   private void initReplStatus(ASTNode ast) throws SemanticException{
-    needNewdb = false;
     dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
     int numChildren = ast.getChildCount();
     for (int i = 1; i < numChildren; i++) {
@@ -576,8 +584,14 @@ private void initReplStatus(ASTNode ast) throws SemanticException{
        for (Map.Entry config : replConfigs.entrySet()) {
          conf.set(config.getKey(), config.getValue());
        }
+
+        // The Hive conf has changed, so fetch the Hive DB object again with the updated conf.
+        try {
+          db = Hive.get(conf);
+        } catch (HiveException e) {
+          throw new SemanticException(e);
+        }
      }
-      needNewdb = true;
      break;
    default:
      throw new SemanticException("Unrecognized token in REPL STATUS statement");
@@ -592,16 +606,9 @@ private void analyzeReplStatus(ASTNode ast) throws SemanticException {
     String replLastId = null;
 
     try {
-      Hive newDb;
-      if (needNewdb) {
-        newDb = Hive.get(conf, false);
-      } else {
-        newDb = db;
-      }
-
       if (tblNameOrPattern != null) {
         // Checking for status of table
-        Table tbl = newDb.getTable(dbNameOrPattern, tblNameOrPattern);
+        Table tbl = db.getTable(dbNameOrPattern, tblNameOrPattern);
         if (tbl != null) {
           inputs.add(new ReadEntity(tbl));
           Map params = tbl.getParameters();
@@ -611,8 +618,7 @@ private void analyzeReplStatus(ASTNode ast) throws SemanticException {
         }
       } else {
         // Checking for status of a db
-
-        Database database = newDb.getDatabase(dbNameOrPattern);
+        Database database = db.getDatabase(dbNameOrPattern);
         if (database != null) {
           inputs.add(new ReadEntity(database));
           Map params = database.getParameters();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
index 8f76230..bba7692 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java
@@ -69,7 +69,8 @@
     AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "."
         + actualTblName, new ArrayList(), fks, new ArrayList(), context.eventOnlyReplicationSpec());
-    Task addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc));
+    Task addConstraintsTask = TaskFactory.get(
+        new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf);
     tasks.add(addConstraintsTask);
     context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
     updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java
index 7889e03..2b8a32c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java
@@ -66,7 +66,8 @@
     AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "."
         + actualTblName, new ArrayList(), new ArrayList(), new ArrayList(), nns, new ArrayList(), context.eventOnlyReplicationSpec());
-    Task addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc));
+    Task addConstraintsTask = TaskFactory.get(
+        new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf);
     tasks.add(addConstraintsTask);
     context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
     updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java
index f9a615a..e8966ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java
@@ -64,7 +64,8 @@
     AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "."
         + actualTblName, pks, new ArrayList(), new ArrayList(), context.eventOnlyReplicationSpec());
-    Task addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc));
+    Task addConstraintsTask = TaskFactory.get(
+        new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf);
     tasks.add(addConstraintsTask);
     context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
     updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java
index 757381a..81f1c5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java
@@ -64,7 +64,8 @@
     AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "."
         + actualTblName, new ArrayList(), new ArrayList(), uks, context.eventOnlyReplicationSpec());
-    Task addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc));
+    Task addConstraintsTask = TaskFactory.get(
+        new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf);
     tasks.add(addConstraintsTask);
     context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName);
     updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
index 6c6ee02..00ce977 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
@@ -80,7 +80,7 @@
     }
 
     Task alterDbTask = TaskFactory.get(
-        new DDLWork(readEntitySet, writeEntitySet, alterDbDesc));
+        new DDLWork(readEntitySet, writeEntitySet, alterDbDesc), context.hiveConf);
     context.log.debug("Added alter database task : {}:{}",
         alterDbTask.getId(), actualDbName);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateDatabaseHandler.java
index 0dc72e0..f8d8d1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateDatabaseHandler.java
@@ -61,12 +61,12 @@
         new CreateDatabaseDesc(destinationDBName, db.getDescription(), null, true);
     createDatabaseDesc.setDatabaseProperties(db.getParameters());
     Task createDBTask = TaskFactory.get(
-        new DDLWork(new HashSet<>(), new HashSet<>(), createDatabaseDesc));
+        new DDLWork(new HashSet<>(), new HashSet<>(), createDatabaseDesc), context.hiveConf);
     if (!db.getParameters().isEmpty()) {
       AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(destinationDBName, db.getParameters(),
           context.eventOnlyReplicationSpec());
       Task alterDbProperties = TaskFactory
-          .get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc));
+          .get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), context.hiveConf);
       createDBTask.addDependentTask(alterDbProperties);
     }
     if (StringUtils.isNotEmpty(db.getOwnerName())) {
@@ -74,7 +74,7 @@
           new PrincipalDesc(db.getOwnerName(), db.getOwnerType()),
           context.eventOnlyReplicationSpec());
       Task alterDbTask = TaskFactory
-          .get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbOwner));
+          .get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbOwner), context.hiveConf);
       createDBTask.addDependentTask(alterDbTask);
     }
     updatedMetadata
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
index 77c2dd2..f7c9040 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
@@ -65,7 +65,7 @@ public String getFunctionName() {
       context.log.debug("Loading function desc : {}", descToLoad.toString());
       Task createTask = TaskFactory.get(
-          new FunctionWork(descToLoad));
+          new FunctionWork(descToLoad), context.hiveConf);
       context.log.debug("Added create function task : {}:{},{}",
          createTask.getId(), descToLoad.getFunctionName(), descToLoad.getClassName());
 
      // This null check is specifically done as the same class is used to handle both incremental and
@@ -92,7 +92,7 @@ public String getFunctionName() {
       * which should only happen when the last task is finished, at which point the child of the barrier task is picked up.
       */
      Task barrierTask =
-          TaskFactory.get(new DependencyCollectionWork());
+          TaskFactory.get(new DependencyCollectionWork(), context.hiveConf);
      builder.replCopyTasks.forEach(t -> t.addDependentTask(barrierTask));
      barrierTask.addDependentTask(createTask);
      return builder.replCopyTasks;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java
index 7c7ce1d..5f9f879 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java
@@ -39,7 +39,8 @@
     AlterTableDesc dropConstraintsDesc = new AlterTableDesc(actualDbName + "." + actualTblName, constraintName,
         context.eventOnlyReplicationSpec());
-    Task dropConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, dropConstraintsDesc));
+    Task dropConstraintsTask = TaskFactory.get(
+        new DDLWork(readEntitySet, writeEntitySet, dropConstraintsDesc), context.hiveConf);
     context.log.debug("Added drop constrain task : {}:{}", dropConstraintsTask.getId(), actualTblName);
     updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
     return Collections.singletonList(dropConstraintsTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropDatabaseHandler.java
index 363f08c..8b11a9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropDatabaseHandler.java
@@ -40,7 +40,7 @@
         new DropDatabaseDesc(actualDbName, true, context.eventOnlyReplicationSpec());
     Task dropDBTask = TaskFactory
-        .get(new DDLWork(new HashSet<>(), new HashSet<>(), desc));
+        .get(new DDLWork(new HashSet<>(), new HashSet<>(), desc), context.hiveConf);
     context.log.info(
         "Added drop database task : {}:{}", dropDBTask.getId(), desc.getDatabaseName());
     updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, null, null);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
index 1fc7e13..fee2bb5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
@@ -39,7 +39,7 @@
         FunctionUtils.qualifyFunctionName(msg.getFunctionName(), actualDbName);
     DropFunctionDesc desc = new DropFunctionDesc(
         qualifiedFunctionName, false, context.eventOnlyReplicationSpec());
-    Task dropFunctionTask = TaskFactory.get(new FunctionWork(desc));
+    Task dropFunctionTask = TaskFactory.get(new FunctionWork(desc), context.hiveConf);
     context.log.debug(
         "Added drop function task : {}:{}", dropFunctionTask.getId(), desc.getFunctionName()
     );
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
index 1a28eec..7281a1c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
@@ -53,7 +53,7 @@
       DropTableDesc dropPtnDesc = new DropTableDesc(actualDbName + "." + actualTblName,
           partSpecs, null, true, context.eventOnlyReplicationSpec());
       Task dropPtnTask = TaskFactory.get(
-          new DDLWork(readEntitySet, writeEntitySet, dropPtnDesc)
+          new DDLWork(readEntitySet, writeEntitySet, dropPtnDesc), context.hiveConf
       );
       context.log.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(),
          dropPtnDesc.getTableName(), msg.getPartitions());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
index 3e567e9..62784e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
@@ -40,7 +40,7 @@
         null, true, true, context.eventOnlyReplicationSpec(), false
     );
     Task dropTableTask = TaskFactory.get(
-        new DDLWork(readEntitySet, writeEntitySet, dropTableDesc)
+        new DDLWork(readEntitySet, writeEntitySet, dropTableDesc), context.hiveConf
     );
     context.log.debug(
         "Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName()
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
index 396b7ee..43f2cbc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
@@ -62,7 +62,7 @@
     RenamePartitionDesc renamePtnDesc = new RenamePartitionDesc(
         tableName, oldPartSpec, newPartSpec, context.eventOnlyReplicationSpec());
     Task renamePtnTask = TaskFactory.get(
-        new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc));
+        new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf);
     context.log.debug("Added rename ptn task : {}:{}->{}",
         renamePtnTask.getId(), oldPartSpec, newPartSpec);
     updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, newPartSpec);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
index 98bf625..83433d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
@@ -61,7 +61,7 @@
       AlterTableDesc renameTableDesc = new AlterTableDesc(
               oldName, newName, false, context.eventOnlyReplicationSpec());
       Task renameTableTask = TaskFactory.get(
-          new DDLWork(readEntitySet, writeEntitySet, renameTableDesc));
+          new DDLWork(readEntitySet, writeEntitySet, renameTableDesc), context.hiveConf);
       context.log.debug("Added rename table task : {}:{}->{}",
           renameTableTask.getId(), oldName, newName);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
index 1e7fa10..b4fc000 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
@@ -58,7 +58,7 @@
         actualDbName + "." + actualTblName, partSpec,
         context.eventOnlyReplicationSpec());
     Task truncatePtnTask = TaskFactory.get(
-        new DDLWork(readEntitySet, writeEntitySet, truncateTableDesc));
+        new DDLWork(readEntitySet, writeEntitySet, truncateTableDesc), context.hiveConf);
     context.log.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(),
         truncateTableDesc.getTableName());
     updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, partSpec);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
index bdef67f..fe73a18 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
@@ -39,7 +39,7 @@
         actualDbName + "." + actualTblName, null,
         context.eventOnlyReplicationSpec());
     Task truncateTableTask = TaskFactory.get(
-        new DDLWork(readEntitySet, writeEntitySet, truncateTableDesc));
+        new DDLWork(readEntitySet, writeEntitySet, truncateTableDesc), context.hiveConf);
     context.log.debug("Added truncate tbl task : {}:{}",
         truncateTableTask.getId(), truncateTableDesc.getTableName());