diff --git a/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordSerDe.java b/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordSerDe.java index 6328b50..9ad94f6 100644 --- a/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordSerDe.java +++ b/hcatalog/core/src/main/java/org/apache/hcatalog/data/HCatRecordSerDe.java @@ -266,28 +266,29 @@ public static Object serializeField(Object field, ObjectInspector fieldObjectIns private static Object serializePrimitiveField(Object field, ObjectInspector fieldObjectInspector) { - if (field != null && HCatContext.INSTANCE.getConf().isPresent()) { + Object f = ((PrimitiveObjectInspector) fieldObjectInspector).getPrimitiveJavaObject(field); + if (f != null && HCatContext.INSTANCE.getConf().isPresent()) { Configuration conf = HCatContext.INSTANCE.getConf().get(); - if (field instanceof Boolean && + if (f instanceof Boolean && conf.getBoolean( HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER, HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT)) { - return ((Boolean) field) ? 1 : 0; - } else if (field instanceof Short && + return ((Boolean) f) ? 1 : 0; + } else if (f instanceof Short && conf.getBoolean( HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) { - return new Integer((Short) field); - } else if (field instanceof Byte && + return new Integer((Short) f); + } else if (f instanceof Byte && conf.getBoolean( HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) { - return new Integer((Byte) field); + return new Integer((Byte) f); } } - return ((PrimitiveObjectInspector) fieldObjectInspector).getPrimitiveJavaObject(field); + return f; } /** diff --git a/hcatalog/core/src/main/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java b/hcatalog/core/src/main/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java index 1b4971f..b3ea7b0 100644 --- a/hcatalog/core/src/main/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java +++ b/hcatalog/core/src/main/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java @@ -31,8 +31,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hcatalog.common.HCatConstants; -import org.apache.hcatalog.common.HCatContext; import org.apache.hcatalog.common.HCatException; import org.apache.hcatalog.data.schema.HCatFieldSchema.Type; @@ -140,17 +138,9 @@ private static HCatFieldSchema getHCatFieldSchema(String fieldName, TypeInfo fie private static Type getPrimitiveHType(TypeInfo basePrimitiveTypeInfo) { switch (((PrimitiveTypeInfo) basePrimitiveTypeInfo).getPrimitiveCategory()) { case BOOLEAN: - return (HCatContext.INSTANCE.getConf().isPresent() && - HCatContext.INSTANCE.getConf().get().getBoolean( - HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER, - HCatConstants.HCAT_DATA_CONVERT_BOOLEAN_TO_INTEGER_DEFAULT)) ? - Type.INT : Type.BOOLEAN; + return Type.BOOLEAN; case BYTE: - return (HCatContext.INSTANCE.getConf().isPresent() && - HCatContext.INSTANCE.getConf().get().getBoolean( - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) ? - Type.INT : Type.TINYINT; + return Type.TINYINT; case DOUBLE: return Type.DOUBLE; case FLOAT: @@ -160,11 +150,7 @@ private static Type getPrimitiveHType(TypeInfo basePrimitiveTypeInfo) { case LONG: return Type.BIGINT; case SHORT: - return (HCatContext.INSTANCE.getConf().isPresent() && - HCatContext.INSTANCE.getConf().get().getBoolean( - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, - HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)) ? - Type.INT : Type.SMALLINT; + return Type.SMALLINT; case STRING: return Type.STRING; case BINARY: diff --git a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java index f6d609b..06f8471 100644 --- a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java +++ b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java @@ -104,6 +104,10 @@ } } + static public boolean pigHasBooleanSupport(){ + return pigHasBooleanSupport; + } + static public Pair getDBTableNames(String location) throws IOException { // the location string will be of the form: // . - parse it and diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestE2EScenarios.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestE2EScenarios.java new file mode 100644 index 0000000..8727bf1 --- /dev/null +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestE2EScenarios.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hcatalog.pig; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hcatalog.HcatTestUtils; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatContext; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hcatalog.mapreduce.HCatOutputFormat; +import org.apache.hcatalog.mapreduce.OutputJobInfo; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.data.Tuple; + +public class TestE2EScenarios extends TestCase { + + private static final String TEST_DATA_DIR = System.getProperty("user.dir") + + "/build/test/data/" + TestHCatLoader.class.getCanonicalName(); + private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; + + private static final String TEXTFILE_LOCN = TEST_DATA_DIR + "/textfile"; + + private static Driver driver; + + protected String storageFormat() { + return "orc"; + } + + @Override + protected void setUp() throws Exception { + + File f = new File(TEST_WAREHOUSE_DIR); + if (f.exists()) { + FileUtil.fullyDelete(f); + } + new File(TEST_WAREHOUSE_DIR).mkdirs(); + + HiveConf hiveConf = new HiveConf(this.getClass()); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); + driver = new Driver(hiveConf); + SessionState.start(new CliSessionState(hiveConf)); + + } + + @Override + protected void tearDown() throws Exception { + dropTable("inpy"); + dropTable("rc5318"); + dropTable("orc5318"); + } + + private void dropTable(String tablename) throws IOException, CommandNeedRetryException { + driver.run("drop table " + tablename); + } + + private void createTable(String tablename, String schema, String partitionedBy, String storageFormat) throws IOException, CommandNeedRetryException { + String createTable; + createTable = "create table " + tablename + "(" + schema + ") "; + if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) { + createTable = createTable + "partitioned by (" + partitionedBy + ") "; + } + if (storageFormat != null){ + createTable = createTable + "stored as " +storageFormat; + } + driverRun(createTable); + } + + private void driverRun(String cmd) throws IOException, CommandNeedRetryException { + int retCode = driver.run(cmd).getResponseCode(); + if (retCode != 0) { + throw new IOException("Failed to run [" + + cmd + "], return code from hive driver : [" + retCode + "]"); + } + } + + private void pigDump(String tableName) throws IOException { + PigServer server = new PigServer(ExecType.LOCAL); + + System.err.println("==="); + System.err.println(tableName+":"); + server.registerQuery("X = load '" + tableName + + "' using org.apache.hcatalog.pig.HCatLoader();"); + Iterator XIter = server.openIterator("X"); + while (XIter.hasNext()) { + Tuple t = XIter.next(); + for (Object o : t.getAll()){ + System.err.print( + "\t(" + o.getClass().getName() + ":" + + o.toString() + ")" + ); + } + System.err.println(""); + } + System.err.println("==="); + } + + + private void copyTable(String in, String out) throws IOException, InterruptedException { + Job ijob = new Job(); + Job ojob = new Job(); + HCatInputFormat inpy = new HCatInputFormat(); + inpy.setInput(ijob , null, in); + HCatOutputFormat oupy = new HCatOutputFormat(); + oupy.setOutput(ojob, + OutputJobInfo.create(null, out, new HashMap() + )); + + // Test HCatContext + + System.err.println("HCatContext INSTANCE is present : " +HCatContext.INSTANCE.getConf().isPresent()); + if (HCatContext.INSTANCE.getConf().isPresent()){ + System.err.println("HCatContext tinyint->int promotion says " + + HCatContext.INSTANCE.getConf().get().getBoolean( + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION, + HCatConstants.HCAT_DATA_TINY_SMALL_INT_PROMOTION_DEFAULT)); + } + + HCatSchema tableSchema = inpy.getTableSchema(ijob.getConfiguration()); + System.err.println("Copying from ["+in+"] to ["+out+"] with schema : "+ tableSchema.toString()); + oupy.setSchema(ojob, tableSchema); + oupy.checkOutputSpecs(ojob); + OutputCommitter oc = oupy.getOutputCommitter(createTaskAttemptContext(ojob.getConfiguration())); + oc.setupJob(ojob); + + for (InputSplit split : inpy.getSplits(ijob)){ + + TaskAttemptContext rtaskContext = createTaskAttemptContext(ijob.getConfiguration()); + TaskAttemptContext wtaskContext = createTaskAttemptContext(ojob.getConfiguration()); + + RecordReader rr = inpy.createRecordReader(split, rtaskContext); + rr.initialize(split, rtaskContext); + + OutputCommitter taskOc = oupy.getOutputCommitter(wtaskContext); + taskOc.setupTask(wtaskContext); + RecordWriter, HCatRecord> rw = oupy.getRecordWriter(wtaskContext); + + while(rr.nextKeyValue()){ + rw.write(rr.getCurrentKey(), rr.getCurrentValue()); + } + rw.close(wtaskContext); + taskOc.commitTask(wtaskContext); + rr.close(); + } + + oc.commitJob(ojob); + } + + private TaskAttemptContext createTaskAttemptContext(Configuration tconf) { + Configuration conf = (tconf == null) ? (new Configuration()) : tconf; + TaskAttemptID taskId = new TaskAttemptID(); + conf.setInt("mapred.task.partition", taskId.getId()); + conf.set("mapred.task.id", "attempt__0000_r_000000_" + taskId.getId()); + TaskAttemptContext rtaskContext = new TaskAttemptContext(conf , taskId ); + return rtaskContext; + } + + + public void testReadOrcAndRCFromPig() throws Exception { + String tableSchema = "ti tinyint, si smallint,i int, bi bigint, f float, d double, b boolean"; + + HcatTestUtils.createTestDataFile(TEXTFILE_LOCN, + new String[]{ + "-3\0019001\00186400\0014294967297\00134.532\0012184239842983489.1231231234\001true" + ,"0\0010\0010\0010\0010\0010\001false" + } + ); + + // write this out to a file, and import it into hive + createTable("inpy",tableSchema,null,"textfile"); + createTable("rc5318",tableSchema,null,"rcfile"); + createTable("orc5318",tableSchema,null,"orc"); + driverRun("LOAD DATA LOCAL INPATH '"+TEXTFILE_LOCN+"' OVERWRITE INTO TABLE inpy"); + + // write it out from hive to an rcfile table, and to an orc table +// driverRun("insert overwrite table rc5318 select * from inpy"); + copyTable("inpy","rc5318"); +// driverRun("insert overwrite table orc5318 select * from inpy"); + copyTable("inpy","orc5318"); + + pigDump("inpy"); + pigDump("rc5318"); + pigDump("orc5318"); + + } + +} diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java index e907c73..47339d9 100644 --- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoader.java @@ -65,7 +65,7 @@ private static int guardTestCount = 6; // ugh, instantiate using introspection in guardedSetupBeforeClass private static boolean setupHasRun = false; - + private static Map> basicInputData; protected String storageFormat() { @@ -413,7 +413,7 @@ public void testConvertBooleanToInt() throws Exception { File inputDataDir = new File(inputFileName).getParentFile(); inputDataDir.mkdir(); - String[] lines = new String[]{"llama\t1", "alpaca\t0"}; + String[] lines = new String[]{"llama\ttrue", "alpaca\tfalse"}; HcatTestUtils.createTestDataFile(inputFileName, lines); assertEquals(0, driver.run("drop table if exists " + tbl).getResponseCode()); @@ -433,13 +433,15 @@ public void testConvertBooleanToInt() throws Exception { assertEquals("a", schema.getField(0).alias); assertEquals(DataType.CHARARRAY, schema.getField(0).type); assertEquals("b", schema.getField(1).alias); - assertEquals(DataType.INTEGER, schema.getField(1).type); + if (PigHCatUtil.pigHasBooleanSupport()){ + assertEquals(DataType.BOOLEAN, schema.getField(1).type); + } else { + assertEquals(DataType.INTEGER, schema.getField(1).type); + } Iterator iterator = server.openIterator("data"); Tuple t = iterator.next(); assertEquals("llama", t.get(0)); - // TODO: Figure out how to load a text file into Hive with boolean columns. This next assert - // passes because data was loaded as integers, not because it was converted. assertEquals(1, t.get(1)); t = iterator.next(); assertEquals("alpaca", t.get(0)); diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java index 111b606..1be55dd 100644 --- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java @@ -60,14 +60,18 @@ private void dropTable(String tablename) throws IOException, CommandNeedRetryExc driver.run("drop table " + tablename); } + protected String storageFormat() { + return "RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," + + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver')"; + } + private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException { String createTable; createTable = "create table " + tablename + "(" + schema + ") "; if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) { createTable = createTable + "partitioned by (" + partitionedBy + ") "; } - createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," + - "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') "; + createTable = createTable + "stored as " + storageFormat(); LOG.info("Creating table:\n {}", createTable); CommandProcessorResponse result = driver.run(createTable); int retCode = result.getResponseCode(); diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestOrcHCatLoaderComplexSchema.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestOrcHCatLoaderComplexSchema.java new file mode 100644 index 0000000..7d91364 --- /dev/null +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestOrcHCatLoaderComplexSchema.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hcatalog.pig; + +public class TestOrcHCatLoaderComplexSchema extends TestHCatLoaderComplexSchema { + + @Override + protected String storageFormat() { + return "orc"; + } + +}