diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 0a7261c..ac06b69 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -52,6 +52,8 @@
 
   private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
 
+  private transient String hashMapKey;
+  private transient ObjectCache cache;
   protected transient HashMapWrapper[] mapJoinTables;
 
   protected static MapJoinMetaData metadata = new MapJoinMetaData();
@@ -82,6 +84,9 @@
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
 
+    hashMapKey = "__HASH_MAP_"+this.getOperatorId();
+    cache = ObjectCacheFactory.getCache(hconf);
+
     metadataValueTag = new int[numAliases];
     for (int pos = 0; pos < numAliases; pos++) {
       metadataValueTag[pos] = -1;
@@ -91,22 +96,29 @@
 
     int tagLen = conf.getTagLength();
 
-    mapJoinTables = new HashMapWrapper[tagLen];
     rowContainerMap = new MapJoinRowContainer[tagLen];
-    // initialize the hash tables for other tables
-    for (int pos = 0; pos < numAliases; pos++) {
-      if (pos == posBigTable) {
-        continue;
-      }
-      HashMapWrapper hashTable = new HashMapWrapper();
+    mapJoinTables = (HashMapWrapper[]) cache.retrieve(hashMapKey);
+    hashTblInitedOnce = true;
 
-      mapJoinTables[pos] = hashTable;
-      MapJoinRowContainer<ArrayList<Object>> rowContainer = new MapJoinRowContainer<ArrayList<Object>>();
-      rowContainerMap[pos] = rowContainer;
-    }
+    if (mapJoinTables == null) {
+      mapJoinTables = new HashMapWrapper[tagLen];
 
-    hashTblInitedOnce = false;
+      // initialize the hash tables for other tables
+      for (int pos = 0; pos < numAliases; pos++) {
+        if (pos == posBigTable) {
+          continue;
+        }
+
+        HashMapWrapper hashTable = new HashMapWrapper();
+
+        mapJoinTables[pos] = hashTable;
+        MapJoinRowContainer<ArrayList<Object>> rowContainer = new MapJoinRowContainer<ArrayList<Object>>();
+        rowContainerMap[pos] = rowContainer;
+      }
+
+      hashTblInitedOnce = false;
+    }
   }
 
   @Override
@@ -197,6 +209,7 @@ private void loadHashTable() throws HiveException {
       LOG.error("Load Distributed Cache Error", e);
       throw new HiveException(e);
     }
+    cache.cache(hashMapKey, mapJoinTables);
   }
 
   // Load the hash table
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
new file mode 100644
index 0000000..f2b374c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+/**
+ * ObjectCache. Interface for maintaining objects associated with a task.
+ */
+public interface ObjectCache {
+  /**
+   * Add an object to the cache
+   * @param key
+   * @param value
+   */
+  public void cache(String key, Object value);
+
+  /**
+   * Retrieve object from cache.
+   * @param key
+   * @return the last cached object with the key, null if none.
+   */
+  public Object retrieve(String key);
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
new file mode 100644
index 0000000..953b231
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * ObjectCacheFactory returns the appropriate cache depending on settings in
+ * the hive conf.
+ */
+public class ObjectCacheFactory {
+
+  private ObjectCacheFactory() {
+    // avoid instantiation
+  }
+
+  /**
+   * Returns the appropriate cache
+   */
+  public static ObjectCache getCache(Configuration conf) {
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ)) {
+      return new org.apache.hadoop.hive.ql.exec.tez.ObjectCache();
+    } else {
+      return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index d1e82a2..b979457 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -31,6 +31,8 @@
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -57,6 +59,7 @@
  */
 public class ExecMapper extends MapReduceBase implements Mapper {
 
+  private static final String PLAN_KEY = "__MAP_PLAN__";
   private MapOperator mo;
   private Map fetchOperators;
   private OutputCollector oc;
@@ -92,11 +95,20 @@ public void configure(JobConf job) {
     } catch (Exception e) {
       l4j.info("cannot get classpath: " + e.getMessage());
     }
+
+    setDone(false);
+
+    ObjectCache cache = ObjectCacheFactory.getCache(job);
+
     try {
       jc = job;
       execContext.setJc(jc);
       // create map and fetch operators
-      MapWork mrwork = Utilities.getMapWork(job);
+      MapWork mrwork = (MapWork) cache.retrieve(PLAN_KEY);
+      if (mrwork == null) {
+        mrwork = Utilities.getMapWork(job);
+        cache.cache(PLAN_KEY, mrwork);
+      }
       mo = new MapOperator();
       mo.setConf(mrwork);
       // initialize map operator
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
index 2578907..9576e06 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
@@ -30,6 +30,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
@@ -64,6 +66,8 @@
  */
 public class ExecReducer extends MapReduceBase implements Reducer {
 
+  private static final String PLAN_KEY = "__REDUCE_PLAN__";
+
   private JobConf jc;
   private OutputCollector oc;
   private Operator reducer;
@@ -112,7 +116,14 @@ public void configure(JobConf job) {
       l4j.info("cannot get classpath: " + e.getMessage());
     }
     jc = job;
-    ReduceWork gWork = Utilities.getReduceWork(job);
+
+    ObjectCache cache = ObjectCacheFactory.getCache(jc);
+    ReduceWork gWork = (ReduceWork) cache.retrieve(PLAN_KEY);
+    if (gWork == null) {
+      gWork = Utilities.getReduceWork(job);
+      cache.cache(PLAN_KEY, gWork);
+    }
+
     reducer = gWork.getReducer();
     reducer.setParentOperators(null); // clear out any parents as reducer is the
                                       // root
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
new file mode 100644
index 0000000..79736ef
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * ObjectCache. No-op implementation on MR; we don't have a means to reuse
+ * Objects between runs of the same task.
+ *
+ */
+public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
+
+  private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
+
+  @Override
+  public void cache(String key, Object value) {
+    LOG.info("Ignoring cache key: "+key);
+  }
+
+  @Override
+  public Object retrieve(String key) {
+    LOG.info("Ignoring retrieval request: "+key);
+    return null;
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index f954bf7..f38f786 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -578,7 +578,7 @@ public static Vertex createVertex(JobConf conf, BaseWork work,
    * createTezDir creates a temporary directory in the scratchDir folder to
    * be used with Tez. Assumes scratchDir exists.
    */
-  public static Path createTezDir(Path scratchDir, Configuration conf)
+  public static Path createTezDir(Path scratchDir, Configuration conf)
       throws IOException {
     Path tezDir = getTezDir(scratchDir);
     FileSystem fs = tezDir.getFileSystem(conf);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
new file mode 100644
index 0000000..8f92d3e
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.engine.common.objectregistry.ObjectRegistry;
+import org.apache.tez.engine.common.objectregistry.ObjectRegistryFactory;
+
+
+/**
+ * ObjectCache. Tez implementation based on the tez object registry.
+ *
+ */
+public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
+
+  private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
+  private final ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry();
+
+  @Override
+  public void cache(String key, Object value) {
+    LOG.info("Adding " + key + " to cache");
+    registry.add(ObjectLifeCycle.VERTEX, key, value);
+  }
+
+  @Override
+  public Object retrieve(String key) {
+    Object o = registry.get(key);
+    LOG.info("Found " + key + " in cache with value: " + o);
+    return o;
+  }
+}
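Note: ExecMapper and ExecReducer (plan caching) and MapJoinOperator (hash table caching) all follow the same retrieve-or-compute pattern against the new ObjectCache interface. A minimal standalone sketch of that pattern is below; the class name, key, and cached value are illustrative only and not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.ObjectCache;
import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;

public class ObjectCacheUsageSketch {

  // Illustrative key; the patch itself uses "__MAP_PLAN__", "__REDUCE_PLAN__"
  // and "__HASH_MAP_" + operatorId.
  private static final String KEY = "__EXAMPLE_VALUE__";

  public static Object loadOrCompute(Configuration conf) {
    // With hive.optimize.tez enabled this returns the registry-backed,
    // vertex-scoped cache; on MR it returns the no-op cache, so the
    // expensive work simply reruns for every task.
    ObjectCache cache = ObjectCacheFactory.getCache(conf);
    Object value = cache.retrieve(KEY);
    if (value == null) {
      value = computeExpensiveValue(conf);
      cache.cache(KEY, value);
    }
    return value;
  }

  // Placeholder for the costly step (e.g. deserializing a plan or loading a
  // map-join hash table).
  private static Object computeExpensiveValue(Configuration conf) {
    return new Object();
  }
}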