diff --git src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
index 99bebc5..e761f80 100644
--- src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
+++ src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
@@ -20,7 +20,7 @@ package org.apache.hcatalog.mapreduce;
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -54,7 +54,7 @@ class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> {
   /** The storage handler used */
   private final HCatStorageHandler storageHandler;
 
-  private SerDe serde;
+  private Deserializer deserializer;
 
   private Map<String, Object> valuesNotInDataCols;
 
@@ -82,7 +82,7 @@ class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> {
 
     HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
     baseRecordReader = createBaseRecordReader(hcatSplit, storageHandler, taskContext);
-    serde = createSerDe(hcatSplit, storageHandler, taskContext);
+    createDeserializer(hcatSplit, storageHandler, taskContext);
 
     // Pull the output schema out of the TaskAttemptContext
     outputSchema = (HCatSchema) HCatUtil.deserialize(
@@ -108,22 +108,20 @@ class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> {
         InternalUtil.createReporter(taskContext));
   }
 
-  private SerDe createSerDe(HCatSplit hcatSplit, HCatStorageHandler storageHandler,
+  private void createDeserializer(HCatSplit hcatSplit, HCatStorageHandler storageHandler,
       TaskAttemptContext taskContext) throws IOException {
 
-    SerDe serde = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
+    deserializer = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
         taskContext.getConfiguration());
 
     try {
-      InternalUtil.initializeInputSerDe(serde, storageHandler.getConf(),
+      InternalUtil.initializeDeserializer(deserializer, storageHandler.getConf(),
           hcatSplit.getPartitionInfo().getTableInfo(),
           hcatSplit.getPartitionInfo().getPartitionSchema());
     } catch (SerDeException e) {
-      throw new IOException("Failed initializing SerDe "
+      throw new IOException("Failed initializing deserializer "
          + storageHandler.getSerDeClass().getName(), e);
     }
-
-    return serde;
   }
 
   /* (non-Javadoc)
@@ -145,7 +143,7 @@ class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> {
 
 
     try {
-      r = new LazyHCatRecord(serde.deserialize(currentValue),serde.getObjectInspector());
+      r = new LazyHCatRecord(deserializer.deserialize(currentValue), deserializer.getObjectInspector());
       DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
       int i = 0;
       for (String fieldName : outputSchema.getFieldNames()){
diff --git src/java/org/apache/hcatalog/mapreduce/InternalUtil.java src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
index fa78a61..34c843f 100644
--- src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
+++ src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -42,6 +43,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -50,13 +53,14 @@ import java.util.Map;
 import java.util.Properties;
 
 class InternalUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(HCatRecordReader.class);
 
   static StorerInfo extractStorerInfo(StorageDescriptor sd, Map<String, String> properties) throws IOException {
     Properties hcatProperties = new Properties();
     for (String key : properties.keySet()){
       hcatProperties.put(key, properties.get(key));
     }
-    
+
     // also populate with StorageDescriptor->SerDe.Parameters
     for (Map.Entry<String, String> param :
       sd.getSerdeInfo().getParameters().entrySet()) {
@@ -132,28 +136,26 @@ class InternalUtil {
 
   //TODO this has to find a better home, it's also hardcoded as default in hive would be nice
   // if the default was decided by the serde
-  static void initializeOutputSerDe(SerDe serDe, Configuration conf,
-                                    OutputJobInfo jobInfo)
-    throws SerDeException {
-    initializeSerDe(serDe, conf, jobInfo.getTableInfo(),
-                    jobInfo.getOutputSchema());
+  static void initializeOutputSerDe(SerDe serDe, Configuration conf, OutputJobInfo jobInfo)
+      throws SerDeException {
+    serDe.initialize(conf, getSerdeProperties(jobInfo.getTableInfo(), jobInfo.getOutputSchema()));
   }
 
-  static void initializeInputSerDe(SerDe serDe, Configuration conf,
-                                   HCatTableInfo info, HCatSchema s)
-    throws SerDeException {
-    initializeSerDe(serDe, conf, info, s);
+  static void initializeDeserializer(Deserializer deserializer, Configuration conf,
+                                     HCatTableInfo info, HCatSchema schema) throws SerDeException {
+    Properties props = getSerdeProperties(info, schema);
+    LOG.info("Initializing " + deserializer.getClass().getName() + " with properties " + props);
+    deserializer.initialize(conf, props);
   }
 
-  static void initializeSerDe(SerDe serDe, Configuration conf,
-                              HCatTableInfo info, HCatSchema s)
-    throws SerDeException {
-    Properties props = new Properties();
+  private static Properties getSerdeProperties(HCatTableInfo info, HCatSchema s)
+      throws SerDeException {
+    Properties props = new Properties();
     List<FieldSchema> fields = HCatUtil.getFieldSchemaList(s.getFields());
     props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS,
-      MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
+        MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
     props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES,
-      MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
+        MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
 
     // setting these props to match LazySimpleSerde
     props.setProperty(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_NULL_FORMAT, "\\N");
@@ -162,7 +164,7 @@ class InternalUtil {
 
     //add props from params set in table schema
     props.putAll(info.getStorerInfo().getProperties());
-    serDe.initialize(conf,props);
+    return props;
   }
 
   static Reporter createReporter(TaskAttemptContext context) {
diff --git src/test/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java src/test/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java
new file mode 100644
index 0000000..0c5b982
--- /dev/null
+++ src/test/org/apache/hcatalog/mapreduce/TestHCatHiveThriftCompatibility.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.thrift.test.IntString;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Iterator;
+
+public class TestHCatHiveThriftCompatibility extends HCatBaseTest {
+
+  private boolean setUpComplete = false;
+  private Path intStringSeq;
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    if (setUpComplete) {
+      return;
+    }
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    TIOStreamTransport transport = new TIOStreamTransport(out);
+    TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+    IntString intString = new IntString(1, "one", 1);
+    intString.write(protocol);
+    BytesWritable bytesWritable = new BytesWritable(out.toByteArray());
+
+    intStringSeq = new Path(TEST_DATA_DIR + "/data/intString.seq");
+    LOG.info("Creating data file: " + intStringSeq);
+
+    SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(
+        intStringSeq.getFileSystem(hiveConf), hiveConf, intStringSeq,
+        NullWritable.class, BytesWritable.class);
+    seqFileWriter.append(NullWritable.get(), bytesWritable);
+    seqFileWriter.close();
+
+    setUpComplete = true;
+  }
+
+  /**
+   * Create a table with no explicit schema and ensure it's correctly
+   * discovered from the thrift struct.
+   */
+  @Test
+  public void testDynamicCols() throws Exception {
+    Assert.assertEquals(0, driver.run("drop table if exists test_thrift").getResponseCode());
+    Assert.assertEquals(0, driver.run(
+        "create external table test_thrift " +
+            "partitioned by (year string) " +
+            "row format serde 'org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer' " +
+            "with serdeproperties ( " +
+            "  'serialization.class'='org.apache.hadoop.hive.serde2.thrift.test.IntString', " +
+            "  'serialization.format'='org.apache.thrift.protocol.TBinaryProtocol') " +
+            "stored as" +
+            " inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat'" +
+            " outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'")
+        .getResponseCode());
+    Assert.assertEquals(0,
+        driver.run("alter table test_thrift add partition (year = '2012') location '" +
+            intStringSeq.getParent() + "'").getResponseCode());
+
+    PigServer pigServer = new PigServer(ExecType.LOCAL);
+    pigServer.registerQuery("A = load 'test_thrift' using org.apache.hcatalog.pig.HCatLoader();");
+
+    Schema expectedSchema = new Schema();
+    expectedSchema.add(new Schema.FieldSchema("myint", DataType.INTEGER));
+    expectedSchema.add(new Schema.FieldSchema("mystring", DataType.CHARARRAY));
+    expectedSchema.add(new Schema.FieldSchema("underscore_int", DataType.INTEGER));
+    expectedSchema.add(new Schema.FieldSchema("year", DataType.CHARARRAY));
+
+    Assert.assertEquals(expectedSchema, pigServer.dumpSchema("A"));
+
+    Iterator<Tuple> iterator = pigServer.openIterator("A");
+    Tuple t = iterator.next();
+    Assert.assertEquals(1, t.get(0));
+    Assert.assertEquals("one", t.get(1));
+    Assert.assertEquals(1, t.get(2));
+    Assert.assertEquals("2012", t.get(3));
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+}
\ No newline at end of file
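Reviewer note, for context rather than as part of the patch: the switch to Deserializer works because HCatRecordReader only ever calls deserialize() and getObjectInspector(), so a Deserializer-only implementation such as Hive's ThriftDeserializer (which does not implement the full SerDe interface) can now back an HCatalog table. The standalone sketch below exercises that same path outside of HCatalog, with the same Hive test Thrift class (IntString) the new test uses. It is illustrative only; it assumes the hive-serde jar (including its test classes) and libthrift are on the classpath, it targets the Hive serde API of this patch's era, and the class name ThriftDeserializerSketch is invented for the example.

import java.io.ByteArrayOutputStream;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer;
import org.apache.hadoop.hive.serde2.thrift.test.IntString;
import org.apache.hadoop.io.BytesWritable;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TIOStreamTransport;

public class ThriftDeserializerSketch {
  public static void main(String[] args) throws Exception {
    // Thrift-encode a record the same way TestHCatHiveThriftCompatibility does.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    new IntString(1, "one", 1).write(new TBinaryProtocol(new TIOStreamTransport(out)));
    BytesWritable value = new BytesWritable(out.toByteArray());

    // ThriftDeserializer implements Deserializer but not the full SerDe
    // interface, which is why the read path in this patch depends on Deserializer.
    Properties props = new Properties();
    props.setProperty("serialization.class", IntString.class.getName());
    props.setProperty("serialization.format", TBinaryProtocol.class.getName());

    Deserializer deserializer = new ThriftDeserializer();
    deserializer.initialize(new Configuration(), props);

    // Walk the deserialized row through its ObjectInspector, as LazyHCatRecord does.
    Object row = deserializer.deserialize(value);
    StructObjectInspector soi = (StructObjectInspector) deserializer.getObjectInspector();
    for (StructField field : soi.getAllStructFieldRefs()) {
      System.out.println(field.getFieldName() + " = " + soi.getStructFieldData(row, field));
    }
  }
}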