diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index f74485bae5..31cebe414c 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -515,6 +515,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "This is the base directory on the target/replica warehouse under which data for " + "external tables is stored. This is relative base path and hence prefixed to the source " + "external table path on target cluster."), + STRICT_MANAGED_TABLES_MIGRATION_OWNER("strict.managed.tables.migration.owner", "hive", + "This is used by the HiveStrictManagedMigration tool to check the table type conversion rules. If the " + + "owner of the table location is not the same as this config value, then the table is converted to an " + + "external table. The config is also used during replication from a cluster with " + + "hive.strict.managed.tables = false to a target cluster with hive.strict.managed.tables = true."), LOCALSCRATCHDIR("hive.exec.local.scratchdir", "${system:java.io.tmpdir}" + File.separator + "${system:user.name}", "Local scratch space for Hive jobs"), diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index e6113942dd..ea57199888 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -206,7 +206,7 @@ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { FileIterator fileIter = MetaStoreUtils.isExternalTable(t) ? null : new FileIterator(t.getSd().getLocation()); CreateTableMessage msg = - MessageBuilder.getInstance().buildCreateTableMessage(t, fileIter); + MessageBuilder.getInstance().buildCreateTableMessage(t, fileIter, tableEvent.getLocOwner()); NotificationEvent event = new NotificationEvent(0, now(), EventType.CREATE_TABLE.toString(), msgEncoder.getSerializer().serialize(msg)); @@ -243,7 +243,7 @@ public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { Table after = tableEvent.getNewTable(); AlterTableMessage msg = MessageBuilder.getInstance() .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp(), - tableEvent.getWriteId()); + tableEvent.getWriteId(), tableEvent.getLocOwner()); NotificationEvent event = new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgEncoder.getSerializer().serialize(msg) @@ -361,7 +361,7 @@ public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaExceptio PartitionFilesIterator fileIter = MetaStoreUtils.isExternalTable(t) ?
null : new PartitionFilesIterator(partitionEvent.getPartitionIterator(), t); EventMessage msg = MessageBuilder.getInstance() - .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator(), fileIter); + .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator(), fileIter, partitionEvent.getLocOwner()); MessageSerializer serializer = msgEncoder.getSerializer(); NotificationEvent event = new NotificationEvent(0, now(), @@ -401,7 +401,7 @@ public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaExce AlterPartitionMessage msg = MessageBuilder.getInstance() .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp(), - partitionEvent.getWriteId()); + partitionEvent.getWriteId(), partitionEvent.getLocOwner()); NotificationEvent event = new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgEncoder.getSerializer().serialize(msg)); @@ -540,7 +540,7 @@ public void onInsert(InsertEvent insertEvent) throws MetaException { Table tableObj = insertEvent.getTableObj(); InsertMessage msg = MessageBuilder.getInstance().buildInsertMessage(tableObj, insertEvent.getPartitionObj(), insertEvent.isReplace(), - new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums())); + new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()), insertEvent.getLocOwner()); NotificationEvent event = new NotificationEvent(0, now(), EventType.INSERT.toString(), msgEncoder.getSerializer().serialize(msg)); @@ -759,7 +759,8 @@ public void onUpdateTableColumnStat(UpdateTableColumnStatEvent updateTableColumn .buildUpdateTableColumnStatMessage(updateTableColumnStatEvent.getColStats(), updateTableColumnStatEvent.getTableObj(), updateTableColumnStatEvent.getTableParameters(), - updateTableColumnStatEvent.getWriteId()); + updateTableColumnStatEvent.getWriteId(), + updateTableColumnStatEvent.getLocOwner()); NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_TABLE_COLUMN_STAT.toString(), msgEncoder.getSerializer().serialize(msg)); ColumnStatisticsDesc statDesc = updateTableColumnStatEvent.getColStats().getStatsDesc(); @@ -789,7 +790,8 @@ public void onUpdatePartitionColumnStat(UpdatePartitionColumnStatEvent updatePar updatePartColStatEvent.getPartVals(), updatePartColStatEvent.getPartParameters(), updatePartColStatEvent.getTableObj(), - updatePartColStatEvent.getWriteId()); + updatePartColStatEvent.getWriteId(), + updatePartColStatEvent.getLocOwner()); NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_PARTITION_COLUMN_STAT.toString(), msgEncoder.getSerializer().serialize(msg)); ColumnStatisticsDesc statDesc = updatePartColStatEvent.getPartColStats().getStatsDesc(); diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java index efafe0c641..f2c7b3731c 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java @@ -120,7 +120,8 @@ public void onAddPartition(AddPartitionEvent partitionEvent) Table table = partitionEvent.getTable(); String topicName = getTopicName(table); if (topicName != null && !topicName.equals("")) { - send(messageFactory.buildAddPartitionMessage(table, partitionEvent.getPartitionIterator()), topicName); + 
send(messageFactory.buildAddPartitionMessage(table, partitionEvent.getPartitionIterator(), + partitionEvent.getLocOwner()), topicName); } else { LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partitionEvent.getTable().getDbName() @@ -141,7 +142,7 @@ public void onAlterPartition(AlterPartitionEvent ape) throws MetaException { String topicName = getTopicName(ape.getTable()); send(messageFactory.buildAlterPartitionMessage(ape.getTable(),before, after, - ape.getWriteId()), topicName); + ape.getWriteId(), ape.getLocOwner()), topicName); } } @@ -221,7 +222,7 @@ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { throw me; } String topicName = getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase(); - send(messageFactory.buildCreateTableMessage(newTbl), topicName); + send(messageFactory.buildCreateTableMessage(newTbl, tableEvent.getLocOwner()), topicName); } } @@ -255,7 +256,8 @@ public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { // DB topic - Alan. String topicName = getTopicPrefix(tableEvent.getIHMSHandler().getConf()) + "." + after.getDbName().toLowerCase(); - send(messageFactory.buildAlterTableMessage(before, after, tableEvent.getWriteId()), topicName); + send(messageFactory.buildAlterTableMessage(before, after, tableEvent.getWriteId(), + tableEvent.getLocOwner()), topicName); } } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AddPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AddPartitionMessage.java index e16ffa5269..a5ab595624 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AddPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AddPartitionMessage.java @@ -50,6 +50,8 @@ protected AddPartitionMessage() { */ public abstract List> getPartitions (); + public abstract String getLocOwner(); + @Override public HCatEventMessage checkValid() { if (getTable() == null) diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java index fc8f6b7560..2b8ed5e904 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java @@ -42,6 +42,8 @@ protected AlterPartitionMessage() { public abstract Long getWriteId(); + public abstract String getLocOwner(); + @Override public HCatEventMessage checkValid() { if (getTable() == null) throw new IllegalStateException("Table name unset."); diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterTableMessage.java index 416dd10bb2..817173a02d 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterTableMessage.java @@ -42,4 +42,6 @@ public HCatEventMessage checkValid() { } public abstract Long getWriteId(); + + public abstract String getLocOwner(); } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateTableMessage.java 
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateTableMessage.java index c7ca09cc24..26ffc273d6 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateTableMessage.java @@ -41,6 +41,8 @@ protected CreateTableMessage() { public abstract String getTableType(); + public abstract String getLocOwner(); + @Override public HCatEventMessage checkValid() { if (getTable() == null) diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java index 1754df8501..247dfd9001 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java @@ -56,6 +56,8 @@ protected InsertMessage() { */ public abstract List getFiles(); + public abstract String getLocOwner(); + @Override public HCatEventMessage checkValid() { if (getTable() == null) diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java index 782fffb516..c36ec158cf 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java @@ -117,9 +117,10 @@ public static MessageDeserializer getDeserializer(String format, /** * Factory method for CreateTableMessage. * @param table The Table being created. + * @param locOwner Owner of the table location. * @return CreateTableMessage instance. */ - public abstract CreateTableMessage buildCreateTableMessage(Table table); + public abstract CreateTableMessage buildCreateTableMessage(Table table, String locOwner); /** * Factory method for AlterTableMessage. Unlike most of these calls, this one can return null, @@ -129,9 +130,10 @@ public static MessageDeserializer getDeserializer(String format, * @param before The table before the alter * @param after The table after the alter * @param writeId writeId under which alter is done (for ACID tables) + * @param locOwner Owner of the table location. * @return */ - public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after, Long writeId); + public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after, Long writeId, String locOwner); /** * Factory method for DropTableMessage. @@ -140,13 +142,15 @@ public static MessageDeserializer getDeserializer(String format, */ public abstract DropTableMessage buildDropTableMessage(Table table); - /** - * Factory method for AddPartitionMessage. - * @param table The Table to which the partitions are added. - * @param partitions The iterator to set of Partitions being added. - * @return AddPartitionMessage instance. - */ - public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Iterator partitions); + /** + * Factory method for AddPartitionMessage. + * @param table The Table to which the partitions are added. + * @param partitions The iterator to set of Partitions being added. + * @param locOwner Owner of the table/partition location. + * @return AddPartitionMessage instance. 
+ */ + public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Iterator partitions, + String locOwner); /** * Factory method for building AlterPartitionMessage @@ -154,10 +158,11 @@ public static MessageDeserializer getDeserializer(String format, * @param before The partition before it was altered * @param after The partition after it was altered * @param writeId writeId under which alter is done (for ACID tables) + * @param locOwner Owner of the table/partition location. * @return a new AlterPartitionMessage */ public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, - Partition after, Long writeId); + Partition after, Long writeId, String locOwner); /** * Factory method for DropPartitionMessage. @@ -188,10 +193,12 @@ public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Pa * @param partVals Partition values for the partition that the insert occurred in, may be null * if the insert was done into a non-partitioned table * @param files List of files created as a result of the insert, may be null. + * @param locOwner Owner of the table/partition location. * @return instance of InsertMessage */ public abstract InsertMessage buildInsertMessage(String db, String table, - Map partVals, List files); + Map partVals, List files, + String locOwner); /** * Factory method for building insert message @@ -200,8 +207,10 @@ public abstract InsertMessage buildInsertMessage(String db, String table, * @param partVals Partition values for the partition that the insert occurred in, may be null * if the insert was done into a non-partitioned table * @param files List of files created as a result of the insert, may be null. + * @param locOwner Owner of the table/partition location. * @return instance of InsertMessage */ public abstract InsertMessage buildInsertMessage(String db, Table table, - Map partVals, List files); + Map partVals, List files, + String locOwner); } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAddPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAddPartitionMessage.java index ebdd29e413..e8d959c9b4 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAddPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAddPartitionMessage.java @@ -31,7 +31,7 @@ public class JSONAddPartitionMessage extends AddPartitionMessage { @JsonProperty - String server, servicePrincipal, db, table, tableType; + String server, servicePrincipal, db, table, tableType, locOwner; @JsonProperty Long timestamp; @@ -45,12 +45,12 @@ public JSONAddPartitionMessage() {} public JSONAddPartitionMessage(String server, String servicePrincipal, String db, String table, - List> partitions, Long timestamp) { - this(server, servicePrincipal, db, table, null, partitions, timestamp); + List> partitions, String locOwner, Long timestamp) { + this(server, servicePrincipal, db, table, null, partitions, locOwner, timestamp); } public JSONAddPartitionMessage(String server, String servicePrincipal, String db, String table, - String tableType, List> partitions, Long timestamp) { + String tableType, List> partitions, String locOwner, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; @@ -58,6 +58,7 @@ public JSONAddPartitionMessage(String server, String servicePrincipal, String db this.tableType = tableType; 
this.partitions = partitions; this.timestamp = timestamp; + this.locOwner = locOwner; checkValid(); } @@ -84,6 +85,11 @@ public String getTableType() { @Override public List> getPartitions () { return partitions; } + @Override + public String getLocOwner() { + return locOwner; + } + @Override public String toString() { try { diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java index c28524165e..a94ac800a4 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java @@ -34,7 +34,7 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage { @JsonProperty - String server, servicePrincipal, db, table, tableType; + String server, servicePrincipal, db, table, tableType, locOwner; @JsonProperty Long timestamp, writeId; @@ -53,8 +53,9 @@ public JSONAlterPartitionMessage(String server, String table, Map keyValues, Long writeId, + String locOwner, Long timestamp) { - this(server, servicePrincipal, db, table, null, keyValues, writeId, timestamp); + this(server, servicePrincipal, db, table, null, keyValues, writeId, locOwner, timestamp); } public JSONAlterPartitionMessage(String server, @@ -64,6 +65,7 @@ public JSONAlterPartitionMessage(String server, String tableType, Map keyValues, long writeId, + String locOwner, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; @@ -73,6 +75,7 @@ public JSONAlterPartitionMessage(String server, this.timestamp = timestamp; this.keyValues = keyValues; this.writeId = writeId; + this.locOwner = locOwner; checkValid(); } @@ -116,6 +119,11 @@ public Long getWriteId() { return writeId == null ? 
0 : writeId; } + @Override + public String getLocOwner() { + return locOwner; + } + @Override public String toString() { try { diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java index 9c0799b473..77bc5a7481 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java @@ -32,7 +32,7 @@ public class JSONAlterTableMessage extends AlterTableMessage { @JsonProperty - String server, servicePrincipal, db, table, tableType; + String server, servicePrincipal, db, table, tableType, locOwner; @JsonProperty Long timestamp, writeId; @@ -47,8 +47,9 @@ public JSONAlterTableMessage(String server, String db, String table, Long writeId, - Long timestamp) { - this(server, servicePrincipal, db, table, null, writeId, timestamp); + Long timestamp, + String locOwner) { + this(server, servicePrincipal, db, table, null, writeId, locOwner, timestamp); } public JSONAlterTableMessage(String server, @@ -57,6 +58,7 @@ public JSONAlterTableMessage(String server, String table, String tableType, Long writeId, + String locOwner, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; @@ -65,6 +67,7 @@ public JSONAlterTableMessage(String server, this.tableType = tableType; this.timestamp = timestamp; this.writeId = writeId; + this.locOwner = locOwner; checkValid(); } @@ -103,6 +106,11 @@ public Long getWriteId() { return writeId == null ? 0 : writeId; } + @Override + public String getLocOwner() { + return locOwner; + } + @Override public String toString() { try { diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java index 60333d5aec..07e96ef541 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java @@ -28,7 +28,7 @@ public class JSONCreateTableMessage extends CreateTableMessage { @JsonProperty - String server, servicePrincipal, db, table, tableType; + String server, servicePrincipal, db, table, tableType, locOwner; @JsonProperty Long timestamp; @@ -38,19 +38,20 @@ */ public JSONCreateTableMessage() {} - public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, + public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, String locOwner, Long timestamp) { - this(server, servicePrincipal, db, table, null, timestamp); + this(server, servicePrincipal, db, table, null, locOwner, timestamp); } public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, - String tableType, Long timestamp) { + String tableType, String locOwner, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; this.table = table; this.tableType = tableType; this.timestamp = timestamp; + this.locOwner = locOwner; checkValid(); } @@ -74,6 +75,11 @@ public String getTableType() { if (tableType != null) return tableType; else return ""; } + @Override + public String getLocOwner() { + return 
locOwner; + } + @Override public String toString() { try { diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java index e05113cd33..553ebbc1a0 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java @@ -31,7 +31,7 @@ public class JSONInsertMessage extends InsertMessage { @JsonProperty - String server, servicePrincipal, db, table, tableType; + String server, servicePrincipal, db, table, tableType, locOwner; @JsonProperty Long timestamp; @@ -48,13 +48,13 @@ public JSONInsertMessage() {} public JSONInsertMessage(String server, String servicePrincipal, String db, String table, - Map partKeyVals, List files, Long timestamp) { - this(server, servicePrincipal, db, table, null, partKeyVals, files, timestamp); + Map partKeyVals, List files, String locOwner, Long timestamp) { + this(server, servicePrincipal, db, table, null, partKeyVals, files, locOwner, timestamp); } public JSONInsertMessage(String server, String servicePrincipal, String db, String table, - String tableType, Map partKeyVals, List files, - Long timestamp) { + String tableType, Map partKeyVals, List files, + String locOwner, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; @@ -63,6 +63,7 @@ public JSONInsertMessage(String server, String servicePrincipal, String db, Stri this.timestamp = timestamp; this.partKeyVals = partKeyVals; this.files = files; + this.locOwner = locOwner; checkValid(); } @@ -97,6 +98,11 @@ public String getTableType() { @Override public Long getTimestamp() { return timestamp; } + @Override + public String getLocOwner() { + return locOwner; + } + @Override public String toString() { try { diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java index 770dd1e5a6..0bcddbf5f4 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java @@ -82,15 +82,15 @@ public DropDatabaseMessage buildDropDatabaseMessage(Database db) { } @Override - public CreateTableMessage buildCreateTableMessage(Table table) { + public CreateTableMessage buildCreateTableMessage(Table table, String locOwner) { return new JSONCreateTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), table.getTableName(), table.getTableType(), now()); } @Override - public AlterTableMessage buildAlterTableMessage(Table before, Table after, Long writeId) { + public AlterTableMessage buildAlterTableMessage(Table before, Table after, Long writeId, String locOwner) { return new JSONAlterTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, before.getDbName(), - before.getTableName(), before.getTableType(), writeId, now()); + before.getTableName(), before.getTableType(), writeId, locOwner, now()); } @Override @@ -100,18 +100,19 @@ public DropTableMessage buildDropTableMessage(Table table) { } @Override - public AddPartitionMessage buildAddPartitionMessage(Table table, Iterator partitionsIterator) { + public AddPartitionMessage 
buildAddPartitionMessage(Table table, Iterator partitionsIterator, + String locOwner) { return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), table.getTableName(), table.getTableType(), - MessageBuilder.getPartitionKeyValues(table, partitionsIterator), now()); + MessageBuilder.getPartitionKeyValues(table, partitionsIterator), locOwner, now()); } @Override public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, Partition after, - Long writeId) { + Long writeId, String locOwner) { return new JSONAlterPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, before.getDbName(), before.getTableName(), table.getTableType(), - MessageBuilder.getPartitionKeyValues(table,before), writeId, now()); + MessageBuilder.getPartitionKeyValues(table, before), writeId, locOwner, now()); } @Override @@ -135,16 +136,16 @@ public DropFunctionMessage buildDropFunctionMessage(Function fn) { @Override public InsertMessage buildInsertMessage(String db, String table, Map partKeyVals, - List files) { + List files, String locOwner) { return new JSONInsertMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db, table, null, - partKeyVals, files, now()); + partKeyVals, files, locOwner, now()); } @Override public InsertMessage buildInsertMessage(String db, Table table, Map partKeyVals, - List files) { + List files, String locOwner) { return new JSONInsertMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), - table.getTableName(), table.getTableType(), partKeyVals, files, now()); + table.getTableName(), table.getTableType(), partKeyVals, files, locOwner, now()); } private long now() { diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/TestReplicationTask.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/TestReplicationTask.java index dca56ee031..770f7fdd16 100644 --- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/TestReplicationTask.java +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/TestReplicationTask.java @@ -73,7 +73,7 @@ public static void testCreate() throws HCatException { t.setDbName("testdb"); t.setTableName("testtable"); NotificationEvent event = new NotificationEvent(0, (int)System.currentTimeMillis(), - HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t).toString()); + HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t, null).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/exim/TestEximReplicationTasks.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/exim/TestEximReplicationTasks.java index 846ebc75e7..253115a019 100644 --- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/exim/TestEximReplicationTasks.java +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/exim/TestEximReplicationTasks.java @@ -173,7 +173,7 @@ public void testCreateTable() throws IOException { t.setDbName("testdb"); t.setTableName("testtable"); NotificationEvent event = new NotificationEvent(getEventId(), getTime(), - HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t).toString()); + HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t, null).toString()); event.setDbName(t.getDbName()); 
event.setTableName(t.getTableName()); @@ -267,7 +267,7 @@ public void testAlterTable() throws IOException { t.setTableName("testtable"); NotificationEvent event = new NotificationEvent(getEventId(), getTime(), HCatConstants.HCAT_ALTER_TABLE_EVENT, - msgFactory.buildAlterTableMessage(t, t, t.getWriteId()).toString()); + msgFactory.buildAlterTableMessage(t, t, t.getWriteId(), null).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); @@ -324,7 +324,8 @@ public void testAddPartition() throws IOException { addedPtns.add(createPtn(t, Arrays.asList("201", "xyz"))); NotificationEvent event = new NotificationEvent(getEventId(), getTime(), - HCatConstants.HCAT_ADD_PARTITION_EVENT, msgFactory.buildAddPartitionMessage(t, addedPtns.iterator()).toString()); + HCatConstants.HCAT_ADD_PARTITION_EVENT, msgFactory.buildAddPartitionMessage(t, + addedPtns.iterator(), null).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); @@ -456,7 +457,7 @@ public void testAlterPartition() throws HCatException { NotificationEvent event = new NotificationEvent(getEventId(), getTime(), HCatConstants.HCAT_ALTER_PARTITION_EVENT, msgFactory.buildAlterPartitionMessage(t, - p, p, p.getWriteId()).toString()); + p, p, p.getWriteId(), null).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); @@ -511,7 +512,7 @@ public void testInsert() throws HCatException { t.getDbName(), t.getTableName(), getPtnDesc(t,p), - files + files, null ).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 3820fabbf9..4104aca00f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -3484,7 +3484,7 @@ private NotificationEvent createDummyEvent(String dbname, String tblname, long e evid, (int)System.currentTimeMillis(), MessageBuilder.CREATE_TABLE_EVENT, - MessageBuilder.getInstance().buildCreateTableMessage(t, Arrays.asList("/tmp/").iterator()) + MessageBuilder.getInstance().buildCreateTableMessage(t, Arrays.asList("/tmp/").iterator(), null) .toString() ); event.setDbName(t.getDbName()); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java index bafcdbe387..5917da4a56 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java @@ -88,32 +88,33 @@ static void internalBeforeClassSetup(Map overrideConfigs) throws new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); final DistributedFileSystem fs = miniDFSCluster.getFileSystem(); HashMap hiveConfigs = new HashMap() {{ - put("fs.defaultFS", fs.getUri().toString()); - put("hive.support.concurrency", "true"); - put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); - put("hive.metastore.client.capability.check", "false"); - put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); - put("hive.exec.dynamic.partition.mode", "nonstrict"); - put("hive.strict.checks.bucketing", "false"); 
- put("hive.mapred.mode", "nonstrict"); - put("mapred.input.dir.recursive", "true"); - put("hive.metastore.disallow.incompatible.col.type.changes", "false"); - put("hive.strict.managed.tables", "true"); + put("fs.defaultFS", fs.getUri().toString()); + put("hive.support.concurrency", "true"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.strict.managed.tables", "true"); }}; replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs); HashMap configsForPrimary = new HashMap() {{ - put("fs.defaultFS", fs.getUri().toString()); - put("hive.metastore.client.capability.check", "false"); - put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); - put("hive.exec.dynamic.partition.mode", "nonstrict"); - put("hive.strict.checks.bucketing", "false"); - put("hive.mapred.mode", "nonstrict"); - put("mapred.input.dir.recursive", "true"); - put("hive.metastore.disallow.incompatible.col.type.changes", "false"); - put("hive.support.concurrency", "false"); - put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); - put("hive.strict.managed.tables", "false"); + put("fs.defaultFS", fs.getUri().toString()); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.support.concurrency", "false"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); + put("hive.strict.managed.tables", "false"); + put("strict.managed.tables.migration.owner", System.getProperty("user.name")); }}; configsForPrimary.putAll(overrideConfigs); primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary); @@ -476,4 +477,74 @@ public void testIncrementalLoadMigrationToAcidWithMoveOptimization() throws Thro replica.load(replicatedDbName, tuple.dumpLocation, withConfigs); verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); } + + @Test + public void testForceMigrateToExternalTableBasedOnLocationOwnership() throws Throwable { + primary.run("use " + primaryDbName) + .run("create table tbl (fld int)") + .run("insert into tbl values (1)") + .run("create table tbl_part (fld int) partitioned by (part int)") + .run("insert into tbl_part partition (part = 1) values (1)") + .run("insert into tbl_part partition (part = 2) values (2)"); + + assertFalse(MetaStoreUtils.isExternalTable(primary.getTable(primaryDbName, "tbl"))); + assertFalse(MetaStoreUtils.isExternalTable(primary.getTable(primaryDbName, "tbl_part"))); + + Table table = primary.getTable(primaryDbName, "tbl"); + Path location = new Path(table.getSd().getLocation()); + FileSystem fs = location.getFileSystem(primary.getConf()); + fs.setOwner(location, "junk", "junk"); + Path partLoc = new Path(primary.getPartition(primaryDbName, + "tbl_part", Collections.singletonList("1")).getSd().getLocation()); + fs.setOwner(partLoc, "junk", "junk"); + + WarehouseInstance.Tuple tuple = 
primary.dump(primaryDbName, null); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run(" select fld from tbl") + .verifyResult("1") + .run(" select fld from tbl_part") + .verifyResults(new String[] {"1", "2"}); + + assertTrue(MetaStoreUtils.isExternalTable(replica.getTable(replicatedDbName, "tbl"))); + assertTrue(MetaStoreUtils.isExternalTable(replica.getTable(replicatedDbName, "tbl_part"))); + + // Create locations with the owner set to junk and use them to create a table and a partitioned table. + Path newLoc = new Path(location.getParent(), "tbl_inc"); + assertTrue(fs.mkdirs(newLoc)); + fs.setOwner(newLoc, "junk", "junk"); + + Path newLocPart = new Path(location.getParent(), "tbl_inc_part"); + assertTrue(fs.mkdirs(newLocPart)); + fs.setOwner(newLocPart, "junk", "junk"); + + primary.run("use " + primaryDbName) + .run("create table tbl_inc (fld int, fld1 int) location '" + newLoc.toUri() + "'") + .run("insert into tbl_inc values (1, 2)") + .run("create table tbl_inc_part (fld int) partitioned by (part string) location '" + + newLocPart.toUri() + "'") + .run("insert into tbl_inc_part partition (part = 'part1') values (1)") + .run("insert into tbl_inc_part partition (part = 'part2') values (2)"); + + assertFalse(MetaStoreUtils.isExternalTable(primary.getTable(primaryDbName, "tbl_inc"))); + assertFalse(MetaStoreUtils.isExternalTable(primary.getTable(primaryDbName, "tbl_inc_part"))); + + // To make sure the current location ownership is not used to decide the table type, revert + // the ownership back to the original owner. This shouldn't impact the conversion to an external table, as + // the incremental repl dump refers to the location owner info stored in the events. + String originalOwner = System.getProperty("user.name"); + fs.setOwner(newLoc, originalOwner, originalOwner); + fs.setOwner(newLocPart, originalOwner, originalOwner); + + tuple = primary.dump(primaryDbName, tuple.lastReplicationId); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run(" select fld from tbl_inc") + .verifyResult("1") + .run(" select fld from tbl_inc_part") + .verifyResults(new String[] {"1", "2"}); + + assertTrue(MetaStoreUtils.isExternalTable(replica.getTable(replicatedDbName, "tbl_inc"))); + assertTrue(MetaStoreUtils.isExternalTable(replica.getTable(replicatedDbName, "tbl_inc_part"))); + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java index 5f59a2a684..5aaa401baf 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java @@ -66,33 +66,34 @@ static void internalBeforeClassSetup(Map overrideConfigs) throws new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); final DistributedFileSystem fs = miniDFSCluster.getFileSystem(); HashMap hiveConfigs = new HashMap() {{ - put("fs.defaultFS", fs.getUri().toString()); - put("hive.support.concurrency", "true"); - put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); - put("hive.metastore.client.capability.check", "false"); - put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); - put("hive.exec.dynamic.partition.mode", "nonstrict"); - put("hive.strict.checks.bucketing", "false"); - put("hive.mapred.mode", "nonstrict"); -
put("mapred.input.dir.recursive", "true"); - put("hive.metastore.disallow.incompatible.col.type.changes", "false"); - put("hive.strict.managed.tables", "true"); - put("hive.metastore.transactional.event.listeners", ""); + put("fs.defaultFS", fs.getUri().toString()); + put("hive.support.concurrency", "true"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.strict.managed.tables", "true"); + put("hive.metastore.transactional.event.listeners", ""); }}; replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs); HashMap configsForPrimary = new HashMap() {{ - put("fs.defaultFS", fs.getUri().toString()); - put("hive.metastore.client.capability.check", "false"); - put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); - put("hive.exec.dynamic.partition.mode", "nonstrict"); - put("hive.strict.checks.bucketing", "false"); - put("hive.mapred.mode", "nonstrict"); - put("mapred.input.dir.recursive", "true"); - put("hive.metastore.disallow.incompatible.col.type.changes", "false"); - put("hive.support.concurrency", "false"); - put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); - put("hive.strict.managed.tables", "false"); + put("fs.defaultFS", fs.getUri().toString()); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.support.concurrency", "false"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); + put("hive.strict.managed.tables", "false"); + put("strict.managed.tables.migration.owner", System.getProperty("user.name")); }}; configsForPrimary.putAll(overrideConfigs); primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigration.java index 49ad718b8d..a64e11ffdf 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigration.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigration.java @@ -40,30 +40,31 @@ public static void classLevelSetup() throws Exception { GzipJSONMessageEncoder.class.getCanonicalName()); Map replicaConfigs = new HashMap() {{ - put("hive.support.concurrency", "true"); - put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); - put("hive.metastore.client.capability.check", "false"); - put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); - put("hive.exec.dynamic.partition.mode", "nonstrict"); - put("hive.strict.checks.bucketing", "false"); - put("hive.mapred.mode", "nonstrict"); - put("mapred.input.dir.recursive", "true"); - put("hive.metastore.disallow.incompatible.col.type.changes", "false"); - 
put("hive.strict.managed.tables", "true"); + put("hive.support.concurrency", "true"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.strict.managed.tables", "true"); }}; replicaConfigs.putAll(overrides); Map primaryConfigs = new HashMap() {{ - put("hive.metastore.client.capability.check", "false"); - put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); - put("hive.exec.dynamic.partition.mode", "nonstrict"); - put("hive.strict.checks.bucketing", "false"); - put("hive.mapred.mode", "nonstrict"); - put("mapred.input.dir.recursive", "true"); - put("hive.metastore.disallow.incompatible.col.type.changes", "false"); - put("hive.support.concurrency", "false"); - put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); - put("hive.strict.managed.tables", "false"); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.support.concurrency", "false"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); + put("hive.strict.managed.tables", "false"); + put("strict.managed.tables.migration.owner", System.getProperty("user.name")); }}; primaryConfigs.putAll(overrides); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigrationNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigrationNoAutogather.java index 3b05220acb..68697de676 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigrationNoAutogather.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigrationNoAutogather.java @@ -40,30 +40,31 @@ public static void classLevelSetup() throws Exception { GzipJSONMessageEncoder.class.getCanonicalName()); Map replicaConfigs = new HashMap() {{ - put("hive.support.concurrency", "true"); - put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); - put("hive.metastore.client.capability.check", "false"); - put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); - put("hive.exec.dynamic.partition.mode", "nonstrict"); - put("hive.strict.checks.bucketing", "false"); - put("hive.mapred.mode", "nonstrict"); - put("mapred.input.dir.recursive", "true"); - put("hive.metastore.disallow.incompatible.col.type.changes", "false"); - put("hive.strict.managed.tables", "true"); + put("hive.support.concurrency", "true"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + 
put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.strict.managed.tables", "true"); }}; replicaConfigs.putAll(overrides); Map primaryConfigs = new HashMap() {{ - put("hive.metastore.client.capability.check", "false"); - put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); - put("hive.exec.dynamic.partition.mode", "nonstrict"); - put("hive.strict.checks.bucketing", "false"); - put("hive.mapred.mode", "nonstrict"); - put("mapred.input.dir.recursive", "true"); - put("hive.metastore.disallow.incompatible.col.type.changes", "false"); - put("hive.support.concurrency", "false"); - put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); - put("hive.strict.managed.tables", "false"); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.support.concurrency", "false"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); + put("hive.strict.managed.tables", "false"); + put("strict.managed.tables.migration.owner", System.getProperty("user.name")); }}; primaryConfigs.putAll(overrides); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index e9a63f8bf1..aeafe856b5 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -155,7 +155,7 @@ private void initialize(String cmRoot, String externalTableWarehouseRoot, String hiveConf.set(entry.getKey(), entry.getValue()); } - MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf, true); + MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf, true, true); // Add the below mentioned dependency in metastore/pom.xml file. For postgres need to copy postgresql-42.2.1.jar to // .m2//repository/postgresql/postgresql/9.3-1102.jdbc41/postgresql-9.3-1102.jdbc41.jar. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java index 27009f0385..b568341504 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -96,11 +96,11 @@ public ImportTableDesc tableDesc(String dbName) throws SemanticException { && hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES) && (table.getTableType() == TableType.MANAGED_TABLE)) { Hive hiveDb = Hive.get(hiveConf); - //TODO : dump metadata should be read to make sure that migration is required. 
+ HiveStrictManagedMigration.TableMigrationOption migrationOption = HiveStrictManagedMigration.determineMigrationTypeAutomatically(table.getTTable(), table.getTableType(), null, hiveConf, - hiveDb.getMSC(), true); + hiveDb.getMSC(), replicationSpec().forceMigrateToExternalTable()); HiveStrictManagedMigration.migrateTable(table.getTTable(), table.getTableType(), migrationOption, false, getHiveUpdater(hiveConf), hiveDb.getMSC(), hiveConf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index fbdbbdd5ac..0673581517 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl.util; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.ReplConst; import org.apache.hadoop.hive.conf.HiveConf; @@ -32,8 +33,11 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; import org.apache.hadoop.hive.ql.plan.DDLWork; @@ -164,15 +168,15 @@ public static boolean replCkptStatus(String dbName, Map props, S } public static boolean isTableMigratingToTransactional(HiveConf conf, - org.apache.hadoop.hive.metastore.api.Table tableObj) + org.apache.hadoop.hive.metastore.api.Table tableObj, + boolean forceMigrateToExternalTable) throws TException, IOException { if (conf.getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES) && !AcidUtils.isTransactionalTable(tableObj) && TableType.valueOf(tableObj.getTableType()) == TableType.MANAGED_TABLE) { - //TODO : isPathOwnByHive is hard coded to true, need to get it from repl dump metadata. 
HiveStrictManagedMigration.TableMigrationOption migrationOption = HiveStrictManagedMigration.determineMigrationTypeAutomatically(tableObj, TableType.MANAGED_TABLE, - null, conf, null, true); + null, conf, null, forceMigrateToExternalTable); return migrationOption == MANAGED; } return false; @@ -194,11 +198,13 @@ private static void addOpenTxnTaskForMigration(String actualDbName, String actua String actualTblName, HiveConf conf, UpdatedMetaDataTracker updatedMetaDataTracker, Task childTask, - org.apache.hadoop.hive.metastore.api.Table tableObj) + org.apache.hadoop.hive.metastore.api.Table tableObj, + boolean forceMigrateToExternalTable) throws IOException, TException { List> taskList = new ArrayList<>(); taskList.add(childTask); - if (isTableMigratingToTransactional(conf, tableObj) && updatedMetaDataTracker != null) { + if ((updatedMetaDataTracker != null) + && isTableMigratingToTransactional(conf, tableObj, forceMigrateToExternalTable)) { addOpenTxnTaskForMigration(actualDbName, actualTblName, conf, updatedMetaDataTracker, taskList, childTask); } @@ -206,13 +212,14 @@ private static void addOpenTxnTaskForMigration(String actualDbName, String actua } public static List> addTasksForLoadingColStats(ColumnStatistics colStats, - HiveConf conf, - UpdatedMetaDataTracker updatedMetadata, - org.apache.hadoop.hive.metastore.api.Table tableObj, - long writeId) + HiveConf conf, + UpdatedMetaDataTracker updatedMetadata, + org.apache.hadoop.hive.metastore.api.Table tableObj, + long writeId, + boolean forceMigrateToExternalTable) throws IOException, TException { List> taskList = new ArrayList<>(); - boolean isMigratingToTxn = ReplUtils.isTableMigratingToTransactional(conf, tableObj); + boolean isMigratingToTxn = ReplUtils.isTableMigratingToTransactional(conf, tableObj, forceMigrateToExternalTable); ColumnStatsUpdateWork work = new ColumnStatsUpdateWork(colStats, isMigratingToTxn); work.setWriteId(writeId); Task task = TaskFactory.get(work, conf); @@ -264,4 +271,21 @@ public static Long getMigrationCurrentTblWriteId(HiveConf conf) { } return Long.parseLong(writeIdString); } + + // If metadata file exists in the event/bootstrap dump path, then use it to decide if we need to + // force migrate to external table. + // This method can be used by REPL LOAD event message handlers to read forceMigrateToExternalTable + // flag from metadata file. 
+ public static boolean forceMigrateToExternalTable(HiveConf conf, String dumpMetadataDir) + throws IOException, SemanticException { + FileSystem fs = FileSystem.get(new Path(dumpMetadataDir).toUri(), conf); + Path metadataFile = new Path(dumpMetadataDir, EximUtil.METADATA_NAME); + if (fs.exists(metadataFile)) { + MetaData metaData = EximUtil.readMetaData(fs, metadataFile); + ReplicationSpec replicationSpec = metaData.getReplicationSpec(); + return replicationSpec.forceMigrateToExternalTable(); + } + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 76c69cf24b..3e735b6543 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -301,10 +301,10 @@ public static void createExportDump(FileSystem fs, Path metadataPath, Table tabl } try (JsonWriter writer = new JsonWriter(fs, metadataPath)) { + new TableSerializer(tableHandle, partitions, hiveConf).writeTo(writer, replicationSpec); if (replicationSpec.isInReplicationScope()) { new ReplicationSpecSerializer().writeTo(writer, replicationSpec); } - new TableSerializer(tableHandle, partitions, hiveConf).writeTo(writer, replicationSpec); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 07b40c996f..64f35691f8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -201,16 +201,18 @@ private void parsePartitionSpec(ASTNode tableNode, LinkedHashMap } } - private static void upgradeTableDesc(org.apache.hadoop.hive.metastore.api.Table tableObj, MetaData rv, - EximUtil.SemanticAnalyzerWrapperContext x) + private static void upgradeTableDesc(org.apache.hadoop.hive.metastore.api.Table tableObj, + EximUtil.SemanticAnalyzerWrapperContext x, + boolean forceMigrateToExternalTable) throws IOException, TException, HiveException { - x.getLOG().debug("Converting table " + tableObj.getTableName() + " of type " + tableObj.getTableType() + - " with para " + tableObj.getParameters()); - //TODO : isPathOwnedByHive is hard coded to true, need to get it from repl dump metadata.
+ x.getLOG().debug("Converting table " + tableObj.getTableName() + " of type " + tableObj.getTableType() + + " with para " + tableObj.getParameters() + + " forceMigrateToExternalTable " + forceMigrateToExternalTable); + TableType tableType = TableType.valueOf(tableObj.getTableType()); HiveStrictManagedMigration.TableMigrationOption migrationOption = HiveStrictManagedMigration.determineMigrationTypeAutomatically(tableObj, tableType, - null, x.getConf(), x.getHive().getMSC(), true); + null, x.getConf(), x.getHive().getMSC(), forceMigrateToExternalTable); HiveStrictManagedMigration.migrateTable(tableObj, tableType, migrationOption, false, getHiveUpdater(x.getConf()), x.getHive().getMSC(), x.getConf()); x.getLOG().debug("Converted table " + tableObj.getTableName() + " of type " + tableObj.getTableType() + @@ -282,8 +284,7 @@ public static boolean prepareImport(boolean isImportCmd, if (!TxnUtils.isTransactionalTable(tblObj) && replicationSpec.isInReplicationScope() && x.getConf().getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES) && (TableType.valueOf(tblObj.getTableType()) == TableType.MANAGED_TABLE)) { - //TODO : dump metadata should be read to make sure that migration is required. - upgradeTableDesc(tblObj, rv, x); + upgradeTableDesc(tblObj, x, replicationSpec.forceMigrateToExternalTable()); //if the conversion is from non transactional to transactional table if (TxnUtils.isTransactionalTable(tblObj)) { replicationSpec.setMigratingToTxnTable(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 48213d1b4e..17727f799e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -19,11 +19,11 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.plan.PlanUtils; import javax.annotation.Nullable; -import java.text.Collator; import java.util.Map; /** @@ -46,6 +46,8 @@ private String validWriteIdList = null; // WriteIds snapshot for replicating ACID/MM tables. 
//TxnIds snapshot private String validTxnList = null; + private boolean forceMigrateToExternalTable = false; + private Type specType = Type.DEFAULT; // DEFAULT means REPL_LOAD or BOOTSTRAP_DUMP or EXPORT private boolean isMigratingToTxnTable = false; private boolean isMigratingToExternalTable = false; @@ -60,7 +62,8 @@ LAZY("repl.lazy"), IS_REPLACE("repl.is.replace"), VALID_WRITEID_LIST("repl.valid.writeid.list"), - VALID_TXN_LIST("repl.valid.txnid.list") + VALID_TXN_LIST("repl.valid.txnid.list"), + FORCE_MIGRATE_TO_EXTERNAL_TABLE("repl.force.migrate.to.external.table") ; private final String keyName; @@ -128,6 +131,7 @@ public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly, this.isLazy = isLazy; this.isReplace = isReplace; this.specType = Type.DEFAULT; + this.forceMigrateToExternalTable = false; } public ReplicationSpec(Function keyFetcher) { @@ -150,6 +154,8 @@ public ReplicationSpec(Function keyFetcher) { this.isReplace = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_REPLACE.toString())); this.validWriteIdList = keyFetcher.apply(ReplicationSpec.KEY.VALID_WRITEID_LIST.toString()); this.validTxnList = keyFetcher.apply(KEY.VALID_TXN_LIST.toString()); + this.forceMigrateToExternalTable + = Boolean.parseBoolean(keyFetcher.apply(KEY.FORCE_MIGRATE_TO_EXTERNAL_TABLE.toString())); } /** @@ -357,6 +363,23 @@ public void setValidTxnList(String validTxnList) { this.validTxnList = validTxnList; } + public boolean forceMigrateToExternalTable() { + return forceMigrateToExternalTable; + } + + public void setForceMigrateToExternalTable(boolean forceMigrateToExternalTable) { + this.forceMigrateToExternalTable = forceMigrateToExternalTable; + } + + public void setForceMigrateToExternalTable(HiveConf conf, String user) { + // If the source cluster is already enabled for strict managed tables, then force migration is + // not applicable. + if (conf.getBoolean(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES.varname, false)) { + return; + } + String ownerName = conf.get(HiveConf.ConfVars.STRICT_MANAGED_TABLES_MIGRATION_OWNER.varname, "hive"); + setForceMigrateToExternalTable((user != null) && !ownerName.equals(user)); + } /** * @return whether the current replication dumped object related to ACID/Mm table @@ -390,6 +413,8 @@ public String get(KEY key) { return getValidWriteIdList(); case VALID_TXN_LIST: return getValidTxnList(); + case FORCE_MIGRATE_TO_EXTERNAL_TABLE: + return String.valueOf(forceMigrateToExternalTable()); } return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index c2e26f0710..717801e053 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -139,6 +139,8 @@ private void writeMetaData(PartitionIterable partitions) throws SemanticExceptio replicationSpec, conf); logger.debug("_metadata file written into " + paths.metaDataExportFile().toString()); + } catch (FileNotFoundException e) { + throw new SemanticException(ErrorMsg.FILE_NOT_FOUND.format(e.getMessage()), e); } catch (Exception e) { // the path used above should not be used on a second try as each dump request is written to a unique location. 
// however if we want to keep the dump location clean we might want to delete the paths diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java index 0756f59a81..7de6a5e99f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java @@ -77,6 +77,8 @@ public void handle(Context withinContext) throws Exception { return; } + withinContext.replicationSpec.setForceMigrateToExternalTable(withinContext.hiveConf, apm.getLocOwner()); + Iterable qlPtns = StreamSupport.stream(ptns.spliterator(), true).map( input -> { if (input == null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java index e59bdf67f8..9c7f4ea996 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java @@ -104,19 +104,23 @@ public void handle(Context withinContext) throws Exception { return; } + withinContext.replicationSpec.setForceMigrateToExternalTable(withinContext.hiveConf, eventMessage.getLocOwner()); + if (Scenario.ALTER == scenario) { withinContext.replicationSpec.setIsMetadataOnly(true); - List partitions = new ArrayList<>(); - partitions.add(new Partition(qlMdTable, after)); - Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); - EximUtil.createExportDump( - metaDataPath.getFileSystem(withinContext.hiveConf), - metaDataPath, - qlMdTable, - partitions, - withinContext.replicationSpec, - withinContext.hiveConf); } + + List partitions = new ArrayList<>(); + partitions.add(new Partition(qlMdTable, after)); + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); + EximUtil.createExportDump( + metaDataPath.getFileSystem(withinContext.hiveConf), + metaDataPath, + qlMdTable, + partitions, + withinContext.replicationSpec, + withinContext.hiveConf); + DumpMetaData dmd = withinContext.createDmd(this); dmd.setPayload(eventMessageAsJSON); dmd.write(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java index 4deb551617..cccb5a4bab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java @@ -93,25 +93,28 @@ public void handle(Context withinContext) throws Exception { if (Scenario.ALTER == scenario) { withinContext.replicationSpec.setIsMetadataOnly(true); - Table qlMdTableAfter = new Table(after); - Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); - - // If we are not dumping metadata about a table, we shouldn't be dumping basic statistics - // as well, since that won't be accurate. So reset them to what they would look like for an - // empty table. 
- if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) { - qlMdTableAfter.setStatsStateLikeNewTable(); - } + } + + Table qlMdTableAfter = new Table(after); + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); - EximUtil.createExportDump( - metaDataPath.getFileSystem(withinContext.hiveConf), - metaDataPath, - qlMdTableAfter, - null, - withinContext.replicationSpec, - withinContext.hiveConf); + // If we are not dumping metadata about a table, we shouldn't be dumping basic statistics + // as well, since that won't be accurate. So reset them to what they would look like for an + // empty table. + if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) { + qlMdTableAfter.setStatsStateLikeNewTable(); } - + + withinContext.replicationSpec.setForceMigrateToExternalTable(withinContext.hiveConf, eventMessage.getLocOwner()); + + EximUtil.createExportDump( + metaDataPath.getFileSystem(withinContext.hiveConf), + metaDataPath, + qlMdTableAfter, + null, + withinContext.replicationSpec, + withinContext.hiveConf); + DumpMetaData dmd = withinContext.createDmd(this); dmd.setPayload(eventMessageAsJSON); dmd.write(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index 8a838db508..dc35618b7c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -69,6 +69,8 @@ public void handle(Context withinContext) throws Exception { qlMdTable.setStatsStateLikeNewTable(); } + withinContext.replicationSpec.setForceMigrateToExternalTable(withinContext.hiveConf, eventMessage.getLocOwner()); + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); EximUtil.createExportDump( metaDataPath.getFileSystem(withinContext.hiveConf), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java index 1bcd52923b..8da7862c3d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java @@ -73,6 +73,7 @@ public void handle(Context withinContext) throws Exception { // Mark the replace type based on INSERT-INTO or INSERT_OVERWRITE operation withinContext.replicationSpec.setIsReplace(eventMessage.isReplace()); + withinContext.replicationSpec.setForceMigrateToExternalTable(withinContext.hiveConf, eventMessage.getLocOwner()); EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath, qlMdTable, qlPtns, withinContext.replicationSpec, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java index 54fc7a66be..490175d7d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java @@ -17,9 +17,11 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.events; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.UpdatePartitionColumnStatMessage; 
import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; @@ -57,6 +59,18 @@ public void handle(Context withinContext) throws Exception { return; } + Table qlMdTable = new Table(tableObj); + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); + withinContext.replicationSpec.setForceMigrateToExternalTable(withinContext.hiveConf, eventMessage.getLocOwner()); + + EximUtil.createExportDump( + metaDataPath.getFileSystem(withinContext.hiveConf), + metaDataPath, + qlMdTable, + null, + withinContext.replicationSpec, + withinContext.hiveConf); + DumpMetaData dmd = withinContext.createDmd(this); dmd.setPayload(eventMessageAsJSON); dmd.write(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java index 62db959a2e..2f1c25bf87 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java @@ -17,9 +17,11 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.events; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.UpdateTableColumnStatMessage; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; @@ -46,6 +48,17 @@ public void handle(Context withinContext) throws Exception { return; } + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); + withinContext.replicationSpec.setForceMigrateToExternalTable(withinContext.hiveConf, eventMessage.getLocOwner()); + + EximUtil.createExportDump( + metaDataPath.getFileSystem(withinContext.hiveConf), + metaDataPath, + qlMdTable, + null, + withinContext.replicationSpec, + withinContext.hiveConf); + DumpMetaData dmd = withinContext.createDmd(this); dmd.setPayload(eventMessageAsJSON); dmd.write(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java index ecd4c84651..7374452248 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java @@ -65,10 +65,4 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e); } } - - private boolean isPartitionExternal() { - Map params = partition.getParameters(); - return params.containsKey("EXTERNAL") - && params.get("EXTERNAL").equalsIgnoreCase("TRUE"); - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java index 552183af5b..a92d923922 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java @@ 
-17,7 +17,10 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.io; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; @@ -70,7 +73,8 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi } } - private Table updatePropertiesInTable(Table table, ReplicationSpec additionalPropertiesProvider) { + private Table updatePropertiesInTable(Table table, ReplicationSpec additionalPropertiesProvider) + throws IOException { // Remove all the entries from the parameters which are added by repl tasks internally. Map<String, String> parameters = table.getParameters(); if (parameters != null) { @@ -95,6 +99,13 @@ private Table updatePropertiesInTable(Table table, ReplicationSpec additionalPro // uncomment this else section, but currently unneeded. Will require a lot of golden file // regen if we do so. } + + // For a managed table, check whether it needs to be force-migrated to an external table based on + // location ownership. + if (tableHandle.getTableType() == TableType.MANAGED_TABLE) { + checkAndSetForceMigrateToExternalTable(additionalPropertiesProvider, + new Path(table.getSd().getLocation())); + } return table; } @@ -103,10 +114,57 @@ private void writePartitions(JsonWriter writer, ReplicationSpec additionalProper writer.jsonGenerator.writeStartArray(); if (partitions != null) { for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) { + // For a managed table, check whether it needs to be force-migrated to an external table based on + // location ownership. + if (tableHandle.getTableType() == TableType.MANAGED_TABLE) { + checkAndSetForceMigrateToExternalTable(additionalPropertiesProvider, partition.getDataLocation()); + } new PartitionSerializer(partition.getTPartition()) .writeTo(writer, additionalPropertiesProvider); } } writer.jsonGenerator.writeEndArray(); } + + private void checkAndSetForceMigrateToExternalTable(ReplicationSpec replicationSpec, Path dataLocation) + throws IOException { + // If the source cluster is already enabled for strict managed tables, then migration rules are + // not applicable. + if (hiveConf.getBoolean(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES.varname, false)) { + return; + } + + // Conversion rules are applicable only in replication flow. Export flow shouldn't do this check. + if ((replicationSpec == null) || !replicationSpec.isInReplicationScope()) { + return; + } + + // In case of an incremental dump, the ownership check should be done against the location owner + // information stored in the event. Don't check the actual file system, as the location might + // already be deleted. + if (replicationSpec.getReplSpecType() == ReplicationSpec.Type.INCREMENTAL_DUMP) { + return; + } + + // If the table is already marked for forced migration to an external table, it shouldn't be changed. + // If the table or even a single partition is not owned by the strict managed tables owner, we should + // force-migrate to an external table. So, we shouldn't overwrite this flag once it is already true.
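A compact, hypothetical model of the sticky-flag semantics described in the comment above: once any table or partition location fails the ownership check, the table stays forced external; a null owner (a path that disappeared) does not force anything, matching ReplicationSpec.setForceMigrateToExternalTable elsewhere in this patch. Only the key name and default come from the patch; the class itself is illustrative:

  import java.util.Arrays;
  import java.util.List;

  public final class OwnershipCheckModel {
    // hiveOwner corresponds to strict.managed.tables.migration.owner (default "hive").
    public static boolean forceExternal(String hiveOwner, List<String> locationOwners) {
      boolean force = false;
      for (String owner : locationOwners) {
        // Sticky OR: mirrors the early return once the flag is already true.
        force = force || ((owner != null) && !hiveOwner.equals(owner));
      }
      return force;
    }

    public static void main(String[] args) {
      // Table dir owned by hive, but one partition owned by bob => forced external.
      System.out.println(forceExternal("hive", Arrays.asList("hive", "bob", "hive"))); // true
    }
  }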
+ if (replicationSpec.forceMigrateToExternalTable()) { + return; + } + + try { + FileStatus fileStatus = dataLocation.getFileSystem(hiveConf).getFileStatus(dataLocation); + String hiveOwner = hiveConf.get(HiveConf.ConfVars.STRICT_MANAGED_TABLES_MIGRATION_OWNER.varname, "hive"); + + // If the location owner is not same as the config, then the table should be converted to external + // table if replicated to a cluster with strict managed enabled. + replicationSpec.setForceMigrateToExternalTable(!hiveOwner.equals(fileStatus.getOwner())); + LOG.debug("Owner of path " + dataLocation + " is " + fileStatus.getOwner() + + ", " + HiveConf.ConfVars.STRICT_MANAGED_TABLES_MIGRATION_OWNER.varname + " = " + hiveOwner + + ", forceMigrateToExternalTable = " + replicationSpec.forceMigrateToExternalTable()); + } catch (IOException e) { + LOG.error("Failed to get location owner ", e); + throw e; + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java index 9c66210e70..f0c7736d91 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java @@ -56,7 +56,10 @@ oldPartSpec.put(fs.getName(), beforeIterator.next()); newPartSpec.put(fs.getName(), afterIterator.next()); } - if (ReplUtils.isTableMigratingToTransactional(context.hiveConf, tableObj)) { + + boolean forceMigrateToExternalTable + = ReplUtils.forceMigrateToExternalTable(context.hiveConf, context.location); + if (ReplUtils.isTableMigratingToTransactional(context.hiveConf, tableObj, forceMigrateToExternalTable)) { replicationSpec.setMigratingToTxnTable(); } @@ -69,7 +72,7 @@ renamePtnTask.getId(), oldPartSpec, newPartSpec); updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, newPartSpec); return ReplUtils.addOpenTxnTaskForMigration(actualDbName, actualTblName, - context.hiveConf, updatedMetadata, renamePtnTask, tableObj); + context.hiveConf, updatedMetadata, renamePtnTask, tableObj, forceMigrateToExternalTable); } catch (Exception e) { throw (e instanceof SemanticException) ? (SemanticException) e diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java index 53d998200c..7609ee3882 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java @@ -63,7 +63,9 @@ String oldName = StatsUtils.getFullyQualifiedTableName(oldDbName, tableObjBefore.getTableName()); String newName = StatsUtils.getFullyQualifiedTableName(newDbName, tableObjAfter.getTableName()); ReplicationSpec replicationSpec = context.eventOnlyReplicationSpec(); - if (ReplUtils.isTableMigratingToTransactional(context.hiveConf, tableObjAfter)) { + boolean forceMigrateToExternalTable + = ReplUtils.forceMigrateToExternalTable(context.hiveConf, context.location); + if (ReplUtils.isTableMigratingToTransactional(context.hiveConf, tableObjAfter, forceMigrateToExternalTable)) { replicationSpec.setMigratingToTxnTable(); } AlterTableDesc renameTableDesc = new AlterTableDesc( @@ -82,7 +84,7 @@ // tablesUpdated. However, we explicitly don't support repl of that sort, and error out above // if so. 
If that should ever change, this will need reworking. return ReplUtils.addOpenTxnTaskForMigration(oldDbName, tableObjBefore.getTableName(), - context.hiveConf, updatedMetadata, renameTableTask, tableObjAfter); + context.hiveConf, updatedMetadata, renameTableTask, tableObjAfter, forceMigrateToExternalTable); } catch (Exception e) { throw (e instanceof SemanticException) ? (SemanticException) e diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java index 05a9f9123f..15746d7e41 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java @@ -66,8 +66,10 @@ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, partSpec); try { + // The user firing the REPL LOAD command sets the config for the hive user to be used. return ReplUtils.addOpenTxnTaskForMigration(actualDbName, actualTblName, - context.hiveConf, updatedMetadata, truncatePtnTask, tblObj); + context.hiveConf, updatedMetadata, truncatePtnTask, tblObj, + ReplUtils.forceMigrateToExternalTable(context.hiveConf, context.location)); } catch (Exception e) { throw new SemanticException(e.getMessage()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java index 5ef66fafa4..9031080b02 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java @@ -47,8 +47,10 @@ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); try { + // The user firing the REPL LOAD command sets the config for the hive user to be used.
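A small sketch of the configuration contract that comment refers to: the owner key this patch adds is read from the session configuration, so the user driving replication can override it. Only the ConfVars entry and its "hive" default come from the patch; the harness is illustrative:

  import org.apache.hadoop.hive.conf.HiveConf;

  public class MigrationOwnerConfig {
    public static void main(String[] args) {
      HiveConf conf = new HiveConf();
      // Handlers read this via HiveConf.ConfVars.STRICT_MANAGED_TABLES_MIGRATION_OWNER,
      // falling back to "hive" when unset.
      conf.set(HiveConf.ConfVars.STRICT_MANAGED_TABLES_MIGRATION_OWNER.varname, "hive");
      System.out.println(
          conf.get(HiveConf.ConfVars.STRICT_MANAGED_TABLES_MIGRATION_OWNER.varname, "hive"));
    }
  }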
return ReplUtils.addOpenTxnTaskForMigration(actualDbName, actualTblName, - context.hiveConf, updatedMetadata, truncateTableTask, msg.getTableObjBefore()); + context.hiveConf, updatedMetadata, truncateTableTask, msg.getTableObjBefore(), + ReplUtils.forceMigrateToExternalTable(context.hiveConf, context.location)); } catch (Exception e) { throw new SemanticException(e.getMessage()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java index cb85f7db45..926d4e6751 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java @@ -52,7 +52,8 @@ try { return ReplUtils.addTasksForLoadingColStats(colStats, context.hiveConf, updatedMetadata, - upcsm.getTableObject(), upcsm.getWriteId()); + upcsm.getTableObject(), upcsm.getWriteId(), + ReplUtils.forceMigrateToExternalTable(context.hiveConf, context.location)); } catch(Exception e) { throw new SemanticException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java index 371429e645..90e6c9e434 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java @@ -52,7 +52,8 @@ try { return ReplUtils.addTasksForLoadingColStats(colStats, context.hiveConf, updatedMetadata, - utcsm.getTableObject(), utcsm.getWriteId()); + utcsm.getTableObject(), utcsm.getWriteId(), + ReplUtils.forceMigrateToExternalTable(context.hiveConf, context.location)); } catch(Exception e) { throw new SemanticException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java index 80025b7046..137cb586c5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java @@ -367,7 +367,7 @@ void checkOldWarehouseRoot() throws IOException, MetaException { void checkOwnerPermsOptions() { if (runOptions.shouldModifyManagedTableOwner) { - ownerName = conf.get("strict.managed.tables.migration.owner", "hive"); + ownerName = conf.get(HiveConf.ConfVars.STRICT_MANAGED_TABLES_MIGRATION_OWNER.varname, "hive"); groupName = conf.get("strict.managed.tables.migration.group", null); } if (runOptions.shouldModifyManagedTablePermissions) { @@ -474,7 +474,7 @@ void processTable(Database dbObj, String tableName, boolean modifyDefaultManaged TableMigrationOption migrationOption = runOptions.migrationOption; if (migrationOption == TableMigrationOption.AUTOMATIC) { - migrationOption = determineMigrationTypeAutomatically(tableObj, tableType, ownerName, conf, hms, null); + migrationOption = determineMigrationTypeAutomatically(tableObj, tableType, ownerName, conf, hms, false); } failedValidationChecks = migrateTable(tableObj, tableType, migrationOption, runOptions.dryRun, @@ -671,10 +671,14 @@ static void renameFilesToConformToAcid(Table tableObj, IMetaStoreClient hms, Con } } - public static TableMigrationOption determineMigrationTypeAutomatically(Table tableObj, TableType tableType, - String ownerName, Configuration conf, 
IMetaStoreClient hms, Boolean isPathOwnedByHive) - throws IOException, MetaException, TException { - TableMigrationOption result = TableMigrationOption.NONE; + public static TableMigrationOption determineMigrationTypeAutomatically(Table tableObj, + TableType tableType, + String ownerName, + Configuration conf, + IMetaStoreClient hms, + boolean forceMigrateToExternalTbl) + throws IOException, TException { + TableMigrationOption result; String msg; switch (tableType) { case MANAGED_TABLE: @@ -682,7 +686,7 @@ public static TableMigrationOption determineMigrationTypeAutomatically(Table tab // Always keep transactional tables as managed tables. result = TableMigrationOption.MANAGED; } else { - String reason = shouldTableBeExternal(tableObj, ownerName, conf, hms, isPathOwnedByHive); + String reason = shouldTableBeExternal(tableObj, ownerName, conf, hms, forceMigrateToExternalTbl); if (reason != null) { LOG.debug("Converting {} to external table. {}", getQualifiedName(tableObj), reason); result = TableMigrationOption.EXTERNAL; @@ -840,8 +844,13 @@ static boolean migrateToManagedTable(Table tableObj, TableType tableType, boolea } static String shouldTableBeExternal(Table tableObj, String ownerName, Configuration conf, - IMetaStoreClient hms, Boolean isPathOwnedByHive) - throws IOException, MetaException, TException { + IMetaStoreClient hms, boolean forceMigrateToExternalTbl) + throws IOException, TException { + if (forceMigrateToExternalTbl) { + // For replication flow, the path ownership must be verified at source cluster itself. + return String.format("Source cluster requested to forcefully convert to external table."); + } + if (MetaStoreUtils.isNonNativeTable(tableObj)) { return "Table is a non-native (StorageHandler) table"; } @@ -852,15 +861,12 @@ static String shouldTableBeExternal(Table tableObj, String ownerName, Configurat if (HiveStrictManagedUtils.isListBucketedTable(tableObj)) { return "Table is a list bucketed table"; } + // If any table/partition directory is not owned by hive, // then assume table is using storage-based auth - set external. // Transactional tables should still remain transactional, // but we should have already checked for that before this point. - if (isPathOwnedByHive != null) { - // for replication flow, the path ownership must be verified at source cluster itself. - return isPathOwnedByHive ? 
null : - String.format("One or more table directories is not owned by hive or non-HDFS path at source cluster"); - } else if (shouldTablePathBeExternal(tableObj, ownerName, conf, hms)) { + if ((ownerName != null) && shouldTablePathBeExternal(tableObj, ownerName, conf, hms)) { return String.format("One or more table directories not owned by %s, or non-HDFS path", ownerName); } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java index 155ecb18bf..686f4cf6e5 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java @@ -594,4 +594,14 @@ void findNext() throws IOException { nextFile = null; } } + + public static String getLocationOwner(String location, Configuration conf) { + try { + Path path = new Path(location); + return path.getFileSystem(conf).getFileStatus(path).getOwner(); + } catch (Exception e) { + // If the path is removed or renamed by this time, just return null. + return null; + } + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index f1983c510d..cbc096337f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -3936,7 +3936,7 @@ private int add_partitions_pspec_core(RawStore ms, String catName, String dbName if (!listeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ADD_PARTITION, - new AddPartitionEvent(tbl, partitionSpecProxy, true, this), + new AddPartitionEvent(tbl, partitionSpecProxy, success, this), null, transactionalListenerResponses, ms); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java index d4542d7a5b..579de22f46 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java @@ -21,9 +21,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.hadoop.hive.metastore.utils.FileUtils; import java.util.Arrays; import java.util.Iterator; @@ -36,17 +38,30 @@ private final Table table; private final List partitions; private PartitionSpecProxy partitionSpecProxy; + private final String locOwner; - public AddPartitionEvent(Table table, List partitions, boolean status, + public AddPartitionEvent(Table table, List partitions, PartitionSpecProxy partitionSpec, boolean status, IHMSHandler handler) { super(status, handler); 
this.table = table; this.partitions = partitions; - this.partitionSpecProxy = null; + this.partitionSpecProxy = partitionSpec; + + // The table location owner is the same as the partition location owner if the database is a source of replication. + if (status && (TableType.MANAGED_TABLE.toString().equalsIgnoreCase(table.getTableType()))) { + locOwner = FileUtils.getLocationOwner(table.getSd().getLocation(), handler.getConf()); + } else { + locOwner = null; + } + } + + public AddPartitionEvent(Table table, List<Partition> partitions, boolean status, + IHMSHandler handler) { + this(table, partitions, null, status, handler); } public AddPartitionEvent(Table table, Partition partition, boolean status, IHMSHandler handler) { - this(table, Arrays.asList(partition), status, handler); + this(table, Arrays.asList(partition), null, status, handler); } /** @@ -54,10 +69,7 @@ public AddPartitionEvent(Table table, Partition partition, boolean status, IHMSH */ public AddPartitionEvent(Table table, PartitionSpecProxy partitionSpec, boolean status, IHMSHandler handler) { - super(status, handler); - this.table = table; - this.partitions = null; - this.partitionSpecProxy = partitionSpec; + this(table, null, partitionSpec, status, handler); } /** @@ -81,4 +93,7 @@ public Table getTable() { } } + public String getLocOwner() { + return locOwner; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java index 499c6e4f31..f355a73548 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java @@ -21,8 +21,10 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.utils.FileUtils; @InterfaceAudience.Public @InterfaceStability.Stable @@ -32,7 +34,8 @@ private final Partition newPart; private final Table table; private final boolean isTruncateOp; - private Long writeId; + private final Long writeId; + private final String locOwner; public AlterPartitionEvent(Partition oldPart, Partition newPart, Table table, boolean isTruncateOp, boolean status, Long writeId, IHMSHandler handler) { @@ -42,6 +45,13 @@ public AlterPartitionEvent(Partition oldPart, Partition newPart, Table table, bo this.table = table; this.isTruncateOp = isTruncateOp; this.writeId = writeId; + + // The table location owner is the same as the partition location owner if the database is a source of replication.
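The metastore event constructors in this patch all share one capture pattern, shown in isolation below; the LocOwnerCapture class is hypothetical, while FileUtils.getLocationOwner is the helper added earlier in this patch. The owner is looked up eagerly at event time because the path may already be gone when the event is dumped:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.metastore.TableType;
  import org.apache.hadoop.hive.metastore.api.Table;
  import org.apache.hadoop.hive.metastore.utils.FileUtils;

  public final class LocOwnerCapture {
    // Only a successful event on a managed table records the location owner.
    public static String capture(boolean status, Table table, Configuration conf) {
      if (status && TableType.MANAGED_TABLE.toString().equalsIgnoreCase(table.getTableType())) {
        // Returns null if the path was removed or renamed in the meantime.
        return FileUtils.getLocationOwner(table.getSd().getLocation(), conf);
      }
      return null;
    }
  }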
+ if (status && (TableType.MANAGED_TABLE.toString().equalsIgnoreCase(table.getTableType()))) { + locOwner = FileUtils.getLocationOwner(table.getSd().getLocation(), handler.getConf()); + } else { + locOwner = null; + } } /** @@ -78,4 +88,8 @@ public boolean getIsTruncateOp() { public Long getWriteId() { return writeId; } + + public String getLocOwner() { + return locOwner; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java index 541fbe48ac..667a9ebf6d 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java @@ -22,7 +22,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.utils.FileUtils; @InterfaceAudience.Public @InterfaceStability.Stable @@ -31,7 +33,8 @@ private final Table newTable; private final Table oldTable; private final boolean isTruncateOp; - private Long writeId; + private final Long writeId; + private final String locOwner; public AlterTableEvent (Table oldTable, Table newTable, boolean isTruncateOp, boolean status, Long writeId, IHMSHandler handler) { @@ -40,6 +43,14 @@ public AlterTableEvent (Table oldTable, Table newTable, boolean isTruncateOp, bo this.newTable = newTable; this.isTruncateOp = isTruncateOp; this.writeId = writeId; + + // It is assumed that table type conversion from managed to external, or external to managed, + // is not allowed when the table is enabled for replication.
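For completeness, a tiny usage sketch of the lookup those constructors rely on: FileUtils.getLocationOwner (added earlier in this patch) returns null rather than throwing when the path has been removed or renamed. The warehouse path below is hypothetical:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.metastore.utils.FileUtils;

  public class LocationOwnerLookup {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Existing path -> filesystem owner; missing or renamed path -> null.
      String owner = FileUtils.getLocationOwner("/warehouse/managed/hive/t1", conf);
      System.out.println(owner == null ? "no owner recorded (path gone)" : owner);
    }
  }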
+ if (status && (TableType.MANAGED_TABLE.toString().equalsIgnoreCase(newTable.getTableType()))) { + locOwner = FileUtils.getLocationOwner(newTable.getSd().getLocation(), handler.getConf()); + } else { + locOwner = null; + } } /** @@ -66,4 +77,8 @@ public boolean getIsTruncateOp() { public Long getWriteId() { return writeId; } + + public String getLocOwner() { + return locOwner; + } } \ No newline at end of file diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java index 4f5e887ac2..0ad4f7a13b 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java @@ -21,17 +21,25 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.utils.FileUtils; @InterfaceAudience.Public @InterfaceStability.Stable public class CreateTableEvent extends ListenerEvent { private final Table table; + private final String locOwner; public CreateTableEvent (Table table, boolean status, IHMSHandler handler) { super (status, handler); this.table = table; + if (status && (TableType.MANAGED_TABLE.toString().equalsIgnoreCase(table.getTableType()))) { + locOwner = FileUtils.getLocationOwner(table.getSd().getLocation(), handler.getConf()); + } else { + locOwner = null; + } } /** @@ -40,4 +48,8 @@ public CreateTableEvent (Table table, boolean status, IHMSHandler handler) { public Table getTable () { return table; } + + public String getLocOwner() { + return locOwner; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java index 60ad7db60e..05de0720bb 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.ClientCapabilities; import org.apache.hadoop.hive.metastore.api.ClientCapability; import org.apache.hadoop.hive.metastore.api.GetTableRequest; @@ -32,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.utils.FileUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.thrift.TException; @@ -46,6 +48,7 @@ private final boolean replace; private final List files; private List fileChecksums = new ArrayList<>(); + private final String locOwner; /** * @@ -89,6 +92,13 @@ public InsertEvent(String catName, String db, String table, List partVal if 
(insertData.isSetFilesAddedChecksum()) { fileChecksums = insertData.getFilesAddedChecksum(); } + + // The table location owner is same as partition location owner if the database is source of replication. + if (status && (TableType.MANAGED_TABLE.toString().equalsIgnoreCase(tableObj.getTableType()))) { + locOwner = FileUtils.getLocationOwner(tableObj.getSd().getLocation(), handler.getConf()); + } else { + locOwner = null; + } } /** @@ -129,4 +139,8 @@ public boolean isReplace() { public List getFileChecksums() { return fileChecksums; } + + public String getLocOwner() { + return locOwner; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java index ba61a08173..68e767d013 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java @@ -21,8 +21,10 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.utils.FileUtils; import java.util.List; import java.util.Map; @@ -39,6 +41,7 @@ private Map parameters; private List partVals; private Table tableObj; + private final String locOwner; /** * @param statsObj Columns statistics Info. 
@@ -56,6 +59,11 @@ public UpdatePartitionColumnStatEvent(ColumnStatistics statsObj, List pa this.parameters = parameters; this.partVals = partVals; this.tableObj = tableObj; + if (TableType.MANAGED_TABLE.toString().equalsIgnoreCase(tableObj.getTableType())) { + locOwner = FileUtils.getLocationOwner(tableObj.getSd().getLocation(), handler.getConf()); + } else { + locOwner = null; + } } /** @@ -65,12 +73,7 @@ public UpdatePartitionColumnStatEvent(ColumnStatistics statsObj, List pa */ public UpdatePartitionColumnStatEvent(ColumnStatistics statsObj, List partVals, Table tableObj, IHMSHandler handler) { - super(true, handler); - this.partColStats = statsObj; - this.partVals = partVals; - this.writeId = 0; - this.parameters = null; - this.tableObj = tableObj; + this(statsObj, partVals, null, tableObj, 0, handler); } public ColumnStatistics getPartColStats() { @@ -89,5 +92,11 @@ public long getWriteId() { return partVals; } - public Table getTableObj() { return tableObj; } + public Table getTableObj() { + return tableObj; + } + + public String getLocOwner() { + return locOwner; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java index 71300abf4e..4d9d426386 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java @@ -21,10 +21,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.utils.FileUtils; -import java.util.List; import java.util.Map; /** @@ -38,6 +39,7 @@ private long writeId; private Map parameters; private Table tableObj; + private final String locOwner; /** * @param colStats Columns statistics Info. 
@@ -54,6 +56,11 @@ public UpdateTableColumnStatEvent(ColumnStatistics colStats, Table tableObj, this.writeId = writeId; this.parameters = parameters; this.tableObj = tableObj; + if (TableType.MANAGED_TABLE.toString().equalsIgnoreCase(tableObj.getTableType())) { + locOwner = FileUtils.getLocationOwner(tableObj.getSd().getLocation(), handler.getConf()); + } else { + locOwner = null; + } } /** @@ -66,6 +73,7 @@ public UpdateTableColumnStatEvent(ColumnStatistics colStats, IHMSHandler handler this.writeId = 0; this.parameters = null; this.tableObj = null; + this.locOwner = null; } public ColumnStatistics getColStats() { @@ -83,4 +91,8 @@ public long getWriteId() { public Table getTableObj() { return tableObj; } + + public String getLocOwner() { + return locOwner; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java index 3262b52bea..42fdceed3e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java @@ -65,4 +65,5 @@ public EventMessage checkValid() { */ public abstract Iterable getPartitionFilesIter(); + public abstract String getLocOwner(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java index a1ba01a36f..ea8c9aa3a2 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java @@ -67,5 +67,7 @@ public EventMessage checkValid() { } public abstract Long getWriteId(); + + public abstract String getLocOwner(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java index bbc01c1a5e..c5037665fa 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java @@ -57,4 +57,6 @@ public EventMessage checkValid() { } public abstract Long getWriteId(); + + public abstract String getLocOwner(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java index 49732ff97d..4b9f9dd408 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java @@ -37,6 +37,8 @@ protected CreateTableMessage() { public abstract Table getTableObj() throws Exception; + public abstract String getLocOwner(); + /** * Get list of files created as a result of 
this DML operation * diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java index c470097165..a824324a14 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java @@ -72,4 +72,6 @@ public EventMessage checkValid() { throw new IllegalStateException("Table name unset."); return super.checkValid(); } + + public abstract String getLocOwner(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java index aa83da4ed5..2ceccd338f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java @@ -178,14 +178,14 @@ public DropDatabaseMessage buildDropDatabaseMessage(Database db) { return new JSONDropDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, now()); } - public CreateTableMessage buildCreateTableMessage(Table table, Iterator fileIter) { - return new JSONCreateTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, fileIter, now()); + public CreateTableMessage buildCreateTableMessage(Table table, Iterator fileIter, String locOwner) { + return new JSONCreateTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, fileIter, locOwner, now()); } public AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp, - Long writeId) { + Long writeId, String locOwner) { return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after, - isTruncateOp, writeId, now()); + isTruncateOp, writeId, locOwner, now()); } public DropTableMessage buildDropTableMessage(Table table) { @@ -193,15 +193,15 @@ public DropTableMessage buildDropTableMessage(Table table) { } public AddPartitionMessage buildAddPartitionMessage(Table table, - Iterator partitionsIterator, Iterator partitionFileIter) { + Iterator partitionsIterator, Iterator partitionFileIter, String locOwner) { return new JSONAddPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, - partitionsIterator, partitionFileIter, now()); + partitionsIterator, partitionFileIter, locOwner, now()); } public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, - Partition after, boolean isTruncateOp, Long writeId) { + Partition after, boolean isTruncateOp, Long writeId, String locOwner) { return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, - table, before, after, isTruncateOp, writeId, now()); + table, before, after, isTruncateOp, writeId, locOwner, now()); } public DropPartitionMessage buildDropPartitionMessage(Table table, @@ -219,9 +219,9 @@ public DropFunctionMessage buildDropFunctionMessage(Function fn) { } public InsertMessage buildInsertMessage(Table tableObj, Partition partObj, - boolean replace, Iterator fileIter) { + boolean replace, Iterator fileIter, String locOwner) { return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, - tableObj, partObj, replace, fileIter, now()); + tableObj, partObj, replace, 
fileIter, locOwner, now()); } public AddPrimaryKeyMessage buildAddPrimaryKeyMessage(List pks) { @@ -289,9 +289,10 @@ public AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, public JSONUpdateTableColumnStatMessage buildUpdateTableColumnStatMessage(ColumnStatistics colStats, Table tableObj, Map parameters, - long writeId) { + long writeId, + String locOwner) { return new JSONUpdateTableColumnStatMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), - colStats, tableObj, parameters, writeId); + colStats, tableObj, parameters, writeId, locOwner); } public JSONDeleteTableColumnStatMessage buildDeleteTableColumnStatMessage(String dbName, String colName) { @@ -300,9 +301,9 @@ public JSONDeleteTableColumnStatMessage buildDeleteTableColumnStatMessage(String public JSONUpdatePartitionColumnStatMessage buildUpdatePartitionColumnStatMessage(ColumnStatistics colStats, List partVals, Map parameters, - Table tableObj, long writeId) { + Table tableObj, long writeId, String locOwner) { return new JSONUpdatePartitionColumnStatMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), colStats, partVals, - parameters, tableObj, writeId); + parameters, tableObj, writeId, locOwner); } public JSONDeletePartitionColumnStatMessage buildDeletePartitionColumnStatMessage(String dbName, String colName, diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java index e92a0dc9a3..35d79cfd46 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java @@ -41,4 +41,6 @@ protected UpdatePartitionColumnStatMessage() { public abstract List getPartVals(); public abstract Table getTableObject() throws Exception; + + public abstract String getLocOwner(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java index e3f049c48c..4e5b30e609 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java @@ -38,4 +38,6 @@ protected UpdateTableColumnStatMessage() { public abstract Map getParameters(); public abstract Table getTableObject() throws Exception; + + public abstract String getLocOwner(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java index 6494cb8dc7..1288a3105c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java @@ -44,19 +44,19 @@ public class JSONAddPartitionMessage extends AddPartitionMessage { 
 
   @JsonProperty
-  String server, servicePrincipal, db, table, tableType, tableObjJson;
+  private String server, servicePrincipal, db, table, tableType, tableObjJson, locOwner;
 
   @JsonProperty
-  Long timestamp;
+  private Long timestamp;
 
   @JsonProperty
-  List<Map<String, String>> partitions;
+  private List<Map<String, String>> partitions;
 
   @JsonProperty
-  List<String> partitionListJson;
+  private List<String> partitionListJson;
 
   @JsonProperty
-  List<PartitionFiles> partitionFiles;
+  private List<PartitionFiles> partitionFiles;
 
   /**
    * Default Constructor. Required for Jackson.
@@ -68,7 +68,7 @@ public JSONAddPartitionMessage() {
    * Note that we get an Iterator rather than an Iterable here: so we can only walk thru the list once
    */
   public JSONAddPartitionMessage(String server, String servicePrincipal, Table tableObj,
-      Iterator<Partition> partitionsIterator, Iterator<PartitionFiles> partitionFileIter,
+      Iterator<Partition> partitionsIterator, Iterator<PartitionFiles> partitionFileIter, String locOwner,
       Long timestamp) {
     this.server = server;
     this.servicePrincipal = servicePrincipal;
@@ -91,6 +91,7 @@ public JSONAddPartitionMessage(String server, String servicePrincipal, Table tab
     }
     this.partitionFiles = (partitionFileIter != null) ? Lists.newArrayList(partitionFileIter)
         : Lists.newArrayList();
+    this.locOwner = locOwner;
     checkValid();
   }
 
@@ -160,6 +161,11 @@ public String getTableObjJson() {
     return partitionListJson;
   }
 
+  @Override
+  public String getLocOwner() {
+    return locOwner;
+  }
+
   @Override
   public String toString() {
     try {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
index 414402f705..8acc73a10f 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java
@@ -34,19 +34,19 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage {
 
   @JsonProperty
-  String server, servicePrincipal, db, table, tableType, tableObjJson;
+  private String server, servicePrincipal, db, table, tableType, tableObjJson, locOwner;
 
   @JsonProperty
-  String isTruncateOp;
+  private String isTruncateOp;
 
   @JsonProperty
-  Long timestamp, writeId;
+  private Long timestamp, writeId;
 
   @JsonProperty
-  Map<String, String> keyValues;
+  private Map<String, String> keyValues;
 
   @JsonProperty
-  String partitionObjBeforeJson, partitionObjAfterJson;
+  private String partitionObjBeforeJson, partitionObjAfterJson;
 
   /**
    * Default constructor, needed for Jackson.
@@ -55,7 +55,8 @@ public JSONAlterPartitionMessage() {
   }
 
   public JSONAlterPartitionMessage(String server, String servicePrincipal, Table tableObj,
-      Partition partitionObjBefore, Partition partitionObjAfter, boolean isTruncateOp, Long writeId, Long timestamp) {
+      Partition partitionObjBefore, Partition partitionObjAfter, boolean isTruncateOp, Long writeId,
+      String locOwner, Long timestamp) {
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.db = tableObj.getDbName();
@@ -65,6 +66,7 @@ public JSONAlterPartitionMessage(String server, String servicePrincipal, Table t
     this.timestamp = timestamp;
     this.keyValues = MessageBuilder.getPartitionKeyValues(tableObj, partitionObjBefore);
     this.writeId = writeId;
+    this.locOwner = locOwner;
     try {
       this.tableObjJson = MessageBuilder.createTableObjJson(tableObj);
       this.partitionObjBeforeJson = MessageBuilder.createPartitionObjJson(partitionObjBefore);
@@ -149,6 +151,11 @@ public Long getWriteId() {
     return writeId == null ? 0 : writeId;
   }
 
+  @Override
+  public String getLocOwner() {
+    return locOwner;
+  }
+
   @Override
   public String toString() {
     try {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
index 8c621b2701..90a9342a48 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java
@@ -31,13 +31,13 @@ public class JSONAlterTableMessage extends AlterTableMessage {
 
   @JsonProperty
-  String server, servicePrincipal, db, table, tableType, tableObjBeforeJson, tableObjAfterJson;
+  private String server, servicePrincipal, db, table, tableType, tableObjBeforeJson, tableObjAfterJson, locOwner;
 
   @JsonProperty
-  String isTruncateOp;
+  private String isTruncateOp;
 
   @JsonProperty
-  Long timestamp, writeId;
+  private Long timestamp, writeId;
 
   /**
    * Default constructor, needed for Jackson.
@@ -46,7 +46,7 @@ public JSONAlterTableMessage() {
   }
 
   public JSONAlterTableMessage(String server, String servicePrincipal, Table tableObjBefore, Table tableObjAfter,
-      boolean isTruncateOp, Long writeId, Long timestamp) {
+      boolean isTruncateOp, Long writeId, String locOwner, Long timestamp) {
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.db = tableObjBefore.getDbName();
@@ -61,6 +61,7 @@ public JSONAlterTableMessage(String server, String servicePrincipal, Table table
     } catch (TException e) {
       throw new IllegalArgumentException("Could not serialize: ", e);
     }
+    this.locOwner = locOwner;
     checkValid();
   }
 
@@ -124,6 +125,11 @@ public Long getWriteId() {
     return writeId == null ? 0 : writeId;
   }
 
+  @Override
+  public String getLocOwner() {
+    return locOwner;
+  }
+
   @Override
   public String toString() {
     try {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java
index 145ee4b199..fa6632d31c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java
@@ -36,11 +36,11 @@ public class JSONCreateTableMessage extends CreateTableMessage {
 
   @JsonProperty
-  String server, servicePrincipal, db, table, tableType, tableObjJson;
+  private String server, servicePrincipal, db, table, tableType, tableObjJson, locOwner;
   @JsonProperty
-  Long timestamp;
+  private Long timestamp;
   @JsonProperty
-  List<String> files;
+  private List<String> files;
 
   /**
    * Default constructor, needed for Jackson.
@@ -49,25 +49,26 @@ public JSONCreateTableMessage() {
   }
 
   public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table,
-      String tableType, Long timestamp) {
+      String tableType, String locOwner, Long timestamp) {
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.db = db;
     this.table = table;
     this.tableType = tableType;
     this.timestamp = timestamp;
+    this.locOwner = locOwner;
     checkValid();
   }
 
-  public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table,
+  public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, String locOwner,
       Long timestamp) {
-    this(server, servicePrincipal, db, table, null, timestamp);
+    this(server, servicePrincipal, db, table, null, locOwner, timestamp);
   }
 
   public JSONCreateTableMessage(String server, String servicePrincipal, Table tableObj,
-      Iterator<String> fileIter, Long timestamp) {
+      Iterator<String> fileIter, String locOwner, Long timestamp) {
     this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(),
-        tableObj.getTableType(), timestamp);
+        tableObj.getTableType(), locOwner, timestamp);
     try {
       this.tableObjJson = MessageBuilder.createTableObjJson(tableObj);
     } catch (TException e) {
@@ -119,6 +120,11 @@ public String getTableObjJson() {
     return tableObjJson;
   }
 
+  @Override
+  public String getLocOwner() {
+    return locOwner;
+  }
+
   @Override
   public String toString() {
     try {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
index 40d480b7e3..dc439d6bc9 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
@@ -37,16 +37,16 @@ public class JSONInsertMessage extends InsertMessage {
 
   @JsonProperty
-  String server, servicePrincipal, db, table, tableType, tableObjJson, ptnObjJson;
+  private String server, servicePrincipal, db, table, tableType, tableObjJson, ptnObjJson, locOwner;
 
   @JsonProperty
-  Long timestamp;
+  private Long timestamp;
 
   @JsonProperty
-  String replace;
+  private String replace;
 
   @JsonProperty
-  List<String> files;
+  private List<String> files;
 
   /**
    * Default constructor, needed for Jackson.
@@ -55,7 +55,7 @@ public JSONInsertMessage() {
   }
 
   public JSONInsertMessage(String server, String servicePrincipal, Table tableObj, Partition ptnObj,
-      boolean replace, Iterator<String> fileIter, Long timestamp) {
+      boolean replace, Iterator<String> fileIter, String locOwner, Long timestamp) {
     this.server = server;
     this.servicePrincipal = servicePrincipal;
@@ -81,6 +81,7 @@ public JSONInsertMessage(String server, String servicePrincipal, Table tableObj,
     this.timestamp = timestamp;
     this.replace = Boolean.toString(replace);
     this.files = Lists.newArrayList(fileIter);
+    this.locOwner = locOwner;
     checkValid();
   }
 
@@ -137,6 +138,11 @@ public Partition getPtnObj() throws Exception {
     return ((null == ptnObjJson) ? null : (Partition) MessageBuilder.getTObj(ptnObjJson, Partition.class));
   }
 
+  @Override
+  public String getLocOwner() {
+    return locOwner;
+  }
+
   @Override
   public String toString() {
     try {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java
index fd7fe00419..64bbb80147 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java
@@ -38,7 +38,7 @@
   private Long writeId, timestamp;
 
   @JsonProperty
-  private String server, servicePrincipal, database;
+  private String server, servicePrincipal, database, locOwner;
 
   @JsonProperty
   private String colStatsJson;
@@ -61,13 +61,15 @@ public JSONUpdatePartitionColumnStatMessage() {
 
   public JSONUpdatePartitionColumnStatMessage(String server, String servicePrincipal, Long timestamp,
                                               ColumnStatistics colStats, List<String> partVals,
                                               Map<String, String> parameters,
-                                              Table tableObj, long writeId) {
+                                              Table tableObj, long writeId,
+                                              String locOwner) {
     this.timestamp = timestamp;
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.writeId = writeId;
     this.database = colStats.getStatsDesc().getDbName();
     this.partVals = partVals;
+    this.locOwner = locOwner;
     try {
       this.colStatsJson = MessageBuilder.createTableColumnStatJson(colStats);
       this.tableObjJson = MessageBuilder.createTableObjJson(tableObj);
@@ -126,6 +128,11 @@ public Table getTableObject() throws Exception {
     return (Table) MessageBuilder.getTObj(tableObjJson, Table.class);
   }
 
+  @Override
+  public String getLocOwner() {
+    return locOwner;
+  }
+
   @Override
   public String toString() {
     try {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java
index 275d204957..b97c1eb53c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java
@@ -36,13 +36,13 @@
   private Long writeId, timestamp;
 
   @JsonProperty
-  private String server, servicePrincipal, database;
+  private String server, servicePrincipal, database, locOwner;
 
   @JsonProperty
   private String colStatsJson;
 
   @JsonProperty
-  Map<String, String> parameters;
+  private Map<String, String> parameters;
 
   @JsonProperty
   private String tableObjJson;
@@ -55,11 +55,12 @@ public JSONUpdateTableColumnStatMessage() {
 
   public JSONUpdateTableColumnStatMessage(String server, String servicePrincipal, Long timestamp,
                                           ColumnStatistics colStats, Table tableObj,
                                           Map<String, String> parameters,
-                                          long writeId) {
+                                          long writeId, String locOwner) {
     this.timestamp = timestamp;
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.writeId = writeId;
+    this.locOwner = locOwner;
    this.database = colStats.getStatsDesc().getDbName();
     try {
       this.colStatsJson = MessageBuilder.createTableColumnStatJson(colStats);
@@ -114,6 +115,11 @@ public Long getWriteId() {
     return parameters;
   }
 
+  @Override
+  public String getLocOwner() {
+    return locOwner;
+  }
+
   @Override
   public String toString() {
     try {
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
index efbcfb24c2..cb64f31603 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
@@ -106,10 +106,10 @@ public static int startMetaStoreWithRetry(Configuration conf) throws Exception {
     return MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), conf);
   }
 
-  public static int startMetaStoreWithRetry(Configuration conf, boolean keepJdbcUri)
+  public static int startMetaStoreWithRetry(Configuration conf, boolean keepJdbcUri, boolean keepWarehousePath)
       throws Exception {
     return MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), conf,
-        keepJdbcUri);
+        keepJdbcUri, keepWarehousePath);
   }
 
   public static int startMetaStoreWithRetry() throws Exception {
@@ -119,7 +119,7 @@ public static int startMetaStoreWithRetry() throws Exception {
 
   public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge,
       Configuration conf) throws Exception {
-    return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, conf, false);
+    return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, conf, false, false);
   }
 
   /**
@@ -130,20 +130,24 @@ public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge,
    * @param bridge The Thrift bridge to uses
    * @param conf The configuration to use
    * @param keepJdbcUri If set to true, then the JDBC url is not changed
+   * @param keepWarehousePath If set to true, then the Warehouse directory is not changed
    * @return The port on which the MetaStore finally started
    * @throws Exception
    */
   public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge,
-      Configuration conf, boolean keepJdbcUri) throws Exception {
+      Configuration conf, boolean keepJdbcUri, boolean keepWarehousePath) throws Exception {
     Exception metaStoreException = null;
     String warehouseDir = MetastoreConf.getVar(conf, ConfVars.WAREHOUSE);
     for (int tryCount = 0; tryCount < MetaStoreTestUtils.RETRY_COUNT; tryCount++) {
       try {
         int metaStorePort = findFreePort();
-        // Setting metastore instance specific warehouse directory, postfixing with port
-        Path postfixedWarehouseDir = new Path(warehouseDir, String.valueOf(metaStorePort));
-        MetastoreConf.setVar(conf, ConfVars.WAREHOUSE, postfixedWarehouseDir.toString());
+        if (!keepWarehousePath) {
+          // Setting metastore instance specific warehouse directory, postfixing with port
+          Path postfixedWarehouseDir = new Path(warehouseDir, String.valueOf(metaStorePort));
+          warehouseDir = postfixedWarehouseDir.toString();
+          MetastoreConf.setVar(conf, ConfVars.WAREHOUSE, warehouseDir);
+        }
 
         String jdbcUrl = MetastoreConf.getVar(conf, ConfVars.CONNECT_URL_KEY);
         if (!keepJdbcUri) {
@@ -167,11 +171,11 @@ public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge,
           fs.mkdirs(wh.getWhRoot());
           fs.setPermission(wh.getWhRoot(),
               new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-          LOG.info("MetaStore warehouse root dir ({}) is created", postfixedWarehouseDir);
+          LOG.info("MetaStore warehouse root dir ({}) is created", warehouseDir);
         }
 
         LOG.info("MetaStore Thrift Server started on port: {} with warehouse dir: {} with " +
-            "jdbcUrl: {}", metaStorePort, postfixedWarehouseDir, jdbcUrl);
+            "jdbcUrl: {}", metaStorePort, warehouseDir, jdbcUrl);
         return metaStorePort;
       } catch (ConnectException ce) {
         metaStoreException = ce;