diff --git llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java index 298be76..97bfc99 100644 --- llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java +++ llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java @@ -32,12 +32,6 @@ public class LlapUtil { private static final Logger LOG = LoggerFactory.getLogger(LlapUtil.class); - public static String getDaemonLocalDirList(Configuration conf) { - String localDirList = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_WORK_DIRS); - if (localDirList != null && !localDirList.isEmpty()) return localDirList; - return conf.get("yarn.nodemanager.local-dirs"); - } - public static UserGroupInformation loginWithKerberos( String principal, String keytabFile) throws IOException { if (!UserGroupInformation.isSecurityEnabled()) return null; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index c7e9d32..145e247 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecker; @@ -435,7 +436,7 @@ public static void main(String[] args) throws Exception { int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS); - String localDirList = LlapUtil.getDaemonLocalDirList(daemonConf); + String localDirList = HiveUtils.getLocalDirList(daemonConf); String[] localDirs = 
(localDirList == null || localDirList.isEmpty()) ? new String[0] : StringUtils.getTrimmedStrings(localDirList); int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index 233f66b..e4a2b35 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinRowBytesContainer; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.serde2.ByteStream.Output; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -276,7 +277,7 @@ public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryA HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS), HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT), estimatedTableSize, keyCount, memoryAvailable, nwayConf, - RowContainer.getLocalDirsForSpillFiles(hconf)); + HiveUtils.getLocalDirList(hconf)); } private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java index 893d265..e928719 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java @@ -32,13 +32,12 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; -import 
org.apache.hadoop.hive.llap.LlapUtil; -import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -135,7 +134,7 @@ public RowContainer(int bs, Configuration jc, Reporter reporter this.size = 0; this.itrCursor = 0; this.addCursor = 0; - this.spillFileDirs = getLocalDirsForSpillFiles(jc); + this.spillFileDirs = HiveUtils.getLocalDirList(jc); this.numFlushedBlocks = 0; this.tmpFile = null; this.currentWriteBlock = (ROW[]) new ArrayList[blockSize]; @@ -151,11 +150,6 @@ public RowContainer(int bs, Configuration jc, Reporter reporter } } - public static String getLocalDirsForSpillFiles(Configuration conf) { - return LlapProxy.isDaemon() - ? 
LlapUtil.getDaemonLocalDirList(conf) : conf.get("yarn.nodemanager.local-dirs"); - } - private JobConf getLocalFSJobConfClone(Configuration jc) { if (this.jobCloneUsingLocalFs == null) { this.jobCloneUsingLocalFs = new JobConf(jc); diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java index feb471a..a502641 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java @@ -26,8 +26,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.TezContext; import org.apache.hadoop.hive.ql.index.HiveIndexHandler; import org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator; import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider; @@ -38,6 +40,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory; import org.apache.hadoop.io.Text; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; /** * General collection of helper functions. 
@@ -451,4 +454,31 @@ public static String getUnparsedColumnNamesFromFieldSchema(
     }
     return sb.toString();
   }
+
+  /**
+   * Returns a comma-separated list of local work directories suitable for spill
+   * files, or null if none can be determined (callers then fall back to the
+   * local tmp dir). Inside an LLAP daemon the configured daemon work dirs are
+   * preferred; otherwise, when running on Tez, the Tez container work dirs are
+   * used.
+   */
+  public static String getLocalDirList(Configuration conf) {
+    if (LlapProxy.isDaemon()) {
+      String localDirList = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_DAEMON_WORK_DIRS);
+      if (localDirList != null && !localDirList.isEmpty()) {
+        return localDirList;
+      } // otherwise, fall back to use tez work dirs
+    }
+
+    // Read the engine via HiveConf (never null; supplies the configured default)
+    // instead of conf.get(), which returns null when the key is unset and would
+    // NPE on the equals() call below.
+    if ("tez".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
+      TezContext tezContext = (TezContext) TezContext.get();
+      if (tezContext != null && tezContext.getTezProcessorContext() != null) {
+        return StringUtils.arrayToString(tezContext.getTezProcessorContext().getWorkDirs());
+      } // otherwise fall back to return null, i.e. to use local tmp dir only
+    }
+
+    return null;
+  }
 }