diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 3aec46be51..f4c7fa4b30 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -328,11 +328,17 @@ public void run() {
   private LlapServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException {
     LlapRegistryService registryService = LlapRegistryService.getClient(job);
-    String host = llapSplit.getLocations()[0];
+    LlapServiceInstance serviceInstance = null;
+    String[] hosts = llapSplit.getLocations();
+    if (hosts != null && hosts.length > 0) {
+      String host = llapSplit.getLocations()[0];
+      serviceInstance = getServiceInstanceForHost(registryService, host);
+      if (serviceInstance == null) {
+        LOG.info("No service instances found for " + host + " in registry.");
+      }
+    }
 
-    LlapServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host);
     if (serviceInstance == null) {
-      LOG.info("No service instances found for " + host + " in registry.");
       serviceInstance = getServiceInstanceRandom(registryService);
       if (serviceInstance == null) {
         throw new IOException("No service instances found in registry");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index bda2af3a04..cf6eb275f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4327,7 +4327,7 @@ private static void ensureDelete(FileSystem fs, Path path, String what) throws I
     List<Path> srcs = new ArrayList<>(), tgts = new ArrayList<>();
     long mmWriteId = 0;
     try {
-      HiveTxnManager txnManager = SessionState.get().getTxnMgr();
+      HiveTxnManager txnManager = getTxnMgr();
       if (txnManager.isTxnOpen()) {
         mmWriteId = txnManager.getTableWriteId(tbl.getDbName(), tbl.getTableName());
       } else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index a8d851fd81..1e8857b258 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -645,4 +646,7 @@ public QueryState getQueryState() {
     return queryState;
   }
 
+  public HiveTxnManager getTxnMgr() {
+    return driverContext.getCtx().getHiveTxnManager();
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
index 6b333d7184..6f217cfeee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
@@ -73,7 +73,7 @@ protected int execute(DriverContext driverContext) {
     try {
       int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
       Context context = new Context(conf, getHive(), work.sessionStateLineageState,
-          work.currentTransactionId);
+          work.currentTransactionId, driverContext.getCtx());
       TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
       /*
           for now for simplicity we are doing just one directory ( one database ), come back to use
@@ -127,7 +127,7 @@ a database ( directory )
               new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn);
           TableEvent tableEvent = (TableEvent) next;
           LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(),
-              tableContext, loadTaskTracker);
+              tableContext, loadTaskTracker, getTxnMgr());
           tableTracker = loadTable.tasks();
           if (!scope.database) {
             scope.rootTasks.addAll(tableTracker.tasks());
@@ -145,7 +145,7 @@ a database ( directory )
           // for a table we explicitly try to load partitions as there is no separate partitions events.
           LoadPartitions loadPartitions =
               new LoadPartitions(context, iterator.replLogger(), loadTaskTracker, tableEvent,
-                  work.dbNameToLoadIn, tableContext);
+                  work.dbNameToLoadIn, tableContext, getTxnMgr());
           TaskTracker partitionsTracker = loadPartitions.tasks();
           partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
               partitionsTracker);
@@ -163,7 +163,7 @@ a database ( directory )
                   work.tableNameToLoadIn);
           LoadPartitions loadPartitions =
               new LoadPartitions(context, iterator.replLogger(), tableContext, loadTaskTracker,
-                  event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated());
+                  event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated(), getTxnMgr());
           /*
                the tableTracker here should be a new instance and not an existing one as this can
                only happen when we break in between loading partitions.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
index 60c85f58e5..de17d705f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
@@ -81,7 +81,7 @@ public TaskTracker tasks() throws IOException, SemanticException {
         tasks.addAll(pkHandler.handle(
             new MessageHandler.Context(
                 dbNameToLoadIn, null, fromPath.toString(), null, pkDumpMetaData, context.hiveConf,
-                context.hiveDb, null, LOG)));
+                context.hiveDb, context.nestedContext, LOG)));
       }
 
       if (uksString != null && !uksString.isEmpty()) {
@@ -92,7 +92,7 @@ public TaskTracker tasks() throws IOException, SemanticException {
         tasks.addAll(ukHandler.handle(
             new MessageHandler.Context(
                 dbNameToLoadIn, null, fromPath.toString(), null, ukDumpMetaData, context.hiveConf,
-                context.hiveDb, null, LOG)));
+                context.hiveDb, context.nestedContext, LOG)));
       }
 
       if (nnsString != null && !nnsString.isEmpty()) {
@@ -103,7 +103,7 @@ public TaskTracker tasks() throws IOException, SemanticException {
         tasks.addAll(nnHandler.handle(
             new MessageHandler.Context(
                 dbNameToLoadIn, null, fromPath.toString(), null, nnDumpMetaData, context.hiveConf,
-                context.hiveDb, null, LOG)));
+                context.hiveDb, context.nestedContext, LOG)));
       }
 
       if (fksString != null && !fksString.isEmpty()) {
@@ -114,7 +114,7 @@ public TaskTracker tasks() throws IOException, SemanticException {
         tasks.addAll(fkHandler.handle(
             new MessageHandler.Context(
                 dbNameToLoadIn, null, fromPath.toString(), null, fkDumpMetaData, context.hiveConf,
-                context.hiveDb, null, LOG)));
+                context.hiveDb, context.nestedContext, LOG)));
       }
 
       tasks.forEach(tracker::addTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
index bc7d0ad0b9..c100344421 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
@@ -75,7 +75,7 @@ public TaskTracker tasks() throws IOException, SemanticException {
     List<Task<? extends Serializable>> tasks = handler.handle(
         new MessageHandler.Context(
             dbNameToLoadIn, null, fromPath.toString(), null, null, context.hiveConf,
-            context.hiveDb, null, LOG)
+            context.hiveDb, context.nestedContext, LOG)
     );
     createFunctionReplLogTask(tasks, handler.getFunctionName());
     tasks.forEach(tracker::addTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 06adc64727..a42c299537 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -76,16 +77,19 @@
   private final ImportTableDesc tableDesc;
   private Table table;
+  private final HiveTxnManager txnMgr;
 
   public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker tableTracker,
                         TableEvent event, String dbNameToLoadIn,
-                        TableContext tableContext) throws HiveException, IOException {
-    this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null);
+                        TableContext tableContext,
+                        HiveTxnManager txnMgr) throws HiveException, IOException {
+    this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null, txnMgr);
   }
 
   public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext,
                         TaskTracker limiter, TableEvent event, String dbNameToLoadIn,
-                        AddPartitionDesc lastReplicatedPartition) throws HiveException, IOException {
+                        AddPartitionDesc lastReplicatedPartition,
+                        HiveTxnManager txnMgr) throws HiveException, IOException {
     this.tracker = new TaskTracker(limiter);
     this.event = event;
     this.context = context;
@@ -95,6 +99,7 @@ public LoadPartitions(Context context, ReplLogger replLogger, TableContext table
 
     this.tableDesc = tableContext.overrideProperties(event.tableDesc(dbNameToLoadIn));
     this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
+    this.txnMgr = txnMgr;
   }
 
   private String location() throws MetaException, HiveException {
@@ -244,7 +249,7 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti
     LoadTableDesc loadTableWork = new LoadTableDesc(
         tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
         event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
-        SessionState.get().getTxnMgr().getCurrentTxnId()
+        txnMgr.getCurrentTxnId()
     );
     loadTableWork.setInheritTableSpecs(false);
     MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 1395027159..ddb26e529e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
@@ -65,15 +66,17 @@
   private final TableContext tableContext;
   private final TaskTracker tracker;
   private final TableEvent event;
+  private final HiveTxnManager txnMgr;
 
   public LoadTable(TableEvent event, Context context, ReplLogger replLogger,
-                   TableContext tableContext, TaskTracker limiter)
+                   TableContext tableContext, TaskTracker limiter, HiveTxnManager txnMgr)
       throws SemanticException, IOException {
     this.event = event;
     this.context = context;
     this.replLogger = replLogger;
     this.tableContext = tableContext;
     this.tracker = new TaskTracker(limiter);
+    this.txnMgr = txnMgr;
   }
 
   private void createTableReplLogTask(String tableName, TableType tableType) throws SemanticException {
@@ -230,7 +233,7 @@ private String location(ImportTableDesc tblDesc, Database parentDb)
         tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
         replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
         //todo: what is the point of this? If this is for replication, who would have opened a txn?
-        SessionState.get().getTxnMgr().getCurrentTxnId()
+        txnMgr.getCurrentTxnId()
     );
     MoveWork moveWork =
         new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
index bb51f36a25..6fbc6573bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
@@ -28,6 +28,7 @@
   public final Hive hiveDb;
   public final Warehouse warehouse;
   public final PathInfo pathInfo;
+  public final org.apache.hadoop.hive.ql.Context nestedContext;
 
   /*
     these are sessionState objects that are copied over to work to allow for parallel execution.
@@ -39,12 +40,14 @@
   public Context(HiveConf hiveConf, Hive hiveDb,
-      LineageState lineageState, long currentTransactionId) throws MetaException {
+      LineageState lineageState, long currentTransactionId,
+      org.apache.hadoop.hive.ql.Context nestedContext) throws MetaException {
     this.hiveConf = hiveConf;
     this.hiveDb = hiveDb;
     this.warehouse = new Warehouse(hiveConf);
     this.pathInfo = new PathInfo(hiveConf);
     sessionStateLineageState = lineageState;
     this.currentTransactionId = currentTransactionId;
+    this.nestedContext = nestedContext;
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 7a7bdea89d..062df0649a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -2070,7 +2070,7 @@ public void visit(RelNode node, int ordinal, RelNode parent) {
       final String validTxnsList = conf.get(ValidTxnList.VALID_TXNS_KEY);
       ValidTxnWriteIdList txnWriteIds = null;
       if (validTxnsList != null && !validTxnsList.isEmpty()) {
-        txnWriteIds = SessionState.get().getTxnMgr().getValidWriteIds(tablesUsed, validTxnsList);
+        txnWriteIds = getTxnMgr().getValidWriteIds(tablesUsed, validTxnsList);
       }
       if (mvRebuildMode != MaterializationRebuildMode.NONE) {
         // We only retrieve the materialization corresponding to the rebuild. In turn,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index f38b0bc546..b2497476cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -3545,12 +3545,12 @@ private void handleTransactionalTable(Table tab, AddPartitionDesc addPartitionDe
       //so that we only allocate a writeId only if actually adding data
       // (vs. adding a partition w/o data)
       try {
-        writeId = SessionState.get().getTxnMgr().getTableWriteId(tab.getDbName(),
+        writeId = getTxnMgr().getTableWriteId(tab.getDbName(),
             tab.getTableName());
       } catch (LockException ex) {
         throw new SemanticException("Failed to allocate the write id", ex);
       }
-      stmtId = SessionState.get().getTxnMgr().getStmtIdAndIncrement();
+      stmtId = getTxnMgr().getStmtIdAndIncrement();
     }
     LoadTableDesc loadTableWork = new LoadTableDesc(new Path(desc.getLocation()),
         Utilities.getTableDesc(tab), desc.getPartSpec(),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 8b639f7922..da6d4f1d3d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
@@ -152,7 +153,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
           isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor, parsedLocation,
           parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
           new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx),
-          null);
+          null, getTxnMgr());
 
     } catch (SemanticException e) {
       throw e;
@@ -198,7 +199,7 @@ public static boolean prepareImport(boolean isImportCmd,
       String parsedLocation, String parsedTableName, String overrideDBName,
       LinkedHashMap<String, String> parsedPartSpec,
       String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x,
-      UpdatedMetaDataTracker updatedMetadata
+      UpdatedMetaDataTracker updatedMetadata, HiveTxnManager txnMgr
   ) throws IOException, MetaException, HiveException, URISyntaxException {
 
     // initialize load path
@@ -329,7 +330,7 @@ public static boolean prepareImport(boolean isImportCmd,
         || AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps())) {
       // Explain plan doesn't open a txn and hence no need to allocate write id.
       if (x.getCtx().getExplainConfig() == null) {
-        writeId = SessionState.get().getTxnMgr().getTableWriteId(tblDesc.getDatabaseName(), tblDesc.getTableName());
+        writeId = txnMgr.getTableWriteId(tblDesc.getDatabaseName(), tblDesc.getTableName());
       }
     }
     int stmtId = 0;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index e49089b91e..df2098b0dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -357,12 +357,12 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     int stmtId = -1;
     if (AcidUtils.isTransactionalTable(ts.tableHandle)) {
       try {
-        writeId = SessionState.get().getTxnMgr().getTableWriteId(ts.tableHandle.getDbName(),
+        writeId = getTxnMgr().getTableWriteId(ts.tableHandle.getDbName(),
             ts.tableHandle.getTableName());
       } catch (LockException ex) {
         throw new SemanticException("Failed to allocate the write id", ex);
       }
-      stmtId = SessionState.get().getTxnMgr().getStmtIdAndIncrement();
+      stmtId = getTxnMgr().getStmtIdAndIncrement();
     }
 
     // Note: this sets LoadFileType incorrectly for ACID; is that relevant for load?
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MaterializedViewRebuildSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MaterializedViewRebuildSemanticAnalyzer.java
index e5af95b121..7a3c16390c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MaterializedViewRebuildSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MaterializedViewRebuildSemanticAnalyzer.java
@@ -80,7 +80,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     // Acquire lock for the given materialized view. Only one rebuild per materialized
     // view can be triggered at a given time, as otherwise we might produce incorrect
     // results if incremental maintenance is triggered.
-    HiveTxnManager txnManager = SessionState.get().getTxnMgr();
+    HiveTxnManager txnManager = getTxnMgr();
     LockState state;
     try {
       state = txnManager.acquireMaterializationRebuildLock(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 79b2e48ee2..f7c78e271f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -386,7 +386,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
         ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
             tblNameOrPattern,
-            queryState.getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
+            queryState.getLineageState(), getTxnMgr().getCurrentTxnId());
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
         return;
       }
@@ -417,7 +417,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       }
 
       ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
-          queryState.getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
+          queryState.getLineageState(), getTxnMgr().getCurrentTxnId());
       rootTasks.add(TaskFactory.get(replLoadWork, conf));
 //
 //      for (FileStatus dir : dirsInLoadPath) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 10982ddbd1..e2a464ec0f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7167,7 +7167,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
     Map<String, String> partSpec = null;
     boolean isMmTable = false, isMmCtas = false;
     Long writeId = null;
-    HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
+    HiveTxnManager txnMgr = getTxnMgr();
 
     switch (dest_type.intValue()) {
     case QBMetaData.DEST_TABLE: {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
index 3ccd639d62..ef4a901b8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -92,5 +93,13 @@ ReplicationSpec eventOnlyReplicationSpec() throws SemanticException {
       String eventId = dmd.getEventTo().toString();
       return new ReplicationSpec(eventId, eventId);
     }
+
+    public org.apache.hadoop.hive.ql.Context getNestedContext() {
+      return nestedContext;
+    }
+
+    public HiveTxnManager getTxnMgr() {
+      return nestedContext.getHiveTxnManager();
+    }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
index 4cd75d8128..3a29167a94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
@@ -41,7 +41,7 @@
       // Also, REPL LOAD doesn't support external table and hence no location set as well.
       ImportSemanticAnalyzer.prepareImport(false, false, false, false,
           (context.precursor != null), null, context.tableName, context.dbName,
-          null, context.location, x, updatedMetadata);
+          null, context.location, x, updatedMetadata, context.getTxnMgr());
 
       return importTasks;
     } catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 6003ced27e..993357d3a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -459,6 +459,13 @@ public synchronized HiveTxnManager initTxnMgr(HiveConf conf) throws LockExceptio
     return txnMgr;
   }
 
+  /**
+   * Get the transaction manager for the current SessionState.
+   * Note that the Driver can be initialized with a different txn manager than the SessionState's
+   * txn manager (HIVE-17482), and so it is preferable to use the txn manager propagated down from
+   * the Driver as opposed to calling this method.
+   * @return transaction manager for the current SessionState
+   */
   public HiveTxnManager getTxnMgr() {
     return txnMgr;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index fe570f0f8e..cae02a9431 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -249,7 +249,7 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli
     DriverCleanup driverCleanup = new DriverCleanup(driver, txnManager, splitsAppId.toString());
     boolean needsCleanup = true;
     try {
-      CommandProcessorResponse cpr = driver.compileAndRespond(query, true);
+      CommandProcessorResponse cpr = driver.compileAndRespond(query, false);
       if (cpr.getResponseCode() != 0) {
         throw new HiveException("Failed to compile query: " + cpr.getException());
       }
@@ -508,6 +508,10 @@ public String toString() {
 
   private SplitLocationInfo[] makeLocationHints(TaskLocationHint hint) {
     Set<String> hosts = hint.getHosts();
+    if (hosts == null) {
+      LOG.warn("No hosts");
+      return new SplitLocationInfo[0];
+    }
     if (hosts.size() != 1) {
       LOG.warn("Bad # of locations: " + hosts.size());
     }