diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 01e1d01..b371b67 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1707,6 +1707,7 @@
         "If hive (in tez mode only) cannot find a usable hive jar in \"hive.jar.directory\", \n" +
         "it will upload the hive jar to \"hive.user.install.directory/user.name\"\n" +
         "and use it to run queries."),
+    HIVE_CACHE_RUNTIME_JARS("hive.cache.runtime.jars", true, "Whether Hive should cache runtime jars in HDFS @ /user/$USER/.hiveJars/"),

     // Vectorization enabled
     HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", false,
diff --git ql/pom.xml ql/pom.xml
index 3507411..6f335a4 100644
--- ql/pom.xml
+++ ql/pom.xml
@@ -605,32 +605,7 @@
-                  <include>org.apache.hive:hive-common</include>
-                  <include>org.apache.hive:hive-exec</include>
-                  <include>org.apache.hive:hive-serde</include>
                   <include>com.esotericsoftware.kryo:kryo</include>
-                  <include>com.twitter:parquet-hadoop-bundle</include>
-                  <include>org.apache.thrift:libthrift</include>
-                  <include>commons-lang:commons-lang</include>
-                  <include>org.apache.commons:commons-lang3</include>
-                  <include>org.jodd:jodd-core</include>
-                  <include>org.json:json</include>
-                  <include>org.apache.avro:avro</include>
-                  <include>org.apache.avro:avro-mapred</include>
-                  <include>org.apache.hive.shims:hive-shims-0.20</include>
-                  <include>org.apache.hive.shims:hive-shims-0.20S</include>
-                  <include>org.apache.hive.shims:hive-shims-0.23</include>
-                  <include>org.apache.hive.shims:hive-shims-0.23</include>
-                  <include>org.apache.hive.shims:hive-shims-common</include>
-                  <include>org.apache.hive.shims:hive-shims-common-secure</include>
-                  <include>com.googlecode.javaewah:JavaEWAH</include>
-                  <include>javolution:javolution</include>
-                  <include>com.google.protobuf:protobuf-java</include>
-                  <include>org.iq80.snappy:snappy</include>
-                  <include>org.codehaus.jackson:jackson-core-asl</include>
-                  <include>org.codehaus.jackson:jackson-mapper-asl</include>
-                  <include>com.google.guava:guava</include>
-                  <include>net.sf.opencsv:opencsv</include>
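
The shade plugin above previously bundled these dependencies into the hive-exec jar; with the bundle trimmed to kryo alone, the new HiveAuxClasspathBuilder below must instead locate each dependency's own jar on the local classpath, which it does by asking the JVM where a representative class was loaded from. A minimal, self-contained sketch of that technique, assuming Guava is on the classpath (the JarLocator class and jarOf method are illustrative, not part of the patch):

    import java.net.URL;
    import java.security.CodeSource;

    public class JarLocator {
      // Resolve the jar (or directory) a class was loaded from.
      static String jarOf(Class<?> clazz) {
        CodeSource src = clazz.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
          throw new IllegalStateException("No code source for " + clazz);
        }
        URL location = src.getLocation();
        return location.toExternalForm(); // e.g. file:/repo/guava-11.0.2.jar
      }

      public static void main(String[] args) {
        System.out.println(jarOf(com.google.common.collect.ImmutableSet.class));
      }
    }
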
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HiveAuxClasspathBuilder.java ql/src/java/org/apache/hadoop/hive/ql/exec/HiveAuxClasspathBuilder.java
new file mode 100644
index 0000000..d8be4c7
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HiveAuxClasspathBuilder.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import java.net.URL;
+import java.security.CodeSource;
+import java.util.Set;
+
+import javaewah.EWAHCompressedBitmap;
+import javolution.util.FastBitSet;
+
+import org.apache.avro.mapred.AvroMapper;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.shims.Hadoop20SShims;
+import org.apache.hadoop.hive.shims.Hadoop20Shims;
+import org.apache.hadoop.hive.shims.Hadoop23Shims;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
+import org.apache.thrift.TException;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.iq80.snappy.Snappy;
+import org.json.JSONArray;
+
+import parquet.io.ColumnIO;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+
+public class HiveAuxClasspathBuilder {
+  private static final Log LOG = LogFactory.getLog(HiveAuxClasspathBuilder.class.getName());
+  private static final Splitter AUX_PATH_SPLITTER = Splitter.on(",").omitEmptyStrings().trimResults();
+  private static final Joiner AUX_PATH_JOINER = Joiner.on(",").skipNulls();
+  private static final ImmutableSet<String> JARS;
+  static {
+    // order below is the same as the old ant build and maven shade
+    ImmutableSet<Class<?>> clazzes = ImmutableSet.<Class<?>>builder()
+        .add(HiveConf.class)                // org.apache.hive:hive-common
+        .add(HiveAuxClasspathBuilder.class) // org.apache.hive:hive-exec
+        .add(AbstractSerDe.class)           // org.apache.hive:hive-serde
+        .add(ColumnIO.class)                // com.twitter:parquet-hadoop-bundle
+        .add(TException.class)              // org.apache.thrift:libthrift
+        .add(StringUtils.class)             // commons-lang:commons-lang
+        .add(JSONArray.class)               // org.json:json
+        .add(AvroMapper.class)              // org.apache.avro:avro-mapred
+        .add(Hadoop20Shims.class)           // org.apache.hive.shims:hive-shims-0.20
+        .add(Hadoop20SShims.class)          // org.apache.hive.shims:hive-shims-0.20S
+        .add(Hadoop23Shims.class)           // org.apache.hive.shims:hive-shims-0.23
+        .add(HadoopShims.class)             // org.apache.hive.shims:hive-shims-common
+        .add(HadoopShimsSecure.class)       // org.apache.hive.shims:hive-shims-common-secure
+        .add(EWAHCompressedBitmap.class)    // com.googlecode.javaewah:JavaEWAH
+        .add(FastBitSet.class)              // javolution:javolution
+        .add(ByteString.class)              // com.google.protobuf:protobuf-java
+        .add(Snappy.class)                  // org.iq80.snappy:snappy
+        .add(JsonFactory.class)             // org.codehaus.jackson:jackson-core-asl
+        .add(ObjectMapper.class)            // org.codehaus.jackson:jackson-mapper-asl
+        .add(ImmutableSet.class)            // com.google.guava:guava
+        .build();
+    ImmutableSet.Builder<String> jarBuilder = ImmutableSet.builder();
+    for (Class<?> clazz : clazzes) {
+      CodeSource codeSource = clazz.getProtectionDomain().getCodeSource();
+      if (codeSource == null) {
+        throw new IllegalStateException("Could not find code source for class " + clazz);
+      } else {
+        URL location = codeSource.getLocation();
+        if (location == null) {
+          throw new IllegalStateException("Could not find jar location for class " + clazz);
+        } else {
+          jarBuilder.add(location.toExternalForm());
+        }
+      }
+    }
+    JARS = jarBuilder.build();
+  }
+
+  public static String getHiveAuxClasspath(Configuration conf, String addedJars) {
+    Set<String> jars = Sets.newLinkedHashSet(JARS);
+    jars.addAll(Sets.newLinkedHashSet(AUX_PATH_SPLITTER.split(Strings.nullToEmpty(addedJars).trim())));
+    jars.addAll(Sets.newLinkedHashSet(AUX_PATH_SPLITTER.split(Strings.nullToEmpty(
+        HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS)).trim())));
+    String result = AUX_PATH_JOINER.join(jars);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Hive Aux Jars: " + result);
+    }
+    return result;
+  }
+}
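
For context, a hedged usage sketch of the builder just defined: it merges the built-in jar list with session-added jars and hive.aux.jars.path, deduplicating while preserving insertion order (the paths and the ClasspathDemo wrapper are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.HiveAuxClasspathBuilder;

    public class ClasspathDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(HiveConf.ConfVars.HIVEAUXJARS.varname, "file:/opt/udfs/my-udf.jar");
        // addedJars would normally come from ADD JAR resources in the session.
        String cp = HiveAuxClasspathBuilder.getHiveAuxClasspath(conf, "file:/tmp/added.jar");
        // cp is comma-separated: the hive/dependency jars resolved in the static
        // block, then file:/tmp/added.jar, then file:/opt/udfs/my-udf.jar.
        System.out.println(cp);
      }
    }
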
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 98cf2a7..6378945 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
+import org.apache.hadoop.hive.ql.exec.HiveAuxClasspathBuilder;
 import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
@@ -297,15 +298,23 @@ public int execute(DriverContext driverContext) {

     // Transfer HIVEAUXJARS and HIVEADDEDJARS to "tmpjars" so hadoop understands
     // it
-    String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS);
     String addedJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDJARS);
-    if (StringUtils.isNotBlank(auxJars) || StringUtils.isNotBlank(addedJars)) {
-      String allJars = StringUtils.isNotBlank(auxJars) ? (StringUtils.isNotBlank(addedJars) ? addedJars
-          + "," + auxJars
-          : auxJars)
-          : addedJars;
+    String allJars = HiveAuxClasspathBuilder.getHiveAuxClasspath(job, addedJars);
+    boolean cacheJars = HiveConf.getBoolVar(job, ConfVars.HIVE_CACHE_RUNTIME_JARS);
+    if (StringUtils.isNotBlank(allJars)) {
       LOG.info("adding libjars: " + allJars);
-      initializeFiles("tmpjars", allJars);
+      if (cacheJars) {
+        try {
+          JarCache.cacheJars(job, allJars);
+        } catch (Exception e) {
+          LOG.error("Problem caching jars in ExecDriver, falling back to un-cached jars", e);
+          initializeFiles("tmpjars", allJars);
+        }
+      } else {
+        initializeFiles("tmpjars", allJars);
+      }
     }

     // Transfer HIVEADDEDFILES to "tmpfiles" so hadoop understands it
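
ExecDriver now chooses between two shipping paths: when caching succeeds, each jar already sits in HDFS and only a classpath entry is registered; on any failure it falls back to the old "tmpjars" route, where Hadoop re-uploads the jars for every job. A rough sketch of that distinction (the paths and the direct conf.set are illustrative; ExecDriver itself goes through initializeFiles):

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class ShippingDemo {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // Cached path: the jar already lives in HDFS; just add a classpath entry.
        DistributedCache.addArchiveToClassPath(
            new Path("/user/hive/.hiveJars/udf-9a0364b9.jar"), job);
        // Un-cached fallback: "tmpjars" tells Hadoop to upload local jars per job.
        job.set("tmpjars", "file:/tmp/udf.jar");
      }
    }
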
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JarCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JarCache.java
new file mode 100644
index 0000000..3240441
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JarCache.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.mr;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.base.Splitter;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+public class JarCache {
+  private static final Log LOG = LogFactory.getLog(JarCache.class);
+  private static final FsPermission PERMS = new FsPermission(FsAction.ALL, FsAction.NONE,
+      FsAction.NONE);
+  private static final Splitter AUX_PATH_SPLITTER = Splitter.on(",").omitEmptyStrings().trimResults();
+  private static final Object staticLock = new Object();
+
+  /**
+   * Update the cached files' access time at most once per day.
+   */
+  private static final long ONE_DAY = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
+  /**
+   * Cache a max of 10,000 file hashes.
+   */
+  private static final Cache<Path, CachedItem> fileHashCache = CacheBuilder.newBuilder()
+      .maximumSize(10000).build();
+
+  @SuppressWarnings("deprecation")
+  public static void cacheJars(Configuration conf, String commaSeparatedJars)
+      throws Exception {
+    for (String jar : AUX_PATH_SPLITTER.split(commaSeparatedJars)) {
+      Path cachedPath = cacheSingleJar(conf, new Path(jar));
+      LOG.info("Jar " + jar + " is being replaced with cached path " + cachedPath);
+      DistributedCache.addArchiveToClassPath(cachedPath, conf);
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  private static Path cacheSingleJar(Configuration conf, Path jar)
+      throws Exception {
+    if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
+      return jar;
+    }
+    FileSystem sourceFs = jar.getFileSystem(conf);
+    UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+    String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
+    Path installPrefix = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_USER_INSTALL_DIR));
+    FileSystem destFs = installPrefix.getFileSystem(conf);
+    Path installDir = new Path(new Path(installPrefix, userName), ".hiveJars");
+    destFs.mkdirs(installDir);
+    // lock to ensure there are no races, at least within a single JVM
+    synchronized (staticLock) {
+      CachedItem cachedItem = fileHashCache.getIfPresent(jar);
+      if (cachedItem == null) {
+        InputStream is = null;
+        try {
+          is = sourceFs.open(jar);
+          cachedItem = new CachedItem(DigestUtils.shaHex(is));
+          fileHashCache.put(jar, cachedItem);
+        } catch (IOException e) {
+          throw new IOException("Error reading file " + jar, e);
+        } finally {
+          try {
+            if (is != null) {
+              is.close();
+            }
+          } catch (IOException e) {
+            LOG.info("Error closing stream: " + jar, e);
+          }
+        }
+      }
+      String name = FilenameUtils.removeExtension(jar.getName()) + "-" + cachedItem.hash + ".jar";
+      Path installPath = new Path(installDir, name);
+      if (destFs.exists(installPath) &&
+          sourceFs.getFileStatus(jar).getLen() == destFs.getFileStatus(installPath).getLen()) {
+        long currentTime = System.currentTimeMillis();
+        if (cachedItem.updateAccessTime(currentTime)) {
+          destFs.setTimes(installPath, -1, currentTime);
+        }
+        return installPath;
+      } else {
+        InputStream in = null;
+        OutputStream out = null;
+        try {
+          in = sourceFs.open(jar);
+          destFs.delete(installPath);
+          out = FileSystem.create(destFs, installPath, PERMS);
+          IOUtils.copyBytes(in, out, 4096);
+          LOG.info("Copied " + jar + " to " + installPath);
+          return installPath;
+        } catch (IOException e) {
+          throw new IOException("Error copying file " + jar + " to " + installPath, e);
+        } finally {
+          IOUtils.cleanup(LOG, in, out);
+        }
+      }
+    }
+  }
+
+  private static class CachedItem {
+    final String hash;
+    long lastUpdatedAccessTime;
+
+    CachedItem(String hash) {
+      this.hash = hash;
+      this.lastUpdatedAccessTime = 0;
+    }
+
+    boolean updateAccessTime(long currentTime) {
+      boolean result = currentTime - lastUpdatedAccessTime > ONE_DAY;
+      if (result) {
+        lastUpdatedAccessTime = currentTime;
+      }
+      return result;
+    }
+  }
+}
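
The cache-key scheme above is worth a note: the destination file name embeds a SHA-1 hex digest of the jar's contents, so two different builds of the same udf.jar get distinct names under /user/<name>/.hiveJars and never collide. A self-contained sketch of just that naming step, assuming a local jar path (the CacheNameDemo wrapper and path are illustrative):

    import java.io.FileInputStream;
    import java.io.InputStream;
    import org.apache.commons.codec.digest.DigestUtils;
    import org.apache.commons.io.FilenameUtils;

    public class CacheNameDemo {
      public static void main(String[] args) throws Exception {
        InputStream in = new FileInputStream("/tmp/udf.jar");
        try {
          String hash = DigestUtils.shaHex(in); // SHA-1 of the jar bytes
          // e.g. udf-2ef7bde608ce5404e97d5f042f95f89f1c232871.jar
          System.out.println(FilenameUtils.removeExtension("udf.jar") + "-" + hash + ".jar");
        } finally {
          in.close();
        }
      }
    }
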
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
index 7775100..78ee2e5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.HiveAuxClasspathBuilder;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
@@ -146,22 +147,21 @@ public int execute(DriverContext driverContext) {

       String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
       String hiveJar = conf.getJar();

-      String libJarsOption;
+      String libJarsOption = " ";
       String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
       conf.setVar(ConfVars.HIVEADDEDJARS, addedJars);
-      String auxJars = conf.getAuxJars();
-      // Put auxjars and addedjars together into libjars
-      if (StringUtils.isEmpty(addedJars)) {
-        if (StringUtils.isEmpty(auxJars)) {
-          libJarsOption = " ";
-        } else {
-          libJarsOption = " -libjars " + auxJars + " ";
-        }
-      } else {
-        if (StringUtils.isEmpty(auxJars)) {
-          libJarsOption = " -libjars " + addedJars + " ";
+      String allJars = HiveAuxClasspathBuilder.getHiveAuxClasspath(conf, addedJars);
+      boolean cacheJars = HiveConf.getBoolVar(conf, ConfVars.HIVE_CACHE_RUNTIME_JARS);
+      if (StringUtils.isNotBlank(allJars)) {
+        if (cacheJars) {
+          try {
+            JarCache.cacheJars(job, allJars);
+          } catch (Exception e) {
+            LOG.warn("Problem caching jars in MapRedTask, falling back to un-cached jars", e);
+            libJarsOption = " -libjars " + allJars + " ";
+          }
         } else {
-          libJarsOption = " -libjars " + addedJars + "," + auxJars + " ";
+          libJarsOption = " -libjars " + allJars + " ";
         }
       }
diff --git shims/aggregator/pom.xml shims/aggregator/pom.xml
index fe18ccb..a223280 100644
--- shims/aggregator/pom.xml
+++ shims/aggregator/pom.xml
@@ -37,31 +37,26 @@
     <dependency>
      <groupId>org.apache.hive.shims</groupId>
      <artifactId>hive-shims-common</artifactId>
      <version>${project.version}</version>
-     <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hive.shims</groupId>
      <artifactId>hive-shims-0.20</artifactId>
      <version>${project.version}</version>
-     <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hive.shims</groupId>
      <artifactId>hive-shims-common-secure</artifactId>
      <version>${project.version}</version>
-     <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hive.shims</groupId>
      <artifactId>hive-shims-0.20S</artifactId>
      <version>${project.version}</version>
-     <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hive.shims</groupId>
      <artifactId>hive-shims-0.23</artifactId>
      <version>${project.version}</version>
-     <scope>runtime</scope>
    </dependency>
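
Finally, the feature is on by default and is read per job through the new HIVE_CACHE_RUNTIME_JARS flag; a short sketch of reading and toggling it in code, mirroring the getBoolVar calls in ExecDriver and MapRedTask above (the FlagDemo wrapper is illustrative):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class FlagDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Defaults to true per the HiveConf entry above.
        boolean cacheJars = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CACHE_RUNTIME_JARS);
        // Disable for this session/job, e.g. if per-user HDFS dirs are unavailable.
        conf.setBoolVar(HiveConf.ConfVars.HIVE_CACHE_RUNTIME_JARS, false);
        System.out.println(cacheJars);
      }
    }
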