diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java index c46db33..65e799a 100644 --- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java +++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java @@ -116,6 +116,10 @@ public class HCatStorer extends HCatBaseStorer { "Schema for data cannot be determined.", PigHCatUtil.PIG_EXCEPTION_CODE); } + String externalLocation = (String) udfProps.getProperty(HCatConstants.HCAT_PIG_STORER_EXTERNAL_LOCATION); + if (externalLocation != null) { + outputJobInfo.setLocation(externalLocation); + } try { HCatOutputFormat.setOutput(job, outputJobInfo); } catch (HCatException he) { diff --git hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java new file mode 100644 index 0000000..e4f6611 --- /dev/null +++ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hcatalog.pig; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.pig.impl.util.UDFContext; + +/** + * This class is used to test the HCAT_PIG_STORER_EXTERNAL_LOCATION property used in HCatStorer. + * When this property is set, HCatStorer writes the output to the location it specifies. Since + * the property can only be set in the UDFContext, we need this simpler wrapper to do three things: + *
<ol>
+ * <li>save the external dir specified in the Pig script</li>
+ * <li>set the same UDFContext signature as HCatStorer</li>
+ * <li>before {@link HCatStorer#setStoreLocation(String, Job)}, set the external dir in the UDFContext.</li>
+ * </ol>
+ */ +public class HCatStorerWrapper extends HCatStorer { + + private String sign; + private String externalDir; + + public HCatStorerWrapper(String partSpecs, String schema, String externalDir) throws Exception { + super(partSpecs, schema); + this.externalDir = externalDir; + } + + public HCatStorerWrapper(String partSpecs, String externalDir) throws Exception { + super(partSpecs); + this.externalDir = externalDir; + } + + public HCatStorerWrapper(String externalDir) throws Exception{ + super(); + this.externalDir = externalDir; + } + + @Override + public void setStoreLocation(String location, Job job) throws IOException { + Properties udfProps = UDFContext.getUDFContext().getUDFProperties( + this.getClass(), new String[] { sign }); + udfProps.setProperty(HCatConstants.HCAT_PIG_STORER_EXTERNAL_LOCATION, externalDir); + super.setStoreLocation(location, job); + } + + @Override + public void setStoreFuncUDFContextSignature(String signature) { + sign = signature; + super.setStoreFuncUDFContextSignature(signature); + } +} diff --git hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java new file mode 100644 index 0000000..15e8a10 --- /dev/null +++ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.pig; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.UUID; + +import org.apache.commons.lang.SystemUtils; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hcatalog.HcatTestUtils; +import org.apache.hcatalog.mapreduce.HCatBaseTest; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.junit.Assert; +import org.junit.Test; + +/** + * This test checks the {@link HCatConstants#HCAT_PIG_STORER_EXTERNAL_LOCATION} that we can set in the + * UDFContext of {@link HCatStorer} so that it writes to the specified external location. + * + * Since {@link HCatStorer} does not allow extra parameters in the constructor, we use {@link HCatStorerWrapper} + * that always treats the last parameter as the external path. 
+ */ +public class TestHCatStorerWrapper extends HCatBaseTest { + + private static final String INPUT_FILE_NAME = TEST_DATA_DIR + "/input.data"; + + @Test + public void testStoreExternalTableWithExternalDir() throws IOException, CommandNeedRetryException{ + + File tmpExternalDir = new File(SystemUtils.getJavaIoTmpDir(), UUID.randomUUID().toString()); + tmpExternalDir.deleteOnExit(); + + String part_val = "100"; + + driver.run("drop table junit_external"); + String createTable = "create external table junit_external(a int, b string) partitioned by (c string) stored as RCFILE"; + Assert.assertEquals(0, driver.run(createTable).getResponseCode()); + + int LOOP_SIZE = 3; + String[] inputData = new String[LOOP_SIZE*LOOP_SIZE]; + int k = 0; + for(int i = 1; i <= LOOP_SIZE; i++) { + String si = i + ""; + for(int j=1;j<=LOOP_SIZE;j++) { + inputData[k++] = si + "\t"+j; + } + } + HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, inputData); + PigServer server = new PigServer(ExecType.LOCAL); + server.setBatchOn(); + logAndRegister(server, "A = load '"+INPUT_FILE_NAME+"' as (a:int, b:chararray);"); + logAndRegister(server, "store A into 'default.junit_external' using " + HCatStorerWrapper.class.getName() + + "('c=" + part_val + "','" + tmpExternalDir.getAbsolutePath() + "');"); + server.executeBatch(); + + Assert.assertTrue(tmpExternalDir.exists()); + Assert.assertTrue(new File(tmpExternalDir.getAbsoluteFile() + "/" + "part-m-00000").exists()); + + driver.run("select * from junit_external"); + ArrayList res = new ArrayList(); + driver.getResults(res); + driver.run("drop table junit_external"); + Iterator itr = res.iterator(); + for(int i = 1; i <= LOOP_SIZE; i++) { + String si = i + ""; + for(int j=1;j<=LOOP_SIZE;j++) { + Assert.assertEquals( si + "\t" + j + "\t" + part_val,itr.next()); + } + } + Assert.assertFalse(itr.hasNext()); + + } +} diff --git src/java/org/apache/hcatalog/common/HCatConstants.java src/java/org/apache/hcatalog/common/HCatConstants.java index 
626d91b..d8467c5 100644 --- src/java/org/apache/hcatalog/common/HCatConstants.java +++ src/java/org/apache/hcatalog/common/HCatConstants.java @@ -43,6 +43,14 @@ public final class HCatConstants { public static final String HCAT_PIG_INNER_FIELD_NAME = "hcat.pig.inner.field.name"; public static final String HCAT_PIG_INNER_FIELD_NAME_DEFAULT = "innerfield"; + /** + * {@value} (default: null) + * When the property is set in the UDFContext of the {@link HCatStorer}, {@link HCatStorer} writes + * to the location it specifies instead of the default HCatalog location format. An example can be found + * in {@link HCatStorerWrapper}. + */ + public static final String HCAT_PIG_STORER_EXTERNAL_LOCATION = HCAT_PIG_STORER + ".external.location"; + //The keys used to store info into the job Configuration public static final String HCAT_KEY_BASE = "mapreduce.lib.hcat"; diff --git src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java index 336a3f2..8381a74 100644 --- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java +++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java @@ -288,7 +288,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { if (!dynamicPartitioningUsed) { partitionsToAdd.add( constructPartition( - context, + context, jobInfo, tblPath.toString(), jobInfo.getPartitionValues() , jobInfo.getOutputSchema(), getStorerParameterMap(storer) , table, fs @@ -297,7 +297,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { for (Entry<String, Map<String, String>> entry : partitionsDiscoveredByPath.entrySet()) { partitionsToAdd.add( constructPartition( - context, + context, jobInfo, getPartitionRootLocation(entry.getKey(), entry.getValue().size()), entry.getValue() , jobInfo.getOutputSchema(), getStorerParameterMap(storer) , table, fs @@ -402,6 +402,8 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { /** * 
Generate partition metadata object to be used to add to metadata. + * @param context The job context. + * @param jobInfo The OutputJobInfo. * @param partLocnRoot The table-equivalent location root of the partition * (temporary dir if dynamic partition, table dir if static) * @param partKVs The keyvalue pairs that form the partition @@ -416,7 +418,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { */ private Partition constructPartition( - JobContext context, + JobContext context, OutputJobInfo jobInfo, String partLocnRoot, Map partKVs, HCatSchema outputSchema, Map params, Table table, FileSystem fs, @@ -440,16 +442,26 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { // Sets permissions and group name on partition dirs and files. - Path partPath = new Path(partLocnRoot); - int i = 0; - for (FieldSchema partKey : table.getPartitionKeys()) { - if (i++ != 0) { - applyGroupAndPerms(fs, partPath, perms, grpName, false); + Path partPath; + if (Boolean.valueOf((String)table.getProperty("EXTERNAL")) + && jobInfo.getLocation() != null && jobInfo.getLocation().length() > 0) { + // honor external table that specifies the location + partPath = new Path(jobInfo.getLocation()); + } else { + partPath = new Path(partLocnRoot); + int i = 0; + for (FieldSchema partKey : table.getPartitionKeys()) { + if (i++ != 0) { + applyGroupAndPerms(fs, partPath, perms, grpName, false); + } + partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs); } - partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs); } + // Apply the group and permissions to the leaf partition and files. 
applyGroupAndPerms(fs, partPath, perms, grpName, true); + + // Set the location in the StorageDescriptor if (dynamicPartitioningUsed) { String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table, partKVs); if (harProcessor.isEnabled()) { diff --git src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java index 3f368d8..d4c3470 100644 --- src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java +++ src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java @@ -116,8 +116,12 @@ public class FosterStorageHandler extends HCatStorageHandler { String outputLocation; - // For non-partitioned tables, we send them to the temp dir - if (dynHash == null && jobInfo.getPartitionValues().size() == 0) { + if (Boolean.valueOf((String)tableDesc.getProperties().get("EXTERNAL")) + && jobInfo.getLocation() != null && jobInfo.getLocation().length() > 0) { + // honor external table that specifies the location + outputLocation = jobInfo.getLocation(); + } else if (dynHash == null && jobInfo.getPartitionValues().size() == 0) { + // For non-partitioned tables, we send them to the temp dir outputLocation = TEMP_DIR_NAME; } else { List cols = new ArrayList();