diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index aa27a01174..23e441f667 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -515,6 +515,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "This is the base directory on the target/replica warehouse under which data for " + "external tables is stored. This is relative base path and hence prefixed to the source " + "external table path on target cluster."), + STRICT_MANAGED_TABLES_MIGRATION_OWNER("strict.managed.tables.migration.owner", "hive", + "This is used by upgrade tool to check if a managed table should be converted to external table. If the " + + "owner of the table location is not same as this config value, then the table is converted to an " + + "external table. The same is used during replication from a cluster with strict managed table set to " + + "false to a target cluster with strict managed table set to true."), LOCALSCRATCHDIR("hive.exec.local.scratchdir", "${system:java.io.tmpdir}" + File.separator + "${system:user.name}", "Local scratch space for Hive jobs"), diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index e6113942dd..666405a5ab 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -206,7 +206,7 @@ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { FileIterator fileIter = MetaStoreUtils.isExternalTable(t) ? 
null : new FileIterator(t.getSd().getLocation()); CreateTableMessage msg = - MessageBuilder.getInstance().buildCreateTableMessage(t, fileIter); + MessageBuilder.getInstance().buildCreateTableMessage(t, fileIter, tableEvent.getLocOwner()); NotificationEvent event = new NotificationEvent(0, now(), EventType.CREATE_TABLE.toString(), msgEncoder.getSerializer().serialize(msg)); @@ -243,7 +243,7 @@ public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { Table after = tableEvent.getNewTable(); AlterTableMessage msg = MessageBuilder.getInstance() .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp(), - tableEvent.getWriteId()); + tableEvent.getWriteId(), tableEvent.getLocOwner()); NotificationEvent event = new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgEncoder.getSerializer().serialize(msg) @@ -361,7 +361,7 @@ public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaExceptio PartitionFilesIterator fileIter = MetaStoreUtils.isExternalTable(t) ? 
null : new PartitionFilesIterator(partitionEvent.getPartitionIterator(), t); EventMessage msg = MessageBuilder.getInstance() - .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator(), fileIter); + .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator(), fileIter, partitionEvent.getLocOwner()); MessageSerializer serializer = msgEncoder.getSerializer(); NotificationEvent event = new NotificationEvent(0, now(), @@ -401,7 +401,7 @@ public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaExce AlterPartitionMessage msg = MessageBuilder.getInstance() .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp(), - partitionEvent.getWriteId()); + partitionEvent.getWriteId(), partitionEvent.getLocOwner()); NotificationEvent event = new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgEncoder.getSerializer().serialize(msg)); @@ -540,7 +540,7 @@ public void onInsert(InsertEvent insertEvent) throws MetaException { Table tableObj = insertEvent.getTableObj(); InsertMessage msg = MessageBuilder.getInstance().buildInsertMessage(tableObj, insertEvent.getPartitionObj(), insertEvent.isReplace(), - new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums())); + new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()), insertEvent.getLocOwner()); NotificationEvent event = new NotificationEvent(0, now(), EventType.INSERT.toString(), msgEncoder.getSerializer().serialize(msg)); diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java index efafe0c641..f2c7b3731c 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java @@ -120,7 
+120,8 @@ public void onAddPartition(AddPartitionEvent partitionEvent) Table table = partitionEvent.getTable(); String topicName = getTopicName(table); if (topicName != null && !topicName.equals("")) { - send(messageFactory.buildAddPartitionMessage(table, partitionEvent.getPartitionIterator()), topicName); + send(messageFactory.buildAddPartitionMessage(table, partitionEvent.getPartitionIterator(), + partitionEvent.getLocOwner()), topicName); } else { LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partitionEvent.getTable().getDbName() @@ -141,7 +142,7 @@ public void onAlterPartition(AlterPartitionEvent ape) throws MetaException { String topicName = getTopicName(ape.getTable()); send(messageFactory.buildAlterPartitionMessage(ape.getTable(),before, after, - ape.getWriteId()), topicName); + ape.getWriteId(), ape.getLocOwner()), topicName); } } @@ -221,7 +222,7 @@ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { throw me; } String topicName = getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase(); - send(messageFactory.buildCreateTableMessage(newTbl), topicName); + send(messageFactory.buildCreateTableMessage(newTbl, tableEvent.getLocOwner()), topicName); } } @@ -255,7 +256,8 @@ public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { // DB topic - Alan. String topicName = getTopicPrefix(tableEvent.getIHMSHandler().getConf()) + "." 
+ after.getDbName().toLowerCase(); - send(messageFactory.buildAlterTableMessage(before, after, tableEvent.getWriteId()), topicName); + send(messageFactory.buildAlterTableMessage(before, after, tableEvent.getWriteId(), + tableEvent.getLocOwner()), topicName); } } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AddPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AddPartitionMessage.java index e16ffa5269..a5ab595624 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AddPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AddPartitionMessage.java @@ -50,6 +50,8 @@ protected AddPartitionMessage() { */ public abstract List> getPartitions (); + public abstract String getLocOwner(); + @Override public HCatEventMessage checkValid() { if (getTable() == null) diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java index fc8f6b7560..2b8ed5e904 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java @@ -42,6 +42,8 @@ protected AlterPartitionMessage() { public abstract Long getWriteId(); + public abstract String getLocOwner(); + @Override public HCatEventMessage checkValid() { if (getTable() == null) throw new IllegalStateException("Table name unset."); diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterTableMessage.java index 416dd10bb2..817173a02d 100644 --- 
a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterTableMessage.java @@ -42,4 +42,6 @@ public HCatEventMessage checkValid() { } public abstract Long getWriteId(); + + public abstract String getLocOwner(); } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateTableMessage.java index c7ca09cc24..9de307ab19 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/CreateTableMessage.java @@ -47,4 +47,6 @@ public HCatEventMessage checkValid() { throw new IllegalStateException("Table name unset."); return super.checkValid(); } + + public abstract String getLocOwner(); } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java index 1754df8501..98e025e1d1 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/InsertMessage.java @@ -62,4 +62,6 @@ public HCatEventMessage checkValid() { throw new IllegalStateException("Table name unset."); return super.checkValid(); } + + public abstract String getLocOwner(); } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java index 782fffb516..ddc6d0d52a 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java +++ 
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java @@ -117,9 +117,10 @@ public static MessageDeserializer getDeserializer(String format, /** * Factory method for CreateTableMessage. * @param table The Table being created. + * @param locOwner Owner of the table location. * @return CreateTableMessage instance. */ - public abstract CreateTableMessage buildCreateTableMessage(Table table); + public abstract CreateTableMessage buildCreateTableMessage(Table table, String locOwner); /** * Factory method for AlterTableMessage. Unlike most of these calls, this one can return null, @@ -129,9 +130,10 @@ public static MessageDeserializer getDeserializer(String format, * @param before The table before the alter * @param after The table after the alter * @param writeId writeId under which alter is done (for ACID tables) + * @param locOwner Owner of the table location. * @return */ - public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after, Long writeId); + public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after, Long writeId, String locOwner); /** * Factory method for DropTableMessage. @@ -140,13 +142,15 @@ public static MessageDeserializer getDeserializer(String format, */ public abstract DropTableMessage buildDropTableMessage(Table table); - /** - * Factory method for AddPartitionMessage. - * @param table The Table to which the partitions are added. - * @param partitions The iterator to set of Partitions being added. - * @return AddPartitionMessage instance. - */ - public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Iterator partitions); + /** + * Factory method for AddPartitionMessage. + * @param table The Table to which the partitions are added. + * @param partitions The iterator to set of Partitions being added. + * @param locOwner Owner of the table/partition location. + * @return AddPartitionMessage instance. 
+ */ + public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Iterator partitions, + String locOwner); /** * Factory method for building AlterPartitionMessage @@ -154,10 +158,11 @@ public static MessageDeserializer getDeserializer(String format, * @param before The partition before it was altered * @param after The partition after it was altered * @param writeId writeId under which alter is done (for ACID tables) + * @param locOwner Owner of the table/partition location. * @return a new AlterPartitionMessage */ public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, - Partition after, Long writeId); + Partition after, Long writeId, String locOwner); /** * Factory method for DropPartitionMessage. @@ -188,10 +193,11 @@ public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Pa * @param partVals Partition values for the partition that the insert occurred in, may be null * if the insert was done into a non-partitioned table * @param files List of files created as a result of the insert, may be null. + * @param locOwner Owner of the table/partition location. * @return instance of InsertMessage */ public abstract InsertMessage buildInsertMessage(String db, String table, - Map partVals, List files); + Map partVals, List files, String locOwner); /** * Factory method for building insert message @@ -200,8 +206,9 @@ public abstract InsertMessage buildInsertMessage(String db, String table, * @param partVals Partition values for the partition that the insert occurred in, may be null * if the insert was done into a non-partitioned table * @param files List of files created as a result of the insert, may be null. + * @param locOwner Owner of the table/partition location. 
* @return instance of InsertMessage */ public abstract InsertMessage buildInsertMessage(String db, Table table, - Map partVals, List files); + Map partVals, List files, String locOwner); } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAddPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAddPartitionMessage.java index ebdd29e413..1f1b202420 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAddPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAddPartitionMessage.java @@ -31,7 +31,7 @@ public class JSONAddPartitionMessage extends AddPartitionMessage { @JsonProperty - String server, servicePrincipal, db, table, tableType; + String server, servicePrincipal, db, table, tableType, locOwner; @JsonProperty Long timestamp; @@ -45,12 +45,12 @@ public JSONAddPartitionMessage() {} public JSONAddPartitionMessage(String server, String servicePrincipal, String db, String table, - List> partitions, Long timestamp) { - this(server, servicePrincipal, db, table, null, partitions, timestamp); + List> partitions, String locOwner, Long timestamp) { + this(server, servicePrincipal, db, table, null, partitions, locOwner, timestamp); } public JSONAddPartitionMessage(String server, String servicePrincipal, String db, String table, - String tableType, List> partitions, Long timestamp) { + String tableType, List> partitions, String locOwner, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; @@ -58,6 +58,7 @@ public JSONAddPartitionMessage(String server, String servicePrincipal, String db this.tableType = tableType; this.partitions = partitions; this.timestamp = timestamp; + this.locOwner = locOwner; checkValid(); } @@ -84,6 +85,9 @@ public String getTableType() { @Override public List> getPartitions () { return partitions; } + 
@Override + public String getLocOwner() { return locOwner; } + @Override public String toString() { try { diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java index c28524165e..fb111b7d59 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java @@ -34,7 +34,7 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage { @JsonProperty - String server, servicePrincipal, db, table, tableType; + String server, servicePrincipal, db, table, tableType, locOwner; @JsonProperty Long timestamp, writeId; @@ -53,8 +53,9 @@ public JSONAlterPartitionMessage(String server, String table, Map keyValues, Long writeId, + String locOwner, Long timestamp) { - this(server, servicePrincipal, db, table, null, keyValues, writeId, timestamp); + this(server, servicePrincipal, db, table, null, keyValues, writeId, locOwner, timestamp); } public JSONAlterPartitionMessage(String server, @@ -64,6 +65,7 @@ public JSONAlterPartitionMessage(String server, String tableType, Map keyValues, long writeId, + String locOwner, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; @@ -73,6 +75,7 @@ public JSONAlterPartitionMessage(String server, this.timestamp = timestamp; this.keyValues = keyValues; this.writeId = writeId; + this.locOwner = locOwner; checkValid(); } @@ -116,6 +119,9 @@ public Long getWriteId() { return writeId == null ? 
0 : writeId; } + @Override + public String getLocOwner() { return locOwner; } + @Override public String toString() { try { diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java index 9c0799b473..77bc5a7481 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java @@ -32,7 +32,7 @@ public class JSONAlterTableMessage extends AlterTableMessage { @JsonProperty - String server, servicePrincipal, db, table, tableType; + String server, servicePrincipal, db, table, tableType, locOwner; @JsonProperty Long timestamp, writeId; @@ -47,8 +47,9 @@ public JSONAlterTableMessage(String server, String db, String table, Long writeId, - Long timestamp) { - this(server, servicePrincipal, db, table, null, writeId, timestamp); + Long timestamp, + String locOwner) { + this(server, servicePrincipal, db, table, null, writeId, locOwner, timestamp); } public JSONAlterTableMessage(String server, @@ -57,6 +58,7 @@ public JSONAlterTableMessage(String server, String table, String tableType, Long writeId, + String locOwner, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; @@ -65,6 +67,7 @@ public JSONAlterTableMessage(String server, this.tableType = tableType; this.timestamp = timestamp; this.writeId = writeId; + this.locOwner = locOwner; checkValid(); } @@ -103,6 +106,11 @@ public Long getWriteId() { return writeId == null ? 
0 : writeId; } + @Override + public String getLocOwner() { + return locOwner; + } + @Override public String toString() { try { diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java index 60333d5aec..07e96ef541 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateTableMessage.java @@ -28,7 +28,7 @@ public class JSONCreateTableMessage extends CreateTableMessage { @JsonProperty - String server, servicePrincipal, db, table, tableType; + String server, servicePrincipal, db, table, tableType, locOwner; @JsonProperty Long timestamp; @@ -38,19 +38,20 @@ */ public JSONCreateTableMessage() {} - public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, + public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, String locOwner, Long timestamp) { - this(server, servicePrincipal, db, table, null, timestamp); + this(server, servicePrincipal, db, table, null, locOwner, timestamp); } public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, - String tableType, Long timestamp) { + String tableType, String locOwner, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; this.table = table; this.tableType = tableType; this.timestamp = timestamp; + this.locOwner = locOwner; checkValid(); } @@ -74,6 +75,11 @@ public String getTableType() { if (tableType != null) return tableType; else return ""; } + @Override + public String getLocOwner() { + return locOwner; + } + @Override public String toString() { try { diff --git 
a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java index e05113cd33..27d4187049 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONInsertMessage.java @@ -31,7 +31,7 @@ public class JSONInsertMessage extends InsertMessage { @JsonProperty - String server, servicePrincipal, db, table, tableType; + String server, servicePrincipal, db, table, tableType, locOwner; @JsonProperty Long timestamp; @@ -48,13 +48,13 @@ public JSONInsertMessage() {} public JSONInsertMessage(String server, String servicePrincipal, String db, String table, - Map partKeyVals, List files, Long timestamp) { - this(server, servicePrincipal, db, table, null, partKeyVals, files, timestamp); + Map partKeyVals, List files, String locOwner, Long timestamp) { + this(server, servicePrincipal, db, table, null, partKeyVals, files, locOwner, timestamp); } public JSONInsertMessage(String server, String servicePrincipal, String db, String table, String tableType, Map partKeyVals, List files, - Long timestamp) { + String locOwner, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; @@ -63,6 +63,7 @@ public JSONInsertMessage(String server, String servicePrincipal, String db, Stri this.timestamp = timestamp; this.partKeyVals = partKeyVals; this.files = files; + this.locOwner = locOwner; checkValid(); } @@ -97,6 +98,9 @@ public String getTableType() { @Override public Long getTimestamp() { return timestamp; } + @Override + public String getLocOwner() { return locOwner; } + @Override public String toString() { try { diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java 
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java index 770dd1e5a6..bb64565c9e 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java @@ -82,15 +82,15 @@ public DropDatabaseMessage buildDropDatabaseMessage(Database db) { } @Override - public CreateTableMessage buildCreateTableMessage(Table table) { + public CreateTableMessage buildCreateTableMessage(Table table, String locOwner) { return new JSONCreateTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), table.getTableName(), table.getTableType(), locOwner, now()); } @Override - public AlterTableMessage buildAlterTableMessage(Table before, Table after, Long writeId) { + public AlterTableMessage buildAlterTableMessage(Table before, Table after, Long writeId, String locOwner) { return new JSONAlterTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, before.getDbName(), - before.getTableName(), before.getTableType(), writeId, now()); + before.getTableName(), before.getTableType(), writeId, locOwner, now()); } @Override @@ -100,18 +100,19 @@ public DropTableMessage buildDropTableMessage(Table table) { } @Override - public AddPartitionMessage buildAddPartitionMessage(Table table, Iterator partitionsIterator) { + public AddPartitionMessage buildAddPartitionMessage(Table table, Iterator partitionsIterator, + String locOwner) { return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), table.getTableName(), table.getTableType(), - MessageBuilder.getPartitionKeyValues(table, partitionsIterator), now()); + MessageBuilder.getPartitionKeyValues(table, partitionsIterator), locOwner, now()); } @Override public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, Partition after, - Long writeId) { + Long writeId, String locOwner) { 
return new JSONAlterPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, before.getDbName(), before.getTableName(), table.getTableType(), - MessageBuilder.getPartitionKeyValues(table,before), writeId, now()); + MessageBuilder.getPartitionKeyValues(table,before), writeId, locOwner, now()); } @Override @@ -135,16 +136,16 @@ public DropFunctionMessage buildDropFunctionMessage(Function fn) { @Override public InsertMessage buildInsertMessage(String db, String table, Map partKeyVals, - List files) { + List files, String locOwner) { return new JSONInsertMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db, table, null, - partKeyVals, files, now()); + partKeyVals, files, locOwner, now()); } @Override public InsertMessage buildInsertMessage(String db, Table table, Map partKeyVals, - List files) { + List files, String locOwner) { return new JSONInsertMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(), - table.getTableName(), table.getTableType(), partKeyVals, files, now()); + table.getTableName(), table.getTableType(), partKeyVals, files, locOwner, now()); } private long now() { diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/TestReplicationTask.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/TestReplicationTask.java index dca56ee031..770f7fdd16 100644 --- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/TestReplicationTask.java +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/TestReplicationTask.java @@ -73,7 +73,7 @@ public static void testCreate() throws HCatException { t.setDbName("testdb"); t.setTableName("testtable"); NotificationEvent event = new NotificationEvent(0, (int)System.currentTimeMillis(), - HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t).toString()); + HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t, null).toString()); 
event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/exim/TestEximReplicationTasks.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/exim/TestEximReplicationTasks.java index 846ebc75e7..253115a019 100644 --- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/exim/TestEximReplicationTasks.java +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/repl/exim/TestEximReplicationTasks.java @@ -173,7 +173,7 @@ public void testCreateTable() throws IOException { t.setDbName("testdb"); t.setTableName("testtable"); NotificationEvent event = new NotificationEvent(getEventId(), getTime(), - HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t).toString()); + HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t, null).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); @@ -267,7 +267,7 @@ public void testAlterTable() throws IOException { t.setTableName("testtable"); NotificationEvent event = new NotificationEvent(getEventId(), getTime(), HCatConstants.HCAT_ALTER_TABLE_EVENT, - msgFactory.buildAlterTableMessage(t, t, t.getWriteId()).toString()); + msgFactory.buildAlterTableMessage(t, t, t.getWriteId(), null).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); @@ -324,7 +324,8 @@ public void testAddPartition() throws IOException { addedPtns.add(createPtn(t, Arrays.asList("201", "xyz"))); NotificationEvent event = new NotificationEvent(getEventId(), getTime(), - HCatConstants.HCAT_ADD_PARTITION_EVENT, msgFactory.buildAddPartitionMessage(t, addedPtns.iterator()).toString()); + HCatConstants.HCAT_ADD_PARTITION_EVENT, msgFactory.buildAddPartitionMessage(t, + addedPtns.iterator(), null).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); @@ -456,7 +457,7 @@ 
public void testAlterPartition() throws HCatException { NotificationEvent event = new NotificationEvent(getEventId(), getTime(), HCatConstants.HCAT_ALTER_PARTITION_EVENT, msgFactory.buildAlterPartitionMessage(t, - p, p, p.getWriteId()).toString()); + p, p, p.getWriteId(), null).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); @@ -511,7 +512,7 @@ public void testInsert() throws HCatException { t.getDbName(), t.getTableName(), getPtnDesc(t,p), - files + files, null ).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 3820fabbf9..4104aca00f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -3484,7 +3484,7 @@ private NotificationEvent createDummyEvent(String dbname, String tblname, long e evid, (int)System.currentTimeMillis(), MessageBuilder.CREATE_TABLE_EVENT, - MessageBuilder.getInstance().buildCreateTableMessage(t, Arrays.asList("/tmp/").iterator()) + MessageBuilder.getInstance().buildCreateTableMessage(t, Arrays.asList("/tmp/").iterator(), null) .toString() ); event.setDbName(t.getDbName()); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java index 58561d4b91..fc5b993c70 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java @@ -35,6 +35,8 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import 
org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; +import org.apache.hadoop.hive.ql.parse.WarehouseInstance; +import org.apache.hadoop.hive.ql.parse.ReplicationTestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,6 +116,7 @@ static void internalBeforeClassSetup(Map overrideConfigs) throws put("hive.support.concurrency", "false"); put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); put("hive.strict.managed.tables", "false"); + put("strict.managed.tables.migration.owner", System.getProperty("user.name")); }}; configsForPrimary.putAll(overrideConfigs); primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary); @@ -427,4 +430,67 @@ public void testIncrementalLoadMigrationToAcidWithMoveOptimization() throws Thro replica.load(replicatedDbName, tuple.dumpLocation, withConfigs); verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); } + + private boolean isExternal(Table table) { + return "TRUE".equalsIgnoreCase(table.getParameters().get("EXTERNAL")); + } + + @Test + public void testMigrarionLocationOwnership() throws Throwable { + primary.run("use " + primaryDbName) + .run("create table tbl (fld int)") + .run("insert into tbl values (1)") + .run("create table tbl_part (fld int) partitioned by (part int)") + .run("insert into tbl_part partition (part = 1) values (1)") + .run("insert into tbl_part partition (part = 2) values (2)"); + + Table table = primary.getTable(primaryDbName, "tbl"); + Path location = new Path(table.getSd().getLocation()); + FileSystem fs = location.getFileSystem(primary.getConf()); + fs.setOwner(location, "junk", "junk"); + Path partLoc = new Path(primary.getPartition(primaryDbName, + "tbl_part", Collections.singletonList("1")).getSd().getLocation()); + fs.setOwner(partLoc, "junk", "junk"); + + WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, null); + replica.load(replicatedDbName, 
tuple.dumpLocation) + .run("use " + replicatedDbName) + .run(" select fld from tbl") + .verifyResult("1") + .run(" select fld from tbl_part") + .verifyResults(new String[] {"1", "2"}); + + assertTrue(isExternal(replica.getTable(replicatedDbName, "tbl"))); + assertTrue(isExternal(replica.getTable(replicatedDbName, "tbl_part"))); + assertFalse(isExternal(primary.getTable(primaryDbName, "tbl"))); + assertFalse(isExternal(primary.getTable(primaryDbName, "tbl_part"))); + + Path newLoc = new Path(location.getParent(), "tbl_inc"); + assertTrue(fs.mkdirs(newLoc)); + fs.setOwner(newLoc, "junk", "junk"); + + Path newLocPart = new Path(location.getParent(), "tbl_inc_part_3"); + assertTrue(fs.mkdirs(newLocPart)); + fs.setOwner(newLocPart, "junk", "junk"); + + tuple = primary.run("use " + primaryDbName) + .run("create table tbl_inc (fld int, fld1 int) location '" + newLoc.toUri() + "'") + .run("insert into tbl_inc values (1, 2)") + .run("create table tbl_inc_part (fld int) partitioned by (part string) location '" + newLocPart.toUri() + "'") + .run("insert into tbl_inc_part partition (part = 'part1') values (1)") + .run("insert into tbl_inc_part partition (part = 'part2') values (2)") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run(" select fld from tbl_inc") + .verifyResult("1") + .run(" select fld from tbl_inc_part") + .verifyResults(new String[] {"1", "2"}); + + assertTrue(isExternal(replica.getTable(replicatedDbName, "tbl_inc"))); + assertFalse(isExternal(primary.getTable(primaryDbName, "tbl_inc"))); + assertTrue(isExternal(replica.getTable(replicatedDbName, "tbl_inc_part"))); + assertFalse(isExternal(primary.getTable(primaryDbName, "tbl_inc_part"))); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java index 
22b6e98d5f..7dc951beb9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -99,7 +99,7 @@ public ImportTableDesc tableDesc(String dbName) throws SemanticException { HiveStrictManagedMigration.TableMigrationOption migrationOption = HiveStrictManagedMigration.determineMigrationTypeAutomatically(table.getTTable(), table.getTableType(), null, hiveConf, - hiveDb.getMSC(), true); + hiveDb.getMSC(), replicationSpec().isPathOwnedByHive()); HiveStrictManagedMigration.migrateTable(table.getTTable(), table.getTableType(), migrationOption, false, getHiveUpdater(hiveConf), hiveDb.getMSC(), hiveConf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index 072189b344..0f651211e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -165,7 +165,8 @@ public static boolean replCkptStatus(String dbName, Map props, S String actualTblName, HiveConf conf, UpdatedMetaDataTracker updatedMetaDataTracker, Task childTask, - org.apache.hadoop.hive.metastore.api.Table tableObj) + org.apache.hadoop.hive.metastore.api.Table tableObj, + boolean isPathOwnedByHive) throws IOException, TException { List> taskList = new ArrayList<>(); taskList.add(childTask); @@ -175,7 +176,7 @@ public static boolean replCkptStatus(String dbName, Map props, S //TODO : isPathOwnByHive is hard coded to true, need to get it from repl dump metadata. HiveStrictManagedMigration.TableMigrationOption migrationOption = HiveStrictManagedMigration.determineMigrationTypeAutomatically(tableObj, TableType.MANAGED_TABLE, - null, conf, null, true); + null, conf, null, isPathOwnedByHive); if (migrationOption == MANAGED) { //if conversion to managed table. 
Task replTxnTask = TaskFactory.get(new ReplTxnWork(actualDbName, actualTblName, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 76c69cf24b..3e735b6543 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -301,10 +301,10 @@ public static void createExportDump(FileSystem fs, Path metadataPath, Table tabl } try (JsonWriter writer = new JsonWriter(fs, metadataPath)) { + new TableSerializer(tableHandle, partitions, hiveConf).writeTo(writer, replicationSpec); if (replicationSpec.isInReplicationScope()) { new ReplicationSpecSerializer().writeTo(writer, replicationSpec); } - new TableSerializer(tableHandle, partitions, hiveConf).writeTo(writer, replicationSpec); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index cb9584c1d1..d95e7e205e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -203,15 +203,15 @@ private void parsePartitionSpec(ASTNode tableNode, LinkedHashMap } private static void upgradeTableDesc(org.apache.hadoop.hive.metastore.api.Table tableObj, MetaData rv, - EximUtil.SemanticAnalyzerWrapperContext x) + EximUtil.SemanticAnalyzerWrapperContext x, Boolean isPathOwnedByHive) throws IOException, TException, HiveException { x.getLOG().debug("Converting table " + tableObj.getTableName() + " of type " + tableObj.getTableType() + - " with para " + tableObj.getParameters()); + " with para " + tableObj.getParameters() + " isPathOwnedByHive " + isPathOwnedByHive); //TODO : isPathOwnedByHive is hard coded to true, need to get it from repl dump metadata. 
TableType tableType = TableType.valueOf(tableObj.getTableType()); HiveStrictManagedMigration.TableMigrationOption migrationOption = HiveStrictManagedMigration.determineMigrationTypeAutomatically(tableObj, tableType, - null, x.getConf(), x.getHive().getMSC(), true); + null, x.getConf(), x.getHive().getMSC(), isPathOwnedByHive); HiveStrictManagedMigration.migrateTable(tableObj, tableType, migrationOption, false, getHiveUpdater(x.getConf()), x.getHive().getMSC(), x.getConf()); x.getLOG().debug("Converted table " + tableObj.getTableName() + " of type " + tableObj.getTableType() + @@ -284,7 +284,7 @@ public static boolean prepareImport(boolean isImportCmd, x.getConf().getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES) && (TableType.valueOf(tblObj.getTableType()) == TableType.MANAGED_TABLE)) { //TODO : dump metadata should be read to make sure that migration is required. - upgradeTableDesc(tblObj, rv, x); + upgradeTableDesc(tblObj, rv, x, replicationSpec.isPathOwnedByHive()); //if the conversion is from non transactional to transactional table if (TxnUtils.isTransactionalTable(tblObj)) { replicationSpec.setMigratingToTxnTable(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 48213d1b4e..c8491e93d1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -19,6 +19,7 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.plan.PlanUtils; @@ -50,6 +51,8 @@ private boolean isMigratingToTxnTable = false; private boolean isMigratingToExternalTable = false; private boolean needDupCopyCheck = false; + private boolean isPathOwnedByHive = true; + private boolean isEventBasedOwnershipCheck = true; // Key definitions related 
to replication public enum KEY { @@ -60,7 +63,8 @@ LAZY("repl.lazy"), IS_REPLACE("repl.is.replace"), VALID_WRITEID_LIST("repl.valid.writeid.list"), - VALID_TXN_LIST("repl.valid.txnid.list") + VALID_TXN_LIST("repl.valid.txnid.list"), + PATH_OWNED_BY_HIVE("repl.path.owned.by.hive") ; private final String keyName; @@ -128,6 +132,7 @@ public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly, this.isLazy = isLazy; this.isReplace = isReplace; this.specType = Type.DEFAULT; + this.isPathOwnedByHive = true; } public ReplicationSpec(Function keyFetcher) { @@ -150,6 +155,10 @@ public ReplicationSpec(Function keyFetcher) { this.isReplace = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_REPLACE.toString())); this.validWriteIdList = keyFetcher.apply(ReplicationSpec.KEY.VALID_WRITEID_LIST.toString()); this.validTxnList = keyFetcher.apply(KEY.VALID_TXN_LIST.toString()); + String pathOwnedByHive = keyFetcher.apply(KEY.PATH_OWNED_BY_HIVE.toString()); + if (pathOwnedByHive != null) { + this.isPathOwnedByHive = Boolean.parseBoolean(pathOwnedByHive); + } } /** @@ -390,6 +399,8 @@ public String get(KEY key) { return getValidWriteIdList(); case VALID_TXN_LIST: return getValidTxnList(); + case PATH_OWNED_BY_HIVE: + return String.valueOf(isPathOwnedByHive()); } return null; } @@ -437,4 +448,30 @@ public void setNeedDupCopyCheck(boolean isFirstIncPending) { // Check HIVE-21197 for more detail. 
this.needDupCopyCheck = isFirstIncPending; } + + public boolean isPathOwnedByHive() { + return isPathOwnedByHive; + } + + public static boolean isPathOwnedByHive(HiveConf conf, String user) { + String ownerName = conf.get(HiveConf.ConfVars.STRICT_MANAGED_TABLES_MIGRARTION_OWNER.varname, "hive"); + return (user == null || ownerName.equals(user)); + } + + public void setPathOwnedByHive(boolean pathOwnedByHive) { + isPathOwnedByHive = pathOwnedByHive; + } + + public void setPathOwnedByHive(HiveConf conf, String user) { + String ownerName = conf.get(HiveConf.ConfVars.STRICT_MANAGED_TABLES_MIGRARTION_OWNER.varname, "hive"); + isPathOwnedByHive = (user == null || ownerName.equals(user)); + } + + public void setEventBasedOwnershipCheck(boolean eventBasedOwnershipCheck) { + isEventBasedOwnershipCheck = eventBasedOwnershipCheck; + } + + public boolean isEventBasedOwnershipCheck() { + return isEventBasedOwnershipCheck; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 4cd4d7023e..7abb4444a8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -84,6 +84,34 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication this.conf = conf; this.paths = paths; this.mmCtx = mmCtx; + this.replicationSpec.setEventBasedOwnershipCheck(false); + setPathOwnedByHive(this.replicationSpec, tableSpec.tableHandle.getDataLocation(), db.getConf()); + } + + public static void setPathOwnedByHive(ReplicationSpec replicationSpec, Path location, HiveConf conf) { + // For incremental load path, this flag should be set using the owner name in the event. 
+ if (replicationSpec == null || !replicationSpec.isInReplicationScope() || + replicationSpec.isEventBasedOwnershipCheck()) { + return; + } + + // If the table path or path of any of the partitions is not owned by hive, + // then table location not owned by hive for whole table. + if (!replicationSpec.isPathOwnedByHive()) { + logger.info("Path is not owned by hive user for table or some partition. No need to check further."); + return; + } + + try { + FileStatus fileStatus = location.getFileSystem(conf).getFileStatus(location); + String hiveOwner = conf.get(HiveConf.ConfVars.STRICT_MANAGED_TABLES_MIGRARTION_OWNER.varname, "hive"); + replicationSpec.setPathOwnedByHive(hiveOwner.equals(fileStatus.getOwner())); + logger.debug("Owner of path " + location + " is " + fileStatus.getOwner() + + " replicationSpec.setPathOwnedByHive with " + replicationSpec.isPathOwnedByHive()); + } catch (Exception e) { + logger.error("Failed to get location owner ", e); + throw new RuntimeException(e.getMessage()); + } } public boolean write() throws SemanticException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java index 0756f59a81..65dd192f33 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java @@ -77,6 +77,8 @@ public void handle(Context withinContext) throws Exception { return; } + withinContext.replicationSpec.setPathOwnedByHive(withinContext.hiveConf, apm.getLocOwner()); + Iterable qlPtns = StreamSupport.stream(ptns.spliterator(), true).map( input -> { if (input == null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java index e59bdf67f8..8d14d73db0 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java @@ -104,6 +104,8 @@ public void handle(Context withinContext) throws Exception { return; } + withinContext.replicationSpec.setPathOwnedByHive(withinContext.hiveConf, eventMessage.getLocOwner()); + if (Scenario.ALTER == scenario) { withinContext.replicationSpec.setIsMetadataOnly(true); List partitions = new ArrayList<>(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java index 4deb551617..8954c107da 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java @@ -103,6 +103,8 @@ public void handle(Context withinContext) throws Exception { qlMdTableAfter.setStatsStateLikeNewTable(); } + withinContext.replicationSpec.setPathOwnedByHive(withinContext.hiveConf, eventMessage.getLocOwner()); + EximUtil.createExportDump( metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index 8a838db508..32faf593e3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -69,6 +69,8 @@ public void handle(Context withinContext) throws Exception { qlMdTable.setStatsStateLikeNewTable(); } + withinContext.replicationSpec.setPathOwnedByHive(withinContext.hiveConf, eventMessage.getLocOwner()); + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); EximUtil.createExportDump( 
metaDataPath.getFileSystem(withinContext.hiveConf), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java index 1bcd52923b..980cde9ba5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java @@ -73,6 +73,7 @@ public void handle(Context withinContext) throws Exception { // Mark the replace type based on INSERT-INTO or INSERT_OVERWRITE operation withinContext.replicationSpec.setIsReplace(eventMessage.isReplace()); + withinContext.replicationSpec.setPathOwnedByHive(withinContext.hiveConf, eventMessage.getLocOwner()); EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath, qlMdTable, qlPtns, withinContext.replicationSpec, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java index 552183af5b..070e6936c5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; @@ -103,6 +104,7 @@ private void writePartitions(JsonWriter writer, ReplicationSpec additionalProper writer.jsonGenerator.writeStartArray(); if (partitions != null) { for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) { + TableExport.setPathOwnedByHive(additionalPropertiesProvider, partition.getDataLocation(), 
hiveConf); new PartitionSerializer(partition.getTPartition()) .writeTo(writer, additionalPropertiesProvider); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java index 05a9f9123f..ca982b017d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import java.io.Serializable; @@ -66,8 +67,10 @@ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, partSpec); try { + // User firing the repl load command sets the config for hive user to be used. 
return ReplUtils.addOpenTxnTaskForMigration(actualDbName, actualTblName, - context.hiveConf, updatedMetadata, truncatePtnTask, tblObj); + context.hiveConf, updatedMetadata, truncatePtnTask, tblObj, + ReplicationSpec.isPathOwnedByHive(context.hiveConf, msg.getLocOwner())); } catch (Exception e) { throw new SemanticException(e.getMessage()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java index 5ef66fafa4..7644f23137 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import java.io.Serializable; @@ -47,8 +48,10 @@ updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); try { + // User firing the repl load command sets the config for hive user to be used. 
return ReplUtils.addOpenTxnTaskForMigration(actualDbName, actualTblName, - context.hiveConf, updatedMetadata, truncateTableTask, msg.getTableObjBefore()); + context.hiveConf, updatedMetadata, truncateTableTask, msg.getTableObjBefore(), + ReplicationSpec.isPathOwnedByHive(context.hiveConf, msg.getLocOwner())); } catch (Exception e) { throw new SemanticException(e.getMessage()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java index 80025b7046..e365981803 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/HiveStrictManagedMigration.java @@ -367,7 +367,7 @@ void checkOldWarehouseRoot() throws IOException, MetaException { void checkOwnerPermsOptions() { if (runOptions.shouldModifyManagedTableOwner) { - ownerName = conf.get("strict.managed.tables.migration.owner", "hive"); + ownerName = conf.get(HiveConf.ConfVars.STRICT_MANAGED_TABLES_MIGRARTION_OWNER.varname, "hive"); groupName = conf.get("strict.managed.tables.migration.group", null); } if (runOptions.shouldModifyManagedTablePermissions) { diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java index 155ecb18bf..fdfcdb258e 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java @@ -594,4 +594,13 @@ void findNext() throws IOException { nextFile = null; } } + + public static String getLocationOwner(String location, Configuration conf) { + try { + Path path = new Path(location); + return path.getFileSystem(conf).getFileStatus(path).getOwner(); + } catch (Exception e) { + throw new 
RuntimeException(e.getMessage()); + } + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java index d4542d7a5b..a429bfd04f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.hadoop.hive.metastore.utils.FileUtils; import java.util.Arrays; import java.util.Iterator; @@ -36,17 +37,26 @@ private final Table table; private final List partitions; private PartitionSpecProxy partitionSpecProxy; + private final String locOwner; - public AddPartitionEvent(Table table, List partitions, boolean status, + public AddPartitionEvent(Table table, List partitions, PartitionSpecProxy partitionSpec, boolean status, IHMSHandler handler) { super(status, handler); this.table = table; this.partitions = partitions; - this.partitionSpecProxy = null; + this.partitionSpecProxy = partitionSpec; + + // The table location owner is same as partition location owner if the database is source of replication. 
+ locOwner = FileUtils.getLocationOwner(table.getSd().getLocation(), handler.getConf()); + } + + public AddPartitionEvent(Table table, List partitions, boolean status, + IHMSHandler handler) { + this(table, partitions, null, status, handler); } public AddPartitionEvent(Table table, Partition partition, boolean status, IHMSHandler handler) { - this(table, Arrays.asList(partition), status, handler); + this(table, Arrays.asList(partition), null, status, handler); } /** @@ -54,10 +64,7 @@ public AddPartitionEvent(Table table, Partition partition, boolean status, IHMSH */ public AddPartitionEvent(Table table, PartitionSpecProxy partitionSpec, boolean status, IHMSHandler handler) { - super(status, handler); - this.table = table; - this.partitions = null; - this.partitionSpecProxy = partitionSpec; + this(table, null, partitionSpec, status, handler); } /** @@ -81,4 +88,7 @@ public Table getTable() { } } + public String getLocOwner() { + return locOwner; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java index 499c6e4f31..03f62bf2cc 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.IHMSHandler; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.utils.FileUtils; @InterfaceAudience.Public @InterfaceStability.Stable @@ -33,6 +34,7 @@ private final Table table; private final boolean isTruncateOp; private Long writeId; + private String locOwner; public AlterPartitionEvent(Partition oldPart, Partition newPart, Table table, 
boolean isTruncateOp, boolean status, Long writeId, IHMSHandler handler) { @@ -42,6 +44,9 @@ public AlterPartitionEvent(Partition oldPart, Partition newPart, Table table, bo this.table = table; this.isTruncateOp = isTruncateOp; this.writeId = writeId; + + // The table location owner is same as partition location owner if the database is source of replication. + locOwner = FileUtils.getLocationOwner(table.getSd().getLocation(), handler.getConf()); } /** @@ -78,4 +83,8 @@ public boolean getIsTruncateOp() { public Long getWriteId() { return writeId; } + + public String getLocOwner() { + return locOwner; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java index 541fbe48ac..676691d365 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.IHMSHandler; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.utils.FileUtils; @InterfaceAudience.Public @InterfaceStability.Stable @@ -32,6 +33,7 @@ private final Table oldTable; private final boolean isTruncateOp; private Long writeId; + private final String locOwner; public AlterTableEvent (Table oldTable, Table newTable, boolean isTruncateOp, boolean status, Long writeId, IHMSHandler handler) { @@ -40,6 +42,11 @@ public AlterTableEvent (Table oldTable, Table newTable, boolean isTruncateOp, bo this.newTable = newTable; this.isTruncateOp = isTruncateOp; this.writeId = writeId; + if (newTable.getSd() == null) { + locOwner = FileUtils.getLocationOwner(oldTable.getSd().getLocation(), 
handler.getConf()); + } else { + locOwner = FileUtils.getLocationOwner(newTable.getSd().getLocation(), handler.getConf()); + } } /** @@ -66,4 +73,8 @@ public boolean getIsTruncateOp() { public Long getWriteId() { return writeId; } + + public String getLocOwner() { + return locOwner; + } } \ No newline at end of file diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java index 4f5e887ac2..63eeec09bf 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CreateTableEvent.java @@ -22,16 +22,19 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.IHMSHandler; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.utils.FileUtils; @InterfaceAudience.Public @InterfaceStability.Stable public class CreateTableEvent extends ListenerEvent { private final Table table; + private final String locOwner; public CreateTableEvent (Table table, boolean status, IHMSHandler handler) { super (status, handler); this.table = table; + locOwner = FileUtils.getLocationOwner(table.getSd().getLocation(), handler.getConf()); } /** @@ -40,4 +43,8 @@ public CreateTableEvent (Table table, boolean status, IHMSHandler handler) { public Table getTable () { return table; } + + public String getLocOwner() { + return locOwner; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java index 60ad7db60e..59d23c6469 100644 --- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.utils.FileUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.thrift.TException; @@ -46,6 +47,7 @@ private final boolean replace; private final List files; private List fileChecksums = new ArrayList<>(); + private final String locOwner; /** * @@ -89,6 +91,9 @@ public InsertEvent(String catName, String db, String table, List partVal if (insertData.isSetFilesAddedChecksum()) { fileChecksums = insertData.getFilesAddedChecksum(); } + + // The table location owner is same as partition location owner if the database is source of replication. 
+ locOwner = FileUtils.getLocationOwner(tableObj.getSd().getLocation(), handler.getConf()); } /** @@ -129,4 +134,8 @@ public boolean isReplace() { public List getFileChecksums() { return fileChecksums; } + + public String getLocOwner() { + return locOwner; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java index 3262b52bea..42fdceed3e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java @@ -65,4 +65,5 @@ public EventMessage checkValid() { */ public abstract Iterable getPartitionFilesIter(); + public abstract String getLocOwner(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java index a1ba01a36f..ea8c9aa3a2 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java @@ -67,5 +67,7 @@ public EventMessage checkValid() { } public abstract Long getWriteId(); + + public abstract String getLocOwner(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java index bbc01c1a5e..c5037665fa 100644 --- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java @@ -57,4 +57,6 @@ public EventMessage checkValid() { } public abstract Long getWriteId(); + + public abstract String getLocOwner(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java index 49732ff97d..4b9f9dd408 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java @@ -37,6 +37,8 @@ protected CreateTableMessage() { public abstract Table getTableObj() throws Exception; + public abstract String getLocOwner(); + /** * Get list of files created as a result of this DML operation * diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java index c470097165..a824324a14 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java @@ -72,4 +72,6 @@ public EventMessage checkValid() { throw new IllegalStateException("Table name unset."); return super.checkValid(); } + + public abstract String getLocOwner(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java index aa83da4ed5..434bf32cb9 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java @@ -178,14 +178,14 @@ public DropDatabaseMessage buildDropDatabaseMessage(Database db) { return new JSONDropDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, now()); } - public CreateTableMessage buildCreateTableMessage(Table table, Iterator fileIter) { - return new JSONCreateTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, fileIter, now()); + public CreateTableMessage buildCreateTableMessage(Table table, Iterator fileIter, String locOwner) { + return new JSONCreateTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, fileIter, locOwner, now()); } public AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp, - Long writeId) { + Long writeId, String locOwner) { return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after, - isTruncateOp, writeId, now()); + isTruncateOp, writeId, locOwner, now()); } public DropTableMessage buildDropTableMessage(Table table) { @@ -193,15 +193,15 @@ public DropTableMessage buildDropTableMessage(Table table) { } public AddPartitionMessage buildAddPartitionMessage(Table table, - Iterator partitionsIterator, Iterator partitionFileIter) { + Iterator partitionsIterator, Iterator partitionFileIter, String locOwner) { return new JSONAddPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, - partitionsIterator, partitionFileIter, now()); + partitionsIterator, partitionFileIter, locOwner, now()); } public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, - Partition after, boolean isTruncateOp, Long writeId) { + Partition after, boolean 
isTruncateOp, Long writeId, String locOwner) { return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, - table, before, after, isTruncateOp, writeId, now()); + table, before, after, isTruncateOp, writeId, locOwner, now()); } public DropPartitionMessage buildDropPartitionMessage(Table table, @@ -219,9 +219,9 @@ public DropFunctionMessage buildDropFunctionMessage(Function fn) { } public InsertMessage buildInsertMessage(Table tableObj, Partition partObj, - boolean replace, Iterator fileIter) { + boolean replace, Iterator fileIter, String locOwner) { return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, - tableObj, partObj, replace, fileIter, now()); + tableObj, partObj, replace, fileIter, locOwner, now()); } public AddPrimaryKeyMessage buildAddPrimaryKeyMessage(List pks) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java index 6494cb8dc7..713336da3d 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java @@ -44,7 +44,7 @@ public class JSONAddPartitionMessage extends AddPartitionMessage { @JsonProperty - String server, servicePrincipal, db, table, tableType, tableObjJson; + String server, servicePrincipal, db, table, tableType, tableObjJson, locOwner; @JsonProperty Long timestamp; @@ -68,7 +68,7 @@ public JSONAddPartitionMessage() { * Note that we get an Iterator rather than an Iterable here: so we can only walk thru the list once */ public JSONAddPartitionMessage(String server, String servicePrincipal, Table tableObj, - Iterator partitionsIterator, Iterator partitionFileIter, + Iterator 
partitionsIterator, Iterator partitionFileIter, String locOwner, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; @@ -91,6 +91,7 @@ public JSONAddPartitionMessage(String server, String servicePrincipal, Table tab } this.partitionFiles = (partitionFileIter != null) ? Lists.newArrayList(partitionFileIter) : Lists.newArrayList(); + this.locOwner = locOwner; checkValid(); } @@ -160,6 +161,9 @@ public String getTableObjJson() { return partitionListJson; } + @Override + public String getLocOwner() { return locOwner; } + @Override public String toString() { try { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java index 414402f705..fbe41e9ffe 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java @@ -34,7 +34,7 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage { @JsonProperty - String server, servicePrincipal, db, table, tableType, tableObjJson; + String server, servicePrincipal, db, table, tableType, tableObjJson, locOwner; @JsonProperty String isTruncateOp; @@ -55,7 +55,8 @@ public JSONAlterPartitionMessage() { } public JSONAlterPartitionMessage(String server, String servicePrincipal, Table tableObj, - Partition partitionObjBefore, Partition partitionObjAfter, boolean isTruncateOp, Long writeId, Long timestamp) { + Partition partitionObjBefore, Partition partitionObjAfter, boolean isTruncateOp, Long writeId, + String locOwner, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = tableObj.getDbName(); @@ -65,6 +66,7 @@ public 
JSONAlterPartitionMessage(String server, String servicePrincipal, Table t this.timestamp = timestamp; this.keyValues = MessageBuilder.getPartitionKeyValues(tableObj, partitionObjBefore); this.writeId = writeId; + this.locOwner = locOwner; try { this.tableObjJson = MessageBuilder.createTableObjJson(tableObj); this.partitionObjBeforeJson = MessageBuilder.createPartitionObjJson(partitionObjBefore); @@ -149,6 +151,9 @@ public Long getWriteId() { return writeId == null ? 0 : writeId; } + @Override + public String getLocOwner() { return locOwner; } + @Override public String toString() { try { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java index 8c621b2701..68d72106b8 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java @@ -31,7 +31,7 @@ public class JSONAlterTableMessage extends AlterTableMessage { @JsonProperty - String server, servicePrincipal, db, table, tableType, tableObjBeforeJson, tableObjAfterJson; + String server, servicePrincipal, db, table, tableType, tableObjBeforeJson, tableObjAfterJson, locOwner; @JsonProperty String isTruncateOp; @@ -46,7 +46,7 @@ public JSONAlterTableMessage() { } public JSONAlterTableMessage(String server, String servicePrincipal, Table tableObjBefore, Table tableObjAfter, - boolean isTruncateOp, Long writeId, Long timestamp) { + boolean isTruncateOp, Long writeId, String locOwner, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = tableObjBefore.getDbName(); @@ -61,6 +61,7 @@ public JSONAlterTableMessage(String server, String servicePrincipal, Table table } catch 
(TException e) { throw new IllegalArgumentException("Could not serialize: ", e); } + this.locOwner = locOwner; checkValid(); } @@ -124,6 +125,11 @@ public Long getWriteId() { return writeId == null ? 0 : writeId; } + @Override + public String getLocOwner() { + return locOwner; + } + @Override public String toString() { try { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java index 145ee4b199..503fb69673 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java @@ -36,7 +36,7 @@ public class JSONCreateTableMessage extends CreateTableMessage { @JsonProperty - String server, servicePrincipal, db, table, tableType, tableObjJson; + String server, servicePrincipal, db, table, tableType, tableObjJson, locOwner; @JsonProperty Long timestamp; @JsonProperty @@ -49,25 +49,26 @@ public JSONCreateTableMessage() { } public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, - String tableType, Long timestamp) { + String tableType, String locOwner, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; this.table = table; this.tableType = tableType; this.timestamp = timestamp; + this.locOwner = locOwner; checkValid(); } - public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, + public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, String locOwner, Long timestamp) { - this(server, servicePrincipal, db, table, null, timestamp); + this(server, servicePrincipal, db, table, null, locOwner, timestamp); 
} public JSONCreateTableMessage(String server, String servicePrincipal, Table tableObj, - Iterator fileIter, Long timestamp) { + Iterator fileIter, String locOwner, Long timestamp) { this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), - tableObj.getTableType(), timestamp); + tableObj.getTableType(), locOwner, timestamp); try { this.tableObjJson = MessageBuilder.createTableObjJson(tableObj); } catch (TException e) { @@ -119,6 +120,11 @@ public String getTableObjJson() { return tableObjJson; } + @Override + public String getLocOwner() { + return locOwner; + } + @Override public String toString() { try { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java index 40d480b7e3..5020550e97 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java @@ -37,7 +37,7 @@ public class JSONInsertMessage extends InsertMessage { @JsonProperty - String server, servicePrincipal, db, table, tableType, tableObjJson, ptnObjJson; + String server, servicePrincipal, db, table, tableType, tableObjJson, ptnObjJson, locOwner; @JsonProperty Long timestamp; @@ -55,7 +55,7 @@ public JSONInsertMessage() { } public JSONInsertMessage(String server, String servicePrincipal, Table tableObj, Partition ptnObj, - boolean replace, Iterator fileIter, Long timestamp) { + boolean replace, Iterator fileIter, String locOwner, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; @@ -81,6 +81,7 @@ public JSONInsertMessage(String server, String servicePrincipal, Table tableObj, this.timestamp = timestamp; this.replace = Boolean.toString(replace); 
this.files = Lists.newArrayList(fileIter); + this.locOwner = locOwner; checkValid(); } @@ -137,6 +138,11 @@ public Partition getPtnObj() throws Exception { return ((null == ptnObjJson) ? null : (Partition) MessageBuilder.getTObj(ptnObjJson, Partition.class)); } + @Override + public String getLocOwner() { + return locOwner; + } + @Override public String toString() { try {