diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 169fed857d..47aba00964 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -364,10 +364,10 @@ public void testBootstrapFailedDump() throws IOException {
     String replicatedDbName = dbName + "_dupe";
-    EximUtil.ManagedTableCopyPath.setNullSrcPath(hconf, true);
+    EximUtil.DataCopyPath.setNullSrcPath(hconf, true);
     verifyFail("REPL DUMP " + dbName, driver);
     advanceDumpDir();
-    EximUtil.ManagedTableCopyPath.setNullSrcPath(hconf, false);
+    EximUtil.DataCopyPath.setNullSrcPath(hconf, false);
     Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replicatedDbName);
     advanceDumpDir();
     FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index d7b360cd93..3f53e7f48e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -114,6 +114,42 @@ public void testCreateFunctionIncrementalReplication() throws Throwable {
             replicatedDbName + ".testFunctionTwo" });
   }
 
+  @Test
+  public void testCreateFunctionOnHDFSIncrementalReplication() throws Throwable {
+    Path identityUdfLocalPath = new Path("../../data/files/identity_udf.jar");
+    Path identityUdf1HdfsPath = new Path(primary.functionsRoot, "idFunc1" + File.separator + "identity_udf1.jar");
+    Path identityUdf2HdfsPath = new Path(primary.functionsRoot, "idFunc2" + File.separator + "identity_udf2.jar");
+    setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf1HdfsPath);
+    setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf2HdfsPath);
+
+    primary.run("CREATE FUNCTION " + primaryDbName
+        + ".idFunc1 as 'IdentityStringUDF' "
+        + "using jar '" + identityUdf1HdfsPath.toString() + "'");
+    WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
+    replica.load(replicatedDbName, primaryDbName)
+        .run("REPL STATUS " + replicatedDbName)
+        .verifyResult(bootStrapDump.lastReplicationId)
+        .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
+        .verifyResults(new String[] { replicatedDbName + ".idFunc1"})
+        .run("SELECT " + replicatedDbName + ".idFunc1('MyName')")
+        .verifyResults(new String[] { "MyName"});
+
+    primary.run("CREATE FUNCTION " + primaryDbName
+        + ".idFunc2 as 'IdentityStringUDF' "
+        + "using jar '" + identityUdf2HdfsPath.toString() + "'");
+
+    WarehouseInstance.Tuple incrementalDump =
+        primary.dump(primaryDbName);
+    replica.load(replicatedDbName, primaryDbName)
+        .run("REPL STATUS " + replicatedDbName)
+        .verifyResult(incrementalDump.lastReplicationId)
+        .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
+        .verifyResults(new String[] { replicatedDbName + ".idFunc1",
+            replicatedDbName + ".idFunc2" })
+        .run("SELECT " + replicatedDbName + ".idFunc2('YourName')")
+        .verifyResults(new String[] { "YourName"});
+  }
+
   @Test
   public void testBootstrapReplLoadRetryAfterFailureForFunctions() throws Throwable {
     String funcName1 = "f1";
@@ -1683,4 +1719,9 @@ private void ensureFailedReplOperation(List<String> clause, String conf, boolean
   private String quote(String str) {
     return "'" + str + "'";
   }
+
+  private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPath) throws IOException {
+    FileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.copyFromLocalFile(identityUdfLocalPath, identityUdfHdfsPath);
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index c11f58219c..5ffc110c42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -91,7 +91,7 @@ protected int copyOnePath(Path fromPath, Path toPath) {
         Utilities.FILE_OP_LOGGER.debug("Copying file {} to {}", oneSrcPathStr, toPath);
         if (!FileUtils.copy(srcFs, oneSrc.getPath(), dstFs, toPath,
             false, // delete source
-            true, // overwrite destination
+            work.isOverwrite(), // overwrite destination
            conf)) {
           console.printError("Failed to copy: '" + oneSrcPathStr
               + "to: '" + toPath.toString() + "'");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index b15b326b1c..97394e6bf5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -41,7 +41,6 @@
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;
 import org.apache.hadoop.hive.metastore.utils.Retry;
-import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -58,7 +57,7 @@
 import org.apache.hadoop.hive.ql.metadata.events.EventUtils;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath;
+import org.apache.hadoop.hive.ql.parse.EximUtil.DataCopyPath;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
@@ -244,6 +243,7 @@ private void initiateDataCopyTasks() throws SemanticException {
     }
     childTasks.addAll(work.externalTableCopyTasks(taskTracker, conf));
     childTasks.addAll(work.managedTableCopyTasks(taskTracker, conf));
+    childTasks.addAll(work.functionsBinariesCopyTasks(taskTracker, conf));
     if (childTasks.isEmpty()) {
       //All table data copy work finished.
       finishRemainingTasks();
@@ -465,7 +465,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     String validTxnList = null;
     long waitUntilTime = 0;
     long bootDumpBeginReplId = -1;
-    List<ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList();
+    List<DataCopyPath> managedTableCopyPaths = Collections.emptyList();
     List<DirCopyWork> extTableCopyWorks = Collections.emptyList();
     List<String> tableList = work.replScope.includeAllTables() ? null : new ArrayList<>();
     // If we are bootstrapping ACID tables, we need to perform steps similar to a regular
@@ -779,7 +779,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
     LOG.info("Bootstrap Dump for db {}", work.dbNameOrPattern);
     List<DirCopyWork> extTableCopyWorks = new ArrayList<>();
-    List<ManagedTableCopyPath> managedTableCopyPaths = new ArrayList<>();
+    List<DataCopyPath> managedTableCopyPaths = new ArrayList<>();
     long timeoutInMs = HiveConf.getTimeVar(conf,
         HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
     long waitUntilTime = System.currentTimeMillis() + timeoutInMs;
@@ -790,6 +790,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
       //We can't reuse the previous write id as it might be invalid due to compaction
       metadataPath.getFileSystem(conf).delete(metadataPath, true);
     }
+    List<DataCopyPath> functionsBinaryCopyPaths = Collections.emptyList();
     for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
       LOG.debug("Dumping db: " + dbName);
       // TODO : Currently we don't support separate table list for each database.
@@ -813,8 +814,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
       work.getMetricCollector().reportStageStart(getName(), metricMap);
       Path dbRoot = dumpDbMetadata(dbName, metadataPath, bootDumpBeginReplId, hiveDb);
       Path dbDataRoot = new Path(new Path(dumpRoot, EximUtil.DATA_PATH_NAME), dbName);
-      dumpFunctionMetadata(dbName, dbRoot, hiveDb);
-
+      functionsBinaryCopyPaths = dumpFunctionMetadata(dbName, dbRoot, dbDataRoot, hiveDb);
       String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
       Exception caught = null;
       try (Writer writer = new Writer(dbRoot, conf)) {
@@ -873,7 +873,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
     long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
     dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId);
     dmd.write(true);
-
+    work.setFunctionCopyPathIterator(functionsBinaryCopyPaths.iterator());
     work.setDirCopyIterator(extTableCopyWorks.iterator());
     work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
     return bootDumpBeginReplId;
@@ -921,7 +921,7 @@ Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hive
     return dbRoot;
   }
 
-  List<ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRootMetadata,
+  List<DataCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRootMetadata,
                                        Path dbRootData, long lastReplId, Hive hiveDb,
                                        HiveWrapper.Tuple<Table> tuple) throws Exception {
     LOG.info("Bootstrap Dump for table " + tblName);
@@ -941,7 +941,7 @@ Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hive
     }
     MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
     tuple.replicationSpec.setRepl(true);
-    List<ManagedTableCopyPath> managedTableCopyPaths = new TableExport(
+    List<DataCopyPath> managedTableCopyPaths = new TableExport(
        exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write(false);
     work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.TABLES.name(), 1);
@@ -1059,24 +1059,30 @@ private Path getLatestDumpPath(Path dumpRoot) throws IOException {
     return null;
   }
 
-  void dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Hive hiveDb) throws Exception {
-    Path functionsRoot = new Path(dbMetadataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
+  List<DataCopyPath> dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Path dbDataRoot,
+                                          Hive hiveDb) throws Exception {
+    List<DataCopyPath> functionsBinaryCopyPaths = new ArrayList<>();
+    Path functionsMetaRoot = new Path(dbMetadataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
+    Path functionsDataRoot = new Path(dbDataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
     List<String> functionNames = hiveDb.getFunctions(dbName, "*");
     for (String functionName : functionNames) {
       HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName, hiveDb);
       if (tuple == null) {
         continue;
       }
-      Path functionRoot = new Path(functionsRoot, functionName);
-      Path functionMetadataFile = new Path(functionRoot, FUNCTION_METADATA_FILE_NAME);
+      Path functionMetaRoot = new Path(functionsMetaRoot, functionName);
+      Path functionMetadataFile = new Path(functionMetaRoot, FUNCTION_METADATA_FILE_NAME);
+      Path functionDataRoot = new Path(functionsDataRoot, functionName);
       try (JsonWriter jsonWriter =
           new JsonWriter(functionMetadataFile.getFileSystem(conf), functionMetadataFile)) {
-        FunctionSerializer serializer = new FunctionSerializer(tuple.object, conf);
+        FunctionSerializer serializer = new FunctionSerializer(tuple.object, functionDataRoot, conf);
         serializer.writeTo(jsonWriter, tuple.replicationSpec);
+        functionsBinaryCopyPaths.addAll(serializer.getFunctionBinaryCopyPaths());
       }
       work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.FUNCTIONS.name(), 1);
       replLogger.functionLog(functionName);
     }
+    return functionsBinaryCopyPaths;
   }
 
   void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiveDb) throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index bccaf9417b..a5628c0b20 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -54,7 +54,8 @@
   static boolean testDeletePreviousDumpMetaPath = false;
   private Integer maxEventLimit;
   private transient Iterator<DirCopyWork> dirCopyIterator;
-  private transient Iterator<EximUtil.ManagedTableCopyPath> managedTableCopyPathIterator;
+  private transient Iterator<EximUtil.DataCopyPath> managedTableCopyPathIterator;
+  private transient Iterator<EximUtil.DataCopyPath> functionCopyPathIterator;
   private Path currentDumpPath;
   private List<String> resultValues;
   private boolean shouldOverwrite;
@@ -137,13 +138,20 @@ public void setDirCopyIterator(Iterator<DirCopyWork> dirCopyIterator) {
     this.dirCopyIterator = dirCopyIterator;
   }
 
-  public void setManagedTableCopyPathIterator(Iterator<EximUtil.ManagedTableCopyPath> managedTableCopyPathIterator) {
+  public void setManagedTableCopyPathIterator(Iterator<EximUtil.DataCopyPath> managedTableCopyPathIterator) {
     if (this.managedTableCopyPathIterator != null) {
       throw new IllegalStateException("Managed table copy path iterator has already been initialized");
     }
     this.managedTableCopyPathIterator = managedTableCopyPathIterator;
   }
 
+  public void setFunctionCopyPathIterator(Iterator<EximUtil.DataCopyPath> functionCopyPathIterator) {
+    if (this.functionCopyPathIterator != null) {
+      throw new IllegalStateException("Function copy path iterator has already been initialized");
+    }
+    this.functionCopyPathIterator = functionCopyPathIterator;
+  }
+
   public boolean tableDataCopyIteratorsInitialized() {
     return dirCopyIterator != null || managedTableCopyPathIterator != null;
   }
@@ -185,7 +193,7 @@ public void setResultValues(List<String> resultValues) {
     }
     List<Task<?>> tasks = new ArrayList<>();
     while (managedTableCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
-      EximUtil.ManagedTableCopyPath managedTableCopyPath = managedTableCopyPathIterator.next();
+      EximUtil.DataCopyPath managedTableCopyPath = managedTableCopyPathIterator.next();
       Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
           managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(),
           managedTableCopyPath.getTargetPath(), conf, false, shouldOverwrite);
@@ -196,6 +204,22 @@ public void setResultValues(List<String> resultValues) {
     return tasks;
   }
 
+  public List<Task<?>> functionsBinariesCopyTasks(TaskTracker tracker, HiveConf conf) {
+    List<Task<?>> tasks = new ArrayList<>();
+    if (functionCopyPathIterator != null) {
+      while (functionCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
+        EximUtil.DataCopyPath binaryCopyPath = functionCopyPathIterator.next();
+        Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+            binaryCopyPath.getReplicationSpec(), binaryCopyPath.getSrcPath(), binaryCopyPath.getTargetPath(), conf
+        );
+        tasks.add(copyTask);
+        tracker.addTask(copyTask);
+        LOG.debug("added task for {}", binaryCopyPath);
+      }
+    }
+    return tasks;
+  }
+
   public void setShouldOverwrite(boolean shouldOverwrite) {
     this.shouldOverwrite = shouldOverwrite;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index fb6a38cd43..02b457efb5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -159,15 +159,15 @@ public void setOpenTxnTask(Task<?> openTxnTask) {
   }
 
   /**
-   * Wrapper class for mapping source and target path for copying managed table data.
+   * Wrapper class for mapping source and target path for copying managed table data and function's binary.
    */
-  public static class ManagedTableCopyPath {
+  public static class DataCopyPath {
    private ReplicationSpec replicationSpec;
    private static boolean nullSrcPathForTest = false;
    private Path srcPath;
    private Path tgtPath;
 
-    public ManagedTableCopyPath(ReplicationSpec replicationSpec, Path srcPath, Path tgtPath) {
+    public DataCopyPath(ReplicationSpec replicationSpec, Path srcPath, Path tgtPath) {
      this.replicationSpec = replicationSpec;
      if (srcPath == null) {
        throw new IllegalArgumentException("Source path can not be null.");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 3de583276e..0b8a352a23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -23,7 +23,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
-import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath;
+import org.apache.hadoop.hive.ql.parse.EximUtil.DataCopyPath;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
 import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
@@ -74,10 +74,10 @@
     this.callersSession = SessionState.get();
   }
 
-  List<ManagedTableCopyPath> write(final ReplicationSpec forReplicationSpec, boolean isExportTask)
+  List<DataCopyPath> write(final ReplicationSpec forReplicationSpec, boolean isExportTask)
       throws InterruptedException, HiveException {
     List<Future<?>> futures = new LinkedList<>();
-    List<ManagedTableCopyPath> managedTableCopyPaths = new LinkedList<>();
+    List<DataCopyPath> managedTableCopyPaths = new LinkedList<>();
     ExecutorService producer = Executors.newFixedThreadPool(1,
         new ThreadFactoryBuilder().setNameFormat("partition-submitter-thread-%d").build());
     futures.add(producer.submit(() -> {
@@ -122,7 +122,7 @@
               .export(isExportTask);
           Path dataDumpDir = new Path(paths.dataExportRootDir(), partitionName);
           LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
-          return new ManagedTableCopyPath(forReplicationSpec, partition.getDataLocation(),
+          return new DataCopyPath(forReplicationSpec, partition.getDataLocation(),
              dataDumpDir);
         } catch (Exception e) {
           throw new RuntimeException(e.getMessage(), e);
@@ -134,7 +134,7 @@
       try {
         Object retVal = future.get();
         if (retVal != null) {
-          ManagedTableCopyPath managedTableCopyPath = (ManagedTableCopyPath) retVal;
+          DataCopyPath managedTableCopyPath = (DataCopyPath) retVal;
           managedTableCopyPaths.add(managedTableCopyPath);
         }
       } catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index b11afe80a1..8b5c7b1609 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -33,7 +33,7 @@
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath;
+import org.apache.hadoop.hive.ql.parse.EximUtil.DataCopyPath;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
@@ -96,8 +96,8 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication
     this.mmCtx = mmCtx;
   }
 
-  public List<ManagedTableCopyPath> write(boolean isExportTask) throws SemanticException {
-    List<ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList();
+  public List<DataCopyPath> write(boolean isExportTask) throws SemanticException {
+    List<DataCopyPath> managedTableCopyPaths = Collections.emptyList();
     if (tableSpec == null) {
       writeMetaData(null);
     } else if (shouldExport()) {
@@ -161,9 +161,9 @@ private void writeMetaData(PartitionIterable partitions) throws SemanticExceptio
     }
   }
 
-  private List<ManagedTableCopyPath> writeData(PartitionIterable partitions, boolean isExportTask)
+  private List<DataCopyPath> writeData(PartitionIterable partitions, boolean isExportTask)
      throws SemanticException {
-    List<ManagedTableCopyPath> managedTableCopyPaths = new ArrayList<>();
+    List<DataCopyPath> managedTableCopyPaths = new ArrayList<>();
     try {
       if (tableSpec.tableHandle.isPartitioned()) {
         if (partitions == null) {
@@ -175,7 +175,7 @@ private void writeMetaData(PartitionIterable partitions) throws SemanticExceptio
       } else {
         List<Path> dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(),
            replicationSpec, conf);
-        managedTableCopyPaths.add(new ManagedTableCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(),
+        managedTableCopyPaths.add(new DataCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(),
            paths.dataExportDir()));
         new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx)
            .export(isExportTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
index c9e1041fc1..88cae5c302 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
@@ -20,13 +20,25 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
+import org.apache.hadoop.hive.ql.parse.EximUtil.DataCopyPath;
+
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 class CreateFunctionHandler extends AbstractEventHandler<CreateFunctionMessage> {
   CreateFunctionHandler(NotificationEvent event) {
     super(event);
@@ -41,13 +53,36 @@ CreateFunctionMessage eventMessage(String stringRepresentation) {
   public void handle(Context withinContext) throws Exception {
     LOG.info("Processing#{} CREATE_FUNCTION message : {}", fromEventId(), eventMessageAsJSON);
     Path metadataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+    Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
     FileSystem fileSystem = metadataPath.getFileSystem(withinContext.hiveConf);
-
+    List<DataCopyPath> functionBinaryCopyPaths = new ArrayList<>();
     try (JsonWriter jsonWriter = new JsonWriter(fileSystem, metadataPath)) {
-      new FunctionSerializer(eventMessage.getFunctionObj(), withinContext.hiveConf)
-          .writeTo(jsonWriter, withinContext.replicationSpec);
+      FunctionSerializer serializer = new FunctionSerializer(eventMessage.getFunctionObj(),
+          dataPath, withinContext.hiveConf);
+      serializer.writeTo(jsonWriter, withinContext.replicationSpec);
+      functionBinaryCopyPaths.addAll(serializer.getFunctionBinaryCopyPaths());
     }
     withinContext.createDmd(this).write();
+    copyFunctionBinaries(functionBinaryCopyPaths, withinContext.hiveConf);
+  }
+
+  private void copyFunctionBinaries(List<DataCopyPath> functionBinaryCopyPaths, HiveConf hiveConf)
+      throws MetaException, IOException, LoginException, HiveFatalException {
+    if (!functionBinaryCopyPaths.isEmpty()) {
+      String distCpDoAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+      List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>();
+      for (DataCopyPath funcBinCopyPath : functionBinaryCopyPaths) {
+        String [] decodedURISplits = ReplChangeManager.decodeFileUri(funcBinCopyPath.getSrcPath().toString());
+        ReplChangeManager.FileInfo fileInfo = ReplChangeManager.getFileInfo(new Path(decodedURISplits[0]),
+            decodedURISplits[1], decodedURISplits[2], decodedURISplits[3], hiveConf);
+        filePaths.add(fileInfo);
+        Path destRoot = funcBinCopyPath.getTargetPath().getParent();
+        FileSystem dstFs = destRoot.getFileSystem(hiveConf);
+        CopyUtils copyUtils = new CopyUtils(distCpDoAsUser, hiveConf, dstFs);
+        copyUtils.copyAndVerify(destRoot, filePaths, funcBinCopyPath.getSrcPath(), false);
+        copyUtils.renameFileCopiedFromCmPath(destRoot, dstFs, filePaths);
+      }
+    }
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
index 733bab522f..318c38aa12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
@@ -40,10 +41,13 @@
   public static final String FIELD_NAME = "function";
   private Function function;
   private HiveConf hiveConf;
+  private Path functionDataRoot;
+  private List<EximUtil.DataCopyPath> functionBinaryCopyPaths = new ArrayList<>();
 
-  public FunctionSerializer(Function function, HiveConf hiveConf) {
+  public FunctionSerializer(Function function, Path functionDataRoot, HiveConf hiveConf) {
     this.hiveConf = hiveConf;
     this.function = function;
+    this.functionDataRoot = functionDataRoot;
   }
 
   @Override
@@ -58,9 +62,12 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi
         FileSystem fileSystem = inputPath.getFileSystem(hiveConf);
         Path qualifiedUri = PathBuilder.fullyQualifiedHDFSUri(inputPath, fileSystem);
         String checkSum = ReplChangeManager.checksumFor(qualifiedUri, fileSystem);
-        String newFileUri = ReplChangeManager.getInstance(hiveConf)
+        String encodedSrcUri = ReplChangeManager.getInstance(hiveConf)
            .encodeFileUri(qualifiedUri.toString(), checkSum, null);
-        resourceUris.add(new ResourceUri(uri.getResourceType(), newFileUri));
+        Path newBinaryPath = new Path(functionDataRoot, qualifiedUri.getName());
+        resourceUris.add(new ResourceUri(uri.getResourceType(),newBinaryPath.toString()));
+        functionBinaryCopyPaths.add(new EximUtil.DataCopyPath(additionalPropertiesProvider,
+            new Path(encodedSrcUri), newBinaryPath));
       } else {
         resourceUris.add(uri);
       }
@@ -84,4 +91,8 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi
       throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
     }
   }
+
+  public List<EximUtil.DataCopyPath> getFunctionBinaryCopyPaths() {
+    return functionBinaryCopyPaths;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
index 948d201ddc..bea24d07d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.plan.CopyWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 
 import java.io.IOException;
@@ -193,10 +194,8 @@ ResourceUri destinationResourceUri(ResourceUri resourceUri)
          new Path(functionsRootDir).getFileSystem(context.hiveConf)
       );
 
-      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
-          metadata.getReplicationSpec(), new Path(sourceUri), qualifiedDestinationPath,
-          context.hiveConf
-      );
+      Task<?> copyTask = TaskFactory.get(
+          new CopyWork(new Path(sourceUri), qualifiedDestinationPath, true, true), context.hiveConf);
       replCopyTasks.add(copyTask);
       ResourceUri destinationUri =
          new ResourceUri(resourceUri.getResourceType(), qualifiedDestinationPath.toString());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
index 018983f6dc..f69776ad7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
@@ -33,6 +33,7 @@
   private Path[] fromPath;
   private Path[] toPath;
   private boolean errorOnSrcEmpty;
+  private boolean overwrite = true;
 
   public CopyWork() {
   }
@@ -42,6 +43,12 @@ public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty)
     this.setErrorOnSrcEmpty(errorOnSrcEmpty);
   }
 
+  public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty, boolean overwrite) {
+    this(new Path[] { fromPath }, new Path[] { toPath });
+    this.setErrorOnSrcEmpty(errorOnSrcEmpty);
+    this.setOverwrite(overwrite);
+  }
+
   public CopyWork(final Path[] fromPath, final Path[] toPath) {
     if (fromPath.length != toPath.length) {
       throw new RuntimeException(
@@ -87,4 +94,12 @@ public void setErrorOnSrcEmpty(boolean errorOnSrcEmpty) {
   public boolean isErrorOnSrcEmpty() {
     return errorOnSrcEmpty;
   }
+
+  public boolean isOverwrite() {
+    return overwrite;
+  }
+
+  public void setOverwrite(boolean overwrite) {
+    this.overwrite = overwrite;
+  }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index 8454b9c420..ebdd943070 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -87,7 +87,9 @@ String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) {
     }
 
     @Override
-    void dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Hive hiveDb) {
+    List<EximUtil.DataCopyPath> dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Path dbDataRoot,
+                                                     Hive hiveDb) {
+      return Collections.emptyList();
     }
 
     @Override
@@ -131,7 +133,7 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw
     private int tableDumpCount = 0;
 
     @Override
-    List<EximUtil.ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList,
+    List<EximUtil.DataCopyPath> dumpTable(String dbName, String tblName, String validTxnList,
        Path dbRootMetadata, Path dbRootData, long lastReplId, Hive hiveDb,
        HiveWrapper.Tuple<Table> tuple)