diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index ca2a1cd..7d6dec9 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2031,6 +2031,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { LLAP_EXECUTION_MODE("hive.llap.execution.mode", "none", new StringSet("auto", "none", "all", "map"), "Chooses whether query fragments will run in container or in llap"), + LLAP_OBJECT_CACHE_ENABLED("hive.llap.object.cache.enabled", true, + "Cache objects (plans, hashtables, etc) in llap"), SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout", diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java index d3e5d90..5d48651 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java @@ -21,6 +21,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.io.api.LlapIoProxy; +import org.apache.hadoop.hive.llap.io.api.LlapIoProxy; +import org.apache.hadoop.hive.ql.exec.tez.LlapObjectCache; /** * ObjectCacheFactory returns the appropriate cache depending on settings in @@ -38,7 +40,11 @@ private ObjectCacheFactory() { public static ObjectCache getCache(Configuration conf) { if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { if (LlapIoProxy.isDaemon()) { // daemon - return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) { + return new org.apache.hadoop.hive.ql.exec.tez.LlapObjectCache(); + } else { // no cache + return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); + } } else { // container return new org.apache.hadoop.hive.ql.exec.tez.ObjectCache(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java new file mode 100644 index 0000000..0ec4643 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.metadata.HiveException; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * LlapObjectCache. Llap implementation for the shared object cache. + * + */ +public class LlapObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache { + + private static final Log LOG = LogFactory.getLog(LlapObjectCache.class.getName()); + + private static final Cache registry + = CacheBuilder.newBuilder().softValues().build(); + + private static final Map locks + = new HashMap(); + + private static final ReentrantLock lock = new ReentrantLock(); + + private static final boolean isLogDebugEnabled = LOG.isDebugEnabled(); + private static final boolean isLogInfoEnabled = LOG.isInfoEnabled(); + + public LlapObjectCache() { + } + + @Override + public void release(String key) { + // nothing to do, soft references will clean themselves up + } + + @Override + public Object retrieve(String key, Callable fn) + throws HiveException { + + Object o = null; + ReentrantLock objectLock = null; + + lock.lock(); + try { + o = registry.getIfPresent(key); + if (o != null) { + if (isLogInfoEnabled) { + LOG.info("Found " + key + " in cache"); + } + return o; + } + + if (locks.containsKey(key)) { + objectLock = locks.get(key); + } else { + objectLock = new ReentrantLock(); + locks.put(key, objectLock); + } + } finally { + lock.unlock(); + } + + objectLock.lock(); + try{ + lock.lock(); + try { + o = registry.getIfPresent(key); + if (o != null) { + if (isLogInfoEnabled) { + LOG.info("Found " + key + " in cache"); + } + return o; + } + } finally { + lock.unlock(); + } + + try { + o = fn.call(); + } catch (Exception e) { + throw new HiveException(e); + } + + lock.lock(); + try { + if (isLogInfoEnabled) { + LOG.info("Caching new object for key: " + key); + } + + registry.put(key, o); + locks.remove(key); + } finally { + lock.unlock(); + } + } finally { + objectLock.unlock(); + } + return o; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index 80906b9..c9bfba4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -72,26 +72,32 @@ public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class); private MapRecordSource[] sources; private final Map multiMRInputMap = new HashMap(); - private int position = 0; - MRInputLegacy legacyMRInput = null; - MultiMRInput mainWorkMultiMRInput = null; - private ExecMapperContext execContext = null; - private boolean abort = false; + private int position; + MRInputLegacy legacyMRInput; + MultiMRInput mainWorkMultiMRInput; + private ExecMapperContext execContext; + private boolean abort; protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; private MapWork mapWork; - List mergeWorkList = null; - + List mergeWorkList; List cacheKeys; ObjectCache cache; public MapRecordProcessor(final JobConf jconf) throws Exception { - ObjectCache cache = ObjectCacheFactory.getCache(jconf); + cache = ObjectCacheFactory.getCache(jconf); execContext = new ExecMapperContext(jconf); execContext.setJc(jconf); cacheKeys = new ArrayList(); + } + + @Override + void init(final JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter, + Map inputs, Map outputs) throws Exception { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); + super.init(jconf, processorContext, mrReporter, inputs, outputs); String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID); - String key = queryId + MAP_PLAN_KEY; + String key = queryId + processorContext.getTaskVertexName() + MAP_PLAN_KEY; cacheKeys.add(key); // create map and fetch operators @@ -111,7 +117,7 @@ public Object call() { continue; } - key = queryId + prefix; + key = queryId + processorContext.getTaskVertexName() + prefix; cacheKeys.add(key); mergeWorkList.add( @@ -123,13 +129,6 @@ public Object call() { })); } } - } - - @Override - void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter, - Map inputs, Map outputs) throws Exception { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); - super.init(jconf, processorContext, mrReporter, inputs, outputs); // Update JobConf using MRInput, info like filename comes via this legacyMRInput = getMRInput(inputs); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index 071b144..baaf639 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -83,7 +83,7 @@ void init(final JobConf jconf, ProcessorContext processorContext, ObjectCache cache = ObjectCacheFactory.getCache(jconf); String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID); - cacheKey = queryId + REDUCE_PLAN_KEY; + cacheKey = queryId + processorContext.getTaskVertexName() + REDUCE_PLAN_KEY; redWork = (ReduceWork) cache.retrieve(cacheKey, new Callable() { public Object call() { return Utilities.getReduceWork(jconf); diff --git ql/src/test/results/clientpositive/tez/llapdecider.q.out ql/src/test/results/clientpositive/tez/llapdecider.q.out index 3f364dd..60b2fed 100644 --- ql/src/test/results/clientpositive/tez/llapdecider.q.out +++ ql/src/test/results/clientpositive/tez/llapdecider.q.out @@ -447,6 +447,7 @@ STAGE PLANS: value expressions: _col1 (type: string) Execution mode: llap Reducer 2 + Execution mode: llap Reduce Operator Tree: Merge Join Operator condition map: @@ -536,6 +537,7 @@ STAGE PLANS: Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: string) Reducer 2 + Execution mode: llap Reduce Operator Tree: Merge Join Operator condition map: @@ -976,6 +978,7 @@ STAGE PLANS: value expressions: _col1 (type: string) Execution mode: llap Reducer 2 + Execution mode: llap Reduce Operator Tree: Merge Join Operator condition map: