diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 2ab59d7a02..4f8f1ab4e3 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -193,7 +193,7 @@ public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { Table after = tableEvent.getNewTable(); NotificationEvent event = new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory - .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp()).toString()); + .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp(), tableEvent.getWriteId()).toString()); event.setCatName(after.isSetCatName() ? after.getCatName() : DEFAULT_CATALOG_NAME); event.setDbName(after.getDbName()); event.setTableName(after.getTableName()); @@ -342,7 +342,8 @@ public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaExce Partition after = partitionEvent.getNewPartition(); NotificationEvent event = new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory - .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp()).toString()); + .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp(), + partitionEvent.getWriteId()).toString()); event.setCatName(before.isSetCatName() ? before.getCatName() : DEFAULT_CATALOG_NAME); event.setDbName(before.getDbName()); event.setTableName(before.getTableName()); diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java index 649d901209..1406e5a192 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java @@ -140,7 +140,8 @@ public void onAlterPartition(AlterPartitionEvent ape) throws MetaException { Partition after = ape.getNewPartition(); String topicName = getTopicName(ape.getTable()); - send(messageFactory.buildAlterPartitionMessage(ape.getTable(),before, after), topicName); + send(messageFactory.buildAlterPartitionMessage(ape.getTable(),before, after, + ape.getWriteId()), topicName); } } @@ -254,7 +255,7 @@ public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { // DB topic - Alan. String topicName = getTopicPrefix(tableEvent.getIHMSHandler().getConf()) + "." + after.getDbName().toLowerCase(); - send(messageFactory.buildAlterTableMessage(before, after), topicName); + send(messageFactory.buildAlterTableMessage(before, after, tableEvent.getWriteId()), topicName); } } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java index ccaae4edbf..fc8f6b7560 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterPartitionMessage.java @@ -40,6 +40,8 @@ protected AlterPartitionMessage() { public abstract Map getKeyValues(); + public abstract Long getWriteId(); + @Override public HCatEventMessage checkValid() { if (getTable() == null) throw new IllegalStateException("Table name unset."); diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterTableMessage.java index cf292c1eb8..416dd10bb2 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/AlterTableMessage.java @@ -40,4 +40,6 @@ public HCatEventMessage checkValid() { if (getTable() == null) throw new IllegalStateException("Table name unset."); return super.checkValid(); } + + public abstract Long getWriteId(); } diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java index a248bbf813..782fffb516 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java @@ -128,9 +128,10 @@ public static MessageDeserializer getDeserializer(String format, * and some are not yet supported. * @param before The table before the alter * @param after The table after the alter + * @param writeId writeId under which alter is done (for ACID tables) * @return */ - public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after); + public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after, Long writeId); /** * Factory method for DropTableMessage. @@ -152,10 +153,11 @@ public static MessageDeserializer getDeserializer(String format, * @param table The table in which the partition is being altered * @param before The partition before it was altered * @param after The partition after it was altered + * @param writeId writeId under which alter is done (for ACID tables) * @return a new AlterPartitionMessage */ public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, - Partition after); + Partition after, Long writeId); /** * Factory method for DropPartitionMessage. diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java index 8aea1adf72..542fe7c5af 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterPartitionMessage.java @@ -37,7 +37,7 @@ String server, servicePrincipal, db, table, tableType; @JsonProperty - Long timestamp; + Long timestamp, writeId; @JsonProperty Map keyValues; @@ -52,8 +52,9 @@ public JSONAlterPartitionMessage(String server, String db, String table, Map keyValues, + Long writeId, Long timestamp) { - this(server, servicePrincipal, db, table, null, keyValues, timestamp); + this(server, servicePrincipal, db, table, null, keyValues, writeId, timestamp); } public JSONAlterPartitionMessage(String server, @@ -62,6 +63,7 @@ public JSONAlterPartitionMessage(String server, String table, String tableType, Map keyValues, + long writeId, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; @@ -70,6 +72,7 @@ public JSONAlterPartitionMessage(String server, this.tableType = tableType; this.timestamp = timestamp; this.keyValues = keyValues; + this.writeId = writeId; checkValid(); } @@ -108,6 +111,11 @@ public String getTableType() { return keyValues; } + @Override + public Long getWriteId() { + return writeId; + } + @Override public String toString() { try { diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java index 13cd167f8d..6f81f24e97 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONAlterTableMessage.java @@ -35,7 +35,7 @@ String server, servicePrincipal, db, table, tableType; @JsonProperty - Long timestamp; + Long timestamp, writeId; /** * Default constructor, needed for Jackson. @@ -46,8 +46,9 @@ public JSONAlterTableMessage(String server, String servicePrincipal, String db, String table, + Long writeId, Long timestamp) { - this(server, servicePrincipal, db, table, null, timestamp); + this(server, servicePrincipal, db, table, null, writeId, timestamp); } public JSONAlterTableMessage(String server, @@ -55,6 +56,7 @@ public JSONAlterTableMessage(String server, String db, String table, String tableType, + Long writeId, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; @@ -62,6 +64,7 @@ public JSONAlterTableMessage(String server, this.table = table; this.tableType = tableType; this.timestamp = timestamp; + this.writeId = writeId; checkValid(); } @@ -95,6 +98,11 @@ public String getTableType() { if (tableType != null) return tableType; else return ""; } + @Override + public Long getWriteId() { + return writeId; + } + @Override public String toString() { try { diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java index 0973c18180..ec573a37a3 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java @@ -96,9 +96,9 @@ public CreateTableMessage buildCreateTableMessage(Table table) { } @Override - public AlterTableMessage buildAlterTableMessage(Table before, Table after) { + public AlterTableMessage buildAlterTableMessage(Table before, Table after, Long writeId) { return new JSONAlterTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, before.getDbName(), - before.getTableName(), before.getTableType(), now()); + before.getTableName(), before.getTableType(), writeId, now()); } @Override @@ -115,10 +115,11 @@ public AddPartitionMessage buildAddPartitionMessage(Table table, Iterator selectStmtList, List expected truncateTable(primaryDbName, tableName); selectStmtList.add("select count(*) from " + tableName); expectedValues.add(new String[] {"0"}); + selectStmtList.add("select count(*) from " + tableName + "_nopart"); + expectedValues.add(new String[] {"0"}); insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT); truncateTable(primaryDbName, tableNameMM); selectStmtList.add("select count(*) from " + tableNameMM); expectedValues.add(new String[] {"0"}); + selectStmtList.add("select count(*) from " + tableNameMM + "_nopart"); + expectedValues.add(new String[] {"0"}); + } + + private void appendAlterTable(List selectStmtList, List expectedValues) throws Throwable { + String tableName = testName.getMethodName() + "testAlterTable"; + String tableNameMM = tableName + "_MM"; + + insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); + primary.run("use " + primaryDbName) + .run("alter table " + tableName + " change value value1 int ") + .run("select value1 from " + tableName) + .verifyResults(new String[]{"1", "2", "3", "4", "5"}) + .run("alter table " + tableName + "_nopart change value value1 int ") + .run("select value1 from " + tableName + "_nopart") + .verifyResults(new String[]{"1", "2", "3", "4", "5"}); + selectStmtList.add("select value1 from " + tableName ); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + selectStmtList.add("select value1 from " + tableName + "_nopart"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + + insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT); + primary.run("use " + primaryDbName) + .run("alter table " + tableNameMM + " change value value1 int ") + .run("select value1 from " + tableNameMM) + .verifyResults(new String[]{"1", "2", "3", "4", "5"}) + .run("alter table " + tableNameMM + "_nopart change value value1 int ") + .run("select value1 from " + tableNameMM + "_nopart") + .verifyResults(new String[]{"1", "2", "3", "4", "5"}); + selectStmtList.add("select value1 from " + tableNameMM ); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + selectStmtList.add("select value1 from " + tableNameMM + "_nopart"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); } private void appendInsertIntoFromSelect(String tableName, String tableNameMM, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index d620243645..6ac59bff23 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4949,8 +4949,11 @@ private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException { // create the table if (crtTbl.getReplaceMode()) { + ReplicationSpec replicationSpec = crtTbl.getReplicationSpec(); + long writeId = replicationSpec != null && replicationSpec.isInReplicationScope() ? crtTbl.getReplWriteId() : 0L; // replace-mode creates are really alters using CreateTableDesc. - db.alterTable(tbl, false, null, true); + db.alterTable(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), tbl, false, null, + true, writeId); } else { if ((foreignKeys != null && foreignKeys.size() > 0) || (primaryKeys != null && primaryKeys.size() > 0) || @@ -5226,7 +5229,8 @@ private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws H String tableName = truncateTableDesc.getTableName(); Map partSpec = truncateTableDesc.getPartSpec(); - if (!allowOperationInReplicationScope(db, tableName, partSpec, truncateTableDesc.getReplicationSpec())) { + ReplicationSpec replicationSpec = truncateTableDesc.getReplicationSpec(); + if (!allowOperationInReplicationScope(db, tableName, partSpec, replicationSpec)) { // no truncate, the table is missing either due to drop/rename which follows the truncate. // or the existing table is newer than our update. if (LOG.isDebugEnabled()) { @@ -5238,7 +5242,8 @@ private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws H } try { - db.truncateTable(tableName, partSpec); + db.truncateTable(tableName, partSpec, + replicationSpec != null && replicationSpec.isInReplicationScope() ? truncateTableDesc.getWriteId() : 0L); } catch (Exception e) { throw new HiveException(e, ErrorMsg.GENERIC_ERROR); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 08a4506b22..660174de7a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -29,6 +29,7 @@ import static org.apache.hadoop.hive.conf.Constants.MATERIALIZED_VIEW_REWRITING_TIME_WINDOW; import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE; import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; +import static org.apache.hadoop.hive.ql.io.AcidUtils.getFullTableName; import static org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.makeBinaryPredicate; import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT; import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME; @@ -620,7 +621,12 @@ public void alterTable(String fullyQlfdTblName, Table newTbl, boolean cascade, } public void alterTable(String catName, String dbName, String tblName, Table newTbl, boolean cascade, - EnvironmentContext environmentContext, boolean transactional) + EnvironmentContext environmentContext, boolean transactional) throws HiveException { + alterTable(catName, dbName, tblName, newTbl, cascade, environmentContext, transactional, 0); + } + + public void alterTable(String catName, String dbName, String tblName, Table newTbl, boolean cascade, + EnvironmentContext environmentContext, boolean transactional, long replWriteId) throws HiveException { if (catName == null) { @@ -642,8 +648,13 @@ public void alterTable(String catName, String dbName, String tblName, Table newT // Take a table snapshot and set it to newTbl. AcidUtils.TableSnapshot tableSnapshot = null; if (transactional) { - // Make sure we pass in the names, so we can get the correct snapshot for rename table. - tableSnapshot = AcidUtils.getTableSnapshot(conf, newTbl, dbName, tblName, true); + if (replWriteId > 0) { + ValidWriteIdList writeIds = getMSC().getValidWriteIds(getFullTableName(dbName, tblName), replWriteId); + tableSnapshot = new TableSnapshot(replWriteId, writeIds.writeToString()); + } else { + // Make sure we pass in the names, so we can get the correct snapshot for rename table. + tableSnapshot = AcidUtils.getTableSnapshot(conf, newTbl, dbName, tblName, true); + } if (tableSnapshot != null) { newTbl.getTTable().setWriteId(tableSnapshot.getWriteId()); } else { @@ -1075,12 +1086,18 @@ public void dropTable(String dbName, String tableName, boolean deleteData, * name of the table * @throws HiveException */ - public void truncateTable(String dbDotTableName, Map partSpec) throws HiveException { + public void truncateTable(String dbDotTableName, Map partSpec, Long writeId) throws HiveException { try { Table table = getTable(dbDotTableName, true); AcidUtils.TableSnapshot snapshot = null; if (AcidUtils.isTransactionalTable(table)) { - snapshot = AcidUtils.getTableSnapshot(conf, table, true); + if (writeId <= 0) { + snapshot = AcidUtils.getTableSnapshot(conf, table, true); + } else { + String fullTableName = getFullTableName(table.getDbName(), table.getTableName()); + ValidWriteIdList writeIdList = getMSC().getValidWriteIds(fullTableName, writeId); + snapshot = new TableSnapshot(writeId, writeIdList.writeToString()); + } } // TODO: APIs with catalog names diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index eb594f825d..6fbe29c5ec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -155,7 +155,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor, parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(), new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx), - null, getTxnMgr()); + null, getTxnMgr(), 0); } catch (SemanticException e) { throw e; @@ -201,7 +201,8 @@ public static boolean prepareImport(boolean isImportCmd, String parsedLocation, String parsedTableName, String overrideDBName, LinkedHashMap parsedPartSpec, String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x, - UpdatedMetaDataTracker updatedMetadata, HiveTxnManager txnMgr + UpdatedMetaDataTracker updatedMetadata, HiveTxnManager txnMgr, + long writeId // Initialize with 0 for non-ACID and non-MM tables. ) throws IOException, MetaException, HiveException, URISyntaxException { // initialize load path @@ -255,6 +256,7 @@ public static boolean prepareImport(boolean isImportCmd, tblDesc.setReplicationSpec(replicationSpec); StatsSetupConst.setBasicStatsState(tblDesc.getTblProps(), StatsSetupConst.FALSE); inReplicationScope = true; + tblDesc.setReplWriteId(writeId); } if (isExternalSet) { @@ -330,7 +332,6 @@ public static boolean prepareImport(boolean isImportCmd, AcidUtils.setNonTransactional(tblDesc.getTblProps()); } - Long writeId = 0L; // Initialize with 0 for non-ACID and non-MM tables. int stmtId = 0; if (!replicationSpec.isInReplicationScope() && ((tableExists && AcidUtils.isTransactionalTable(table)) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index 3804396626..897ea7f414 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import java.io.BufferedWriter; import java.io.IOException; @@ -76,6 +77,7 @@ public void handle(Context withinContext) throws Exception { } } } + withinContext.createDmd(this).write(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java index e9b6d3df6d..3fb18d8b25 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.parse.repl.load.message; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; @@ -26,11 +28,23 @@ import java.util.ArrayList; import java.util.List; +import static org.apache.hadoop.hive.ql.parse.repl.DumpType.EVENT_ALTER_PARTITION; +import static org.apache.hadoop.hive.ql.parse.repl.DumpType.EVENT_ALTER_TABLE; + public class TableHandler extends AbstractMessageHandler { @Override public List> handle(Context context) throws SemanticException { try { List> importTasks = new ArrayList<>(); + long writeId = 0; + + if (context.dmd.getDumpType().equals(EVENT_ALTER_TABLE)) { + AlterTableMessage message = deserializer.getAlterTableMessage(context.dmd.getPayload()); + writeId = message.getWriteId(); + } else if (context.dmd.getDumpType().equals(EVENT_ALTER_PARTITION)) { + AlterPartitionMessage message = deserializer.getAlterPartitionMessage(context.dmd.getPayload()); + writeId = message.getWriteId(); + } context.nestedContext.setConf(context.hiveConf); EximUtil.SemanticAnalyzerWrapperContext x = @@ -43,7 +57,7 @@ // Also, REPL LOAD doesn't support external table and hence no location set as well. ImportSemanticAnalyzer.prepareImport(false, false, false, false, (context.precursor != null), null, context.tableName, context.dbName, - null, context.location, x, updatedMetadata, context.getTxnMgr()); + null, context.location, x, updatedMetadata, context.getTxnMgr(), writeId); return importTasks; } catch (Exception e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java index b4fc0009ad..0d5ac31fd7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java @@ -57,10 +57,11 @@ TruncateTableDesc truncateTableDesc = new TruncateTableDesc( actualDbName + "." + actualTblName, partSpec, context.eventOnlyReplicationSpec()); + truncateTableDesc.setWriteId(msg.getWriteId()); Task truncatePtnTask = TaskFactory.get( new DDLWork(readEntitySet, writeEntitySet, truncateTableDesc), context.hiveConf); - context.log.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(), - truncateTableDesc.getTableName()); + context.log.debug("Added truncate ptn task : {}:{}:{}", truncatePtnTask.getId(), + truncateTableDesc.getTableName(), truncateTableDesc.getWriteId()); updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, partSpec); return Collections.singletonList(truncatePtnTask); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java index fe73a180be..d18a9e1fd1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java @@ -38,11 +38,12 @@ TruncateTableDesc truncateTableDesc = new TruncateTableDesc( actualDbName + "." + actualTblName, null, context.eventOnlyReplicationSpec()); + truncateTableDesc.setWriteId(msg.getWriteId()); Task truncateTableTask = TaskFactory.get( new DDLWork(readEntitySet, writeEntitySet, truncateTableDesc), context.hiveConf); - context.log.debug("Added truncate tbl task : {}:{}", truncateTableTask.getId(), - truncateTableDesc.getTableName()); + context.log.debug("Added truncate tbl task : {}:{}:{}", truncateTableTask.getId(), + truncateTableDesc.getTableName(), truncateTableDesc.getWriteId()); updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); return Collections.singletonList(truncateTableTask); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java index 0fadf1b61f..27f677e1a9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java @@ -109,6 +109,7 @@ // The FSOP configuration for the FSOP that is going to write initial data during ctas. // This is not needed beyond compilation, so it is transient. private transient FileSinkDesc writer; + private Long replWriteId; // to be used by repl task to get the txn and valid write id list public CreateTableDesc() { } @@ -902,4 +903,12 @@ public FileSinkDesc getAndUnsetWriter() { public void setWriter(FileSinkDesc writer) { this.writer = writer; } + + public Long getReplWriteId() { + return replWriteId; + } + + public void setReplWriteId(Long replWriteId) { + this.replWriteId = replWriteId; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java index ef7325fe2c..50b43bad31 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java @@ -359,4 +359,10 @@ public Table toTable(HiveConf conf) throws Exception { return null; } } + + public void setReplWriteId(Long replWriteId) { + if (this.createTblDesc != null) { + this.createTblDesc.setReplWriteId(replWriteId); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java index 9e83576e6b..61deb24eef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java @@ -141,6 +141,10 @@ public boolean mayNeedWriteId() { return isTransactional; } + public long getWriteId() { + return writeId; + } + @Override public String toString() { return this.getClass().getSimpleName() + " for " + getFullTableName(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java index 981f6aed5f..302269b8d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java @@ -500,7 +500,7 @@ private ValidReaderWriteIdList getWriteIds( TableName fullTableName) throws NoSuchTxnException, MetaException { // TODO: acid utils don't support catalogs GetValidWriteIdsRequest req = new GetValidWriteIdsRequest( - Lists.newArrayList(fullTableName.getDbTable()), null); + Lists.newArrayList(fullTableName.getDbTable())); return TxnCommonUtils.createValidReaderWriteIdList( txnHandler.getValidWriteIds(req).getTblValidWriteIds().get(0)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index a61b6e8d37..beb6902d7d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -148,7 +148,7 @@ public void run() { // The response will have one entry per table and hence we get only one ValidWriteIdList String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName()); GetValidWriteIdsRequest rqst - = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName), null); + = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); ValidWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList( txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index e77358b0e4..d9f186cd03 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -144,7 +144,7 @@ public void run() { // Compaction doesn't work under a transaction and hence pass 0 for current txn Id // The response will have one entry per table and hence we get only one OpenWriteIds String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName()); - GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName), null); + GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); final ValidWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0)); LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString()); diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetValidWriteIdsRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetValidWriteIdsRequest.java index a5bbb86af1..5dbdc3ee5c 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetValidWriteIdsRequest.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetValidWriteIdsRequest.java @@ -40,6 +40,7 @@ private static final org.apache.thrift.protocol.TField FULL_TABLE_NAMES_FIELD_DESC = new org.apache.thrift.protocol.TField("fullTableNames", org.apache.thrift.protocol.TType.LIST, (short)1); private static final org.apache.thrift.protocol.TField VALID_TXN_LIST_FIELD_DESC = new org.apache.thrift.protocol.TField("validTxnList", org.apache.thrift.protocol.TType.STRING, (short)2); + private static final org.apache.thrift.protocol.TField WRITE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("writeId", org.apache.thrift.protocol.TType.I64, (short)3); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -48,12 +49,14 @@ } private List fullTableNames; // required - private String validTxnList; // required + private String validTxnList; // optional + private long writeId; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { FULL_TABLE_NAMES((short)1, "fullTableNames"), - VALID_TXN_LIST((short)2, "validTxnList"); + VALID_TXN_LIST((short)2, "validTxnList"), + WRITE_ID((short)3, "writeId"); private static final Map byName = new HashMap(); @@ -72,6 +75,8 @@ public static _Fields findByThriftId(int fieldId) { return FULL_TABLE_NAMES; case 2: // VALID_TXN_LIST return VALID_TXN_LIST; + case 3: // WRITE_ID + return WRITE_ID; default: return null; } @@ -112,14 +117,19 @@ public String getFieldName() { } // isset id assignments + private static final int __WRITEID_ISSET_ID = 0; + private byte __isset_bitfield = 0; + private static final _Fields optionals[] = {_Fields.VALID_TXN_LIST,_Fields.WRITE_ID}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); tmpMap.put(_Fields.FULL_TABLE_NAMES, new org.apache.thrift.meta_data.FieldMetaData("fullTableNames", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))); - tmpMap.put(_Fields.VALID_TXN_LIST, new org.apache.thrift.meta_data.FieldMetaData("validTxnList", org.apache.thrift.TFieldRequirementType.REQUIRED, + tmpMap.put(_Fields.VALID_TXN_LIST, new org.apache.thrift.meta_data.FieldMetaData("validTxnList", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.WRITE_ID, new org.apache.thrift.meta_data.FieldMetaData("writeId", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(GetValidWriteIdsRequest.class, metaDataMap); } @@ -128,18 +138,17 @@ public GetValidWriteIdsRequest() { } public GetValidWriteIdsRequest( - List fullTableNames, - String validTxnList) + List fullTableNames) { this(); this.fullTableNames = fullTableNames; - this.validTxnList = validTxnList; } /** * Performs a deep copy on other. */ public GetValidWriteIdsRequest(GetValidWriteIdsRequest other) { + __isset_bitfield = other.__isset_bitfield; if (other.isSetFullTableNames()) { List __this__fullTableNames = new ArrayList(other.fullTableNames); this.fullTableNames = __this__fullTableNames; @@ -147,6 +156,7 @@ public GetValidWriteIdsRequest(GetValidWriteIdsRequest other) { if (other.isSetValidTxnList()) { this.validTxnList = other.validTxnList; } + this.writeId = other.writeId; } public GetValidWriteIdsRequest deepCopy() { @@ -157,6 +167,8 @@ public GetValidWriteIdsRequest deepCopy() { public void clear() { this.fullTableNames = null; this.validTxnList = null; + setWriteIdIsSet(false); + this.writeId = 0; } public int getFullTableNamesSize() { @@ -220,6 +232,28 @@ public void setValidTxnListIsSet(boolean value) { } } + public long getWriteId() { + return this.writeId; + } + + public void setWriteId(long writeId) { + this.writeId = writeId; + setWriteIdIsSet(true); + } + + public void unsetWriteId() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __WRITEID_ISSET_ID); + } + + /** Returns true if field writeId is set (has been assigned a value) and false otherwise */ + public boolean isSetWriteId() { + return EncodingUtils.testBit(__isset_bitfield, __WRITEID_ISSET_ID); + } + + public void setWriteIdIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __WRITEID_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case FULL_TABLE_NAMES: @@ -238,6 +272,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case WRITE_ID: + if (value == null) { + unsetWriteId(); + } else { + setWriteId((Long)value); + } + break; + } } @@ -249,6 +291,9 @@ public Object getFieldValue(_Fields field) { case VALID_TXN_LIST: return getValidTxnList(); + case WRITE_ID: + return getWriteId(); + } throw new IllegalStateException(); } @@ -264,6 +309,8 @@ public boolean isSet(_Fields field) { return isSetFullTableNames(); case VALID_TXN_LIST: return isSetValidTxnList(); + case WRITE_ID: + return isSetWriteId(); } throw new IllegalStateException(); } @@ -299,6 +346,15 @@ public boolean equals(GetValidWriteIdsRequest that) { return false; } + boolean this_present_writeId = true && this.isSetWriteId(); + boolean that_present_writeId = true && that.isSetWriteId(); + if (this_present_writeId || that_present_writeId) { + if (!(this_present_writeId && that_present_writeId)) + return false; + if (this.writeId != that.writeId) + return false; + } + return true; } @@ -316,6 +372,11 @@ public int hashCode() { if (present_validTxnList) list.add(validTxnList); + boolean present_writeId = true && (isSetWriteId()); + list.add(present_writeId); + if (present_writeId) + list.add(writeId); + return list.hashCode(); } @@ -347,6 +408,16 @@ public int compareTo(GetValidWriteIdsRequest other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetWriteId()).compareTo(other.isSetWriteId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetWriteId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.writeId, other.writeId); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -374,14 +445,22 @@ public String toString() { sb.append(this.fullTableNames); } first = false; - if (!first) sb.append(", "); - sb.append("validTxnList:"); - if (this.validTxnList == null) { - sb.append("null"); - } else { - sb.append(this.validTxnList); + if (isSetValidTxnList()) { + if (!first) sb.append(", "); + sb.append("validTxnList:"); + if (this.validTxnList == null) { + sb.append("null"); + } else { + sb.append(this.validTxnList); + } + first = false; + } + if (isSetWriteId()) { + if (!first) sb.append(", "); + sb.append("writeId:"); + sb.append(this.writeId); + first = false; } - first = false; sb.append(")"); return sb.toString(); } @@ -392,10 +471,6 @@ public void validate() throws org.apache.thrift.TException { throw new org.apache.thrift.protocol.TProtocolException("Required field 'fullTableNames' is unset! Struct:" + toString()); } - if (!isSetValidTxnList()) { - throw new org.apache.thrift.protocol.TProtocolException("Required field 'validTxnList' is unset! Struct:" + toString()); - } - // check for sub-struct validity } @@ -409,6 +484,8 @@ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOExcept private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bitfield = 0; read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); @@ -459,6 +536,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, GetValidWriteIdsReq org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 3: // WRITE_ID + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.writeId = iprot.readI64(); + struct.setWriteIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -485,8 +570,15 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, GetValidWriteIdsRe oprot.writeFieldEnd(); } if (struct.validTxnList != null) { - oprot.writeFieldBegin(VALID_TXN_LIST_FIELD_DESC); - oprot.writeString(struct.validTxnList); + if (struct.isSetValidTxnList()) { + oprot.writeFieldBegin(VALID_TXN_LIST_FIELD_DESC); + oprot.writeString(struct.validTxnList); + oprot.writeFieldEnd(); + } + } + if (struct.isSetWriteId()) { + oprot.writeFieldBegin(WRITE_ID_FIELD_DESC); + oprot.writeI64(struct.writeId); oprot.writeFieldEnd(); } oprot.writeFieldStop(); @@ -513,7 +605,20 @@ public void write(org.apache.thrift.protocol.TProtocol prot, GetValidWriteIdsReq oprot.writeString(_iter622); } } - oprot.writeString(struct.validTxnList); + BitSet optionals = new BitSet(); + if (struct.isSetValidTxnList()) { + optionals.set(0); + } + if (struct.isSetWriteId()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); + if (struct.isSetValidTxnList()) { + oprot.writeString(struct.validTxnList); + } + if (struct.isSetWriteId()) { + oprot.writeI64(struct.writeId); + } } @Override @@ -530,8 +635,15 @@ public void read(org.apache.thrift.protocol.TProtocol prot, GetValidWriteIdsRequ } } struct.setFullTableNamesIsSet(true); - struct.validTxnList = iprot.readString(); - struct.setValidTxnListIsSet(true); + BitSet incoming = iprot.readBitSet(2); + if (incoming.get(0)) { + struct.validTxnList = iprot.readString(); + struct.setValidTxnListIsSet(true); + } + if (incoming.get(1)) { + struct.writeId = iprot.readI64(); + struct.setWriteIdIsSet(true); + } } } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java index 47f96f323a..ae0956870a 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java @@ -230374,7 +230374,7 @@ public void read(org.apache.thrift.protocol.TProtocol prot, create_ischema_args private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("create_ischema_result"); private static final org.apache.thrift.protocol.TField O1_FIELD_DESC = new org.apache.thrift.protocol.TField("o1", org.apache.thrift.protocol.TType.STRUCT, (short)1); - private static final org.apache.thrift.protocol.TField O2_FIELD_DESC = new org.apache.thrift.protocol.TField("o2", org.apache.thrift.protocol.TType.STRUCT, (short)-1); + private static final org.apache.thrift.protocol.TField O2_FIELD_DESC = new org.apache.thrift.protocol.TField("o2", org.apache.thrift.protocol.TType.STRUCT, (short)2); private static final org.apache.thrift.protocol.TField O3_FIELD_DESC = new org.apache.thrift.protocol.TField("o3", org.apache.thrift.protocol.TType.STRUCT, (short)3); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); @@ -230390,7 +230390,7 @@ public void read(org.apache.thrift.protocol.TProtocol prot, create_ischema_args /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { O1((short)1, "o1"), - O2((short)-1, "o2"), + O2((short)2, "o2"), O3((short)3, "o3"); private static final Map byName = new HashMap(); @@ -230408,7 +230408,7 @@ public static _Fields findByThriftId(int fieldId) { switch(fieldId) { case 1: // O1 return O1; - case -1: // O2 + case 2: // O2 return O2; case 3: // O3 return O3; @@ -230833,7 +230833,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, create_ischema_resu org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case -1: // O2 + case 2: // O2 if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { struct.o2 = new NoSuchObjectException(); struct.o2.read(iprot); @@ -230864,16 +230864,16 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, create_ischema_res struct.validate(); oprot.writeStructBegin(STRUCT_DESC); - if (struct.o2 != null) { - oprot.writeFieldBegin(O2_FIELD_DESC); - struct.o2.write(oprot); - oprot.writeFieldEnd(); - } if (struct.o1 != null) { oprot.writeFieldBegin(O1_FIELD_DESC); struct.o1.write(oprot); oprot.writeFieldEnd(); } + if (struct.o2 != null) { + oprot.writeFieldBegin(O2_FIELD_DESC); + struct.o2.write(oprot); + oprot.writeFieldEnd(); + } if (struct.o3 != null) { oprot.writeFieldBegin(O3_FIELD_DESC); struct.o3.write(oprot); diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php index 0973f4f3c1..4574c6a492 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php @@ -58155,7 +58155,7 @@ class ThriftHiveMetastore_create_ischema_result { 'type' => TType::STRUCT, 'class' => '\metastore\AlreadyExistsException', ), - -1 => array( + 2 => array( 'var' => 'o2', 'type' => TType::STRUCT, 'class' => '\metastore\NoSuchObjectException', @@ -58207,7 +58207,7 @@ class ThriftHiveMetastore_create_ischema_result { $xfer += $input->skip($ftype); } break; - case -1: + case 2: if ($ftype == TType::STRUCT) { $this->o2 = new \metastore\NoSuchObjectException(); $xfer += $this->o2->read($input); @@ -58236,16 +58236,16 @@ class ThriftHiveMetastore_create_ischema_result { public function write($output) { $xfer = 0; $xfer += $output->writeStructBegin('ThriftHiveMetastore_create_ischema_result'); - if ($this->o2 !== null) { - $xfer += $output->writeFieldBegin('o2', TType::STRUCT, -1); - $xfer += $this->o2->write($output); - $xfer += $output->writeFieldEnd(); - } if ($this->o1 !== null) { $xfer += $output->writeFieldBegin('o1', TType::STRUCT, 1); $xfer += $this->o1->write($output); $xfer += $output->writeFieldEnd(); } + if ($this->o2 !== null) { + $xfer += $output->writeFieldBegin('o2', TType::STRUCT, 2); + $xfer += $this->o2->write($output); + $xfer += $output->writeFieldEnd(); + } if ($this->o3 !== null) { $xfer += $output->writeFieldBegin('o3', TType::STRUCT, 3); $xfer += $this->o3->write($output); diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php index 5ed4f71b1d..22deffe1d3 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php @@ -17822,6 +17822,10 @@ class GetValidWriteIdsRequest { * @var string */ public $validTxnList = null; + /** + * @var int + */ + public $writeId = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -17838,6 +17842,10 @@ class GetValidWriteIdsRequest { 'var' => 'validTxnList', 'type' => TType::STRING, ), + 3 => array( + 'var' => 'writeId', + 'type' => TType::I64, + ), ); } if (is_array($vals)) { @@ -17847,6 +17855,9 @@ class GetValidWriteIdsRequest { if (isset($vals['validTxnList'])) { $this->validTxnList = $vals['validTxnList']; } + if (isset($vals['writeId'])) { + $this->writeId = $vals['writeId']; + } } } @@ -17893,6 +17904,13 @@ class GetValidWriteIdsRequest { $xfer += $input->skip($ftype); } break; + case 3: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->writeId); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -17928,6 +17946,11 @@ class GetValidWriteIdsRequest { $xfer += $output->writeString($this->validTxnList); $xfer += $output->writeFieldEnd(); } + if ($this->writeId !== null) { + $xfer += $output->writeFieldBegin('writeId', TType::I64, 3); + $xfer += $output->writeI64($this->writeId); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py index 3c0d0a55b1..38074ce79b 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py @@ -47673,7 +47673,13 @@ class create_ischema_result: - o3 """ - thrift_spec = None + thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'o1', (AlreadyExistsException, AlreadyExistsException.thrift_spec), None, ), # 1 + (2, TType.STRUCT, 'o2', (NoSuchObjectException, NoSuchObjectException.thrift_spec), None, ), # 2 + (3, TType.STRUCT, 'o3', (MetaException, MetaException.thrift_spec), None, ), # 3 + ) + def __init__(self, o1=None, o2=None, o3=None,): self.o1 = o1 self.o2 = o2 @@ -47694,7 +47700,7 @@ def read(self, iprot): self.o1.read(iprot) else: iprot.skip(ftype) - elif fid == -1: + elif fid == 2: if ftype == TType.STRUCT: self.o2 = NoSuchObjectException() self.o2.read(iprot) @@ -47716,14 +47722,14 @@ def write(self, oprot): oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) return oprot.writeStructBegin('create_ischema_result') - if self.o2 is not None: - oprot.writeFieldBegin('o2', TType.STRUCT, -1) - self.o2.write(oprot) - oprot.writeFieldEnd() if self.o1 is not None: oprot.writeFieldBegin('o1', TType.STRUCT, 1) self.o1.write(oprot) oprot.writeFieldEnd() + if self.o2 is not None: + oprot.writeFieldBegin('o2', TType.STRUCT, 2) + self.o2.write(oprot) + oprot.writeFieldEnd() if self.o3 is not None: oprot.writeFieldBegin('o3', TType.STRUCT, 3) self.o3.write(oprot) diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 7fc1e43de0..38fac465d7 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -12413,17 +12413,20 @@ class GetValidWriteIdsRequest: Attributes: - fullTableNames - validTxnList + - writeId """ thrift_spec = ( None, # 0 (1, TType.LIST, 'fullTableNames', (TType.STRING,None), None, ), # 1 (2, TType.STRING, 'validTxnList', None, None, ), # 2 + (3, TType.I64, 'writeId', None, None, ), # 3 ) - def __init__(self, fullTableNames=None, validTxnList=None,): + def __init__(self, fullTableNames=None, validTxnList=None, writeId=None,): self.fullTableNames = fullTableNames self.validTxnList = validTxnList + self.writeId = writeId def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -12449,6 +12452,11 @@ def read(self, iprot): self.validTxnList = iprot.readString() else: iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.writeId = iprot.readI64() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -12470,14 +12478,16 @@ def write(self, oprot): oprot.writeFieldBegin('validTxnList', TType.STRING, 2) oprot.writeString(self.validTxnList) oprot.writeFieldEnd() + if self.writeId is not None: + oprot.writeFieldBegin('writeId', TType.I64, 3) + oprot.writeI64(self.writeId) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() def validate(self): if self.fullTableNames is None: raise TProtocol.TProtocolException(message='Required field fullTableNames is unset!') - if self.validTxnList is None: - raise TProtocol.TProtocolException(message='Required field validTxnList is unset!') return @@ -12485,6 +12495,7 @@ def __hash__(self): value = 17 value = (value * 31) ^ hash(self.fullTableNames) value = (value * 31) ^ hash(self.validTxnList) + value = (value * 31) ^ hash(self.writeId) return value def __repr__(self): diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index e0c6c02715..0192c6da31 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -2756,17 +2756,18 @@ class GetValidWriteIdsRequest include ::Thrift::Struct, ::Thrift::Struct_Union FULLTABLENAMES = 1 VALIDTXNLIST = 2 + WRITEID = 3 FIELDS = { FULLTABLENAMES => {:type => ::Thrift::Types::LIST, :name => 'fullTableNames', :element => {:type => ::Thrift::Types::STRING}}, - VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList'} + VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true}, + WRITEID => {:type => ::Thrift::Types::I64, :name => 'writeId', :optional => true} } def struct_fields; FIELDS; end def validate raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field fullTableNames is unset!') unless @fullTableNames - raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field validTxnList is unset!') unless @validTxnList end ::Thrift::Struct.generate_accessors self diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/thrift_hive_metastore.rb index e54a7321e2..e6a72762bb 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/thrift_hive_metastore.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/thrift_hive_metastore.rb @@ -13573,7 +13573,7 @@ module ThriftHiveMetastore class Create_ischema_result include ::Thrift::Struct, ::Thrift::Struct_Union O1 = 1 - O2 = -1 + O2 = 2 O3 = 3 FIELDS = { diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift index feb44d5159..85a5c601e0 100644 --- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift @@ -925,7 +925,8 @@ struct ReplTblWriteIdStateRequest { // Request msg to get the valid write ids list for the given list of tables wrt to input validTxnList struct GetValidWriteIdsRequest { 1: required list fullTableNames, // Full table names of format . - 2: required string validTxnList, // Valid txn list string wrt the current txn of the caller + 2: optional string validTxnList, // Valid txn list string wrt the current txn of the caller + 3: optional i64 writeId, //write id to be used to get the current txn id } // Valid Write ID list of one table wrt to current txn @@ -2289,7 +2290,7 @@ service ThriftHiveMetastore extends fb303.FacebookService // Schema calls void create_ischema(1:ISchema schema) throws(1:AlreadyExistsException o1, - NoSuchObjectException o2, 3:MetaException o3) + 2: NoSuchObjectException o2, 3:MetaException o3) void alter_ischema(1:AlterISchemaRequest rqst) throws(1:NoSuchObjectException o1, 2:MetaException o2) ISchema get_ischema(1:ISchemaName name) throws (1:NoSuchObjectException o1, 2:MetaException o2) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 6a1cd3bc9a..f52ff91a8f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -360,7 +360,8 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam if (transactionalListeners != null && !transactionalListeners.isEmpty()) { txnAlterTableEventResponses = MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ALTER_TABLE, - new AlterTableEvent(oldt, newt, false, true, handler), + new AlterTableEvent(oldt, newt, false, true, + newt.getWriteId(), handler), environmentContext); } // commit the changes @@ -405,7 +406,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam // make this call whether the event failed or succeeded. To make this behavior consistent, // this call is made for failed events also. MetaStoreListenerNotifier.notifyEvent(listeners, EventMessage.EventType.ALTER_TABLE, - new AlterTableEvent(oldt, newt, false, success, handler), + new AlterTableEvent(oldt, newt, false, success, newt.getWriteId(), handler), environmentContext, txnAlterTableEventResponses, msdb); } } @@ -487,7 +488,8 @@ public Partition alterPartition(RawStore msdb, Warehouse wh, String catName, Str if (transactionalListeners != null && !transactionalListeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ALTER_PARTITION, - new AlterPartitionEvent(oldPart, new_part, tbl, false, true, handler), + new AlterPartitionEvent(oldPart, new_part, tbl, false, + true, new_part.getWriteId(), handler), environmentContext); @@ -635,7 +637,8 @@ public Partition alterPartition(RawStore msdb, Warehouse wh, String catName, Str if (transactionalListeners != null && !transactionalListeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ALTER_PARTITION, - new AlterPartitionEvent(oldPart, new_part, tbl, false, true, handler), + new AlterPartitionEvent(oldPart, new_part, tbl, false, + true, new_part.getWriteId(), handler), environmentContext); } @@ -742,7 +745,8 @@ public Partition alterPartition(RawStore msdb, Warehouse wh, String catName, Str if (transactionalListeners != null && !transactionalListeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventMessage.EventType.ALTER_PARTITION, - new AlterPartitionEvent(oldPart, newPart, tbl, false, true, handler)); + new AlterPartitionEvent(oldPart, newPart, tbl, false, + true, newPart.getWriteId(), handler)); } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 33b22a9fc3..ba82a9327c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -2708,13 +2708,15 @@ private void alterPartitionForTruncate(RawStore ms, String catName, String dbNam if (!transactionalListeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ALTER_PARTITION, - new AlterPartitionEvent(partition, partition, table, true, true, this)); + new AlterPartitionEvent(partition, partition, table, true, true, + writeId, this)); } if (!listeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ALTER_PARTITION, - new AlterPartitionEvent(partition, partition, table, true, true, this)); + new AlterPartitionEvent(partition, partition, table, true, true, + writeId, this)); } if (writeId > 0) { @@ -2740,13 +2742,15 @@ private void alterTableStatsForTruncate(RawStore ms, String catName, String dbNa if (!transactionalListeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.ALTER_TABLE, - new AlterTableEvent(table, table, true, true, this)); + new AlterTableEvent(table, table, true, true, + writeId, this)); } if (!listeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ALTER_TABLE, - new AlterTableEvent(table, table, true, true, this)); + new AlterTableEvent(table, table, true, true, + writeId, this)); } // TODO: this should actually pass thru and set writeId for txn stats. @@ -4919,7 +4923,8 @@ private void rename_partition(String catName, String db_name, String tbl_name, MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ALTER_PARTITION, - new AlterPartitionEvent(oldPart, new_part, table, false, true, this), + new AlterPartitionEvent(oldPart, new_part, table, false, + true, new_part.getWriteId(), this), envContext); } } catch (InvalidObjectException e) { @@ -5019,7 +5024,8 @@ private void alter_partitions_with_environment_context(String catName, String db if (!listeners.isEmpty()) { MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ALTER_PARTITION, - new AlterPartitionEvent(oldTmpPart, tmpPart, table, false, true, this)); + new AlterPartitionEvent(oldTmpPart, tmpPart, table, false, + true, writeId, this)); } } } catch (InvalidObjectException e) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index a0ff79cc5c..c962ccc93a 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -2736,7 +2736,15 @@ public ValidTxnList getValidTxns(long currentTxn) throws TException { @Override public ValidWriteIdList getValidWriteIds(String fullTableName) throws TException { - GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName), null); + GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); + GetValidWriteIdsResponse validWriteIds = client.get_valid_write_ids(rqst); + return TxnCommonUtils.createValidReaderWriteIdList(validWriteIds.getTblValidWriteIds().get(0)); + } + + @Override + public ValidWriteIdList getValidWriteIds(String fullTableName, Long writeId) throws TException { + GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); + rqst.setWriteId(writeId); GetValidWriteIdsResponse validWriteIds = client.get_valid_write_ids(rqst); return TxnCommonUtils.createValidReaderWriteIdList(validWriteIds.getTblValidWriteIds().get(0)); } @@ -2744,7 +2752,8 @@ public ValidWriteIdList getValidWriteIds(String fullTableName) throws TException @Override public List getValidWriteIds( List tablesList, String validTxnList) throws TException { - GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tablesList, validTxnList); + GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tablesList); + rqst.setValidTxnList(validTxnList); return client.get_valid_write_ids(rqst).getTblValidWriteIds(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index ac10da2f3e..54e7eda0da 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -2814,13 +2814,22 @@ Function getFunction(String catName, String dbName, String funcName) ValidTxnList getValidTxns(long currentTxn) throws TException; /** - * Get a structure that details valid transactions. + * Get a structure that details valid write ids. * @param fullTableName full table name of format . * @return list of valid write ids for the given table * @throws TException */ ValidWriteIdList getValidWriteIds(String fullTableName) throws TException; + /** + * Get a structure that details valid write ids. + * @param fullTableName full table name of format . + * @param writeId The write id to get the corresponding txn + * @return list of valid write ids for the given table + * @throws TException + */ + ValidWriteIdList getValidWriteIds(String fullTableName, Long writeId) throws TException; + /** * Get a structure that details valid write ids list for all tables read by current txn. * @param tablesList list of tables (format: .) read from the current transaction diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java index 09cad85334..499c6e4f31 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java @@ -32,14 +32,16 @@ private final Partition newPart; private final Table table; private final boolean isTruncateOp; + private Long writeId; public AlterPartitionEvent(Partition oldPart, Partition newPart, Table table, boolean isTruncateOp, - boolean status, IHMSHandler handler) { + boolean status, Long writeId, IHMSHandler handler) { super(status, handler); this.oldPart = oldPart; this.newPart = newPart; this.table = table; this.isTruncateOp = isTruncateOp; + this.writeId = writeId; } /** @@ -72,4 +74,8 @@ public Table getTable() { public boolean getIsTruncateOp() { return isTruncateOp; } + + public Long getWriteId() { + return writeId; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java index 2e3f6f00ee..541fbe48ac 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java @@ -31,13 +31,15 @@ private final Table newTable; private final Table oldTable; private final boolean isTruncateOp; + private Long writeId; public AlterTableEvent (Table oldTable, Table newTable, boolean isTruncateOp, boolean status, - IHMSHandler handler) { + Long writeId, IHMSHandler handler) { super (status, handler); this.oldTable = oldTable; this.newTable = newTable; this.isTruncateOp = isTruncateOp; + this.writeId = writeId; } /** @@ -60,4 +62,8 @@ public Table getNewTable() { public boolean getIsTruncateOp() { return isTruncateOp; } + + public Long getWriteId() { + return writeId; + } } \ No newline at end of file diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java index aaa7ef5299..a1ba01a36f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java @@ -65,5 +65,7 @@ public EventMessage checkValid() { } return super.checkValid(); } + + public abstract Long getWriteId(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java index 30e28629a0..bbc01c1a5e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java @@ -55,4 +55,6 @@ public EventMessage checkValid() { } return super.checkValid(); } + + public abstract Long getWriteId(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java index d529147f1c..7ff168f793 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java @@ -178,9 +178,11 @@ public static MessageDeserializer getDeserializer(String format, * @param before The table before the alter * @param after The table after the alter * @param isTruncateOp Flag to denote truncate table + * @param writeId writeId under which alter is done (for ACID tables) * @return */ - public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp); + public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp, + Long writeId); /** * Factory method for DropTableMessage. @@ -205,10 +207,12 @@ public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Iterat * @param before The partition before it was altered * @param after The partition after it was altered * @param isTruncateOp Flag to denote truncate partition + * @param writeId writeId under which alter is done (for ACID tables) * @return a new AlterPartitionMessage */ public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, - Partition after, boolean isTruncateOp); + Partition after, boolean isTruncateOp, + Long writeId); /** * Factory method for DropPartitionMessage. diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java index 2ddec51fcc..9b85f4c1c8 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java @@ -39,7 +39,7 @@ String isTruncateOp; @JsonProperty - Long timestamp; + Long timestamp, writeId; @JsonProperty Map keyValues; @@ -54,7 +54,7 @@ public JSONAlterPartitionMessage() { } public JSONAlterPartitionMessage(String server, String servicePrincipal, Table tableObj, - Partition partitionObjBefore, Partition partitionObjAfter, boolean isTruncateOp, Long timestamp) { + Partition partitionObjBefore, Partition partitionObjAfter, boolean isTruncateOp, Long writeId, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = tableObj.getDbName(); @@ -63,6 +63,7 @@ public JSONAlterPartitionMessage(String server, String servicePrincipal, Table t this.isTruncateOp = Boolean.toString(isTruncateOp); this.timestamp = timestamp; this.keyValues = JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObjBefore); + this.writeId = writeId; try { this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); this.partitionObjBeforeJson = JSONMessageFactory.createPartitionObjJson(partitionObjBefore); @@ -142,6 +143,11 @@ public String getPartitionObjAfterJson() { return partitionObjAfterJson; } + @Override + public Long getWriteId() { + return writeId; + } + @Override public String toString() { try { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java index d398fa098a..eddff98891 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java @@ -36,7 +36,7 @@ String isTruncateOp; @JsonProperty - Long timestamp; + Long timestamp, writeId; /** * Default constructor, needed for Jackson. @@ -45,7 +45,7 @@ public JSONAlterTableMessage() { } public JSONAlterTableMessage(String server, String servicePrincipal, Table tableObjBefore, Table tableObjAfter, - boolean isTruncateOp, Long timestamp) { + boolean isTruncateOp, Long writeId, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = tableObjBefore.getDbName(); @@ -53,6 +53,7 @@ public JSONAlterTableMessage(String server, String servicePrincipal, Table table this.tableType = tableObjBefore.getTableType(); this.isTruncateOp = Boolean.toString(isTruncateOp); this.timestamp = timestamp; + this.writeId = writeId; try { this.tableObjBeforeJson = JSONMessageFactory.createTableObjJson(tableObjBefore); this.tableObjAfterJson = JSONMessageFactory.createTableObjJson(tableObjAfter); @@ -117,6 +118,11 @@ public String getTableObjAfterJson() { return tableObjAfterJson ; } + @Override + public Long getWriteId() { + return writeId ; + } + @Override public String toString() { try { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java index 64126745a2..2668b05320 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java @@ -125,9 +125,8 @@ public CreateTableMessage buildCreateTableMessage(Table table, Iterator } @Override - public AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp) { - return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, - before, after, isTruncateOp, now()); + public AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp, Long writeId) { + return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after, isTruncateOp, writeId, now()); } @Override @@ -144,9 +143,9 @@ public AddPartitionMessage buildAddPartitionMessage(Table table, @Override public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, - Partition after, boolean isTruncateOp) { + Partition after, boolean isTruncateOp, Long writeId) { return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, - table, before, after, isTruncateOp, now()); + table, before, after, isTruncateOp, writeId, now()); } @Override diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 2df61b41ea..de787678a8 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -26,6 +26,7 @@ import java.sql.SQLFeatureNotSupportedException; import java.sql.Savepoint; import java.sql.Statement; +import java.sql.PreparedStatement; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; @@ -1236,24 +1237,38 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx return abortedWriteIds; } + private ValidTxnList getValidTxnList(Connection dbConn, String fullTableName, Long writeId) throws MetaException, + SQLException { + PreparedStatement pst = null; + ResultSet rs = null; + try { + String[] names = TxnUtils.getDbTableName(fullTableName); + assert names.length == 2; + String s = "select t2w_txnid from TXN_TO_WRITE_ID where t2w_database = ? and t2w_table = ? and t2w_writeid = ?"; + pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s)); + pst.setString(1, names[0]); + pst.setString(2, names[1]); + pst.setLong(3, writeId); + LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", quoteString(names[0]), + quoteString(names[1]), writeId); + rs = pst.executeQuery(); + if (rs.next()) { + return TxnCommonUtils.createValidReadTxnList(getOpenTxns(), rs.getLong(1)); + } + throw new MetaException("invalid write id " + writeId + " for table " + fullTableName); + } finally { + close(rs, pst, null); + } + } + @Override @RetrySemantics.ReadOnly - public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) - throws NoSuchTxnException, MetaException { + public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) throws MetaException { try { Connection dbConn = null; Statement stmt = null; ValidTxnList validTxnList; - // We should prepare the valid write ids list based on validTxnList of current txn. - // If no txn exists in the caller, then they would pass null for validTxnList and so it is - // required to get the current state of txns to make validTxnList - if (rqst.isSetValidTxnList()) { - validTxnList = new ValidReadTxnList(rqst.getValidTxnList()); - } else { - // Passing 0 for currentTxn means, this validTxnList is not wrt to any txn - validTxnList = TxnCommonUtils.createValidReadTxnList(getOpenTxns(), 0); - } try { /** * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} @@ -1261,6 +1276,19 @@ public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); + // We should prepare the valid write ids list based on validTxnList of current txn. + // If no txn exists in the caller, then they would pass null for validTxnList and so it is + // required to get the current state of txns to make validTxnList + if (rqst.isSetValidTxnList()) { + assert rqst.isSetWriteId() == false; + validTxnList = new ValidReadTxnList(rqst.getValidTxnList()); + } else if (rqst.isSetWriteId()) { + validTxnList = getValidTxnList(dbConn, rqst.getFullTableNames().get(0), rqst.getWriteId()); + } else { + // Passing 0 for currentTxn means, this validTxnList is not wrt to any txn + validTxnList = TxnCommonUtils.createValidReadTxnList(getOpenTxns(), 0); + } + // Get the valid write id list for all the tables read by the current txn List tblValidWriteIdsList = new ArrayList<>(); for (String fullTableName : rqst.getFullTableNames()) { diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java index 4093aa7a18..ce590d0f55 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java @@ -2197,7 +2197,15 @@ public ValidTxnList getValidTxns(long currentTxn) throws TException { @Override public ValidWriteIdList getValidWriteIds(String fullTableName) throws TException { - GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName), null); + GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); + GetValidWriteIdsResponse validWriteIds = client.get_valid_write_ids(rqst); + return TxnCommonUtils.createValidReaderWriteIdList(validWriteIds.getTblValidWriteIds().get(0)); + } + + @Override + public ValidWriteIdList getValidWriteIds(String fullTableName, Long writeId) throws TException { + GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); + rqst.setWriteId(writeId); GetValidWriteIdsResponse validWriteIds = client.get_valid_write_ids(rqst); return TxnCommonUtils.createValidReaderWriteIdList(validWriteIds.getTblValidWriteIds().get(0)); } @@ -2205,7 +2213,8 @@ public ValidWriteIdList getValidWriteIds(String fullTableName) throws TException @Override public List getValidWriteIds(List tablesList, String validTxnList) throws TException { - GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tablesList, validTxnList); + GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tablesList); + rqst.setValidTxnList(validTxnList); return client.get_valid_write_ids(rqst).getTblValidWriteIds(); }