diff --git pom.xml pom.xml
index 41f5337..29cb55c 100644
--- pom.xml
+++ pom.xml
@@ -99,6 +99,7 @@
     <hadoop-20.version>0.20.2</hadoop-20.version>
     <hadoop-20S.version>1.2.1</hadoop-20S.version>
     <hadoop-23.version>2.2.0</hadoop-23.version>
+    <hadoop-23C.version>2.4.0-SNAPSHOT</hadoop-23C.version>
     <hbase.hadoop1.version>0.96.0-hadoop1</hbase.hadoop1.version>
     <hbase.hadoop2.version>0.96.0-hadoop2</hbase.hadoop2.version>
@@ -185,6 +186,17 @@
       <snapshots>
         <enabled>false</enabled>
       </snapshots>
     </repository>
+    <repository>
+      <id>apache.snapshots</id>
+      <name>Apache Snapshot Repository</name>
+      <url>http://repository.apache.org/snapshots</url>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
   </repositories>
diff --git ql/pom.xml ql/pom.xml
index 7087a4c..c5db303 100644
--- ql/pom.xml
+++ ql/pom.xml
@@ -483,7 +483,7 @@
                   <include>org.apache.hive.shims:hive-shims-0.20</include>
                   <include>org.apache.hive.shims:hive-shims-0.20S</include>
                   <include>org.apache.hive.shims:hive-shims-0.23</include>
-                  <include>org.apache.hive.shims:hive-shims-0.23</include>
+                  <include>org.apache.hive.shims:hive-shims-0.23C</include>
                   <include>org.apache.hive.shims:hive-shims-common</include>
                   <include>org.apache.hive.shims:hive-shims-common-secure</include>
                   <include>com.googlecode.javaewah:JavaEWAH</include>
diff --git shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
index ec1f18e..4b2e7ad 100644
--- shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
+++ shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,8 +51,10 @@
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
@@ -755,6 +758,7 @@ public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi
   public FileSystem createProxyFileSystem(FileSystem fs, URI uri) {
     return new ProxyFileSystem(fs, uri);
   }
+
   @Override
   public Map<String, String> getHadoopConfNames() {
     Map<String, String> ret = new HashMap<String, String>();
@@ -773,4 +777,16 @@ public FileSystem createProxyFileSystem(FileSystem fs, URI uri) {
     ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
     return ret;
   }
+
+  @Override
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+    /* not supported */
+    return null;
+  }
+
+  @Override
+  public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec) {
+    /* not supported */
+    return null;
+  }
 }
diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index d0ff7d4..5e8ed3f 100644
--- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,7 +38,9 @@
 import org.apache.hadoop.fs.ProxyFileSystem;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.ClusterStatus;
@@ -410,4 +413,16 @@ public FileSystem createProxyFileSystem(FileSystem fs, URI uri) {
     ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
     return ret;
   }
+
+  @Override
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+    /* not supported */
+    return null;
+  }
+
+  @Override
+  public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec) {
+    /* not supported */
+    return null;
+  }
 }
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 54c38ee..7ffe027 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -27,11 +27,13 @@
 import java.util.Map;
 import java.util.HashMap;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.io.FileNotFoundException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -41,7 +43,9 @@
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
@@ -556,5 +560,17 @@ public FileSystem createProxyFileSystem(FileSystem fs, URI uri) {
     ret.put("MAPREDSETUPCLEANUPNEEDED", "mapreduce.job.committer.setup.cleanup.needed");
     ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
     return ret;
+  }
+
+  @Override
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+    /* not supported */
+    return null;
+  }
+
+  @Override
+  public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec) {
+    /* not supported */
+    return null;
   }
 }
diff --git shims/0.23C/pom.xml shims/0.23C/pom.xml
new file mode 100644
index 0000000..d48bb2b
--- /dev/null
+++ shims/0.23C/pom.xml
@@ -0,0 +1,149 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hive</groupId>
+    <artifactId>hive</artifactId>
+    <version>0.13.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.hive.shims</groupId>
+  <artifactId>hive-shims-0.23C</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive Shims 0.23C</name>
+
+  <properties>
+    <hive.path.to.root>../..</hive.path.to.root>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hive.shims</groupId>
+      <artifactId>hive-shims-0.23</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>${commons-lang.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <version>${commons-logging.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop-23C.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop-23C.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop-23C.version}</version>
+      <type>test-jar</type>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop-23C.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <version>${hadoop-23C.version}</version>
+      <type>test-jar</type>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty</artifactId>
+      <version>${jetty.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop-23C.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+      <version>${hadoop-23C.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <version>${hadoop-23C.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+      <version>${hadoop-23C.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <version>${hadoop-23C.version}</version>
+      <optional>true</optional>
+      <type>test-jar</type>
+    </dependency>
+  </dependencies>
+</project>
diff --git shims/0.23C/src/main/java/org/apache/hadoop/hive/shims/Hadoop23CShims.java shims/0.23C/src/main/java/org/apache/hadoop/hive/shims/Hadoop23CShims.java
new file mode 100644
index 0000000..b431a7f
--- /dev/null
+++ shims/0.23C/src/main/java/org/apache/hadoop/hive/shims/Hadoop23CShims.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.shims;
+
+import java.io.IOException;
+import java.lang.Integer;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.EnumSet;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DirectDecompressionCodec;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.CompressionHeader;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
+
+/**
+ * Implementation of shims against Hadoop 0.23 + caching.
+ */
+public class Hadoop23CShims extends Hadoop23Shims {
+
+  private static final class ByteBufferPoolAdapter implements ByteBufferPool {
+    private ByteBufferPoolShim pool;
+
+    public ByteBufferPoolAdapter(ByteBufferPoolShim pool) {
+      this.pool = pool;
+    }
+
+    @Override
+    public final ByteBuffer getBuffer(boolean direct, int length) {
+      return this.pool.getBuffer(direct, length);
+    }
+
+    @Override
+    public final void putBuffer(ByteBuffer buffer) {
+      this.pool.putBuffer(buffer);
+    }
+  }
+
+  private static final class ZeroCopyAdapter implements ZeroCopyReaderShim {
+    private final FSDataInputStream in;
+    private final ByteBufferPoolAdapter pool;
+    private final static EnumSet<ReadOption> checksummed = EnumSet.noneOf(ReadOption.class);
+    private final static EnumSet<ReadOption> nochecksums = EnumSet.of(ReadOption.SKIP_CHECKSUMS);
+
+    public ZeroCopyAdapter(FSDataInputStream in, ByteBufferPoolShim poolshim) {
+      this.in = in;
+      if (poolshim != null) {
+        pool = new ByteBufferPoolAdapter(poolshim);
+      } else {
+        pool = null;
+      }
+    }
+
+    public final ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException {
+      EnumSet<ReadOption> options = nochecksums;
+      if (verifyChecksums) {
+        options = checksummed;
+      }
+      return this.in.read(this.pool, maxLength, options);
+    }
+
+    public final void releaseBuffer(ByteBuffer buffer) {
+      this.in.releaseBuffer(buffer);
+    }
+  }
+
+  @Override
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+    return new ZeroCopyAdapter(in, pool);
+  }
+
+  private final class DirectDecompressorAdapter implements DirectDecompressorShim {
+    private final DirectDecompressor decompressor;
+
+    public DirectDecompressorAdapter(DirectDecompressor decompressor) {
+      this.decompressor = decompressor;
+    }
+
+    public void decompress(ByteBuffer src, ByteBuffer dst) throws IOException {
+      this.decompressor.decompress(src, dst);
+    }
+  }
+
+  @Override
+  public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec) {
+    DirectDecompressor decompressor = null;
+    switch (codec) {
+    case ZLIB: {
+      decompressor = new ZlibDirectDecompressor();
+    }
+    break;
+    case ZLIB_NOHEADER: {
+      decompressor = new ZlibDirectDecompressor(CompressionHeader.NO_HEADER, 0);
+    }
+    break;
+    case SNAPPY: {
+      decompressor = new SnappyDirectDecompressor();
+    }
+    break;
+    }
+    if (decompressor != null) {
+      return new DirectDecompressorAdapter(decompressor);
+    }
+    /* not supported */
+    return null;
+  }
+}
diff --git shims/aggregator/pom.xml shims/aggregator/pom.xml
index 7aa8c4c..77ef113 100644
--- shims/aggregator/pom.xml
+++ shims/aggregator/pom.xml
@@ -63,5 +63,11 @@
       <version>${project.version}</version>
       <scope>runtime</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive.shims</groupId>
+      <artifactId>hive-shims-0.23C</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 2b3c6c1..d410db4 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -24,6 +24,7 @@
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -36,11 +37,13 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -520,4 +523,68 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporter
   public FileSystem createProxyFileSystem(FileSystem fs, URI uri);
 
   public Map<String, String> getHadoopConfNames();
+
+  /**
+   * A hadoop.io ByteBufferPool shim.
+   */
+  public interface ByteBufferPoolShim {
+    /**
+     * Get a new ByteBuffer from the pool. The pool can provide this by
+     * removing a buffer from its internal cache, or by allocating a
+     * new buffer.
+     *
+     * @param direct     Whether the buffer should be direct.
+     * @param length     The minimum length the buffer will have.
+     * @return A new ByteBuffer. Its capacity can be less
+     *         than what was requested, but must be at
+     *         least 1 byte.
+     */
+    ByteBuffer getBuffer(boolean direct, int length);
+
+    /**
+     * Release a buffer back to the pool.
+     * The pool may choose to put this buffer into its cache or free it.
+     *
+     * @param buffer    a direct bytebuffer
+     */
+    void putBuffer(ByteBuffer buffer);
+  }
+
+  /**
+   * Provides an HDFS ZeroCopyReader shim.
+   * @param in FSDataInputStream to read from (where the cached/mmap buffers are tied to)
+   * @param pool ByteBufferPoolShim to allocate fallback buffers with
+   *
+   * @return returns null if not supported
+   */
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException;
+
+  public interface ZeroCopyReaderShim {
+    /**
+     * Get a ByteBuffer from the FSDataInputStream - this can be either a
+     * HeapByteBuffer or a MappedByteBuffer. Also moves the in stream by that
+     * amount. The data read can be smaller than maxLength.
+     *
+     * @return ByteBuffer read from the stream.
+     */
+    public ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException;
+    /**
+     * Release a ByteBuffer obtained from a read on the stream.
+     *
+     */
+    public void releaseBuffer(ByteBuffer buffer);
+  }
+
+  public enum DirectCompressionType {
+    NONE,
+    ZLIB_NOHEADER,
+    ZLIB,
+    SNAPPY,
+  };
+
+  public interface DirectDecompressorShim {
+    public void decompress(ByteBuffer src, ByteBuffer dst) throws IOException;
+  }
+
+  public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec);
 }
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
index bf9c84f..34010c2 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
@@ -44,6 +44,7 @@
     HADOOP_SHIM_CLASSES.put("0.20", "org.apache.hadoop.hive.shims.Hadoop20Shims");
     HADOOP_SHIM_CLASSES.put("0.20S", "org.apache.hadoop.hive.shims.Hadoop20SShims");
     HADOOP_SHIM_CLASSES.put("0.23", "org.apache.hadoop.hive.shims.Hadoop23Shims");
+    HADOOP_SHIM_CLASSES.put("0.23C", "org.apache.hadoop.hive.shims.Hadoop23CShims");
   }
 
   /**
@@ -57,6 +58,7 @@
     JETTY_SHIM_CLASSES.put("0.20", "org.apache.hadoop.hive.shims.Jetty20Shims");
     JETTY_SHIM_CLASSES.put("0.20S", "org.apache.hadoop.hive.shims.Jetty20SShims");
     JETTY_SHIM_CLASSES.put("0.23", "org.apache.hadoop.hive.shims.Jetty23Shims");
+    JETTY_SHIM_CLASSES.put("0.23C", "org.apache.hadoop.hive.shims.Jetty23Shims");
   }
 
   /**
@@ -69,6 +71,7 @@
     EVENT_COUNTER_SHIM_CLASSES.put("0.20", "org.apache.hadoop.metrics.jvm.EventCounter");
     EVENT_COUNTER_SHIM_CLASSES.put("0.20S", "org.apache.hadoop.log.metrics.EventCounter");
     EVENT_COUNTER_SHIM_CLASSES.put("0.23", "org.apache.hadoop.log.metrics.EventCounter");
+    EVENT_COUNTER_SHIM_CLASSES.put("0.23C", "org.apache.hadoop.log.metrics.EventCounter");
   }
 
   /**
@@ -149,7 +152,13 @@ public static String getMajorVersion() {
     case 1:
       return "0.20S";
     case 2:
-      return "0.23";
+      try {
+        Class.forName("org.apache.hadoop.fs.CacheFlag", false,
+            ShimLoader.class.getClassLoader());
+        return "0.23C";
+      } catch (ClassNotFoundException ce) {
+        return "0.23";
+      }
     default:
       throw new IllegalArgumentException("Unrecognized Hadoop major version number: " + vers);
     }
diff --git shims/pom.xml shims/pom.xml
index 9843836..a66b186 100644
--- shims/pom.xml
+++ shims/pom.xml
@@ -37,6 +37,7 @@
     <module>common-secure</module>
     <module>0.20S</module>
    <module>0.23</module>
+    <module>0.23C</module>
    <module>aggregator</module>
  </modules>
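Note (reviewer aid, not part of the patch): a minimal sketch of how a caller might exercise the new HadoopShims APIs once this change is applied. The class names ZeroCopyReadSketch and SimplePool, the input path argument, and the 1 MB read size are illustrative assumptions, not code from this patch. On shims without support (0.20, 0.20S, plain 0.23), getZeroCopyReader() and getDirectDecompressor() return null, so callers must fall back to ordinary reads and stream decompression.

// Hypothetical usage sketch -- assumes the shim APIs introduced by this patch.
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
import org.apache.hadoop.hive.shims.ShimLoader;

public class ZeroCopyReadSketch {

  /** Trivial pool: always allocates; a real pool would cache released buffers. */
  static class SimplePool implements ByteBufferPoolShim {
    public ByteBuffer getBuffer(boolean direct, int length) {
      return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
    }
    public void putBuffer(ByteBuffer buffer) {
      // nothing is cached in this sketch
    }
  }

  public static void main(String[] args) throws IOException {
    HadoopShims shims = ShimLoader.getHadoopShims();
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataInputStream in = fs.open(new Path(args[0]));
    try {
      // Returns null on shims without zero-copy support (0.20, 0.20S, plain 0.23).
      ZeroCopyReaderShim zcr = shims.getZeroCopyReader(in, new SimplePool());
      if (zcr != null) {
        // Read up to 1 MB with checksum verification skipped.
        ByteBuffer buf = zcr.readBuffer(1 << 20, false);
        if (buf != null) {
          try {
            System.out.println("read " + buf.remaining() + " bytes without an extra copy");
          } finally {
            // Buffers from readBuffer() must be handed back to the stream.
            zcr.releaseBuffer(buf);
          }
        }
      }
    } finally {
      in.close();
    }
  }
}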