diff --git a/bin/hive b/bin/hive
index 3bd949f..c2a4f08 100755
--- a/bin/hive
+++ b/bin/hive
@@ -104,7 +104,7 @@
       continue;
     fi
     if $cygwin; then
-      f=`cygpath -w "$f"`
+      f=`cygpath -w "$f"`
     fi
     AUX_CLASSPATH=${AUX_CLASSPATH}:$f
     if [ "${AUX_PARAM}" == "" ]; then
diff --git a/packaging/src/main/assembly/bin.xml b/packaging/src/main/assembly/bin.xml
index a97ef7d..a5ed3bc 100644
--- a/packaging/src/main/assembly/bin.xml
+++ b/packaging/src/main/assembly/bin.xml
@@ -40,7 +40,7 @@
       <useStrictFiltering>true</useStrictFiltering>
       <excludes>
         <exclude>org.apache.hive.hcatalog:*</exclude>
-        <exclude>org.slf4j:*</exclude>
+        <exclude>org.slf4j:*</exclude>
       </excludes>
     </dependencySet>
     <dependencySet>
diff --git a/ql/pom.xml b/ql/pom.xml
index 53d0b9e..8b7dc6e 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -481,29 +481,7 @@
           <configuration>
             <artifactSet>
              <includes>
-
-               <include>org.apache.hive:hive-common</include>
-               <include>org.apache.hive:hive-exec</include>
-               <include>org.apache.hive:hive-serde</include>
                <include>com.esotericsoftware.kryo:kryo</include>
-               <include>com.twiter:parquet-hadoop-bundle</include>
-               <include>org.apache.thrift:libthrift</include>
-               <include>commons-lang:commons-lang</include>
-               <include>org.json:json</include>
-               <include>org.apache.avro:arvro-mapred</include>
-               <include>org.apache.hive.shims:hive-shims-0.20</include>
-               <include>org.apache.hive.shims:hive-shims-0.20S</include>
-               <include>org.apache.hive.shims:hive-shims-0.23</include>
-               <include>org.apache.hive.shims:hive-shims-0.23</include>
-               <include>org.apache.hive.shims:hive-shims-common</include>
-               <include>org.apache.hive.shims:hive-shims-common-secure</include>
-               <include>com.googlecode.javaewah:JavaEWAH</include>
-               <include>javolution:javolution</include>
-               <include>com.google.protobuf:protobuf-java</include>
-               <include>org.iq80.snappy:snappy</include>
-               <include>org.codehaus.jackson:jackson-core-asl</include>
-               <include>org.codehaus.jackson:jackson-mapper-asl</include>
-               <include>com.google.guava:guava</include>
             </includes>
            </artifactSet>
          </configuration>
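
The ql/pom.xml change above stops shading these dependencies into the hive-exec jar; instead, HiveAuxClasspathBuilder (next file) locates each dependency's jar at runtime. A minimal sketch of that class-to-jar lookup, using Guava's Joiner as a stand-in for any of the dependencies listed above (the class name JarLocator and the printed path are illustrative only):

import java.net.URL;
import java.security.CodeSource;

import com.google.common.base.Joiner;

public class JarLocator {
  public static void main(String[] args) {
    // Ask the class loader which jar a dependency class was loaded from.
    CodeSource source = Joiner.class.getProtectionDomain().getCodeSource();
    if (source == null || source.getLocation() == null) {
      throw new IllegalStateException("Could not find jar for " + Joiner.class);
    }
    URL jar = source.getLocation();
    // Prints e.g. file:/home/user/.m2/repository/com/google/guava/guava-11.0.2/guava-11.0.2.jar
    System.out.println(jar.toExternalForm());
  }
}
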
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveAuxClasspathBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveAuxClasspathBuilder.java
new file mode 100644
index 0000000..52c6418
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveAuxClasspathBuilder.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.net.URL;
+import java.security.CodeSource;
+import java.util.Set;
+
+import javaewah.EWAHCompressedBitmap;
+import javolution.util.FastBitSet;
+
+import org.apache.avro.mapred.AvroMapper;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.shims.Hadoop20SShims;
+import org.apache.hadoop.hive.shims.Hadoop20Shims;
+import org.apache.hadoop.hive.shims.Hadoop23Shims;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
+import org.apache.thrift.TException;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.iq80.snappy.Snappy;
+import org.json.JSONArray;
+
+import parquet.io.ColumnIO;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+
+/**
+ * Builds the auxiliary classpath for MapReduce jobs now that hive-exec no
+ * longer shades its dependencies: each dependency jar is located at runtime
+ * from a class known to live inside it.
+ */
+public final class HiveAuxClasspathBuilder {
+  private static final Log LOG = LogFactory.getLog(HiveAuxClasspathBuilder.class.getName());
+  protected static final Splitter AUX_PATH_SPLITTER = Splitter.on(",").omitEmptyStrings().trimResults();
+  protected static final Joiner AUX_PATH_JOINER = Joiner.on(",").skipNulls();
+  private static final ImmutableSet<String> JARS;
+  static {
+    // order below is the same as the old ant build and maven shade
+    ImmutableSet<Class<?>> clazzes = ImmutableSet.<Class<?>>builder()
+        .add(HiveConf.class)                // org.apache.hive:hive-common
+        .add(HiveAuxClasspathBuilder.class) // org.apache.hive:hive-exec
+        .add(AbstractSerDe.class)           // org.apache.hive:hive-serde
+        .add(ColumnIO.class)                // com.twitter:parquet-hadoop
+        .add(TException.class)              // org.apache.thrift:libthrift
+        .add(StringUtils.class)             // commons-lang:commons-lang
+        .add(JSONArray.class)               // org.json:json
+        .add(AvroMapper.class)              // org.apache.avro:avro-mapred
+        .add(Hadoop20Shims.class)           // org.apache.hive:hive-shims-0.20
+        .add(Hadoop20SShims.class)          // org.apache.hive:hive-shims-0.20S
+        .add(Hadoop23Shims.class)           // org.apache.hive:hive-shims-0.23
+        .add(HadoopShims.class)             // org.apache.hive:hive-shims-common
+        .add(HadoopShimsSecure.class)       // org.apache.hive:hive-shims-common-secure
+        .add(EWAHCompressedBitmap.class)    // com.googlecode.javaewah:JavaEWAH
+        .add(FastBitSet.class)              // javolution:javolution
+        .add(ByteString.class)              // com.google.protobuf:protobuf-java
+        .add(Snappy.class)                  // org.iq80.snappy:snappy
+        .add(JsonFactory.class)             // org.codehaus.jackson:jackson-core-asl
+        .add(ObjectMapper.class)            // org.codehaus.jackson:jackson-mapper-asl
+        .add(ImmutableSet.class)            // com.google.guava:guava
+        .build();
+    ImmutableSet.Builder<String> jarBuilder = ImmutableSet.builder();
+    for (Class<?> clazz : clazzes) {
+      CodeSource codeSource = clazz.getProtectionDomain().getCodeSource();
+      if (codeSource == null) {
+        throw new IllegalStateException("Could not find code source for class " + clazz);
+      } else {
+        URL location = codeSource.getLocation();
+        if (location == null) {
+          throw new IllegalStateException("Could not find jar location for class " + clazz);
+        } else {
+          jarBuilder.add(location.toExternalForm());
+        }
+      }
+    }
+    JARS = jarBuilder.build();
+  }
+
+  public String getHiveAuxClasspath(Configuration conf, String addedJars) {
+    // Dependency jars first, then session-added jars, then hive.aux.jars.path,
+    // de-duplicated while preserving insertion order.
+    Set<String> jars = Sets.newLinkedHashSet(JARS);
+    jars.addAll(Sets.newLinkedHashSet(AUX_PATH_SPLITTER.split(Strings.nullToEmpty(addedJars).trim())));
+    jars.addAll(Sets.newLinkedHashSet(AUX_PATH_SPLITTER.split(Strings.nullToEmpty(HiveConf.
+        getVar(conf, HiveConf.ConfVars.HIVEAUXJARS)).trim())));
+    String result = AUX_PATH_JOINER.join(jars);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Hive Aux Jars: " + result);
+    }
+    return result;
+  }
+}
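
A hedged usage sketch of the builder above; the jar paths are illustrative, and it assumes hive-exec and its dependencies are on the classpath so the static JARS set can initialize:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.HiveAuxClasspathBuilder;

public class AuxClasspathExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Jars configured via hive.aux.jars.path (illustrative path).
    conf.set(HiveConf.ConfVars.HIVEAUXJARS.varname, "file:///opt/hive/aux/udfs.jar");
    // Jars added to the session, e.g. via ADD JAR (illustrative paths).
    String addedJars = "file:///tmp/added-1.jar,file:///tmp/added-2.jar";
    // Result: built-in dependency jars first, then addedJars, then the aux
    // jars, comma-joined and de-duplicated.
    System.out.println(new HiveAuxClasspathBuilder().getHiveAuxClasspath(conf, addedJars));
  }
}
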
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 288da8e..c6ce30b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
+import org.apache.hadoop.hive.ql.exec.HiveAuxClasspathBuilder;
 import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
@@ -296,15 +297,18 @@ public int execute(DriverContext driverContext) {
 
     // Transfer HIVEAUXJARS and HIVEADDEDJARS to "tmpjars" so hadoop understands
     // it
-    String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS);
     String addedJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDJARS);
-    if (StringUtils.isNotBlank(auxJars) || StringUtils.isNotBlank(addedJars)) {
-      String allJars = StringUtils.isNotBlank(auxJars) ? (StringUtils.isNotBlank(addedJars) ? addedJars
-          + "," + auxJars
-          : auxJars)
-          : addedJars;
+    HiveAuxClasspathBuilder classpathBuilder = new HiveAuxClasspathBuilder();
+    String allJars = classpathBuilder.getHiveAuxClasspath(job, addedJars);
+    if (StringUtils.isNotBlank(allJars)) {
       LOG.info("adding libjars: " + allJars);
-      initializeFiles("tmpjars", allJars);
+      JarCache jarCache = new JarCache();
+      try {
+        jarCache.cacheJars(job, allJars);
+      } catch (Exception e) {
+        LOG.error("Problem caching jars in ExecDriver, falling back to un-cached jars", e);
+        initializeFiles("tmpjars", allJars);
+      }
     }
 
     // Transfer HIVEADDEDFILES to "tmpfiles" so hadoop understands it
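
On the success path above, cacheJars replaces the old tmpjars upload: each jar is registered through the DistributedCache instead. A sketch of what that registration does to the job configuration, assuming classic MR1 property names; the path is a hypothetical cached-jar location:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;

public class CacheRegistration {
  public static void main(String[] args) throws Exception {
    Configuration job = new Configuration();
    // Hypothetical content-addressed path produced by JarCache.
    Path cached = new Path("/user/alice/.hiveJars/0123456789abcdef.jar");
    DistributedCache.addArchiveToClassPath(cached, job);
    // The jar now travels as a cache archive on the task classpath rather
    // than through "tmpjars".
    System.out.println(job.get("mapred.job.classpath.archives"));
    System.out.println(job.get("mapred.cache.archives"));
  }
}
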
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JarCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JarCache.java
new file mode 100644
index 0000000..6ed02f5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JarCache.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.mr;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.base.Splitter;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Copies job jars to a per-user, content-addressed directory on the
+ * destination filesystem and registers the cached copies on the job
+ * classpath, so identical jars are uploaded once rather than per query.
+ */
+public class JarCache {
+  private static final Log LOG = LogFactory.getLog(JarCache.class);
+  private static final FsPermission PERMS = new FsPermission(FsAction.ALL, FsAction.NONE,
+      FsAction.NONE);
+  protected static final Splitter AUX_PATH_SPLITTER = Splitter.on(",").omitEmptyStrings().trimResults();
+
+  /**
+   * Update a cached file's access time at most once per day
+   */
+  private static final long ONE_DAY = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
+  /**
+   * Cache a max of 10,000 file hashes
+   */
+  private static final Cache<Path, CachedItem> fileHashCache = CacheBuilder.newBuilder().
+      maximumSize(10000).build();
+
+  @SuppressWarnings("deprecation")
+  public void cacheJars(Configuration conf, String commaSeparatedJars)
+      throws Exception {
+    for (String jar : AUX_PATH_SPLITTER.split(commaSeparatedJars)) {
+      Path cachedPath = cacheSingleJar(conf, new Path(jar));
+      LOG.info("Jar " + jar + " is being replaced with cached path " + cachedPath);
+      DistributedCache.addArchiveToClassPath(cachedPath, conf);
+    }
+  }
+
+  private Path cacheSingleJar(Configuration conf, Path jar)
+      throws Exception {
+    if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
+      return jar;
+    }
+    FileSystem sourceFs = jar.getFileSystem(conf);
+    CachedItem cachedItem = fileHashCache.getIfPresent(jar);
+    if (cachedItem == null) {
+      // Hash the jar's contents so the destination name is content-addressed.
+      InputStream in = null;
+      try {
+        in = sourceFs.open(jar);
+        cachedItem = new CachedItem(DigestUtils.shaHex(in));
+        fileHashCache.put(jar, cachedItem);
+      } catch (IOException e) {
+        throw new IOException("Error reading file " + jar, e);
+      } finally {
+        IOUtils.closeStream(in);
+      }
+    }
+    UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+    String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
+    Path installPrefix = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_USER_INSTALL_DIR));
+    FileSystem destFs = installPrefix.getFileSystem(conf);
+    Path installDir = new Path(new Path(installPrefix, userName), ".hiveJars");
+    destFs.mkdirs(installDir);
+    Path installPath = new Path(installDir, cachedItem.hash + ".jar");
+    if (destFs.exists(installPath)) {
+      // Already cached: refresh the access time, at most once per day.
+      long currentTime = System.currentTimeMillis();
+      if (cachedItem.updateAccessTime(currentTime)) {
+        destFs.setTimes(installPath, -1, currentTime);
+      }
+      return installPath;
+    } else {
+      InputStream in = null;
+      OutputStream out = null;
+      try {
+        in = sourceFs.open(jar);
+        out = FileSystem.create(destFs, installPath, PERMS);
+        IOUtils.copyBytes(in, out, 4096);
+        LOG.info("Copied " + jar + " to " + installPath);
+        return installPath;
+      } catch (IOException e) {
+        throw new IOException("Error copying file " + jar + " to " + installPath, e);
+      } finally {
+        IOUtils.cleanup(LOG, in, out);
+      }
+    }
+  }
+
+  private static class CachedItem {
+    final String hash;
+    long lastUpdatedAccessTime;
+
+    CachedItem(String hash) {
+      this.hash = hash;
+      this.lastUpdatedAccessTime = 0;
+    }
+
+    /**
+     * Returns true, and records the new time, when more than a day has
+     * passed since the last recorded update; the caller then pushes the
+     * new access time to the filesystem.
+     */
+    boolean updateAccessTime(long currentTime) {
+      if (currentTime - lastUpdatedAccessTime > ONE_DAY) {
+        lastUpdatedAccessTime = currentTime;
+        return true;
+      }
+      return false;
+    }
+  }
+}
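
The cache layout implied by cacheSingleJar is content-addressed: the destination name derives only from the jar's bytes, so repeated uploads of identical jars collapse to one path. An illustrative standalone computation of that path; the user name and install prefix are assumptions:

import java.io.FileInputStream;
import java.io.InputStream;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.fs.Path;

public class InstallPathExample {
  public static void main(String[] args) throws Exception {
    // Hash the jar's contents, as cacheSingleJar does with the opened stream.
    InputStream in = new FileInputStream(args[0]);
    String hash;
    try {
      hash = DigestUtils.shaHex(in);
    } finally {
      in.close();
    }
    Path installPrefix = new Path("/user");  // hive.user.install.dir (assumed)
    Path installDir = new Path(new Path(installPrefix, "alice"), ".hiveJars");
    // e.g. /user/alice/.hiveJars/356a192b7913b04c54574d18c28d46e6395428ab.jar
    System.out.println(new Path(installDir, hash + ".jar"));
  }
}
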
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
index 326654f..2024385 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.HiveAuxClasspathBuilder;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
@@ -146,25 +147,20 @@ public int execute(DriverContext driverContext) {
 
       String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
       String hiveJar = conf.getJar();
 
-      String libJarsOption;
+      String libJarsOption = " ";
       String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
       conf.setVar(ConfVars.HIVEADDEDJARS, addedJars);
-      String auxJars = conf.getAuxJars();
-      // Put auxjars and addedjars together into libjars
-      if (StringUtils.isEmpty(addedJars)) {
-        if (StringUtils.isEmpty(auxJars)) {
-          libJarsOption = " ";
-        } else {
-          libJarsOption = " -libjars " + auxJars + " ";
-        }
-      } else {
-        if (StringUtils.isEmpty(auxJars)) {
-          libJarsOption = " -libjars " + addedJars + " ";
-        } else {
-          libJarsOption = " -libjars " + addedJars + "," + auxJars + " ";
+      HiveAuxClasspathBuilder classpathBuilder = new HiveAuxClasspathBuilder();
+      String allJars = classpathBuilder.getHiveAuxClasspath(conf, addedJars);
+      if (StringUtils.isNotBlank(allJars)) {
+        JarCache jarCache = new JarCache();
+        try {
+          jarCache.cacheJars(job, allJars);
+        } catch (Exception e) {
+          LOG.error("Problem caching jars in MapRedTask, falling back to un-cached jars", e);
+          libJarsOption = " -libjars " + allJars + " ";
         }
       }
 
-      // Generate the hiveConfArgs after potentially adding the jars
       String hiveConfArgs = generateCmdLine(conf, ctx);
diff --git a/shims/aggregator/pom.xml b/shims/aggregator/pom.xml
index 7aa8c4c..a66b991 100644
--- a/shims/aggregator/pom.xml
+++ b/shims/aggregator/pom.xml
@@ -37,31 +37,26 @@
     <dependency>
       <groupId>org.apache.hive.shims</groupId>
       <artifactId>hive-shims-common</artifactId>
       <version>${project.version}</version>
-      <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hive.shims</groupId>
       <artifactId>hive-shims-0.20</artifactId>
       <version>${project.version}</version>
-      <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hive.shims</groupId>
       <artifactId>hive-shims-common-secure</artifactId>
       <version>${project.version}</version>
-      <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hive.shims</groupId>
       <artifactId>hive-shims-0.20S</artifactId>
       <version>${project.version}</version>
-      <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hive.shims</groupId>
       <artifactId>hive-shims-0.23</artifactId>
       <version>${project.version}</version>
-      <scope>runtime</scope>
     </dependency>
   </dependencies>
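
For the access-time throttling used when a cached jar already exists, a small standalone restatement of CachedItem's fixed logic, showing that the filesystem access time is pushed at most once per day per process; the class name and timestamps are illustrative:

import java.util.concurrent.TimeUnit;

public class AccessTimeThrottle {
  static final long ONE_DAY = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
  private long lastUpdatedAccessTime = 0;

  boolean updateAccessTime(long currentTime) {
    if (currentTime - lastUpdatedAccessTime > ONE_DAY) {
      lastUpdatedAccessTime = currentTime;
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    AccessTimeThrottle item = new AccessTimeThrottle();
    long now = System.currentTimeMillis();
    System.out.println(item.updateAccessTime(now));                // true: first touch
    System.out.println(item.updateAccessTime(now + 1000));         // false: within a day
    System.out.println(item.updateAccessTime(now + 2 * ONE_DAY));  // true: over a day later
  }
}
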