diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 7468886..f1c0100 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
@@ -27,20 +30,18 @@
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
-import org.junit.rules.TestName;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.rules.TestName;
 import org.junit.rules.TestRule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -48,14 +49,21 @@
 import org.junit.Test;
 import org.junit.BeforeClass;
 import org.junit.AfterClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
-import javax.annotation.Nullable;
 import java.util.Collections;
 import com.google.common.collect.Lists;
+import org.junit.Ignore;
+
+import static org.junit.Assert.assertTrue;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 
 /**
  * TestReplicationScenariosAcidTables - test replication for ACID tables
@@ -285,6 +293,77 @@ public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable {
   }
 
   @Test
+  public void testAcidTablesBootstrapWithConcurrentWrites() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    primary.run("use " + primaryDbName)
+        .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+            "tblproperties (\"transactional\"=\"true\")")
+        .run("insert into t1 values(1)");
+
+    // Perform a concurrent write on the ACID table t1 while the bootstrap dump is in progress. Bootstrap
+    // won't see the written data, but the subsequent incremental repl should see it.
+    BehaviourInjection<CallerArguments, Boolean> callerInjectedBehavior
+        = new BehaviourInjection<CallerArguments, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable CallerArguments args) {
+        if (injectionPathCalled) {
+          nonInjectedPathCalled = true;
+        } else {
+          // Insert another row into t1 from another txn while the bootstrap dump is in progress.
+          injectionPathCalled = true;
+          Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+              LOG.info("Entered new thread");
+              IDriver driver = DriverFactory.newDriver(primaryConf);
+              SessionState.start(new CliSessionState(primaryConf));
+              CommandProcessorResponse ret = driver.run("insert into " + primaryDbName + ".t1 values(2)");
+              boolean success = (ret.getException() == null);
+              assertTrue(success);
+              LOG.info("Exit new thread success - {}", success, ret.getException());
+            }
+          });
+          t.start();
+          LOG.info("Created new thread {}", t.getName());
+          try {
+            t.join();
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        return true;
+      }
+    };
+
+    InjectableBehaviourObjectStore.setCallerVerifier(callerInjectedBehavior);
+    WarehouseInstance.Tuple bootstrapDump = null;
+    try {
+      bootstrapDump = primary.dump(primaryDbName, null);
+      callerInjectedBehavior.assertInjectionsPerformed(true, true);
+    } finally {
+      InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+    }
+
+    // Bootstrap dump has taken a snapshot before the concurrent thread performed the write. So, it won't see data "2".
+    replica.load(replicatedDbName, bootstrapDump.dumpLocation)
+        .run("use " + replicatedDbName)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(bootstrapDump.lastReplicationId)
+        .run("select id from t1 order by id")
+        .verifyResults(new String[]{"1" });
+
+    // Incremental should include the concurrent write of data "2" from another txn.
+    WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootstrapDump.lastReplicationId);
+    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+        .run("use " + replicatedDbName)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(incrementalDump.lastReplicationId)
+        .run("select id from t1 order by id")
+        .verifyResults(new String[]{"1", "2" });
+  }
+
+  @Test
   public void testOpenTxnEvent() throws Throwable {
     String tableName = testName.getMethodName();
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 3df27a7..7c043b9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -1079,9 +1079,12 @@ public Boolean apply(@Nullable CallerArguments args) {
 
     // Trigger bootstrap dump which just creates table t1 and other tables (t2, t3) and constraints not loaded.
     List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'");
-    replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
-    InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
-    callerVerifier.assertInjectionsPerformed(true, false);
+    try {
+      replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+      callerVerifier.assertInjectionsPerformed(true, false);
+    } finally {
+      InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+    }
 
     replica.run("use " + replicatedDbName)
         .run("repl status " + replicatedDbName)
@@ -1122,11 +1125,14 @@ public Boolean apply(@Nullable CallerArguments args) {
     };
     InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
 
-    // Retry with same dump with which it was already loaded should resume the bootstrap load.
-    // This time, it fails when try to load the foreign key constraints. All other constraints are loaded.
-    replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
-    InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
-    callerVerifier.assertInjectionsPerformed(true, false);
+    try {
+      // Retry with same dump with which it was already loaded should resume the bootstrap load.
+      // This time, it fails when try to load the foreign key constraints. All other constraints are loaded.
+      replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+      callerVerifier.assertInjectionsPerformed(true, false);
+    } finally {
+      InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+    }
 
     replica.run("use " + replicatedDbName)
         .run("repl status " + replicatedDbName)
@@ -1158,11 +1164,14 @@ public Boolean apply(@Nullable CallerArguments args) {
     };
     InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
 
-    // Retry with same dump with which it was already loaded should resume the bootstrap load.
-    // This time, it completes by adding just foreign key constraints for table t2.
-    replica.load(replicatedDbName, tuple.dumpLocation);
-    InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
-    callerVerifier.assertInjectionsPerformed(true, false);
+    try {
+      // Retry with same dump with which it was already loaded should resume the bootstrap load.
+      // This time, it completes by adding just foreign key constraints for table t2.
+      replica.load(replicatedDbName, tuple.dumpLocation);
+      callerVerifier.assertInjectionsPerformed(true, false);
+    } finally {
+      InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+    }
 
     replica.run("use " + replicatedDbName)
         .run("repl status " + replicatedDbName)
@@ -1249,11 +1258,14 @@ public Boolean apply(@Nullable CallerArguments args) {
     };
     InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
 
-    // Retry with same dump with which it was already loaded should resume the bootstrap load.
-    // This time, it completes by adding remaining partitions.
-    replica.load(replicatedDbName, tuple.dumpLocation);
-    InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
-    callerVerifier.assertInjectionsPerformed(false, false);
+    try {
+      // Retry with same dump with which it was already loaded should resume the bootstrap load.
+      // This time, it completes by adding remaining partitions.
+      replica.load(replicatedDbName, tuple.dumpLocation);
+      callerVerifier.assertInjectionsPerformed(false, false);
+    } finally {
+      InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+    }
 
     replica.run("use " + replicatedDbName)
         .run("repl status " + replicatedDbName)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index a424a64..d6cdc35 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -118,6 +118,7 @@
 import org.apache.hadoop.hive.ql.parse.ParseException;
 import org.apache.hadoop.hive.ql.parse.ParseUtils;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
 import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
@@ -145,6 +146,7 @@
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.hive.common.util.TxnIdUtils;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -651,6 +653,10 @@ public void run() {
 
       BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
       if (!retrial) {
+        if ((queryState.getHiveOperation() != null)
+            && queryState.getHiveOperation().equals(HiveOperation.REPLDUMP)) {
+          setLastReplIdForDump(queryState.getConf());
+        }
         openTransaction();
         generateValidTxnList();
       }
@@ -894,6 +900,22 @@ private void setTriggerContext(final String queryId) {
     ctx.setWmContext(wmContext);
   }
 
+  /**
+   * Last repl id should be captured before the current REPL DUMP operation opens a txn.
+   * This is needed to avoid losing data added/modified by concurrent txns while a bootstrap
+   * dump is in progress.
+   * @param conf Query configurations
+   * @throws HiveException
+   * @throws TException
+   */
+  private void setLastReplIdForDump(HiveConf conf) throws HiveException, TException {
+    // The last logged notification event id would be the last repl id for the current REPL DUMP.
+    Hive hiveDb = Hive.get();
+    Long lastReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+    conf.setLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, lastReplId);
+    LOG.debug("Setting " + ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY + " = " + lastReplId);
+  }
+
   private void openTransaction() throws LockException, CommandProcessorResponse {
     if (checkConcurrency() && startImplicitTxn(queryTxnMgr)) {
       String userFromUGI = getUserFromUGI();
@@ -901,7 +923,7 @@ private void openTransaction() throws LockException, CommandProcessorResponse {
       if (userFromUGI == null) {
        throw createProcessorResponse(10);
       }
-      long txnid = queryTxnMgr.openTxn(ctx, userFromUGI);
+      queryTxnMgr.openTxn(ctx, userFromUGI);
     }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
index b9d6f58..cbd503f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hive.ql.exec;
 
-import static org.apache.hadoop.util.StringUtils.stringifyException;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -31,7 +29,9 @@
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.ResourceType;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
@@ -93,6 +93,13 @@ public int execute(DriverContext driverContext) {
       }
       return createPermanentFunction(Hive.get(conf), createFunctionDesc);
     } catch (Exception e) {
+      // For the repl load flow, the function may already exist for the first incremental phase. So, just return success.
+      if (createFunctionDesc.getReplicationSpec().isInReplicationScope()
+          && (e.getCause() instanceof AlreadyExistsException)) {
+        LOG.info("Create function is idempotent as function: "
+            + createFunctionDesc.getFunctionName() + " already exists.");
+        return 0;
+      }
       setException(e);
       LOG.error("Failed to create function", e);
       return 1;
@@ -262,6 +269,13 @@ private int dropPermanentFunction(Hive db, DropFunctionDesc dropFunctionDesc) {
       return 0;
     } catch (Exception e) {
+      // For the repl load flow, the function may not exist for the first incremental phase. So, just return success.
+      if (dropFunctionDesc.getReplicationSpec().isInReplicationScope()
+          && (e.getCause() instanceof NoSuchObjectException)) {
+        LOG.info("Drop function is idempotent as function: "
+            + dropFunctionDesc.getFunctionName() + " doesn't exist.");
+        return 0;
+      }
       LOG.info("drop function: ", e);
       console.printError("FAILED: error during drop function: " +
           StringUtils.stringifyException(e));
       return 1;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 940e381..4eba910 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
@@ -49,6 +50,7 @@
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
@@ -68,7 +70,6 @@
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.ErrorMsg;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -218,8 +219,13 @@ private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) {
 
   private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
     // bootstrap case
+    // The last repl id would've been captured during the compile phase in queryState configs, before opening the txn.
+    // This is needed as we dump data on ACID/MM tables based on the read snapshot, or else we may lose data from
+    // concurrent txns while the bootstrap dump is in progress. If it is not available, then get it from the metastore.
     Hive hiveDb = getHive();
-    Long bootDumpBeginReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+    Long bootDumpBeginReplId = queryState.getConf().getLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, -1L);
+    assert (bootDumpBeginReplId >= 0L);
+
     String validTxnList = getValidTxnListForReplDump(hiveDb);
     for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
       LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
@@ -234,7 +240,7 @@ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws
       for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
         LOG.debug(
             "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
-        dumpTable(dbName, tblName, validTxnList, dbRoot);
+        dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId);
         dumpConstraintMetadata(dbName, tblName, dbRoot);
       }
       Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey);
