diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java index dc9d005..1f230e5 100644 --- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java +++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; @@ -152,7 +153,13 @@ public abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata return new HCatFieldSchema(fSchema.alias, Type.STRING, null); case DataType.INTEGER: - return new HCatFieldSchema(fSchema.alias, Type.INT, null); + List<Type> supportedIntegerConversions = Lists.newArrayList( + Type.TINYINT, Type.SMALLINT, Type.INT); + if (!supportedIntegerConversions.contains(hcatFieldSchema.getType())) { + throw new FrontendException("Unsupported type: " + type + " in Pig's schema", + PigHCatUtil.PIG_EXCEPTION_CODE); + } + return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getType(), null); case DataType.LONG: return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null); @@ -301,7 +308,17 @@ public abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata case DOUBLE: return pigObj; case SMALLINT: + if ((Integer) pigObj < Short.MIN_VALUE || (Integer) pigObj > Short.MAX_VALUE) { + throw new BackendException("Value " + pigObj + " is outside the bounds of column " + + hcatFS.getName() + " with type " + 
hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE); + } + return ((Integer) pigObj).byteValue(); case BOOLEAN: // would not pass schema validation anyway throw new BackendException("Incompatible type "+type+" found in hcat table schema: "+hcatFS, PigHCatUtil.PIG_EXCEPTION_CODE); @@ -337,7 +354,6 @@ public abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata for(FieldSchema pigField : pigSchema.getFields()){ HCatFieldSchema hcatField = getColFromSchema(pigField.alias, tblSchema); - validateSchema(pigField, hcatField); } diff --git hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java index dee0f74..3550488 100644 --- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java +++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java @@ -405,9 +405,6 @@ public class PigHCatUtil { try { Type hType = hcatField.getType(); switch(hType){ - // We don't do type promotion/demotion. - case SMALLINT: - case TINYINT: case BOOLEAN: throw new PigException("Incompatible type found in hcat table schema: "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE); case ARRAY: diff --git hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java new file mode 100644 index 0000000..ed04581 --- /dev/null +++ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderStorer.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.pig; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hcatalog.HcatTestUtils; +import org.apache.hcatalog.mapreduce.HCatBaseTest; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecJob; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.util.Iterator; +import java.util.List; + +/** + * Tests that require both HCatLoader and HCatStorer. For read or write only functionality, + * please consider {@link TestHCatLoader} or {@link TestHCatStorer}. + */ +public class TestHCatLoaderStorer extends HCatBaseTest { + + /** + * Ensure Pig can read/write tinyint/smallint columns. 
+ */ + @Test + public void testSmallTinyInt() throws Exception { + + String readTblName = "test_small_tiny_int"; + File dataDir = new File(TEST_DATA_DIR + "/testSmallTinyIntData"); + File dataFile = new File(dataDir, "testSmallTinyInt.tsv"); + + String writeTblName = "test_small_tiny_int_write"; + File writeDataFile = new File(TEST_DATA_DIR, writeTblName + ".tsv"); + + FileUtil.fullyDelete(dataDir); // Might not exist + Assert.assertTrue(dataDir.mkdir()); + + HcatTestUtils.createTestDataFile(dataFile.getAbsolutePath(), new String[]{ + String.format("%d\t%d", Short.MIN_VALUE, Byte.MIN_VALUE), + String.format("%d\t%d", Short.MAX_VALUE, Byte.MAX_VALUE) + }); + + // Create a table with smallint/tinyint columns, load data, and query from Hive. + Assert.assertEquals(0, driver.run("drop table if exists " + readTblName).getResponseCode()); + Assert.assertEquals(0, driver.run("create external table " + readTblName + + " (my_small_int smallint, my_tiny_int tinyint)" + + " row format delimited fields terminated by '\t' stored as textfile").getResponseCode()); + Assert.assertEquals(0, driver.run("load data local inpath '" + + dataDir.getAbsolutePath() + "' into table " + readTblName).getResponseCode()); + + PigServer server = new PigServer(ExecType.LOCAL); + server.registerQuery( + "data = load '" + readTblName + "' using org.apache.hcatalog.pig.HCatLoader();"); + + // Ensure Pig schema is correct. + Schema schema = server.dumpSchema("data"); + Assert.assertEquals(2, schema.getFields().size()); + Assert.assertEquals("my_small_int", schema.getField(0).alias); + Assert.assertEquals(DataType.INTEGER, schema.getField(0).type); + Assert.assertEquals("my_tiny_int", schema.getField(1).alias); + Assert.assertEquals(DataType.INTEGER, schema.getField(1).type); + + // Ensure Pig can read data correctly. 
+ Iterator<Tuple> it = server.openIterator("data"); + Tuple t = it.next(); + Assert.assertEquals(new Integer(Short.MIN_VALUE), t.get(0)); + Assert.assertEquals(new Integer(Byte.MIN_VALUE), t.get(1)); + t = it.next(); + Assert.assertEquals(new Integer(Short.MAX_VALUE), t.get(0)); + Assert.assertEquals(new Integer(Byte.MAX_VALUE), t.get(1)); + Assert.assertFalse(it.hasNext()); + + // Ensure Pig can write correctly to smallint/tinyint columns. This means values within the + // bounds of the column type are written, and values outside throw an exception. + Assert.assertEquals(0, driver.run("drop table if exists " + writeTblName).getResponseCode()); + Assert.assertEquals(0, driver.run("create table " + writeTblName + + " (my_small_int smallint, my_tiny_int tinyint) stored as rcfile").getResponseCode()); + + // Values within the column type bounds. + HcatTestUtils.createTestDataFile(writeDataFile.getAbsolutePath(), new String[]{ + String.format("%d\t%d", Short.MIN_VALUE, Byte.MIN_VALUE), + String.format("%d\t%d", Short.MAX_VALUE, Byte.MAX_VALUE) + }); + smallTinyIntBoundsCheckHelper(writeDataFile.getAbsolutePath(), ExecJob.JOB_STATUS.COMPLETED); + + // Values outside the column type bounds will fail at runtime. 
+ HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/shortTooSmall.tsv", new String[]{ + String.format("%d\t%d", Short.MIN_VALUE - 1, 0)}); + smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/shortTooSmall.tsv", ExecJob.JOB_STATUS.FAILED); + + HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/shortTooBig.tsv", new String[]{ + String.format("%d\t%d", Short.MAX_VALUE + 1, 0)}); + smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/shortTooBig.tsv", ExecJob.JOB_STATUS.FAILED); + + HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/byteTooSmall.tsv", new String[]{ + String.format("%d\t%d", 0, Byte.MIN_VALUE - 1)}); + smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/byteTooSmall.tsv", ExecJob.JOB_STATUS.FAILED); + + HcatTestUtils.createTestDataFile(TEST_DATA_DIR + "/byteTooBig.tsv", new String[]{ + String.format("%d\t%d", 0, Byte.MAX_VALUE + 1)}); + smallTinyIntBoundsCheckHelper(TEST_DATA_DIR + "/byteTooBig.tsv", ExecJob.JOB_STATUS.FAILED); + } + + private void smallTinyIntBoundsCheckHelper(String data, ExecJob.JOB_STATUS expectedStatus) + throws Exception { + Assert.assertEquals(0, driver.run("drop table if exists test_tbl").getResponseCode()); + Assert.assertEquals(0, driver.run("create table test_tbl" + + " (my_small_int smallint, my_tiny_int tinyint) stored as rcfile").getResponseCode()); + + PigServer server = new PigServer(ExecType.LOCAL); + server.setBatchOn(); + server.registerQuery("data = load '" + data + + "' using PigStorage('\t') as (my_small_int:int, my_tiny_int:int);"); + server.registerQuery( + "store data into 'test_tbl' using org.apache.hcatalog.pig.HCatStorer();"); + List<ExecJob> jobs = server.executeBatch(); + Assert.assertEquals(expectedStatus, jobs.get(0).getStatus()); + } +} diff --git src/java/org/apache/hcatalog/data/HCatRecordSerDe.java src/java/org/apache/hcatalog/data/HCatRecordSerDe.java index ec9b8c5..90baefa 100644 --- src/java/org/apache/hcatalog/data/HCatRecordSerDe.java +++ src/java/org/apache/hcatalog/data/HCatRecordSerDe.java @@ -186,6 +186,11 @@ 
public class HCatRecordSerDe implements SerDe { Object res = null; if (fieldObjectInspector.getCategory() == Category.PRIMITIVE){ res = ((PrimitiveObjectInspector)fieldObjectInspector).getPrimitiveJavaObject(field); + if (res instanceof Short) { /* instanceof is false for null, so a null res skips the widening instead of throwing a NullPointerException */ + return new Integer((Short) res); + } else if (res instanceof Byte) { + return new Integer((Byte) res); + } } else if (fieldObjectInspector.getCategory() == Category.STRUCT){ res = serializeStruct(field,(StructObjectInspector)fieldObjectInspector); } else if (fieldObjectInspector.getCategory() == Category.LIST){