diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
index b15fdb8..0592c1c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
@@ -335,6 +335,9 @@ public class RCFile {
         decompressBuffer.reset();
         DataInputStream valueIn = new DataInputStream(deflatFilter);
         deflatFilter.resetState();
+        if (deflatFilter instanceof SchemaAwareCompressionInputStream) {
+          ((SchemaAwareCompressionInputStream)deflatFilter).setColumnIndex(colIndex);
+        }
         decompressBuffer.reset(compressedData.getData(),
             keyBuffer.eachColumnValueLen[colIndex]);
@@ -586,6 +589,8 @@ public class RCFile {
     KeyBuffer key = null;
     ValueBuffer value = null;
+    private int[] plainTotalColumnLength;
+    private int[] comprTotalColumnLength;

     /*
      * used for buffering appends before flush them out
      */
@@ -744,6 +749,9 @@ public class RCFile {
       finalizeFileHeader();
       key = new KeyBuffer(columnNumber);
       value = new ValueBuffer(key);
+
+      plainTotalColumnLength = new int[columnNumber];
+      comprTotalColumnLength = new int[columnNumber];
     }

     /** Write the initial part of file header. */
@@ -792,6 +800,9 @@ public class RCFile {
           compressionBuffer[i] = new NonSyncDataOutputBuffer();
           deflateFilter[i] = codec.createOutputStream(compressionBuffer[i],
               compressor);
+          if (deflateFilter[i] instanceof SchemaAwareCompressionOutputStream) {
+            ((SchemaAwareCompressionOutputStream)deflateFilter[i]).setColumnIndex(i);
+          }
           deflateOut[i] = new DataOutputStream(new BufferedOutputStream(
               deflateFilter[i]));
         }
@@ -896,12 +907,16 @@ public class RCFile {
         value.setColumnValueBuffer(compressionBuffer[columnIndex],
             columnIndex);
         valueLength += colLen;
+        plainTotalColumnLength[columnIndex] += columnValuePlainLength[columnIndex];
+        comprTotalColumnLength[columnIndex] += colLen;
       } else {
         int colLen = columnValuePlainLength[columnIndex];
         key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, colLen,
             columnIndex);
         value.setColumnValueBuffer(columnValue, columnIndex);
         valueLength += colLen;
+        plainTotalColumnLength[columnIndex] += colLen;
+        comprTotalColumnLength[columnIndex] += colLen;
       }
       columnValuePlainLength[columnIndex] = 0;
     }
@@ -996,6 +1011,12 @@ public class RCFile {
         out.close();
         out = null;
       }
+      if (LOG.isInfoEnabled()) {
+        for (int i = 0; i < columnNumber; i++) {
+          LOG.info("Column#" + i + " : Plain Total Column Value Length: " + plainTotalColumnLength[i]
+              + ", Compr Total Column Value Length: " + comprTotalColumnLength[i]);
+        }
+      }
     }
   }

diff --git ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java
new file mode 100644
index 0000000..2007ae3
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.InputStream;
+
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+/**
+ *
+ * SchemaAwareCompressionInputStream adds the ability to inform the compression
+ * stream which column is being read.
+ *
+ */
+public abstract class SchemaAwareCompressionInputStream extends CompressionInputStream {
+
+  protected SchemaAwareCompressionInputStream(InputStream in) {
+    super(in);
+  }
+
+  /**
+   * The column being read.
+   *
+   * @param columnIndex the index of the column. Use -1 for non-column data
+   */
+  public abstract void setColumnIndex(int columnIndex);
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java
new file mode 100644
index 0000000..75db038
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+
+/**
+ *
+ * SchemaAwareCompressionOutputStream adds the ability to inform the compression
+ * stream of the current column being compressed.
+ *
+ */
+public abstract class SchemaAwareCompressionOutputStream extends CompressionOutputStream {
+
+  protected SchemaAwareCompressionOutputStream(OutputStream out) {
+    super(out);
+  }
+
+  /**
+   * The column being output.
+   *
+   * @param columnIndex the index of the column. Use -1 for non-column data
+   */
+  public abstract void setColumnIndex(int columnIndex);
+}
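Illustration only, not part of the patch: a minimal sketch of what a concrete SchemaAwareCompressionOutputStream might look like. The class name ColumnTrackingOutputStream is hypothetical; it simply passes bytes through and records the column index that RCFile.Writer now supplies via setColumnIndex(i), to show where a real schema-aware codec would switch its per-column compression context.

package org.apache.hadoop.hive.ql.io;

import java.io.IOException;
import java.io.OutputStream;

// Hypothetical example; a real codec would replace the pass-through writes
// with per-column compression logic.
public class ColumnTrackingOutputStream extends SchemaAwareCompressionOutputStream {

  // Index of the column currently being written; -1 means non-column data,
  // matching the contract documented on setColumnIndex.
  private int currentColumn = -1;

  public ColumnTrackingOutputStream(OutputStream out) {
    super(out);
  }

  @Override
  public void setColumnIndex(int columnIndex) {
    // A schema-aware codec would select a per-column encoder or dictionary here.
    this.currentColumn = columnIndex;
  }

  @Override
  public void write(int b) throws IOException {
    out.write(b); // pass-through; no actual compression in this sketch
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    out.write(b, off, len);
  }

  @Override
  public void finish() throws IOException {
    // Flush any buffered per-column state; nothing is buffered in this sketch.
  }

  @Override
  public void resetState() throws IOException {
    currentColumn = -1;
  }
}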