diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 1f8bc12fde..48ebc4f870 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -20,14 +20,9 @@
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Queue;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -42,11 +37,7 @@
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.hadoop.hive.metastore.api.TxnType;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
 import org.apache.hadoop.hive.ql.ddl.DDLDesc.DDLDescWithWriteId;
@@ -54,19 +45,13 @@
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
-import org.apache.hadoop.hive.ql.hooks.Entity;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lock.CompileLock;
 import org.apache.hadoop.hive.ql.lock.CompileLockFactory;
-import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
-import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -91,7 +76,6 @@
 import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.ShutdownHookManager;
-import org.apache.hive.common.util.TxnIdUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,12 +93,13 @@
   private int maxRows = 100;
   private ByteStream.Output bos = new ByteStream.Output();
 
-  private Context context;
   private final DriverContext driverContext;
-  private TaskQueue taskQueue;
+  private final DriverState driverState = new DriverState();
   private final List<HiveLock> hiveLocks = new ArrayList<>();
+  private final ValidTxnManager validTxnManager;
-  private DriverState driverState = new DriverState();
+  private Context context;
+  private TaskQueue taskQueue;
 
   @Override
   public Schema getSchema() {
@@ -158,11 +143,6 @@
   public Driver(QueryState queryState, QueryInfo queryInfo) {
     this(queryState, queryInfo, null);
   }
 
-  public Driver(QueryState queryState, QueryInfo queryInfo, HiveTxnManager txnManager) {
-    driverContext = new DriverContext(queryState, queryInfo, new HookRunner(queryState.getConf(), CONSOLE),
-        txnManager);
-  }
-
   public Driver(QueryState queryState, QueryInfo queryInfo, HiveTxnManager txnManager, ValidWriteIdList compactionWriteIds, long compactorTxnId) {
     this(queryState, queryInfo, txnManager);
@@ -170,6 +150,12 @@ public Driver(QueryState queryState, QueryInfo queryInfo, HiveTxnMana
     driverContext.setCompactorTxnId(compactorTxnId);
   }
 
+  public Driver(QueryState queryState, QueryInfo queryInfo, HiveTxnManager txnManager) {
+    driverContext = new DriverContext(queryState, queryInfo, new HookRunner(queryState.getConf(), CONSOLE),
+        txnManager);
+    validTxnManager = new ValidTxnManager(this, driverContext);
+  }
+
   /**
    * Compile a new query, but potentially reset taskID counter. Not resetting task counter
    * is useful for generating re-entrant QL queries.
@@ -300,104 +286,6 @@ private void setTriggerContext(String queryId) {
     context.setWmContext(wmContext);
   }
 
-  // Checks whether txn list has been invalidated while planning the query.
-  // This would happen if query requires exclusive/semi-shared lock, and there
-  // has been a committed transaction on the table over which the lock is
-  // required.
-  private boolean isValidTxnListState() throws LockException {
-    // 1) Get valid txn list.
-    String txnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
-    if (txnString == null) {
-      // Not a transactional op, nothing more to do
-      return true;
-    }
-    ValidTxnList currentTxnList = driverContext.getTxnManager().getValidTxns();
-    String currentTxnString = currentTxnList.toString();
-    if (currentTxnString.equals(txnString)) {
-      // Still valid, nothing more to do
-      return true;
-    }
-    // 2) Get locks that are relevant:
-    // - Exclusive for INSERT OVERWRITE.
-    // - Semi-shared for UPDATE/DELETE.
-    if (context.getHiveLocks() == null || context.getHiveLocks().isEmpty()) {
-      // Nothing to check
-      return true;
-    }
-    Set<String> nonSharedLocks = new HashSet<>();
-    for (HiveLock lock : context.getHiveLocks()) {
-      if (lock.mayContainComponents()) {
-        // The lock may have multiple components, e.g., DbHiveLock, hence we need
-        // to check for each of them
-        for (LockComponent lckCmp : lock.getHiveLockComponents()) {
-          // We only consider tables for which we hold either an exclusive
-          // or a shared write lock
-          if ((lckCmp.getType() == LockType.EXCLUSIVE ||
-              lckCmp.getType() == LockType.SHARED_WRITE) &&
-              lckCmp.getTablename() != null && lckCmp.getDbname() != DbTxnManager.GLOBAL_LOCKS) {
-            nonSharedLocks.add(
-                TableName.getDbTable(
-                    lckCmp.getDbname(), lckCmp.getTablename()));
-          }
-        }
-      } else {
-        // The lock has a single components, e.g., SimpleHiveLock or ZooKeeperHiveLock.
-        // Pos 0 of lock paths array contains dbname, pos 1 contains tblname
-        if ((lock.getHiveLockMode() == HiveLockMode.EXCLUSIVE ||
-            lock.getHiveLockMode() == HiveLockMode.SEMI_SHARED) &&
-            lock.getHiveLockObject().getPaths().length == 2) {
-          nonSharedLocks.add(
-              TableName.getDbTable(
-                  lock.getHiveLockObject().getPaths()[0], lock.getHiveLockObject().getPaths()[1]));
-        }
-      }
-    }
-    // 3) Get txn tables that are being written
-    String txnWriteIdListStr = driverContext.getConf().get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
-    if (txnWriteIdListStr == null || txnWriteIdListStr.length() == 0) {
-      // Nothing to check
-      return true;
-    }
-    ValidTxnWriteIdList txnWriteIdList = new ValidTxnWriteIdList(txnWriteIdListStr);
-    Map<String, Table> writtenTables = getWrittenTables(driverContext.getPlan());
-
-    ValidTxnWriteIdList currentTxnWriteIds =
-        driverContext.getTxnManager().getValidWriteIds(
-            writtenTables.entrySet().stream()
-                .filter(e -> AcidUtils.isTransactionalTable(e.getValue()))
-                .map(Map.Entry::getKey)
-                .collect(Collectors.toList()),
-            currentTxnString);
-
-    for (Map.Entry<String, Table> tableInfo : writtenTables.entrySet()) {
-      String fullQNameForLock = TableName.getDbTable(
-          tableInfo.getValue().getDbName(),
-          MetaStoreUtils.encodeTableName(tableInfo.getValue().getTableName()));
-      if (nonSharedLocks.contains(fullQNameForLock)) {
-        // Check if table is transactional
-        if (AcidUtils.isTransactionalTable(tableInfo.getValue())) {
-          // Check that write id is still valid
-          if (!TxnIdUtils.checkEquivalentWriteIds(
-              txnWriteIdList.getTableValidWriteIdList(tableInfo.getKey()),
-              currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getKey()))) {
-            // Write id has changed, it is not valid anymore,
-            // we need to recompile
-            return false;
-          }
-        }
-        nonSharedLocks.remove(fullQNameForLock);
-      }
-    }
-    if (!nonSharedLocks.isEmpty()) {
-      throw new LockException("Wrong state: non-shared locks contain information for tables that have not" +
-          " been visited when trying to validate the locks from query tables.\n" +
-          "Tables: " + writtenTables.keySet() + "\n" +
-          "Remaining locks after check: " + nonSharedLocks);
-    }
-    // It passes the test, it is valid
-    return true;
-  }
-
   @Override
   public HiveConf getConf() {
     return driverContext.getConf();
@@ -419,110 +307,6 @@ public FetchTask getFetchTask() {
     return driverContext.getFetchTask();
   }
 
-  // Write the current set of valid write ids for the operated acid tables into the conf file so
-  // that it can be read by the input format.
-  private ValidTxnWriteIdList recordValidWriteIds(HiveTxnManager txnMgr) throws LockException {
-    String txnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
-    if ((txnString == null) || (txnString.isEmpty())) {
-      throw new IllegalStateException("calling recordValidWritsIdss() without initializing ValidTxnList " +
-          JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
-    }
-    List<String> txnTables = getTransactionalTables(driverContext.getPlan());
-    ValidTxnWriteIdList txnWriteIds = null;
-    if (driverContext.getCompactionWriteIds() != null) {
-      /**
-       * This is kludgy: here we need to read with Compactor's snapshot/txn
-       * rather than the snapshot of the current {@code txnMgr}, in effect
-       * simulating a "flashback query" but can't actually share compactor's
-       * txn since it would run multiple statements. See more comments in
-       * {@link org.apache.hadoop.hive.ql.txn.compactor.Worker} where it start
-       * the compactor txn*/
-      if (txnTables.size() != 1) {
-        throw new LockException("Unexpected tables in compaction: " + txnTables);
-      }
-      txnWriteIds = new ValidTxnWriteIdList(driverContext.getCompactorTxnId());
-      txnWriteIds.addTableValidWriteIdList(driverContext.getCompactionWriteIds());
-    } else {
-      txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
-    }
-    if (driverContext.getTxnType() == TxnType.READ_ONLY && !getWrittenTables(driverContext.getPlan()).isEmpty()) {
-      throw new IllegalStateException(String.format(
-          "Inferred transaction type '%s' doesn't conform to the actual query string '%s'",
-          driverContext.getTxnType(), driverContext.getQueryState().getQueryString()));
-    }
-
-    String writeIdStr = txnWriteIds.toString();
-    driverContext.getConf().set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, writeIdStr);
-    if (driverContext.getPlan().getFetchTask() != null) {
-      /**
-       * This is needed for {@link HiveConf.ConfVars.HIVEFETCHTASKCONVERSION} optimization which
-       * initializes JobConf in FetchOperator before recordValidTxns() but this has to be done
-       * after locks are acquired to avoid race conditions in ACID.
-       * This case is supported only for single source query.
-       */
-      Operator<?> source = driverContext.getPlan().getFetchTask().getWork().getSource();
-      if (source instanceof TableScanOperator) {
-        TableScanOperator tsOp = (TableScanOperator)source;
-        String fullTableName = AcidUtils.getFullTableName(tsOp.getConf().getDatabaseName(),
-            tsOp.getConf().getTableName());
-        ValidWriteIdList writeIdList = txnWriteIds.getTableValidWriteIdList(fullTableName);
-        if (tsOp.getConf().isTranscationalTable() && (writeIdList == null)) {
-          throw new IllegalStateException("ACID table: " + fullTableName
-              + " is missing from the ValidWriteIdList config: " + writeIdStr);
-        }
-        if (writeIdList != null) {
-          driverContext.getPlan().getFetchTask().setValidWriteIdList(writeIdList.toString());
-        }
-      }
-    }
-    LOG.debug("Encoding valid txn write ids info " + writeIdStr + " txnid:" + txnMgr.getCurrentTxnId());
-    return txnWriteIds;
-  }
-
-  // Make the list of transactional tables that are read or written by current txn
-  private List<String> getTransactionalTables(QueryPlan plan) {
-    Map<String, Table> tables = new HashMap<>();
-    plan.getInputs().forEach(
-        input -> addTableFromEntity(input, tables)
-    );
-    plan.getOutputs().forEach(
-        output -> addTableFromEntity(output, tables)
-    );
-    return tables.entrySet().stream()
-        .filter(entry -> AcidUtils.isTransactionalTable(entry.getValue()))
-        .map(Map.Entry::getKey)
-        .collect(Collectors.toList());
-  }
-
-  // Make the map of tables written by current txn
-  private Map<String, Table> getWrittenTables(QueryPlan plan) {
-    Map<String, Table> tables = new HashMap<>();
-    plan.getOutputs().forEach(
-        output -> addTableFromEntity(output, tables)
-    );
-    return tables;
-  }
-
-  private void addTableFromEntity(Entity entity, Map<String, Table> tables) {
-    Table tbl;
-    switch (entity.getType()) {
-    case TABLE: {
-      tbl = entity.getTable();
-      break;
-    }
-    case PARTITION:
-    case DUMMYPARTITION: {
-      tbl = entity.getPartition().getTable();
-      break;
-    }
-    default: {
-      return;
-    }
-    }
-    String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
-    tables.put(fullTableName, tbl);
-  }
-
   /**
    * Acquire read and write locks needed by the statement. The list of objects to be locked are
    * obtained from the inputs and outputs populated by the compiler. Locking strategy depends on
@@ -607,7 +391,7 @@ private void acquireLocks() throws CommandProcessorException {
       }
 
       if (driverContext.getPlan().hasAcidResourcesInQuery() || hasAcidDdl) {
-        recordValidWriteIds(driverContext.getTxnManager());
+        validTxnManager.recordValidWriteIds();
       }
 
     } catch (Exception e) {
@@ -882,7 +666,7 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command
     lockAndRespond();
 
     try {
-      if (!isValidTxnListState()) {
+      if (!validTxnManager.isValidTxnListState()) {
         LOG.info("Compiling after acquiring locks");
         // Snapshot was outdated when locks were acquired, hence regenerate context,
        // txn list and retry
@@ -897,7 +681,7 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command
         driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY,
             driverContext.getTxnManager().getValidTxns().toString());
         if (driverContext.getPlan().hasAcidResourcesInQuery()) {
-          recordValidWriteIds(driverContext.getTxnManager());
+          validTxnManager.recordValidWriteIds();
         }
 
         if (!alreadyCompiled) {
@@ -908,7 +692,7 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command
           driverContext.getPlan().setQueryStartTime(driverContext.getQueryDisplay().getQueryStartTime());
         }
 
-        if (!isValidTxnListState()) {
+        if (!validTxnManager.isValidTxnListState()) {
          // Throw exception
           throw handleHiveException(new HiveException("Operation could not be executed"), 14);
         }
diff --git ql/src/java/org/apache/hadoop/hive/ql/ValidTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/ValidTxnManager.java
new file mode 100644
index 0000000000..4885e437aa
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/ValidTxnManager.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.TxnType; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.hooks.Entity; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hive.common.util.TxnIdUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helps the Driver finding the valid transactions/write ids, and record them for the plan. + */ +class ValidTxnManager { + private static final String CLASS_NAME = Driver.class.getName(); + private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + + private final Driver driver; + private final DriverContext driverContext; + + ValidTxnManager(Driver driver, DriverContext driverContext) { + this.driver = driver; + this.driverContext = driverContext; + } + + /** + * Checks whether txn list has been invalidated while planning the query. + * This would happen if query requires exclusive/semi-shared lock, and there has been a committed transaction + * on the table over which the lock is required. + */ + boolean isValidTxnListState() throws LockException { + // 1) Get valid txn list. + String txnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY); + if (txnString == null) { + return true; // Not a transactional op, nothing more to do + } + + String currentTxnString = driverContext.getTxnManager().getValidTxns().toString(); + if (currentTxnString.equals(txnString)) { + return true; // Still valid, nothing more to do + } + + // 2) Get locks that are relevant: + // - Exclusive for INSERT OVERWRITE. + // - Semi-shared for UPDATE/DELETE. 
+    Set<String> nonSharedLockedTables = getNonSharedLockedTables();
+    if (nonSharedLockedTables == null) {
+      return true; // Nothing to check
+    }
+
+    // 3) Get txn tables that are being written
+    String txnWriteIdListString = driverContext.getConf().get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
+    if (StringUtils.isEmpty(txnWriteIdListString)) {
+      return true; // Nothing to check
+    }
+
+    return checkWriteIds(currentTxnString, nonSharedLockedTables, txnWriteIdListString);
+  }
+
+  private Set<String> getNonSharedLockedTables() {
+    if (CollectionUtils.isEmpty(driver.getContext().getHiveLocks())) {
+      return null; // Nothing to check
+    }
+
+    Set<String> nonSharedLockedTables = new HashSet<>();
+    for (HiveLock lock : driver.getContext().getHiveLocks()) {
+      if (lock.mayContainComponents()) {
+        // The lock may have multiple components, e.g., DbHiveLock, hence we need to check for each of them
+        for (LockComponent lockComponent : lock.getHiveLockComponents()) {
+          // We only consider tables for which we hold either an exclusive or a shared write lock
+          if ((lockComponent.getType() == LockType.EXCLUSIVE || lockComponent.getType() == LockType.SHARED_WRITE) &&
+              lockComponent.getTablename() != null && !DbTxnManager.GLOBAL_LOCKS.equals(lockComponent.getDbname())) {
+            nonSharedLockedTables.add(TableName.getDbTable(lockComponent.getDbname(), lockComponent.getTablename()));
+          }
+        }
+      } else {
+        // The lock has a single component, e.g., SimpleHiveLock or ZooKeeperHiveLock.
+        // Pos 0 of the lock paths array contains the dbname, pos 1 contains the tblname
+        if ((lock.getHiveLockMode() == HiveLockMode.EXCLUSIVE || lock.getHiveLockMode() == HiveLockMode.SEMI_SHARED) &&
+            lock.getHiveLockObject().getPaths().length == 2) {
+          nonSharedLockedTables.add(
+              TableName.getDbTable(lock.getHiveLockObject().getPaths()[0], lock.getHiveLockObject().getPaths()[1]));
+        }
+      }
+    }
+    return nonSharedLockedTables;
+  }
+
+  private boolean checkWriteIds(String currentTxnString, Set<String> nonSharedLockedTables, String txnWriteIdListString)
+      throws LockException {
+    ValidTxnWriteIdList txnWriteIdList = new ValidTxnWriteIdList(txnWriteIdListString);
+    Map<String, Table> writtenTables = getTables(false, true);
+
+    ValidTxnWriteIdList currentTxnWriteIds = driverContext.getTxnManager().getValidWriteIds(
+        getTransactionalTables(writtenTables), currentTxnString);
+
+    for (Map.Entry<String, Table> tableInfo : writtenTables.entrySet()) {
+      String fullQNameForLock = TableName.getDbTable(tableInfo.getValue().getDbName(),
+          MetaStoreUtils.encodeTableName(tableInfo.getValue().getTableName()));
+      if (nonSharedLockedTables.contains(fullQNameForLock)) {
+        // Check if the table is transactional
+        if (AcidUtils.isTransactionalTable(tableInfo.getValue())) {
+          // Check that the write id is still valid
+          if (!TxnIdUtils.checkEquivalentWriteIds(txnWriteIdList.getTableValidWriteIdList(tableInfo.getKey()),
+              currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getKey()))) {
+            // The write id has changed, it is not valid anymore, we need to recompile
+            return false;
+          }
+        }
+        nonSharedLockedTables.remove(fullQNameForLock);
+      }
+    }
+
+    if (!nonSharedLockedTables.isEmpty()) {
+      throw new LockException("Wrong state: non-shared locks contain information for tables that have not" +
+          " been visited when trying to validate the locks from query tables.\n" +
+          "Tables: " + writtenTables.keySet() + "\n" +
+          "Remaining locks after check: " + nonSharedLockedTables);
+    }
+
+    return true; // It passes the test, it is valid
+  }
+
+  /**
+   * Write the current set of valid write ids for the operated acid tables into the configuration so
+   * that it can be read by the input format.
+   */
+  ValidTxnWriteIdList recordValidWriteIds() throws LockException {
+    String txnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
+    if (StringUtils.isEmpty(txnString)) {
+      throw new IllegalStateException("calling recordValidWriteIds() without initializing ValidTxnList " +
+          JavaUtils.txnIdToString(driverContext.getTxnManager().getCurrentTxnId()));
+    }
+
+    ValidTxnWriteIdList txnWriteIds = getTxnWriteIds(txnString);
+    setValidWriteIds(txnWriteIds);
+
+    LOG.debug("Encoding valid txn write ids info {} txnid: {}", txnWriteIds,
+        driverContext.getTxnManager().getCurrentTxnId());
+    return txnWriteIds;
+  }
+
+  private ValidTxnWriteIdList getTxnWriteIds(String txnString) throws LockException {
+    List<String> txnTables = getTransactionalTables(getTables(true, true));
+    ValidTxnWriteIdList txnWriteIds;
+    if (driverContext.getCompactionWriteIds() != null) {
+      // This is kludgy: here we need to read with the compactor's snapshot/txn rather than the snapshot of the
+      // current transaction manager, in effect simulating a "flashback query". We can't actually share the
+      // compactor's txn since it would run multiple statements. See more comments in
+      // {@link org.apache.hadoop.hive.ql.txn.compactor.Worker} where it starts the compactor txn.
+      if (txnTables.size() != 1) {
+        throw new LockException("Unexpected tables in compaction: " + txnTables);
+      }
+      txnWriteIds = new ValidTxnWriteIdList(driverContext.getCompactorTxnId());
+      txnWriteIds.addTableValidWriteIdList(driverContext.getCompactionWriteIds());
+    } else {
+      txnWriteIds = driverContext.getTxnManager().getValidWriteIds(txnTables, txnString);
+    }
+    if (driverContext.getTxnType() == TxnType.READ_ONLY && !getTables(false, true).isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Inferred transaction type '%s' doesn't conform to the actual query string '%s'",
+          driverContext.getTxnType(), driverContext.getQueryState().getQueryString()));
+    }
+    return txnWriteIds;
+  }
+
+  private void setValidWriteIds(ValidTxnWriteIdList txnWriteIds) {
+    driverContext.getConf().set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, txnWriteIds.toString());
+    if (driverContext.getPlan().getFetchTask() != null) {
+      // This is needed for {@link HiveConf.ConfVars.HIVEFETCHTASKCONVERSION} optimization which initializes JobConf
+      // in FetchOperator before recordValidTxns() but this has to be done after locks are acquired to avoid race
+      // conditions in ACID. This case is supported only for single source query.
+      Operator<?> source = driverContext.getPlan().getFetchTask().getWork().getSource();
+      if (source instanceof TableScanOperator) {
+        TableScanOperator tsOp = (TableScanOperator) source;
+        String fullTableName = AcidUtils.getFullTableName(tsOp.getConf().getDatabaseName(),
+            tsOp.getConf().getTableName());
+        ValidWriteIdList writeIdList = txnWriteIds.getTableValidWriteIdList(fullTableName);
+        if (tsOp.getConf().isTranscationalTable() && (writeIdList == null)) {
+          throw new IllegalStateException(String.format(
+              "ACID table: %s is missing from the ValidWriteIdList config: %s", fullTableName, txnWriteIds.toString()));
+        }
+        if (writeIdList != null) {
+          driverContext.getPlan().getFetchTask().setValidWriteIdList(writeIdList.toString());
+        }
+      }
+    }
+  }
+
+  private Map<String, Table> getTables(boolean inputNeeded, boolean outputNeeded) {
+    Map<String, Table> tables = new HashMap<>();
+    if (inputNeeded) {
+      driverContext.getPlan().getInputs().forEach(input -> addTableFromEntity(input, tables));
+    }
+    if (outputNeeded) {
+      driverContext.getPlan().getOutputs().forEach(output -> addTableFromEntity(output, tables));
+    }
+    return tables;
+  }
+
+  private void addTableFromEntity(Entity entity, Map<String, Table> tables) {
+    Table table;
+    switch (entity.getType()) {
+    case TABLE:
+      table = entity.getTable();
+      break;
+    case PARTITION:
+    case DUMMYPARTITION:
+      table = entity.getPartition().getTable();
+      break;
+    default:
+      return;
+    }
+    String fullTableName = AcidUtils.getFullTableName(table.getDbName(), table.getTableName());
+    tables.put(fullTableName, table);
+  }
+
+  private List<String> getTransactionalTables(Map<String, Table> tables) {
+    return tables.entrySet().stream()
+        .filter(entry -> AcidUtils.isTransactionalTable(entry.getValue()))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+  }
+}
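
Reviewer note: the runInternal() hunks above reduce to the following recompile-on-stale-snapshot
loop. This is a condensed paraphrase of the diff's own code (locking, logging and error-handling
details trimmed, recompilation elided), not an additional API:

    lockAndRespond();
    if (!validTxnManager.isValidTxnListState()) {
      // Snapshot was outdated when locks were acquired: refresh the txn list,
      // re-record the write ids, and recompile against the new snapshot.
      driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY,
          driverContext.getTxnManager().getValidTxns().toString());
      if (driverContext.getPlan().hasAcidResourcesInQuery()) {
        validTxnManager.recordValidWriteIds();
      }
      // ... recompile the query here ...
      if (!validTxnManager.isValidTxnListState()) {
        // Still stale after one recompilation: give up.
        throw handleHiveException(new HiveException("Operation could not be executed"), 14);
      }
    }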
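
Reviewer note: the decision procedure in isValidTxnListState()/checkWriteIds() can be summarized
as follows (a paraphrase of the code above, for review convenience):

    // VALID_TXNS_KEY unset                        -> valid: not a transactional operation
    // recorded txn list == current txn list       -> valid: snapshot unchanged since compilation
    // otherwise, for each table written under an EXCLUSIVE or SHARED_WRITE lock:
    //   transactional table whose recorded ValidWriteIdList is no longer equivalent
    //   (per TxnIdUtils.checkEquivalentWriteIds) to the current one -> invalid: recompile
    // any lock left unmatched by a written table  -> LockException("Wrong state: ...")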
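
Reviewer note: the compaction branch of getTxnWriteIds() is reached via the five-argument Driver
constructor kept above. A hypothetical caller on the compactor side would look roughly like this
(the argument values are illustrative only, not taken from this patch):

    // Read with the compactor's own snapshot (the "flashback query" case) instead of the
    // current transaction manager's snapshot; exactly one table may be involved.
    Driver driver = new Driver(queryState, queryInfo, txnManager, compactionWriteIds, compactorTxnId);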