@@ -284,7 +290,7 @@ private Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) throw
     return dbRoot;
   }
 
-  private void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot) throws Exception {
+  private void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId) throws Exception {
     try {
       Hive db = getHive();
       HiveWrapper.Tuple<Table> tuple = new HiveWrapper(db, dbName).table(tblName);
@@ -295,6 +301,11 @@ private void dumpTable(String dbName, String tblName, String validTxnList, Path
       tuple.replicationSpec.setIsReplace(true);  // by default for all other objects this is false
       if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
         tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList));
+
+        // For a transactional table, the data would be a valid snapshot for the current txn and doesn't include data
+        // added/modified by concurrent txns which are later than the current txn. So, we need to set the last repl id
+        // of this table as the bootstrap dump's last repl id.
+        tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId));
       }
       MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
       new TableExport(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index c5714a5..c1b98ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -92,6 +92,7 @@
   private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
 
+  public static final String LAST_REPL_ID_KEY = "hive.repl.last.repl.id";
   public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
   public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 9294c2b..c0725ad 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -1676,14 +1676,15 @@ public void testReplAllocWriteId() throws Exception {
 
     replAbortTxnForTest(srcTxnIdList, "destdb.*");
 
-    // Test for aborted transactions
+    // Test for aborted transactions. Idempotent case: allocating a write id when the txn is already
+    // aborted should do nothing.
     failed = false;
     try {
       txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds();
     } catch (RuntimeException e) {
       failed = true;
     }
-    assertTrue(failed);
+    assertFalse(failed);
   }
 
   private void updateTxns(Connection conn) throws SQLException {
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 4f56eba..d3e5bf9 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -426,6 +426,7 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
       return getOpenTxnsInfo();
     }
   }
