commit 9eb13461b717c7b7897329cacba9be65736244bc Author: Owen O'Malley Date: Mon May 2 14:24:07 2016 -0700 first pass of 9660 diff --git orc/src/gen/protobuf-java/org/apache/orc/OrcProto.java orc/src/gen/protobuf-java/org/apache/orc/OrcProto.java index 24715c3..7e82fda 100644 --- orc/src/gen/protobuf-java/org/apache/orc/OrcProto.java +++ orc/src/gen/protobuf-java/org/apache/orc/OrcProto.java @@ -6715,6 +6715,20 @@ public Builder clearHasNull() { * optional .orc.proto.ColumnStatistics statistics = 2; */ org.apache.orc.OrcProto.ColumnStatisticsOrBuilder getStatisticsOrBuilder(); + + // repeated uint32 lengths = 3 [packed = true]; + /** + * repeated uint32 lengths = 3 [packed = true]; + */ + java.util.List getLengthsList(); + /** + * repeated uint32 lengths = 3 [packed = true]; + */ + int getLengthsCount(); + /** + * repeated uint32 lengths = 3 [packed = true]; + */ + int getLengths(int index); } /** * Protobuf type {@code orc.proto.RowIndexEntry} @@ -6801,6 +6815,27 @@ private RowIndexEntry( bitField0_ |= 0x00000001; break; } + case 24: { + if (!((mutable_bitField0_ & 0x00000004) == 0x00000004)) { + lengths_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000004; + } + lengths_.add(input.readUInt32()); + break; + } + case 26: { + int length = input.readRawVarint32(); + int limit = input.pushLimit(length); + if (!((mutable_bitField0_ & 0x00000004) == 0x00000004) && input.getBytesUntilLimit() > 0) { + lengths_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000004; + } + while (input.getBytesUntilLimit() > 0) { + lengths_.add(input.readUInt32()); + } + input.popLimit(limit); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -6812,6 +6847,9 @@ private RowIndexEntry( if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) { positions_ = java.util.Collections.unmodifiableList(positions_); } + if (((mutable_bitField0_ & 0x00000004) == 0x00000004)) { + lengths_ = java.util.Collections.unmodifiableList(lengths_); + } 
this.unknownFields = unknownFields.build(); makeExtensionsImmutable(); } @@ -6890,9 +6928,34 @@ public boolean hasStatistics() { return statistics_; } + // repeated uint32 lengths = 3 [packed = true]; + public static final int LENGTHS_FIELD_NUMBER = 3; + private java.util.List lengths_; + /** + * repeated uint32 lengths = 3 [packed = true]; + */ + public java.util.List + getLengthsList() { + return lengths_; + } + /** + * repeated uint32 lengths = 3 [packed = true]; + */ + public int getLengthsCount() { + return lengths_.size(); + } + /** + * repeated uint32 lengths = 3 [packed = true]; + */ + public int getLengths(int index) { + return lengths_.get(index); + } + private int lengthsMemoizedSerializedSize = -1; + private void initFields() { positions_ = java.util.Collections.emptyList(); statistics_ = org.apache.orc.OrcProto.ColumnStatistics.getDefaultInstance(); + lengths_ = java.util.Collections.emptyList(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -6916,6 +6979,13 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) if (((bitField0_ & 0x00000001) == 0x00000001)) { output.writeMessage(2, statistics_); } + if (getLengthsList().size() > 0) { + output.writeRawVarint32(26); + output.writeRawVarint32(lengthsMemoizedSerializedSize); + } + for (int i = 0; i < lengths_.size(); i++) { + output.writeUInt32NoTag(lengths_.get(i)); + } getUnknownFields().writeTo(output); } @@ -6943,6 +7013,20 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeMessageSize(2, statistics_); } + { + int dataSize = 0; + for (int i = 0; i < lengths_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeUInt32SizeNoTag(lengths_.get(i)); + } + size += dataSize; + if (!getLengthsList().isEmpty()) { + size += 1; + size += com.google.protobuf.CodedOutputStream + .computeInt32SizeNoTag(dataSize); + } + lengthsMemoizedSerializedSize = dataSize; + } size += 
getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -7068,6 +7152,8 @@ public Builder clear() { statisticsBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000002); + lengths_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -7109,6 +7195,11 @@ public Builder clone() { } else { result.statistics_ = statisticsBuilder_.build(); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + lengths_ = java.util.Collections.unmodifiableList(lengths_); + bitField0_ = (bitField0_ & ~0x00000004); + } + result.lengths_ = lengths_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -7138,6 +7229,16 @@ public Builder mergeFrom(org.apache.orc.OrcProto.RowIndexEntry other) { if (other.hasStatistics()) { mergeStatistics(other.getStatistics()); } + if (!other.lengths_.isEmpty()) { + if (lengths_.isEmpty()) { + lengths_ = other.lengths_; + bitField0_ = (bitField0_ & ~0x00000004); + } else { + ensureLengthsIsMutable(); + lengths_.addAll(other.lengths_); + } + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -7348,6 +7449,72 @@ public Builder clearStatistics() { return statisticsBuilder_; } + // repeated uint32 lengths = 3 [packed = true]; + private java.util.List lengths_ = java.util.Collections.emptyList(); + private void ensureLengthsIsMutable() { + if (!((bitField0_ & 0x00000004) == 0x00000004)) { + lengths_ = new java.util.ArrayList(lengths_); + bitField0_ |= 0x00000004; + } + } + /** + * repeated uint32 lengths = 3 [packed = true]; + */ + public java.util.List + getLengthsList() { + return java.util.Collections.unmodifiableList(lengths_); + } + /** + * repeated uint32 lengths = 3 [packed = true]; + */ + public int getLengthsCount() { + return lengths_.size(); + } + /** + * repeated uint32 lengths = 3 [packed = true]; + */ + public int getLengths(int index) { + return lengths_.get(index); + } + /** + * repeated uint32 lengths = 3 [packed = true]; + */ 
+ public Builder setLengths( + int index, int value) { + ensureLengthsIsMutable(); + lengths_.set(index, value); + onChanged(); + return this; + } + /** + * repeated uint32 lengths = 3 [packed = true]; + */ + public Builder addLengths(int value) { + ensureLengthsIsMutable(); + lengths_.add(value); + onChanged(); + return this; + } + /** + * repeated uint32 lengths = 3 [packed = true]; + */ + public Builder addAllLengths( + java.lang.Iterable values) { + ensureLengthsIsMutable(); + super.addAll(values, lengths_); + onChanged(); + return this; + } + /** + * repeated uint32 lengths = 3 [packed = true]; + */ + public Builder clearLengths() { + lengths_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000004); + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:orc.proto.RowIndexEntry) } @@ -19088,56 +19255,57 @@ public Builder setMagicBytes( "Statistics\0225\n\020binaryStatistics\030\010 \001(\0132\033.o" + "rc.proto.BinaryStatistics\022;\n\023timestampSt" + "atistics\030\t \001(\0132\036.orc.proto.TimestampStat" + - "istics\022\017\n\007hasNull\030\n \001(\010\"W\n\rRowIndexEntry" + + "istics\022\017\n\007hasNull\030\n \001(\010\"l\n\rRowIndexEntry" + "\022\025\n\tpositions\030\001 \003(\004B\002\020\001\022/\n\nstatistics\030\002 " + - "\001(\0132\033.orc.proto.ColumnStatistics\"3\n\010RowI" + - "ndex\022\'\n\005entry\030\001 \003(\0132\030.orc.proto.RowIndex" + - "Entry\"7\n\013BloomFilter\022\030\n\020numHashFunctions" + - "\030\001 \001(\r\022\016\n\006bitset\030\002 \003(\006\"?\n\020BloomFilterInd", - "ex\022+\n\013bloomFilter\030\001 \003(\0132\026.orc.proto.Bloo" + - "mFilter\"\325\001\n\006Stream\022$\n\004kind\030\001 \001(\0162\026.orc.p" + - "roto.Stream.Kind\022\016\n\006column\030\002 \001(\r\022\016\n\006leng" + - "th\030\003 \001(\004\"\204\001\n\004Kind\022\013\n\007PRESENT\020\000\022\010\n\004DATA\020\001" + - "\022\n\n\006LENGTH\020\002\022\023\n\017DICTIONARY_DATA\020\003\022\024\n\020DIC" + - 
"TIONARY_COUNT\020\004\022\r\n\tSECONDARY\020\005\022\r\n\tROW_IN" + - "DEX\020\006\022\020\n\014BLOOM_FILTER\020\007\"\234\001\n\016ColumnEncodi" + - "ng\022,\n\004kind\030\001 \001(\0162\036.orc.proto.ColumnEncod" + - "ing.Kind\022\026\n\016dictionarySize\030\002 \001(\r\"D\n\004Kind" + - "\022\n\n\006DIRECT\020\000\022\016\n\nDICTIONARY\020\001\022\r\n\tDIRECT_V", - "2\020\002\022\021\n\rDICTIONARY_V2\020\003\"v\n\014StripeFooter\022\"" + - "\n\007streams\030\001 \003(\0132\021.orc.proto.Stream\022*\n\007co" + - "lumns\030\002 \003(\0132\031.orc.proto.ColumnEncoding\022\026" + - "\n\016writerTimezone\030\003 \001(\t\"\341\002\n\004Type\022\"\n\004kind\030" + - "\001 \001(\0162\024.orc.proto.Type.Kind\022\024\n\010subtypes\030" + - "\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\022\025\n\rmaximum" + - "Length\030\004 \001(\r\022\021\n\tprecision\030\005 \001(\r\022\r\n\005scale" + - "\030\006 \001(\r\"\321\001\n\004Kind\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t" + - "\n\005SHORT\020\002\022\007\n\003INT\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022" + - "\n\n\006DOUBLE\020\006\022\n\n\006STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tT", - "IMESTAMP\020\t\022\010\n\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020" + - "\014\022\t\n\005UNION\020\r\022\013\n\007DECIMAL\020\016\022\010\n\004DATE\020\017\022\013\n\007V" + - "ARCHAR\020\020\022\010\n\004CHAR\020\021\"x\n\021StripeInformation\022" + - "\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002 \001(\004\022\022\n\n" + - "dataLength\030\003 \001(\004\022\024\n\014footerLength\030\004 \001(\004\022\024" + - "\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMetadataItem" + - "\022\014\n\004name\030\001 \001(\t\022\r\n\005value\030\002 \001(\014\"A\n\020StripeS" + - "tatistics\022-\n\010colStats\030\001 \003(\0132\033.orc.proto." 
+ - "ColumnStatistics\"<\n\010Metadata\0220\n\013stripeSt" + - "ats\030\001 \003(\0132\033.orc.proto.StripeStatistics\"\222", - "\002\n\006Footer\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rconte" + - "ntLength\030\002 \001(\004\022-\n\007stripes\030\003 \003(\0132\034.orc.pr" + - "oto.StripeInformation\022\036\n\005types\030\004 \003(\0132\017.o" + - "rc.proto.Type\022-\n\010metadata\030\005 \003(\0132\033.orc.pr" + - "oto.UserMetadataItem\022\024\n\014numberOfRows\030\006 \001" + - "(\004\022/\n\nstatistics\030\007 \003(\0132\033.orc.proto.Colum" + - "nStatistics\022\026\n\016rowIndexStride\030\010 \001(\r\"\305\001\n\n" + - "PostScript\022\024\n\014footerLength\030\001 \001(\004\022/\n\013comp" + - "ression\030\002 \001(\0162\032.orc.proto.CompressionKin" + - "d\022\034\n\024compressionBlockSize\030\003 \001(\004\022\023\n\007versi", - "on\030\004 \003(\rB\002\020\001\022\026\n\016metadataLength\030\005 \001(\004\022\025\n\r" + - "writerVersion\030\006 \001(\r\022\016\n\005magic\030\300> \001(\t*:\n\017C" + - "ompressionKind\022\010\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SN" + - "APPY\020\002\022\007\n\003LZO\020\003B\020\n\016org.apache.orc" + "\001(\0132\033.orc.proto.ColumnStatistics\022\023\n\007leng" + + "ths\030\003 \003(\rB\002\020\001\"3\n\010RowIndex\022\'\n\005entry\030\001 \003(\013" + + "2\030.orc.proto.RowIndexEntry\"7\n\013BloomFilte" + + "r\022\030\n\020numHashFunctions\030\001 \001(\r\022\016\n\006bitset\030\002 ", + "\003(\006\"?\n\020BloomFilterIndex\022+\n\013bloomFilter\030\001" + + " \003(\0132\026.orc.proto.BloomFilter\"\325\001\n\006Stream\022" + + "$\n\004kind\030\001 \001(\0162\026.orc.proto.Stream.Kind\022\016\n" + + "\006column\030\002 \001(\r\022\016\n\006length\030\003 \001(\004\"\204\001\n\004Kind\022\013" + + "\n\007PRESENT\020\000\022\010\n\004DATA\020\001\022\n\n\006LENGTH\020\002\022\023\n\017DIC" + + "TIONARY_DATA\020\003\022\024\n\020DICTIONARY_COUNT\020\004\022\r\n\t" + + 
"SECONDARY\020\005\022\r\n\tROW_INDEX\020\006\022\020\n\014BLOOM_FILT" + + "ER\020\007\"\234\001\n\016ColumnEncoding\022,\n\004kind\030\001 \001(\0162\036." + + "orc.proto.ColumnEncoding.Kind\022\026\n\016diction" + + "arySize\030\002 \001(\r\"D\n\004Kind\022\n\n\006DIRECT\020\000\022\016\n\nDIC", + "TIONARY\020\001\022\r\n\tDIRECT_V2\020\002\022\021\n\rDICTIONARY_V" + + "2\020\003\"v\n\014StripeFooter\022\"\n\007streams\030\001 \003(\0132\021.o" + + "rc.proto.Stream\022*\n\007columns\030\002 \003(\0132\031.orc.p" + + "roto.ColumnEncoding\022\026\n\016writerTimezone\030\003 " + + "\001(\t\"\341\002\n\004Type\022\"\n\004kind\030\001 \001(\0162\024.orc.proto.T" + + "ype.Kind\022\024\n\010subtypes\030\002 \003(\rB\002\020\001\022\022\n\nfieldN" + + "ames\030\003 \003(\t\022\025\n\rmaximumLength\030\004 \001(\r\022\021\n\tpre" + + "cision\030\005 \001(\r\022\r\n\005scale\030\006 \001(\r\"\321\001\n\004Kind\022\013\n\007" + + "BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002\022\007\n\003INT\020\003\022" + + "\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE\020\006\022\n\n\006STRI", + "NG\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020\t\022\010\n\004LIST\020" + + "\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNION\020\r\022\013\n\007DEC" + + "IMAL\020\016\022\010\n\004DATE\020\017\022\013\n\007VARCHAR\020\020\022\010\n\004CHAR\020\021\"" + + "x\n\021StripeInformation\022\016\n\006offset\030\001 \001(\004\022\023\n\013" + + "indexLength\030\002 \001(\004\022\022\n\ndataLength\030\003 \001(\004\022\024\n" + + "\014footerLength\030\004 \001(\004\022\024\n\014numberOfRows\030\005 \001(" + + "\004\"/\n\020UserMetadataItem\022\014\n\004name\030\001 \001(\t\022\r\n\005v" + + "alue\030\002 \001(\014\"A\n\020StripeStatistics\022-\n\010colSta" + + "ts\030\001 \003(\0132\033.orc.proto.ColumnStatistics\"<\n" + + "\010Metadata\0220\n\013stripeStats\030\001 \003(\0132\033.orc.pro", + 
"to.StripeStatistics\"\222\002\n\006Footer\022\024\n\014header" + + "Length\030\001 \001(\004\022\025\n\rcontentLength\030\002 \001(\004\022-\n\007s" + + "tripes\030\003 \003(\0132\034.orc.proto.StripeInformati" + + "on\022\036\n\005types\030\004 \003(\0132\017.orc.proto.Type\022-\n\010me" + + "tadata\030\005 \003(\0132\033.orc.proto.UserMetadataIte" + + "m\022\024\n\014numberOfRows\030\006 \001(\004\022/\n\nstatistics\030\007 " + + "\003(\0132\033.orc.proto.ColumnStatistics\022\026\n\016rowI" + + "ndexStride\030\010 \001(\r\"\305\001\n\nPostScript\022\024\n\014foote" + + "rLength\030\001 \001(\004\022/\n\013compression\030\002 \001(\0162\032.orc" + + ".proto.CompressionKind\022\034\n\024compressionBlo", + "ckSize\030\003 \001(\004\022\023\n\007version\030\004 \003(\rB\002\020\001\022\026\n\016met" + + "adataLength\030\005 \001(\004\022\025\n\rwriterVersion\030\006 \001(\r" + + "\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKind\022\010\n\004N" + + "ONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003B\020\n\016" + + "org.apache.orc" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -19203,7 +19371,7 @@ public Builder setMagicBytes( internal_static_orc_proto_RowIndexEntry_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_orc_proto_RowIndexEntry_descriptor, - new java.lang.String[] { "Positions", "Statistics", }); + new java.lang.String[] { "Positions", "Statistics", "Lengths", }); internal_static_orc_proto_RowIndex_descriptor = getDescriptor().getMessageTypes().get(10); internal_static_orc_proto_RowIndex_fieldAccessorTable = new diff --git orc/src/java/org/apache/orc/impl/BitFieldWriter.java orc/src/java/org/apache/orc/impl/BitFieldWriter.java index aa5f886..04d9bb9 100644 --- orc/src/java/org/apache/orc/impl/BitFieldWriter.java +++ 
orc/src/java/org/apache/orc/impl/BitFieldWriter.java @@ -17,10 +17,6 @@ */ package org.apache.orc.impl; -import org.apache.orc.impl.PositionRecorder; -import org.apache.orc.impl.PositionedOutputStream; -import org.apache.orc.impl.RunLengthByteWriter; - import java.io.IOException; public class BitFieldWriter { @@ -70,4 +66,8 @@ public void getPosition(PositionRecorder recorder) throws IOException { output.getPosition(recorder); recorder.addPosition(8 - bitsLeft); } + + public void registerCallback(PositionedOutputStream.CompressionCallback cb) { + output.registerCallback(cb); + } } diff --git orc/src/java/org/apache/orc/impl/IntegerWriter.java orc/src/java/org/apache/orc/impl/IntegerWriter.java index 419054f..9b4058c 100644 --- orc/src/java/org/apache/orc/impl/IntegerWriter.java +++ orc/src/java/org/apache/orc/impl/IntegerWriter.java @@ -44,4 +44,9 @@ * @throws IOException */ void flush() throws IOException; + + /** + * Register a callback for when the next compression is done. + */ + void registerCallback(OutStream.CompressionCallback callback); } diff --git orc/src/java/org/apache/orc/impl/OutStream.java orc/src/java/org/apache/orc/impl/OutStream.java index 81662cc..1a16115 100644 --- orc/src/java/org/apache/orc/impl/OutStream.java +++ orc/src/java/org/apache/orc/impl/OutStream.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; public class OutStream extends PositionedOutputStream { @@ -65,6 +67,7 @@ private final CompressionCodec codec; private long compressedBytes = 0; private long uncompressedBytes = 0; + private final List callbacks = new ArrayList<>(); public OutStream(String name, int bufferSize, @@ -77,6 +80,21 @@ public OutStream(String name, this.suppress = false; } + /** + * Register a callback for when the next compression buffer is completed. + * @param callback the method to call when the block is done. 
+ */ + @Override + public void registerCallback(CompressionCallback callback) { + if (codec == null) { + callback.compressionDone(uncompressedBytes); + } else if (current == null || current.position() == HEADER_SIZE) { + callback.compressionDone(compressedBytes); + } else { + callbacks.add(callback); + } + } + public void clear() throws IOException { flush(); suppress = false; @@ -155,6 +173,13 @@ public void write(byte[] bytes, int offset, int length) throws IOException { } } + private void doCallbacks(long position) { + for(CompressionCallback callback: callbacks) { + callback.compressionDone(position); + } + callbacks.clear(); + } + private void spill() throws java.io.IOException { // if there isn't anything in the current buffer, don't spill if (current == null || @@ -185,6 +210,7 @@ private void spill() throws java.io.IOException { } compressedBytes += totalBytes + HEADER_SIZE; writeHeader(compressed, sizePosn, totalBytes, false); + doCallbacks(compressedBytes); // if we have less than the next header left, spill it. if (compressed.remaining() < HEADER_SIZE) { compressed.flip(); @@ -194,6 +220,7 @@ private void spill() throws java.io.IOException { } } else { compressedBytes += uncompressedBytes + HEADER_SIZE; + doCallbacks(compressedBytes); uncompressedBytes = 0; // we are using the original, but need to spill the current // compressed buffer first. So back up to where we started, diff --git orc/src/java/org/apache/orc/impl/PositionedOutputStream.java orc/src/java/org/apache/orc/impl/PositionedOutputStream.java index d412939..86e536b 100644 --- orc/src/java/org/apache/orc/impl/PositionedOutputStream.java +++ orc/src/java/org/apache/orc/impl/PositionedOutputStream.java @@ -22,6 +22,13 @@ public abstract class PositionedOutputStream extends OutputStream { + public interface CompressionCallback { + /** + * Called when each compression finished. + */ + void compressionDone(long length); + } + /** * Record the current position to the recorder. 
* @param recorder the object that receives the position @@ -36,4 +43,10 @@ public abstract void getPosition(PositionRecorder recorder * @return the number of bytes used by buffers. */ public abstract long getBufferSize(); + + /** + * Register a callback for when the next compression buffer is completed. + * @param callback the method to call when the block is done. + */ + public abstract void registerCallback(CompressionCallback callback); } diff --git orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java index 09108b2..c72cfe2 100644 --- orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java +++ orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java @@ -18,6 +18,8 @@ package org.apache.orc.impl; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** * A streamFactory that writes a sequence of bytes. A control byte is written before @@ -33,6 +35,8 @@ private int numLiterals = 0; private boolean repeat = false; private int tailRunLength = 0; + private List callbacks = + new ArrayList<>(); public RunLengthByteWriter(PositionedOutputStream output) { this.output = output; @@ -51,6 +55,10 @@ private void writeValues() throws IOException { tailRunLength = 0; numLiterals = 0; } + for(PositionedOutputStream.CompressionCallback cb: callbacks) { + output.registerCallback(cb); + } + callbacks.clear(); } public void flush() throws IOException { @@ -103,4 +111,12 @@ public void getPosition(PositionRecorder recorder) throws IOException { output.getPosition(recorder); recorder.addPosition(numLiterals); } + + public void registerCallback(PositionedOutputStream.CompressionCallback cb) { + if (numLiterals == 0) { + output.registerCallback(cb); + } else { + callbacks.add(cb); + } + } } diff --git orc/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java orc/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java index 3e5f2e2..3563523 100644 --- 
orc/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java +++ orc/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java @@ -18,6 +18,8 @@ package org.apache.orc.impl; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** * A streamFactory that writes a sequence of integers. A control byte is written before @@ -39,6 +41,8 @@ private boolean repeat = false; private int tailRunLength = 0; private SerializationUtils utils; + private final List callbacks = + new ArrayList<>(); public RunLengthIntegerWriter(PositionedOutputStream output, boolean signed) { @@ -70,6 +74,10 @@ private void writeValues() throws IOException { repeat = false; numLiterals = 0; tailRunLength = 0; + for(OutStream.CompressionCallback cb: callbacks) { + output.registerCallback(cb); + }; + callbacks.clear(); } } @@ -80,6 +88,15 @@ public void flush() throws IOException { } @Override + public void registerCallback(OutStream.CompressionCallback callback) { + if (numLiterals == 0) { + output.registerCallback(callback); + } else { + callbacks.add(callback); + } + } + + @Override public void write(long value) throws IOException { if (numLiterals == 0) { literals[numLiterals++] = value; @@ -139,5 +156,4 @@ public void getPosition(PositionRecorder recorder) throws IOException { output.getPosition(recorder); recorder.addPosition(numLiterals); } - } diff --git orc/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java orc/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java index fab2801..c3d10a2 100644 --- orc/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java +++ orc/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java @@ -18,6 +18,8 @@ package org.apache.orc.impl; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** * A writer that performs light weight compression over sequence of integers. 
@@ -159,6 +161,8 @@ private boolean isFixedDelta; private SerializationUtils utils; private boolean alignedBitpacking; + private final List callbacks = + new ArrayList<>(); RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) { this(output, signed, true); @@ -188,6 +192,10 @@ private void writeValues() throws IOException { // clear all the variables clear(); + for(PositionedOutputStream.CompressionCallback cb: callbacks) { + output.registerCallback(cb); + } + callbacks.clear(); } } @@ -716,6 +724,15 @@ public void flush() throws IOException { } @Override + public void registerCallback(OutStream.CompressionCallback callback) { + if (numLiterals == 0) { + output.registerCallback(callback); + } else { + callbacks.add(callback); + } + } + + @Override public void write(long val) throws IOException { if (numLiterals == 0) { initializeLiterals(val); diff --git orc/src/java/org/apache/orc/impl/StringRedBlackTree.java orc/src/java/org/apache/orc/impl/StringRedBlackTree.java index c353ab0..ab50f6d 100644 --- orc/src/java/org/apache/orc/impl/StringRedBlackTree.java +++ orc/src/java/org/apache/orc/impl/StringRedBlackTree.java @@ -191,6 +191,19 @@ public void getText(Text result, int originalPosition) { byteArray.setText(result, offset, length); } + public int write(OutputStream out, + int originalPosition) throws IOException { + int offset = keyOffsets.get(originalPosition); + int length; + if (originalPosition + 1 == keyOffsets.size()) { + length = byteArray.size() - offset; + } else { + length = keyOffsets.get(originalPosition + 1) - offset; + } + byteArray.write(out, offset, length); + return length; + } + /** * Get the size of the character data in the table. 
* @return the bytes used by the table diff --git orc/src/java/org/apache/orc/impl/WriterImpl.java orc/src/java/org/apache/orc/impl/WriterImpl.java index b2966e0..b6459f6 100644 --- orc/src/java/org/apache/orc/impl/WriterImpl.java +++ orc/src/java/org/apache/orc/impl/WriterImpl.java @@ -97,9 +97,6 @@ private static final int HDFS_BUFFER_SIZE = 256 * 1024; private static final int MIN_ROW_INDEX_STRIDE = 1000; - // threshold above which buffer size will be automatically resized - private static final int COLUMN_COUNT_THRESHOLD = 1000; - private final FileSystem fs; private final Path path; private final long defaultStripeSize; @@ -401,19 +398,6 @@ public void output(ByteBuffer buffer) throws IOException { } } - private static class RowIndexPositionRecorder implements PositionRecorder { - private final OrcProto.RowIndexEntry.Builder builder; - - RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) { - this.builder = builder; - } - - @Override - public void addPosition(long position) { - builder.addPositions(position); - } - } - /** * Interface from the Writer to the TreeWriters. This limits the visibility * that the TreeWriters have into the Writer. 
@@ -564,14 +548,14 @@ public boolean hasWriterTimeZone() { private abstract static class TreeWriter { protected final int id; protected final BitFieldWriter isPresent; + private long isPresentStart; private final boolean isCompressed; protected final ColumnStatisticsImpl indexStatistics; protected final ColumnStatisticsImpl stripeColStatistics; private final ColumnStatisticsImpl fileStatistics; protected TreeWriter[] childrenWriters; - protected final RowIndexPositionRecorder rowIndexPosition; - private final OrcProto.RowIndex.Builder rowIndex; - private final OrcProto.RowIndexEntry.Builder rowIndexEntry; + protected final OrcProto.RowIndex.Builder rowIndex; + protected RowIndexEntry rowIndexEntry; private final PositionedOutputStream rowIndexStream; private final PositionedOutputStream bloomFilterStream; protected final BloomFilterIO bloomFilter; @@ -612,8 +596,7 @@ public boolean hasWriterTimeZone() { fileStatistics = ColumnStatisticsImpl.create(schema); childrenWriters = new TreeWriter[0]; rowIndex = OrcProto.RowIndex.newBuilder(); - rowIndexEntry = OrcProto.RowIndexEntry.newBuilder(); - rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry); + rowIndexEntry = new RowIndexEntry(); stripeStatsBuilders = Lists.newArrayList(); if (streamFactory.buildIndex()) { rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX); @@ -634,18 +617,10 @@ public boolean hasWriterTimeZone() { } } - protected OrcProto.RowIndex.Builder getRowIndex() { - return rowIndex; - } - protected ColumnStatisticsImpl getStripeStatistics() { return stripeColStatistics; } - protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() { - return rowIndexEntry; - } - IntegerWriter createIntegerWriter(PositionedOutputStream output, boolean signed, boolean isDirectV2, StreamFactory writer) { @@ -743,6 +718,17 @@ private void removeIsPresentPositions() { } /** + * Flush all of the streams in preparation for writing the stripe. 
+ * This guarantees that all of the compression callbacks are done before + * we start building the indexes. + */ + void flush() throws IOException { + if (isPresent != null) { + isPresent.flush(); + } + } + + /** * Write the stripe out to the file. * @param builder the stripe footer that contains the information about the * layout of the stripe. The TreeWriter is required to update @@ -754,17 +740,13 @@ private void removeIsPresentPositions() { */ void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { - if (isPresent != null) { - isPresent.flush(); - - // if no nulls are found in a stream, then suppress the stream - if(!foundNulls) { - isPresentOutStream.suppress(); - // since isPresent bitstream is suppressed, update the index to - // remove the positions of the isPresent stream - if (rowIndexStream != null) { - removeIsPresentPositions(); - } + // if no nulls are found in a stream, then suppress the stream + if(isPresent != null && !foundNulls) { + isPresentOutStream.suppress(); + // since isPresent bitstream is suppressed, update the index to + // remove the positions of the isPresent stream + if (rowIndexStream != null) { + removeIsPresentPositions(); } } @@ -791,7 +773,6 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, rowIndexStream.flush(); } rowIndex.clear(); - rowIndexEntry.clear(); // write the bloom filter to out stream if (bloomFilterStream != null) { @@ -826,23 +807,105 @@ private void writeStripeStatistics(OrcProto.StripeStatistics.Builder builder, } /** + * This class builds the RowIndexEntry for the row indexes. It represents + * the information about a row group (10,000 rows) with both the statistics + * and the position information. 
+     */
+    class RowIndexEntry implements PositionRecorder {
+      int nextStream = 0;        // next slot to reserve in entry's lengths list
+      int streamsLeft = 0;       // callbacks handed out that have not fired yet
+      boolean isActive = false;  // set once by activate()
+      final OrcProto.RowIndexEntry.Builder entry;
+
+      RowIndexEntry() {
+        entry = OrcProto.RowIndexEntry.newBuilder();
+      }
+
+      void finishStream(int streamId, long length) {
+        entry.setLengths(streamId, (int) length); // fill the placeholder reserved by addCallback; lengths is uint32
+        if (--streamsLeft == 0 && isActive) {
+          rowIndex.addEntry(entry);
+        }
+      }
+
+      @Override
+      public void addPosition(long offset) {
+        entry.addPositions(offset);
+      }
+
+      /**
+       * The entry must be active and have no more remaining callbacks,
+       * for it to be finalized.
+       */
+      public void activate() {
+        if (!isActive) {
+          isActive = true;
+          if (streamsLeft == 0) {
+            rowIndex.addEntry(entry);
+          }
+        }
+      }
+
+      /**
+       * Add a callback to this entry, reserving a zero length slot for it.
+       * @return the new callback
+       */
+      public StreamCallback addCallback(long startPosition) {
+        int streamId = nextStream++;
+        streamsLeft += 1;
+        entry.addLengths(0); // placeholder, overwritten in finishStream
+        return new StreamCallback(this, startPosition, streamId);
+      }
+    }
+
+    /**
+     * This class is the proxy that gets called when the next RLE and
+     * compression blocks are closed.
+     */
+    static class StreamCallback
+        implements PositionedOutputStream.CompressionCallback {
+      final RowIndexEntry entry;
+      final int streamId;
+      final long startPosition;
+
+      StreamCallback(RowIndexEntry entry,
+                     long startPosition,
+                     int streamId) {
+        this.entry = entry;
+        this.streamId = streamId;
+        this.startPosition = startPosition;
+      }
+
+      @Override
+      public void compressionDone(long length) {
+        entry.finishStream(streamId, length - startPosition); // reported value minus recorded start
+      }
+    }
+
+    /**
     * Create a row index entry with the previous location and the current
     * index statistics. Also merges the index statistics into the file
     * statistics before they are cleared. Finally, it records the start of the
     * next index and ensures all of the children columns also create an entry.
* @throws IOException */ - void createRowIndexEntry() throws IOException { + RowIndexEntry createRowIndexEntry() throws IOException { stripeColStatistics.merge(indexStatistics); - rowIndexEntry.setStatistics(indexStatistics.serialize()); + rowIndexEntry.entry.setStatistics(indexStatistics.serialize()); + if (isPresent != null) { + isPresent.registerCallback(rowIndexEntry.addCallback(isPresentStart)); + } + RowIndexEntry oldEntry = rowIndexEntry; + + // Reset for the next index point indexStatistics.reset(); - rowIndex.addEntry(rowIndexEntry); - rowIndexEntry.clear(); + rowIndexEntry = new RowIndexEntry(); addBloomFilterEntry(); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); for(TreeWriter child: childrenWriters) { child.createRowIndexEntry(); } + return oldEntry; } void addBloomFilterEntry() { @@ -857,12 +920,13 @@ void addBloomFilterEntry() { /** * Record the current position in each of this column's streams. - * @param recorder where should the locations be recorded + * @param entry where should the locations be recorded * @throws IOException */ - void recordPosition(PositionRecorder recorder) throws IOException { + void recordPosition(RowIndexEntry entry) throws IOException { if (isPresent != null) { - isPresent.getPosition(recorder); + isPresent.getPosition(entry); + isPresentStart = entry.entry.getPositions(0); } } @@ -881,6 +945,7 @@ long estimateMemory() { private static class BooleanTreeWriter extends TreeWriter { private final BitFieldWriter writer; + private long writerStart; BooleanTreeWriter(int columnId, TypeDescription schema, @@ -890,7 +955,7 @@ long estimateMemory() { PositionedOutputStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.writer = new BitFieldWriter(out, 1); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override @@ -918,22 +983,38 @@ void writeBatch(ColumnVector vector, int offset, } @Override + void flush() throws IOException { + super.flush(); + writer.flush(); + } + + 
@Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - writer.flush(); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override - void recordPosition(PositionRecorder recorder) throws IOException { + void recordPosition(RowIndexEntry recorder) throws IOException { super.recordPosition(recorder); + int offset = recorder.entry.getPositionsCount(); writer.getPosition(recorder); + writerStart = recorder.entry.getPositions(offset); + } + + @Override + RowIndexEntry createRowIndexEntry() throws IOException { + RowIndexEntry result = super.createRowIndexEntry(); + writer.registerCallback(result.addCallback(writerStart)); + result.activate(); + return result; } } private static class ByteTreeWriter extends TreeWriter { private final RunLengthByteWriter writer; + private long writerStart; ByteTreeWriter(int columnId, TypeDescription schema, @@ -942,7 +1023,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { super(columnId, schema, writer, nullable); this.writer = new RunLengthByteWriter(writer.createStream(id, OrcProto.Stream.Kind.DATA)); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override @@ -976,22 +1057,38 @@ void writeBatch(ColumnVector vector, int offset, } @Override + void flush() throws IOException { + super.flush(); + writer.flush(); + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - writer.flush(); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override - void recordPosition(PositionRecorder recorder) throws IOException { + void recordPosition(RowIndexEntry recorder) throws IOException { super.recordPosition(recorder); + int offset = recorder.entry.getPositionsCount(); writer.getPosition(recorder); + writerStart = 
recorder.entry.getPositions(offset); + } + + @Override + RowIndexEntry createRowIndexEntry() throws IOException { + RowIndexEntry result = super.createRowIndexEntry(); + writer.registerCallback(result.addCallback(writerStart)); + result.activate(); + return result; } } private static class IntegerTreeWriter extends TreeWriter { private final IntegerWriter writer; + private long writerStart; private boolean isDirectV2 = true; IntegerTreeWriter(int columnId, @@ -1003,7 +1100,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); this.writer = createIntegerWriter(out, true, isDirectV2, writer); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override @@ -1047,22 +1144,38 @@ void writeBatch(ColumnVector vector, int offset, } @Override + RowIndexEntry createRowIndexEntry() throws IOException { + RowIndexEntry result = super.createRowIndexEntry(); + writer.registerCallback(result.addCallback(writerStart)); + result.activate(); + return result; + } + + @Override + void flush() throws IOException { + super.flush(); + writer.flush(); + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - writer.flush(); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override - void recordPosition(PositionRecorder recorder) throws IOException { + void recordPosition(RowIndexEntry recorder) throws IOException { super.recordPosition(recorder); + int offset = recorder.entry.getPositionsCount(); writer.getPosition(recorder); + writerStart = recorder.entry.getPositions(offset); } } private static class FloatTreeWriter extends TreeWriter { private final PositionedOutputStream stream; + private long streamStart; private final SerializationUtils utils; FloatTreeWriter(int columnId, @@ -1073,7 +1186,7 @@ void recordPosition(PositionRecorder 
recorder) throws IOException { this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.utils = new SerializationUtils(); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override @@ -1106,24 +1219,39 @@ void writeBatch(ColumnVector vector, int offset, } } + @Override + void flush() throws IOException { + super.flush(); + stream.flush(); + } @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - stream.flush(); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override - void recordPosition(PositionRecorder recorder) throws IOException { + void recordPosition(RowIndexEntry recorder) throws IOException { super.recordPosition(recorder); + int offset = recorder.entry.getPositionsCount(); stream.getPosition(recorder); + streamStart = recorder.entry.getPositions(offset); + } + + @Override + RowIndexEntry createRowIndexEntry() throws IOException { + RowIndexEntry result = super.createRowIndexEntry(); + stream.registerCallback(result.addCallback(streamStart)); + result.activate(); + return result; } } private static class DoubleTreeWriter extends TreeWriter { private final PositionedOutputStream stream; + private long streamStart; private final SerializationUtils utils; DoubleTreeWriter(int columnId, @@ -1134,7 +1262,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.utils = new SerializationUtils(); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override @@ -1168,32 +1296,47 @@ void writeBatch(ColumnVector vector, int offset, } @Override + void flush() throws IOException { + super.flush(); + stream.flush(); + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - 
stream.flush(); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override - void recordPosition(PositionRecorder recorder) throws IOException { + RowIndexEntry createRowIndexEntry() throws IOException { + RowIndexEntry result = super.createRowIndexEntry(); + stream.registerCallback(result.addCallback(streamStart)); + result.activate(); + return result; + } + + @Override + void recordPosition(RowIndexEntry recorder) throws IOException { super.recordPosition(recorder); + int offset = recorder.entry.getPositionsCount(); stream.getPosition(recorder); + streamStart = recorder.entry.getPositions(offset); } } private static abstract class StringBaseTreeWriter extends TreeWriter { private static final int INITIAL_DICTIONARY_SIZE = 4096; private final OutStream stringOutput; - private final IntegerWriter lengthOutput; - private final IntegerWriter rowOutput; + protected final IntegerWriter lengthOutput; + private long lengthOutputStart; + protected final IntegerWriter rowOutput; protected final StringRedBlackTree dictionary = new StringRedBlackTree(INITIAL_DICTIONARY_SIZE); protected final DynamicIntArray rows = new DynamicIntArray(); - protected final PositionedOutputStream directStreamOutput; - protected final IntegerWriter directLengthOutput; - private final List savedRowIndex = - new ArrayList(); + protected final OutStream dataStream; + private long dataStreamStart; + private final List savedRowIndex = new ArrayList<>(); private final boolean buildIndex; private final List rowIndexValueCount = new ArrayList(); // If the number of keys in a dictionary is greater than this fraction of @@ -1201,8 +1344,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { private final double dictionaryKeySizeThreshold; protected boolean useDictionaryEncoding = true; private boolean isDirectV2 = true; - private boolean doneDictionaryCheck; - private final boolean strideDictionaryCheck; + private boolean needStrideDictionaryCheck; 
StringBaseTreeWriter(int columnId, TypeDescription schema, @@ -1212,103 +1354,149 @@ void recordPosition(PositionRecorder recorder) throws IOException { this.isDirectV2 = isNewWriteFormat(writer); stringOutput = writer.createStream(id, OrcProto.Stream.Kind.DICTIONARY_DATA); - lengthOutput = createIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); - rowOutput = createIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.DATA), false, isDirectV2, writer); - recordPosition(rowIndexPosition); + dataStream = writer.createStream(id, OrcProto.Stream.Kind.DATA); + OutStream lengthStream = writer.createStream(id, + OrcProto.Stream.Kind.LENGTH); + lengthOutput = createIntegerWriter(lengthStream, false, isDirectV2, + writer); + rowOutput = createIntegerWriter(dataStream, false, isDirectV2, writer); + recordPosition(rowIndexEntry); rowIndexValueCount.add(0L); buildIndex = writer.buildIndex(); - directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA); - directLengthOutput = createIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); + Configuration conf = writer.getConfiguration(); dictionaryKeySizeThreshold = OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf); - strideDictionaryCheck = - OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf); - doneDictionaryCheck = false; + needStrideDictionaryCheck = + isDirectV2 && + OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf); } - private boolean checkDictionaryEncoding() { - if (!doneDictionaryCheck) { + private boolean checkDictionaryEncoding() throws IOException { + if (needStrideDictionaryCheck && rows.size() > 0) { // Set the flag indicating whether or not to use dictionary encoding // based on whether or not the fraction of distinct keys over number of // non-null rows is less than the configured threshold - float ratio = rows.size() > 0 ? 
(float) (dictionary.size()) / rows.size() : 0.0f; - useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold; - doneDictionaryCheck = true; + float ratio = (float) (dictionary.size()) / rows.size(); + useDictionaryEncoding = ratio <= dictionaryKeySizeThreshold; + needStrideDictionaryCheck = false; + if (!useDictionaryEncoding) { + switchToDirect(); + } } return useDictionaryEncoding; } @Override - void writeStripe(OrcProto.StripeFooter.Builder builder, - int requiredIndexEntries) throws IOException { + void flush() throws IOException { + super.flush(); // if rows in stripe is less than dictionaryCheckAfterRows, dictionary // checking would not have happened. So do it again here. checkDictionaryEncoding(); + lengthOutput.flush(); if (useDictionaryEncoding) { - flushDictionary(); + writeDictionaryStripe(); + stringOutput.flush(); + rowOutput.flush(); } else { - // flushout any left over entries from dictionary - if (rows.size() > 0) { - flushDictionary(); - } - + dataStream.flush(); // suppress the stream for every stripe if dictionary is disabled stringOutput.suppress(); } + } - // we need to build the rowindex before calling super, since it - // writes it out. + @Override + void writeStripe(OrcProto.StripeFooter.Builder builder, + int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - stringOutput.flush(); - lengthOutput.flush(); - rowOutput.flush(); - directStreamOutput.flush(); - directLengthOutput.flush(); - // reset all of the fields to be ready for the next stripe. - dictionary.clear(); - savedRowIndex.clear(); - rowIndexValueCount.clear(); - recordPosition(rowIndexPosition); - rowIndexValueCount.add(0L); + createRowIndexEntry(); - if (!useDictionaryEncoding) { + if (useDictionaryEncoding) { + // reset all of the fields to be ready for the next stripe. 
+ dictionary.clear(); + savedRowIndex.clear(); + rowIndexValueCount.clear(); + rowIndexValueCount.add(0L); + } else { // record the start positions of first index stride of next stripe i.e // beginning of the direct streams when dictionary is disabled - recordDirectStreamPosition(); + recordPosition(rowIndexEntry); } } - private void flushDictionary() throws IOException { - final int[] dumpOrder = new int[dictionary.size()]; - - if (useDictionaryEncoding) { - // Write the dictionary by traversing the red-black tree writing out - // the bytes and lengths; and creating the map from the original order - // to the final sorted order. - - dictionary.visit(new StringRedBlackTree.Visitor() { - private int currentId = 0; - @Override - public void visit(StringRedBlackTree.VisitorContext context - ) throws IOException { - context.writeBytes(stringOutput); - lengthOutput.write(context.getLength()); - dumpOrder[context.getOriginalPosition()] = currentId++; + /** + * After trying the dictionary, fall back to a direct encoding. This + * means that we need to write the values that are currently stored + * and clear them out. + * @throws IOException + */ + private void switchToDirect() throws IOException { + int length = rows.size(); + int rowIndexEntry = 0; + // some of the entries may have been added already, and we want to + // process them from a clean slate + rowIndex.clearEntry(); + RowIndexEntry previous = null; + dataStreamStart = 0; + lengthOutputStart = 0; + // write the values translated into the dump order. 
+ for(int i = 0; i <= length; ++i) { + // now that we are writing out the row values, we can finalize the + // row index + if (buildIndex) { + while (i == rowIndexValueCount.get(rowIndexEntry) && + rowIndexEntry < savedRowIndex.size()) { + if (previous != null) { + dataStream.registerCallback(previous.addCallback( + dataStreamStart)); + lengthOutput.registerCallback(previous.addCallback( + lengthOutputStart)); + } + previous = savedRowIndex.get(rowIndexEntry++); + int offset = previous.entry.getPositionsCount(); + dataStream.getPosition(previous); + dataStreamStart = previous.entry.getPositions(offset); + offset = previous.entry.getPositionsCount(); + lengthOutput.getPosition(previous); + lengthOutputStart = previous.entry.getPositions(offset); } - }); - } else { - // for direct encoding, we don't want the dictionary data stream - stringOutput.suppress(); + } + if (i != length) { + int len = dictionary.write(dataStream, rows.get(i)); + lengthOutput.write(len); + } } + dictionary.clear(); + rows.clear(); + savedRowIndex.clear(); + rowIndexValueCount.clear(); + } + + private void writeDictionaryStripe() throws IOException { + final int[] dumpOrder = new int[dictionary.size()]; + + // Write the dictionary by traversing the red-black tree writing out + // the bytes and lengths; and creating the map from the original order + // to the final sorted order. 
+ + dictionary.visit(new StringRedBlackTree.Visitor() { + private int currentId = 0; + @Override + public void visit(StringRedBlackTree.VisitorContext context + ) throws IOException { + context.writeBytes(stringOutput); + lengthOutput.write(context.getLength()); + dumpOrder[context.getOriginalPosition()] = currentId++; + } + }); int length = rows.size(); int rowIndexEntry = 0; - OrcProto.RowIndex.Builder rowIndex = getRowIndex(); - Text text = new Text(); + // some of the entries may have been added already, and we want to + // process them from a clean slate + rowIndex.clearEntry(); + RowIndexEntry previous = null; + dataStreamStart = 0; // write the values translated into the dump order. for(int i = 0; i <= length; ++i) { // now that we are writing out the row values, we can finalize the @@ -1316,26 +1504,18 @@ public void visit(StringRedBlackTree.VisitorContext context if (buildIndex) { while (i == rowIndexValueCount.get(rowIndexEntry) && rowIndexEntry < savedRowIndex.size()) { - OrcProto.RowIndexEntry.Builder base = - savedRowIndex.get(rowIndexEntry++).toBuilder(); - if (useDictionaryEncoding) { - rowOutput.getPosition(new RowIndexPositionRecorder(base)); - } else { - PositionRecorder posn = new RowIndexPositionRecorder(base); - directStreamOutput.getPosition(posn); - directLengthOutput.getPosition(posn); + if (previous != null) { + rowOutput.registerCallback(previous.addCallback( + dataStreamStart)); } - rowIndex.addEntry(base.build()); + previous = savedRowIndex.get(rowIndexEntry++); + int offset = previous.entry.getPositionsCount(); + rowOutput.getPosition(previous); + dataStreamStart = previous.entry.getPositions(offset); } } if (i != length) { - if (useDictionaryEncoding) { - rowOutput.write(dumpOrder[rows.get(i)]); - } else { - dictionary.getText(text, rows.get(i)); - directStreamOutput.write(text.getBytes(), 0, text.getLength()); - directLengthOutput.write(text.getLength()); - } + rowOutput.write(dumpOrder[rows.get(i)]); } } rows.clear(); @@ -1364,43 
+1544,39 @@ public void visit(StringRedBlackTree.VisitorContext context } /** - * This method doesn't call the super method, because unlike most of the - * other TreeWriters, this one can't record the position in the streams - * until the stripe is being flushed. Therefore it saves all of the entries - * and augments them with the final information as the stripe is written. + * When we are using dictionary encoding, this method is significantly + * different than the others. Because the data stream isn't written until + * the stripe is being flushed, we can't know the positions in the stream. * @throws IOException */ @Override - void createRowIndexEntry() throws IOException { - getStripeStatistics().merge(indexStatistics); - OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry(); - rowIndexEntry.setStatistics(indexStatistics.serialize()); - indexStatistics.reset(); - OrcProto.RowIndexEntry base = rowIndexEntry.build(); - savedRowIndex.add(base); - rowIndexEntry.clear(); - addBloomFilterEntry(); - recordPosition(rowIndexPosition); - rowIndexValueCount.add(Long.valueOf(rows.size())); - if (strideDictionaryCheck) { - checkDictionaryEncoding(); - } - if (!useDictionaryEncoding) { - if (rows.size() > 0) { - flushDictionary(); - // just record the start positions of next index stride - recordDirectStreamPosition(); - } else { - // record the start positions of next index stride - recordDirectStreamPosition(); - getRowIndex().addEntry(base); - } + RowIndexEntry createRowIndexEntry() throws IOException { + RowIndexEntry result = super.createRowIndexEntry(); + checkDictionaryEncoding(); + if (useDictionaryEncoding) { + rowIndexValueCount.add(Long.valueOf(rows.size())); + savedRowIndex.add(result); + } else { + dataStream.registerCallback(result.addCallback(dataStreamStart)); + lengthOutput.registerCallback(result.addCallback(lengthOutputStart)); + result.activate(); } + return result; } - private void recordDirectStreamPosition() throws IOException { - 
directStreamOutput.getPosition(rowIndexPosition); - directLengthOutput.getPosition(rowIndexPosition); + @Override + public void recordPosition(RowIndexEntry recorder) throws IOException { + super.recordPosition(recorder); + if (useDictionaryEncoding) { + // we'll fill this in later + } else { + int offset = recorder.entry.getPositionsCount(); + dataStream.getPosition(recorder); + dataStreamStart = recorder.entry.getPositions(offset); + offset = recorder.entry.getPositionsCount(); + lengthOutput.getPosition(recorder); + lengthOutputStart = recorder.entry.getPositions(offset); + } } @Override @@ -1422,42 +1598,59 @@ void writeBatch(ColumnVector vector, int offset, int length) throws IOException { super.writeBatch(vector, offset, length); BytesColumnVector vec = (BytesColumnVector) vector; - if (vector.isRepeating) { - if (vector.noNulls || !vector.isNull[0]) { - if (useDictionaryEncoding) { + if (useDictionaryEncoding) { + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { int id = dictionary.add(vec.vector[0], vec.start[0], vec.length[0]); - for(int i=0; i < length; ++i) { + for (int i = 0; i < length; ++i) { rows.add(id); } - } else { - for(int i=0; i < length; ++i) { - directStreamOutput.write(vec.vector[0], vec.start[0], - vec.length[0]); - directLengthOutput.write(vec.length[0]); + indexStatistics.updateString(vec.vector[0], vec.start[0], + vec.length[0], length); + if (createBloomFilter) { + bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]); } } - indexStatistics.updateString(vec.vector[0], vec.start[0], - vec.length[0], length); - if (createBloomFilter) { - bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]); + } else { + for (int i = 0; i < length; ++i) { + if (vec.noNulls || !vec.isNull[i + offset]) { + rows.add(dictionary.add(vec.vector[offset + i], + vec.start[offset + i], vec.length[offset + i])); + indexStatistics.updateString(vec.vector[offset + i], + vec.start[offset + i], vec.length[offset + i], 1); + if 
(createBloomFilter) { + bloomFilter.addBytes(vec.vector[offset + i], + vec.start[offset + i], vec.length[offset + i]); + } + } } } } else { - for(int i=0; i < length; ++i) { - if (vec.noNulls || !vec.isNull[i + offset]) { - if (useDictionaryEncoding) { - rows.add(dictionary.add(vec.vector[offset + i], - vec.start[offset + i], vec.length[offset + i])); - } else { - directStreamOutput.write(vec.vector[offset + i], - vec.start[offset + i], vec.length[offset + i]); - directLengthOutput.write(vec.length[offset + i]); + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + for (int i = 0; i < length; ++i) { + dataStream.write(vec.vector[0], vec.start[0], + vec.length[0]); + lengthOutput.write(vec.length[0]); } - indexStatistics.updateString(vec.vector[offset + i], - vec.start[offset + i], vec.length[offset + i], 1); + indexStatistics.updateString(vec.vector[0], vec.start[0], + vec.length[0], length); if (createBloomFilter) { - bloomFilter.addBytes(vec.vector[offset + i], + bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]); + } + } + } else { + for (int i = 0; i < length; ++i) { + if (vec.noNulls || !vec.isNull[i + offset]) { + dataStream.write(vec.vector[offset + i], vec.start[offset + i], vec.length[offset + i]); + lengthOutput.write(vec.length[offset + i]); + indexStatistics.updateString(vec.vector[offset + i], + vec.start[offset + i], vec.length[offset + i], 1); + if (createBloomFilter) { + bloomFilter.addBytes(vec.vector[offset + i], + vec.start[offset + i], vec.length[offset + i]); + } } } } @@ -1507,8 +1700,8 @@ void writeBatch(ColumnVector vector, int offset, } } else { for(int i=0; i < length; ++i) { - directStreamOutput.write(ptr, ptrOffset, itemLength); - directLengthOutput.write(itemLength); + dataStream.write(ptr, ptrOffset, itemLength); + lengthOutput.write(itemLength); } } indexStatistics.updateString(ptr, ptrOffset, itemLength, length); @@ -1535,8 +1728,8 @@ void writeBatch(ColumnVector vector, int offset, if 
(useDictionaryEncoding) { rows.add(dictionary.add(ptr, ptrOffset, itemLength)); } else { - directStreamOutput.write(ptr, ptrOffset, itemLength); - directLengthOutput.write(itemLength); + dataStream.write(ptr, ptrOffset, itemLength); + lengthOutput.write(itemLength); } indexStatistics.updateString(ptr, ptrOffset, itemLength, 1); if (createBloomFilter) { @@ -1551,7 +1744,7 @@ void writeBatch(ColumnVector vector, int offset, /** * Under the covers, varchar is written to ORC the same way as string. */ - private static class VarcharTreeWriter extends StringBaseTreeWriter { + private static class VarcharTreeWriter extends StringTreeWriter { private final int maxLength; VarcharTreeWriter(int columnId, @@ -1565,47 +1758,20 @@ void writeBatch(ColumnVector vector, int offset, @Override void writeBatch(ColumnVector vector, int offset, int length) throws IOException { - super.writeBatch(vector, offset, length); BytesColumnVector vec = (BytesColumnVector) vector; - if (vector.isRepeating) { - if (vector.noNulls || !vector.isNull[0]) { - int itemLength = Math.min(vec.length[0], maxLength); - if (useDictionaryEncoding) { - int id = dictionary.add(vec.vector[0], vec.start[0], itemLength); - for(int i=0; i < length; ++i) { - rows.add(id); - } - } else { - for(int i=0; i < length; ++i) { - directStreamOutput.write(vec.vector[0], vec.start[0], - itemLength); - directLengthOutput.write(itemLength); - } - } - indexStatistics.updateString(vec.vector[0], vec.start[0], - itemLength, length); - if (createBloomFilter) { - bloomFilter.addBytes(vec.vector[0], vec.start[0], itemLength); + if (vec.isRepeating) { + if (vec.noNulls || !vec.isNull[0]) { + int origLen = vec.length[0]; + if (origLen > maxLength) { + vec.length[0] = maxLength; } + super.writeBatch(vec, offset, length); + vec.length[0] = origLen; } } else { - for(int i=0; i < length; ++i) { - if (vec.noNulls || !vec.isNull[i + offset]) { - int itemLength = Math.min(vec.length[offset + i], maxLength); - if (useDictionaryEncoding) { - 
rows.add(dictionary.add(vec.vector[offset + i], - vec.start[offset + i], itemLength)); - } else { - directStreamOutput.write(vec.vector[offset + i], - vec.start[offset + i], itemLength); - directLengthOutput.write(itemLength); - } - indexStatistics.updateString(vec.vector[offset + i], - vec.start[offset + i], itemLength, 1); - if (createBloomFilter) { - bloomFilter.addBytes(vec.vector[offset + i], - vec.start[offset + i], itemLength); - } + for(int r=0; r < length; ++r) { + if (vec.noNulls || !vec.isNull[r + offset]) { + } } } @@ -1614,7 +1780,9 @@ void writeBatch(ColumnVector vector, int offset, private static class BinaryTreeWriter extends TreeWriter { private final PositionedOutputStream stream; + private long streamStart; private final IntegerWriter length; + private long lengthStart; private boolean isDirectV2 = true; BinaryTreeWriter(int columnId, @@ -1627,7 +1795,7 @@ void writeBatch(ColumnVector vector, int offset, this.isDirectV2 = isNewWriteFormat(writer); this.length = createIntegerWriter(writer.createStream(id, OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override @@ -1675,21 +1843,38 @@ void writeBatch(ColumnVector vector, int offset, } } + @Override + void flush() throws IOException { + super.flush(); + stream.flush(); + length.flush(); + } @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - stream.flush(); - length.flush(); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override - void recordPosition(PositionRecorder recorder) throws IOException { + void recordPosition(RowIndexEntry recorder) throws IOException { super.recordPosition(recorder); + int offset = recorder.entry.getPositionsCount(); stream.getPosition(recorder); + streamStart = recorder.entry.getPositions(offset); + offset = recorder.entry.getPositionsCount(); 
length.getPosition(recorder); + lengthStart = recorder.entry.getPositions(offset); + } + + @Override + RowIndexEntry createRowIndexEntry() throws IOException { + RowIndexEntry result = super.createRowIndexEntry(); + stream.registerCallback(result.addCallback(streamStart)); + length.registerCallback(result.addCallback(lengthStart)); + result.activate(); + return result; } } @@ -1701,7 +1886,9 @@ void recordPosition(PositionRecorder recorder) throws IOException { private static class TimestampTreeWriter extends TreeWriter { private final IntegerWriter seconds; + private long secondsStart; private final IntegerWriter nanos; + private long nanosStart; private final boolean isDirectV2; private final long base_timestamp; @@ -1715,7 +1902,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { OrcProto.Stream.Kind.DATA), true, isDirectV2, writer); this.nanos = createIntegerWriter(writer.createStream(id, OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); // for unit tests to set different time zones this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND; writer.useWriterTimeZone(true); @@ -1770,12 +1957,17 @@ void writeBatch(ColumnVector vector, int offset, } @Override + void flush() throws IOException { + super.flush(); + seconds.flush(); + nanos.flush(); + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - seconds.flush(); - nanos.flush(); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } private static long formatNanos(int nanos) { @@ -1795,15 +1987,29 @@ private static long formatNanos(int nanos) { } @Override - void recordPosition(PositionRecorder recorder) throws IOException { + void recordPosition(RowIndexEntry recorder) throws IOException { super.recordPosition(recorder); + int offset = 
recorder.entry.getPositionsCount(); seconds.getPosition(recorder); + secondsStart = recorder.entry.getPositions(offset); + offset = recorder.entry.getPositionsCount(); nanos.getPosition(recorder); + nanosStart = recorder.entry.getPositions(offset); + } + + @Override + RowIndexEntry createRowIndexEntry() throws IOException { + RowIndexEntry result = super.createRowIndexEntry(); + seconds.registerCallback(result.addCallback(secondsStart)); + nanos.registerCallback(result.addCallback(nanosStart)); + result.activate(); + return result; } } private static class DateTreeWriter extends TreeWriter { private final IntegerWriter writer; + private long writerStart; private final boolean isDirectV2; DateTreeWriter(int columnId, @@ -1815,7 +2021,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); this.writer = createIntegerWriter(out, true, isDirectV2, writer); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override @@ -1849,17 +2055,24 @@ void writeBatch(ColumnVector vector, int offset, } @Override + void flush() throws IOException { + super.flush(); + writer.flush(); + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - writer.flush(); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override - void recordPosition(PositionRecorder recorder) throws IOException { + void recordPosition(RowIndexEntry recorder) throws IOException { super.recordPosition(recorder); + int offset = recorder.entry.getPositionsCount(); writer.getPosition(recorder); + writerStart = recorder.entry.getPositions(offset); } @Override @@ -1871,11 +2084,21 @@ void recordPosition(PositionRecorder recorder) throws IOException { return OrcProto.ColumnEncoding.newBuilder() .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build(); } + + @Override + RowIndexEntry 
createRowIndexEntry() throws IOException { + RowIndexEntry result = super.createRowIndexEntry(); + writer.registerCallback(result.addCallback(writerStart)); + result.activate(); + return result; + } } private static class DecimalTreeWriter extends TreeWriter { private final PositionedOutputStream valueStream; + private long valueStreamStart; private final IntegerWriter scaleStream; + private long scaleStreamStart; private final boolean isDirectV2; DecimalTreeWriter(int columnId, @@ -1887,7 +2110,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.scaleStream = createIntegerWriter(writer.createStream(id, OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override @@ -1935,19 +2158,37 @@ void writeBatch(ColumnVector vector, int offset, } @Override + void flush() throws IOException { + super.flush(); + valueStream.flush(); + scaleStream.flush(); + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - valueStream.flush(); - scaleStream.flush(); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override - void recordPosition(PositionRecorder recorder) throws IOException { + void recordPosition(RowIndexEntry recorder) throws IOException { super.recordPosition(recorder); + int offset = recorder.entry.getPositionsCount(); valueStream.getPosition(recorder); + valueStreamStart = recorder.entry.getPositions(offset); + offset = recorder.entry.getPositionsCount(); scaleStream.getPosition(recorder); + scaleStreamStart = recorder.entry.getPositions(offset); + } + + @Override + RowIndexEntry createRowIndexEntry() throws IOException { + RowIndexEntry result = super.createRowIndexEntry(); + valueStream.registerCallback(result.addCallback(valueStreamStart)); + 
scaleStream.registerCallback(result.addCallback(scaleStreamStart)); + result.activate(); + return result; } } @@ -1964,12 +2205,13 @@ void recordPosition(PositionRecorder recorder) throws IOException { children.get(i), writer, true); } - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override void writeRootBatch(VectorizedRowBatch batch, int offset, int length) throws IOException { + System.out.println("OOM writing batch from " + offset + " for " + length); // update the statistics for the root column indexStatistics.increment(length); // I'm assuming that the root column isn't nullable so that I don't need @@ -2022,18 +2264,36 @@ void writeBatch(ColumnVector vector, int offset, } @Override + void flush() throws IOException { + super.flush(); + for(TreeWriter child: childrenWriters) { + child.flush(); + } + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { + System.out.println("OOM writing struct stripe"); super.writeStripe(builder, requiredIndexEntries); for(TreeWriter child: childrenWriters) { child.writeStripe(builder, requiredIndexEntries); } - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); + } + + @Override + RowIndexEntry createRowIndexEntry() throws IOException { + System.out.println("OOM called struct createRowIndexEntry"); + RowIndexEntry result = super.createRowIndexEntry(); + result.activate(); + return result; } } private static class ListTreeWriter extends TreeWriter { private final IntegerWriter lengths; + private long lengthsStart; private final boolean isDirectV2; ListTreeWriter(int columnId, @@ -2047,7 +2307,7 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, createTreeWriter(schema.getChildren().get(0), writer, true); lengths = createIntegerWriter(writer.createStream(columnId, OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override @@ -2107,25 
+2367,44 @@ void writeBatch(ColumnVector vector, int offset, } @Override + void flush() throws IOException { + super.flush(); + lengths.flush(); + for(TreeWriter child: childrenWriters) { + child.flush(); + } + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - lengths.flush(); for(TreeWriter child: childrenWriters) { child.writeStripe(builder, requiredIndexEntries); } - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override - void recordPosition(PositionRecorder recorder) throws IOException { + void recordPosition(RowIndexEntry recorder) throws IOException { super.recordPosition(recorder); + int offset = recorder.entry.getPositionsCount(); lengths.getPosition(recorder); + lengthsStart = recorder.entry.getPositions(offset); + } + + @Override + RowIndexEntry createRowIndexEntry() throws IOException { + RowIndexEntry result = super.createRowIndexEntry(); + lengths.registerCallback(result.addCallback(lengthsStart)); + result.activate(); + return result; } } private static class MapTreeWriter extends TreeWriter { private final IntegerWriter lengths; + private long lengthsStart; private final boolean isDirectV2; MapTreeWriter(int columnId, @@ -2142,7 +2421,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { createTreeWriter(children.get(1), writer, true); lengths = createIntegerWriter(writer.createStream(columnId, OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override @@ -2207,25 +2486,44 @@ void writeBatch(ColumnVector vector, int offset, } @Override + void flush() throws IOException { + super.flush(); + lengths.flush(); + for(TreeWriter child: childrenWriters) { + child.flush(); + } + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { 
super.writeStripe(builder, requiredIndexEntries); - lengths.flush(); for(TreeWriter child: childrenWriters) { child.writeStripe(builder, requiredIndexEntries); } - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override - void recordPosition(PositionRecorder recorder) throws IOException { + void recordPosition(RowIndexEntry recorder) throws IOException { super.recordPosition(recorder); + int offset = recorder.entry.getPositionsCount(); lengths.getPosition(recorder); + lengthsStart = recorder.entry.getPositions(offset); + } + + @Override + RowIndexEntry createRowIndexEntry() throws IOException { + RowIndexEntry result = super.createRowIndexEntry(); + lengths.registerCallback(result.addCallback(lengthsStart)); + result.activate(); + return result; } } private static class UnionTreeWriter extends TreeWriter { private final RunLengthByteWriter tags; + private long tagsStart; UnionTreeWriter(int columnId, TypeDescription schema, @@ -2241,7 +2539,7 @@ void recordPosition(PositionRecorder recorder) throws IOException { tags = new RunLengthByteWriter(writer.createStream(columnId, OrcProto.Stream.Kind.DATA)); - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override @@ -2297,20 +2595,38 @@ void writeBatch(ColumnVector vector, int offset, } @Override + void flush() throws IOException { + super.flush(); + tags.flush(); + for(TreeWriter child: childrenWriters) { + child.flush(); + } + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); - tags.flush(); for(TreeWriter child: childrenWriters) { child.writeStripe(builder, requiredIndexEntries); } - recordPosition(rowIndexPosition); + recordPosition(rowIndexEntry); } @Override - void recordPosition(PositionRecorder recorder) throws IOException { + void recordPosition(RowIndexEntry recorder) throws IOException { super.recordPosition(recorder); + int offset = 
recorder.entry.getPositionsCount(); tags.getPosition(recorder); + tagsStart = recorder.entry.getPositions(offset); + } + + @Override + RowIndexEntry createRowIndexEntry() throws IOException { + RowIndexEntry result = super.createRowIndexEntry(); + tags.registerCallback(result.addCallback(tagsStart)); + result.activate(); + return result; } } @@ -2495,6 +2811,7 @@ private void flushStripe() throws IOException { (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride); OrcProto.StripeFooter.Builder builder = OrcProto.StripeFooter.newBuilder(); + treeWriter.flush(); treeWriter.writeStripe(builder, requiredIndexEntries); long indexSize = 0; long dataSize = 0; diff --git orc/src/protobuf/orc_proto.proto orc/src/protobuf/orc_proto.proto index f4935b4..2d18ec3 100644 --- orc/src/protobuf/orc_proto.proto +++ orc/src/protobuf/orc_proto.proto @@ -82,6 +82,7 @@ message ColumnStatistics { message RowIndexEntry { repeated uint64 positions = 1 [packed=true]; optional ColumnStatistics statistics = 2; + repeated uint32 lengths = 3 [packed=true]; } message RowIndex {