diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java index dc9d005..1f230e5 100644 --- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java +++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; @@ -152,7 +153,13 @@ public abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata return new HCatFieldSchema(fSchema.alias, Type.STRING, null); case DataType.INTEGER: - return new HCatFieldSchema(fSchema.alias, Type.INT, null); + List<Type> supportedIntegerConversions = Lists.newArrayList( + Type.TINYINT, Type.SMALLINT, Type.INT); + if (!supportedIntegerConversions.contains(hcatFieldSchema.getType())) { + throw new FrontendException("Unsupported type: " + type + " in Pig's schema", + PigHCatUtil.PIG_EXCEPTION_CODE); + } + return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getType(), null); case DataType.LONG: return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null); @@ -301,7 +308,17 @@ public abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata case DOUBLE: return pigObj; case SMALLINT: + if ((Integer) pigObj < Short.MIN_VALUE || (Integer) pigObj > Short.MAX_VALUE) { + throw new BackendException("Value " + pigObj + " is outside the bounds of column " + + hcatFS.getName() + " with type " + 
hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE); + } + return ((Integer) pigObj).byteValue(); case BOOLEAN: // would not pass schema validation anyway throw new BackendException("Incompatible type "+type+" found in hcat table schema: "+hcatFS, PigHCatUtil.PIG_EXCEPTION_CODE); @@ -337,7 +354,6 @@ public abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata for(FieldSchema pigField : pigSchema.getFields()){ HCatFieldSchema hcatField = getColFromSchema(pigField.alias, tblSchema); - validateSchema(pigField, hcatField); } diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java index 28d860b..579811a 100644 --- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java +++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatLoader.java @@ -187,6 +187,8 @@ public class HCatLoader extends HCatBaseLoader { @Override public ResourceSchema getSchema(String location, Job job) throws IOException { HCatContext.getInstance().mergeConf(job.getConfiguration()); + HCatContext.getInstance().getConf().setBoolean( + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, true); Table table = phutil.getTable(location, hcatServerUri!=null?hcatServerUri:PigHCatUtil.getHCatServerUri(job), diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java index 2bb94e6..68ae13a 100644 --- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java +++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java @@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.Credentials; import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatContext; import org.apache.hcatalog.common.HCatException; import 
org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.mapreduce.HCatOutputFormat; @@ -77,6 +78,9 @@ public class HCatStorer extends HCatBaseStorer { @Override public void setStoreLocation(String location, Job job) throws IOException { + HCatContext.getInstance().mergeConf(job.getConfiguration()); + HCatContext.getInstance().getConf().setBoolean( + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, false); Configuration config = job.getConfiguration(); config.set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign); diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java index dee0f74..3550488 100644 --- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java +++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java @@ -405,9 +405,6 @@ public class PigHCatUtil { try { Type hType = hcatField.getType(); switch(hType){ - // We don't do type promotion/demotion. - case SMALLINT: - case TINYINT: case BOOLEAN: throw new PigException("Incompatible type found in hcat table schema: "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE); case ARRAY: diff --git hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java new file mode 100644 index 0000000..ed04581 --- /dev/null +++ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.pig; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hcatalog.HcatTestUtils; +import org.apache.hcatalog.mapreduce.HCatBaseTest; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecJob; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.util.Iterator; +import java.util.List; + +/** + * Tests that require both HCatLoader and HCatStorer. For read-only or write-only functionality, + * please consider {@link TestHCatLoader} or {@link TestHCatStorer}. + */ +public class TestHCatLoaderStorer extends HCatBaseTest { + + /** + * Ensure Pig can read/write tinyint/smallint columns. 
+ */ + @Test + public void testSmallTinyInt() throws Exception { + + String readTblName = "test_small_tiny_int"; + File dataDir = new File(TEST_DATA_DIR + "/testSmallTinyIntData"); + File dataFile = new File(dataDir, "testSmallTinyInt.tsv"); + + String writeTblName = "test_small_tiny_int_write"; + File writeDataFile = new File(TEST_DATA_DIR, writeTblName + ".tsv"); + + FileUtil.fullyDelete(dataDir); // Might not exist + Assert.assertTrue(dataDir.mkdir()); + + HcatTestUtils.createTestDataFile(dataFile.getAbsolutePath(), new String[]{ + String.format("%d\t%d", Short.MIN_VALUE, Byte.MIN_VALUE), + String.format("%d\t%d", Short.MAX_VALUE, Byte.MAX_VALUE) + }); + + // Create a table with smallint/tinyint columns, load data, and query from Hive. + Assert.assertEquals(0, driver.run("drop table if exists " + readTblName).getResponseCode()); + Assert.assertEquals(0, driver.run("create external table " + readTblName + + " (my_small_int smallint, my_tiny_int tinyint)" + + " row format delimited fields terminated by '\t' stored as textfile").getResponseCode()); + Assert.assertEquals(0, driver.run("load data local inpath '" + + dataDir.getAbsolutePath() + "' into table " + readTblName).getResponseCode()); + + PigServer server = new PigServer(ExecType.LOCAL); + server.registerQuery( + "data = load '" + readTblName + "' using org.apache.hcatalog.pig.HCatLoader();"); + + // Ensure Pig schema is correct. + Schema schema = server.dumpSchema("data"); + Assert.assertEquals(2, schema.getFields().size()); + Assert.assertEquals("my_small_int", schema.getField(0).alias); + Assert.assertEquals(DataType.INTEGER, schema.getField(0).type); + Assert.assertEquals("my_tiny_int", schema.getField(1).alias); + Assert.assertEquals(DataType.INTEGER, schema.getField(1).type); + + // Ensure Pig can read data correctly. 
+ Iterator<Tuple> it = server.openIterator("data"); + Tuple t = it.next(); + Assert.assertEquals(new Integer(Short.MIN_VALUE), t.get(0)); + Assert.assertEquals(new Integer(Byte.MIN_VALUE), t.get(1)); + t = it.next(); + Assert.assertEquals(new Integer(Short.MAX_VALUE), t.get(0)); + Assert.assertEquals(new Integer(Byte.MAX_VALUE), t.get(1)); + Assert.assertFalse(it.hasNext()); + + // Ensure Pig can write correctly to smallint/tinyint columns. This means values within the + // bounds of the column type are written, and values outside throw an exception. + Assert.assertEquals(0, driver.run("drop table if exists " + writeTblName).getResponseCode()); + Assert.assertEquals(0, driver.run("create table " + writeTblName + + " (my_small_int smallint, my_tiny_int tinyint) stored as rcfile").getResponseCode()); + + // Values within the column type bounds. + HcatTestUtils.createTestDataFile(writeDataFile.getAbsolutePath(), new String[]{ + String.format("%d\t%d", Short.MIN_VALUE, Byte.MIN_VALUE), + String.format("%d\t%d", Short.MAX_VALUE, Byte.MAX_VALUE) + }); + smallTinyIntBoundsCheckHelper(writeDataFile.getAbsolutePath(), ExecJob.JOB_STATUS.COMPLETED); + + // Values outside the column type bounds will fail at runtime. 
+ HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/shortTooSmall.tsv", new String[]{ + String.format("%d\t%d", Short.MIN_VALUE - 1, 0)}); + smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/shortTooSmall.tsv", ExecJob.JOB_STATUS.FAILED); + + HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/shortTooBig.tsv", new String[]{ + String.format("%d\t%d", Short.MAX_VALUE + 1, 0)}); + smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/shortTooBig.tsv", ExecJob.JOB_STATUS.FAILED); + + HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/byteTooSmall.tsv", new String[]{ + String.format("%d\t%d", 0, Byte.MIN_VALUE - 1)}); + smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/byteTooSmall.tsv", ExecJob.JOB_STATUS.FAILED); + + HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/byteTooBig.tsv", new String[]{ + String.format("%d\t%d", 0, Byte.MAX_VALUE + 1)}); + smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/byteTooBig.tsv", ExecJob.JOB_STATUS.FAILED); + } + + private void smallTinyIntBoundsCheckHelper(String data, ExecJob.JOB_STATUS expectedStatus) + throws Exception { + Assert.assertEquals(0, driver.run("drop table if exists test_tbl").getResponseCode()); + Assert.assertEquals(0, driver.run("create table test_tbl" + + " (my_small_int smallint, my_tiny_int tinyint) stored as rcfile").getResponseCode()); + + PigServer server = new PigServer(ExecType.LOCAL); + server.setBatchOn(); + server.registerQuery("data = load '" + data + + "' using PigStorage('\t') as (my_small_int:int, my_tiny_int:int);"); + server.registerQuery( + "store data into 'test_tbl' using org.apache.hcatalog.pig.HCatStorer();"); + List<ExecJob> jobs = server.executeBatch(); + Assert.assertEquals(expectedStatus, jobs.get(0).getStatus()); + } +} diff --git src/java/org/apache/hcatalog/common/HCatConstants.java src/java/org/apache/hcatalog/common/HCatConstants.java index b0f6f70..b7cde4b 100644 --- src/java/org/apache/hcatalog/common/HCatConstants.java +++ src/java/org/apache/hcatalog/common/HCatConstants.java @@ -129,4 +129,16 @@ 
public final class HCatConstants { public static final String HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER = "hcat.data.convert.boolean.to.integer"; public static final boolean HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT = false; + + /** + * {@value} (default: {@value #HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT}). + * Hive tables support tinyint and smallint columns, while not all processing frameworks support + * these types (Pig only has integer for example). Enable this property to promote tinyint and + * smallint columns to integer at runtime. Note that writes to tinyint and smallint columns + * enforce bounds checking and jobs will fail if attempting to write values outside the column + * bounds. + */ + public static final String HCAT_DATA_TINY_SMALL_INT_PROMOTION = + "hcat.data.tiny.small.int.promotion"; + public static final boolean HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT = false; } diff --git src/java/org/apache/hcatalog/data/HCatRecordSerDe.java src/java/org/apache/hcatalog/data/HCatRecordSerDe.java index 6d6bf3e..9137e22 100644 --- src/java/org/apache/hcatalog/data/HCatRecordSerDe.java +++ src/java/org/apache/hcatalog/data/HCatRecordSerDe.java @@ -183,16 +183,26 @@ public class HCatRecordSerDe implements SerDe { * Return underlying Java Object from an object-representation * that is readable by a provided ObjectInspector. 
*/ - public static Object serializeField(Object field, - ObjectInspector fieldObjectInspector) throws SerDeException { - Object res = null; + public static Object serializeField(Object field, ObjectInspector fieldObjectInspector) + throws SerDeException { + + Object res; if (fieldObjectInspector.getCategory() == Category.PRIMITIVE){ - if (field != null && + if (field != null && field instanceof Boolean && HCatContext.getInstance().getConf().getBoolean( HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER, - HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT) && - field instanceof Boolean) { + HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT)) { res = ((Boolean) field) ? 1 : 0; + } else if (field != null && field instanceof Short && + HCatContext.getInstance().getConf().getBoolean( + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) { + res = new Integer((Short) field); + } else if (field != null && field instanceof Byte && + HCatContext.getInstance().getConf().getBoolean( + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) { + res = new Integer((Byte) field); } else { res = ((PrimitiveObjectInspector) fieldObjectInspector).getPrimitiveJavaObject(field); } diff --git src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java index 3292a6f..c8ad37c 100644 --- src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java +++ src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java @@ -19,6 +19,7 @@ package org.apache.hcatalog.data.schema; import java.io.Serializable; +import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.hcatalog.common.HCatException; public class HCatFieldSchema implements Serializable { @@ -209,11 +210,12 @@ public class HCatFieldSchema implements Serializable { @Override public String toString() { - return
"HCatFieldSchema [" - + (fieldName != null ? "fieldName=" + fieldName + ", " : "fieldName=null") - + (comment != null ? "comment=" + comment + ", " : "comment=null") - + (type != null ? "type=" + getTypeString() + ", " : "type=null") - + (category != null ? "category=" + category : "category=null") + "]"; + return new ToStringBuilder(this) + .append("fieldName", fieldName) + .append("comment", comment) + .append("type", getTypeString()) + .append("category", category) + .toString(); } public String getTypeString(){ diff --git src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java index 463ad41..9906873 100644 --- src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java +++ src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java @@ -144,7 +144,9 @@ public class HCatSchemaUtils { HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT) ? Type.INT : Type.BOOLEAN; case BYTE: - return Type.TINYINT; + return HCatContext.getInstance().getConf().getBoolean( + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT) ? Type.INT : Type.TINYINT; case DOUBLE: return Type.DOUBLE; case FLOAT: @@ -154,7 +156,10 @@ public class HCatSchemaUtils { case LONG: return Type.BIGINT; case SHORT: - return Type.SMALLINT; + return HCatContext.getInstance().getConf().getBoolean( + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT) ? + Type.INT : Type.SMALLINT; case STRING: return Type.STRING; case BINARY: