diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java index c46db33..65e799a 100644 --- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java +++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java @@ -116,6 +116,10 @@ public class HCatStorer extends HCatBaseStorer { "Schema for data cannot be determined.", PigHCatUtil.PIG_EXCEPTION_CODE); } + String externalLocation = (String) udfProps.getProperty(HCatConstants.HCAT_PIG_STORER_EXTERNAL_LOCATION); + if (externalLocation != null) { + outputJobInfo.setLocation(externalLocation); + } try { HCatOutputFormat.setOutput(job, outputJobInfo); } catch (HCatException he) { diff --git hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java new file mode 100644 index 0000000..f9c87bc --- /dev/null +++ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hcatalog.pig; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.pig.impl.util.UDFContext; + +/* Test-side subclass of HCatStorer that remembers an external output directory and publishes it in the UDF properties under HCatConstants.HCAT_PIG_STORER_EXTERNAL_LOCATION before delegating to the parent storer. */ public class HCatStorerWrapper extends HCatStorer { + + /* UDF context signature captured in setStoreFuncUDFContextSignature; used to look up this storer's UDF properties. */ private String sign; + /* Directory supplied to the constructor; written into the UDF properties on every setStoreLocation call. */ private String externalDir; + + /* Forwards partSpecs and schema to the two-argument HCatStorer constructor and records externalDir. */ public HCatStorerWrapper(String partSpecs, String schema, String externalDir) throws Exception { + super(partSpecs, schema); + this.externalDir = externalDir; + } + + /* Forwards partSpecs to the one-argument HCatStorer constructor and records externalDir. */ public HCatStorerWrapper(String partSpecs, String externalDir) throws Exception { + super(partSpecs); + this.externalDir = externalDir; + } + + /* Uses the no-argument HCatStorer constructor and records externalDir. */ public HCatStorerWrapper(String externalDir) throws Exception{ + super(); + this.externalDir = externalDir; + } + + /* Stores externalDir under HCAT_PIG_STORER_EXTERNAL_LOCATION in the UDF properties keyed by this class and the captured signature, then calls the parent implementation. NOTE(review): Properties.setProperty does not accept null values, so a null externalDir would raise NullPointerException here - confirm callers always pass a directory. NOTE(review): presumably the parent reads the same Properties object (same class and signature key) - verify against HCatStorer. */ @Override + public void setStoreLocation(String location, Job job) throws IOException { + Properties udfProps = UDFContext.getUDFContext().getUDFProperties( + this.getClass(), new String[] { sign }); + udfProps.setProperty(HCatConstants.HCAT_PIG_STORER_EXTERNAL_LOCATION, externalDir); + super.setStoreLocation(location, job); + } + + /* Keeps a copy of the signature for the property lookup in setStoreLocation, then forwards it to the parent. */ @Override + public void setStoreFuncUDFContextSignature(String signature) { + sign = signature; + super.setStoreFuncUDFContextSignature(signature); + } +} diff --git hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java new file mode 100644 index 0000000..698422a --- /dev/null +++ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.pig; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.UUID; + +import org.apache.commons.lang.SystemUtils; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hcatalog.HcatTestUtils; +import org.apache.hcatalog.mapreduce.HCatBaseTest; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.junit.Assert; +import org.junit.Test; + +/* Checks that storing through HCatStorerWrapper into an external partitioned table writes the data under the caller-supplied directory and that the rows can be read back through the driver. */ public class TestHCatStorerWrapper extends HCatBaseTest { + + /* Path of the tab-separated input file generated by the test. */ private static final String INPUT_FILE_NAME = TEST_DATA_DIR + "/input.data"; + + /* Creates external table junit_external, stores a 3x3 grid of (a, b) rows into partition c=100 through HCatStorerWrapper with a random temp directory as the external location, then asserts that the directory and part-m-00000 exist and that the table returns exactly those nine rows in order. */ @Test + public void testStoreExternalTableWithExternalDir() throws IOException, CommandNeedRetryException{ + + File tmpExternalDir = new File(SystemUtils.getJavaIoTmpDir(), UUID.randomUUID().toString()); + /* NOTE(review): File deletion only succeeds on an empty directory, so once the job writes part files here this deleteOnExit will probably leave the directory behind - consider explicit recursive cleanup. */ tmpExternalDir.deleteOnExit(); + + String part_val = "100"; + + driver.run("drop table junit_external"); + String createTable = "create external table junit_external(a int, b string) partitioned by (c string) stored as RCFILE"; + int retCode = driver.run(createTable).getResponseCode(); + if(retCode != 0) { + throw new IOException("Failed to create table."); + } + + /* Input is every (i, j) pair with i and j in 1..LOOP_SIZE, one tab-separated row each. */ int LOOP_SIZE = 3; + String[] inputData = new String[LOOP_SIZE*LOOP_SIZE]; + int k = 0; + for(int i = 1; i <= LOOP_SIZE; i++) { + String si = i + ""; + for(int j=1;j<=LOOP_SIZE;j++) { + 
inputData[k++] = si + "\t"+j; + } + } + HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, inputData); + PigServer server = new PigServer(ExecType.LOCAL); + server.setBatchOn(); + server.registerQuery("A = load '"+INPUT_FILE_NAME+"' as (a:int, b:chararray);"); + /* The storer arguments are the static partition spec c=100 and the external directory. */ server.registerQuery("store A into 'default.junit_external' using " + HCatStorerWrapper.class.getName() + + "('c=" + part_val + "','" + tmpExternalDir.getAbsolutePath() + "');"); + server.executeBatch(); + + Assert.assertTrue(tmpExternalDir.exists()); + /* The part file is expected directly under the external directory, i.e. without a c=100 subdirectory. */ Assert.assertTrue(new File(tmpExternalDir.getAbsoluteFile() + "/" + "part-m-00000").exists()); + + driver.run("select * from junit_external"); + /* NOTE(review): raw ArrayList and Iterator; each result row is compared below as a tab-joined string. */ ArrayList res = new ArrayList(); + driver.getResults(res); + driver.run("drop table junit_external"); + Iterator itr = res.iterator(); + for(int i = 1; i <= LOOP_SIZE; i++) { + String si = i + ""; + for(int j=1;j<=LOOP_SIZE;j++) { + Assert.assertEquals( si + "\t" + j + "\t" + part_val,itr.next()); + } + } + Assert.assertFalse(itr.hasNext()); + + } +} diff --git src/java/org/apache/hcatalog/common/HCatConstants.java src/java/org/apache/hcatalog/common/HCatConstants.java index 626d91b..ee5efe0 100644 --- src/java/org/apache/hcatalog/common/HCatConstants.java +++ src/java/org/apache/hcatalog/common/HCatConstants.java @@ -37,6 +37,7 @@ public final class HCatConstants { public static final String HCAT_PIG_STORER_ARGS = "hcat.pig.storer.args"; public static final String HCAT_PIG_ARGS_DELIMIT = "hcat.pig.args.delimiter"; public static final String HCAT_PIG_ARGS_DELIMIT_DEFAULT = ","; + /* UDF property key through which a Pig storer can pass an external output location. */ public static final String HCAT_PIG_STORER_EXTERNAL_LOCATION = HCAT_PIG_STORER + ".external.location"; public static final String HCAT_PIG_STORER_LOCATION_SET = HCAT_PIG_STORER + ".location.set"; public static final String HCAT_PIG_INNER_TUPLE_NAME = "hcat.pig.inner.tuple.name"; public static final String HCAT_PIG_INNER_TUPLE_NAME_DEFAULT = "innertuple"; diff --git src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java 
src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java index 8e692dc..f6630cd 100644 --- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java +++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java @@ -286,7 +286,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { if (!dynamicPartitioningUsed) { partitionsToAdd.add( constructPartition( - context, + context, jobInfo, tblPath.toString(), jobInfo.getPartitionValues() , jobInfo.getOutputSchema(), getStorerParameterMap(storer) , table, fs @@ -295,7 +295,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { for (Entry> entry : partitionsDiscoveredByPath.entrySet()) { partitionsToAdd.add( constructPartition( - context, + context, jobInfo, getPartitionRootLocation(entry.getKey(), entry.getValue().size()), entry.getValue() , jobInfo.getOutputSchema(), getStorerParameterMap(storer) , table, fs @@ -400,6 +400,8 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { /** * Generate partition metadata object to be used to add to metadata. + * @param context The job context. + * @param jobInfo The OutputJobInfo. * @param partLocnRoot The table-equivalent location root of the partition * (temporary dir if dynamic partition, table dir if static) * @param partKVs The keyvalue pairs that form the partition @@ -414,7 +416,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { */ private Partition constructPartition( - JobContext context, + JobContext context, OutputJobInfo jobInfo, String partLocnRoot, Map partKVs, HCatSchema outputSchema, Map params, Table table, FileSystem fs, @@ -438,16 +440,26 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { // Sets permissions and group name on partition dirs and files. 
- Path partPath = new Path(partLocnRoot); - int i = 0; - for (FieldSchema partKey : table.getPartitionKeys()) { - if (i++ != 0) { - applyGroupAndPerms(fs, partPath, perms, grpName, false); + Path partPath; + if (Boolean.valueOf((String)table.getProperty("EXTERNAL")) + && jobInfo.getLocation() != null && jobInfo.getLocation().length() > 0) { + // honor external table that specifies the location + partPath = new Path(jobInfo.getLocation()); + } else { + partPath = new Path(partLocnRoot); + int i = 0; + for (FieldSchema partKey : table.getPartitionKeys()) { + if (i++ != 0) { + applyGroupAndPerms(fs, partPath, perms, grpName, false); + } + partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs); } - partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs); } + // Apply the group and permissions to the leaf partition and files. applyGroupAndPerms(fs, partPath, perms, grpName, true); + + // Set the location in the StorageDescriptor if (dynamicPartitioningUsed) { String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table, partKVs); if (harProcessor.isEnabled()) { diff --git src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java index 3f368d8..d4c3470 100644 --- src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java +++ src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java @@ -116,8 +116,12 @@ public class FosterStorageHandler extends HCatStorageHandler { String outputLocation; - // For non-partitioned tables, we send them to the temp dir - if (dynHash == null && jobInfo.getPartitionValues().size() == 0) { + if (Boolean.valueOf((String)tableDesc.getProperties().get("EXTERNAL")) + && jobInfo.getLocation() != null && jobInfo.getLocation().length() > 0) { + // honor external table that specifies the location + outputLocation = jobInfo.getLocation(); + } else if (dynHash == null && 
jobInfo.getPartitionValues().size() == 0) { + // For non-partitioned tables, we send them to the temp dir outputLocation = TEMP_DIR_NAME; } else { List cols = new ArrayList();