diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index 72da2f1ba3..b8e96f2595 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -673,6 +674,59 @@ public void retryIncBootstrapExternalTablesFromDifferentDumpWithoutCleanTablesCo ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode()); } + @Test + public void dynamicallyConvertManagedToExternalTable() throws Throwable { + List dumpWithClause = Collections.singletonList( + "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" + ); + List loadWithClause = externalTableBasePathWithClause(); + + WarehouseInstance.Tuple tupleBootstrapManagedTable = primary.run("use " + primaryDbName) + .run("create table t1 (id int)") + .run("insert into table t1 values (1)") + .run("create table t2 (id int) partitioned by (key int)") + .run("insert into table t2 partition(key=10) values (1)") + .dump(primaryDbName, null, dumpWithClause); + + replica.load(replicatedDbName, tupleBootstrapManagedTable.dumpLocation, loadWithClause); + + Hive hiveForReplica = Hive.get(replica.hiveConf); + Table replicaTable = hiveForReplica.getTable(replicatedDbName + ".t1"); + Path oldTblLocT1 = replicaTable.getDataLocation(); + + replicaTable = hiveForReplica.getTable(replicatedDbName + ".t2"); + Path oldTblLocT2 = replicaTable.getDataLocation(); + + WarehouseInstance.Tuple tupleIncConvertToExternalTbl = primary.run("use " + primaryDbName) + .run("alter table t1 set tblproperties('EXTERNAL'='true')") + .run("alter table t2 set tblproperties('EXTERNAL'='true')") + .dump(primaryDbName, tupleBootstrapManagedTable.lastReplicationId, dumpWithClause); + + assertExternalFileInfo(Arrays.asList("t1", "t2"), + new Path(tupleIncConvertToExternalTbl.dumpLocation, FILE_NAME)); + replica.load(replicatedDbName, tupleIncConvertToExternalTbl.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select id from t1") + .verifyResult("1") + .run("select id from t2 where key=10") + .verifyResult("1"); + + // Check if the table type is set correctly in target. + replicaTable = hiveForReplica.getTable(replicatedDbName + ".t1"); + assertTrue(TableType.EXTERNAL_TABLE.equals(replicaTable.getTableType())); + + replicaTable = hiveForReplica.getTable(replicatedDbName + ".t2"); + assertTrue(TableType.EXTERNAL_TABLE.equals(replicaTable.getTableType())); + + // Verify if new table location is set inside the base directory. + assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1"); + assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2"); + + // Old location should be removed and set to new location. + assertFalse(replica.miniDFSCluster.getFileSystem().exists(oldTblLocT1)); + assertFalse(replica.miniDFSCluster.getFileSystem().exists(oldTblLocT2)); + } + private List externalTableBasePathWithClause() throws IOException, SemanticException { Path externalTableLocation = new Path(REPLICA_EXTERNAL_BASE); DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java index af39c16570..0e0cbbd51a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.ql.ddl.DDLOperation; @@ -54,17 +55,24 @@ public int execute() throws HiveException { Table tbl = desc.toTable(context.getConf()); LOG.debug("creating table {} on {}", tbl.getFullyQualifiedName(), tbl.getDataLocation()); - if (desc.getReplicationSpec().isInReplicationScope() && (!desc.getReplaceMode())){ - // if this is a replication spec, then replace-mode semantics might apply. - // if we're already asking for a table replacement, then we can skip this check. - // however, otherwise, if in replication scope, and we've not been explicitly asked - // to replace, we should check if the object we're looking at exists, and if so, + boolean dataLocationChanged = false; + if (desc.getReplicationSpec().isInReplicationScope()) { + // If in replication scope, we should check if the object we're looking at exists, and if so, // trigger replace-mode semantics. Table existingTable = context.getDb().getTable(tbl.getDbName(), tbl.getTableName(), false); - if (existingTable != null){ + if (existingTable != null) { if (desc.getReplicationSpec().allowEventReplacementInto(existingTable.getParameters())) { desc.setReplaceMode(true); // we replace existing table. ReplicationSpec.copyLastReplId(existingTable.getParameters(), tbl.getParameters()); + + // If location of an existing managed table is changed, then need to delete the old location if exists. + // This scenario occurs when a managed table is converted into external table at source. In this case, + // at target, the table data would be moved to different location under base directory for external tables. + if (existingTable.getTableType().equals(TableType.MANAGED_TABLE) + && tbl.getTableType().equals(TableType.EXTERNAL_TABLE) + && (!existingTable.getDataLocation().equals(tbl.getDataLocation()))) { + dataLocationChanged = true; + } } else { LOG.debug("DDLTask: Create Table is skipped as table {} is newer than update", desc.getTableName()); return 0; // no replacement, the existing table state is newer than our update. @@ -74,7 +82,7 @@ public int execute() throws HiveException { // create the table if (desc.getReplaceMode()) { - createTableReplaceMode(tbl); + createTableReplaceMode(tbl, dataLocationChanged); } else { createTableNonReplaceMode(tbl); } @@ -83,7 +91,7 @@ public int execute() throws HiveException { return 0; } - private void createTableReplaceMode(Table tbl) throws HiveException { + private void createTableReplaceMode(Table tbl, boolean dataLocationChanged) throws HiveException { ReplicationSpec replicationSpec = desc.getReplicationSpec(); long writeId = 0; EnvironmentContext environmentContext = null; @@ -108,6 +116,12 @@ private void createTableReplaceMode(Table tbl) throws HiveException { } } + // If table's data location is moved, then set the corresponding flag in environment context to + // indicate Metastore to update location of all partitions and delete old directory. + if (dataLocationChanged) { + environmentContext = ReplUtils.setDataLocationChangedFlag(environmentContext); + } + // replace-mode creates are really alters using CreateTableDesc. context.getDb().alterTable(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), tbl, false, environmentContext, true, writeId); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index a5ed840879..05e2a138bb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -19,8 +19,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.common.ReplConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; @@ -206,4 +208,12 @@ public static boolean isFirstIncPending(Map parameters) { // and not through replication. return firstIncPendFlag != null && !firstIncPendFlag.isEmpty() && "true".equalsIgnoreCase(firstIncPendFlag); } + + public static EnvironmentContext setDataLocationChangedFlag(EnvironmentContext envContext) { + if (envContext == null) { + envContext = new EnvironmentContext(); + } + envContext.putToProperties(ReplConst.DATA_LOCATION_CHANGED, ReplConst.TRUE); + return envContext; + } } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java new file mode 100644 index 0000000000..8e85198c2d --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common; + +/** + * A class that defines the constant strings used by the replication implementation. + */ + +public class ReplConst { + + /** + * The constant that denotes the table data location is changed to different path. This indicates + * Metastore to update corresponding path in Partitions and also need to delete old path. + */ + public static final String DATA_LOCATION_CHANGED = "DATA_LOCATION_CHANGED"; + + public static final String TRUE = "true"; +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 617c7bc012..2112d7ac18 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -21,6 +21,7 @@ import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.common.ReplConst; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; @@ -99,9 +100,12 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam dbname = dbname.toLowerCase(); final boolean cascade = environmentContext != null - && environmentContext.isSetProperties() - && StatsSetupConst.TRUE.equals(environmentContext.getProperties().get( - StatsSetupConst.CASCADE)); + && environmentContext.isSetProperties() + && StatsSetupConst.TRUE.equals(environmentContext.getProperties().get(StatsSetupConst.CASCADE)); + final boolean replDataLocationChanged = environmentContext != null + && environmentContext.isSetProperties() + && ReplConst.TRUE.equals(environmentContext.getProperties().get(ReplConst.DATA_LOCATION_CHANGED)); + if (newt == null) { throw new InvalidOperationException("New table is null"); } @@ -154,6 +158,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam msdb.openTransaction(); // get old table // Note: we don't verify stats here; it's done below in alterTableUpdateTableColumnStats. + Database olddb = msdb.getDatabase(catName, dbname); oldt = msdb.getTable(catName, dbname, name, null); if (oldt == null) { throw new InvalidOperationException("table " + @@ -192,12 +197,12 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam // 2) the table is not an external table, and // 3) the user didn't change the default location (or new location is empty), and // 4) the table was not initially created with a specified location - if (rename - && !oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString()) - && (oldt.getSd().getLocation().compareTo(newt.getSd().getLocation()) == 0 - || StringUtils.isEmpty(newt.getSd().getLocation())) - && !MetaStoreUtils.isExternalTable(oldt)) { - Database olddb = msdb.getDatabase(catName, dbname); + if (replDataLocationChanged + || (rename + && !oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString()) + && (oldt.getSd().getLocation().compareTo(newt.getSd().getLocation()) == 0 + || StringUtils.isEmpty(newt.getSd().getLocation())) + && !MetaStoreUtils.isExternalTable(oldt))) { // if a table was created in a user specified location using the DDL like // create table tbl ... location ...., it should be treated like an external table // in the table rename, its data location should not be changed. We can check @@ -209,7 +214,13 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam boolean tableInSpecifiedLoc = !oldtRelativePath.equalsIgnoreCase(name) && !oldtRelativePath.equalsIgnoreCase(name + Path.SEPARATOR); - if (!tableInSpecifiedLoc) { + if (replDataLocationChanged) { + // If data location is changed in replication flow, then new path was already set in + // the newt. Also, it is as good as the data is moved and set dataWasMoved=true so that + // location in partitions are also updated accordingly. + destPath = new Path(newt.getSd().getLocation()); + dataWasMoved = true; + } else if (!tableInSpecifiedLoc) { srcFs = wh.getFs(srcPath); // get new location @@ -357,6 +368,13 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam } } + // If data location is changed in replication flow, then need to delete the old path. + if (replDataLocationChanged) { + Path deleteOldDataLoc = new Path(oldt.getSd().getLocation()); + boolean isAutoPurge = "true".equalsIgnoreCase(oldt.getParameters().get("auto.purge")); + wh.deleteDir(deleteOldDataLoc, true, isAutoPurge, olddb); + } + if (transactionalListeners != null && !transactionalListeners.isEmpty()) { txnAlterTableEventResponses = MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ALTER_TABLE, @@ -385,7 +403,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam if (!success) { LOG.error("Failed to alter table " + TableName.getQualified(catName, dbname, name)); msdb.rollbackTransaction(); - if (dataWasMoved) { + if (!replDataLocationChanged && dataWasMoved) { try { if (destFs.exists(destPath)) { if (!destFs.rename(destPath, srcPath)) {