diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 39d97fc782..d6503ca583 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -69,6 +69,7 @@ import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; +import org.apache.hadoop.hive.metastore.events.OpenTxnEvent; import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; @@ -471,6 +472,16 @@ public void onInsert(InsertEvent insertEvent) throws MetaException { process(event, insertEvent); } + @Override + public void onOpenTxn(OpenTxnEvent openTxnEvent) throws MetaException { + NotificationEvent event = + new NotificationEvent(0, now(), EventType.OPEN_TXN.toString(), msgFactory.buildOpenTxnMessage( + openTxnEvent.getTxnId()) + .toString()); + + process(event, openTxnEvent); + } + /** * @param partSetDoneEvent * @throws MetaException diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 9614114083..6c000de81d 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; +import org.apache.hadoop.hive.metastore.events.OpenTxnEvent; import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; import org.apache.hadoop.hive.metastore.messaging.AlterIndexMessage; @@ -228,6 +229,10 @@ public void onDropFunction (DropFunctionEvent fnEvent) throws MetaException { public void onInsert(InsertEvent insertEvent) throws MetaException { pushEventId(EventType.INSERT, insertEvent); } + + public void onOpenTxn(OpenTxnEvent openTxnEvent) throws MetaException { + pushEventId(EventType.OPEN_TXN, openTxnEvent); + } } @SuppressWarnings("rawtypes") diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index d763666ab3..582d8194f0 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -3551,6 +3551,44 @@ public void testRecycleFileDropTempTable() throws IOException { assertTrue(fileCount == fileCountAfter); } + @Test + public void testOpenTxnRelication() throws IOException { + String name = testName.getMethodName(); + String dbName = createDB(name, driver); + + run("CREATE TABLE " + dbName + ".unptned(a int) PARTITIONED BY (load_date date) CLUSTERED BY(a) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + + advanceDumpDir(); + run("REPL DUMP " + dbName, driver); + String replDumpLocn = getResult(0,0,driver); + String replDumpId = getResult(0,1,true,driver); + LOG.info("Dumped to {} with id {}",replDumpLocn,replDumpId); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + + run("INSERT INTO " + dbName + ".unptned values (1)", driver); + + // Perform REPL-DUMP/LOAD + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); + String incrementalDumpLocn = getResult(0,0,driver); + String incrementalDumpId = getResult(0,1,true,driver); + LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); + run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'", driverMirror); + + run("REPL STATUS " + dbName + "_dupe", driverMirror); + verifyResults(new String[] {incrementalDumpId}, driverMirror); + + // VERIFY tables and partitions on destination for equivalence. + String[] unptn_data = new String[]{ "1" }; + String[] empty = new String[]{}; + + //verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data, driverMirror); + //verifyRun("SELECT a from " + dbName + "_dupe.ptned_empty", empty, driverMirror); + } + private NotificationEvent createDummyEvent(String dbname, String tblname, long evid) { MessageFactory msgFactory = MessageFactory.getInstance(); Table t = new Table(); diff --git a/metastore/scripts/upgrade/derby/050-HIVE-18679.derby.sql b/metastore/scripts/upgrade/derby/050-HIVE-18679.derby.sql new file mode 100644 index 0000000000..1af0fb4971 --- /dev/null +++ b/metastore/scripts/upgrade/derby/050-HIVE-18679.derby.sql @@ -0,0 +1,6 @@ +CREATE TABLE TXN_MAP ( + REPL_POLICY varchar(128) NOT NULL, + SRC_TXN_ID bigint NOT NULL, + TARGET_TXN_ID bigint NOT NULL, + PRIMARY KEY (REPL_POLICY, SRC_TXN_ID) +); diff --git a/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql b/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql index 85d593f973..86dbcd88a3 100644 --- a/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql +++ b/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql @@ -135,3 +135,11 @@ CREATE TABLE WRITE_SET ( WS_COMMIT_ID bigint NOT NULL, WS_OPERATION_TYPE char(1) NOT NULL ); + +CREATE TABLE TXN_MAP ( + REPL_POLICY varchar(128) NOT NULL, + SRC_TXN_ID bigint NOT NULL, + TARGET_TXN_ID bigint NOT NULL, + PRIMARY KEY (REPL_POLICY, SRC_TXN_ID) +); + diff --git a/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql b/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql index 3a11881306..b5de7b378b 100644 --- a/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql +++ b/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql @@ -7,5 +7,6 @@ RUN '045-HIVE-16886.derby.sql'; RUN '046-HIVE-17566.derby.sql'; RUN '048-HIVE-14498.derby.sql'; RUN '049-HIVE-18489.derby.sql'; +RUN '050-HIVE-18679.derby.sql'; UPDATE "APP".VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release version 3.0.0' where VER_ID=1; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OpenTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OpenTxnTask.java new file mode 100644 index 0000000000..4364ffc0b3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OpenTxnTask.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.util.StringUtils; + +public class OpenTxnTask extends Task { + + private static final long serialVersionUID = 1L; + + public OpenTxnTask() { + super(); + } + + @Override + public int execute(DriverContext driverContext) { + if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { + Utilities.FILE_OP_LOGGER.trace("Executing OpenTxn for " + work.getTxnId()); + } + + try { + long txnId = driverContext.getCtx().getHiveTxnManager().replOpenTxn(work.getReplPolicy(), work.getTxnId()); + LOG.info("Replayed OpenTxn Event for policy " + work.getReplPolicy() + " with srcTxn " + work.getTxnId() + " and target txn id " + txnId); + return 0; + } catch (Exception e) { + console.printError("Failed with exception " + e.getMessage(), "\n" + + StringUtils.stringifyException(e)); + setException(e); + return 1; + } + } + + @Override + public StageType getType() { + return StageType.MOVE; // TODO: Need to check the stage for open txn. + } + + @Override + public String getName() { + return "OPEN_TRANSACTION"; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OpenTxnWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OpenTxnWork.java new file mode 100644 index 0000000000..bcd4514ce2 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OpenTxnWork.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; + +import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.plan.Explain.Level; + +@Explain(displayName = "Open Transaction", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) +public class OpenTxnWork implements Serializable { + private static final long serialVersionUID = 1L; + private String dbName; + private String tableName; + private long txnId; + + public OpenTxnWork(String dbName, String tableName, long txnId) { + this.txnId = txnId; + this.dbName = dbName; + this.tableName = tableName; + } + + public long getTxnId() { + return txnId; + } + + public String getDbName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + + public String getReplPolicy() { + if (dbName == null) { + return null; + } else if (tableName == null) { + return dbName.toLowerCase() + ".*"; + } else { + return dbName.toLowerCase() + "." + tableName.toLowerCase(); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 83590e2176..98b8199a04 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.exec.OpenTxnWork; +import org.apache.hadoop.hive.ql.exec.OpenTxnTask; import org.apache.hadoop.hive.ql.io.merge.MergeFileTask; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; @@ -108,6 +110,7 @@ public TaskTuple(Class workClass, Class> taskClass) { taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class)); taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class)); taskvec.add(new TaskTuple(ExportWork.class, ExportTask.class)); + taskvec.add(new TaskTuple(OpenTxnWork.class, OpenTxnTask.class)); } private static ThreadLocal tid = new ThreadLocal() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 5bbfe95c5d..03fcdff3aa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -189,6 +189,15 @@ void setHiveConf(HiveConf conf) { } } + @Override + public long replOpenTxn(String replPolicy, long srcTxnId) throws LockException { + try { + return getMS().replOpenTxn(replPolicy, srcTxnId); + } catch (TException e) { + throw new LockException(e, ErrorMsg.METASTORE_COMMUNICATION_FAILED); + } + } + @Override public long openTxn(Context ctx, String user) throws LockException { return openTxn(ctx, user, 0); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index fca640859b..76510068d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -54,6 +54,11 @@ public long openTxn(Context ctx, String user) throws LockException { // No-op return 0L; } + @Override + public long replOpenTxn(String replPolicy, long srcTxnId) throws LockException { + return 0L; + } + @Override public boolean isTxnOpen() { return false; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index 4f9f0c2629..0d07a1a196 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -46,6 +46,15 @@ */ long openTxn(Context ctx, String user) throws LockException; + /** + * Open a new transaction in target cluster. + * @param replPolicy Replication policy to uniquely identify the source cluster. + * @param srcTxnId The id of the transaction at the source cluster + * @return The new transaction id. + * @throws LockException in case of failure to start the trasnaction. + */ + long replOpenTxn(String replPolicy, long srcTxnId) throws LockException ; + /** * Get the lock manager. This must be used rather than instantiating an * instance of the lock manager directly as the transaction manager will diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 5520bc2efe..1b6cfb1362 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.OpenTxnWork; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -401,10 +402,10 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Task copyTask = null; if (replicationSpec.isInReplicationScope()) { - if (isSourceMm || isAcid(txnId)) { + /*if (isSourceMm || isAcid(txnId)) { // Note: this is replication gap, not MM gap... Repl V2 is not ready yet. throw new RuntimeException("Replicating MM and ACID tables is not supported"); - } + }*/ copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf()); } else { CopyWork cw = new CopyWork(dataPath, destPath, false); @@ -498,10 +499,10 @@ private static boolean isAcid(Long txnId) { Task copyTask = null; if (replicationSpec.isInReplicationScope()) { - if (isSourceMm || isAcid(txnId)) { + /*if (isSourceMm || isAcid(txnId)) { // Note: this is replication gap, not MM gap... Repl V2 is not ready yet. throw new RuntimeException("Replicating MM and ACID tables is not supported"); - } + }*/ copyTask = ReplCopyTask.getLoadCopyTask( replicationSpec, new Path(srcLocation), destPath, x.getConf()); } else { @@ -901,6 +902,14 @@ private static void createRegularImportTasks( return ict; } + private static Task createOpenTxnTask( + String dbName, String tblName, Long txnId, HiveConf conf, boolean isMmTable) { + @SuppressWarnings("unchecked") + Task ict = (!isMmTable) ? null : TaskFactory.get( + new OpenTxnWork(dbName, tblName, txnId), conf); + return ict; + } + /** * Create tasks for repl import */ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index c38f7dc49f..e4e7faf773 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -513,7 +513,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { // If no import tasks generated by the event or no table updated for table level load, then no // need to update the repl state to any object. - if (importTasks.isEmpty() || (!isDatabaseLoad && (tableName == null))) { + if (importTasks.isEmpty() || (!isDatabaseLoad && (tableName == null)) || (isDatabaseLoad && (dbName == null))) { LOG.debug("No objects need update of repl state: Either 0 import tasks or table level load"); return importTasks; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java index c69ecc9405..cd8a396d61 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.parse.repl.load.message.TableHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.TruncatePartitionHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.TruncateTableHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.OpenTxnHandler; public enum DumpType { @@ -112,6 +113,12 @@ public MessageHandler handler() { return new TruncatePartitionHandler(); } }, + EVENT_OPEN_TXN("EVENT_OPEN_TXN") { + @Override + public MessageHandler handler() { + return new OpenTxnHandler(); + } + }, EVENT_INSERT("EVENT_INSERT") { @Override public MessageHandler handler() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java index 9955246ff8..b1e666c644 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java @@ -50,6 +50,7 @@ private EventHandlerFactory() { register(MessageFactory.DROP_CONSTRAINT_EVENT, DropConstraintHandler.class); register(MessageFactory.CREATE_DATABASE_EVENT, CreateDatabaseHandler.class); register(MessageFactory.DROP_DATABASE_EVENT, DropDatabaseHandler.class); + register(MessageFactory.OPEN_TXN_EVENT, OpenTxnHandler.class); } static void register(String event, Class handlerClazz) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java new file mode 100644 index 0000000000..507c452309 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +import org.apache.hadoop.hive.ql.parse.repl.DumpType; + +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; + +class OpenTxnHandler extends AbstractEventHandler { + + OpenTxnHandler(NotificationEvent event) { + super(event); + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} OPEN_TXN message : {}", fromEventId(), event.getMessage()); + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); + } + + @Override + public DumpType dumpType() { + return DumpType.EVENT_OPEN_TXN; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java new file mode 100644 index 0000000000..75071c0d5e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.message; + +import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; +import org.apache.hadoop.hive.ql.exec.OpenTxnWork; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + +public class OpenTxnHandler extends AbstractMessageHandler { + @Override + public List> handle(Context context) + throws SemanticException { + OpenTxnMessage msg = deserializer.getOpenTxnMessage(context.dmd.getPayload()); + + Task openTxnTask = TaskFactory.get( + new OpenTxnWork(context.dbName, context.tableName, msg.getTxnId()), + context.hiveConf + ); + context.log.debug("Added Open txn task : {}", openTxnTask.getId()); + return Collections.singletonList(openTxnTask); + } +} diff --git a/standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index aadf8f17c4..2376b86540 100644 --- a/standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ b/standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -13788,6 +13788,16 @@ void OpenTxnRequest::__set_agentInfo(const std::string& val) { __isset.agentInfo = true; } +void OpenTxnRequest::__set_replPolicy(const std::string& val) { + this->replPolicy = val; +__isset.replPolicy = true; +} + +void OpenTxnRequest::__set_replSrcTxnId(const int64_t val) { + this->replSrcTxnId = val; +__isset.replSrcTxnId = true; +} + uint32_t OpenTxnRequest::read(::apache::thrift::protocol::TProtocol* iprot) { apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); @@ -13844,6 +13854,22 @@ uint32_t OpenTxnRequest::read(::apache::thrift::protocol::TProtocol* iprot) { xfer += iprot->skip(ftype); } break; + case 5: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->replPolicy); + this->__isset.replPolicy = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 6: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->replSrcTxnId); + this->__isset.replSrcTxnId = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -13884,6 +13910,16 @@ uint32_t OpenTxnRequest::write(::apache::thrift::protocol::TProtocol* oprot) con xfer += oprot->writeString(this->agentInfo); xfer += oprot->writeFieldEnd(); } + if (this->__isset.replPolicy) { + xfer += oprot->writeFieldBegin("replPolicy", ::apache::thrift::protocol::T_STRING, 5); + xfer += oprot->writeString(this->replPolicy); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.replSrcTxnId) { + xfer += oprot->writeFieldBegin("replSrcTxnId", ::apache::thrift::protocol::T_I64, 6); + xfer += oprot->writeI64(this->replSrcTxnId); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -13895,6 +13931,8 @@ void swap(OpenTxnRequest &a, OpenTxnRequest &b) { swap(a.user, b.user); swap(a.hostname, b.hostname); swap(a.agentInfo, b.agentInfo); + swap(a.replPolicy, b.replPolicy); + swap(a.replSrcTxnId, b.replSrcTxnId); swap(a.__isset, b.__isset); } @@ -13903,6 +13941,8 @@ OpenTxnRequest::OpenTxnRequest(const OpenTxnRequest& other595) { user = other595.user; hostname = other595.hostname; agentInfo = other595.agentInfo; + replPolicy = other595.replPolicy; + replSrcTxnId = other595.replSrcTxnId; __isset = other595.__isset; } OpenTxnRequest& OpenTxnRequest::operator=(const OpenTxnRequest& other596) { @@ -13910,6 +13950,8 @@ OpenTxnRequest& OpenTxnRequest::operator=(const OpenTxnRequest& other596) { user = other596.user; hostname = other596.hostname; agentInfo = other596.agentInfo; + replPolicy = other596.replPolicy; + replSrcTxnId = other596.replSrcTxnId; __isset = other596.__isset; return *this; } @@ -13920,6 +13962,8 @@ void OpenTxnRequest::printTo(std::ostream& out) const { out << ", " << "user=" << to_string(user); out << ", " << "hostname=" << to_string(hostname); out << ", " << "agentInfo="; (__isset.agentInfo ? (out << to_string(agentInfo)) : (out << "")); + out << ", " << "replPolicy="; (__isset.replPolicy ? (out << to_string(replPolicy)) : (out << "")); + out << ", " << "replSrcTxnId="; (__isset.replSrcTxnId ? (out << to_string(replSrcTxnId)) : (out << "")); out << ")"; } diff --git a/standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h index 4c09bc8fe6..df11f797f0 100644 --- a/standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h +++ b/standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h @@ -5732,8 +5732,10 @@ inline std::ostream& operator<<(std::ostream& out, const GetOpenTxnsResponse& ob } typedef struct _OpenTxnRequest__isset { - _OpenTxnRequest__isset() : agentInfo(true) {} + _OpenTxnRequest__isset() : agentInfo(true), replPolicy(false), replSrcTxnId(false) {} bool agentInfo :1; + bool replPolicy :1; + bool replSrcTxnId :1; } _OpenTxnRequest__isset; class OpenTxnRequest { @@ -5741,7 +5743,7 @@ class OpenTxnRequest { OpenTxnRequest(const OpenTxnRequest&); OpenTxnRequest& operator=(const OpenTxnRequest&); - OpenTxnRequest() : num_txns(0), user(), hostname(), agentInfo("Unknown") { + OpenTxnRequest() : num_txns(0), user(), hostname(), agentInfo("Unknown"), replPolicy(), replSrcTxnId(0) { } virtual ~OpenTxnRequest() throw(); @@ -5749,6 +5751,8 @@ class OpenTxnRequest { std::string user; std::string hostname; std::string agentInfo; + std::string replPolicy; + int64_t replSrcTxnId; _OpenTxnRequest__isset __isset; @@ -5760,6 +5764,10 @@ class OpenTxnRequest { void __set_agentInfo(const std::string& val); + void __set_replPolicy(const std::string& val); + + void __set_replSrcTxnId(const int64_t val); + bool operator == (const OpenTxnRequest & rhs) const { if (!(num_txns == rhs.num_txns)) @@ -5772,6 +5780,14 @@ class OpenTxnRequest { return false; else if (__isset.agentInfo && !(agentInfo == rhs.agentInfo)) return false; + if (__isset.replPolicy != rhs.__isset.replPolicy) + return false; + else if (__isset.replPolicy && !(replPolicy == rhs.replPolicy)) + return false; + if (__isset.replSrcTxnId != rhs.__isset.replSrcTxnId) + return false; + else if (__isset.replSrcTxnId && !(replSrcTxnId == rhs.replSrcTxnId)) + return false; return true; } bool operator != (const OpenTxnRequest &rhs) const { diff --git a/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/OpenTxnRequest.java b/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/OpenTxnRequest.java index 4dd235a080..029d8afa38 100644 --- a/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/OpenTxnRequest.java +++ b/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/OpenTxnRequest.java @@ -42,6 +42,8 @@ private static final org.apache.thrift.protocol.TField USER_FIELD_DESC = new org.apache.thrift.protocol.TField("user", org.apache.thrift.protocol.TType.STRING, (short)2); private static final org.apache.thrift.protocol.TField HOSTNAME_FIELD_DESC = new org.apache.thrift.protocol.TField("hostname", org.apache.thrift.protocol.TType.STRING, (short)3); private static final org.apache.thrift.protocol.TField AGENT_INFO_FIELD_DESC = new org.apache.thrift.protocol.TField("agentInfo", org.apache.thrift.protocol.TType.STRING, (short)4); + private static final org.apache.thrift.protocol.TField REPL_POLICY_FIELD_DESC = new org.apache.thrift.protocol.TField("replPolicy", org.apache.thrift.protocol.TType.STRING, (short)5); + private static final org.apache.thrift.protocol.TField REPL_SRC_TXN_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("replSrcTxnId", org.apache.thrift.protocol.TType.I64, (short)6); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -53,13 +55,17 @@ private String user; // required private String hostname; // required private String agentInfo; // optional + private String replPolicy; // optional + private long replSrcTxnId; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { NUM_TXNS((short)1, "num_txns"), USER((short)2, "user"), HOSTNAME((short)3, "hostname"), - AGENT_INFO((short)4, "agentInfo"); + AGENT_INFO((short)4, "agentInfo"), + REPL_POLICY((short)5, "replPolicy"), + REPL_SRC_TXN_ID((short)6, "replSrcTxnId"); private static final Map byName = new HashMap(); @@ -82,6 +88,10 @@ public static _Fields findByThriftId(int fieldId) { return HOSTNAME; case 4: // AGENT_INFO return AGENT_INFO; + case 5: // REPL_POLICY + return REPL_POLICY; + case 6: // REPL_SRC_TXN_ID + return REPL_SRC_TXN_ID; default: return null; } @@ -123,8 +133,9 @@ public String getFieldName() { // isset id assignments private static final int __NUM_TXNS_ISSET_ID = 0; + private static final int __REPLSRCTXNID_ISSET_ID = 1; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.AGENT_INFO}; + private static final _Fields optionals[] = {_Fields.AGENT_INFO,_Fields.REPL_POLICY,_Fields.REPL_SRC_TXN_ID}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -136,6 +147,10 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.AGENT_INFO, new org.apache.thrift.meta_data.FieldMetaData("agentInfo", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.REPL_POLICY, new org.apache.thrift.meta_data.FieldMetaData("replPolicy", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.REPL_SRC_TXN_ID, new org.apache.thrift.meta_data.FieldMetaData("replSrcTxnId", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(OpenTxnRequest.class, metaDataMap); } @@ -172,6 +187,10 @@ public OpenTxnRequest(OpenTxnRequest other) { if (other.isSetAgentInfo()) { this.agentInfo = other.agentInfo; } + if (other.isSetReplPolicy()) { + this.replPolicy = other.replPolicy; + } + this.replSrcTxnId = other.replSrcTxnId; } public OpenTxnRequest deepCopy() { @@ -186,6 +205,9 @@ public void clear() { this.hostname = null; this.agentInfo = "Unknown"; + this.replPolicy = null; + setReplSrcTxnIdIsSet(false); + this.replSrcTxnId = 0; } public int getNum_txns() { @@ -279,6 +301,51 @@ public void setAgentInfoIsSet(boolean value) { } } + public String getReplPolicy() { + return this.replPolicy; + } + + public void setReplPolicy(String replPolicy) { + this.replPolicy = replPolicy; + } + + public void unsetReplPolicy() { + this.replPolicy = null; + } + + /** Returns true if field replPolicy is set (has been assigned a value) and false otherwise */ + public boolean isSetReplPolicy() { + return this.replPolicy != null; + } + + public void setReplPolicyIsSet(boolean value) { + if (!value) { + this.replPolicy = null; + } + } + + public long getReplSrcTxnId() { + return this.replSrcTxnId; + } + + public void setReplSrcTxnId(long replSrcTxnId) { + this.replSrcTxnId = replSrcTxnId; + setReplSrcTxnIdIsSet(true); + } + + public void unsetReplSrcTxnId() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __REPLSRCTXNID_ISSET_ID); + } + + /** Returns true if field replSrcTxnId is set (has been assigned a value) and false otherwise */ + public boolean isSetReplSrcTxnId() { + return EncodingUtils.testBit(__isset_bitfield, __REPLSRCTXNID_ISSET_ID); + } + + public void setReplSrcTxnIdIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __REPLSRCTXNID_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case NUM_TXNS: @@ -313,6 +380,22 @@ public void setFieldValue(_Fields field, Object value) { } break; + case REPL_POLICY: + if (value == null) { + unsetReplPolicy(); + } else { + setReplPolicy((String)value); + } + break; + + case REPL_SRC_TXN_ID: + if (value == null) { + unsetReplSrcTxnId(); + } else { + setReplSrcTxnId((Long)value); + } + break; + } } @@ -330,6 +413,12 @@ public Object getFieldValue(_Fields field) { case AGENT_INFO: return getAgentInfo(); + case REPL_POLICY: + return getReplPolicy(); + + case REPL_SRC_TXN_ID: + return getReplSrcTxnId(); + } throw new IllegalStateException(); } @@ -349,6 +438,10 @@ public boolean isSet(_Fields field) { return isSetHostname(); case AGENT_INFO: return isSetAgentInfo(); + case REPL_POLICY: + return isSetReplPolicy(); + case REPL_SRC_TXN_ID: + return isSetReplSrcTxnId(); } throw new IllegalStateException(); } @@ -402,6 +495,24 @@ public boolean equals(OpenTxnRequest that) { return false; } + boolean this_present_replPolicy = true && this.isSetReplPolicy(); + boolean that_present_replPolicy = true && that.isSetReplPolicy(); + if (this_present_replPolicy || that_present_replPolicy) { + if (!(this_present_replPolicy && that_present_replPolicy)) + return false; + if (!this.replPolicy.equals(that.replPolicy)) + return false; + } + + boolean this_present_replSrcTxnId = true && this.isSetReplSrcTxnId(); + boolean that_present_replSrcTxnId = true && that.isSetReplSrcTxnId(); + if (this_present_replSrcTxnId || that_present_replSrcTxnId) { + if (!(this_present_replSrcTxnId && that_present_replSrcTxnId)) + return false; + if (this.replSrcTxnId != that.replSrcTxnId) + return false; + } + return true; } @@ -429,6 +540,16 @@ public int hashCode() { if (present_agentInfo) list.add(agentInfo); + boolean present_replPolicy = true && (isSetReplPolicy()); + list.add(present_replPolicy); + if (present_replPolicy) + list.add(replPolicy); + + boolean present_replSrcTxnId = true && (isSetReplSrcTxnId()); + list.add(present_replSrcTxnId); + if (present_replSrcTxnId) + list.add(replSrcTxnId); + return list.hashCode(); } @@ -480,6 +601,26 @@ public int compareTo(OpenTxnRequest other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetReplPolicy()).compareTo(other.isSetReplPolicy()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetReplPolicy()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.replPolicy, other.replPolicy); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetReplSrcTxnId()).compareTo(other.isSetReplSrcTxnId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetReplSrcTxnId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.replSrcTxnId, other.replSrcTxnId); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -529,6 +670,22 @@ public String toString() { } first = false; } + if (isSetReplPolicy()) { + if (!first) sb.append(", "); + sb.append("replPolicy:"); + if (this.replPolicy == null) { + sb.append("null"); + } else { + sb.append(this.replPolicy); + } + first = false; + } + if (isSetReplSrcTxnId()) { + if (!first) sb.append(", "); + sb.append("replSrcTxnId:"); + sb.append(this.replSrcTxnId); + first = false; + } sb.append(")"); return sb.toString(); } @@ -618,6 +775,22 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, OpenTxnRequest stru org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 5: // REPL_POLICY + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.replPolicy = iprot.readString(); + struct.setReplPolicyIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 6: // REPL_SRC_TXN_ID + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.replSrcTxnId = iprot.readI64(); + struct.setReplSrcTxnIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -651,6 +824,18 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, OpenTxnRequest str oprot.writeFieldEnd(); } } + if (struct.replPolicy != null) { + if (struct.isSetReplPolicy()) { + oprot.writeFieldBegin(REPL_POLICY_FIELD_DESC); + oprot.writeString(struct.replPolicy); + oprot.writeFieldEnd(); + } + } + if (struct.isSetReplSrcTxnId()) { + oprot.writeFieldBegin(REPL_SRC_TXN_ID_FIELD_DESC); + oprot.writeI64(struct.replSrcTxnId); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -675,10 +860,22 @@ public void write(org.apache.thrift.protocol.TProtocol prot, OpenTxnRequest stru if (struct.isSetAgentInfo()) { optionals.set(0); } - oprot.writeBitSet(optionals, 1); + if (struct.isSetReplPolicy()) { + optionals.set(1); + } + if (struct.isSetReplSrcTxnId()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); if (struct.isSetAgentInfo()) { oprot.writeString(struct.agentInfo); } + if (struct.isSetReplPolicy()) { + oprot.writeString(struct.replPolicy); + } + if (struct.isSetReplSrcTxnId()) { + oprot.writeI64(struct.replSrcTxnId); + } } @Override @@ -690,11 +887,19 @@ public void read(org.apache.thrift.protocol.TProtocol prot, OpenTxnRequest struc struct.setUserIsSet(true); struct.hostname = iprot.readString(); struct.setHostnameIsSet(true); - BitSet incoming = iprot.readBitSet(1); + BitSet incoming = iprot.readBitSet(3); if (incoming.get(0)) { struct.agentInfo = iprot.readString(); struct.setAgentInfoIsSet(true); } + if (incoming.get(1)) { + struct.replPolicy = iprot.readString(); + struct.setReplPolicyIsSet(true); + } + if (incoming.get(2)) { + struct.replSrcTxnId = iprot.readI64(); + struct.setReplSrcTxnIdIsSet(true); + } } } diff --git a/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php index a5b578ef37..8142cb69d1 100644 --- a/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php +++ b/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -13859,6 +13859,14 @@ class OpenTxnRequest { * @var string */ public $agentInfo = "Unknown"; + /** + * @var string + */ + public $replPolicy = null; + /** + * @var int + */ + public $replSrcTxnId = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -13879,6 +13887,14 @@ class OpenTxnRequest { 'var' => 'agentInfo', 'type' => TType::STRING, ), + 5 => array( + 'var' => 'replPolicy', + 'type' => TType::STRING, + ), + 6 => array( + 'var' => 'replSrcTxnId', + 'type' => TType::I64, + ), ); } if (is_array($vals)) { @@ -13894,6 +13910,12 @@ class OpenTxnRequest { if (isset($vals['agentInfo'])) { $this->agentInfo = $vals['agentInfo']; } + if (isset($vals['replPolicy'])) { + $this->replPolicy = $vals['replPolicy']; + } + if (isset($vals['replSrcTxnId'])) { + $this->replSrcTxnId = $vals['replSrcTxnId']; + } } } @@ -13944,6 +13966,20 @@ class OpenTxnRequest { $xfer += $input->skip($ftype); } break; + case 5: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->replPolicy); + } else { + $xfer += $input->skip($ftype); + } + break; + case 6: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->replSrcTxnId); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -13977,6 +14013,16 @@ class OpenTxnRequest { $xfer += $output->writeString($this->agentInfo); $xfer += $output->writeFieldEnd(); } + if ($this->replPolicy !== null) { + $xfer += $output->writeFieldBegin('replPolicy', TType::STRING, 5); + $xfer += $output->writeString($this->replPolicy); + $xfer += $output->writeFieldEnd(); + } + if ($this->replSrcTxnId !== null) { + $xfer += $output->writeFieldBegin('replSrcTxnId', TType::I64, 6); + $xfer += $output->writeI64($this->replSrcTxnId); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 5598859042..cf89ba5b39 100644 --- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -9580,6 +9580,8 @@ class OpenTxnRequest: - user - hostname - agentInfo + - replPolicy + - replSrcTxnId """ thrift_spec = ( @@ -9588,13 +9590,17 @@ class OpenTxnRequest: (2, TType.STRING, 'user', None, None, ), # 2 (3, TType.STRING, 'hostname', None, None, ), # 3 (4, TType.STRING, 'agentInfo', None, "Unknown", ), # 4 + (5, TType.STRING, 'replPolicy', None, None, ), # 5 + (6, TType.I64, 'replSrcTxnId', None, None, ), # 6 ) - def __init__(self, num_txns=None, user=None, hostname=None, agentInfo=thrift_spec[4][4],): + def __init__(self, num_txns=None, user=None, hostname=None, agentInfo=thrift_spec[4][4], replPolicy=None, replSrcTxnId=None,): self.num_txns = num_txns self.user = user self.hostname = hostname self.agentInfo = agentInfo + self.replPolicy = replPolicy + self.replSrcTxnId = replSrcTxnId def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -9625,6 +9631,16 @@ def read(self, iprot): self.agentInfo = iprot.readString() else: iprot.skip(ftype) + elif fid == 5: + if ftype == TType.STRING: + self.replPolicy = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.I64: + self.replSrcTxnId = iprot.readI64() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -9651,6 +9667,14 @@ def write(self, oprot): oprot.writeFieldBegin('agentInfo', TType.STRING, 4) oprot.writeString(self.agentInfo) oprot.writeFieldEnd() + if self.replPolicy is not None: + oprot.writeFieldBegin('replPolicy', TType.STRING, 5) + oprot.writeString(self.replPolicy) + oprot.writeFieldEnd() + if self.replSrcTxnId is not None: + oprot.writeFieldBegin('replSrcTxnId', TType.I64, 6) + oprot.writeI64(self.replSrcTxnId) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -9670,6 +9694,8 @@ def __hash__(self): value = (value * 31) ^ hash(self.user) value = (value * 31) ^ hash(self.hostname) value = (value * 31) ^ hash(self.agentInfo) + value = (value * 31) ^ hash(self.replPolicy) + value = (value * 31) ^ hash(self.replSrcTxnId) return value def __repr__(self): diff --git a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index bc58cfe0ef..cd8ffcdc9b 100644 --- a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -2133,12 +2133,16 @@ class OpenTxnRequest USER = 2 HOSTNAME = 3 AGENTINFO = 4 + REPLPOLICY = 5 + REPLSRCTXNID = 6 FIELDS = { NUM_TXNS => {:type => ::Thrift::Types::I32, :name => 'num_txns'}, USER => {:type => ::Thrift::Types::STRING, :name => 'user'}, HOSTNAME => {:type => ::Thrift::Types::STRING, :name => 'hostname'}, - AGENTINFO => {:type => ::Thrift::Types::STRING, :name => 'agentInfo', :default => %q"Unknown", :optional => true} + AGENTINFO => {:type => ::Thrift::Types::STRING, :name => 'agentInfo', :default => %q"Unknown", :optional => true}, + REPLPOLICY => {:type => ::Thrift::Types::STRING, :name => 'replPolicy', :optional => true}, + REPLSRCTXNID => {:type => ::Thrift::Types::I64, :name => 'replSrcTxnId', :optional => true} } def struct_fields; FIELDS; end diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 1f998285cf..8a1422a653 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -112,6 +112,7 @@ import org.apache.hadoop.hive.metastore.events.PreLoadPartitionDoneEvent; import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent; import org.apache.hadoop.hive.metastore.events.PreReadTableEvent; +import org.apache.hadoop.hive.metastore.events.OpenTxnEvent; import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; import org.apache.hadoop.hive.metastore.metrics.JvmPauseMonitor; import org.apache.hadoop.hive.metastore.metrics.Metrics; @@ -6667,7 +6668,17 @@ public GetOpenTxnsInfoResponse get_open_txns_info() throws TException { @Override public OpenTxnsResponse open_txns(OpenTxnRequest rqst) throws TException { - return getTxnHandler().openTxns(rqst); + OpenTxnsResponse response = getTxnHandler().openTxns(rqst); + if (!listeners.isEmpty()) { + List txnIds = response.getTxn_ids(); + // TODO: can it be done in a batch ? + for (Long id : txnIds) { + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.OPEN_TXN, + new OpenTxnEvent(id, true, this)); + } + } + return response; } @Override diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 23cef8d556..948790a8f2 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -2225,12 +2225,22 @@ public ValidTxnList getValidTxns(long currentTxn) throws TException { @Override public long openTxn(String user) throws TException { - OpenTxnsResponse txns = openTxns(user, 1); + OpenTxnsResponse txns = openTxnsIntr(user, null, -1, 1); + return txns.getTxn_ids().get(0); + } + + @Override + public long replOpenTxn(String replPolicy, long srcTxnid) throws TException { + OpenTxnsResponse txns = openTxnsIntr(null, replPolicy, srcTxnid, 1); return txns.getTxn_ids().get(0); } @Override public OpenTxnsResponse openTxns(String user, int numTxns) throws TException { + return openTxnsIntr(user, null, -1, numTxns); + } + + private OpenTxnsResponse openTxnsIntr(String user, String replPolicy, long srcTxnid, int numTxns) throws TException { String hostname = null; try { hostname = InetAddress.getLocalHost().getHostName(); @@ -2238,7 +2248,10 @@ public OpenTxnsResponse openTxns(String user, int numTxns) throws TException { LOG.error("Unable to resolve my host name " + e.getMessage()); throw new RuntimeException(e); } - return client.open_txns(new OpenTxnRequest(numTxns, user, hostname)); + OpenTxnRequest rqst = new OpenTxnRequest(numTxns, user, hostname); + rqst.setReplPolicy(replPolicy); + rqst.setReplSrcTxnId(srcTxnid); + return client.open_txns(rqst); } @Override diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index 96d4590222..720bd2a6ca 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -1363,6 +1363,15 @@ GetAllFunctionsResponse getAllFunctions() */ long openTxn(String user) throws TException; + /** + * Initiate a transaction at the target cluster. + * @param replPolicy The replication policy to uniquely identify the source cluster. + * @param srcTxnId The transaction id at the source cluster + * @return transaction identifier + * @throws TException + */ + long replOpenTxn(String replPolicy, long srcTxnId) throws TException; + /** * Initiate a batch of transactions. It is not guaranteed that the * requested number of transactions will be instantiated. The system has a diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java index 0e1620df03..13b75120a0 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; +import org.apache.hadoop.hive.metastore.events.OpenTxnEvent; /** * This abstract class needs to be extended to provide implementation of actions that needs @@ -220,6 +221,15 @@ public void onAddNotNullConstraint(AddNotNullConstraintEvent addNotNullConstrain public void onDropConstraint(DropConstraintEvent dropConstraintEvent) throws MetaException { } + /** + * This will be called when a new transaction is started. + * @param openTxnEvent + * @throws MetaException + */ + public void onOpenTxn(OpenTxnEvent openTxnEvent) throws MetaException { + + } + @Override public Configuration getConf() { return this.conf; diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java index a640b34e4b..66db178ce3 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; import org.apache.hadoop.hive.metastore.events.ListenerEvent; +import org.apache.hadoop.hive.metastore.events.OpenTxnEvent; import java.util.List; import java.util.Map; @@ -179,6 +180,12 @@ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws listener.onAddNotNullConstraint((AddNotNullConstraintEvent)event); } }) + .put(EventType.OPEN_TXN, new EventNotifier() { + @Override + public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException { + listener.onOpenTxn((OpenTxnEvent)event); + } + }) .build() ); diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/OpenTxnEvent.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/OpenTxnEvent.java new file mode 100644 index 0000000000..07e6e78021 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/OpenTxnEvent.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.events; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hive.metastore.IHMSHandler; + +@InterfaceAudience.Public +@InterfaceStability.Stable +public class OpenTxnEvent extends ListenerEvent { + + private final Long txnId; + + /** + * + * @param transactionId Unique identification for the transaction just opened. + * @param status status of insert, true = success, false = failure + * @param handler handler that is firing the event + */ + public OpenTxnEvent(Long transactionId, boolean status, IHMSHandler handler) { + super(status, handler); + txnId = transactionId; + } + + /** + * @return Long txnId + */ + public Long getTxnId() { + return txnId; + } +} diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java index dad2f5b115..7c4669e595 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java @@ -49,7 +49,8 @@ ADD_FOREIGNKEY(MessageFactory.ADD_FOREIGNKEY_EVENT), ADD_UNIQUECONSTRAINT(MessageFactory.ADD_UNIQUECONSTRAINT_EVENT), ADD_NOTNULLCONSTRAINT(MessageFactory.ADD_NOTNULLCONSTRAINT_EVENT), - DROP_CONSTRAINT(MessageFactory.DROP_CONSTRAINT_EVENT); + DROP_CONSTRAINT(MessageFactory.DROP_CONSTRAINT_EVENT), + OPEN_TXN(MessageFactory.OPEN_TXN_EVENT); private String typeString; diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java index f85dc407c8..8397d0744a 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java @@ -70,6 +70,8 @@ public EventMessage getEventMessage(String eventTypeString, String messageBody) return getAddNotNullConstraintMessage(messageBody); case DROP_CONSTRAINT: return getDropConstraintMessage(messageBody); + case OPEN_TXN: + return getOpenTxnMessage(messageBody); default: throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString); } @@ -181,6 +183,12 @@ public EventMessage getEventMessage(String eventTypeString, String messageBody) */ public abstract DropConstraintMessage getDropConstraintMessage(String messageBody); + /** + * Method to de-serialize OpenTxnMessage instance. + */ + public abstract OpenTxnMessage getOpenTxnMessage(String messageBody); + + // Protection against construction. protected MessageDeserializer() {} } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java index 0e3357d487..6c354b8690 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java @@ -63,6 +63,7 @@ public static final String ADD_UNIQUECONSTRAINT_EVENT = "ADD_UNIQUECONSTRAINT"; public static final String ADD_NOTNULLCONSTRAINT_EVENT = "ADD_NOTNULLCONSTRAINT"; public static final String DROP_CONSTRAINT_EVENT = "DROP_CONSTRAINT"; + public static final String OPEN_TXN_EVENT = "OPEN_TXN"; private static MessageFactory instance = null; @@ -254,6 +255,14 @@ public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Pa public abstract InsertMessage buildInsertMessage(Table tableObj, Partition ptnObj, boolean replace, Iterator files); + /** + * Factory method for building open txn message + * + * @param txnId Id of the newly opened transaction + * @return instance of OpenTxnMessage + */ + public abstract OpenTxnMessage buildOpenTxnMessage(Long txnId); + /*** * Factory method for building add primary key message * diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/OpenTxnMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/OpenTxnMessage.java new file mode 100644 index 0000000000..efbeac2835 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/OpenTxnMessage.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + + +/** + * HCat message sent when an open transaction is done. + */ +public abstract class OpenTxnMessage extends EventMessage { + + protected OpenTxnMessage() { + super(EventType.OPEN_TXN); + } + + /** + * Get the table object associated with the insert + * + * @return The TxnId + */ + public abstract Long getTxnId(); + +} diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java index 50420c8d37..f200a74e9d 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.metastore.messaging.event.filters; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import java.util.regex.Pattern; @@ -41,8 +43,8 @@ public DatabaseAndTableFilter(final String databaseNameOrPattern, final String t @Override boolean shouldAccept(final NotificationEvent event) { - if (dbPattern == null) { - return true; // if our dbName is null, we're interested in all wh events + if ((dbPattern == null) || (event.getEventType().equals(MessageFactory.OPEN_TXN_EVENT))) { + return true; // if our dbName is null or its of open txn type, we're interested in all wh events } if (dbPattern.matcher(event.getDbName()).matches()) { if ((tableName == null) diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java index 34b62e6901..f48812248e 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; import org.apache.hadoop.hive.metastore.messaging.InsertMessage; import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; +import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; import org.codehaus.jackson.map.DeserializationConfig; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; @@ -253,4 +254,13 @@ public DropConstraintMessage getDropConstraintMessage(String messageBody) { throw new IllegalArgumentException("Could not construct DropConstraintMessage", e); } } + + @Override + public OpenTxnMessage getOpenTxnMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONOpenTxnMessage.class); + } catch (Exception e) { + throw new IllegalArgumentException("Could not construct DropConstraintMessage", e); + } + } } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java index 7f46d071fe..8e3d887588 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.PartitionFiles; +import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; import org.apache.thrift.TBase; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; @@ -182,6 +183,11 @@ public InsertMessage buildInsertMessage(Table tableObj, Partition partObj, tableObj, partObj, replace, fileIter, now()); } + @Override + public OpenTxnMessage buildOpenTxnMessage(Long txnId) { + return new JSONOpenTxnMessage(txnId, now()); + } + @Override public AddPrimaryKeyMessage buildAddPrimaryKeyMessage(List pks) { return new JSONAddPrimaryKeyMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, pks, now()); diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONOpenTxnMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONOpenTxnMessage.java new file mode 100644 index 0000000000..b3eee2508b --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONOpenTxnMessage.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; +import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +import com.google.common.collect.Lists; + +import java.util.Iterator; +import java.util.List; + +/** + * JSON implementation of InsertMessage + */ +public class JSONOpenTxnMessage extends OpenTxnMessage { + + @JsonProperty + Long txnid; + + Long timestamp; + + /** + * Default constructor, needed for Jackson. + */ + public JSONOpenTxnMessage() { + } + + public JSONOpenTxnMessage(Long txnid, Long timestamp) { + this.timestamp = timestamp; + this.txnid = txnid; + } + + @Override + public Long getTxnId() { return txnid; } + + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public String getDB() { + return null; + } + + @Override + public String getServicePrincipal() { + return null; + } + + @Override + public String getServer() { + return null; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } + +} \ No newline at end of file diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 1bb976c082..b4eb98b128 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -577,6 +577,25 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { LOG.debug("Going to execute update <" + q + ">"); stmt.execute(q); } + + if (rqst.isSetReplPolicy()) { + List rowsRepl = new ArrayList<>(); + String selectRepl = "select target_txn_id from TXN_MAP where repl_policy = " + quoteString(rqst.getReplPolicy()) + "and src_txn_id = " + rqst.getReplSrcTxnId(); + for (long i = first; i < first + numTxns; i++) { + rowsRepl.add(quoteString(rqst.getReplPolicy()) + "," + rqst.getReplSrcTxnId() + "," + i); + } + List queriesRepl = sqlGenerator.createInsertValuesStmt( + "TXN_MAP (repl_policy, src_txn_id, target_txn_id)", rowsRepl); + for (String q : queriesRepl) { + rs = stmt.executeQuery(selectRepl); + //no rows in the result set + if (rs.first() == false) { + LOG.debug("Going to execute insert <" + q + ">"); + stmt.execute(q); + } + } + } + LOG.debug("Going to commit"); dbConn.commit(); return new OpenTxnsResponse(txnIds); diff --git a/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql index ac28869946..37f2aa5e45 100644 --- a/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql +++ b/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql @@ -525,6 +525,12 @@ CREATE TABLE WRITE_SET ( WS_OPERATION_TYPE char(1) NOT NULL ); +CREATE TABLE TXN_MAP ( + REPL_POLICY varchar(128) NOT NULL, + SRC_TXN_ID bigint NOT NULL, + TARGET_TXN_ID bigint NOT NULL, + PRIMARY KEY (REPL_POLICY, SRC_TXN_ID) +); -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script -- ----------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql index d49255a545..cfc36df8bd 100644 --- a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql +++ b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql @@ -94,3 +94,10 @@ UPDATE SDS UPDATE DBS SET DB_LOCATION_URI = 's3a' || SUBSTR(DB_LOCATION_URI, 4) WHERE DB_LOCATION_URI LIKE 's3n://%' ; + +CREATE TABLE TXN_MAP ( + REPL_POLICY varchar(128) NOT NULL, + SRC_TXN_ID bigint NOT NULL, + TARGET_TXN_ID bigint NOT NULL, + PRIMARY KEY (REPL_POLICY, SRC_TXN_ID) +); \ No newline at end of file diff --git a/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql index 7c26d5dcc4..62f1da0533 100644 --- a/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql +++ b/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql @@ -1129,6 +1129,14 @@ CREATE TABLE METASTORE_DB_PROPERTIES ( ALTER TABLE METASTORE_DB_PROPERTIES ADD CONSTRAINT PROPERTY_KEY_PK PRIMARY KEY (PROPERTY_KEY); +CREATE TABLE TXN_MAP ( + REPL_POLICY nvarchar(128) NOT NULL, + SRC_TXN_ID bigint NOT NULL, + TARGET_TXN_ID bigint NOT NULL +); + +ALTER TABLE TXN_MAP ADD CONSTRAINT TXN_MAP_PK PRIMARY KEY (REPL_POLICY, SRC_TXN_ID); + -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script -- ----------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql index 6dc3e1a091..66cf393c8a 100644 --- a/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql +++ b/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql @@ -148,3 +148,11 @@ UPDATE SDS UPDATE DBS SET DB_LOCATION_URI = 's3a' + SUBSTRING(DB_LOCATION_URI, 4, LEN(DB_LOCATION_URI)) WHERE DB_LOCATION_URI LIKE 's3n://%' ; + +CREATE TABLE TXN_MAP ( + REPL_POLICY nvarchar(128) NOT NULL, + SRC_TXN_ID bigint NOT NULL, + TARGET_TXN_ID bigint NOT NULL +); + +ALTER TABLE TXN_MAP ADD CONSTRAINT TXN_MAP_PK PRIMARY KEY (REPL_POLICY, SRC_TXN_ID); diff --git a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql index 0eb2e2e4eb..16f424ffa8 100644 --- a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql +++ b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql @@ -1063,6 +1063,14 @@ CREATE TABLE WRITE_SET ( WS_COMMIT_ID bigint NOT NULL, WS_OPERATION_TYPE char(1) NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +CREATE TABLE TXN_MAP ( + REPL_POLICY varchar(128) NOT NULL, + SRC_TXN_ID bigint NOT NULL, + TARGET_TXN_ID bigint NOT NULL, + PRIMARY KEY (REPL_POLICY, SRC_TXN_ID) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script -- ----------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql index 0a170f6fc8..f02a3fec1f 100644 --- a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql +++ b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql @@ -133,3 +133,10 @@ UPDATE SDS UPDATE DBS SET DB_LOCATION_URI = CONCAT('s3a', SUBSTR(DB_LOCATION_URI, 4, LENGTH(DB_LOCATION_URI))) WHERE DB_LOCATION_URI LIKE 's3n://%' ; + +CREATE TABLE TXN_MAP ( + REPL_POLICY varchar(128) NOT NULL, + SRC_TXN_ID bigint NOT NULL, + TARGET_TXN_ID bigint NOT NULL, + PRIMARY KEY (REPL_POLICY, SRC_TXN_ID) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; diff --git a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql index 37f9063993..52356f9502 100644 --- a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql +++ b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql @@ -1037,6 +1037,13 @@ CREATE TABLE WRITE_SET ( WS_OPERATION_TYPE char(1) NOT NULL ); +CREATE TABLE TXN_MAP ( + REPL_POLICY varchar(128) NOT NULL, + SRC_TXN_ID bigint NOT NULL, + TARGET_TXN_ID bigint NOT NULL, + PRIMARY KEY (REPL_POLICY, SRC_TXN_ID) +); + -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script -- ----------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql index a923d92a06..6c04e4b7a2 100644 --- a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql +++ b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql @@ -156,3 +156,10 @@ UPDATE SDS UPDATE DBS SET DB_LOCATION_URI = 's3a' || SUBSTR(DB_LOCATION_URI, 4) WHERE DB_LOCATION_URI LIKE 's3n://%' ; + +CREATE TABLE TXN_MAP ( + REPL_POLICY varchar(128) NOT NULL, + SRC_TXN_ID bigint NOT NULL, + TARGET_TXN_ID bigint NOT NULL, + PRIMARY KEY (REPL_POLICY, SRC_TXN_ID) +); diff --git a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql index 9d63056376..f3bb188425 100644 --- a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql +++ b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql @@ -1729,6 +1729,13 @@ CREATE TABLE WRITE_SET ( WS_OPERATION_TYPE char(1) NOT NULL ); +CREATE TABLE TXN_MAP ( + REPL_POLICY varchar(128) NOT NULL, + SRC_TXN_ID bigint NOT NULL, + TARGET_TXN_ID bigint NOT NULL, + PRIMARY KEY (REPL_POLICY, SRC_TXN_ID) +); + -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script -- ----------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql index eb45cd24a6..4b0378464e 100644 --- a/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql +++ b/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql @@ -172,3 +172,10 @@ UPDATE "SDS" UPDATE "DBS" SET "DB_LOCATION_URI" = 's3a' || SUBSTR("DB_LOCATION_URI", 4) WHERE "DB_LOCATION_URI" LIKE 's3n://%' ; + +CREATE TABLE TXN_MAP ( + REPL_POLICY varchar(128) NOT NULL, + SRC_TXN_ID bigint NOT NULL, + TARGET_TXN_ID bigint NOT NULL, + PRIMARY KEY (REPL_POLICY, SRC_TXN_ID) +); diff --git a/standalone-metastore/src/main/thrift/hive_metastore.thrift b/standalone-metastore/src/main/thrift/hive_metastore.thrift index 371b97590c..1db4bc60b4 100644 --- a/standalone-metastore/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/src/main/thrift/hive_metastore.thrift @@ -713,6 +713,8 @@ struct OpenTxnRequest { 2: required string user, 3: required string hostname, 4: optional string agentInfo = "Unknown", + 5: optional string replPolicy, + 6: optional i64 replSrcTxnId, } struct OpenTxnsResponse {