Details

    • Type: Sub-task
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: C++
    • Labels: None

      Description

      ORC-178 implemented basic column writers for integers. This JIRA targets implementing ColumnWriters for the remaining primitive types, including float, double, string, binary, etc.

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user wgtmac opened a pull request:

          https://github.com/apache/orc/pull/149

          ORC-224: Implement column writers of primitive types

          1. Implementation of ColumnWriters for float, double, tinyint, boolean, string, char, varchar, binary, date, timestamp and decimal types.
          2. Only support RleVersion1. RleVersion2 and dictionary encoding for string types are not implemented in this patch.
          3. Added tests in TestWriter.cc.
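To illustrate item 1 for the float and double writers: the diff under review writes each value to the DATA stream as its IEEE 754 bit pattern, least significant byte first. The following is a minimal sketch of that byte layout only, not the patch's code. The helper names `encodeDoubleLE` and `encodeFloatLE` are hypothetical, and the sketch copies the bits with `std::memcpy` instead of pointer casts.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical helper (not part of ORC): write the IEEE 754 bits of a
// double into `out` as 8 bytes, least significant byte first.
inline void encodeDoubleLE(double value, char* out) {
  assert(out != nullptr);
  uint64_t bits = 0;
  std::memcpy(&bits, &value, sizeof(bits));  // copy the bit pattern
  for (size_t j = 0; j < sizeof(bits); ++j) {
    out[j] = static_cast<char>((bits >> (8 * j)) & 0xff);
  }
}

// Hypothetical helper (not part of ORC): narrow a double to float, then
// write the float's IEEE 754 bits into `out` as 4 little-endian bytes.
inline void encodeFloatLE(double value, char* out) {
  assert(out != nullptr);
  float narrowed = static_cast<float>(value);  // named object, not a temporary
  uint32_t bits = 0;
  std::memcpy(&bits, &narrowed, sizeof(bits));
  for (size_t j = 0; j < sizeof(bits); ++j) {
    out[j] = static_cast<char>((bits >> (8 * j)) & 0xff);
  }
}
```

Storing the narrowed float in a named local before copying its bits keeps the source object alive for the whole copy, which is the main reason this sketch prefers `std::memcpy` over taking the address of a cast expression.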

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/wgtmac/orc ORC-224

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/orc/pull/149.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #149


          commit 0a9bcc0c61261dce65956a83b54dcf581844e976
          Author: Gang Wu <gang.w@alibaba-inc.com>
          Date: 2017-08-08T22:07:06Z

          ORC-224: Implement column writers of primitive types


          githubbot ASF GitHub Bot added a comment -

          Github user wgtmac commented on the issue:

          https://github.com/apache/orc/pull/149

          @omalley @majetideepak Please take a look when you have time. Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user majetideepak commented on the issue:

          https://github.com/apache/orc/pull/149

          @wgtmac Sorry for taking longer. I will definitely try to complete this as soon as possible.

          githubbot ASF GitHub Bot added a comment -

          Github user wgtmac commented on the issue:

          https://github.com/apache/orc/pull/149

          @majetideepak No worries. Thanks for looking into this!

          githubbot ASF GitHub Bot added a comment -

          Github user majetideepak commented on a diff in the pull request:

          https://github.com/apache/orc/pull/149#discussion_r134311129

          --- Diff: c++/test/TestWriter.cc ---
          @@ -209,5 +209,612 @@ namespace orc {
               }
               EXPECT_FALSE(rowReader->next(*batch));
             }
          -}

          +  TEST(Writer, writeStringAndBinaryColumn) {
          --- End diff --

          These tests can definitely be improved by using a templated test class
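One way to read the suggestion above is to write the test logic once as a template and instantiate it per column type. The sketch below shows only that shape and is not taken from TestWriter.cc. `roundTrip` and `ColumnRoundTripTest` are hypothetical names, and the byte-buffer round trip stands in for a real ORC writer and reader. With Google Test the same idea would normally be expressed as a typed test fixture.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <vector>

// Hypothetical stand-in for "write a column, then read it back": copies the
// values through a raw byte buffer. A real test would use the ORC writer.
template <typename T>
std::vector<T> roundTrip(const std::vector<T>& input) {
  static_assert(std::is_trivially_copyable<T>::value,
                "sketch only supports trivially copyable value types");
  std::vector<char> bytes(input.size() * sizeof(T));
  if (!input.empty()) {
    std::memcpy(bytes.data(), input.data(), bytes.size());
  }
  std::vector<T> output(input.size());
  if (!output.empty()) {
    std::memcpy(output.data(), bytes.data(), bytes.size());
  }
  return output;
}

// One templated test body shared by every primitive type under test.
template <typename T>
struct ColumnRoundTripTest {
  static bool run(const std::vector<T>& values) {
    const std::vector<T> result = roundTrip(values);
    if (result.size() != values.size()) {
      return false;
    }
    for (size_t i = 0; i < values.size(); ++i) {
      if (!(result[i] == values[i])) {
        return false;
      }
    }
    return true;
  }
};
```

Each type then needs only one line, for example `ColumnRoundTripTest<double>::run({1.5, -2.0})`, instead of a separately written test body.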

          githubbot ASF GitHub Bot added a comment -

          Github user majetideepak commented on a diff in the pull request:

          https://github.com/apache/orc/pull/149#discussion_r134309488

          --- Diff: c++/src/ColumnWriter.cc ---
          @@ -468,25 +472,1099 @@ namespace orc {
               rleEncoder->recordPosition(rowIndexPosition.get());
             }

          -  std::unique_ptr<ColumnWriter> buildWriter(
          -      const Type& type,
          -      const StreamsFactory& factory,
          -      const WriterOptions& options) {
          -    switch (static_cast<int64_t>(type.getKind())) {
          -      case STRUCT:
          -        return std::unique_ptr<ColumnWriter>(
          -          new StructColumnWriter(
          -            type,
          -            factory,
          -            options));
          -      case INT:
          -      case LONG:
          -      case SHORT:
          -        return std::unique_ptr<ColumnWriter>(
          -          new IntegerColumnWriter(
          -            type,
          -            factory,
          -            options));
          +  class ByteColumnWriter : public ColumnWriter {
          +  public:
          +    ByteColumnWriter(const Type& type,
          +                     const StreamsFactory& factory,
          +                     const WriterOptions& options);
          +
          +    virtual void add(ColumnVectorBatch& rowBatch,
          +                     uint64_t offset,
          +                     uint64_t numValues) override;
          +
          +    virtual void flush(std::vector<proto::Stream>& streams) override;
          +
          +    virtual uint64_t getEstimatedSize() const override;
          +
          +    virtual void getColumnEncoding(
          +      std::vector<proto::ColumnEncoding>& encodings) const override;
          +
          +    virtual void recordPosition() const override;
          +
          +  private:
          +    std::unique_ptr<ByteRleEncoder> byteRleEncoder;
          +  };
          +
          +  ByteColumnWriter::ByteColumnWriter(
          +      const Type& type,
          +      const StreamsFactory& factory,
          +      const WriterOptions& options) :
          +        ColumnWriter(type, factory, options)

          Unknown macro: { + std}

          +
          +  void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
          +                                uint64_t offset,
          +                                uint64_t numValues) {
          +    ColumnWriter::add(rowBatch, offset, numValues);
          +
          +    LongVectorBatch& byteBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
          +    int64_t* data = byteBatch.data.data() + offset;
          +    const char* notNull = byteBatch.hasNulls ?
          +                          byteBatch.notNull.data() + offset : nullptr;
          +
          +    char* byteData = reinterpret_cast<char*>(data);
          +    for (uint64_t i = 0; i < numValues; ++i) {
          +      byteData[i] = static_cast<char>(data[i]);
          +    }
          +    rleEncoder->add(byteData, numValues, notNull);
          +
          +    BooleanColumnStatisticsImpl* boolStats =
          +      dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
          +    bool hasNull = false;
          +    for (uint64_t i = 0; i < numValues; ++i) {
          +      if (notNull == nullptr || notNull[i]) {
          +        boolStats->increase(1);
          +        boolStats->update(byteData[i], 1);
          +      } else if (!hasNull) {
          +        hasNull = true;
          +      }
          +    }
          +    boolStats->setHasNull(hasNull);
          +  }
          +
          +  void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) {
          +    ColumnWriter::flush(streams);
          +
          +    proto::Stream stream;
          +    stream.set_kind(proto::Stream_Kind_DATA);
          +    stream.set_column(static_cast<uint32_t>(columnId));
          +    stream.set_length(rleEncoder->flush());
          +    streams.push_back(stream);
          +  }
          +
          +  uint64_t BooleanColumnWriter::getEstimatedSize() const {
          +    uint64_t size = ColumnWriter::getEstimatedSize();
          +    size += rleEncoder->getBufferSize();
          +    return size;
          +  }
          +
          +  void BooleanColumnWriter::getColumnEncoding(
          +      std::vector<proto::ColumnEncoding>& encodings) const {
          +    proto::ColumnEncoding encoding;
          +    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
          +    encoding.set_dictionarysize(0);
          +    encodings.push_back(encoding);
          +  }
          +
          +  void BooleanColumnWriter::recordPosition() const {
          +    ColumnWriter::recordPosition();
          +    rleEncoder->recordPosition(rowIndexPosition.get());
          +  }
          +
          +  class DoubleColumnWriter : public ColumnWriter {
          +  public:
          +    DoubleColumnWriter(const Type& type,
          +                       const StreamsFactory& factory,
          +                       const WriterOptions& options,
          +                       bool isFloat);
          +
          +    virtual void add(ColumnVectorBatch& rowBatch,
          +                     uint64_t offset,
          +                     uint64_t numValues) override;
          +
          +    virtual void flush(std::vector<proto::Stream>& streams) override;
          +
          +    virtual uint64_t getEstimatedSize() const override;
          +
          +    virtual void getColumnEncoding(
          +      std::vector<proto::ColumnEncoding>& encodings) const override;
          +
          +    virtual void recordPosition() const override;
          +
          +  private:
          +    bool isFloat;
          +    std::unique_ptr<AppendOnlyBufferedStream> dataStream;
          +    DataBuffer<char> buffer;
          +  };
          +
          +  DoubleColumnWriter::DoubleColumnWriter(
          +      const Type& type,
          +      const StreamsFactory& factory,
          +      const WriterOptions& options,
          +      bool isFloatType) :
          +        ColumnWriter(type, factory, options),
          +        isFloat(isFloatType),
          +        buffer(*options.getMemoryPool()) {
          +    dataStream.reset(new AppendOnlyBufferedStream(
          +      factory.createStream(proto::Stream_Kind_DATA)));
          +    buffer.resize(isFloat ? 4 : 8);
          +
          +    if (enableIndex) {
          +      recordPosition();
          +    }
          +  }
          +
          +  void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
          +                               uint64_t offset,
          +                               uint64_t numValues) {
          +    ColumnWriter::add(rowBatch, offset, numValues);
          +
          +    const DoubleVectorBatch& dblBatch =
          +      dynamic_cast<const DoubleVectorBatch&>(rowBatch);
          +
          +    const double* doubleData = dblBatch.data.data() + offset;
          +    const char* notNull = dblBatch.hasNulls ?
          +                          dblBatch.notNull.data() + offset : nullptr;
          +
          +    size_t bytes = isFloat ? 4 : 8;
          +    char* data = buffer.data();
          +
          +    for (uint64_t i = 0; i < numValues; ++i) {
          +      if (!notNull || notNull[i]) {
          +        if (isFloat) {
          +          // to avoid float-double cast
          +          const int32_t* intBits =
          +            reinterpret_cast<const int32_t*>(&static_cast<const float&>(
          +              doubleData[i]));
          +          for (size_t j = 0; j < bytes; ++j) {
          +            data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
          +          }
          +        } else {
          +          const int64_t* intBits =
          +            reinterpret_cast<const int64_t*>(&(doubleData[i]));
          +          for (size_t j = 0; j < bytes; ++j) {
          +            data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
          +          }
          +        }
          +        dataStream->write(data, bytes);
          +      }
          +    }
          +
          +    DoubleColumnStatisticsImpl* doubleStats =
          +      dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get());
          +    bool hasNull = false;
          +    for (uint64_t i = 0; i < numValues; ++i) {
          +      if (!notNull || notNull[i]) {
          +        doubleStats->increase(1);
          +        doubleStats->update(doubleData[i]);
          +      } else if (!hasNull) {
          +        hasNull = true;
          +      }
          +    }
          +    doubleStats->setHasNull(hasNull);
          +  }
          +
          +  void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) {
          +    ColumnWriter::flush(streams);
          +
          +    proto::Stream stream;
          +    stream.set_kind(proto::Stream_Kind_DATA);
          +    stream.set_column(static_cast<uint32_t>(columnId));
          +    stream.set_length(dataStream->flush());
          +    streams.push_back(stream);
          +  }
          +
          +  uint64_t DoubleColumnWriter::getEstimatedSize() const {
          +    uint64_t size = ColumnWriter::getEstimatedSize();
          +    size += dataStream->getSize();
          +    return size;
          +  }
          +
          +  void DoubleColumnWriter::getColumnEncoding(
          +      std::vector<proto::ColumnEncoding>& encodings) const {
          +    proto::ColumnEncoding encoding;
          +    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
          +    encoding.set_dictionarysize(0);
          +    encodings.push_back(encoding);
          +  }
          +
          +  void DoubleColumnWriter::recordPosition() const {
          +    ColumnWriter::recordPosition();
          +    dataStream->recordPosition(rowIndexPosition.get());
          +  }
          +
          +  class StringColumnWriter : public ColumnWriter {
          +  public:
          +    StringColumnWriter(const Type& type,
          +                       const StreamsFactory& factory,
          +                       const WriterOptions& options);
          +
          +    virtual void add(ColumnVectorBatch& rowBatch,
          +                     uint64_t offset,
          +                     uint64_t numValues) override;
          +
          +    virtual void flush(std::vector<proto::Stream>& streams) override;
          +
          +    virtual uint64_t getEstimatedSize() const override;
          +
          +    virtual void getColumnEncoding(
          +      std::vector<proto::ColumnEncoding>& encodings) const override;
          +
          +    virtual void recordPosition() const override;
          +
          +  protected:
          +    std::unique_ptr<RleEncoder> lengthEncoder;
          +    std::unique_ptr<AppendOnlyBufferedStream> dataStream;
          +    RleVersion rleVersion;
          +  };
          +
          +  StringColumnWriter::StringColumnWriter(
          +      const Type& type,
          +      const StreamsFactory& factory,
          +      const WriterOptions& options) :
          +        ColumnWriter(type, factory, options),
          +        rleVersion(RleVersion_1)

          Unknown macro: { + std}

          +
          +  void StringColumnWriter::add(ColumnVectorBatch& rowBatch,
          +                               uint64_t offset,
          +                               uint64_t numValues) {
          +    ColumnWriter::add(rowBatch, offset, numValues);
          +    const StringVectorBatch & stringBatch =
          +      dynamic_cast<const StringVectorBatch &>(rowBatch);
          +
          +    char *const * data = stringBatch.data.data() + offset;
          +    const int64_t* length = stringBatch.length.data() + offset;
          +    const char* notNull = stringBatch.hasNulls ?
          +                          stringBatch.notNull.data() + offset : nullptr;
          +
          +    lengthEncoder->add(length, numValues, notNull);
          +
          +    for (uint64_t i = 0; i < numValues; ++i) {
          +      if (!notNull || notNull[i]) {
          +        dataStream->write(data[i], static_cast<size_t>(length[i]));
          +      }
          +    }
          +
          +    StringColumnStatisticsImpl* strStats =
          +      dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
          +
          +    bool hasNull = false;
          +    for (uint64_t i = 0; i < numValues; ++i) {
          +      if (!notNull || notNull[i]) {
          +        strStats->update(data[i],
          +                         static_cast<size_t>(length[i]));
          +        strStats->increase(1);
          +      } else if (!hasNull) {
          +        hasNull = true;
          --- End diff --

          The separate `hasNull` variable approach used here is not used in `TimestampColumnWriter`. It would be nice to be consistent.
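One possible way to get that consistency is to move the null-tracking loop into a single helper that every column writer calls. The sketch below is an assumption about how such a helper could look, not code from the pull request. `CountingStats` and `updateStatistics` are hypothetical names, and the stats type mimics only the `increase` and `setHasNull` calls visible in the diff. Unlike the diff, the sketch calls `setHasNull` only when a null was actually seen, so a later batch without nulls does not clear the flag.

```cpp
#include <cstdint>

// Hypothetical minimal statistics type exposing the two calls the sketch needs.
struct CountingStats {
  uint64_t valueCount = 0;
  bool hasNull = false;
  void increase(uint64_t n) { valueCount += n; }
  void setHasNull(bool value) { hasNull = value; }
};

// Hypothetical shared helper: visits every non-null row, lets the caller
// update type-specific statistics, and records nulls in one place.
// `notNull` may be nullptr, meaning the batch contains no nulls.
template <typename Stats, typename UpdateFn>
void updateStatistics(Stats& stats,
                      const char* notNull,
                      uint64_t numValues,
                      UpdateFn updateValue) {
  bool sawNull = false;
  for (uint64_t i = 0; i < numValues; ++i) {
    if (notNull == nullptr || notNull[i]) {
      stats.increase(1);
      updateValue(i);  // type-specific update for row i
    } else {
      sawNull = true;
    }
  }
  if (sawNull) {
    stats.setHasNull(true);
  }
}
```

A writer would pass a small lambda for the type-specific part, for example `[&](uint64_t i) { sum += values[i]; }`, so the null handling stays identical across column types.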

          githubbot ASF GitHub Bot added a comment -

          Github user majetideepak commented on a diff in the pull request:

          https://github.com/apache/orc/pull/149#discussion_r134304957

          — Diff: c++/src/ColumnWriter.cc —
          @@ -468,25 +472,1099 @@ namespace orc

          { rleEncoder->recordPosition(rowIndexPosition.get()); }
          • std::unique_ptr<ColumnWriter> buildWriter(
          • const Type& type,
          • const StreamsFactory& factory,
          • const WriterOptions& options) {
          • switch (static_cast<int64_t>(type.getKind())) {
          • case STRUCT:
          • return std::unique_ptr<ColumnWriter>(
          • new StructColumnWriter(
          • type,
          • factory,
          • options));
          • case INT:
          • case LONG:
          • case SHORT:
          • return std::unique_ptr<ColumnWriter>(
          • new IntegerColumnWriter(
          • type,
          • factory,
          • options));
            + class ByteColumnWriter : public ColumnWriter { + public: + ByteColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + private: + std::unique_ptr<ByteRleEncoder> byteRleEncoder; + }

            ;
            +
            + ByteColumnWriter::ByteColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options) :
            + ColumnWriter(type, factory, options)

            Unknown macro: { + std}

            +
            + void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + LongVectorBatch& byteBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
            + int64_t* data = byteBatch.data.data() + offset;
            + const char* notNull = byteBatch.hasNulls ?
            + byteBatch.notNull.data() + offset : nullptr;
            +
            + char* byteData = reinterpret_cast<char*>(data);
            + for (uint64_t i = 0; i < numValues; ++i)

            { + byteData[i] = static_cast<char>(data[i]); + }

            + rleEncoder->add(byteData, numValues, notNull);
            +
            + BooleanColumnStatisticsImpl* boolStats =
            + dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
            + bool hasNull = false;
            + for (uint64_t i = 0; i < numValues; ++i)

            Unknown macro: { + if (notNull == nullptr || notNull[i]) { + boolStats->increase(1); + boolStats->update(byteData[i], 1); + } else if (!hasNull) { + hasNull = true; + }
            + }
            + boolStats->setHasNull(hasNull);
            + }
            +
            + void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream stream; + stream.set_kind(proto::Stream_Kind_DATA); + stream.set_column(static_cast<uint32_t>(columnId)); + stream.set_length(rleEncoder->flush()); + streams.push_back(stream); + }
            +
            + uint64_t BooleanColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += rleEncoder->getBufferSize(); + return size; + }
            +
            + void BooleanColumnWriter::getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + encoding.set_dictionarysize(0); + encodings.push_back(encoding); + }
            +
            + void BooleanColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + rleEncoder->recordPosition(rowIndexPosition.get()); + }
            +
            + class DoubleColumnWriter : public ColumnWriter { + public: + DoubleColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options, + bool isFloat); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + private: + bool isFloat; + std::unique_ptr<AppendOnlyBufferedStream> dataStream; + DataBuffer<char> buffer; + };
            +
            + DoubleColumnWriter::DoubleColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options,
            + bool isFloatType) :
            + ColumnWriter(type, factory, options),
            + isFloat(isFloatType),
            + buffer(*options.getMemoryPool()) {
            + dataStream.reset(new AppendOnlyBufferedStream(
            + factory.createStream(proto::Stream_Kind_DATA)));
            + buffer.resize(isFloat ? 4 : 8);
            +
            + if (enableIndex) { + recordPosition(); + }
            + }
            +
            + void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + const DoubleVectorBatch& dblBatch =
            + dynamic_cast<const DoubleVectorBatch&>(rowBatch);
            +
            + const double* doubleData = dblBatch.data.data() + offset;
            + const char* notNull = dblBatch.hasNulls ?
            + dblBatch.notNull.data() + offset : nullptr;
            +
            + size_t bytes = isFloat ? 4 : 8;
            + char* data = buffer.data();
            +
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) {
            + if (isFloat) {
            + // to avoid float-double cast
            + const int32_t* intBits =
            + reinterpret_cast<const int32_t*>(&static_cast<const float&>(
            + doubleData[i]));
            + for (size_t j = 0; j < bytes; ++j) { + data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff); + }
            + } else {
            + const int64_t* intBits =
            + reinterpret_cast<const int64_t*>(&(doubleData[i]));
            + for (size_t j = 0; j < bytes; ++j) { + data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff); + }
            + }
            + dataStream->write(data, bytes);
            + }
            + }
            +
            + DoubleColumnStatisticsImpl* doubleStats =
            + dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get());
            + bool hasNull = false;
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) {
            + doubleStats->increase(1);
            + doubleStats->update(doubleData[i]);
            + } else if (!hasNull) {
            + hasNull = true;
            + }
            + }
            + doubleStats->setHasNull(hasNull);
            + }
            +
            + void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) {
            + ColumnWriter::flush(streams);
            +
            + proto::Stream stream;
            + stream.set_kind(proto::Stream_Kind_DATA);
            + stream.set_column(static_cast<uint32_t>(columnId));
            + stream.set_length(dataStream->flush());
            + streams.push_back(stream);
            + }
            +
            + uint64_t DoubleColumnWriter::getEstimatedSize() const {
            + uint64_t size = ColumnWriter::getEstimatedSize();
            + size += dataStream->getSize();
            + return size;
            + }
            +
            + void DoubleColumnWriter::getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const {
            + proto::ColumnEncoding encoding;
            + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
            + encoding.set_dictionarysize(0);
            + encodings.push_back(encoding);
            + }
            +
            + void DoubleColumnWriter::recordPosition() const {
            + ColumnWriter::recordPosition();
            + dataStream->recordPosition(rowIndexPosition.get());
            + }
            +
            + class StringColumnWriter : public ColumnWriter {
            + public:
            + StringColumnWriter(const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options);
            +
            + virtual void add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) override;
            +
            + virtual void flush(std::vector<proto::Stream>& streams) override;
            +
            + virtual uint64_t getEstimatedSize() const override;
            +
            + virtual void getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const override;
            +
            + virtual void recordPosition() const override;
            +
            + protected:
            + std::unique_ptr<RleEncoder> lengthEncoder;
            + std::unique_ptr<AppendOnlyBufferedStream> dataStream;
            + RleVersion rleVersion;
            + };
            +
            + StringColumnWriter::StringColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options) :
            + ColumnWriter(type, factory, options),
            + rleVersion(RleVersion_1)

            Unknown macro: { + std}

            +
            + static int64_t formatNano(int64_t nanos) {

          --- End diff --

          Comments here would help.
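
One possible way to act on this request is to document the encoding inside the function. The sketch below reuses the body of `formatNano` as it appears further down in this pull request's diff. The comments are an interpretation of that code (trailing decimal zeros are stripped and the low 3 bits record how many), not the author's wording, and `formatNanoExamples` is a hypothetical helper added only to illustrate a few inputs.

```cpp
#include <cassert>
#include <cstdint>

// Compact a nanosecond value by stripping trailing decimal zeros.
// The result is (stripped value << 3) | code, where code 0 means nothing
// was stripped and code k (1..7) means k + 1 zeros were stripped.
static int64_t formatNano(int64_t nanos) {
  if (nanos == 0) {
    return 0;
  } else if (nanos % 100 != 0) {
    // Fewer than two trailing zeros: keep the value as is, code 0.
    return nanos << 3;
  } else {
    // At least two trailing zeros: drop them and start with code 1.
    nanos /= 100;
    int64_t trailingZeros = 1;
    // Drop further zeros while the 3-bit code still has room.
    while (nanos % 10 == 0 && trailingZeros < 7) {
      nanos /= 10;
      trailingZeros += 1;
    }
    return (nanos << 3) | trailingZeros;
  }
}

// Hypothetical helper (not part of the patch) showing a few encodings.
static void formatNanoExamples() {
  assert(formatNano(0) == 0);
  assert(formatNano(10) == 80);         // a single trailing zero is kept
  assert(formatNano(100) == 9);         // 1, two zeros stripped, code 1
  assert(formatNano(1000) == 10);       // 1, three zeros stripped, code 2
  assert(formatNano(500000000) == 47);  // 5, eight zeros stripped, code 7
}
```

The cap at 7 follows from the 3-bit code: at most eight zeros can be recorded as stripped, which is enough for any value below one second.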

          githubbot ASF GitHub Bot added a comment -

          Github user majetideepak commented on a diff in the pull request:

          https://github.com/apache/orc/pull/149#discussion_r134308560

          --- Diff: c++/src/ColumnWriter.cc ---
          @@ -468,25 +472,1099 @@ namespace orc {
          rleEncoder->recordPosition(rowIndexPosition.get());
          }
            - std::unique_ptr<ColumnWriter> buildWriter(
            - const Type& type,
            - const StreamsFactory& factory,
            - const WriterOptions& options) {
            - switch (static_cast<int64_t>(type.getKind())) {
            - case STRUCT:
            - return std::unique_ptr<ColumnWriter>(
            - new StructColumnWriter(
            - type,
            - factory,
            - options));
            - case INT:
            - case LONG:
            - case SHORT:
            - return std::unique_ptr<ColumnWriter>(
            - new IntegerColumnWriter(
            - type,
            - factory,
            - options));
            + class ByteColumnWriter : public ColumnWriter {
            + public:
            + ByteColumnWriter(const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options);
            +
            + virtual void add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) override;
            +
            + virtual void flush(std::vector<proto::Stream>& streams) override;
            +
            + virtual uint64_t getEstimatedSize() const override;
            +
            + virtual void getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const override;
            +
            + virtual void recordPosition() const override;
            +
            + private:
            + std::unique_ptr<ByteRleEncoder> byteRleEncoder;
            + };
            +
            + ByteColumnWriter::ByteColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options) :
            + ColumnWriter(type, factory, options)

            Unknown macro: { + std}

            +
            + void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + LongVectorBatch& byteBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
            + int64_t* data = byteBatch.data.data() + offset;
            + const char* notNull = byteBatch.hasNulls ?
            + byteBatch.notNull.data() + offset : nullptr;
            +
            + char* byteData = reinterpret_cast<char*>(data);
            + for (uint64_t i = 0; i < numValues; ++i) {
            + byteData[i] = static_cast<char>(data[i]);
            + }
            + rleEncoder->add(byteData, numValues, notNull);
            +
            + BooleanColumnStatisticsImpl* boolStats =
            + dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
            + bool hasNull = false;
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (notNull == nullptr || notNull[i]) {
            + boolStats->increase(1);
            + boolStats->update(byteData[i], 1);
            + } else if (!hasNull) {
            + hasNull = true;
            + }
            + }
            + boolStats->setHasNull(hasNull);
            + }
            +
            + void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) {
            + ColumnWriter::flush(streams);
            +
            + proto::Stream stream;
            + stream.set_kind(proto::Stream_Kind_DATA);
            + stream.set_column(static_cast<uint32_t>(columnId));
            + stream.set_length(rleEncoder->flush());
            + streams.push_back(stream);
            + }
            +
            + uint64_t BooleanColumnWriter::getEstimatedSize() const {
            + uint64_t size = ColumnWriter::getEstimatedSize();
            + size += rleEncoder->getBufferSize();
            + return size;
            + }
            +
            + void BooleanColumnWriter::getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const {
            + proto::ColumnEncoding encoding;
            + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
            + encoding.set_dictionarysize(0);
            + encodings.push_back(encoding);
            + }
            +
            + void BooleanColumnWriter::recordPosition() const {
            + ColumnWriter::recordPosition();
            + rleEncoder->recordPosition(rowIndexPosition.get());
            + }
            +
            + class DoubleColumnWriter : public ColumnWriter {
            + public:
            + DoubleColumnWriter(const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options,
            + bool isFloat);
            +
            + virtual void add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) override;
            +
            + virtual void flush(std::vector<proto::Stream>& streams) override;
            +
            + virtual uint64_t getEstimatedSize() const override;
            +
            + virtual void getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const override;
            +
            + virtual void recordPosition() const override;
            +
            + private:
            + bool isFloat;
            + std::unique_ptr<AppendOnlyBufferedStream> dataStream;
            + DataBuffer<char> buffer;
            + };
            +
            + DoubleColumnWriter::DoubleColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options,
            + bool isFloatType) :
            + ColumnWriter(type, factory, options),
            + isFloat(isFloatType),
            + buffer(*options.getMemoryPool()) {
            + dataStream.reset(new AppendOnlyBufferedStream(
            + factory.createStream(proto::Stream_Kind_DATA)));
            + buffer.resize(isFloat ? 4 : 8);
            +
            + if (enableIndex) {
            + recordPosition();
            + }
            + }
            +
            + void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + const DoubleVectorBatch& dblBatch =
            + dynamic_cast<const DoubleVectorBatch&>(rowBatch);
            +
            + const double* doubleData = dblBatch.data.data() + offset;
            + const char* notNull = dblBatch.hasNulls ?
            + dblBatch.notNull.data() + offset : nullptr;
            +
            + size_t bytes = isFloat ? 4 : 8;
            + char* data = buffer.data();
            +
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) {
            + if (isFloat) {
            + // to avoid float-double cast
            + const int32_t* intBits =
            + reinterpret_cast<const int32_t*>(&static_cast<const float&>(
            + doubleData[i]));
            + for (size_t j = 0; j < bytes; ++j) {
            + data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
            + }
            + } else {
            + const int64_t* intBits =
            + reinterpret_cast<const int64_t*>(&(doubleData[i]));
            + for (size_t j = 0; j < bytes; ++j) {
            + data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
            + }
            + }
            + dataStream->write(data, bytes);
            + }
            + }
            +
            + DoubleColumnStatisticsImpl* doubleStats =
            + dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get());
            + bool hasNull = false;
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) {
            + doubleStats->increase(1);
            + doubleStats->update(doubleData[i]);
            + } else if (!hasNull) {
            + hasNull = true;
            + }
            + }
            + doubleStats->setHasNull(hasNull);
            + }
            +
            + void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) {
            + ColumnWriter::flush(streams);
            +
            + proto::Stream stream;
            + stream.set_kind(proto::Stream_Kind_DATA);
            + stream.set_column(static_cast<uint32_t>(columnId));
            + stream.set_length(dataStream->flush());
            + streams.push_back(stream);
            + }
            +
            + uint64_t DoubleColumnWriter::getEstimatedSize() const {
            + uint64_t size = ColumnWriter::getEstimatedSize();
            + size += dataStream->getSize();
            + return size;
            + }
            +
            + void DoubleColumnWriter::getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const {
            + proto::ColumnEncoding encoding;
            + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
            + encoding.set_dictionarysize(0);
            + encodings.push_back(encoding);
            + }
            +
            + void DoubleColumnWriter::recordPosition() const {
            + ColumnWriter::recordPosition();
            + dataStream->recordPosition(rowIndexPosition.get());
            + }
            +
            + class StringColumnWriter : public ColumnWriter {
            + public:
            + StringColumnWriter(const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options);
            +
            + virtual void add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) override;
            +
            + virtual void flush(std::vector<proto::Stream>& streams) override;
            +
            + virtual uint64_t getEstimatedSize() const override;
            +
            + virtual void getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const override;
            +
            + virtual void recordPosition() const override;
            +
            + protected:
            + std::unique_ptr<RleEncoder> lengthEncoder;
            + std::unique_ptr<AppendOnlyBufferedStream> dataStream;
            + RleVersion rleVersion;
            + };
            +
            + StringColumnWriter::StringColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options) :
            + ColumnWriter(type, factory, options),
            + rleVersion(RleVersion_1)

            Unknown macro: { + std}

            +
            + static int64_t formatNano(int64_t nanos) {
            + if (nanos == 0) {
            + return 0;
            + } else if (nanos % 100 != 0) {
            + return (nanos) << 3;
            + } else {
            + nanos /= 100;
            + int64_t trailingZeros = 1;
            + while (nanos % 10 == 0 && trailingZeros < 7) {
            + nanos /= 10;
            + trailingZeros += 1;
            + }
            + return (nanos) << 3 | trailingZeros;
            + }
            + }
            +
            + void TimestampColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + TimestampVectorBatch& tsBatch =
            + dynamic_cast<TimestampVectorBatch &>(rowBatch);
            +
            + const char* notNull = tsBatch.hasNulls ?
            + tsBatch.notNull.data() + offset : nullptr;
            + int64_t *secs = tsBatch.data.data() + offset;
            + int64_t *nanos = tsBatch.nanoseconds.data() + offset;
            +
            + TimestampColumnStatisticsImpl* tsStats =
            + dynamic_cast<TimestampColumnStatisticsImpl*>(colIndexStatistics.get());
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (notNull == nullptr || notNull[i]) {
            + // TimestampVectorBatch already stores data in UTC
            + int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000;
            + tsStats->increase(1);
            + tsStats->update(millsUTC);
            + } else if (!tsStats->hasNull()) {
            + tsStats->setHasNull(true);
            + }
            + }
            +
            + for (uint64_t i = 0; i < numValues; ++i) {

          --- End diff --

          Can this loop be fused with the loop above? Does the same apply to the other writers below?
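
To make the suggestion concrete, here is a minimal sketch of a fused pass, under stated assumptions. The body of the second loop is not shown in the quoted diff, so the per-value work here (copying non-null values into `outSecs` and `outNanos`) is a placeholder. `TsStats`, `addFused` and `fusedAddExample` are hypothetical names used only for illustration and are not ORC classes or functions.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical stand-in for the timestamp statistics object.
struct TsStats {
  uint64_t count = 0;
  bool hasNull = false;
  int64_t minMillis = std::numeric_limits<int64_t>::max();
  int64_t maxMillis = std::numeric_limits<int64_t>::min();

  void update(int64_t millis) {
    if (millis < minMillis) minMillis = millis;
    if (millis > maxMillis) maxMillis = millis;
  }
};

// One pass over the batch: update statistics and do the per-value work
// (a placeholder copy here) in the same iteration.
void addFused(const int64_t* secs, const int64_t* nanos, const char* notNull,
              uint64_t numValues, TsStats& stats,
              std::vector<int64_t>& outSecs, std::vector<int64_t>& outNanos) {
  for (uint64_t i = 0; i < numValues; ++i) {
    if (notNull == nullptr || notNull[i]) {
      // Statistics are computed from the untouched input values first.
      int64_t millis = secs[i] * 1000 + nanos[i] / 1000000;
      stats.count += 1;
      stats.update(millis);
      // Placeholder for whatever the second loop does per value.
      outSecs.push_back(secs[i]);
      outNanos.push_back(nanos[i]);
    } else {
      stats.hasNull = true;
    }
  }
}

// Hypothetical usage: three rows, the middle one null.
void fusedAddExample() {
  const int64_t secs[] = {1, 2, 3};
  const int64_t nanos[] = {500000000, 0, 1000000};
  const char notNull[] = {1, 0, 1};
  TsStats stats;
  std::vector<int64_t> outSecs, outNanos;
  addFused(secs, nanos, notNull, 3, stats, outSecs, outNanos);
  assert(stats.count == 2);
  assert(stats.hasNull);
  assert(stats.minMillis == 1500);
  assert(stats.maxMillis == 3001);
  assert(outSecs.size() == 2 && outSecs[1] == 3);
}
```

If the second loop modifies the batch in place, a fused version would need to read the original values for the statistics before overwriting them, as the ordering inside the loop body above does.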

          githubbot ASF GitHub Bot added a comment -

          Github user majetideepak commented on a diff in the pull request:

          https://github.com/apache/orc/pull/149#discussion_r134308863

          --- Diff: c++/src/ColumnWriter.cc ---
          @@ -468,25 +472,1099 @@ namespace orc {
          rleEncoder->recordPosition(rowIndexPosition.get());
          }
            - std::unique_ptr<ColumnWriter> buildWriter(
            - const Type& type,
            - const StreamsFactory& factory,
            - const WriterOptions& options) {
            - switch (static_cast<int64_t>(type.getKind())) {
            - case STRUCT:
            - return std::unique_ptr<ColumnWriter>(
            - new StructColumnWriter(
            - type,
            - factory,
            - options));
            - case INT:
            - case LONG:
            - case SHORT:
            - return std::unique_ptr<ColumnWriter>(
            - new IntegerColumnWriter(
            - type,
            - factory,
            - options));
            + class ByteColumnWriter : public ColumnWriter {
            + public:
            + ByteColumnWriter(const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options);
            +
            + virtual void add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) override;
            +
            + virtual void flush(std::vector<proto::Stream>& streams) override;
            +
            + virtual uint64_t getEstimatedSize() const override;
            +
            + virtual void getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const override;
            +
            + virtual void recordPosition() const override;
            +
            + private:
            + std::unique_ptr<ByteRleEncoder> byteRleEncoder;
            + };
            +
            + ByteColumnWriter::ByteColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options) :
            + ColumnWriter(type, factory, options)

            Unknown macro: { + std}

            +
            + void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + LongVectorBatch& byteBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
            + int64_t* data = byteBatch.data.data() + offset;
            + const char* notNull = byteBatch.hasNulls ?
            + byteBatch.notNull.data() + offset : nullptr;
            +
            + char* byteData = reinterpret_cast<char*>(data);
            + for (uint64_t i = 0; i < numValues; ++i) {
            + byteData[i] = static_cast<char>(data[i]);
            + }
            + rleEncoder->add(byteData, numValues, notNull);
            +
            + BooleanColumnStatisticsImpl* boolStats =
            + dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
            + bool hasNull = false;
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (notNull == nullptr || notNull[i]) {
            + boolStats->increase(1);
            + boolStats->update(byteData[i], 1);
            + } else if (!hasNull) {
            + hasNull = true;
            + }
            + }
            + boolStats->setHasNull(hasNull);
            + }
            +
            + void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) {
            + ColumnWriter::flush(streams);
            +
            + proto::Stream stream;
            + stream.set_kind(proto::Stream_Kind_DATA);
            + stream.set_column(static_cast<uint32_t>(columnId));
            + stream.set_length(rleEncoder->flush());
            + streams.push_back(stream);
            + }
            +
            + uint64_t BooleanColumnWriter::getEstimatedSize() const {
            + uint64_t size = ColumnWriter::getEstimatedSize();
            + size += rleEncoder->getBufferSize();
            + return size;
            + }
            +
            + void BooleanColumnWriter::getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const {
            + proto::ColumnEncoding encoding;
            + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
            + encoding.set_dictionarysize(0);
            + encodings.push_back(encoding);
            + }
            +
            + void BooleanColumnWriter::recordPosition() const {
            + ColumnWriter::recordPosition();
            + rleEncoder->recordPosition(rowIndexPosition.get());
            + }
            +
            + class DoubleColumnWriter : public ColumnWriter {
            + public:
            + DoubleColumnWriter(const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options,
            + bool isFloat);
            +
            + virtual void add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) override;
            +
            + virtual void flush(std::vector<proto::Stream>& streams) override;
            +
            + virtual uint64_t getEstimatedSize() const override;
            +
            + virtual void getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const override;
            +
            + virtual void recordPosition() const override;
            +
            + private:
            + bool isFloat;
            + std::unique_ptr<AppendOnlyBufferedStream> dataStream;
            + DataBuffer<char> buffer;
            + };
            +
            + DoubleColumnWriter::DoubleColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options,
            + bool isFloatType) :
            + ColumnWriter(type, factory, options),
            + isFloat(isFloatType),
            + buffer(*options.getMemoryPool()) {
            + dataStream.reset(new AppendOnlyBufferedStream(
            + factory.createStream(proto::Stream_Kind_DATA)));
            + buffer.resize(isFloat ? 4 : 8);
            +
            + if (enableIndex) {
            + recordPosition();
            + }
            + }
            +
            + void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + const DoubleVectorBatch& dblBatch =
            + dynamic_cast<const DoubleVectorBatch&>(rowBatch);
            +
            + const double* doubleData = dblBatch.data.data() + offset;
            + const char* notNull = dblBatch.hasNulls ?
            + dblBatch.notNull.data() + offset : nullptr;
            +
            + size_t bytes = isFloat ? 4 : 8;
            + char* data = buffer.data();
            +
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) {
            + if (isFloat) {
            + // to avoid float-double cast
            + const int32_t* intBits =
            + reinterpret_cast<const int32_t*>(&static_cast<const float&>(
            + doubleData[i]));
            + for (size_t j = 0; j < bytes; ++j) {
            + data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
            + }
            + } else {
            + const int64_t* intBits =
            + reinterpret_cast<const int64_t*>(&(doubleData[i]));
            + for (size_t j = 0; j < bytes; ++j) {
            + data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
            + }
            + }
            + dataStream->write(data, bytes);
            + }
            + }
            +
            + DoubleColumnStatisticsImpl* doubleStats =
            + dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get());
            + bool hasNull = false;
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) {
            + doubleStats->increase(1);
            + doubleStats->update(doubleData[i]);
            + } else if (!hasNull) {
            + hasNull = true;
            + }
            + }
            + doubleStats->setHasNull(hasNull);
            + }
            +
            + void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) {
            + ColumnWriter::flush(streams);
            +
            + proto::Stream stream;
            + stream.set_kind(proto::Stream_Kind_DATA);
            + stream.set_column(static_cast<uint32_t>(columnId));
            + stream.set_length(dataStream->flush());
            + streams.push_back(stream);
            + }
            +
            + uint64_t DoubleColumnWriter::getEstimatedSize() const {
            + uint64_t size = ColumnWriter::getEstimatedSize();
            + size += dataStream->getSize();
            + return size;
            + }
            +
            + void DoubleColumnWriter::getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const {
            + proto::ColumnEncoding encoding;
            + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
            + encoding.set_dictionarysize(0);
            + encodings.push_back(encoding);
            + }
            +
            + void DoubleColumnWriter::recordPosition() const {
            + ColumnWriter::recordPosition();
            + dataStream->recordPosition(rowIndexPosition.get());
            + }
            +
            + class StringColumnWriter : public ColumnWriter {
            + public:
            + StringColumnWriter(const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options);
            +
            + virtual void add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) override;
            +
            + virtual void flush(std::vector<proto::Stream>& streams) override;
            +
            + virtual uint64_t getEstimatedSize() const override;
            +
            + virtual void getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const override;
            +
            + virtual void recordPosition() const override;
            +
            + protected:
            + std::unique_ptr<RleEncoder> lengthEncoder;
            + std::unique_ptr<AppendOnlyBufferedStream> dataStream;
            + RleVersion rleVersion;
            + };
            +
            + StringColumnWriter::StringColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options) :
            + ColumnWriter(type, factory, options),
            + rleVersion(RleVersion_1)

            Unknown macro: { + std}

            +
            + void StringColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            + const StringVectorBatch & stringBatch =
            + dynamic_cast<const StringVectorBatch &>(rowBatch);
            +
            + char *const * data = stringBatch.data.data() + offset;
            + const int64_t* length = stringBatch.length.data() + offset;
            + const char* notNull = stringBatch.hasNulls ?
            + stringBatch.notNull.data() + offset : nullptr;
            +
            + lengthEncoder->add(length, numValues, notNull);
            +
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) {
            + dataStream->write(data[i], static_cast<size_t>(length[i]));
            + }
            + }
            +
            + StringColumnStatisticsImpl* strStats =
            + dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
            +
            + bool hasNull = false;
            + for (uint64_t i = 0; i < numValues; ++i) {

              • End diff –

          Can this loop be fused with the above loop?
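
          For reference, the fusion being asked about can be sketched in isolation. This is a minimal illustration, not the code from the pull request: `SketchStats` and `addFused` are hypothetical names, the output vector stands in for the data stream, and the statistics object is a simplified stand-in for the ORC statistics classes. It assumes the per-value write and the statistics update are independent of each other, so both can run in a single pass over the batch.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical stand-in for a column statistics object; not the ORC class.
struct SketchStats {
  uint64_t count = 0;
  bool hasNull = false;
  double min = std::numeric_limits<double>::infinity();
  double max = -std::numeric_limits<double>::infinity();

  // Record one non-null value.
  void update(double v) {
    ++count;
    if (v < min) min = v;
    if (v > max) max = v;
  }
};

// Single pass: emit each non-null value and update statistics together.
// `notNull` may be nullptr, meaning that every value is present.
inline void addFused(const double* values,
                     const char* notNull,
                     uint64_t numValues,
                     std::vector<double>& out,
                     SketchStats& stats) {
  assert(values != nullptr || numValues == 0);
  for (uint64_t i = 0; i < numValues; ++i) {
    if (notNull == nullptr || notNull[i]) {
      out.push_back(values[i]);  // stands in for the per-value stream write
      stats.update(values[i]);   // stands in for increase(1) plus update(...)
    } else {
      stats.hasNull = true;      // remember that at least one null was seen
    }
  }
}
```

          Calls that take the whole batch at once, such as `lengthEncoder->add(length, numValues, notNull)` in the diff above, would stay outside the fused loop; only the per-value loops are candidates for merging.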

          githubbot ASF GitHub Bot added a comment -

          Github user majetideepak commented on a diff in the pull request:

          https://github.com/apache/orc/pull/149#discussion_r134301225

          — Diff: c++/src/ColumnWriter.cc —
          @@ -468,25 +472,1099 @@ namespace orc

            +
            + static int64_t formatNano(int64_t nanos) {
            + if (nanos == 0) {
            + return 0;
            + } else if (nanos % 100 != 0) {
            + return (nanos) << 3;
            + } else {
            + nanos /= 100;
            + int64_t trailingZeros = 1;
            + while (nanos % 10 == 0 && trailingZeros < 7) {
            + nanos /= 10;
            + trailingZeros += 1;
            + }
            + return (nanos) << 3 | trailingZeros;
            + }
            + }
            +
            + void TimestampColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + TimestampVectorBatch& tsBatch =
            + dynamic_cast<TimestampVectorBatch &>(rowBatch);
            +
            + const char* notNull = tsBatch.hasNulls ?
            + tsBatch.notNull.data() + offset : nullptr;
            + int64_t *secs = tsBatch.data.data() + offset;
            + int64_t *nanos = tsBatch.nanoseconds.data() + offset;
            +
            + TimestampColumnStatisticsImpl* tsStats =
            + dynamic_cast<TimestampColumnStatisticsImpl*>(colIndexStatistics.get());
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (notNull == nullptr || notNull[i]) {
            + // TimestampVectorBatch already stores data in UTC
            + int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000;
            + tsStats->increase(1);
            + tsStats->update(millsUTC);
            + } else if (!tsStats->hasNull()) {
            + tsStats->setHasNull(true);
            + }
            + }
            +
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (notNull == nullptr || notNull[i]) {
            + secs[i] -= timezone.getVariant(secs[i]).gmtOffset;
            + secs[i] -= timezone.getEpoch();
            + nanos[i] = formatNano(nanos[i]);
            + }
            + }
            +
            + secRleEncoder->add(secs, numValues, notNull);
            + nanoRleEncoder->add(nanos, numValues, notNull);
            + }
            +
            + void TimestampColumnWriter::flush(std::vector<proto::Stream>& streams) {
            + ColumnWriter::flush(streams);
            +
            + proto::Stream dataStream;
            + dataStream.set_kind(proto::Stream_Kind_DATA);
            + dataStream.set_column(static_cast<uint32_t>(columnId));
            + dataStream.set_length(secRleEncoder->flush());
            + streams.push_back(dataStream);
            +
            + proto::Stream secondaryStream;
            + secondaryStream.set_kind(proto::Stream_Kind_SECONDARY);
            + secondaryStream.set_column(static_cast<uint32_t>(columnId));
            + secondaryStream.set_length(nanoRleEncoder->flush());
            + streams.push_back(secondaryStream);
            + }
            +
            + uint64_t TimestampColumnWriter::getEstimatedSize() const {
            + uint64_t size = ColumnWriter::getEstimatedSize();
            + size += secRleEncoder->getBufferSize();
            + size += nanoRleEncoder->getBufferSize();
            + return size;
            + }
            +
            + void TimestampColumnWriter::getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const {
            + proto::ColumnEncoding encoding;
            + encoding.set_kind(rleVersion == RleVersion_1 ?
            + proto::ColumnEncoding_Kind_DIRECT :
            + proto::ColumnEncoding_Kind_DIRECT_V2);
            + encoding.set_dictionarysize(0);
            + encodings.push_back(encoding);
            + }
            +
            + void TimestampColumnWriter::recordPosition() const {
            + ColumnWriter::recordPosition();
            + secRleEncoder->recordPosition(rowIndexPosition.get());
            + nanoRleEncoder->recordPosition(rowIndexPosition.get());
            + }
            +
            + class DateColumnWriter : public IntegerColumnWriter {
            + public:
            + DateColumnWriter(const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options);
            +
            + virtual void add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) override;
            + };
            +
            + DateColumnWriter::DateColumnWriter(
            + const Type &type,
            + const StreamsFactory &factory,
            + const WriterOptions &options) :
            + IntegerColumnWriter(type, factory, options) {
            + // PASS
            + }
            +
            + void DateColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + const LongVectorBatch& longBatch =
            + dynamic_cast<const LongVectorBatch&>(rowBatch);
            +
            + const int64_t* data = longBatch.data.data() + offset;
            + const char* notNull = longBatch.hasNulls ?
            + longBatch.notNull.data() + offset : nullptr;
            +
            + rleEncoder->add(data, numValues, notNull);
            +
            + DateColumnStatisticsImpl* dateStats =
            + dynamic_cast<DateColumnStatisticsImpl*>(colIndexStatistics.get());
            + bool hasNull = false;
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) {
            + dateStats->increase(1);
            + dateStats->update(static_cast<int32_t>(data[i]));
            + } else if (!hasNull) {
            + hasNull = true;
            + }
            + }
            + dateStats->setHasNull(hasNull);
            + }
            +
            + class Decimal64ColumnWriter : public ColumnWriter {
            + public:
            + static const uint32_t MAX_PRECISION_64 = 18;
            + static const uint32_t MAX_PRECISION_128 = 38;
            +
            + Decimal64ColumnWriter(const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options);
            +
            + virtual void add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) override;
            +
            + virtual void flush(std::vector<proto::Stream>& streams) override;
            +
            + virtual uint64_t getEstimatedSize() const override;
            +
            + virtual void getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const override;
            +
            + virtual void recordPosition() const override;
            +
            + protected:
            + RleVersion rleVersion;
            + uint64_t precision;
            + uint64_t scale;
            + std::unique_ptr<AppendOnlyBufferedStream> valueStream;
            + std::unique_ptr<RleEncoder> scaleEncoder;
            +
            + private:
            + char buffer[8];
            + };
            +
            + Decimal64ColumnWriter::Decimal64ColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options) :
            + ColumnWriter(type, factory, options),
            + rleVersion(RleVersion_1),
            + precision(type.getPrecision()),
            + scale(type.getScale()) {
            + valueStream.reset(new AppendOnlyBufferedStream(
            + factory.createStream(proto::Stream_Kind_DATA)));
            + std::unique_ptr<BufferedOutputStream> scaleStream =
            + factory.createStream(proto::Stream_Kind_SECONDARY);
            + scaleEncoder = createRleEncoder(std::move(scaleStream),
            + true,
            + rleVersion,
            + memPool);
            +
            + if (enableIndex) {
            + recordPosition();
            + }
            + }
            +
            + void Decimal64ColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + const Decimal64VectorBatch& decBatch =
            + dynamic_cast<const Decimal64VectorBatch&>(rowBatch);
            +
            + const char* notNull = decBatch.hasNulls ?
            + decBatch.notNull.data() + offset : nullptr;
            + const int64_t* values = decBatch.values.data() + offset;
            +
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) {
            + int64_t val = zigZag(values[i]);
            + char* data = buffer;
            + while (true) {
            + if ((val & ~0x7f) == 0) {
            + *(data++) = (static_cast<char>(val));
            + break;
            + } else {
            + *(data++) = static_cast<char>(0x80 | (val & 0x7f));
            + // cast val to unsigned so as to force 0-fill right shift
            + val = (static_cast<uint64_t>(val) >> 7);
            + }
            + }
            + valueStream->write(buffer, static_cast<size_t>(data - buffer));
            + }
            + }
            + std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale));
            + scaleEncoder->add(scales.data(), numValues, notNull);
            +
            + DecimalColumnStatisticsImpl* decStats =
            + dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get());
            + bool hasNull = false;
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) {
            + decStats->increase(1);
            + decStats->update(Decimal(values[i], static_cast<int32_t>(scale)));
            + } else if (!hasNull) {
            + hasNull = true;
            + }
            + }
            + decStats->setHasNull(hasNull);
            + }
            +
            + void Decimal64ColumnWriter::flush(std::vector<proto::Stream>& streams) {
            + ColumnWriter::flush(streams);
            +
            + proto::Stream dataStream;
            + dataStream.set_kind(proto::Stream_Kind_DATA);
            + dataStream.set_column(static_cast<uint32_t>(columnId));
            + dataStream.set_length(valueStream->flush());
            + streams.push_back(dataStream);
            +
            + proto::Stream secondaryStream;
            + secondaryStream.set_kind(proto::Stream_Kind_SECONDARY);
            + secondaryStream.set_column(static_cast<uint32_t>(columnId));
            + secondaryStream.set_length(scaleEncoder->flush());
            + streams.push_back(secondaryStream);
            + }
            +
            + uint64_t Decimal64ColumnWriter::getEstimatedSize() const {
            + uint64_t size = ColumnWriter::getEstimatedSize();
            + size += valueStream->getSize();
            + size += scaleEncoder->getBufferSize();
            + return size;
            + }
            +
            + void Decimal64ColumnWriter::getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const {
            + proto::ColumnEncoding encoding;
            + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
            + encoding.set_dictionarysize(0);
            + encodings.push_back(encoding);
            + }
            +
            + void Decimal64ColumnWriter::recordPosition() const {
            + ColumnWriter::recordPosition();
            + valueStream->recordPosition(rowIndexPosition.get());
            + scaleEncoder->recordPosition(rowIndexPosition.get());
            + }
            +
            + class Decimal128ColumnWriter : public Decimal64ColumnWriter {
            + public:
            + Decimal128ColumnWriter(const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options);
            +
            + virtual void add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) override;
            +
            + private:
            + char buffer[16];
            + };
            +
            + Decimal128ColumnWriter::Decimal128ColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options) :
            + Decimal64ColumnWriter(type, factory, options) {
            + // PASS
            + }

            +
            + Int128 zigZagInt128(const Int128& value) {

              • End diff –

          Some comments here will help
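
For context on the request above, here is a minimal sketch of the zigzag-plus-varint scheme that `Decimal64ColumnWriter::add` in this diff applies to each value, written with the kind of comments being asked for. The names `zigZagEncode` and `writeVarint` are illustrative and do not come from the patch. The 128-bit `zigZagInt128` presumably follows the same idea over an `Int128`, but that is an assumption because its body is not shown here. A full 64-bit input can need up to 10 output bytes, so the sketch leaves buffer sizing to the caller.

```cpp
#include <cstddef>
#include <cstdint>

// ZigZag maps signed integers to unsigned ones so that values of small
// magnitude, positive or negative, get small codes:
//   0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4, ...
// The sign ends up in the lowest bit instead of the highest.
inline uint64_t zigZagEncode(int64_t value) {
  const uint64_t shifted = static_cast<uint64_t>(value) << 1;
  // All ones for negative inputs, all zeros otherwise.
  const uint64_t signMask = value < 0 ? ~uint64_t{0} : uint64_t{0};
  return shifted ^ signMask;
}

// Base-128 varint: emit 7 payload bits per byte, least significant
// group first. The high bit (0x80) of a byte means "more bytes follow".
// Returns the number of bytes written; `out` must hold at least 10.
inline size_t writeVarint(uint64_t value, unsigned char* out) {
  size_t n = 0;
  while (value >= 0x80) {
    out[n++] = static_cast<unsigned char>(0x80 | (value & 0x7f));
    value >>= 7;  // unsigned shift, so the top bits are zero-filled
  }
  out[n++] = static_cast<unsigned char>(value);  // last byte, high bit clear
  return n;
}
```

Working on `uint64_t` after the zigzag step avoids the cast inside the loop that the patch uses to force a zero-fill right shift.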

          githubbot ASF GitHub Bot added a comment -

          Github user majetideepak commented on a diff in the pull request:

          https://github.com/apache/orc/pull/149#discussion_r134310213

          — Diff: c++/src/ColumnWriter.cc —
          @@ -468,25 +472,1099 @@ namespace orc {
          rleEncoder->recordPosition(rowIndexPosition.get());
          }
          - std::unique_ptr<ColumnWriter> buildWriter(
          - const Type& type,
          - const StreamsFactory& factory,
          - const WriterOptions& options) {
          - switch (static_cast<int64_t>(type.getKind())) {
          - case STRUCT:
          - return std::unique_ptr<ColumnWriter>(
          - new StructColumnWriter(
          - type,
          - factory,
          - options));
          - case INT:
          - case LONG:
          - case SHORT:
          - return std::unique_ptr<ColumnWriter>(
          - new IntegerColumnWriter(
          - type,
          - factory,
          - options));
            + class ByteColumnWriter : public ColumnWriter {
            + public:
            + ByteColumnWriter(const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options);
            +
            + virtual void add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) override;
            +
            + virtual void flush(std::vector<proto::Stream>& streams) override;
            +
            + virtual uint64_t getEstimatedSize() const override;
            +
            + virtual void getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const override;
            +
            + virtual void recordPosition() const override;
            +
            + private:
            + std::unique_ptr<ByteRleEncoder> byteRleEncoder;
            + };
            +
            + ByteColumnWriter::ByteColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options) :
            + ColumnWriter(type, factory, options)

            Unknown macro: { + std}

            +
            + void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + LongVectorBatch& byteBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
            + int64_t* data = byteBatch.data.data() + offset;
            + const char* notNull = byteBatch.hasNulls ?
            + byteBatch.notNull.data() + offset : nullptr;
            +
            + char* byteData = reinterpret_cast<char*>(data);
            + for (uint64_t i = 0; i < numValues; ++i) {
            + byteData[i] = static_cast<char>(data[i]);
            + }

            + rleEncoder->add(byteData, numValues, notNull);
            +
            + BooleanColumnStatisticsImpl* boolStats =
            + dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
            + bool hasNull = false;
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (notNull == nullptr || notNull[i]) {
            + boolStats->increase(1);
            + boolStats->update(byteData[i], 1);
            + } else if (!hasNull) {
            + hasNull = true;
            + }
            + }
            + boolStats->setHasNull(hasNull);
            + }
            +
            + void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) {
            + ColumnWriter::flush(streams);
            +
            + proto::Stream stream;
            + stream.set_kind(proto::Stream_Kind_DATA);
            + stream.set_column(static_cast<uint32_t>(columnId));
            + stream.set_length(rleEncoder->flush());
            + streams.push_back(stream);
            + }
            +
            + uint64_t BooleanColumnWriter::getEstimatedSize() const {
            + uint64_t size = ColumnWriter::getEstimatedSize();
            + size += rleEncoder->getBufferSize();
            + return size;
            + }
            +
            + void BooleanColumnWriter::getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const {
            + proto::ColumnEncoding encoding;
            + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
            + encoding.set_dictionarysize(0);
            + encodings.push_back(encoding);
            + }
            +
            + void BooleanColumnWriter::recordPosition() const {
            + ColumnWriter::recordPosition();
            + rleEncoder->recordPosition(rowIndexPosition.get());
            + }
            +
            + class DoubleColumnWriter : public ColumnWriter {
            + public:
            + DoubleColumnWriter(const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options,
            + bool isFloat);
            +
            + virtual void add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) override;
            +
            + virtual void flush(std::vector<proto::Stream>& streams) override;
            +
            + virtual uint64_t getEstimatedSize() const override;
            +
            + virtual void getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const override;
            +
            + virtual void recordPosition() const override;
            +
            + private:
            + bool isFloat;
            + std::unique_ptr<AppendOnlyBufferedStream> dataStream;
            + DataBuffer<char> buffer;
            + };
            +
            + DoubleColumnWriter::DoubleColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options,
            + bool isFloatType) :
            + ColumnWriter(type, factory, options),
            + isFloat(isFloatType),
            + buffer(*options.getMemoryPool()) {
            + dataStream.reset(new AppendOnlyBufferedStream(
            + factory.createStream(proto::Stream_Kind_DATA)));
            + buffer.resize(isFloat ? 4 : 8);
            +
            + if (enableIndex) {
            + recordPosition();
            + }
            + }
            +
            + void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + const DoubleVectorBatch& dblBatch =
            + dynamic_cast<const DoubleVectorBatch&>(rowBatch);
            +
            + const double* doubleData = dblBatch.data.data() + offset;
            + const char* notNull = dblBatch.hasNulls ?
            + dblBatch.notNull.data() + offset : nullptr;
            +
            + size_t bytes = isFloat ? 4 : 8;
            + char* data = buffer.data();
            +
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) {
            + if (isFloat) {
            + // to avoid float-double cast
            + const int32_t* intBits =
            + reinterpret_cast<const int32_t*>(&static_cast<const float&>(
            + doubleData[i]));
            + for (size_t j = 0; j < bytes; ++j) {
            + data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
            + }
            + } else {
            + const int64_t* intBits =
            + reinterpret_cast<const int64_t*>(&(doubleData[i]));
            + for (size_t j = 0; j < bytes; ++j) {
            + data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
            + }
            + }
            + dataStream->write(data, bytes);
            + }
            + }
            +
            + DoubleColumnStatisticsImpl* doubleStats =
            + dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get());
            + bool hasNull = false;
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) {
            + doubleStats->increase(1);
            + doubleStats->update(doubleData[i]);
            + } else if (!hasNull) {
            + hasNull = true;
            + }
            + }
            + doubleStats->setHasNull(hasNull);
            + }
            +
            + void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) {
            + ColumnWriter::flush(streams);
            +
            + proto::Stream stream;
            + stream.set_kind(proto::Stream_Kind_DATA);
            + stream.set_column(static_cast<uint32_t>(columnId));
            + stream.set_length(dataStream->flush());
            + streams.push_back(stream);
            + }
            +
            + uint64_t DoubleColumnWriter::getEstimatedSize() const {
            + uint64_t size = ColumnWriter::getEstimatedSize();
            + size += dataStream->getSize();
            + return size;
            + }
            +
            + void DoubleColumnWriter::getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const {
            + proto::ColumnEncoding encoding;
            + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
            + encoding.set_dictionarysize(0);
            + encodings.push_back(encoding);
            + }
            +
            + void DoubleColumnWriter::recordPosition() const {
            + ColumnWriter::recordPosition();
            + dataStream->recordPosition(rowIndexPosition.get());
            + }

            +
            + class StringColumnWriter : public ColumnWriter {
            + public:
            + StringColumnWriter(const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options);
            +
            + virtual void add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) override;
            +
            + virtual void flush(std::vector<proto::Stream>& streams) override;
            +
            + virtual uint64_t getEstimatedSize() const override;
            +
            + virtual void getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const override;
            +
            + virtual void recordPosition() const override;
            +
            + protected:
            + std::unique_ptr<RleEncoder> lengthEncoder;
            + std::unique_ptr<AppendOnlyBufferedStream> dataStream;
            + RleVersion rleVersion;
            + };
            +
            + StringColumnWriter::StringColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options) :
            + ColumnWriter(type, factory, options),
            + rleVersion(RleVersion_1)

            Unknown macro: { + std}

            +
            + static int64_t formatNano(int64_t nanos) {
            + if (nanos == 0)

            { + return 0; + }

            else if (nanos % 100 != 0)

            { + return (nanos) << 3; + }

            else

            { + nanos /= 100; + int64_t trailingZeros = 1; + while (nanos % 10 == 0 && trailingZeros < 7) { + nanos /= 10; + trailingZeros += 1; + } + return (nanos) << 3 | trailingZeros; + }

            + }
            +
            + void TimestampColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + TimestampVectorBatch& tsBatch =
            + dynamic_cast<TimestampVectorBatch &>(rowBatch);
            +
            + const char* notNull = tsBatch.hasNulls ?
            + tsBatch.notNull.data() + offset : nullptr;
            + int64_t *secs = tsBatch.data.data() + offset;
            + int64_t *nanos = tsBatch.nanoseconds.data() + offset;
            +
            + TimestampColumnStatisticsImpl* tsStats =
            + dynamic_cast<TimestampColumnStatisticsImpl*>(colIndexStatistics.get());
            + for (uint64_t i = 0; i < numValues; ++i)

            { + if (notNull == nullptr || notNull[i]) { + // TimestampVectorBatch already stores data in UTC + int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000; + tsStats->increase(1); + tsStats->update(millsUTC); + } else if (!tsStats->hasNull()) { + tsStats->setHasNull(true); + } + }

            +
            + for (uint64_t i = 0; i < numValues; ++i)

            { + if (notNull == nullptr || notNull[i]) { + secs[i] -= timezone.getVariant(secs[i]).gmtOffset; + secs[i] -= timezone.getEpoch(); + nanos[i] = formatNano(nanos[i]); + } + }

            +
            + secRleEncoder->add(secs, numValues, notNull);
            + nanoRleEncoder->add(nanos, numValues, notNull);
            + }
            +
            + void TimestampColumnWriter::flush(std::vector<proto::Stream>& streams)

            { + ColumnWriter::flush(streams); + + proto::Stream dataStream; + dataStream.set_kind(proto::Stream_Kind_DATA); + dataStream.set_column(static_cast<uint32_t>(columnId)); + dataStream.set_length(secRleEncoder->flush()); + streams.push_back(dataStream); + + proto::Stream secondaryStream; + secondaryStream.set_kind(proto::Stream_Kind_SECONDARY); + secondaryStream.set_column(static_cast<uint32_t>(columnId)); + secondaryStream.set_length(nanoRleEncoder->flush()); + streams.push_back(secondaryStream); + }

            +
            + uint64_t TimestampColumnWriter::getEstimatedSize() const

            { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += secRleEncoder->getBufferSize(); + size += nanoRleEncoder->getBufferSize(); + return size; + }

            +
            + void TimestampColumnWriter::getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const

            { + proto::ColumnEncoding encoding; + encoding.set_kind(rleVersion == RleVersion_1 ? + proto::ColumnEncoding_Kind_DIRECT : + proto::ColumnEncoding_Kind_DIRECT_V2); + encoding.set_dictionarysize(0); + encodings.push_back(encoding); + }

            +
            + void TimestampColumnWriter::recordPosition() const

            { + ColumnWriter::recordPosition(); + secRleEncoder->recordPosition(rowIndexPosition.get()); + nanoRleEncoder->recordPosition(rowIndexPosition.get()); + }

            +
            + class DateColumnWriter : public IntegerColumnWriter

            { + public: + DateColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + }

            ;
            +
            + DateColumnWriter::DateColumnWriter(
            + const Type &type,
            + const StreamsFactory &factory,
            + const WriterOptions &options) :
            + IntegerColumnWriter(type, factory, options)

            { + // PASS + }
            +
            + void DateColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + const LongVectorBatch& longBatch =
            + dynamic_cast<const LongVectorBatch&>(rowBatch);
            +
            + const int64_t* data = longBatch.data.data() + offset;
            + const char* notNull = longBatch.hasNulls ?
            + longBatch.notNull.data() + offset : nullptr;
            +
            + rleEncoder->add(data, numValues, notNull);
            +
            + DateColumnStatisticsImpl* dateStats =
            + dynamic_cast<DateColumnStatisticsImpl*>(colIndexStatistics.get());
            + bool hasNull = false;
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) { + dateStats->increase(1); + dateStats->update(static_cast<int32_t>(data[i])); + } else if (!hasNull) { + hasNull = true; + }
            + }
            + dateStats->setHasNull(hasNull);
            + }
            +
            + class Decimal64ColumnWriter : public ColumnWriter { + public: + static const uint32_t MAX_PRECISION_64 = 18; + static const uint32_t MAX_PRECISION_128 = 38; + + Decimal64ColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + + virtual void flush(std::vector<proto::Stream>& streams) override; + + virtual uint64_t getEstimatedSize() const override; + + virtual void getColumnEncoding( + std::vector<proto::ColumnEncoding>& encodings) const override; + + virtual void recordPosition() const override; + + protected: + RleVersion rleVersion; + uint64_t precision; + uint64_t scale; + std::unique_ptr<AppendOnlyBufferedStream> valueStream; + std::unique_ptr<RleEncoder> scaleEncoder; + + private: + char buffer[8]; + };
            +
            + Decimal64ColumnWriter::Decimal64ColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options) :
            + ColumnWriter(type, factory, options),
            + rleVersion(RleVersion_1),
            + precision(type.getPrecision()),
            + scale(type.getScale()) {
            + valueStream.reset(new AppendOnlyBufferedStream(
            + factory.createStream(proto::Stream_Kind_DATA)));
            + std::unique_ptr<BufferedOutputStream> scaleStream =
            + factory.createStream(proto::Stream_Kind_SECONDARY);
            + scaleEncoder = createRleEncoder(std::move(scaleStream),
            + true,
            + rleVersion,
            + memPool);
            +
            + if (enableIndex) { + recordPosition(); + }
            + }
            +
            + void Decimal64ColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + const Decimal64VectorBatch& decBatch =
            + dynamic_cast<const Decimal64VectorBatch&>(rowBatch);
            +
            + const char* notNull = decBatch.hasNulls ?
            + decBatch.notNull.data() + offset : nullptr;
            + const int64_t* values = decBatch.values.data() + offset;
            +
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) {
            + int64_t val = zigZag(values[i]);
            + char* data = buffer;
            + while (true) {
            + if ((val & ~0x7f) == 0) { + *(data++) = (static_cast<char>(val)); + break; + } else { + *(data++) = static_cast<char>(0x80 | (val & 0x7f)); + // cast val to unsigned so as to force 0-fill right shift + val = (static_cast<uint64_t>(val) >> 7); + }
            + }
            + valueStream->write(buffer, static_cast<size_t>(data - buffer));
            + }
            + }
            + std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale));
            + scaleEncoder->add(scales.data(), numValues, notNull);
            +
            + DecimalColumnStatisticsImpl* decStats =
            + dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get());
            + bool hasNull = false;
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) { + decStats->increase(1); + decStats->update(Decimal(values[i], static_cast<int32_t>(scale))); + } else if (!hasNull) { + hasNull = true; + }
            + }
            + decStats->setHasNull(hasNull);
            + }
            +
            + void Decimal64ColumnWriter::flush(std::vector<proto::Stream>& streams) { + ColumnWriter::flush(streams); + + proto::Stream dataStream; + dataStream.set_kind(proto::Stream_Kind_DATA); + dataStream.set_column(static_cast<uint32_t>(columnId)); + dataStream.set_length(valueStream->flush()); + streams.push_back(dataStream); + + proto::Stream secondaryStream; + secondaryStream.set_kind(proto::Stream_Kind_SECONDARY); + secondaryStream.set_column(static_cast<uint32_t>(columnId)); + secondaryStream.set_length(scaleEncoder->flush()); + streams.push_back(secondaryStream); + }
            +
            + uint64_t Decimal64ColumnWriter::getEstimatedSize() const { + uint64_t size = ColumnWriter::getEstimatedSize(); + size += valueStream->getSize(); + size += scaleEncoder->getBufferSize(); + return size; + }
            +
            + void Decimal64ColumnWriter::getColumnEncoding(
            + std::vector<proto::ColumnEncoding>& encodings) const { + proto::ColumnEncoding encoding; + encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + encoding.set_dictionarysize(0); + encodings.push_back(encoding); + }
            +
            + void Decimal64ColumnWriter::recordPosition() const { + ColumnWriter::recordPosition(); + valueStream->recordPosition(rowIndexPosition.get()); + scaleEncoder->recordPosition(rowIndexPosition.get()); + }
            +
            + class Decimal128ColumnWriter : public Decimal64ColumnWriter { + public: + Decimal128ColumnWriter(const Type& type, + const StreamsFactory& factory, + const WriterOptions& options); + + virtual void add(ColumnVectorBatch& rowBatch, + uint64_t offset, + uint64_t numValues) override; + + private: + char buffer[16]; + };
            +
            + Decimal128ColumnWriter::Decimal128ColumnWriter(
            + const Type& type,
            + const StreamsFactory& factory,
            + const WriterOptions& options) :
            + Decimal64ColumnWriter(type, factory, options) { + // PASS + }

            +
            + Int128 zigZagInt128(const Int128& value)

            { + bool isNegative = value < 0; + Int128 val = value.abs(); + val <<= 1; + if (isNegative) { + val -= 1; + } + return val; + }

            +
            + void Decimal128ColumnWriter::add(ColumnVectorBatch& rowBatch,
            + uint64_t offset,
            + uint64_t numValues) {
            + ColumnWriter::add(rowBatch, offset, numValues);
            +
            + const Decimal128VectorBatch & decBatch =
            + dynamic_cast<const Decimal128VectorBatch &>(rowBatch);
            +
            + const char* notNull = decBatch.hasNulls ?
            + decBatch.notNull.data() + offset : nullptr;
            + const Int128* values = decBatch.values.data() + offset;
            +
            + for (uint64_t i = 0; i < numValues; ++i) {
            + if (!notNull || notNull[i]) {
            + Int128 val = zigZagInt128(values[i]);
            + char* data = buffer;
            + while (true) {

              • End diff –

          Some comments here would help.
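
          For readers following the thread: judging from the Decimal64ColumnWriter::add code quoted above, the loop in question appears to write a zigzag-encoded value as a base-128 varint (7 payload bits per byte, low-order group first, high bit set on every byte except the last). The following is a minimal standalone sketch of that technique for 64-bit values, not the code from the patch; the names zigZagEncode, writeVarint and encodeSigned are hypothetical and chosen only for illustration.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: map a signed value to an unsigned one so that small
// magnitudes stay small: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
inline uint64_t zigZagEncode(int64_t value) {
  const uint64_t signMask = value < 0 ? ~uint64_t{0} : uint64_t{0};
  return (static_cast<uint64_t>(value) << 1) ^ signMask;
}

// Hypothetical helper: append `value` to `out` as a base-128 varint and
// return the number of bytes written.
inline size_t writeVarint(uint64_t value, std::vector<unsigned char>& out) {
  size_t written = 0;
  while (true) {
    ++written;
    if ((value & ~uint64_t{0x7f}) == 0) {
      // Remaining bits fit in 7 bits: final byte, continuation bit clear.
      out.push_back(static_cast<unsigned char>(value));
      return written;
    }
    // Emit the low 7 bits with the continuation bit (0x80) set.
    out.push_back(static_cast<unsigned char>(0x80 | (value & 0x7f)));
    // The value is unsigned, so this shift always fills with zeros.
    value >>= 7;
  }
}

// Hypothetical convenience wrapper combining both steps.
inline std::vector<unsigned char> encodeSigned(int64_t value) {
  std::vector<unsigned char> out;
  writeVarint(zigZagEncode(value), out);
  return out;
}
```

          In this sketch, encodeSigned(64) yields the two bytes 0x80 0x01, and a value that uses all 64 bits after zigzag encoding takes ten bytes, since each output byte carries only 7 bits of payload.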

          githubbot ASF GitHub Bot added a comment -

          Github user majetideepak commented on a diff in the pull request:

          https://github.com/apache/orc/pull/149#discussion_r134319378

          — Diff: c++/test/TestWriter.cc —
          @@ -209,5 +209,612 @@ namespace orc {
          }
          EXPECT_FALSE(rowReader->next(*batch));
          }
          -}

          + TEST(Writer, writeStringAndBinaryColumn) {
          — End diff –

          See Google Test's typed tests; those should help here.

          githubbot ASF GitHub Bot added a comment -

          Github user wgtmac commented on a diff in the pull request:

          https://github.com/apache/orc/pull/149#discussion_r134630866

          — Diff: c++/test/TestWriter.cc —
          @@ -209,5 +209,612 @@ namespace orc {
          }
          EXPECT_FALSE(rowReader->next(*batch));
          }
          -}

          + TEST(Writer, writeStringAndBinaryColumn) {
          — End diff –

          It is better to keep them as separate test cases, for the following reasons:
          1. The tests share the same overall logic, but the data construction and verification steps differ for each type. This is the same rationale as the design of the ColumnWriter classes.
          2. We may adopt different designs or implementations for some types in the future. They will be much easier to test if the tests are decoupled.

          githubbot ASF GitHub Bot added a comment -

          Github user wgtmac commented on the issue:

          https://github.com/apache/orc/pull/149

          @majetideepak Thanks for taking the time to review my previous change! I have committed a new one based on your feedback. Please review it when you have time. Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user wgtmac commented on the issue:

          https://github.com/apache/orc/pull/149

          Hi @omalley. Please help review this patch when you get the chance, thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user wgtmac commented on the issue:

          https://github.com/apache/orc/pull/149

          Hi @omalley @prasanthj, could either of you review and/or merge this patch? @majetideepak already approved it a few days ago. Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user jamesclampffer commented on a diff in the pull request:

          https://github.com/apache/orc/pull/149#discussion_r140067969

          --- Diff: c++/src/ColumnWriter.cc ---
          @@ -468,25 +472,1099 @@ namespace orc {
               rleEncoder->recordPosition(rowIndexPosition.get());
             }
          -  std::unique_ptr<ColumnWriter> buildWriter(
          -      const Type& type,
          -      const StreamsFactory& factory,
          -      const WriterOptions& options) {
          -    switch (static_cast<int64_t>(type.getKind())) {
          -      case STRUCT:
          -        return std::unique_ptr<ColumnWriter>(
          -          new StructColumnWriter(
          -            type,
          -            factory,
          -            options));
          -      case INT:
          -      case LONG:
          -      case SHORT:
          -        return std::unique_ptr<ColumnWriter>(
          -          new IntegerColumnWriter(
          -            type,
          -            factory,
          -            options));
          +  class ByteColumnWriter : public ColumnWriter {
          +  public:
          +    ByteColumnWriter(const Type& type,
          +                     const StreamsFactory& factory,
          +                     const WriterOptions& options);
          +
          +    virtual void add(ColumnVectorBatch& rowBatch,
          +                     uint64_t offset,
          +                     uint64_t numValues) override;
          +
          +    virtual void flush(std::vector<proto::Stream>& streams) override;
          +
          +    virtual uint64_t getEstimatedSize() const override;
          +
          +    virtual void getColumnEncoding(
          +      std::vector<proto::ColumnEncoding>& encodings) const override;
          +
          +    virtual void recordPosition() const override;
          +
          +  private:
          +    std::unique_ptr<ByteRleEncoder> byteRleEncoder;
          +  };
          +
          +  ByteColumnWriter::ByteColumnWriter(
          +      const Type& type,
          +      const StreamsFactory& factory,
          +      const WriterOptions& options) :
          +        ColumnWriter(type, factory, options)

          Unknown macro: { + std}

          +
          +  void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
          +                                uint64_t offset,
          +                                uint64_t numValues) {
          +    ColumnWriter::add(rowBatch, offset, numValues);
          +
          +    LongVectorBatch& byteBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
          --- End diff --

          It looks like this follows the existing convention from the reader, where the results of dynamic_cast aren't checked, but I'd think about adding an orc::InvalidArgument exception. Then you can catch the std::bad_cast exception here (or check for nullptr when assigning to a pointer) and rethrow with a more descriptive message.
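
          The suggestion above could be sketched roughly as follows. This is only an illustration under stated assumptions, not the code that was committed: the batch structs are simplified stand-ins for the ORC classes, InvalidArgument is a hypothetical exception type named after the one proposed in the comment, and castBatch / tryCastBatch are made-up helper names.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <typeinfo>
#include <vector>

// Simplified stand-ins for the ORC batch types (assumption: the real
// classes are polymorphic, which dynamic_cast requires).
struct ColumnVectorBatch {
  virtual ~ColumnVectorBatch() = default;
};
struct LongVectorBatch : ColumnVectorBatch {
  std::vector<int64_t> data;
};
struct DoubleVectorBatch : ColumnVectorBatch {
  std::vector<double> data;
};

// Hypothetical exception type, named after the proposed orc::InvalidArgument.
class InvalidArgument : public std::runtime_error {
 public:
  explicit InvalidArgument(const std::string& what)
      : std::runtime_error(what) {}
};

// Reference variant: a failed dynamic_cast throws std::bad_cast, which is
// caught and rethrown with a message that names the calling function.
template <typename Batch>
Batch& castBatch(ColumnVectorBatch& rowBatch, const char* caller) {
  assert(caller != nullptr);
  try {
    return dynamic_cast<Batch&>(rowBatch);
  } catch (const std::bad_cast&) {
    throw InvalidArgument(std::string(caller) +
                          ": unexpected ColumnVectorBatch type");
  }
}

// Pointer variant: a failed dynamic_cast yields nullptr instead of throwing,
// so the caller checks the result.
template <typename Batch>
Batch* tryCastBatch(ColumnVectorBatch* rowBatch) {
  return dynamic_cast<Batch*>(rowBatch);
}
```

          A writer's add() could then call something like castBatch<LongVectorBatch>(rowBatch, "BooleanColumnWriter::add") in place of the bare dynamic_cast, so that a mismatched batch type produces an error that points at the writer.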

          githubbot ASF GitHub Bot added a comment -

          Github user jamesclampffer commented on a diff in the pull request:

          https://github.com/apache/orc/pull/149#discussion_r140077825

          --- Diff: c++/src/ColumnWriter.cc ---
          @@ -468,25 +472,1099 @@ namespace orc {
               rleEncoder->recordPosition(rowIndexPosition.get());
             }
          -  std::unique_ptr<ColumnWriter> buildWriter(
          -      const Type& type,
          -      const StreamsFactory& factory,
          -      const WriterOptions& options) {
          -    switch (static_cast<int64_t>(type.getKind())) {
          -      case STRUCT:
          -        return std::unique_ptr<ColumnWriter>(
          -          new StructColumnWriter(
          -            type,
          -            factory,
          -            options));
          -      case INT:
          -      case LONG:
          -      case SHORT:
          -        return std::unique_ptr<ColumnWriter>(
          -          new IntegerColumnWriter(
          -            type,
          -            factory,
          -            options));
          +  class ByteColumnWriter : public ColumnWriter {
          +  public:
          +    ByteColumnWriter(const Type& type,
          +                     const StreamsFactory& factory,
          +                     const WriterOptions& options);
          +
          +    virtual void add(ColumnVectorBatch& rowBatch,
          +                     uint64_t offset,
          +                     uint64_t numValues) override;
          +
          +    virtual void flush(std::vector<proto::Stream>& streams) override;
          +
          +    virtual uint64_t getEstimatedSize() const override;
          +
          +    virtual void getColumnEncoding(
          +      std::vector<proto::ColumnEncoding>& encodings) const override;
          +
          +    virtual void recordPosition() const override;
          +
          +  private:
          +    std::unique_ptr<ByteRleEncoder> byteRleEncoder;
          +  };
          +
          +  ByteColumnWriter::ByteColumnWriter(
          +      const Type& type,
          +      const StreamsFactory& factory,
          +      const WriterOptions& options) :
          +        ColumnWriter(type, factory, options)

          Unknown macro: { + std}

          +
          +  void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
          +                                uint64_t offset,
          +                                uint64_t numValues) {
          +    ColumnWriter::add(rowBatch, offset, numValues);
          +
          +    LongVectorBatch& byteBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
          +    int64_t* data = byteBatch.data.data() + offset;
          +    const char* notNull = byteBatch.hasNulls ?
          +      byteBatch.notNull.data() + offset : nullptr;
          +
          +    char* byteData = reinterpret_cast<char*>(data);
          +    for (uint64_t i = 0; i < numValues; ++i) {
          +      byteData[i] = static_cast<char>(data[i]);
          +    }
          +    rleEncoder->add(byteData, numValues, notNull);
          +
          +    BooleanColumnStatisticsImpl* boolStats =
          +      dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
          +    bool hasNull = false;
          +    for (uint64_t i = 0; i < numValues; ++i) {
          +      if (notNull == nullptr || notNull[i]) {
          +        boolStats->increase(1);
          +        boolStats->update(byteData[i], 1);
          +      } else if (!hasNull) {
          +        hasNull = true;
          +      }
          +    }
          +    boolStats->setHasNull(hasNull);
          +  }
          +
          +  void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) {
          +    ColumnWriter::flush(streams);
          +
          +    proto::Stream stream;
          +    stream.set_kind(proto::Stream_Kind_DATA);
          +    stream.set_column(static_cast<uint32_t>(columnId));
          +    stream.set_length(rleEncoder->flush());
          +    streams.push_back(stream);
          +  }
          +
          +  uint64_t BooleanColumnWriter::getEstimatedSize() const {
          +    uint64_t size = ColumnWriter::getEstimatedSize();
          +    size += rleEncoder->getBufferSize();
          +    return size;
          +  }
          +
          +  void BooleanColumnWriter::getColumnEncoding(
          +      std::vector<proto::ColumnEncoding>& encodings) const {
          +    proto::ColumnEncoding encoding;
          +    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
          +    encoding.set_dictionarysize(0);
          +    encodings.push_back(encoding);
          +  }
          +
          +  void BooleanColumnWriter::recordPosition() const {
          +    ColumnWriter::recordPosition();
          +    rleEncoder->recordPosition(rowIndexPosition.get());
          +  }
          +
          +  class DoubleColumnWriter : public ColumnWriter {
          +  public:
          +    DoubleColumnWriter(const Type& type,
          +                       const StreamsFactory& factory,
          +                       const WriterOptions& options,
          +                       bool isFloat);
          +
          +    virtual void add(ColumnVectorBatch& rowBatch,
          +                     uint64_t offset,
          +                     uint64_t numValues) override;
          +
          +    virtual void flush(std::vector<proto::Stream>& streams) override;
          +
          +    virtual uint64_t getEstimatedSize() const override;
          +
          +    virtual void getColumnEncoding(
          +      std::vector<proto::ColumnEncoding>& encodings) const override;
          +
          +    virtual void recordPosition() const override;
          +
          +  private:
          +    bool isFloat;
          +    std::unique_ptr<AppendOnlyBufferedStream> dataStream;
          +    DataBuffer<char> buffer;
          +  };
          +
          +  DoubleColumnWriter::DoubleColumnWriter(
          +      const Type& type,
          +      const StreamsFactory& factory,
          +      const WriterOptions& options,
          +      bool isFloatType) :
          +        ColumnWriter(type, factory, options),
          +        isFloat(isFloatType),
          +        buffer(*options.getMemoryPool())

          Unknown macro: { + dataStream.reset(new AppendOnlyBufferedStream( + factory.createStream(proto}

          +
          +  void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
          +                               uint64_t offset,
          +                               uint64_t numValues) {
          +    ColumnWriter::add(rowBatch, offset, numValues);
          +
          +    const DoubleVectorBatch& dblBatch =
          +      dynamic_cast<const DoubleVectorBatch&>(rowBatch);
          +
          +    const double* doubleData = dblBatch.data.data() + offset;
          +    const char* notNull = dblBatch.hasNulls ?
          +      dblBatch.notNull.data() + offset : nullptr;
          +
          +    size_t bytes = isFloat ? 4 : 8;
          +    char* data = buffer.data();
          +
          +    for (uint64_t i = 0; i < numValues; ++i) {
          +      if (!notNull || notNull[i]) {
          +        if (isFloat) {
          +          // to avoid float-double cast
          +          const int32_t* intBits =
          +            reinterpret_cast<const int32_t*>(&static_cast<const float&>(
          +              doubleData[i]));
          +          for (size_t j = 0; j < bytes; ++j) {
          +            data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
          +          }
          +        } else {
          +          const int64_t* intBits =
          +            reinterpret_cast<const int64_t*>(&(doubleData[i]));
          +          for (size_t j = 0; j < bytes; ++j) {
          +            data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
          +          }
          --- End diff --

          I think the intent of this and the block on line 731 would be much clearer if you made a helper function that took input and output pointers and a type width to handle an endian-agnostic copy.
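
          One possible shape for such a helper is sketched below. It is an assumption-laden illustration, not the helper that ended up in the patch: the names putLittleEndian, encodeFloat and encodeDouble are hypothetical, the helper takes the value's bit pattern rather than a raw input pointer so that the result does not depend on host byte order, and IEEE 754 float/double layouts are assumed.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical helper: write the low `width` bytes of `bits` to `out`,
// least significant byte first, independent of the host's byte order.
inline void putLittleEndian(uint64_t bits, size_t width, char* out) {
  assert(width <= sizeof(bits));
  for (size_t j = 0; j < width; ++j) {
    out[j] = static_cast<char>((bits >> (8 * j)) & 0xff);
  }
}

// Serialize a float as 4 little-endian bytes. memcpy copies the bit
// pattern without the pointer casts used in the quoted diff.
inline void encodeFloat(float value, char* out) {
  uint32_t bits = 0;
  std::memcpy(&bits, &value, sizeof(bits));
  putLittleEndian(bits, sizeof(bits), out);
}

// Serialize a double as 8 little-endian bytes.
inline void encodeDouble(double value, char* out) {
  uint64_t bits = 0;
  std::memcpy(&bits, &value, sizeof(bits));
  putLittleEndian(bits, sizeof(bits), out);
}
```

          With helpers of this kind, the two nearly identical shift loops in DoubleColumnWriter::add would collapse into a single call per value, for example encodeFloat(static_cast<float>(doubleData[i]), data) in the float branch.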

          githubbot ASF GitHub Bot added a comment -

          Github user wgtmac commented on a diff in the pull request:

          https://github.com/apache/orc/pull/149#discussion_r140425006

          --- Diff: c++/src/ColumnWriter.cc ---
          @@ -468,25 +472,1099 @@ namespace orc {
               rleEncoder->recordPosition(rowIndexPosition.get());
             }
          -  std::unique_ptr<ColumnWriter> buildWriter(
          -      const Type& type,
          -      const StreamsFactory& factory,
          -      const WriterOptions& options) {
          -    switch (static_cast<int64_t>(type.getKind())) {
          -      case STRUCT:
          -        return std::unique_ptr<ColumnWriter>(
          -          new StructColumnWriter(
          -            type,
          -            factory,
          -            options));
          -      case INT:
          -      case LONG:
          -      case SHORT:
          -        return std::unique_ptr<ColumnWriter>(
          -          new IntegerColumnWriter(
          -            type,
          -            factory,
          -            options));
          +  class ByteColumnWriter : public ColumnWriter {
          +  public:
          +    ByteColumnWriter(const Type& type,
          +                     const StreamsFactory& factory,
          +                     const WriterOptions& options);
          +
          +    virtual void add(ColumnVectorBatch& rowBatch,
          +                     uint64_t offset,
          +                     uint64_t numValues) override;
          +
          +    virtual void flush(std::vector<proto::Stream>& streams) override;
          +
          +    virtual uint64_t getEstimatedSize() const override;
          +
          +    virtual void getColumnEncoding(
          +      std::vector<proto::ColumnEncoding>& encodings) const override;
          +
          +    virtual void recordPosition() const override;
          +
          +  private:
          +    std::unique_ptr<ByteRleEncoder> byteRleEncoder;
          +  };
          +
          +  ByteColumnWriter::ByteColumnWriter(
          +      const Type& type,
          +      const StreamsFactory& factory,
          +      const WriterOptions& options) :
          +        ColumnWriter(type, factory, options)

          Unknown macro: { + std}

          +
          +  void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
          +                                uint64_t offset,
          +                                uint64_t numValues) {
          +    ColumnWriter::add(rowBatch, offset, numValues);
          +
          +    LongVectorBatch& byteBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
          --- End diff --

          Done

          githubbot ASF GitHub Bot added a comment -

          Github user wgtmac commented on a diff in the pull request:

          https://github.com/apache/orc/pull/149#discussion_r140425023

          --- Diff: c++/src/ColumnWriter.cc ---
          @@ -468,25 +472,1099 @@ namespace orc {
               rleEncoder->recordPosition(rowIndexPosition.get());
             }
          -  std::unique_ptr<ColumnWriter> buildWriter(
          -      const Type& type,
          -      const StreamsFactory& factory,
          -      const WriterOptions& options) {
          -    switch (static_cast<int64_t>(type.getKind())) {
          -      case STRUCT:
          -        return std::unique_ptr<ColumnWriter>(
          -          new StructColumnWriter(
          -            type,
          -            factory,
          -            options));
          -      case INT:
          -      case LONG:
          -      case SHORT:
          -        return std::unique_ptr<ColumnWriter>(
          -          new IntegerColumnWriter(
          -            type,
          -            factory,
          -            options));
          +  class ByteColumnWriter : public ColumnWriter {
          +  public:
          +    ByteColumnWriter(const Type& type,
          +                     const StreamsFactory& factory,
          +                     const WriterOptions& options);
          +
          +    virtual void add(ColumnVectorBatch& rowBatch,
          +                     uint64_t offset,
          +                     uint64_t numValues) override;
          +
          +    virtual void flush(std::vector<proto::Stream>& streams) override;
          +
          +    virtual uint64_t getEstimatedSize() const override;
          +
          +    virtual void getColumnEncoding(
          +      std::vector<proto::ColumnEncoding>& encodings) const override;
          +
          +    virtual void recordPosition() const override;
          +
          +  private:
          +    std::unique_ptr<ByteRleEncoder> byteRleEncoder;
          +  };
          +
          +  ByteColumnWriter::ByteColumnWriter(
          +      const Type& type,
          +      const StreamsFactory& factory,
          +      const WriterOptions& options) :
          +        ColumnWriter(type, factory, options)

          Unknown macro: { + std}

          +
          +  void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
          +                                uint64_t offset,
          +                                uint64_t numValues) {
          +    ColumnWriter::add(rowBatch, offset, numValues);
          +
          +    LongVectorBatch& byteBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
          +    int64_t* data = byteBatch.data.data() + offset;
          +    const char* notNull = byteBatch.hasNulls ?
          +      byteBatch.notNull.data() + offset : nullptr;
          +
          +    char* byteData = reinterpret_cast<char*>(data);
          +    for (uint64_t i = 0; i < numValues; ++i) {
          +      byteData[i] = static_cast<char>(data[i]);
          +    }
          +    rleEncoder->add(byteData, numValues, notNull);
          +
          +    BooleanColumnStatisticsImpl* boolStats =
          +      dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
          +    bool hasNull = false;
          +    for (uint64_t i = 0; i < numValues; ++i) {
          +      if (notNull == nullptr || notNull[i]) {
          +        boolStats->increase(1);
          +        boolStats->update(byteData[i], 1);
          +      } else if (!hasNull) {
          +        hasNull = true;
          +      }
          +    }
          +    boolStats->setHasNull(hasNull);
          +  }
          +
          +  void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) {
          +    ColumnWriter::flush(streams);
          +
          +    proto::Stream stream;
          +    stream.set_kind(proto::Stream_Kind_DATA);
          +    stream.set_column(static_cast<uint32_t>(columnId));
          +    stream.set_length(rleEncoder->flush());
          +    streams.push_back(stream);
          +  }
          +
          +  uint64_t BooleanColumnWriter::getEstimatedSize() const {
          +    uint64_t size = ColumnWriter::getEstimatedSize();
          +    size += rleEncoder->getBufferSize();
          +    return size;
          +  }
          +
          +  void BooleanColumnWriter::getColumnEncoding(
          +      std::vector<proto::ColumnEncoding>& encodings) const {
          +    proto::ColumnEncoding encoding;
          +    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
          +    encoding.set_dictionarysize(0);
          +    encodings.push_back(encoding);
          +  }
          +
          +  void BooleanColumnWriter::recordPosition() const {
          +    ColumnWriter::recordPosition();
          +    rleEncoder->recordPosition(rowIndexPosition.get());
          +  }
          +
          +  class DoubleColumnWriter : public ColumnWriter {
          +  public:
          +    DoubleColumnWriter(const Type& type,
          +                       const StreamsFactory& factory,
          +                       const WriterOptions& options,
          +                       bool isFloat);
          +
          +    virtual void add(ColumnVectorBatch& rowBatch,
          +                     uint64_t offset,
          +                     uint64_t numValues) override;
          +
          +    virtual void flush(std::vector<proto::Stream>& streams) override;
          +
          +    virtual uint64_t getEstimatedSize() const override;
          +
          +    virtual void getColumnEncoding(
          +      std::vector<proto::ColumnEncoding>& encodings) const override;
          +
          +    virtual void recordPosition() const override;
          +
          +  private:
          +    bool isFloat;
          +    std::unique_ptr<AppendOnlyBufferedStream> dataStream;
          +    DataBuffer<char> buffer;
          +  };
          +
          +  DoubleColumnWriter::DoubleColumnWriter(
          +      const Type& type,
          +      const StreamsFactory& factory,
          +      const WriterOptions& options,
          +      bool isFloatType) :
          +        ColumnWriter(type, factory, options),
          +        isFloat(isFloatType),
          +        buffer(*options.getMemoryPool())

          Unknown macro: { + dataStream.reset(new AppendOnlyBufferedStream( + factory.createStream(proto}

          +
          +  void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
          +                               uint64_t offset,
          +                               uint64_t numValues) {
          +    ColumnWriter::add(rowBatch, offset, numValues);
          +
          +    const DoubleVectorBatch& dblBatch =
          +      dynamic_cast<const DoubleVectorBatch&>(rowBatch);
          +
          +    const double* doubleData = dblBatch.data.data() + offset;
          +    const char* notNull = dblBatch.hasNulls ?
          +      dblBatch.notNull.data() + offset : nullptr;
          +
          +    size_t bytes = isFloat ? 4 : 8;
          +    char* data = buffer.data();
          +
          +    for (uint64_t i = 0; i < numValues; ++i) {
          +      if (!notNull || notNull[i]) {
          +        if (isFloat) {
          +          // to avoid float-double cast
          +          const int32_t* intBits =
          +            reinterpret_cast<const int32_t*>(&static_cast<const float&>(
          +              doubleData[i]));
          +          for (size_t j = 0; j < bytes; ++j) {
          +            data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
          +          }
          +        } else {
          +          const int64_t* intBits =
          +            reinterpret_cast<const int64_t*>(&(doubleData[i]));
          +          for (size_t j = 0; j < bytes; ++j) {
          +            data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
          +          }
          --- End diff --

          Done

          Show
          githubbot ASF GitHub Bot added a comment -

          Github user wgtmac commented on the issue:

          https://github.com/apache/orc/pull/149

          @jamesclampffer @majetideepak @omalley I've updated the PR according to the feedback. Please review it again when you have time, thanks!


            People

            • Assignee:
              wgtmac Gang Wu
              Reporter:
              wgtmac Gang Wu
            • Votes:
              0
              Watchers:
              2
