  Tajo / TAJO-933

Fork some classes of Parquet as builtin third-party classes

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.0
    • Component/s: Storage
    • Labels:

      Description

      Parquet has a strict access-modifier and encapsulation design. This is well designed, but it does not allow us to add the features we need. For example, it is hard to get the size of the written file or how much of the memory buffer is filled.

      I propose forking some classes of the Parquet file format library as embedded third-party classes. After that, we can revise them so they are tailored for Tajo.
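
      For illustration, here is a minimal sketch of the kind of size accounting a forked writer class could expose; the class and method names below are hypothetical and are not part of this patch.

      import java.io.FilterOutputStream;
      import java.io.IOException;
      import java.io.OutputStream;

      // Hypothetical example: a counting stream showing the size accounting
      // that a forked writer can expose but a sealed third-party class cannot.
      public class CountingOutputStream extends FilterOutputStream {
        private long bytesWritten = 0;  // bytes flushed to the underlying file so far

        public CountingOutputStream(OutputStream out) {
          super(out);
        }

        @Override
        public void write(int b) throws IOException {
          out.write(b);
          bytesWritten++;
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
          out.write(b, off, len);
          bytesWritten += len;
        }

        // The accessor Tajo needs: how many bytes have actually been written.
        public long getBytesWritten() {
          return bytesWritten;
        }
      }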

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user hyunsik opened a pull request:

          https://github.com/apache/tajo/pull/75

          TAJO-933: Fork some classes of Parquet as builtin third-party classes.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/hyunsik/tajo TAJO-933

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/tajo/pull/75.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #75


          commit b99e9f15ab36146c12ec5bd98bc546ec124aeaf0
          Author: Hyunsik Choi <hyunsik@apache.org>
          Date: 2014-07-15T03:27:56Z

          TAJO-933: Fork some classes of Parquet as builtin third-party classes.


          githubbot ASF GitHub Bot added a comment -

          Github user blrunner commented on a diff in the pull request:

          https://github.com/apache/tajo/pull/75#discussion_r14917894

          — Diff: tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java —
          @@ -0,0 +1,220 @@
          +/**
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.tajo.storage.thirdparty.parquet;
          +
          +import org.apache.hadoop.conf.Configuration;
          +import org.apache.hadoop.fs.Path;
          +import parquet.column.ParquetProperties;
          +import parquet.hadoop.api.WriteSupport;
          +import parquet.hadoop.metadata.CompressionCodecName;
          +import parquet.schema.MessageType;
          +
          +import java.io.Closeable;
          +import java.io.IOException;
          +
          +public class ParquetWriter<T> implements Closeable {
          +
          + public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
          + public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
          + public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME =
          + CompressionCodecName.UNCOMPRESSED;
          + public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
          + public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
          + public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION =
          + ParquetProperties.WriterVersion.PARQUET_1_0;
          +
          + private final InternalParquetRecordWriter<T> writer;
          +
          + /**
          + * Create a new ParquetWriter.
          + * (with dictionary encoding enabled and validation off)
          + *
          + * @param file the file to create
          + * @param writeSupport the implementation to write a record to a RecordConsumer
          + * @param compressionCodecName the compression codec to use
          + * @param blockSize the block size threshold
          + * @param pageSize the page size threshold
          + * @throws java.io.IOException
          + * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, CompressionCodecName, int, int, boolean, boolean)
          + */
          + public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException {
          + this(file, writeSupport, compressionCodecName, blockSize, pageSize,
          + DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED);
          + }
          +
          + /**
          + * Create a new ParquetWriter.
          + *
          + * @param file the file to create
          + * @param writeSupport the implementation to write a record to a RecordConsumer
          + * @param compressionCodecName the compression codec to use
          + * @param blockSize the block size threshold
          + * @param pageSize the page size threshold (both data and dictionary)
          + * @param enableDictionary to turn dictionary encoding on
          + * @param validating to turn on validation using the schema
          + * @throws IOException
          + * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean)
          + */
          + public ParquetWriter(
          + Path file,
          + WriteSupport<T> writeSupport,
          + CompressionCodecName compressionCodecName,
          + int blockSize,
          + int pageSize,
          + boolean enableDictionary,
          + boolean validating) throws IOException {
          + this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating);
          + }
          +
          + /**
          + * Create a new ParquetWriter.
          + *
          + * @param file the file to create
          + * @param writeSupport the implementation to write a record to a RecordConsumer
          + * @param compressionCodecName the compression codec to use
          + * @param blockSize the block size threshold
          + * @param pageSize the page size threshold
          + * @param dictionaryPageSize the page size threshold for the dictionary pages
          + * @param enableDictionary to turn dictionary encoding on
          + * @param validating to turn on validation using the schema
          + * @throws IOException
          + * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion)
          + */
          + public ParquetWriter(
          + Path file,
          + WriteSupport<T> writeSupport,
          + CompressionCodecName compressionCodecName,
          + int blockSize,
          + int pageSize,
          + int dictionaryPageSize,
          + boolean enableDictionary,
          + boolean validating) throws IOException {
          + this(file, writeSupport, compressionCodecName, blockSize, pageSize,
          + dictionaryPageSize, enableDictionary, validating,
          + DEFAULT_WRITER_VERSION);
          + }
          +
          + /**
          + * Create a new ParquetWriter.
          + *
          + * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads
          + * configuration from the classpath.
          + *
          + * @param file the file to create
          + * @param writeSupport the implementation to write a record to a RecordConsumer
          + * @param compressionCodecName the compression codec to use
          + * @param blockSize the block size threshold
          + * @param pageSize the page size threshold
          + * @param dictionaryPageSize the page size threshold for the dictionary pages
          + * @param enableDictionary to turn dictionary encoding on
          + * @param validating to turn on validation using the schema
          + * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
          + * @throws IOException
          + * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration)
          + */
          + public ParquetWriter(
          + Path file,
          + WriteSupport<T> writeSupport,
          + CompressionCodecName compressionCodecName,
          + int blockSize,
          + int pageSize,
          + int dictionaryPageSize,
          + boolean enableDictionary,
          + boolean validating,
          + ParquetProperties.WriterVersion writerVersion) throws IOException {
          + this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration());
          + }
          +
          + /**
          + * Create a new ParquetWriter.
          + *
          + * @param file the file to create
          + * @param writeSupport the implementation to write a record to a RecordConsumer
          + * @param compressionCodecName the compression codec to use
          + * @param blockSize the block size threshold
          + * @param pageSize the page size threshold
          + * @param dictionaryPageSize the page size threshold for the dictionary pages
          + * @param enableDictionary to turn dictionary encoding on
          + * @param validating to turn on validation using the schema
          + * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
          + * @param conf Hadoop configuration to use while accessing the filesystem
          + * @throws IOException
          + */
          + public ParquetWriter(
          + Path file,
          + WriteSupport<T> writeSupport,
          + CompressionCodecName compressionCodecName,
          + int blockSize,
          + int pageSize,
          + int dictionaryPageSize,
          + boolean enableDictionary,
          + boolean validating,
          + ParquetProperties.WriterVersion writerVersion,
          + Configuration conf) throws IOException {
          +
          + WriteSupport.WriteContext writeContext = writeSupport.init(conf);
          + MessageType schema = writeContext.getSchema();
          +
          + ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file);
          + fileWriter.start();
          +
          + CodecFactory codecFactory = new CodecFactory(conf);
          + CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0);
          + this.writer = new InternalParquetRecordWriter<T>(
          + fileWriter,
          + writeSupport,
          + schema,
          + writeContext.getExtraMetaData(),
          + blockSize,
          + pageSize,
          + compressor,
          + dictionaryPageSize,
          + enableDictionary,
          + validating,
          + writerVersion);
          + }
          +
          + /**
          + * Create a new ParquetWriter. The default block size is 50 MB.The default
          — End diff –

          Is it a typo? You already set DEFAULT_BLOCK_SIZE to 128MB.

          githubbot ASF GitHub Bot added a comment -

          Github user blrunner commented on the pull request:

          https://github.com/apache/tajo/pull/75#issuecomment-48992337

          Why don't you implement unit test cases for this issue?

          githubbot ASF GitHub Bot added a comment -

          Github user hyunsik commented on a diff in the pull request:

          https://github.com/apache/tajo/pull/75#discussion_r14918013

          — Diff: tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java —
          + /**
          + * Create a new ParquetWriter. The default block size is 50 MB.The default
          — End diff –

          This is third-party code. I think that we don't need to take care of it unless there is some critical problem.

          githubbot ASF GitHub Bot added a comment -

          Github user hyunsik commented on the pull request:

          https://github.com/apache/tajo/pull/75#issuecomment-48992605

          In my opinion, we don't need additional unit tests because this patch just forks some Parquet classes that are already included in the Parquet library.

          githubbot ASF GitHub Bot added a comment -

          Github user blrunner commented on the pull request:

          https://github.com/apache/tajo/pull/75#issuecomment-48992795

          Okay, I also agree with you.
          Thanks for your contribution.

          Push it.

          githubbot ASF GitHub Bot added a comment -

          Github user hyunsik commented on the pull request:

          https://github.com/apache/tajo/pull/75#issuecomment-48993519

          Thank you for your quick review. I'll commit it shortly.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/tajo/pull/75

          hyunsik Hyunsik Choi added a comment -

          Committed it to the master branch.

          hudson Hudson added a comment -

          SUCCESS: Integrated in Tajo-master-build #299 (See https://builds.apache.org/job/Tajo-master-build/299/)
          TAJO-933: Fork some classes of Parquet as builtin third-party classes. (hyunsik: rev 9cf107191bac3592d55c59ae6b60ab6051d5656d)

          • tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
          • tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
          • tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
          • CHANGES
          • tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
          • tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
          • tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
          • tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
          • tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
          • tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java

          githubbot ASF GitHub Bot added a comment -

          Github user davidzchen commented on the pull request:

          https://github.com/apache/tajo/pull/75#issuecomment-49010360

          Just wondering: why are we forking the classes rather than contributing the changes to add the interfaces we need to Parquet, especially since Parquet is now in the Apache Incubator?

          githubbot ASF GitHub Bot added a comment -

          Github user hyunsik commented on the pull request:

          https://github.com/apache/tajo/pull/75#issuecomment-49011928

          Hi @davidzchen,

          We just need the current read offset in order to get the task progress. Also, we need the write offset and the current memory buffer size in order to estimate the size of the output file being written. Beyond these, there is no other purpose. If you know a better approach, feel free to suggest it.

          In addition, we will request these features from the Parquet community. BTW, I think that we need this feature right now.
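
          As a minimal sketch of the arithmetic involved (assuming the forked reader exposes its current read offset; the class and method names here are hypothetical, not from this patch):

          // Hypothetical helper: turns the current read offset into a task progress value.
          public final class ScanProgress {
            private ScanProgress() {}

            /** Returns progress in [0.0, 1.0] for the given offset within a split of totalLength bytes. */
            public static float progress(long currentOffset, long totalLength) {
              if (totalLength <= 0) {
                return 1.0f;  // an empty split counts as finished
              }
              long clamped = Math.min(Math.max(currentOffset, 0L), totalLength);
              return (float) clamped / (float) totalLength;
            }

            public static void main(String[] args) {
              // e.g. 64 MB read out of a 128 MB split -> 0.5
              System.out.println(progress(64L * 1024 * 1024, 128L * 1024 * 1024));
            }
          }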


            People

            • Assignee:
              hyunsik Hyunsik Choi
            • Reporter:
              hyunsik Hyunsik Choi
            • Votes:
              0
            • Watchers:
              3

              Dates

              • Created:
              • Updated:
              • Resolved:
