ORC-52: Add support for mapreduce InputFormat and OutputFormat

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.1.0
    • Component/s: Java, MapReduce
    • Labels:
      None

      Description

      We have the mapred InputFormat and OutputFormat, but we need one for the newer API.

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user omalley opened a pull request:

          https://github.com/apache/orc/pull/27

          ORC-52. Create ORC InputFormat and OutputFormat implementations for org.apache.hadoop.mapreduce API

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/omalley/orc orc-52

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/orc/pull/27.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #27


          commit f06304d518b396df93ca8b1695524a9736244ee8
          Author: Owen O'Malley <omalley@apache.org>
          Date: 2016-05-23T17:49:26Z

          ORC-56. Update the cmake build system to build, test, and package the java
          artifacts also.

          commit 1c47a593f9b4f0bbd74f1c835b1ea073c45245c4
          Author: Owen O'Malley <omalley@apache.org>
          Date: 2016-05-24T21:28:36Z

          ORC-52 Add support for org.apache.hadoop.mapreduce InputFormat and
          OutputFormat.


          ddt889966 yuan ding added a comment -

          There is a typo in the OrcMapReduceRecordReader class name; the file name and the class name differ in capitalization, so the build fails with:
          /Users/bdcoe/orc/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapReduceRecordReader.java:[38,8] class OrcMapreduceRecordReader is public, should be declared in a file named OrcMapreduceRecordReader.java

          ddt889966 yuan ding added a comment -

          Would you please add a method like setAllFields(List<Writable>) to OrcStruct? Thanks.

          It would be nice to have an OrcSerde in this new package. It could be used to tell Hive how to explore the table.

          I also tried the following (I hope I did not do it wrong). I want to declare my mapper/reducer like this:
          public static class ResultsMapper extends Mapper<LongWritable, Text, NullWritable, OrcStruct> {}
          public static class TeamCounter extends Reducer<NullWritable, OrcStruct, NullWritable, OrcStruct> {}
          It then fails with “No such method exception Hadoop <init>”. I am not sure, but it feels like OrcStruct needs an empty constructor so that Hadoop's reflection can work. What do you think?
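A minimal, Hadoop-free sketch of why an `<init>` "no such method" failure usually points at a missing no-argument constructor. It assumes the framework creates shuffle keys and values reflectively by looking up a zero-argument constructor. `SchemaOnlyStruct`, `DefaultConstructibleStruct`, and `ReflectionDemo` are hypothetical names invented for this illustration; they are not ORC or Hadoop classes.

```java
import java.lang.reflect.Constructor;

/** Hypothetical stand-in for a value class that can only be built from a schema. */
final class SchemaOnlyStruct {
  final String schema;

  SchemaOnlyStruct(String schema) {
    this.schema = schema;
  }
}

/** Hypothetical stand-in for a value class that offers a no-argument constructor. */
final class DefaultConstructibleStruct {
  String schema = "";

  DefaultConstructibleStruct() {
  }
}

final class ReflectionDemo {
  /** Mimics reflective instantiation: find the zero-argument constructor and call it. */
  static boolean canInstantiate(Class<?> type) {
    try {
      Constructor<?> ctor = type.getDeclaredConstructor();
      ctor.setAccessible(true);
      ctor.newInstance();
      return true;
    } catch (NoSuchMethodException e) {
      // No zero-argument constructor exists: this is the "<init>" failure.
      return false;
    } catch (ReflectiveOperationException e) {
      // The constructor exists but could not be invoked.
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(canInstantiate(SchemaOnlyStruct.class));           // prints "false"
    System.out.println(canInstantiate(DefaultConstructibleStruct.class)); // prints "true"
  }
}
```

Adding an empty constructor alone would make instantiation succeed, but the new object would still not know its schema, which is a separate problem for any type that needs type information before it can deserialize itself.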

          Sorry, but I also have a small suggestion about the package structure. In the Hadoop library, code under the mapred package is the old API and code under the mapreduce package is the new 2.* API, and I would not mix them. In this new ORC package, however, I have to use classes from orc.mapreduce, such as the OutputFormat, together with classes from orc.mapred, such as OrcStruct.

          owen.omalley Owen O'Malley added a comment -

          Ok, I updated the pull request with:

          • added OrcStruct.setAllFields(Writable...)
          • moved the record reader & writer for the mapreduce api to the mapreduce package
          • fixed the capitalization of the OrcMapreduceRecordReader filename.

          The exception is because you are sending the OrcStruct through the shuffle, which I overlooked. In order to do that we need to provide the actual type information to the key and value in the configuration. The hard part is that the MapReduce framework doesn't tell the object whether it is the key or value. I'd propose that we add:

          • OrcKey and OrcValue wrapper types that contain an OrcStruct
          • They use the value of "orc.mapred.key.type" and "orc.mapred.value.type" from the configuration to initialize their OrcStruct.
          • The ORC writer can silently ignore the OrcKey and OrcValue.

          Thoughts?
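The proposal above can be sketched without Hadoop as follows. This is only an illustration of the idea, not the code in the pull request: a plain `Map<String, String>` stands in for the job configuration, and `Struct`, `Wrapper`, `Key`, `Value`, and `configure` are hypothetical names. Only the two configuration key strings come from the comment above.

```java
import java.util.HashMap;
import java.util.Map;

final class WrapperSketch {
  // Configuration names taken from the proposal in the comment above.
  static final String KEY_TYPE = "orc.mapred.key.type";
  static final String VALUE_TYPE = "orc.mapred.value.type";

  /** Hypothetical stand-in for OrcStruct: it only remembers its schema string. */
  static final class Struct {
    final String schema;

    Struct(String schema) {
      this.schema = schema;
    }
  }

  /** A wrapper that is created with no arguments and learns its schema later. */
  abstract static class Wrapper {
    Struct struct;

    /** Each wrapper kind knows which configuration entry describes it. */
    abstract String configName();

    void configure(Map<String, String> conf) {
      String schema = conf.get(configName());
      if (schema == null) {
        throw new IllegalArgumentException("missing " + configName());
      }
      struct = new Struct(schema);
    }
  }

  static final class Key extends Wrapper {
    @Override
    String configName() {
      return KEY_TYPE;
    }
  }

  static final class Value extends Wrapper {
    @Override
    String configName() {
      return VALUE_TYPE;
    }
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put(KEY_TYPE, "struct<x:int,y:int>");
    conf.put(VALUE_TYPE, "struct<z:string>");

    Key key = new Key();
    Value value = new Value();
    key.configure(conf);
    value.configure(conf);
    System.out.println(key.struct.schema);   // prints "struct<x:int,y:int>"
    System.out.println(value.struct.schema); // prints "struct<z:string>"
  }
}
```

Using two distinct wrapper classes sidesteps the problem that the framework does not tell an object whether it is the key or the value: the class itself selects the configuration entry to read.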

          owen.omalley Owen O'Malley added a comment -

          Ok, I updated the pull request with OrcKey and OrcValue, including a junit test using mrunit to test that I could send them through at least mrunit's fake shuffle.

          I also depend on ORC-53 and make OrcKey WritableComparable.

          githubbot ASF GitHub Bot added a comment -

          Github user wagnermarkd commented on a diff in the pull request:

          https://github.com/apache/orc/pull/27#discussion_r65271707

          — Diff: java/mapreduce/src/java/org.apache.orc.mapreduce/OrcInputFormat.java —
          @@ -0,0 +1,95 @@
          +/**
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.orc.mapreduce;
          +
          +import com.esotericsoftware.kryo.Kryo;
          +import com.esotericsoftware.kryo.io.Input;
          +import org.apache.commons.codec.binary.Base64;
          +import org.apache.hadoop.conf.Configuration;
          +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
          +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl;
          +import org.apache.hadoop.io.Writable;
          +import org.apache.hadoop.mapreduce.InputSplit;
          +import org.apache.hadoop.mapreduce.RecordReader;
          +import org.apache.hadoop.mapreduce.TaskAttemptContext;
          +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
          +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
          +
          +
          +import java.io.IOException;
          +
          +import org.apache.hadoop.io.NullWritable;
          +import org.apache.orc.OrcConf;
          +import org.apache.orc.OrcFile;
          +import org.apache.orc.Reader;
          +import org.apache.orc.TypeDescription;
          +
          +/**
          + * An ORC input format that satisfies the org.apache.hadoop.mapreduce API.
          + */
          +public class OrcInputFormat<V extends Writable>
          + extends FileInputFormat<NullWritable, V> {
          +
          + /**
          + * Put the given SearchArgument into the configuration for an OrcInputFormat.
          + * @param conf the configuration to modify
          + * @param sarg the SearchArgument to put in the configuration
          + * @param columnNames the list of column names for the SearchArgument
          + */
          + public static void setSearchArgument(Configuration conf,
          + SearchArgument sarg,
          + String[] columnNames) {
          + org.apache.orc.mapred.OrcInputFormat.setSearchArgument(conf, sarg,
          + columnNames);
          + }
          +
          + @Override
          + public RecordReader<NullWritable, V>
          + createRecordReader(InputSplit inputSplit,
          + TaskAttemptContext taskAttemptContext
          + ) throws IOException, InterruptedException {
          + FileSplit split = (FileSplit) inputSplit;
          + Configuration conf = taskAttemptContext.getConfiguration();
          + Reader file = OrcFile.createReader(split.getPath(),
          + OrcFile.readerOptions(conf)
          + .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)));
          + TypeDescription schema =
          + TypeDescription.fromString(OrcConf.SCHEMA.getString(conf));
          + Reader.Options options = new Reader.Options()
          + .range(split.getStart(), split.getLength())
          + .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
          + .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf));
          — End diff –

          These two lines seem to duplicate logic in RecordReaderImpl: https://github.com/apache/orc/blob/7a4fe255e84bea097ca478d2008d257e07f8808a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java#L184. As far as I can see that's the only reason conf is passed at all. It could be cleaner if Reader.Options's constructor pulled the requisite values from the conf and only stored/exposed those.
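The suggestion can be illustrated with a small, Hadoop-free sketch in which the options object reads its defaults from the configuration once, in its constructor, and keeps only the resulting values. This is not ORC's actual `Reader.Options`: `ReadOptions` is a hypothetical class, a `Map<String, String>` stands in for the configuration, and the two key names are placeholders invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

final class OptionsFromConfSketch {
  // Placeholder configuration keys, invented for this sketch.
  static final String USE_ZEROCOPY = "example.use.zerocopy";
  static final String SKIP_CORRUPT = "example.skip.corrupt.data";

  /** Hypothetical options object that extracts what it needs from the configuration. */
  static final class ReadOptions {
    final boolean useZeroCopy;
    final boolean skipCorruptRecords;
    long offset = 0;
    long length = Long.MAX_VALUE;

    ReadOptions(Map<String, String> conf) {
      // Read each setting once; the configuration itself is not stored.
      this.useZeroCopy =
          Boolean.parseBoolean(conf.getOrDefault(USE_ZEROCOPY, "false"));
      this.skipCorruptRecords =
          Boolean.parseBoolean(conf.getOrDefault(SKIP_CORRUPT, "false"));
    }

    /** Per-split settings are still supplied by the caller. */
    ReadOptions range(long offset, long length) {
      this.offset = offset;
      this.length = length;
      return this;
    }
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put(USE_ZEROCOPY, "true");

    ReadOptions options = new ReadOptions(conf).range(0, 1024);
    // prints "true false 1024"
    System.out.println(options.useZeroCopy + " " + options.skipCorruptRecords
        + " " + options.length);
  }
}
```

With this shape, a caller such as an input format would only need to set the split range; the configuration-derived flags would no longer be repeated at each call site.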

          githubbot ASF GitHub Bot added a comment -

          Github user wagnermarkd commented on a diff in the pull request:

          https://github.com/apache/orc/pull/27#discussion_r65273378

          — Diff: java/mapreduce/src/java/org.apache.orc.mapreduce/OrcOutputFormat.java —
          @@ -0,0 +1,83 @@
          +/**
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.orc.mapreduce;
          +
          +import org.apache.hadoop.conf.Configuration;
          +import org.apache.hadoop.fs.Path;
          +import org.apache.hadoop.io.NullWritable;
          +import org.apache.hadoop.io.Writable;
          +import org.apache.hadoop.mapreduce.OutputCommitter;
          +import org.apache.hadoop.mapreduce.RecordWriter;
          +import org.apache.hadoop.mapreduce.TaskAttemptContext;
          +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
          +import org.apache.hadoop.util.ReflectionUtils;
          +import org.apache.orc.CompressionKind;
          +import org.apache.orc.OrcConf;
          +import org.apache.orc.OrcFile;
          +import org.apache.orc.TypeDescription;
          +import org.apache.orc.Writer;
          +
          +import java.io.IOException;
          +
          +/**
          + * An ORC output format that satisfies the org.apache.hadoop.mapreduce API.
          + */
          +public class OrcOutputFormat<V extends Writable>
          + extends FileOutputFormat<NullWritable, V> {
          + private static final String EXTENSION = ".orc";
          + // This is useful for unit tests or local runs where you don't need the
          + // output committer.
          + public static final String SKIP_TEMP_DIRECTORY =
          + "orc.mapreduce.output.skip-temporary-directory";
          +
          + @Override
          + public RecordWriter<NullWritable, V>
          + getRecordWriter(TaskAttemptContext taskAttemptContext
          + ) throws IOException {
          + Configuration conf = taskAttemptContext.getConfiguration();
          + Path filename = getDefaultWorkFile(taskAttemptContext, EXTENSION);
          + Writer writer = OrcFile.createWriter(filename,
          + OrcFile.writerOptions(conf)
          — End diff –

          Same question as for the reader options: can we pull all this boilerplate into the writerOptions method?

          githubbot ASF GitHub Bot added a comment -

          Github user wagnermarkd commented on a diff in the pull request:

          https://github.com/apache/orc/pull/27#discussion_r65275023

          — Diff: java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java —
          @@ -0,0 +1,202 @@
          +/**
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.orc.mapreduce;
          +
          +import org.apache.hadoop.io.IntWritable;
          +import org.apache.hadoop.io.NullWritable;
          +import org.apache.hadoop.io.Text;
          +import org.apache.hadoop.io.serializer.Deserializer;
          +import org.apache.hadoop.io.serializer.Serialization;
          +import org.apache.hadoop.io.serializer.Serializer;
          +import org.apache.hadoop.io.serializer.WritableSerialization;
          +import org.apache.hadoop.mapred.JobConf;
          +import org.apache.hadoop.mapreduce.Mapper;
          +import org.apache.hadoop.mapreduce.Reducer;
          +import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
          +import org.apache.orc.TypeDescription;
          +import org.apache.orc.mapred.OrcKey;
          +import org.apache.orc.mapred.OrcStruct;
          +import org.apache.orc.mapred.OrcValue;
          +import org.junit.Test;
          +
          +import java.io.DataInputStream;
          +import java.io.DataOutputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.Iterator;
          +
          +public class TestMrUnit {
          + JobConf conf = new JobConf();
          +
          + /**
          + * Split the input struct into its two parts.
          + */
          + public static class MyMapper
          + extends Mapper<NullWritable, OrcStruct, OrcKey, OrcValue> {
          + private OrcKey keyWrapper = new OrcKey();
          + private OrcValue valueWrapper = new OrcValue();
          +
          + @Override
          + protected void map(NullWritable key,
          + OrcStruct value,
          + Context context
          + ) throws IOException, InterruptedException {
          + keyWrapper.key = value.getFieldValue(0);
          + valueWrapper.value = value.getFieldValue(1);
          + context.write(keyWrapper, valueWrapper);
          + }
          + }
          +
          + /**
          + * Glue the key and values back together.
          + */
          + public static class MyReducer
          + extends Reducer<OrcKey, OrcValue, NullWritable, OrcStruct> {
          + private OrcStruct output = new OrcStruct(TypeDescription.fromString
          + ("struct<first:struct<x:int,y:int>,second:struct<z:string>>"));
          + private final NullWritable nada = NullWritable.get();
          +
          + @Override
          + protected void reduce(OrcKey key,
          + Iterable<OrcValue> values,
          + Context context
          + ) throws IOException, InterruptedException {
          + output.setFieldValue(0, key.key);
          + for(OrcValue value: values) {
          + output.setFieldValue(1, value.value);
          + context.write(nada, output);
          + }
          + }
          + }
          +
          + /**
          + * This class is intended to support MRUnit's object copying for input and
          + * output objects.
          + *
          + * Real mapreduce contexts should NEVER use this class.
          — End diff –

          Isn't there the same challenge for true MR contexts? The reducer needs to be aware of the schema for the values which it is serializing. Orc

          githubbot ASF GitHub Bot added a comment -

          Github user omalley commented on a diff in the pull request:

          https://github.com/apache/orc/pull/27#discussion_r65282144

          — Diff: java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java —
          @@ -0,0 +1,202 @@
          +/**
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.orc.mapreduce;
          +
          +import org.apache.hadoop.io.IntWritable;
          +import org.apache.hadoop.io.NullWritable;
          +import org.apache.hadoop.io.Text;
          +import org.apache.hadoop.io.serializer.Deserializer;
          +import org.apache.hadoop.io.serializer.Serialization;
          +import org.apache.hadoop.io.serializer.Serializer;
          +import org.apache.hadoop.io.serializer.WritableSerialization;
          +import org.apache.hadoop.mapred.JobConf;
          +import org.apache.hadoop.mapreduce.Mapper;
          +import org.apache.hadoop.mapreduce.Reducer;
          +import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
          +import org.apache.orc.TypeDescription;
          +import org.apache.orc.mapred.OrcKey;
          +import org.apache.orc.mapred.OrcStruct;
          +import org.apache.orc.mapred.OrcValue;
          +import org.junit.Test;
          +
          +import java.io.DataInputStream;
          +import java.io.DataOutputStream;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.util.Iterator;
          +
          +public class TestMrUnit {
          + JobConf conf = new JobConf();
          +
          + /**
          + * Split the input struct into its two parts.
          + */
          + public static class MyMapper
          + extends Mapper<NullWritable, OrcStruct, OrcKey, OrcValue> {
          + private OrcKey keyWrapper = new OrcKey();
          + private OrcValue valueWrapper = new OrcValue();
          +
          + @Override
          + protected void map(NullWritable key,
          + OrcStruct value,
          + Context context
          + ) throws IOException, InterruptedException {
          + keyWrapper.key = value.getFieldValue(0);
          + valueWrapper.value = value.getFieldValue(1);
          + context.write(keyWrapper, valueWrapper);
          + }
          + }
          +
          + /**
          + * Glue the key and values back together.
          + */
          + public static class MyReducer
          + extends Reducer<OrcKey, OrcValue, NullWritable, OrcStruct> {
          + private OrcStruct output = new OrcStruct(TypeDescription.fromString
          + ("struct<first:struct<x:int,y:int>,second:struct<z:string>>"));
          + private final NullWritable nada = NullWritable.get();
          +
          + @Override
          + protected void reduce(OrcKey key,
          + Iterable<OrcValue> values,
          + Context context
          + ) throws IOException, InterruptedException {
          + output.setFieldValue(0, key.key);
          + for(OrcValue value: values) {
          + output.setFieldValue(1, value.value);
          + context.write(nada, output);
          + }
          + }
          + }
          +
          + /**
          + * This class is intended to support MRUnit's object copying for input and
          + * output objects.
          + *
          + * Real mapreduce contexts should NEVER use this class.
          — End diff –

          My goal with OrcKey and OrcValue was to encode the respective types once in the JobConf rather than once per value. They let you control the key and value types of the shuffle with knobs that don't conflict with either the input or the output.

          So if you are shuffling with two OrcStructs, you would define your JobConf like (filling in the "..." with the appropriate fields):

          orc.mapred.key.type=struct<...>
          orc.mapred.value.type=struct<vals:array<...>>
          mapreduce.map.output.key.class=OrcKey
          mapreduce.map.output.value.class=OrcValue

          I guess that I should actually improve that to:

          orc.mapred.input.type=struct<...>
          orc.mapred.output.type=struct<...>
          orc.mapred.map.output.key.type=struct<...>
          orc.mapred.map.output.value.type=struct<...>
          mapreduce.map.output.key.class=OrcKey
          mapreduce.map.output.value.class=OrcValue

          The orc.mapred.input.type setting is only necessary if your application wants to use schema evolution to convert to a specific type. The orc.mapred.output.type setting would control the schema of the output format.

          Does that make sense?
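
To make the "two OrcStructs" case concrete: in the TestMrUnit diff above, the mapper sends field 0 of the input struct to the shuffle as the key and field 1 as the value, so the key and value type strings are just the top-level field types of the full schema. The sketch below derives them with plain string handling. StructFieldSplitter and topLevelFieldTypes are hypothetical names used only for illustration and are not part of ORC; real code would more likely parse the schema with TypeDescription.fromString and inspect its children.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of ORC: a stdlib-only illustration.
class StructFieldSplitter {

  /** Returns the type string of each top-level field of a struct schema. */
  static List<String> topLevelFieldTypes(String schema) {
    if (!schema.startsWith("struct<") || !schema.endsWith(">")) {
      throw new IllegalArgumentException("not a struct schema: " + schema);
    }
    String body = schema.substring("struct<".length(), schema.length() - 1);
    List<String> types = new ArrayList<>();
    if (body.isEmpty()) {
      return types;
    }
    int depth = 0;  // nesting level of '<' ... '>' inside the struct body
    int start = 0;  // index where the current field begins
    for (int i = 0; i <= body.length(); i++) {
      // Treat the end of the body as a final separator.
      char c = i < body.length() ? body.charAt(i) : ',';
      if (c == '<') {
        depth++;
      } else if (c == '>') {
        depth--;
      } else if (c == ',' && depth == 0) {
        String field = body.substring(start, i);
        // Drop the "name:" prefix and keep only the type.
        types.add(field.substring(field.indexOf(':') + 1));
        start = i + 1;
      }
    }
    return types;
  }

  public static void main(String[] args) {
    List<String> parts = topLevelFieldTypes(
        "struct<first:struct<x:int,y:int>,second:struct<z:string>>");
    System.out.println("shuffle key type:   " + parts.get(0));
    System.out.println("shuffle value type: " + parts.get(1));
  }
}
```

For the test schema, the first entry is the type the key setting would carry and the second is the type the value setting would carry.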

          githubbot ASF GitHub Bot added a comment -

          Github user omalley commented on a diff in the pull request:

          https://github.com/apache/orc/pull/27#discussion_r65283394

          — Diff: java/mapreduce/src/java/org.apache.orc.mapreduce/OrcInputFormat.java —
          @@ -0,0 +1,95 @@
          +/**
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.orc.mapreduce;
          +
          +import com.esotericsoftware.kryo.Kryo;
          +import com.esotericsoftware.kryo.io.Input;
          +import org.apache.commons.codec.binary.Base64;
          +import org.apache.hadoop.conf.Configuration;
          +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
          +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl;
          +import org.apache.hadoop.io.Writable;
          +import org.apache.hadoop.mapreduce.InputSplit;
          +import org.apache.hadoop.mapreduce.RecordReader;
          +import org.apache.hadoop.mapreduce.TaskAttemptContext;
          +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
          +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
          +
          +
          +import java.io.IOException;
          +
          +import org.apache.hadoop.io.NullWritable;
          +import org.apache.orc.OrcConf;
          +import org.apache.orc.OrcFile;
          +import org.apache.orc.Reader;
          +import org.apache.orc.TypeDescription;
          +
          +/**
          + * An ORC input format that satisfies the org.apache.hadoop.mapreduce API.
          + */
          +public class OrcInputFormat<V extends Writable>
          + extends FileInputFormat<NullWritable, V> {
          +
          + /**
          + * Put the given SearchArgument into the configuration for an OrcInputFormat.
          + * @param conf the configuration to modify
          + * @param sarg the SearchArgument to put in the configuration
          + * @param columnNames the list of column names for the SearchArgument
          + */
          + public static void setSearchArgument(Configuration conf,
          + SearchArgument sarg,
          + String[] columnNames) {
          + org.apache.orc.mapred.OrcInputFormat.setSearchArgument(conf, sarg,
          + columnNames);
          + }
          +
          + @Override
          + public RecordReader<NullWritable, V>
          + createRecordReader(InputSplit inputSplit,
          + TaskAttemptContext taskAttemptContext
          + ) throws IOException, InterruptedException {
          + FileSplit split = (FileSplit) inputSplit;
          + Configuration conf = taskAttemptContext.getConfiguration();
          + Reader file = OrcFile.createReader(split.getPath(),
          + OrcFile.readerOptions(conf)
          + .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)));
          + TypeDescription schema =
          + TypeDescription.fromString(OrcConf.SCHEMA.getString(conf));
          + Reader.Options options = new Reader.Options()
          + .range(split.getStart(), split.getLength())
          + .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
          + .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf));
          — End diff –

          Yeah, let me figure out how to merge the common code.

          githubbot ASF GitHub Bot added a comment -

          Github user wagnermarkd commented on the issue:

          https://github.com/apache/orc/pull/27

          LGTM. The new names are helpful.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/orc/pull/27

          owen.omalley Owen O'Malley added a comment -

          I just committed this. Thanks for the reviews, Mark!

          leftylev Lefty Leverenz added a comment -

          Doc note: This creates four configuration parameters in OrcConf.java and removes one parameter, so they should be documented. Some general documentation would also be good.

          • orc.mapred.input.schema
          • orc.mapred.map.output.key.schema
          • orc.mapred.map.output.value.schema
          • orc.mapred.output.schema
          • (removed: orc.schema)

          How do we want to track doc issues – with TODOC<release#> labels like the Hive project?
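
As a starting point for that documentation, here is a minimal sketch of the settings a job that shuffles ORC structs might carry, using the parameter names listed above and the schemas from TestMrUnit. A plain Map stands in for the JobConf so the example runs without Hadoop on the classpath; OrcShuffleSettings and shuffleSettings are hypothetical names. On a real job each pair would presumably be passed to Configuration.set(name, value). orc.mapred.input.schema is left out because, per the review discussion, it is only needed when the reader should convert to a specific type through schema evolution.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder class: a Map stands in for the Hadoop JobConf.
class OrcShuffleSettings {

  /** Builds the settings for a job whose shuffle key and value are ORC types. */
  static Map<String, String> shuffleSettings(String keySchema,
                                             String valueSchema,
                                             String outputSchema) {
    Map<String, String> settings = new LinkedHashMap<>();
    // Schemas of the shuffle key and value, stored once per job.
    settings.put("orc.mapred.map.output.key.schema", keySchema);
    settings.put("orc.mapred.map.output.value.schema", valueSchema);
    // Schema of the rows written by the output format.
    settings.put("orc.mapred.output.schema", outputSchema);
    // Wrapper classes used for the shuffle (package taken from the test imports).
    settings.put("mapreduce.map.output.key.class", "org.apache.orc.mapred.OrcKey");
    settings.put("mapreduce.map.output.value.class", "org.apache.orc.mapred.OrcValue");
    return settings;
  }

  public static void main(String[] args) {
    Map<String, String> settings = shuffleSettings(
        "struct<x:int,y:int>",
        "struct<z:string>",
        "struct<first:struct<x:int,y:int>,second:struct<z:string>>");
    // Print in name=value form, matching the style used in the review comments.
    for (Map.Entry<String, String> e : settings.entrySet()) {
      System.out.println(e.getKey() + "=" + e.getValue());
    }
  }
}
```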


            People

            • Assignee:
              owen.omalley Owen O'Malley
              Reporter:
              owen.omalley Owen O'Malley
            • Votes:
              0
              Watchers:
              6
