Index: eclipse-templates/.classpath =================================================================== --- eclipse-templates/.classpath (revision 1440134) +++ eclipse-templates/.classpath (working copy) @@ -48,6 +48,8 @@ + + Index: ivy/libraries.properties =================================================================== --- ivy/libraries.properties (revision 1440134) +++ ivy/libraries.properties (working copy) @@ -23,7 +23,7 @@ antlr.version=3.4 antlr-runtime.version=3.4 asm.version=3.1 -avro.version=1.7.1 +avro.version=1.7.3 datanucleus-connectionpool.version=2.0.3 datanucleus-core.version=2.0.3 datanucleus-enhancer.version=2.0.3 Index: data/files/futurama_episodes.avro =================================================================== Cannot display: file marked as a binary type. svn:mime-type = application/octet-stream Property changes on: data/files/futurama_episodes.avro ___________________________________________________________________ Added: svn:mime-type + application/octet-stream Index: serde/ivy.xml =================================================================== --- serde/ivy.xml (revision 1440134) +++ serde/ivy.xml (working copy) @@ -49,6 +49,10 @@ transitive="false"/> + + queryCols = new ArrayList(); + queryCols.add(3); + queryCols.add(1); + queryCols.add(1); + queryCols.add(3); + queryCols.add(1); + ColumnProjectionUtils.appendReadColumnIDs(hconf, queryCols); + + + // Initialize deserializer + des.initialize(hconf, props); + + ArrayList row = (ArrayList)des.deserialize(garw); + assertEquals(4, row.size()); + + Object stringObj = row.get(1); + assertEquals(stringObj, "as a cloud"); + + Object theRecordObject = row.get(3); + System.out.println("theRecordObject = " + theRecordObject.getClass().getCanonicalName()); + + Object theIntObject = row.get(2); + assertEquals(theIntObject, null); + + // The original record was lost in the deserialization, so just go the correct way, through objectinspectors + StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector(); + List allStructFieldRefs = oi.getAllStructFieldRefs(); + assertEquals(4, allStructFieldRefs.size()); + StructField fieldRefForaRecord = allStructFieldRefs.get(3); + assertEquals("arecord", fieldRefForaRecord.getFieldName()); + Object innerRecord2 = oi.getStructFieldData(row, fieldRefForaRecord); // <--- use this! 
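+    // Only columns 1 and 3 were registered via appendReadColumnIDs above, so the
+    // deserializer materializes just those fields and returns null for the rest
+    // (hence the row.get(2) == null assertion earlier). The nested record at index
+    // 3 still has to be unpacked through its ObjectInspector, which is done next.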
+ + // Extract innerRecord field refs + StandardStructObjectInspector innerRecord2OI = (StandardStructObjectInspector) fieldRefForaRecord.getFieldObjectInspector(); + + List allStructFieldRefs1 = innerRecord2OI.getAllStructFieldRefs(); + assertEquals(3, allStructFieldRefs1.size()); + assertEquals("int1", allStructFieldRefs1.get(0).getFieldName()); + assertEquals("boolean1", allStructFieldRefs1.get(1).getFieldName()); + assertEquals("long1", allStructFieldRefs1.get(2).getFieldName()); + + innerRecord2OI.getStructFieldsDataAsList(innerRecord2); + assertEquals(42, innerRecord2OI.getStructFieldData(innerRecord2, allStructFieldRefs1.get(0))); + assertEquals(true, innerRecord2OI.getStructFieldData(innerRecord2, allStructFieldRefs1.get(1))); + assertEquals(42432234234l, innerRecord2OI.getStructFieldData(innerRecord2, allStructFieldRefs1.get(2))); + } + } Index: serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java =================================================================== --- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java (revision 1440134) +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java (working copy) @@ -17,6 +17,15 @@ */ package org.apache.hadoop.hive.serde2.avro; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; @@ -38,15 +47,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.hadoop.io.Writable; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - class AvroDeserializer { private static final Log LOG = LogFactory.getLog(AvroDeserializer.class); /** @@ -62,7 +62,7 @@ private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); private final GenericDatumWriter gdw = new GenericDatumWriter(); private BinaryDecoder binaryDecoder = null; - private InstanceCache> gdrCache + private final InstanceCache> gdrCache = new InstanceCache>() { @Override protected GenericDatumReader makeInstance(ReaderWriterSchemaPair hv) { @@ -94,7 +94,12 @@ private List row; private SchemaReEncoder reEncoder; + private List readColumnArray; + public void setReadColumnsArray(List rColumnArray) { + this.readColumnArray = rColumnArray; + } + /** * Deserialize an Avro record, recursing into its component fields and * deserializing them as well. 
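* When a read-column list has been supplied through setReadColumnsArray (the
* AvroSerDe wires this up from ColumnProjectionUtils), top-level fields that are
* not in that list are skipped and come back as null instead of being deserialized.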
Fields of the record are matched by name @@ -112,40 +117,60 @@ */ public Object deserialize(List columnNames, List columnTypes, Writable writable, Schema readerSchema) throws AvroSerdeException { - if(!(writable instanceof AvroGenericRecordWritable)) + if(!(writable instanceof AvroGenericRecordWritable)) { throw new AvroSerdeException("Expecting a AvroGenericRecordWritable"); + } - if(row == null || row.size() != columnNames.size()) + if(row == null || row.size() != columnNames.size()) { row = new ArrayList(columnNames.size()); - else + } else { row.clear(); + } AvroGenericRecordWritable recordWritable = (AvroGenericRecordWritable) writable; GenericRecord r = recordWritable.getRecord(); - // Check if we're working with an evolved schema if(!r.getSchema().equals(readerSchema)) { LOG.warn("Received different schemas. Have to re-encode: " + r.getSchema().toString(false)); - if(reEncoder == null) reEncoder = new SchemaReEncoder(); + if(reEncoder == null) { + reEncoder = new SchemaReEncoder(); + } r = reEncoder.reencode(r, readerSchema); } - workerBase(row, columnNames, columnTypes, r); + workerBase(row, columnNames, columnTypes, r, true); return row; } + // Returns true iff the colid is needed by the query. + private boolean isNeededColumn(boolean project, int projectedIdx, int colId){ + if (!project || readColumnArray == null || + (projectedIdx < readColumnArray.size() && readColumnArray.get(projectedIdx) == colId)) { + return true; + } + return false; + } + // The actual deserialization may involve nested records, which require recursion. private List workerBase(List objectRow, List columnNames, - List columnTypes, GenericRecord record) + List columnTypes, GenericRecord record, + boolean project) throws AvroSerdeException { + //Because projectedColumns is sorted, we can just walk the list and add nulls where appropriate + + int projectedIdx = 0; for(int i = 0; i < columnNames.size(); i++) { - TypeInfo columnType = columnTypes.get(i); - String columnName = columnNames.get(i); - Object datum = record.get(columnName); - Schema datumSchema = record.getSchema().getField(columnName).schema(); - - objectRow.add(worker(datum, datumSchema, columnType)); + if (isNeededColumn(project, projectedIdx, i)){ + projectedIdx++; + TypeInfo columnType = columnTypes.get(i); + String columnName = columnNames.get(i); + Object datum = record.get(columnName); + Schema datumSchema = record.getSchema().getField(columnName).schema(); + objectRow.add(worker(datum, datumSchema, columnType)); + } else { + objectRow.add(null); + } } return objectRow; @@ -156,13 +181,16 @@ // Klaxon! Klaxon! Klaxon! // Avro requires NULLable types to be defined as unions of some type T // and NULL. This is annoying and we're going to hide it from the user. - if(AvroSerdeUtils.isNullableType(recordSchema)) + if(AvroSerdeUtils.isNullableType(recordSchema)) { return deserializeNullableUnion(datum, recordSchema, columnType); + } if(columnType == TypeInfoFactory.stringTypeInfo) + { return datum.toString(); // To workaround AvroUTF8 // This also gets us around the Enum issue since we just take the value // and convert it to a string. Yay! 
+ } switch(columnType.getCategory()) { case STRUCT: @@ -186,8 +214,9 @@ TypeInfo columnType) throws AvroSerdeException { int tag = GenericData.get().resolveUnion(recordSchema, datum); // Determine index of value Schema schema = recordSchema.getTypes().get(tag); - if(schema.getType().equals(Schema.Type.NULL)) + if(schema.getType().equals(Schema.Type.NULL)) { return null; + } return worker(datum, schema, SchemaToTypeInfo.generateTypeInfo(schema)); } @@ -199,7 +228,7 @@ ArrayList innerFieldNames = columnType.getAllStructFieldNames(); List innerObjectRow = new ArrayList(innerFieldTypes.size()); - return workerBase(innerObjectRow, innerFieldNames, innerFieldTypes, datum); + return workerBase(innerObjectRow, innerFieldNames, innerFieldTypes, datum, false); } private Object deserializeUnion(Object datum, Schema recordSchema, Index: serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java =================================================================== --- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java (revision 1440134) +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java (working copy) @@ -17,10 +17,15 @@ */ package org.apache.hadoop.hive.serde2.avro; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + import org.apache.avro.Schema; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; @@ -28,9 +33,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.Writable; -import java.util.List; -import java.util.Properties; - /** * Read or write Avro data from Hive. 
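* When the Configuration carries column-projection ids (see ColumnProjectionUtils),
* only the projected top-level columns are deserialized; the remaining columns of
* each row are returned as null.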
*/ @@ -39,6 +41,7 @@ private ObjectInspector oi; private List columnNames; private List columnTypes; + private List readColumns; private Schema schema; private AvroDeserializer avroDeserializer = null; private AvroSerializer avroSerializer = null; @@ -48,8 +51,9 @@ @Override public void initialize(Configuration configuration, Properties properties) throws SerDeException { // Reset member variables so we don't get in a half-constructed state - if(schema != null) + if(schema != null) { LOG.info("Resetting already initialized AvroSerDe"); + } schema = null; oi = null; @@ -71,6 +75,7 @@ this.columnNames = aoig.getColumnNames(); this.columnTypes = aoig.getColumnTypes(); this.oi = aoig.getObjectInspector(); + this.readColumns = generateReadColumns(configuration); } @Override @@ -80,13 +85,17 @@ @Override public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException { - if(badSchema) throw new BadSchemaException(); + if(badSchema) { + throw new BadSchemaException(); + } return getSerializer().serialize(o, objectInspector, columnNames, columnTypes, schema); } @Override public Object deserialize(Writable writable) throws SerDeException { - if(badSchema) throw new BadSchemaException(); + if(badSchema) { + throw new BadSchemaException(); + } return getDeserializer().deserialize(columnNames, columnTypes, writable, schema); } @@ -102,14 +111,37 @@ } private AvroDeserializer getDeserializer() { - if(avroDeserializer == null) avroDeserializer = new AvroDeserializer(); + if(avroDeserializer == null) { + avroDeserializer = new AvroDeserializer(); + avroDeserializer.setReadColumnsArray(readColumns); + } return avroDeserializer; } private AvroSerializer getSerializer() { - if(avroSerializer == null) avroSerializer = new AvroSerializer(); + if(avroSerializer == null) { + avroSerializer = new AvroSerializer(); + } return avroSerializer; } + + private List generateReadColumns(Configuration configuration) { + List columns = null; + if (configuration != null){ + columns = ColumnProjectionUtils.getReadColumnIDs(configuration); + // For "Select *" type queries, the column array comes in empty. handle + // this similar to the case where optimization is disabled. 
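+ // The ids come from ColumnProjectionUtils.getReadColumnIDs, i.e. whatever the
+ // column pruner recorded under READ_COLUMN_IDS_CONF_STR in the job Configuration;
+ // they may arrive unsorted, which is why they are sorted before being handed to
+ // the deserializer's sequential walk in workerBase.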
+ if (columns.size()==0) { + return null; + } + } else { + return null; + } + + // sort and eliminate duplicates + Collections.sort(columns); + return columns; + } } Index: ql/src/test/results/clientpositive/avro_column_sanity_test.q.out =================================================================== --- ql/src/test/results/clientpositive/avro_column_sanity_test.q.out (revision 0) +++ ql/src/test/results/clientpositive/avro_column_sanity_test.q.out (revision 0) @@ -0,0 +1,172 @@ +PREHOOK: query: -- verify that we can actually read avro-trevni files +CREATE TABLE futurama_episodes +ROW FORMAT SERDE +'org.apache.hadoop.hive.serde2.avro.AvroSerDe' +STORED AS INPUTFORMAT +'org.apache.hadoop.hive.ql.io.avro.AvroColumnInputFormat' +OUTPUTFORMAT +'org.apache.hadoop.hive.ql.io.avro.AvroColumnOutputFormat' +TBLPROPERTIES ( + 'avro.schema.literal'= + '{"namespace":"testing.hive.avro.columnar", + "name":"futurama_episode", + "type":"record", + "fields":[{"name":"id", "type":"int"}, + {"name":"season", "type":"int"}, + {"name":"episode", "type":"int"}, + {"name":"title", "type":"string"}]}') +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- verify that we can actually read avro-trevni files +CREATE TABLE futurama_episodes +ROW FORMAT SERDE +'org.apache.hadoop.hive.serde2.avro.AvroSerDe' +STORED AS INPUTFORMAT +'org.apache.hadoop.hive.ql.io.avro.AvroColumnInputFormat' +OUTPUTFORMAT +'org.apache.hadoop.hive.ql.io.avro.AvroColumnOutputFormat' +TBLPROPERTIES ( + 'avro.schema.literal'= + '{"namespace":"testing.hive.avro.columnar", + "name":"futurama_episode", + "type":"record", + "fields":[{"name":"id", "type":"int"}, + {"name":"season", "type":"int"}, + {"name":"episode", "type":"int"}, + {"name":"title", "type":"string"}]}') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@futurama_episodes +PREHOOK: query: DESCRIBE futurama_episodes +PREHOOK: type: DESCTABLE +POSTHOOK: query: DESCRIBE futurama_episodes +POSTHOOK: type: DESCTABLE +# col_name data_type comment + +id int from deserializer +season int from deserializer +episode int from deserializer +title string from deserializer +PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/futurama_episodes.avro' INTO TABLE futurama_episodes +PREHOOK: type: LOAD +PREHOOK: Output: default@futurama_episodes +POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/futurama_episodes.avro' INTO TABLE futurama_episodes +POSTHOOK: type: LOAD +POSTHOOK: Output: default@futurama_episodes +PREHOOK: query: SELECT * FROM futurama_episodes ORDER BY id +PREHOOK: type: QUERY +PREHOOK: Input: default@futurama_episodes +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM futurama_episodes ORDER BY id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@futurama_episodes +#### A masked pattern was here #### +1 1 1 Space Pilot 3000 +2 1 2 The Series Has Landed +3 1 3 I, Roommate +4 1 4 Love's Labors Lost in Space +5 1 5 Fear of a Bot Planet +6 1 6 A Fishful of Dollars +7 1 7 My Three Suns +8 1 8 A Big Piece of Garbage +9 1 9 Hell Is Other Robots +10 2 1 A Flight to Remember +11 2 2 Mars University +12 2 3 When Aliens Attack +13 2 4 Fry & the Slurm Factory +14 2 5 I Second That Emotion +15 2 6 Brannigan Begin Again +16 2 7 A Head in the Polls +17 2 8 Xmas Story +18 2 9 Why Must I Be a Crustacean in Love +20 2 10 Put Your Head on My Shoulder +20 2 11 Lesser of Two Evils +21 2 12 Raging Bender +22 2 13 A Bicyclops Built for Two +23 2 14 How Hermes Requisitioned His Groove Back +24 2 15 A Clone of My Own +25 2 16 The Deep South +26 2 17 Bender Gets Made +27 2 18 
The Problem with Popplers +28 2 19 Mother's Day +29 2 20 Anthology of Interest (1) +30 3 1 The Honking +31 3 2 War Is the H-Word +32 3 3 The Cryonic Woman +33 3 4 Parasites Lost +34 3 5 Amazon Women in the Mood +35 3 6 Bendless Love +36 3 7 The Day the Earth Stood Stupid +37 3 8 That's Lobstertainment! +38 3 9 The Birdbot of Ice-Catraz +39 3 10 Luck of the Fryrish +40 3 11 The Cyber House Rules +41 3 12 Insane in the Mainframe +42 3 13 Bendin' in the Wind +43 3 14 Time Keeps on Slipping +44 3 15 I Dated a Robot +45 4 1 Roswell That Ends Well +46 4 2 A Tale of Two Santas +47 4 3 Anthology of Interest (2) +48 4 4 Love and Rocket +49 4 5 Leela's Homeworld +50 4 6 Where the Buggalo Roam +51 4 7 A Pharaoh to Remember +52 4 8 Godfellas +53 4 9 Futurestock +54 4 10 A Leela of Her Own +55 4 11 30% Iron Chef +56 4 12 Where No Fan Has Gone Before +57 5 1 Crimes of the Hot +58 5 2 Jurassic Bark +59 5 3 The Route of All Evil +60 5 4 A Taste of Freedom +61 5 5 Kif Gets Knocked Up a Notch +62 5 6 Less Than Hero +63 5 7 Teenage Mutant Leela's Hurdles +64 5 8 The Why of Fry +65 5 9 The Sting +66 5 10 The Farnsworth Parabox +67 5 11 Three Hundred Big Boys +68 5 12 Spanish Fry +69 5 13 Bend Her +70 5 14 Obsoletely Fabulous +71 5 15 Bender Should Not Be Allowed on Television +72 5 16 The Devil's Hands Are Idle Playthings +73 6 1 Rebirth +74 6 2 In-A-Gadda-Da-Leela +75 6 3 Attack of the Killer App +76 6 4 Proposition Infinity +77 6 5 The Duh-Vinci Code +78 6 6 Lethal Inspection +79 6 7 The Late Philip J. Fry +80 6 8 That Darn Katz! +81 6 9 A Clockwork Origin +82 6 10 The Prisoner of Benda +83 6 11 Lrrreconcilable Ndndifferences +84 6 12 The Mutants Are Revolting +85 6 13 The Futurama Holiday Spectacular +86 6 14 Neutopia +87 6 15 Benderama +88 6 16 Ghost in the Machines +89 6 17 Law and Oracle +90 6 18 The Silence of the Clamps +91 6 19 Yo Leela Leela +92 6 20 All the Presidents' Heads +93 6 21 Möbius Dick +94 6 22 Fry am the Egg Man +95 6 23 The Tip of the Zoidberg +96 6 24 Cold Warriors +97 6 25 Overclockwise +98 6 26 Reincarnation +99 7 1 The Bots and the Bees +100 7 2 A Farewell to Arms +101 7 3 Decision 3012 +102 7 4 The Thief of Baghead +103 7 5 Zapp Dingbat +104 7 6 The Butterjunk Effect +105 7 7 The Six Million Dollar Mon +106 7 8 Fun on a Bun +107 7 9 Free Will Hunting +108 7 10 Near-Death Wish +109 7 11 Viva Mars Vegas +110 7 12 31st Century Fox +111 7 13 Naturama Index: ql/src/test/queries/clientpositive/avro_partition_format.q =================================================================== --- ql/src/test/queries/clientpositive/avro_partition_format.q (revision 0) +++ ql/src/test/queries/clientpositive/avro_partition_format.q (revision 0) @@ -0,0 +1,48 @@ +--Ensure that we can change the file format across partitions and retain access to all data +CREATE TABLE futurama +ROW FORMAT SERDE +'org.apache.hadoop.hive.serde2.avro.AvroSerDe' +STORED AS INPUTFORMAT +'org.apache.hadoop.hive.ql.io.avro.AvroColumnInputFormat' +OUTPUTFORMAT +'org.apache.hadoop.hive.ql.io.avro.AvroColumnOutputFormat' +TBLPROPERTIES ( + 'avro.schema.literal'= + '{"namespace":"testing.hive.avro.columnar", + "name":"futurama_episode", + "type":"record", + "fields":[{"name":"id", "type":"int"}, + {"name":"season", "type":"int"}, + {"name":"episode", "type":"int"}, + {"name":"title", "type":"string"}]}'); + +LOAD DATA LOCAL INPATH '../data/files/futurama_episodes.avro' INTO TABLE futurama; + +CREATE TABLE futurama_partitioned + PARTITIONED BY (season INT) + ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' + 
STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' + OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' + TBLPROPERTIES ( + 'avro.schema.literal'= + '{"name":"futurama", + "type":"record", + "fields":[{"name":"id", "type":"int"}, + {"name":"episode", "type":"int"}, + {"name":"title", "type":"string"}]}'); + +SET hive.exec.dynamic.partition.mode=nonstrict; +INSERT OVERWRITE TABLE futurama_partitioned PARTITION (season) SELECT id, episode, title, season FROM futurama where season <= 4; + +ALTER TABLE futurama_partitioned SET FILEFORMAT + INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.avro.AvroColumnInputFormat' + OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.avro.AvroColumnOutputFormat'; + +INSERT OVERWRITE TABLE futurama_partitioned PARTITION (season) SELECT id, episode, title, season FROM futurama where season > 4; + +SELECT * FROM futurama_partitioned; Index: ql/src/test/queries/clientpositive/avro_column_sanity_test.q =================================================================== --- ql/src/test/queries/clientpositive/avro_column_sanity_test.q (revision 0) +++ ql/src/test/queries/clientpositive/avro_column_sanity_test.q (revision 0) @@ -0,0 +1,23 @@ +-- verify that we can actually read avro-trevni files +CREATE TABLE futurama_episodes +ROW FORMAT SERDE +'org.apache.hadoop.hive.serde2.avro.AvroSerDe' +STORED AS INPUTFORMAT +'org.apache.hadoop.hive.ql.io.avro.AvroColumnInputFormat' +OUTPUTFORMAT +'org.apache.hadoop.hive.ql.io.avro.AvroColumnOutputFormat' +TBLPROPERTIES ( + 'avro.schema.literal'= + '{"namespace":"testing.hive.avro.columnar", + "name":"futurama_episode", + "type":"record", + "fields":[{"name":"id", "type":"int"}, + {"name":"season", "type":"int"}, + {"name":"episode", "type":"int"}, + {"name":"title", "type":"string"}]}'); + +DESCRIBE futurama_episodes; + +LOAD DATA LOCAL INPATH '../data/files/futurama_episodes.avro' INTO TABLE futurama_episodes; + +SELECT * FROM futurama_episodes ORDER BY id; \ No newline at end of file Index: ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroColumnOutputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroColumnOutputFormat.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroColumnOutputFormat.java (revision 0) @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.avro; + +import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC; +import static org.apache.avro.mapred.AvroJob.OUTPUT_CODEC; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Properties; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeException; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; +import org.apache.trevni.ColumnFileMetaData; +import org.apache.trevni.avro.AvroColumnWriter; + +/** + * Write to a columnar Avro (Trevni) file from a Hive process. + */ +public class AvroColumnOutputFormat + implements HiveOutputFormat { + + @Override + public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, + Path path, Class valueClass, boolean isCompressed, + Properties properties, Progressable progressable) throws IOException { + Schema schema; + try { + schema = AvroSerdeUtils.determineSchemaOrThrowException(properties); + } catch (AvroSerdeException e) { + throw new IOException(e); + } + + ColumnFileMetaData meta = new ColumnFileMetaData(); + if (isCompressed) { + meta.setCodec(jobConf.get(OUTPUT_CODEC, DEFLATE_CODEC)); + } + + AvroColumnWriter acw = new AvroColumnWriter(schema, meta); + OutputStream out = path.getFileSystem(jobConf).create(path); + + return new AvroColumnRecordWriter(acw, out); + } + + //no records will be emitted from Hive + public RecordWriter + getRecordWriter(FileSystem ignored, JobConf job, String name, + Progressable progress) { + return new RecordWriter() { + public void write(LongWritable key, AvroGenericRecordWritable value) { + throw new RuntimeException("Should not be called"); + } + + public void close(Reporter reporter) { + } + }; + } + + public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { + return; // Not doing any check + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroColumnRecordWriter.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroColumnRecordWriter.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroColumnRecordWriter.java (revision 0) @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.avro; + + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable; +import org.apache.hadoop.io.Writable; +import org.apache.trevni.avro.AvroColumnWriter; + +/** + * Write an Avro GenericRecord to an Avro data file. + */ +public class AvroColumnRecordWriter implements FileSinkOperator.RecordWriter{ + final private AvroColumnWriter acw; + final private OutputStream out; + + public AvroColumnRecordWriter(AvroColumnWriter acw, OutputStream out) throws IOException { + this.acw = acw; + this.out = out; + } + + @Override + public void write(Writable writable) throws IOException { + if(!(writable instanceof AvroGenericRecordWritable)) { + throw new IOException("Expecting instance of AvroGenericRecordWritable, " + + "but received" + writable.getClass().getCanonicalName()); + } + AvroGenericRecordWritable r = (AvroGenericRecordWritable)writable; + acw.write(r.getRecord()); + } + + + @Override + public void close(boolean abort) throws IOException { + acw.writeTo(out); + out.close(); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroRecordReaderBase.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroRecordReaderBase.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroRecordReaderBase.java (revision 0) @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.avro; + +import java.io.IOException; +import java.util.Map; +import java.util.Properties; + +import org.apache.avro.Schema; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeException; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.RecordReader; + +abstract class AvroRecordReaderBase implements +RecordReader, JobConfigurable { + protected static final Log LOG = LogFactory.getLog(AvroRecordReaderBase.class); + + public abstract float getProgress() throws IOException; + + public abstract long getPos() throws IOException; + + public abstract boolean next(NullWritable nullWritable, AvroGenericRecordWritable record) throws IOException; + + protected JobConf jobConf; + + /** + * Attempt to retrieve the reader schema. We have a couple opportunities + * to provide this, depending on whether or not we're just selecting data + * or running with a MR job. + * @return Reader schema for the Avro object, or null if it has not been provided. + * @throws AvroSerdeException + */ + protected Schema getSchema(JobConf job, FileSplit split) throws AvroSerdeException, IOException { + FileSystem fs = split.getPath().getFileSystem(job); + // Inside of a MR job, we can pull out the actual properties + if(AvroSerdeUtils.insideMRJob(job)) { + MapredWork mapRedWork = Utilities.getMapRedWork(job); + + // Iterate over the Path -> Partition descriptions to find the partition + // that matches our input split. + for (Map.Entry pathsAndParts: mapRedWork.getPathToPartitionInfo().entrySet()){ + String partitionPath = pathsAndParts.getKey(); + if(pathIsInPartition(split.getPath(), partitionPath)) { + if(LOG.isInfoEnabled()) { + LOG.info("Matching partition " + partitionPath + + " with input split " + split); + } + + Properties props = pathsAndParts.getValue().getProperties(); + if(props.containsKey(AvroSerdeUtils.SCHEMA_LITERAL) || props.containsKey(AvroSerdeUtils.SCHEMA_URL)) { + return AvroSerdeUtils.determineSchemaOrThrowException(props); + } + else { + return null; // If it's not in this property, it won't be in any others + } + } + } + if(LOG.isInfoEnabled()) { + LOG.info("Unable to match filesplit " + split + " with a partition."); + } + } + + // In "select * from table" situations (non-MR), we can add things to the job + // It's safe to add this to the job since it's not *actually* a mapred job. + // Here the global state is confined to just this process. + String s = job.get(AvroSerdeUtils.AVRO_SERDE_SCHEMA); + if(s != null) { + LOG.info("Found the avro schema in the job: " + s); + return Schema.parse(s); + } + // No more places to get the schema from. Give up. May have to re-encode later. 
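+ // Lookup order used above: (1) inside an MR job, take the schema from the
+ // matching partition's avro.schema.literal / avro.schema.url properties;
+ // (2) otherwise use the AVRO_SERDE_SCHEMA value the SerDe planted in the job;
+ // (3) give up and return null, in which case the deserializer may have to
+ // re-encode against the table schema later.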
+ return null; + } + + private boolean pathIsInPartition(Path split, String partitionPath) { + boolean schemeless = split.toUri().getScheme() == null; + if (schemeless) { + String schemelessPartitionPath = new Path(partitionPath).toUri().getPath(); + return split.toString().startsWith(schemelessPartitionPath); + } else { + return split.toString().startsWith(partitionPath); + } + } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public AvroGenericRecordWritable createValue() { + return new AvroGenericRecordWritable(); + } + + @Override + public abstract void close() throws IOException; + + @Override + public void configure(JobConf jobConf) { + this.jobConf= jobConf; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java (revision 1440134) +++ ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java (working copy) @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.io.avro; +import java.io.IOException; + import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericData; @@ -26,34 +28,23 @@ import org.apache.avro.mapred.FsInput; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.plan.MapredWork; -import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable; import org.apache.hadoop.hive.serde2.avro.AvroSerdeException; -import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; - -import java.io.IOException; -import java.util.Map; -import java.util.Properties; - /** * RecordReader optimized against Avro GenericRecords that returns to record * as the value of the k-v pair, as Hive requires. */ -public class AvroGenericRecordReader implements - RecordReader, JobConfigurable { - private static final Log LOG = LogFactory.getLog(AvroGenericRecordReader.class); +public class AvroGenericRecordReader extends AvroRecordReaderBase { + protected static final Log LOG = LogFactory.getLog(AvroGenericRecordReader.class); - final private org.apache.avro.file.FileReader reader; final private long start; final private long stop; - protected JobConf jobConf; + final private DataFileReader reader; public AvroGenericRecordReader(JobConf job, FileSplit split, Reporter reporter) throws IOException { this.jobConf = job; @@ -67,7 +58,9 @@ GenericDatumReader gdr = new GenericDatumReader(); - if(latest != null) gdr.setExpected(latest); + if(latest != null) { + gdr.setExpected(latest); + } this.reader = new DataFileReader(new FsInput(split.getPath(), job), gdr); this.reader.sync(split.getStart()); @@ -75,62 +68,6 @@ this.stop = split.getStart() + split.getLength(); } - /** - * Attempt to retrieve the reader schema. We have a couple opportunities - * to provide this, depending on whether or not we're just selecting data - * or running with a MR job. - * @return Reader schema for the Avro object, or null if it has not been provided. 
- * @throws AvroSerdeException - */ - private Schema getSchema(JobConf job, FileSplit split) throws AvroSerdeException, IOException { - FileSystem fs = split.getPath().getFileSystem(job); - // Inside of a MR job, we can pull out the actual properties - if(AvroSerdeUtils.insideMRJob(job)) { - MapredWork mapRedWork = Utilities.getMapRedWork(job); - - // Iterate over the Path -> Partition descriptions to find the partition - // that matches our input split. - for (Map.Entry pathsAndParts: mapRedWork.getPathToPartitionInfo().entrySet()){ - String partitionPath = pathsAndParts.getKey(); - if(pathIsInPartition(split.getPath(), partitionPath)) { - if(LOG.isInfoEnabled()) { - LOG.info("Matching partition " + partitionPath + - " with input split " + split); - } - - Properties props = pathsAndParts.getValue().getProperties(); - if(props.containsKey(AvroSerdeUtils.SCHEMA_LITERAL) || props.containsKey(AvroSerdeUtils.SCHEMA_URL)) { - return AvroSerdeUtils.determineSchemaOrThrowException(props); - } else - return null; // If it's not in this property, it won't be in any others - } - } - if(LOG.isInfoEnabled()) LOG.info("Unable to match filesplit " + split + " with a partition."); - } - - // In "select * from table" situations (non-MR), we can add things to the job - // It's safe to add this to the job since it's not *actually* a mapred job. - // Here the global state is confined to just this process. - String s = job.get(AvroSerdeUtils.AVRO_SERDE_SCHEMA); - if(s != null) { - LOG.info("Found the avro schema in the job: " + s); - return Schema.parse(s); - } - // No more places to get the schema from. Give up. May have to re-encode later. - return null; - } - - private boolean pathIsInPartition(Path split, String partitionPath) { - boolean schemeless = split.toUri().getScheme() == null; - if (schemeless) { - String schemelessPartitionPath = new Path(partitionPath).toUri().getPath(); - return split.toString().startsWith(schemelessPartitionPath); - } else { - return split.toString().startsWith(partitionPath); - } - } - - @Override public boolean next(NullWritable nullWritable, AvroGenericRecordWritable record) throws IOException { if(!reader.hasNext() || reader.pastSync(stop)) { @@ -144,33 +81,18 @@ } @Override - public NullWritable createKey() { - return NullWritable.get(); - } - - @Override - public AvroGenericRecordWritable createValue() { - return new AvroGenericRecordWritable(); - } - - @Override public long getPos() throws IOException { return reader.tell(); } @Override - public void close() throws IOException { - reader.close(); - } - - @Override public float getProgress() throws IOException { return stop == start ? 0.0f : Math.min(1.0f, (getPos() - start) / (float)(stop - start)); } @Override - public void configure(JobConf jobConf) { - this.jobConf= jobConf; + public void close() throws IOException { + reader.close(); } } Index: ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroColumnRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroColumnRecordReader.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroColumnRecordReader.java (revision 0) @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.avro; + + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeException; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.trevni.avro.AvroColumnReader; +import org.apache.trevni.avro.HadoopInput; + +/** + * RecordReader for Trevni (columnar) formated Avro data. Reads only the columns needed, leaving all other fields null. + */ +public class AvroColumnRecordReader extends AvroRecordReaderBase { + protected static final Log LOG = LogFactory.getLog(AvroColumnRecordReader.class); + + Schema readSchema; + Schema projectSchema; + private long count = 0; + final private long total_rows; + final private AvroColumnReader reader; + private final GenericRecord cache; + + public AvroColumnRecordReader(JobConf job, FileSplit split, Reporter reporter) throws IOException { + this.jobConf = job; + Schema latest; + + try { + latest = getSchema(job, split); + } catch (AvroSerdeException e) { + throw new IOException(e); + } + + readSchema = latest; + cache = new GenericData.Record(readSchema); + + //Trevni spec requires that 1 file = 1 block, so we'll read the whole file. It's up to the user + //to conform to this requirement. + + HadoopInput input = new HadoopInput(split.getPath(), job); + AvroColumnReader.Params params = new AvroColumnReader.Params(input); + projectSchema = getProjectedSchema(job, latest); + params.setSchema (projectSchema); + reader = new AvroColumnReader(params); + total_rows = reader.getRowCount(); + } + + //Project an Avro schema based on columnIds. 
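+ // e.g. a read-column list of {0, 3} against the futurama_episode test schema
+ // produces a record schema holding only the "id" and "title" fields, so the
+ // Trevni reader should only need to touch those two columns on disk.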
We're only able to project the top level fields + private Schema getProjectedSchema(Configuration conf, Schema baseSchema){ + List readColumns = ColumnProjectionUtils.getReadColumnIDs(conf); + if (readColumns.isEmpty()) { + return baseSchema; + } + List fields = baseSchema.getFields(); + List readFields = new ArrayList(); + for (Integer columnId : readColumns){ + readFields.add(clone(fields.get(columnId))); + } + Schema projected = Schema.createRecord(baseSchema.getName(), null, baseSchema.getNamespace(), false); + projected.setFields(readFields); + return projected; + } + + Field clone(Field field){ + Field cloned = new Field(field.name(), field.schema(), field.doc(), field.defaultValue()); + return cloned; + } + + @Override + public boolean next(NullWritable nullWritable, AvroGenericRecordWritable record) throws IOException { + if(!reader.hasNext()) { + return false; + } + + //The RecordReader should return a record with the proper schema, so we copy over the projected fields + GenericData.Record r = (GenericData.Record)reader.next(); + count++; + for (Schema.Field field : projectSchema.getFields()){ + String name = field.name(); + cache.put(name, r.get(name)); + } + record.setRecord(cache); + return true; + } + + @Override + public long getPos() throws IOException { + //This isn't canonical, but it makes the most sense for a columnar storage + return count; + } + + @Override + public float getProgress() throws IOException { + return ((float)count)/total_rows; + } + + @Override + public void close() throws IOException { + reader.close(); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroColumnInputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroColumnInputFormat.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroColumnInputFormat.java (revision 0) @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.avro; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +public class AvroColumnInputFormat + extends FileInputFormat implements JobConfigurable { + protected JobConf jobConf; + + @Override + protected FileStatus[] listStatus(JobConf job) throws IOException { + List result = new ArrayList(); + for (FileStatus file : super.listStatus(job)) { + result.add(file); + } + return result.toArray(new FileStatus[0]); + } + + @Override + public RecordReader + getRecordReader(InputSplit inputSplit, JobConf jc, Reporter reporter) throws IOException { + return new AvroColumnRecordReader(jc, (FileSplit) inputSplit, reporter); + } + + @Override + public void configure(JobConf jobConf) { + this.jobConf = jobConf; + } +} Index: ql/build.xml =================================================================== --- ql/build.xml (revision 1440134) +++ ql/build.xml (working copy) @@ -209,6 +209,18 @@ + + + + + + + + + + + + @@ -230,6 +242,8 @@ + +
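For reference, the round trip performed by the new input/output formats can be exercised outside Hive with a short standalone program. The sketch below is illustrative only: the class name, the local /tmp path, and the sample row are invented, and the File-based AvroColumnReader.Params constructor is assumed here in place of the HadoopInput that AvroColumnRecordReader uses; the remaining Avro and trevni-avro calls (AvroColumnWriter.write/writeTo, Params.setSchema, Schema.createRecord with cloned fields) are the same ones the patch relies on.

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.trevni.ColumnFileMetaData;
import org.apache.trevni.avro.AvroColumnReader;
import org.apache.trevni.avro.AvroColumnWriter;

public class TrevniProjectionSketch {
  public static void main(String[] args) throws Exception {
    // Writer schema: same shape as the futurama_episode schema used in the .q tests.
    Schema schema = Schema.parse(
        "{\"namespace\":\"testing.hive.avro.columnar\"," +
        "\"name\":\"futurama_episode\",\"type\":\"record\"," +
        "\"fields\":[{\"name\":\"id\",\"type\":\"int\"}," +
        "{\"name\":\"season\",\"type\":\"int\"}," +
        "{\"name\":\"episode\",\"type\":\"int\"}," +
        "{\"name\":\"title\",\"type\":\"string\"}]}");

    // Write one record the way AvroColumnOutputFormat/AvroColumnRecordWriter do:
    // buffer rows in an AvroColumnWriter, then dump the column file to the stream on close.
    File file = new File("/tmp/episodes.trv");          // hypothetical local path
    AvroColumnWriter<GenericRecord> writer =
        new AvroColumnWriter<GenericRecord>(schema, new ColumnFileMetaData());
    GenericRecord r = new GenericData.Record(schema);
    r.put("id", 1); r.put("season", 1); r.put("episode", 1); r.put("title", "Space Pilot 3000");
    writer.write(r);
    OutputStream out = new FileOutputStream(file);
    writer.writeTo(out);
    out.close();

    // Read back only columns 0 and 3 (id, title), mirroring
    // AvroColumnRecordReader.getProjectedSchema: clone the wanted top-level fields
    // into a new record schema and hand it to the reader.
    List<Field> wanted = new ArrayList<Field>();
    for (int columnId : new int[] {0, 3}) {
      Field f = schema.getFields().get(columnId);
      wanted.add(new Field(f.name(), f.schema(), f.doc(), f.defaultValue()));
    }
    Schema projected = Schema.createRecord(schema.getName(), null, schema.getNamespace(), false);
    projected.setFields(wanted);

    AvroColumnReader.Params params = new AvroColumnReader.Params(file); // assumes the File-based ctor
    params.setSchema(projected);
    AvroColumnReader<GenericRecord> reader = new AvroColumnReader<GenericRecord>(params);
    while (reader.hasNext()) {
      GenericRecord projectedRow = reader.next();
      System.out.println(projectedRow.get("id") + "\t" + projectedRow.get("title"));
    }
    reader.close();
  }
}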