diff --git ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
index 33cc5c3..c6b7446 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
@@ -37,6 +37,8 @@
 import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
 import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.LineRecordReader.LineReader;
@@ -83,6 +85,8 @@ public boolean equals(Object obj) {
   JobConf job = null;
   BytesRefWritable[] bytesRef = new BytesRefWritable[2];
   boolean ignoreHdfsLoc = false;
+  static final int NO_COMPRESSION = 0;
+  static final int DO_COMPRESSION = 1;
 
   public HiveIndexResult(List<String> indexFiles, JobConf conf) throws IOException,
       HiveException {
@@ -182,11 +186,25 @@ private void add(Text line) throws HiveException {
     bucket.getOffsets().add(Long.parseLong(one_offset));
   }
 
+  // Returns DO_COMPRESSION when a registered codec claims the suffix of the
+  // split's file, NO_COMPRESSION otherwise.
+  public int doSplitCompression(FileSplit split) throws IOException {
+    final Path file = split.getPath();
+    CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(job);
+    CompressionCodec codec = compressionCodecs.getCodec(file);
+
+    if (codec != null) {
+      return DO_COMPRESSION;
+    }
+    return NO_COMPRESSION;
+  }
+
   public boolean contains(FileSplit split) throws HiveException {
 
     if (buckets == null) {
       return false;
     }
+    int ret;
     String bucketName = split.getPath().toString();
     IBucket bucket = buckets.get(bucketName);
     if (bucket == null) {
@@ -196,7 +214,18 @@ public boolean contains(FileSplit split) throws HiveException {
         return false;
       }
     }
-
+    // Compressed splits are always treated as containing index hits; their
+    // offsets are never checked against the split boundaries below.
+    try {
+      ret = doSplitCompression(split);
+    } catch (IOException e) {
+      l4j.error("doSplitCompression failed: " + e.getMessage());
+      return false;
+    }
+    if (DO_COMPRESSION == ret) {
+      l4j.info("File is compressed.");
+      return true;
+    }
     for (Long offset : bucket.getOffsets()) {
       if ((offset >= split.getStart())
           && (offset <= split.getStart() + split.getLength())) {
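
For reviewers, here is a minimal standalone sketch of the suffix-based codec
lookup that doSplitCompression relies on. It is not part of the patch; the
class name and file paths are illustrative only, and it assumes nothing beyond
a standard Hadoop client classpath.

// Standalone sketch (not part of the patch): shows how CompressionCodecFactory
// resolves a codec from a file-name suffix, which is all doSplitCompression does.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecLookupSketch {
  public static void main(String[] args) {
    // The factory reads the configured codec list and indexes the registered
    // codecs by their default file suffix.
    CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());

    // Illustrative paths only; nothing is opened or read.
    for (String name : new String[] {"/tmp/t.gz", "/tmp/t.bz2", "/tmp/t.txt"}) {
      CompressionCodec codec = factory.getCodec(new Path(name));
      // getCodec returns null when no registered codec claims the suffix,
      // which is the NO_COMPRESSION branch in the patch.
      System.out.println(name + " -> "
          + (codec == null ? "no codec" : codec.getClass().getSimpleName()));
    }
  }
}

With a default Configuration this should print GzipCodec for .gz, BZip2Codec
for .bz2, and "no codec" for .txt. The patch maps the non-null case to
DO_COMPRESSION and short-circuits contains(), so compressed splits are
accepted wholesale rather than filtered by index offsets.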