diff --git a/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java b/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
index fa3e30f..8654cd6 100644
--- a/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
@@ -19,6 +19,7 @@
 package org.apache.hcatalog.mapreduce;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -47,6 +48,7 @@
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.thrift.TException;
+import org.apache.hcatalog.mapreduce.FileRecordWriterContainer;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -95,20 +97,25 @@ public FileOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>, ? super Writable> of) {
-        RecordWriter<WritableComparable<?>, HCatRecord> rw =
-            new FileRecordWriterContainer(
-                HCatBaseOutputFormat.getJobInfo(context).isDynamicPartitioningUsed() ?
-                    null :
-                    getBaseOutputFormat()
-                        .getRecordWriter(null,
-                            new JobConf(context.getConfiguration()),
-                            FileOutputFormat.getUniqueName(new JobConf(context.getConfiguration()), "part"),
-                            InternalUtil.createReporter(context)),
-                context);
-        return rw;
+        RecordWriter<WritableComparable<?>, HCatRecord> rw;
+        if (HCatBaseOutputFormat.getJobInfo(context).isDynamicPartitioningUsed()) {
+            // When dynamic partitioning is used, the RecordWriter instance initialized here isn't used, so null suffices.
+            // (That's because records can't be written until the values of the dynamic partitions are deduced.
+            // By that time, a new local instance of RecordWriter, with the correct output-path, will be constructed.)
+            rw = new FileRecordWriterContainer((org.apache.hadoop.mapred.RecordWriter) null, context);
+        } else {
+            Path parentDir = new Path(context.getConfiguration().get("mapred.work.output.dir"));
+            Path childPath = new Path(parentDir, FileOutputFormat.getUniqueName(new JobConf(context.getConfiguration()), "part"));
+
+            rw = new FileRecordWriterContainer(
+                getBaseOutputFormat().getRecordWriter(
+                    parentDir.getFileSystem(context.getConfiguration()),
+                    new JobConf(context.getConfiguration()),
+                    childPath.toString(),
+                    InternalUtil.createReporter(context)),
+                context);
+        }
+        return rw;
     }
 
     @Override
diff --git a/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
index 60a5cb0..ff2f3bd 100644
--- a/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
@@ -218,10 +218,14 @@ public void write(WritableComparable<?> key, HCatRecord value) throws IOException, InterruptedException {
                 //setupTask()
                 baseOutputCommitter.setupTask(currTaskContext);
 
+                Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir"));
+                Path childPath = new Path(parentDir, FileOutputFormat.getUniqueFile(currTaskContext, "part", ""));
+
                 org.apache.hadoop.mapred.RecordWriter baseRecordWriter =
-                    baseOF.getRecordWriter(null,
+                    baseOF.getRecordWriter(
+                        parentDir.getFileSystem(currTaskContext.getConfiguration()),
                         currTaskContext.getJobConf(),
-                        FileOutputFormat.getUniqueFile(currTaskContext, "part", ""),
+                        childPath.toString(),
                         InternalUtil.createReporter(currTaskContext));
 
                 baseDynamicWriters.put(dynKey, baseRecordWriter);
diff --git a/hcatalog/core/src/test/java/org/apache/hcatalog/fileformats/TestOrcDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hcatalog/fileformats/TestOrcDynamicPartitioned.java
new file mode 100644
index 0000000..f5f989c
--- /dev/null
+++ b/hcatalog/core/src/test/java/org/apache/hcatalog/fileformats/TestOrcDynamicPartitioned.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.fileformats;
+
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hcatalog.mapreduce.TestHCatDynamicPartitioned;
+import org.junit.BeforeClass;
+
+public class TestOrcDynamicPartitioned extends TestHCatDynamicPartitioned {
+
+    @BeforeClass
+    public static void generateInputData() throws Exception {
+        tableName = "testOrcDynamicPartitionedTable";
+        generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+        generateDataColumns();
+    }
+
+
+    @Override
+    protected String inputFormat() {
+        return OrcInputFormat.class.getName();
+    }
+
+    @Override
+    protected String outputFormat() {
+        return OrcOutputFormat.class.getName();
+    }
+
+    @Override
+    protected String serdeClass() {
+        return OrcSerde.class.getName();
+    }
+
+
+}
diff --git a/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java b/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
index 6e6ae44..b0b5018 100644
--- a/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
+++ b/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
@@ -38,10 +38,13 @@
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -75,10 +78,6 @@
 
     protected static String dbName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
     protected static String tableName = "testHCatMapReduceTable";
 
-    protected String inputFormat = RCFileInputFormat.class.getName();
-    protected String outputFormat = RCFileOutputFormat.class.getName();
-    protected String serdeClass = ColumnarSerDe.class.getName();
-
     private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
     private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
@@ -88,6 +87,18 @@
 
     private static FileSystem fs;
 
+    protected String inputFormat() {
+        return RCFileInputFormat.class.getName();
+    }
+
+    protected String outputFormat() {
+        return RCFileOutputFormat.class.getName();
+    }
+
+    protected String serdeClass() {
+        return ColumnarSerDe.class.getName();
+    }
+
     @BeforeClass
     public static void setUpOneTime() throws Exception {
         fs = new LocalFileSystem();
@@ -142,9 +153,9 @@ public void createTable() throws Exception {
         sd.getSerdeInfo().setName(tbl.getTableName());
         sd.getSerdeInfo().setParameters(new HashMap<String, String>());
         sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
-        sd.getSerdeInfo().setSerializationLib(serdeClass);
-        sd.setInputFormat(inputFormat);
-        sd.setOutputFormat(outputFormat);
+        sd.getSerdeInfo().setSerializationLib(serdeClass());
+        sd.setInputFormat(inputFormat());
+        sd.setOutputFormat(outputFormat());
 
         Map<String, String> tableParams = new HashMap<String, String>();
         tbl.setParameters(tableParams);
diff --git a/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
index 491316c..af25e2d 100644
--- a/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
+++ b/hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
@@ -50,8 +50,8 @@
     private static List<HCatRecord> writeRecords;
    private static List<HCatFieldSchema> dataColumns;
     private static final Logger LOG = LoggerFactory.getLogger(TestHCatDynamicPartitioned.class);
-    private static final int NUM_RECORDS = 20;
-    private static final int NUM_PARTITIONS = 5;
+    protected static final int NUM_RECORDS = 20;
+    protected static final int NUM_PARTITIONS = 5;
 
     @BeforeClass
     public static void generateInputData() throws Exception {
@@ -60,14 +60,14 @@ public static void generateInputData() throws Exception {
         generateDataColumns();
     }
 
-    private static void generateDataColumns() throws HCatException {
+    protected static void generateDataColumns() throws HCatException {
         dataColumns = new ArrayList<HCatFieldSchema>();
         dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, "")));
         dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, "")));
         dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", serdeConstants.STRING_TYPE_NAME, "")));
     }
 