+
   @Override
   @RetrySemantics.ReadOnly
   public GetOpenTxnsResponse getOpenTxns() throws MetaException {
@@ -751,6 +752,8 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept
         List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
             Collections.singletonList(sourceTxnId), stmt);
         if (targetTxnIds.isEmpty()) {
+          // Idempotent case where the txn was already closed or an abort txn event was received without
+          // a corresponding open txn event.
          LOG.info("Target txn id is missing for source txn id : " + sourceTxnId +
              " and repl policy " + rqst.getReplPolicy());
          return;
@@ -892,6 +895,8 @@ public void commitTxn(CommitTxnRequest rqst)
         List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
             Collections.singletonList(sourceTxnId), stmt);
         if (targetTxnIds.isEmpty()) {
+          // Idempotent case where the txn was already closed or a commit txn event was received without
+          // a corresponding open txn event.
          LOG.info("Target txn id is missing for source txn id : " + sourceTxnId +
              " and repl policy " + rqst.getReplPolicy());
          return;
@@ -934,9 +939,8 @@ public void commitTxn(CommitTxnRequest rqst)
         conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" +
             quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")";
         rs = stmt.executeQuery(sqlGenerator.addLimitClause(1,
-          "tc_operation_type " + conflictSQLSuffix));
+            "tc_operation_type " + conflictSQLSuffix));
       }
