diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java index b15fdb8..23d7c16 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java @@ -335,6 +335,9 @@ public class RCFile { decompressBuffer.reset(); DataInputStream valueIn = new DataInputStream(deflatFilter); deflatFilter.resetState(); + if (deflatFilter instanceof SchemaAwareCompressionInputStream) { + ((SchemaAwareCompressionInputStream)deflatFilter).setColumnIndex(colIndex); + } decompressBuffer.reset(compressedData.getData(), keyBuffer.eachColumnValueLen[colIndex]); @@ -792,6 +795,9 @@ public class RCFile { compressionBuffer[i] = new NonSyncDataOutputBuffer(); deflateFilter[i] = codec.createOutputStream(compressionBuffer[i], compressor); + if (deflateFilter[i] instanceof SchemaAwareCompressionOutputStream) { + ((SchemaAwareCompressionOutputStream)deflateFilter[i]).setColumnIndex(i); + } deflateOut[i] = new DataOutputStream(new BufferedOutputStream( deflateFilter[i])); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java new file mode 100644 index 0000000..7298906 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.InputStream; + +import org.apache.hadoop.io.compress.*; +/** + * + * SchemaAwareCompressionInputStream is a CompressionInputStream that can be told + * which column is being read, via {@link #setColumnIndex(int)}. + * + */ +public abstract class SchemaAwareCompressionInputStream extends CompressionInputStream { + + protected SchemaAwareCompressionInputStream(InputStream in) { + super(in); + } + + /** + * Informs this stream of the column being read. + * + * @param columnIndex the index of the column being read + */ + public abstract void setColumnIndex(int columnIndex); +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java new file mode 100644 index 0000000..175d5c9 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.OutputStream; + +import org.apache.hadoop.io.compress.*; + +/** + * + * SchemaAwareCompressionOutputStream is a CompressionOutputStream that can be told + * which column is being compressed, via {@link #setColumnIndex(int)}. + * + */ +public abstract class SchemaAwareCompressionOutputStream extends CompressionOutputStream { + + protected SchemaAwareCompressionOutputStream(OutputStream out) { + super(out); + } + + /** + * + * Informs this stream of the column being output. + * + * @param columnIndex the index of the column being output + */ + public abstract void setColumnIndex(int columnIndex); +}