diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index c25e6e2159..81af2fe762 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -512,7 +512,7 @@ private void assertExternalFileInfo(List<String> expected, Path externalTableInf
       tableNames.add(components[0]);
       assertTrue(components[1].length() > 0);
     }
-    assertTrue(expected.containsAll(tableNames));
+    assertTrue(tableNames.containsAll(expected));
     reader.close();
   }
 }
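
Note on the assertion flip above: java.util.Collection#containsAll is not symmetric, so swapping the receiver changes what the test verifies. The old form failed whenever the external-table info file listed a table the test did not name; the new form only requires that every expected table appears in the file, tolerating extras. A minimal, self-contained sketch (hypothetical values, not part of the patch) of the difference:

    import java.util.Arrays;
    import java.util.List;

    // Illustration only: containsAll() checks subset-ness in one direction.
    public class ContainsAllDirection {
      public static void main(String[] args) {
        List<String> expected = Arrays.asList("t1", "t2");
        // The dumped info file may legitimately list more tables than the test names.
        List<String> tableNames = Arrays.asList("t1", "t2", "t3");

        // Old assertion: false, because "t3" is absent from expected.
        System.out.println(expected.containsAll(tableNames));
        // New assertion: true, every expected table is present in the file.
        System.out.println(tableNames.containsAll(expected));
      }
    }
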
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 8242b472f2..a48584b78b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -133,9 +133,9 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
       // db exists.
       boolean waitOnPrecursor = false;

-      for (int i = 1; i < ast.getChildCount(); ++i){
+      for (int i = 1; i < ast.getChildCount(); ++i) {
         ASTNode child = (ASTNode) ast.getChild(i);
-        switch (child.getToken().getType()){
+        switch (child.getToken().getType()) {
           case HiveParser.KW_EXTERNAL:
             isExternalSet = true;
             break;
@@ -145,7 +145,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
             break;
           case HiveParser.TOK_TAB:
             ASTNode tableNameNode = (ASTNode) child.getChild(0);
-            Map.Entry<String,String> dbTablePair = getDbTableNamePair(tableNameNode);
+            Map.Entry<String, String> dbTablePair = getDbTableNamePair(tableNameNode);
             parsedDbName = dbTablePair.getKey();
             parsedTableName = dbTablePair.getValue();
             // get partition metadata if partition specified
@@ -164,10 +164,11 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
       }

       // parsing statement is now done, on to logic.
       tableExists = prepareImport(true,
-              isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor,
-              parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
-              new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx),
-              null, getTxnMgr(), 0);
+          isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor,
+          parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
+          new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx),
+
+          null, getTxnMgr(), 0);
     } catch (SemanticException e) {
       throw e;
@@ -185,12 +186,12 @@ private void parsePartitionSpec(ASTNode tableNode, LinkedHashMap
         ASTNode partspec_val = (ASTNode) partspec.getChild(j);
         String val = null;
         String colName = unescapeIdentifier(partspec_val.getChild(0)
-                .getText().toLowerCase());
+            .getText().toLowerCase());
         if (partspec_val.getChildCount() < 2) { // DP in the form of T
           // partition (ds, hr)
           throw new SemanticException(
-                  ErrorMsg.INVALID_PARTITION
-                          .getMsg(" - Dynamic partitions not allowed"));
+              ErrorMsg.INVALID_PARTITION
+                  .getMsg(" - Dynamic partitions not allowed"));
         } else { // in the form of T partition (ds="2010-03-03")
           val = stripQuotes(partspec_val.getChild(1).getText());
         }
@@ -220,16 +221,16 @@ private static void upgradeTableDesc(org.apache.hadoop.hive.metastore.api.Table
    * Given that "repl load" now supports two modes "repl load dbName [location]" and
    * "repl load [location]" in which case the database name has to be taken from the table metadata
    * by default and then over-ridden if something specified on the command line.
-   *
+   *
    * hence for import to work correctly we have to pass in the sessionState default Db via the
    * parsedDbName parameter
    */
   public static boolean prepareImport(boolean isImportCmd,
-          boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnPrecursor,
-          String parsedLocation, String parsedTableName, String overrideDBName,
-          LinkedHashMap<String, String> parsedPartSpec,
-          String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x,
-          UpdatedMetaDataTracker updatedMetadata, HiveTxnManager txnMgr,
+      boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnPrecursor,
+      String parsedLocation, String parsedTableName, String overrideDBName,
+      LinkedHashMap<String, String> parsedPartSpec,
+      String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x,
+      UpdatedMetaDataTracker updatedMetadata, HiveTxnManager txnMgr,
       long writeId // Initialize with 0 for non-ACID and non-MM tables.
   ) throws IOException, MetaException, HiveException, URISyntaxException {
@@ -242,7 +243,7 @@ public static boolean prepareImport(boolean isImportCmd,
     MetaData rv;
     try {
-      rv = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
+      rv = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
     } catch (IOException e) {
       throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
     }
@@ -253,10 +254,10 @@ public static boolean prepareImport(boolean isImportCmd,
     }

     ReplicationSpec replicationSpec = rv.getReplicationSpec();
-    if (replicationSpec.isNoop()){
+    if (replicationSpec.isNoop()) {
       // nothing to do here, silently return.
       x.getLOG().debug("Current update with ID:{} is noop",
-              replicationSpec.getCurrentReplicationState());
+          replicationSpec.getCurrentReplicationState());
       return false;
     }
@@ -265,7 +266,7 @@ public static boolean prepareImport(boolean isImportCmd,
     }

     String dbname = rv.getTable().getDbName();
-    if ((overrideDBName !=null) && (!overrideDBName.isEmpty())){
+    if ((overrideDBName != null) && (!overrideDBName.isEmpty())) {
       // If the parsed statement contained a db.tablename specification, prefer that.
       dbname = overrideDBName;
     }
@@ -305,7 +306,7 @@ public static boolean prepareImport(boolean isImportCmd,
     }

     boolean inReplicationScope = false;
-    if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
+    if ((replicationSpec != null) && replicationSpec.isInReplicationScope()) {
       tblDesc.setReplicationSpec(replicationSpec);
       // Statistics for a non-transactional table will be replicated separately. Don't bother
       // with it here.
@@ -329,7 +330,7 @@ public static boolean prepareImport(boolean isImportCmd,
       x.getInputs().add(toReadEntity(new Path(parsedLocation), x.getConf()));
     }

-    if ((parsedTableName!= null) && (!parsedTableName.isEmpty())){
+    if ((parsedTableName != null) && (!parsedTableName.isEmpty())) {
       tblDesc.setTableName(parsedTableName);
     }
@@ -338,9 +339,9 @@ public static boolean prepareImport(boolean isImportCmd,
     for (Partition partition : partitions) {
       // TODO: this should ideally not create AddPartitionDesc per partition
       AddPartitionDesc partsDesc =
-              getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition,
-                      replicationSpec, x.getConf());
-      if (inReplicationScope){
+          getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition,
+              replicationSpec, x.getConf());
+      if (inReplicationScope) {
         // Statistics for a non-transactional table will be replicated separately. Don't bother
         // with it here.
         if (TxnUtils.isTransactionalTable(tblDesc.getTblProps())) {
@@ -350,12 +351,12 @@ public static boolean prepareImport(boolean isImportCmd,
       partitionDescs.add(partsDesc);
     }
-    if (isPartSpecSet){
+    if (isPartSpecSet) {
       // The import specification asked for only a particular partition to be loaded
       // We load only that, and ignore all the others.
       boolean found = false;
       for (Iterator<AddPartitionDesc> partnIter = partitionDescs
-              .listIterator(); partnIter.hasNext();) {
+          .listIterator(); partnIter.hasNext(); ) {
         AddPartitionDesc addPartitionDesc = partnIter.next();
         if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) {
           found = true;
         }
@@ -365,8 +366,8 @@ public static boolean prepareImport(boolean isImportCmd,
       }
       if (!found) {
         throw new SemanticException(
-                ErrorMsg.INVALID_PARTITION
-                        .getMsg(" - Specified partition not found in import directory"));
+            ErrorMsg.INVALID_PARTITION
+                .getMsg(" - Specified partition not found in import directory"));
       }
     }

@@ -412,25 +413,25 @@ public static boolean prepareImport(boolean isImportCmd,
     if (inReplicationScope) {
       createReplImportTasks(
-              tblDesc, partitionDescs,
-              replicationSpec, waitOnPrecursor, table,
-              fromURI, wh, x, writeId, stmtId, updatedMetadata);
+          tblDesc, partitionDescs,
+          replicationSpec, waitOnPrecursor, table,
+          fromURI, wh, x, writeId, stmtId, updatedMetadata);
     } else {
       createRegularImportTasks(
-              tblDesc, partitionDescs,
-              isPartSpecSet, replicationSpec, table,
-              fromURI, fs, wh, x, writeId, stmtId);
+          tblDesc, partitionDescs,
+          isPartSpecSet, replicationSpec, table,
+          fromURI, fs, wh, x, writeId, stmtId);
     }
     return tableExists;
   }

   private static AddPartitionDesc getBaseAddPartitionDescFromPartition(
-          Path fromPath, String dbName, ImportTableDesc tblDesc, Partition partition,
-          ReplicationSpec replicationSpec, HiveConf conf)
-          throws MetaException, SemanticException {
+      Path fromPath, String dbName, ImportTableDesc tblDesc, Partition partition,
+      ReplicationSpec replicationSpec, HiveConf conf)
+      throws MetaException, SemanticException {
     AddPartitionDesc partsDesc = new AddPartitionDesc(dbName, tblDesc.getTableName(),
-            EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
-            partition.getSd().getLocation(), partition.getParameters());
+        EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
+        partition.getSd().getLocation(), partition.getParameters());
     AddPartitionDesc.OnePartitionDesc partDesc = partsDesc.getPartition(0);
     partDesc.setInputFormat(partition.getSd().getInputFormat());
     partDesc.setOutputFormat(partition.getSd().getOutputFormat());
@@ -441,27 +442,27 @@ private static AddPartitionDesc getBaseAddPartitionDescFromPartition(
     partDesc.setBucketCols(partition.getSd().getBucketCols());
     partDesc.setSortCols(partition.getSd().getSortCols());
     if (replicationSpec.isInReplicationScope() && tblDesc.isExternal()
-            && !replicationSpec.isMigratingToExternalTable()) {
+        && !replicationSpec.isMigratingToExternalTable()) {
       String newLocation = ReplExternalTables
-              .externalTableLocation(conf, partition.getSd().getLocation());
+          .externalTableLocation(conf, partition.getSd().getLocation());
       LOG.debug("partition {} has data location: {}", partition, newLocation);
       partDesc.setLocation(newLocation);
     } else {
       partDesc.setLocation(new Path(fromPath,
-              Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
+          Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
     }
     return partsDesc;
   }

   private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
-          org.apache.hadoop.hive.metastore.api.Table tblObj) throws Exception {
+      org.apache.hadoop.hive.metastore.api.Table tblObj) throws Exception {
     Table table = new Table(tblObj);
     return new ImportTableDesc(dbName, table);
   }

   private static Task loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
-          ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
-          Long writeId, int stmtId) throws HiveException {
+      ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
+      Long writeId, int stmtId) throws HiveException {
     assert table != null;
     assert table.getParameters() != null;
     Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
@@ -508,10 +509,11 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
       Utilities.FILE_OP_LOGGER.trace("adding import work for table with source location: " +
-          dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " + writeId +
-          " for " + table.getTableName() + ": " +
+          dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " +
+          writeId +
+          " for " + table.getTableName() + ": " +
           (AcidUtils.isFullAcidTable(table) ? "acid" :
-          (AcidUtils.isInsertOnlyTable(table) ? "mm" : "flat")
+              (AcidUtils.isInsertOnlyTable(table) ? "mm" : "flat")
           )
       );
     }

@@ -553,7 +555,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
     return loadTableTask;
   }

-  private static Task createTableTask(ImportTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x){
+  private static Task createTableTask(ImportTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x) {
     return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf());
   }

@@ -565,20 +567,20 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
   }

   private static Task alterTableTask(ImportTableDesc tableDesc,
-          EximUtil.SemanticAnalyzerWrapperContext x, ReplicationSpec replicationSpec) {
+      EximUtil.SemanticAnalyzerWrapperContext x, ReplicationSpec replicationSpec) {
     tableDesc.setReplaceMode(true);
-    if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())){
+    if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) {
       tableDesc.setReplicationSpec(replicationSpec);
     }
     return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf());
   }

   private static Task alterSinglePartition(
-          ImportTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
-          ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn,
-          EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, IOException, HiveException {
+      ImportTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
+      ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn,
+      EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, IOException, HiveException {
     addPartitionDesc.setReplaceMode(true);
-    if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())){
+    if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) {
       addPartitionDesc.setReplicationSpec(replicationSpec);
     }
     AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
@@ -591,17 +593,18 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
   }

   private static Task addSinglePartition(ImportTableDesc tblDesc,
-          Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec,
-          EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId)
-          throws MetaException, IOException, HiveException {
+      Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec,
+      EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId)
+      throws MetaException, IOException, HiveException {
     AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
     boolean isAutoPurge = false;
     boolean needRecycle = false;
     boolean copyToMigratedTxnTable = false;

-    if (tblDesc.isExternal() && tblDesc.getLocation() == null) {
-      x.getLOG().debug("Importing in-place: adding AddPart for partition "
-              + partSpecToString(partSpec.getPartSpec()));
+    if (shouldSkipDataCopyInReplScope(tblDesc, replicationSpec)
+        || (tblDesc.isExternal() && tblDesc.getLocation() == null)) {
+      x.getLOG().debug("Adding AddPart and skipped data copy for partition "
+          + partSpecToString(partSpec.getPartSpec()));
       // addPartitionDesc already has the right partition location
       @SuppressWarnings("unchecked")
       Task addPartTask = TaskFactory.get(
@@ -611,8 +614,8 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
       String srcLocation = partSpec.getLocation();
       fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
       x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
-              + partSpecToString(partSpec.getPartSpec())
-              + " with source location: " + srcLocation);
+          + partSpecToString(partSpec.getPartSpec())
+          + " with source location: " + srcLocation);
       Path tgtLocation = new Path(partSpec.getLocation());

       LoadFileType loadFileType;
@@ -620,7 +623,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
       if (replicationSpec.isInReplicationScope() &&
           x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false)) {
         loadFileType = LoadFileType.IGNORE;
-        destPath = tgtLocation;
+        destPath = tgtLocation;
         isAutoPurge = "true".equalsIgnoreCase(table.getProperty("auto.purge"));
         if (table.isTemporary()) {
           needRecycle = false;
@@ -636,19 +639,19 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
         //Replication scope the write id will be invalid
         boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters())
             || replicationSpec.isInReplicationScope();
-        destPath = useStagingDirectory ? x.getCtx().getExternalTmpPath(tgtLocation)
+        destPath = useStagingDirectory ? x.getCtx().getExternalTmpPath(tgtLocation)
             : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId));
       }

-      Path moveTaskSrc = !AcidUtils.isTransactionalTable(table.getParameters()) ||
+      Path moveTaskSrc = !AcidUtils.isTransactionalTable(table.getParameters()) ||
           replicationSpec.isInReplicationScope() ? destPath : tgtLocation;
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("adding import work for partition with source location: "
-            + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
-            + writeId + " for " + partSpecToString(partSpec.getPartSpec()) + ": " +
-            (AcidUtils.isFullAcidTable(table) ? "acid" :
-            (AcidUtils.isInsertOnlyTable(table) ? "mm" : "flat")
-            )
+            + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
+            + writeId + " for " + partSpecToString(partSpec.getPartSpec()) + ": " +
+            (AcidUtils.isFullAcidTable(table) ? "acid" :
+                (AcidUtils.isInsertOnlyTable(table) ? "mm" : "flat")
+            )
         );
       }

@@ -710,6 +713,17 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
     }
   }

+  /**
+   * In REPL LOAD flow, the data copy is done separately for external tables using data locations
+   * dumped in file {@link ReplExternalTables#FILE_NAME}. So, we can skip copying it here.
+   */
+  private static boolean shouldSkipDataCopyInReplScope(ImportTableDesc tblDesc, ReplicationSpec replicationSpec) {
+    return ((replicationSpec != null)
+        && replicationSpec.isInReplicationScope()
+        && tblDesc.isExternal()
+        && replicationSpec.isMigratingToExternalTable());
+  }
+
   /**
    * Helper method to set location properly in partSpec
    */
@@ -1210,7 +1224,8 @@ private static void createReplImportTasks(
               addPartitionDesc.getPartition(0).getPartSpec());
         }
       }
-    } else if (!replicationSpec.isMetadataOnly()) {
+    } else if (!replicationSpec.isMetadataOnly()
+        && !shouldSkipDataCopyInReplScope(tblDesc, replicationSpec)) {
       x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
       t.addDependentTask(loadTable(fromURI, table, replicationSpec.isReplace(),
           new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId));