diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 72da2f1ba3..b8e96f2595 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -673,6 +674,59 @@ public void retryIncBootstrapExternalTablesFromDifferentDumpWithoutCleanTablesCo
         ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
   }
 
+  @Test
+  public void dynamicallyConvertManagedToExternalTable() throws Throwable {
+    List<String> dumpWithClause = Collections.singletonList(
+        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'"
+    );
+    List<String> loadWithClause = externalTableBasePathWithClause();
+
+    WarehouseInstance.Tuple tupleBootstrapManagedTable = primary.run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("create table t2 (id int) partitioned by (key int)")
+        .run("insert into table t2 partition(key=10) values (1)")
+        .dump(primaryDbName, null, dumpWithClause);
+
+    replica.load(replicatedDbName, tupleBootstrapManagedTable.dumpLocation, loadWithClause);
+
+    Hive hiveForReplica = Hive.get(replica.hiveConf);
+    Table replicaTable = hiveForReplica.getTable(replicatedDbName + ".t1");
+    Path oldTblLocT1 = replicaTable.getDataLocation();
+
+    replicaTable = hiveForReplica.getTable(replicatedDbName + ".t2");
+    Path oldTblLocT2 = replicaTable.getDataLocation();
+
+    WarehouseInstance.Tuple tupleIncConvertToExternalTbl = primary.run("use " + primaryDbName)
+        .run("alter table t1 set tblproperties('EXTERNAL'='true')")
+        .run("alter table t2 set tblproperties('EXTERNAL'='true')")
+        .dump(primaryDbName, tupleBootstrapManagedTable.lastReplicationId, dumpWithClause);
+
+    assertExternalFileInfo(Arrays.asList("t1", "t2"),
+        new Path(tupleIncConvertToExternalTbl.dumpLocation, FILE_NAME));
+    replica.load(replicatedDbName, tupleIncConvertToExternalTbl.dumpLocation, loadWithClause)
+        .run("use " + replicatedDbName)
+        .run("select id from t1")
+        .verifyResult("1")
+        .run("select id from t2 where key=10")
+        .verifyResult("1");
+
+    // Check that the table type is set correctly on the target.
+    replicaTable = hiveForReplica.getTable(replicatedDbName + ".t1");
+    assertTrue(TableType.EXTERNAL_TABLE.equals(replicaTable.getTableType()));
+
+    replicaTable = hiveForReplica.getTable(replicatedDbName + ".t2");
+    assertTrue(TableType.EXTERNAL_TABLE.equals(replicaTable.getTableType()));
+
+    // Verify that the new table location is set under the external tables base directory.
+    assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1");
+    assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
+
+    // The old location should be removed now that the table points to the new location.
+    assertFalse(replica.miniDFSCluster.getFileSystem().exists(oldTblLocT1));
+    assertFalse(replica.miniDFSCluster.getFileSystem().exists(oldTblLocT2));
+  }
+
   private List<String> externalTableBasePathWithClause() throws IOException, SemanticException {
     Path externalTableLocation = new Path(REPLICA_EXTERNAL_BASE);
     DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java
index af39c16570..24373feed7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.ql.ddl.DDLOperation;
@@ -54,17 +55,24 @@ public int execute() throws HiveException {
     Table tbl = desc.toTable(context.getConf());
     LOG.debug("creating table {} on {}", tbl.getFullyQualifiedName(), tbl.getDataLocation());
 
-    if (desc.getReplicationSpec().isInReplicationScope() && (!desc.getReplaceMode())){
-      // if this is a replication spec, then replace-mode semantics might apply.
-      // if we're already asking for a table replacement, then we can skip this check.
-      // however, otherwise, if in replication scope, and we've not been explicitly asked
-      // to replace, we should check if the object we're looking at exists, and if so,
+    boolean replDataLocationChanged = false;
+    if (desc.getReplicationSpec().isInReplicationScope()) {
+      // If in replication scope, we should check if the object we're looking at exists, and if so,
       // trigger replace-mode semantics.
       Table existingTable = context.getDb().getTable(tbl.getDbName(), tbl.getTableName(), false);
-      if (existingTable != null){
+      if (existingTable != null) {
        if (desc.getReplicationSpec().allowEventReplacementInto(existingTable.getParameters())) {
           desc.setReplaceMode(true); // we replace existing table.
           ReplicationSpec.copyLastReplId(existingTable.getParameters(), tbl.getParameters());
+
+          // If the location of an existing managed table has changed, then we need to delete the old location
+          // if it exists. This scenario occurs when a managed table is converted into an external table at the
+          // source; at the target, the table data is then moved under the base directory for external tables.
+          if (existingTable.getTableType().equals(TableType.MANAGED_TABLE)
+              && tbl.getTableType().equals(TableType.EXTERNAL_TABLE)
+              && (!existingTable.getDataLocation().equals(tbl.getDataLocation()))) {
+            replDataLocationChanged = true;
+          }
         } else {
           LOG.debug("DDLTask: Create Table is skipped as table {} is newer than update", desc.getTableName());
           return 0; // no replacement, the existing table state is newer than our update.
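The hunk above only raises replDataLocationChanged when a replace-mode create in the replication flow swaps an existing managed table for an external one whose data location differs. For reference, a minimal self-contained sketch of that rule, using hypothetical stand-in types and example paths for illustration only (not Hive's actual Table/TableType API):

// Hypothetical stand-in for the managed -> external detection added to CreateTableOperation.
public class ReplLocationChangeCheckSketch {

  enum TableType { MANAGED_TABLE, EXTERNAL_TABLE, VIRTUAL_VIEW }

  // The flag is raised only when an existing managed table is replaced by an
  // external table whose data location differs from the current one.
  static boolean replDataLocationChanged(TableType existingType, String existingLocation,
                                         TableType incomingType, String incomingLocation) {
    return existingType == TableType.MANAGED_TABLE
        && incomingType == TableType.EXTERNAL_TABLE
        && !existingLocation.equals(incomingLocation);
  }

  public static void main(String[] args) {
    // Mirrors the test above: a bootstrapped managed table is later replicated as external,
    // with its data relocated under the external-tables base directory on the replica.
    System.out.println(replDataLocationChanged(
        TableType.MANAGED_TABLE, "/warehouse/managed/db.db/t1",
        TableType.EXTERNAL_TABLE, "/replica_external_base/db.db/t1"));   // true
  }
}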
@@ -74,7 +82,7 @@ public int execute() throws HiveException {
 
     // create the table
     if (desc.getReplaceMode()) {
-      createTableReplaceMode(tbl);
+      createTableReplaceMode(tbl, replDataLocationChanged);
     } else {
       createTableNonReplaceMode(tbl);
     }
@@ -83,7 +91,7 @@ public int execute() throws HiveException {
     return 0;
   }
 
-  private void createTableReplaceMode(Table tbl) throws HiveException {
+  private void createTableReplaceMode(Table tbl, boolean replDataLocationChanged) throws HiveException {
     ReplicationSpec replicationSpec = desc.getReplicationSpec();
     long writeId = 0;
     EnvironmentContext environmentContext = null;
@@ -108,6 +116,12 @@ private void createTableReplaceMode(Table tbl) throws HiveException {
       }
     }
 
+    // In the replication flow, if the table's data location has changed, then set the corresponding flag in the
+    // environment context to notify the Metastore to update the location of all partitions and delete the old directory.
+    if (replDataLocationChanged) {
+      environmentContext = ReplUtils.setReplDataLocationChangedFlag(environmentContext);
+    }
+
     // replace-mode creates are really alters using CreateTableDesc.
     context.getDb().alterTable(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), tbl, false,
         environmentContext, true, writeId);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index a5ed840879..072189b344 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -19,8 +19,10 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.ReplConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -206,4 +208,12 @@ public static boolean isFirstIncPending(Map<String, String> parameters) {
     // and not through replication.
     return firstIncPendFlag != null && !firstIncPendFlag.isEmpty() && "true".equalsIgnoreCase(firstIncPendFlag);
   }
+
+  public static EnvironmentContext setReplDataLocationChangedFlag(EnvironmentContext envContext) {
+    if (envContext == null) {
+      envContext = new EnvironmentContext();
+    }
+    envContext.putToProperties(ReplConst.REPL_DATA_LOCATION_CHANGED, ReplConst.TRUE);
+    return envContext;
+  }
 }
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java
new file mode 100644
index 0000000000..e25189d35d
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+/**
+ * A class that defines the constant strings used by the replication implementation.
+ */
+
+public class ReplConst {
+
+  /**
+   * The constant that denotes that the table data location has changed to a different path. This indicates
+   * that the Metastore should update the corresponding path in all partitions and also delete the old path.
+   */
+  public static final String REPL_DATA_LOCATION_CHANGED = "REPL_DATA_LOCATION_CHANGED";
+
+  public static final String TRUE = "true";
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index 617c7bc012..ad670c9f76 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -21,6 +21,7 @@
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.common.ReplConst;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
@@ -98,10 +99,16 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam
     name = name.toLowerCase();
     dbname = dbname.toLowerCase();
 
-    final boolean cascade = environmentContext != null
-        && environmentContext.isSetProperties()
-        && StatsSetupConst.TRUE.equals(environmentContext.getProperties().get(
-            StatsSetupConst.CASCADE));
+    final boolean cascade;
+    final boolean replDataLocationChanged;
+    if ((environmentContext != null) && environmentContext.isSetProperties()) {
+      cascade = StatsSetupConst.TRUE.equals(environmentContext.getProperties().get(StatsSetupConst.CASCADE));
+      replDataLocationChanged = ReplConst.TRUE.equals(environmentContext.getProperties().get(ReplConst.REPL_DATA_LOCATION_CHANGED));
+    } else {
+      cascade = false;
+      replDataLocationChanged = false;
+    }
+
     if (newt == null) {
       throw new InvalidOperationException("New table is null");
     }
@@ -126,6 +133,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam
     boolean dataWasMoved = false;
     boolean isPartitionedTable = false;
 
+    Database olddb = null;
     Table oldt = null;
 
     List<TransactionalMetaStoreEventListener> transactionalListeners = handler.getTransactionalListeners();
@@ -154,6 +162,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam
       msdb.openTransaction();
       // get old table
       // Note: we don't verify stats here; it's done below in alterTableUpdateTableColumnStats.
+      olddb = msdb.getDatabase(catName, dbname);
       oldt = msdb.getTable(catName, dbname, name, null);
       if (oldt == null) {
         throw new InvalidOperationException("table " +
@@ -186,73 +195,87 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam
         }
       }
 
-      // rename needs change the data location and move the data to the new location corresponding
+      // Two mutually exclusive flows are possible:
+      // i) The partition locations need an update if replDataLocationChanged is true, which means the table's
+      // data location has changed along with all partition sub-directories.
+      // ii) Rename needs to change the data location and move the data to the new location corresponding
       // to the new name if:
       // 1) the table is not a virtual view, and
       // 2) the table is not an external table, and
       // 3) the user didn't change the default location (or new location is empty), and
       // 4) the table was not initially created with a specified location
-      if (rename
-          && !oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())
-          && (oldt.getSd().getLocation().compareTo(newt.getSd().getLocation()) == 0
-            || StringUtils.isEmpty(newt.getSd().getLocation()))
-          && !MetaStoreUtils.isExternalTable(oldt)) {
-        Database olddb = msdb.getDatabase(catName, dbname);
-        // if a table was created in a user specified location using the DDL like
-        // create table tbl ... location ...., it should be treated like an external table
-        // in the table rename, its data location should not be changed. We can check
-        // if the table directory was created directly under its database directory to tell
-        // if it is such a table
+      if (replDataLocationChanged
+          || (rename
+          && !oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())
+          && (oldt.getSd().getLocation().compareTo(newt.getSd().getLocation()) == 0
+            || StringUtils.isEmpty(newt.getSd().getLocation()))
+          && !MetaStoreUtils.isExternalTable(oldt))) {
         srcPath = new Path(oldt.getSd().getLocation());
-        String oldtRelativePath = (new Path(olddb.getLocationUri()).toUri())
-            .relativize(srcPath.toUri()).toString();
-        boolean tableInSpecifiedLoc = !oldtRelativePath.equalsIgnoreCase(name)
-            && !oldtRelativePath.equalsIgnoreCase(name + Path.SEPARATOR);
-
-        if (!tableInSpecifiedLoc) {
-          srcFs = wh.getFs(srcPath);
-
-          // get new location
-          Database db = msdb.getDatabase(catName, newDbName);
-          Path databasePath = constructRenamedPath(wh.getDatabasePath(db), srcPath);
-          destPath = new Path(databasePath, newTblName);
-          destFs = wh.getFs(destPath);
-          newt.getSd().setLocation(destPath.toString());
-
-          // check that destination does not exist otherwise we will be
-          // overwriting data
-          // check that src and dest are on the same file system
-          if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
-            throw new InvalidOperationException("table new location " + destPath
-                + " is on a different file system than the old location "
-                + srcPath + ". This operation is not supported");
-          }
-
-          try {
-            if (destFs.exists(destPath)) {
-              throw new InvalidOperationException("New location for this table " +
-                  TableName.getQualified(catName, newDbName, newTblName) +
-                  " already exists : " + destPath);
+        if (replDataLocationChanged) {
+          // If the data location changed in the replication flow, then the new path was already set in
+          // newt. Also, the data is effectively already moved, so set dataWasMoved=true so that the
+          // locations in the partitions are updated accordingly.
+          // No need to validate whether destPath exists, because in the replication flow the data gets replicated
+          // separately.
+          destPath = new Path(newt.getSd().getLocation());
+          dataWasMoved = true;
+        } else {
+          // Rename flow.
+          // If a table was created in a user specified location using the DDL like
+          // create table tbl ... location ...., it should be treated like an external table
+          // in the table rename, its data location should not be changed.
We can check + // if the table directory was created directly under its database directory to tell + // if it is such a table + String oldtRelativePath = (new Path(olddb.getLocationUri()).toUri()) + .relativize(srcPath.toUri()).toString(); + boolean tableInSpecifiedLoc = !oldtRelativePath.equalsIgnoreCase(name) + && !oldtRelativePath.equalsIgnoreCase(name + Path.SEPARATOR); + if (!tableInSpecifiedLoc) { + srcFs = wh.getFs(srcPath); + + // get new location + Database db = msdb.getDatabase(catName, newDbName); + Path databasePath = constructRenamedPath(wh.getDatabasePath(db), srcPath); + destPath = new Path(databasePath, newTblName); + destFs = wh.getFs(destPath); + + newt.getSd().setLocation(destPath.toString()); + + // check that destination does not exist otherwise we will be + // overwriting data + // check that src and dest are on the same file system + if (!FileUtils.equalsFileSystem(srcFs, destFs)) { + throw new InvalidOperationException("table new location " + destPath + + " is on a different file system than the old location " + + srcPath + ". This operation is not supported"); } - // check that src exists and also checks permissions necessary, rename src to dest - if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath, - ReplChangeManager.isSourceOfReplication(olddb))) { - dataWasMoved = true; + + try { + if (destFs.exists(destPath)) { + throw new InvalidOperationException("New location for this table " + + TableName.getQualified(catName, newDbName, newTblName) + + " already exists : " + destPath); + } + // check that src exists and also checks permissions necessary, rename src to dest + if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath, + ReplChangeManager.isSourceOfReplication(olddb))) { + dataWasMoved = true; + } + } catch (IOException | MetaException e) { + LOG.error("Alter Table operation for " + dbname + "." + name + " failed.", e); + throw new InvalidOperationException("Alter Table operation for " + dbname + "." + name + + " failed to move data due to: '" + getSimpleMessage(e) + + "' See hive log file for details."); } - } catch (IOException | MetaException e) { - LOG.error("Alter Table operation for " + dbname + "." + name + " failed.", e); - throw new InvalidOperationException("Alter Table operation for " + dbname + "." + name + - " failed to move data due to: '" + getSimpleMessage(e) - + "' See hive log file for details."); - } - if (!HiveMetaStore.isRenameAllowed(olddb, db)) { - LOG.error("Alter Table operation for " + TableName.getQualified(catName, dbname, name) + - "to new table = " + TableName.getQualified(catName, newDbName, newTblName) + " failed "); - throw new MetaException("Alter table not allowed for table " + - TableName.getQualified(catName, dbname, name) + - "to new table = " + TableName.getQualified(catName, newDbName, newTblName)); + if (!HiveMetaStore.isRenameAllowed(olddb, db)) { + LOG.error("Alter Table operation for " + TableName.getQualified(catName, dbname, name) + + "to new table = " + TableName.getQualified(catName, newDbName, newTblName) + " failed "); + throw new MetaException("Alter table not allowed for table " + + TableName.getQualified(catName, dbname, name) + + "to new table = " + TableName.getQualified(catName, newDbName, newTblName)); + } } } @@ -382,10 +405,29 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam "Unable to change partition or table. Object " + e.getMessage() + " does not exist." 
+ " Check metastore logs for detailed stack."); } finally { - if (!success) { + if (success) { + // Txn was committed successfully. + // If data location is changed in replication flow, then need to delete the old path. + if (replDataLocationChanged) { + assert(olddb != null); + assert(oldt != null); + Path deleteOldDataLoc = new Path(oldt.getSd().getLocation()); + boolean isAutoPurge = "true".equalsIgnoreCase(oldt.getParameters().get("auto.purge")); + try { + wh.deleteDir(deleteOldDataLoc, true, isAutoPurge, olddb); + LOG.info("Deleted the old data location: {} for the table: {}", + deleteOldDataLoc, dbname + "." + name); + } catch (MetaException ex) { + // Eat the exception as it doesn't affect the state of existing tables. + // Expect, user to manually drop this path when exception and so logging a warning. + LOG.warn("Unable to delete the old data location: {} for the table: {}", + deleteOldDataLoc, dbname + "." + name); + } + } + } else { LOG.error("Failed to alter table " + TableName.getQualified(catName, dbname, name)); msdb.rollbackTransaction(); - if (dataWasMoved) { + if (!replDataLocationChanged && dataWasMoved) { try { if (destFs.exists(destPath)) { if (!destFs.rename(destPath, srcPath)) {