diff --git pom.xml pom.xml
index 41f5337..29cb55c 100644
--- pom.xml
+++ pom.xml
@@ -99,6 +99,7 @@
0.20.2
1.2.1
2.2.0
+ 2.4.0-SNAPSHOT
0.96.0-hadoop1
0.96.0-hadoop2
@@ -185,6 +186,17 @@
false
+
+ apache.snapshots
+ Apache Snapshot Repository
+ http://repository.apache.org/snapshots
+
+ false
+
+
+ true
+
+
diff --git ql/pom.xml ql/pom.xml
index 7087a4c..c5db303 100644
--- ql/pom.xml
+++ ql/pom.xml
@@ -483,7 +483,7 @@
org.apache.hive.shims:hive-shims-0.20
org.apache.hive.shims:hive-shims-0.20S
org.apache.hive.shims:hive-shims-0.23
- org.apache.hive.shims:hive-shims-0.23
+ org.apache.hive.shims:hive-shims-0.23C
org.apache.hive.shims:hive-shims-common
org.apache.hive.shims:hive-shims-common-secure
com.googlecode.javaewah:JavaEWAH
diff --git shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
index ec1f18e..4b2e7ad 100644
--- shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
+++ shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
@@ -42,6 +42,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -50,8 +51,10 @@
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
@@ -755,6 +758,7 @@ public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi
public FileSystem createProxyFileSystem(FileSystem fs, URI uri) {
return new ProxyFileSystem(fs, uri);
}
+
@Override
public Map getHadoopConfNames() {
Map ret = new HashMap();
@@ -773,4 +777,16 @@ public FileSystem createProxyFileSystem(FileSystem fs, URI uri) {
ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
return ret;
}
+
+ @Override
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+ /* not supported */
+ return null;
+ }
+
+ @Override
+ public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec) {
+ /* not supported */
+ return null;
+ }
}
diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index d0ff7d4..5e8ed3f 100644
--- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,7 +38,9 @@
import org.apache.hadoop.fs.ProxyFileSystem;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.ClusterStatus;
@@ -410,4 +413,16 @@ public FileSystem createProxyFileSystem(FileSystem fs, URI uri) {
ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
return ret;
}
+
+ @Override
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+ /* not supported */
+ return null;
+ }
+
+ @Override
+ public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec) {
+ /* not supported */
+ return null;
+ }
}
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 54c38ee..7ffe027 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -27,11 +27,13 @@
import java.util.Map;
import java.util.HashMap;
import java.net.URI;
+import java.nio.ByteBuffer;
import java.io.FileNotFoundException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -41,7 +43,9 @@
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
@@ -556,5 +560,17 @@ public FileSystem createProxyFileSystem(FileSystem fs, URI uri) {
ret.put("MAPREDSETUPCLEANUPNEEDED", "mapreduce.job.committer.setup.cleanup.needed");
ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
return ret;
+ }
+
+ @Override
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+ /* not supported */
+ return null;
+ }
+
+ @Override
+ public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec) {
+ /* not supported */
+ return null;
}
}
diff --git shims/aggregator/pom.xml shims/aggregator/pom.xml
index 7aa8c4c..77ef113 100644
--- shims/aggregator/pom.xml
+++ shims/aggregator/pom.xml
@@ -63,5 +63,11 @@
${project.version}
runtime
+
+ org.apache.hive.shims
+ hive-shims-0.23C
+ ${project.version}
+ runtime
+
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 2b3c6c1..d410db4 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -24,6 +24,7 @@
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.Comparator;
import java.util.Iterator;
@@ -36,11 +37,13 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -520,4 +523,68 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporte
public FileSystem createProxyFileSystem(FileSystem fs, URI uri);
public Map getHadoopConfNames();
+
+ /**
+ * a hadoop.io ByteBufferPool shim.
+ */
+ public interface ByteBufferPoolShim {
+ /**
+ * Get a new ByteBuffer from the pool. The pool can provide this from
+ * removing a buffer from its internal cache, or by allocating a
+ * new buffer.
+ *
+ * @param direct Whether the buffer should be direct.
+ * @param length The minimum length the buffer will have.
+ * @return A new ByteBuffer. Its capacity can be less
+ * than what was requested, but must be at
+ * least 1 byte.
+ */
+ ByteBuffer getBuffer(boolean direct, int length);
+
+ /**
+ * Release a buffer back to the pool.
+ * The pool may choose to put this buffer into its cache/free it.
+ *
+ * @param buffer a direct bytebuffer
+ */
+ void putBuffer(ByteBuffer buffer);
+ }
+
+ /**
+ * Provides an HDFS ZeroCopyReader shim.
+ * @param in FSDataInputStream to read from (where the cached/mmap buffers are tied to)
+ * @param pool ByteBufferPoolShim to allocate fallback buffers with
+ *
+ * @return returns null if not supported
+ */
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException;
+
+ public interface ZeroCopyReaderShim {
+ /**
+ * Get a ByteBuffer from the FSDataInputStream - this can be either a HeapByteBuffer or a MappedByteBuffer.
+ * Also move the in stream by that amount. The data read can be smaller than maxLength.
+ *
+ * @return ByteBuffer read from the stream.
+ */
+ public ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException;
+ /**
+ * Release a ByteBuffer obtained from a zero-copy read on the
+ * underlying FSDataInputStream, returning it to the pool if applicable.
+ *
+ */
+ public void releaseBuffer(ByteBuffer buffer);
+ }
+
+ public enum DirectCompressionType {
+ NONE,
+ ZLIB_NOHEADER,
+ ZLIB,
+ SNAPPY,
+ };
+
+ public interface DirectDecompressorShim {
+ public void decompress(ByteBuffer src, ByteBuffer dst) throws IOException;
+ }
+
+ public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec);
}
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
index bf9c84f..34010c2 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
@@ -44,6 +44,7 @@
HADOOP_SHIM_CLASSES.put("0.20", "org.apache.hadoop.hive.shims.Hadoop20Shims");
HADOOP_SHIM_CLASSES.put("0.20S", "org.apache.hadoop.hive.shims.Hadoop20SShims");
HADOOP_SHIM_CLASSES.put("0.23", "org.apache.hadoop.hive.shims.Hadoop23Shims");
+ HADOOP_SHIM_CLASSES.put("0.23C", "org.apache.hadoop.hive.shims.Hadoop23CShims");
}
/**
@@ -57,6 +58,7 @@
JETTY_SHIM_CLASSES.put("0.20", "org.apache.hadoop.hive.shims.Jetty20Shims");
JETTY_SHIM_CLASSES.put("0.20S", "org.apache.hadoop.hive.shims.Jetty20SShims");
JETTY_SHIM_CLASSES.put("0.23", "org.apache.hadoop.hive.shims.Jetty23Shims");
+ JETTY_SHIM_CLASSES.put("0.23C", "org.apache.hadoop.hive.shims.Jetty23Shims");
}
/**
@@ -69,6 +71,7 @@
EVENT_COUNTER_SHIM_CLASSES.put("0.20", "org.apache.hadoop.metrics.jvm.EventCounter");
EVENT_COUNTER_SHIM_CLASSES.put("0.20S", "org.apache.hadoop.log.metrics.EventCounter");
EVENT_COUNTER_SHIM_CLASSES.put("0.23", "org.apache.hadoop.log.metrics.EventCounter");
+ EVENT_COUNTER_SHIM_CLASSES.put("0.23C", "org.apache.hadoop.log.metrics.EventCounter");
}
/**
@@ -149,7 +152,13 @@ public static String getMajorVersion() {
case 1:
return "0.20S";
case 2:
- return "0.23";
+ try {
+ Class.forName("org.apache.hadoop.fs.CacheFlag", false,
+ ShimLoader.class.getClassLoader());
+ return "0.23C";
+ } catch (ClassNotFoundException ce) {
+ return "0.23";
+ }
default:
throw new IllegalArgumentException("Unrecognized Hadoop major version number: " + vers);
}
diff --git shims/pom.xml shims/pom.xml
index 9843836..a66b186 100644
--- shims/pom.xml
+++ shims/pom.xml
@@ -37,6 +37,7 @@
common-secure
0.20S
0.23
+ 0.23C
aggregator