-
       if (rs != null && rs.next()) {
         isUpdateDelete = 'Y';
         close(rs);
@@ -1402,9 +1406,11 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds
       }
       txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, stmt);
       if (srcTxnIds.size() != txnIds.size()) {
-        LOG.warn("Target txn id is missing for source txn id : " + srcTxnIds.toString() +
+        // Idempotent case where the txn was already closed but an allocate write id event is received.
+        // So, just ignore it and return an empty list.
+        LOG.info("Target txn id is missing for source txn id : " + srcTxnIds.toString() +
             " and repl policy " + rqst.getReplPolicy());
-        throw new RuntimeException("This should never happen for txnIds: " + txnIds);
+        return new AllocateTableWriteIdsResponse(txnToWriteIds);
       }
     } else {
       assert (!rqst.isSetSrcTxnToWriteIdList());
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
index abbcda3..8caf417 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.metastore;
 
 import java.util.List;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -32,7 +33,6 @@
 import static org.junit.Assert.assertEquals;
 
-
 /**
  * A wrapper around {@link ObjectStore} that allows us to inject custom behaviour
  * on to some of the methods for testing.
@@ -208,4 +208,14 @@ public void createFunction(Function func) throws InvalidObjectException, MetaExc
     }
     return super.addForeignKeys(fks);
   }
+
+  @Override
+  public boolean alterDatabase(String catalogName, String dbname, Database db)
+      throws NoSuchObjectException, MetaException {
+    if (callerVerifier != null) {
+      CallerArguments args = new CallerArguments(dbname);
+      callerVerifier.apply(args);
+    }
+    return super.alterDatabase(catalogName, dbname, db);
+  }
 }