diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
index a1911b4..ec64f4b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
@@ -41,6 +41,7 @@ import org.junit.AfterClass;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -164,7 +165,7 @@ public void tearDown() throws Throwable {
     assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflattextpart")));
     assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidloc")));
     assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpartloc")));
-    Table avroTable = replica.getTable(replicatedDbName, "avro_table");
+    Table avroTable = primary.getTable(primaryDbName, "avro_table");
     assertFalse(isTransactionalTable(avroTable));
     assertFalse(MetaStoreUtils.isExternalTable(avroTable));
     return tuple;
@@ -325,4 +326,24 @@ public void testIncrementalLoadMigrationManagedToAcidAllOp() throws Throwable {
     ReplicationTestUtils.verifyIncrementalLoad(primary, replica, primaryDbName, replicatedDbName,
         selectStmtList, expectedValues, bootStrapDump.lastReplicationId);
   }
+
+  @Test
+  public void testBootstrapLoadMigrationToAcidWithMoveOptimization() throws Throwable {
+    List<String> withConfigs =
+        Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
+    WarehouseInstance.Tuple tuple = prepareDataAndDump(primaryDbName, null);
+    replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
+    verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
+  }
+
+  @Test
+  public void testIncrementalLoadMigrationToAcidWithMoveOptimization() throws Throwable {
+    List<String> withConfigs =
+        Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
+    WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, null);
+    replica.load(replicatedDbName, tuple.dumpLocation);
+    tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId);
+    replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
+    verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
+  }
 }
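Note on the two tests above: they drive the move-optimized load path through the WITH clause of REPL LOAD. A minimal sketch of the statement WarehouseInstance.load() is expected to issue under these configs (the WITH-clause shape is standard REPL LOAD syntax; the exact plumbing inside the test harness is an assumption):

    // Hypothetical rendering of the command driven by withConfigs:
    String replLoad = String.format("REPL LOAD %s FROM '%s' WITH (%s)",
        replicatedDbName, tuple.dumpLocation,
        "'hive.repl.enable.move.optimization'='true'");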
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index c7eaa0b..822051c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -21,6 +21,8 @@
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
@@ -119,6 +121,24 @@ protected int execute(DriverContext driverContext) {
           return 0;
         }
       }
+
+      if (work.isCopyToMigratedTxnTable()) {
+        // If a direct (move-optimized) copy is triggered for data going to a migrated transactional table,
+        // a write ID must already have been allocated by the parent ReplTxnTask; use it to create the base
+        // or delta directory. The toPath received in ReplCopyWork points to the table/partition base
+        // location, so we only need to append the base or delta directory name.
+        // getDeleteDestIfExist returns true if this is a repl load for a replace/insert-overwrite event,
+        // in which case a base directory is created. If false, it is a repl load for a regular insert-into
+        // or load flow, and a delta directory is created instead.
+        String writeIdString = conf.get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID);
+        if (writeIdString == null) {
+          console.printError("ReplCopyTask : Write id is not set in the config by open txn task for migration");
+          return 6;
+        }
+        long writeId = Long.parseLong(writeIdString);
+        toPath = new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId,
+            driverContext.getCtx().getHiveTxnManager().getStmtIdAndIncrement()));
+      }
     } else {
       // This flow is usually taken for IMPORT command
       FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
@@ -149,7 +169,7 @@ protected int execute(DriverContext driverContext) {
       if (work.getDeleteDestIfExist() && dstFs.exists(toPath)) {
         LOG.debug(" path " + toPath + " is cleaned before renaming");
         hiveDb.cleanUpOneDirectoryForReplace(toPath, dstFs, HIDDEN_FILES_PATH_FILTER, conf, work.getNeedRecycle(),
-            work.getIsAutoPerge());
+            work.getIsAutoPurge());
       }
       if (!FileUtils.mkdir(dstFs, toPath, conf)) {
@@ -248,16 +268,18 @@ public String getName() {
   }

   public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
-                                        HiveConf conf, boolean isAutoPurge, boolean needRecycle) {
+                                        HiveConf conf, boolean isAutoPurge, boolean needRecycle,
+                                        boolean copyToMigratedTxnTable) {
     Task<?> copyTask = null;
     LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath);
     if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
       ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false);
-      if (replicationSpec.isReplace() && conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
+      if (replicationSpec.isReplace() && conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
         rcwork.setDeleteDestIfExist(true);
         rcwork.setAutoPurge(isAutoPurge);
         rcwork.setNeedRecycle(needRecycle);
       }
+      rcwork.setCopyToMigratedTxnTable(copyToMigratedTxnTable);
       LOG.debug("ReplCopyTask:\trcwork");
       if (replicationSpec.isLazy()) {
         LOG.debug("ReplCopyTask:\tlazy");
@@ -275,4 +297,9 @@ public String getName() {
     }
     return copyTask;
   }
+
+  public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
+                                        HiveConf conf) {
+    return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, false);
+  }
 }
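For reference, the directory names the new block above appends follow Hive's ACID naming scheme (the padding widths shown here are assumed from that convention):

    // With writeId = 5 and stmtId = 0:
    AcidUtils.baseOrDeltaSubdir(true, 5, 5, 0);  // "base_0000005" -- replace/insert-overwrite event
    AcidUtils.baseOrDeltaSubdir(false, 5, 5, 0); // "delta_0000005_0000005_0000" -- insert-into/load event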
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 8102997..2e895a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -229,12 +229,18 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti
     // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
     LoadFileType loadFileType;
-    if (event.replicationSpec().isInReplicationScope() && !event.replicationSpec().isMigratingToTxnTable() &&
+    if (event.replicationSpec().isInReplicationScope() &&
         context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
       loadFileType = LoadFileType.IGNORE;
+      if (event.replicationSpec().isMigratingToTxnTable()) {
+        // Migrating to a transactional table in the bootstrap load phase.
+        // It is enough to copy all the original files under the base_1 dir, so the write-id is hardcoded to 1.
+        // The ReplTxnTask added earlier in the DAG ensures that write-id 1 is made valid in HMS metadata.
+        tmpPath = new Path(tmpPath, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID));
+      }
     } else {
-      loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL :
-          event.replicationSpec().isMigratingToTxnTable() ? LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING;
+      loadFileType = (event.replicationSpec().isReplace() || event.replicationSpec().isMigratingToTxnTable())
+          ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
       tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
     }
@@ -242,7 +248,7 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti
         event.replicationSpec(),
         sourceWarehousePartitionLocation,
         tmpPath,
-        context.hiveConf, false, false
+        context.hiveConf
     );

     Task<?> movePartitionTask = null;
@@ -284,14 +290,17 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti
     MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false);
     if (AcidUtils.isTransactionalTable(table)) {
       if (event.replicationSpec().isMigratingToTxnTable()) {
-        // Write-id is hardcoded to 1 so that for migration, we just move all original files under delta_1_1 dir.
-        // It is used only for transactional tables created after migrating from non-ACID table.
+        // Write-id is hardcoded to 1 so that for migration, we just move all original files under the base_1 dir.
         // ReplTxnTask added earlier in the DAG ensure that the write-id is made valid in HMS metadata.
         LoadTableDesc loadTableWork = new LoadTableDesc(
             tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
-            loadFileType, 1L
+            loadFileType, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID
         );
         loadTableWork.setInheritTableSpecs(false);
+        loadTableWork.setStmtId(0);
+
+        // Need to set insertOverwrite so that base_1 is created instead of delta_1_1_0.
+        loadTableWork.setInsertOverwrite(true);
         moveWork.setLoadTableWork(loadTableWork);
       } else {
         LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
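With move optimization on, bootstrap of a partition being migrated to ACID therefore copies files straight into its base directory for write-id 1. An illustrative sketch (the warehouse path is hypothetical):

    Path partitionLoc = new Path("/warehouse/mydb.db/mytbl/p=1");
    Path target = new Path(partitionLoc,
        AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID)); // .../base_0000001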
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 1d454fd..520b410 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -201,7 +201,7 @@ private void newTableTasks(ImportTableDesc tblDesc, Task<?> tblRootTask) throws
     // The write-id 1 is used to copy data for the given table and also no writes are aborted.
     ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(
         AcidUtils.getFullTableName(tblDesc.getDatabaseName(), tblDesc.getTableName()),
-        new long[0], new BitSet(), 1);
+        new long[0], new BitSet(), ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID);
     ReplTxnWork replTxnWork = new ReplTxnWork(tblDesc.getDatabaseName(), tblDesc.getTableName(), null,
         validWriteIdList.writeToString(), ReplTxnWork.OperationType.REPL_WRITEID_STATE);
     Task<?> replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf);
@@ -237,12 +237,18 @@ private String location(ImportTableDesc tblDesc, Database parentDb)
     // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
     LoadFileType loadFileType;
-    if (replicationSpec.isInReplicationScope() && !replicationSpec.isMigratingToTxnTable() &&
+    if (replicationSpec.isInReplicationScope() &&
         context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
       loadFileType = LoadFileType.IGNORE;
+      if (event.replicationSpec().isMigratingToTxnTable()) {
+        // Migrating to a transactional table in the bootstrap load phase.
+        // It is enough to copy all the original files under the base_1 dir, so the write-id is hardcoded to 1.
+        // The ReplTxnTask added earlier in the DAG ensures that write-id 1 is made valid in HMS metadata.
+        tmpPath = new Path(tmpPath, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID));
+      }
     } else {
-      loadFileType = replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL :
-          replicationSpec.isMigratingToTxnTable() ? LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING;
+      loadFileType = (replicationSpec.isReplace() || replicationSpec.isMigratingToTxnTable())
+          ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
       tmpPath = PathUtils.getExternalTmpPath(tgtPath, context.pathInfo);
     }
@@ -250,19 +256,21 @@ private String location(ImportTableDesc tblDesc, Database parentDb)
         + table.getCompleteName() + " with source location: "
        + dataPath.toString() + " and target location " + tgtPath.toString());

-    Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf,
-        false, false);
+    Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf);

     MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false);
     if (AcidUtils.isTransactionalTable(table)) {
       if (replicationSpec.isMigratingToTxnTable()) {
-        // Write-id is hardcoded to 1 so that for migration, we just move all original files under delta_1_1 dir.
-        // However, it unused if it is non-ACID table.
+        // Write-id is hardcoded to 1 so that for migration, we just move all original files under the base_1 dir.
         // ReplTxnTask added earlier in the DAG ensure that the write-id is made valid in HMS metadata.
         LoadTableDesc loadTableWork = new LoadTableDesc(
             tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
-            loadFileType, 1L
+            loadFileType, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID
         );
+        loadTableWork.setStmtId(0);
+
+        // Need to set insertOverwrite so that base_1 is created instead of delta_1_1_0.
+        loadTableWork.setInsertOverwrite(true);
         moveWork.setLoadTableWork(loadTableWork);
       } else {
         LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
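The write-id state registered for a migrated table can be pictured as below (a minimal sketch; the table name is hypothetical, and the argument order follows the hunk above: full table name, invalid-write-id exceptions, aborted bits, high watermark):

    ValidWriteIdList ids = new ValidReaderWriteIdList(
        "srcdb.mytbl",                                     // fully qualified table name
        new long[0],                                       // no open/aborted write ids
        new BitSet(),                                      // no aborted bits
        ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID); // high watermark 1 => base_1 is readable
    String hmsState = ids.writeToString(); // what ReplTxnWork persists as REPL_WRITEID_STATE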
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 7d2b616..014192b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -62,6 +62,9 @@
   // tasks.
   public static final String REPL_CURRENT_TBL_WRITE_ID = "hive.repl.current.table.write.id";

+  // Migrating to a transactional table in the bootstrap load phase.
+  // It is enough to copy all the original files under the base_1 dir, so the write-id is hardcoded to 1.
+  public static final Long REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID = 1L;

   /**
    * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
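REPL_CURRENT_TBL_WRITE_ID is the hand-off point between the transaction task and the copy task. The producer side is not part of this patch; a sketch of the expected flow, assuming ReplTxnTask stores the allocated id in the query config exactly as the ReplCopyTask hunk reads it back (allocatedWriteId is a hypothetical variable):

    // Producer (assumed, in the open-txn/ReplTxnTask path):
    conf.set(ReplUtils.REPL_CURRENT_TBL_WRITE_ID, Long.toString(allocatedWriteId));

    // Consumer (from the ReplCopyTask hunk above):
    long writeId = Long.parseLong(conf.get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID));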
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index e82a102..9c78108 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -436,10 +436,11 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
     Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
     Path destPath = null, loadPath = null;
     LoadFileType lft;
-    boolean isAutoPurge;
-    boolean needRecycle;
+    boolean isAutoPurge = false;
+    boolean needRecycle = false;
+    boolean copyToMigratedTxnTable = false;

-    if (replicationSpec.isInReplicationScope() && !replicationSpec.isMigratingToTxnTable() &&
+    if (replicationSpec.isInReplicationScope() &&
         x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false)) {
       lft = LoadFileType.IGNORE;
       destPath = loadPath = tgtPath;
@@ -450,6 +451,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
         org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName());
         needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db);
       }
+      copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable();
     } else {
       if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) {
         String mmSubdir = replace ? AcidUtils.baseDir(writeId)
@@ -471,8 +473,6 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
         lft = replace ? LoadFileType.REPLACE_ALL : replicationSpec.isMigratingToTxnTable() ?
             LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING;
       }
-      needRecycle = false;
-      isAutoPurge = false;
     }

     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
@@ -488,7 +488,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
     Task<?> copyTask = null;
     if (replicationSpec.isInReplicationScope()) {
       copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(),
-          isAutoPurge, needRecycle);
+          isAutoPurge, needRecycle, copyToMigratedTxnTable);
     } else {
       copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
     }
@@ -565,8 +565,9 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
       EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId)
       throws MetaException, IOException, HiveException {
     AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
-    boolean isAutoPurge;
-    boolean needRecycle;
+    boolean isAutoPurge = false;
+    boolean needRecycle = false;
+    boolean copyToMigratedTxnTable = false;

     if (tblDesc.isExternal() && tblDesc.getLocation() == null) {
       x.getLOG().debug("Importing in-place: adding AddPart for partition "
@@ -586,7 +587,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,

     LoadFileType loadFileType;
     Path destPath;
-    if (replicationSpec.isInReplicationScope() && !replicationSpec.isMigratingToTxnTable() &&
+    if (replicationSpec.isInReplicationScope() &&
         x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false)) {
       loadFileType = LoadFileType.IGNORE;
       destPath = tgtLocation;
@@ -597,6 +598,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
         org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName());
         needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db);
       }
+      copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable();
     } else {
       loadFileType = replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL :
@@ -606,8 +608,6 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
         replicationSpec.isInReplicationScope();
       destPath = useStagingDirectory ?
           x.getCtx().getExternalTmpPath(tgtLocation) : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId));
-      isAutoPurge = false;
-      needRecycle = false;
     }

     Path moveTaskSrc = !AcidUtils.isTransactionalTable(table.getParameters()) ||
@@ -624,8 +624,8 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
     Task<?> copyTask = null;
     if (replicationSpec.isInReplicationScope()) {
-      copyTask = ReplCopyTask.getLoadCopyTask(
-          replicationSpec, new Path(srcLocation), destPath, x.getConf(), isAutoPurge, needRecycle);
+      copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath,
+          x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable);
     } else {
       copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
     }
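A condensed view of the destination choice ImportSemanticAnalyzer now makes for partition data (an illustration using the variables from the hunks above; moveOptimizationEnabled stands in for the REPL_ENABLE_MOVE_OPTIMIZATION check, and this ternary is not the literal code):

    Path destPath = moveOptimizationEnabled
        ? tgtLocation // direct copy; ReplCopyTask appends the base/delta subdir for migrated txn tables
        : (useStagingDirectory ? x.getCtx().getExternalTmpPath(tgtLocation)
            : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId)));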
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
index 14c7f06..3a32885 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
@@ -195,7 +195,7 @@ ResourceUri destinationResourceUri(ResourceUri resourceUri)
       Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
           metadata.getReplicationSpec(), new Path(sourceUri), qualifiedDestinationPath,
-          context.hiveConf, false, false
+          context.hiveConf
       );
       replCopyTasks.add(copyTask);
       ResourceUri destinationUri =
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
index fac5aff..4d34f8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
@@ -57,6 +57,8 @@
   private String distCpDoAsUser = null;

+  private boolean copyToMigratedTxnTable;
+
   public ReplCopyWork(final Path srcPath, final Path destPath, boolean errorOnSrcEmpty) {
     super(srcPath, destPath, errorOnSrcEmpty);
   }
@@ -93,11 +95,19 @@ public void setNeedRecycle(boolean needRecycle) {
     this.needRecycle = needRecycle;
   }

-  public boolean getIsAutoPerge() {
+  public boolean getIsAutoPurge() {
     return isAutoPurge;
   }

   public void setAutoPurge(boolean isAutoPurge) {
     this.isAutoPurge = isAutoPurge;
   }
+
+  public boolean isCopyToMigratedTxnTable() {
+    return copyToMigratedTxnTable;
+  }
+
+  public void setCopyToMigratedTxnTable(boolean copyToMigratedTxnTable) {
+    this.copyToMigratedTxnTable = copyToMigratedTxnTable;
+  }
 }
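The new four-argument overload simply defaults the purge/recycle/migration flags, so metadata-only copies such as function binaries stay terse. Per the ReplCopyTask hunk, these two calls are equivalent:

    ReplCopyTask.getLoadCopyTask(replicationSpec, srcPath, dstPath, conf);
    ReplCopyTask.getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, false);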
"hdfs://localhost:9000/user/someplace/ab.jar#e094828883"));