-    private static void generateWriteRecords(int max, int mod, int offset) {
+    protected static void generateWriteRecords(int max, int mod, int offset) {
         writeRecords = new ArrayList<HCatRecord>();
         for (int i = 0; i < max; i++) {
diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java
index f09a6fb..1e8ee03 100644
--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java
@@ -65,8 +65,14 @@
     private static int guardTestCount = 6; // ugh, instantiate using introspection in guardedSetupBeforeClass
     private static boolean setupHasRun = false;
 
+    private static Map<Integer, Pair<Integer, String>> basicInputData;
+
+    protected String storageFormat() {
+        return "RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
+            "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver')";
+    }
+
     private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
         driver.run("drop table " + tablename);
     }
@@ -77,8 +83,7 @@ private void createTable(String tablename, String schema, String partitionedBy)
         if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) {
             createTable = createTable + "partitioned by (" + partitionedBy + ") ";
         }
-        createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
-            "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+        createTable = createTable + "stored as " + storageFormat();
         int retCode = driver.run(createTable).getResponseCode();
         if (retCode != 0) {
             throw new IOException("Failed to create table. [" + createTable + "], return code from hive driver : [" + retCode + "]");
diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerMulti.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerMulti.java
index 7bddaa3..4cd8f7a 100644
--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerMulti.java
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerMulti.java
@@ -46,9 +46,14 @@
     private static final String BASIC_TABLE = "junit_unparted_basic";
     private static final String PARTITIONED_TABLE = "junit_parted_basic";
     private static Driver driver;
-
+
+    private static Map<Integer, Pair<Integer, String>> basicInputData;
+
+    protected String storageFormat() {
+        return "RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
+            "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver')";
+    }
+
     private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
         driver.run("drop table " + tablename);
     }
@@ -59,8 +64,7 @@ private void createTable(String tablename, String schema, String partitionedBy)
         if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) {
             createTable = createTable + "partitioned by (" + partitionedBy + ") ";
         }
-        createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
-            "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+        createTable = createTable + "stored as " + storageFormat();
         int retCode = driver.run(createTable).getResponseCode();
         if (retCode != 0) {
             throw new IOException("Failed to create table. [" + createTable + "], return code from hive driver : [" + retCode + "]");
diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestOrcHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestOrcHCatLoader.java
new file mode 100644
index 0000000..eb49f3a
--- /dev/null
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestOrcHCatLoader.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+public class TestOrcHCatLoader extends TestHCatLoader {
+
+    @Override
+    protected String storageFormat() {
+        return "orc";
+    }
+
+}
\ No newline at end of file
diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestOrcHCatStorer.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestOrcHCatStorer.java
new file mode 100644
index 0000000..99121f1
--- /dev/null
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestOrcHCatStorer.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.pig;
+
+public class TestOrcHCatStorer extends TestHCatStorerMulti {
+
+    @Override
+    protected String storageFormat() {
+        return "orc";
+    }
+}
\ No newline at end of file