Index: src/contrib/hive/ql/src/test/org/apache/hadoop/hive/ql/io/TestFlatFileInputFormat.java
===================================================================
--- src/contrib/hive/ql/src/test/org/apache/hadoop/hive/ql/io/TestFlatFileInputFormat.java	(revision 0)
+++ src/contrib/hive/ql/src/test/org/apache/hadoop/hive/ql/io/TestFlatFileInputFormat.java	(revision 0)
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.record.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.io.serializer.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.facebook.thrift.*;
+import com.facebook.thrift.transport.*;
+import com.facebook.thrift.protocol.*;
+
+import org.apache.hadoop.contrib.serialization.thrift.*;
+
+public class TestFlatFileInputFormat extends TestCase {
+
+  /**
+   * Simple test object
+   */
+  static class TestObj implements Serializable {
+    String s;
+    int num;
+    public TestObj(String s, int num) {
+      this.s = s;
+      this.num = num;
+    }
+    public TestObj() {
+    }
+  }
+
+
+  public void testFlatFileInputJava() throws Exception {
+    Configuration conf;
+    JobConf job;
+    FileSystem fs;
+    Path dir;
+    Path file;
+    Reporter reporter;
+    FSDataOutputStream ds;
+
+    try {
+      //
+      // create job and filesystem and reporter and such.
+      //
+      conf = new Configuration();
+      job = new JobConf(conf);
+      fs = FileSystem.getLocal(conf);
+      dir = new Path(System.getProperty("test.build.data", ".") + "/mapred");
+      file = new Path(dir, "test.txt");
+      reporter = Reporter.NULL;
+      fs.delete(dir, true);
+
+      job.setClass(FlatFileInputFormat.SerializationContextFromConf.SerializationImplKey,
+                   org.apache.hadoop.io.serializer.JavaSerialization.class,
+                   org.apache.hadoop.io.serializer.Serialization.class);
+
+      job.setClass(FlatFileInputFormat.SerializationContextFromConf.SerializationSubclassKey,
+                   TestObj.class, java.io.Serializable.class);
+
+      //
+      // Write some data out to a flat file
+      //
+      FileInputFormat.setInputPaths(job, dir);
+      ds = fs.create(file);
+      Serializer serializer = new JavaSerialization().getSerializer(null);
+
+      // construct some data and write it
+      serializer.open(ds);
+      for (int i = 0; i < 10; i++) {
+        serializer.serialize(new TestObj("Hello World! " + String.valueOf(i), i));
+      }
+      serializer.close();
+
+      //
+      // Construct the reader
+      //
+      FileInputFormat<Void, FlatFileInputFormat.RowContainer<Serializable>> format =
+        new FlatFileInputFormat<Serializable>();
+      InputSplit[] splits = format.getSplits(job, 1);
+
+      // construct the record reader
+      RecordReader<Void, FlatFileInputFormat.RowContainer<Serializable>> reader =
+        format.getRecordReader(splits[0], job, reporter);
+
+      // create key/value
+      Void key = reader.createKey();
+      FlatFileInputFormat.RowContainer<Serializable> value = reader.createValue();
+
+      //
+      // read back the data using the FlatFileRecordReader
+      //
+      int count = 0;
+      while (reader.next(key, value)) {
+        assertTrue(key == null);
+        assertTrue(((TestObj)value.row).s.equals("Hello World! " + String.valueOf(count)));
+        assertTrue(((TestObj)value.row).num == count);
+        count++;
+      }
+      reader.close();
+
+    } catch(Exception e) {
+      System.err.println("caught: " + e);
+      e.printStackTrace();
+    } finally {
+    }
+
+  }
+
+  public void testFlatFileInputRecord() throws Exception {
+    Configuration conf;
+    JobConf job;
+    FileSystem fs;
+    Path dir;
+    Path file;
+    Reporter reporter;
+    FSDataOutputStream ds;
+
+    try {
+      //
+      // create job and filesystem and reporter and such.
+      //
+      conf = new Configuration();
+      job = new JobConf(conf);
+      fs = FileSystem.getLocal(conf);
+      dir = new Path(System.getProperty("test.build.data", ".") + "/mapred");
+      file = new Path(dir, "test.txt");
+      reporter = Reporter.NULL;
+      fs.delete(dir, true);
+
+      job.setClass(FlatFileInputFormat.SerializationContextFromConf.SerializationImplKey,
+                   org.apache.hadoop.io.serializer.WritableSerialization.class,
+                   org.apache.hadoop.io.serializer.Serialization.class);
+
+      job.setClass(FlatFileInputFormat.SerializationContextFromConf.SerializationSubclassKey,
+                   RecordTestObj.class, Writable.class);
+
+      //
+      // Write some data out to a flat file
+      //
+      FileInputFormat.setInputPaths(job, dir);
+      ds = fs.create(file);
+      Serializer serializer = new WritableSerialization().getSerializer(Writable.class);
+
+      // construct some data and write it
+      serializer.open(ds);
+      for (int i = 0; i < 10; i++) {
+        serializer.serialize(new RecordTestObj("Hello World! " + String.valueOf(i), i));
+      }
+      serializer.close();
+
+      //
+      // Construct the reader
+      //
+      FileInputFormat<Void, FlatFileInputFormat.RowContainer<Writable>> format =
+        new FlatFileInputFormat<Writable>();
+      InputSplit[] splits = format.getSplits(job, 1);
+
+      // construct the record reader
+      RecordReader<Void, FlatFileInputFormat.RowContainer<Writable>> reader =
+        format.getRecordReader(splits[0], job, reporter);
+
+      // create key/value
+      Void key = reader.createKey();
+      FlatFileInputFormat.RowContainer<Writable> value = reader.createValue();
+
+      //
+      // read back the data using the FlatFileRecordReader
+      //
+      int count = 0;
+      while (reader.next(key, value)) {
+        assertTrue(key == null);
+        assertTrue(((RecordTestObj)value.row).getS().equals("Hello World! " + String.valueOf(count)));
+        assertTrue(((RecordTestObj)value.row).getNum() == count);
+        count++;
+      }
+      reader.close();
+
+    } catch(Exception e) {
+      System.err.println("caught: " + e);
+      e.printStackTrace();
+    } finally {
+    }
+
+  }
+
+  public void testFlatFileInputThrift() throws Exception {
+    Configuration conf;
+    JobConf job;
+    FileSystem fs;
+    Path dir;
+    Path file;
+    Reporter reporter;
+    FSDataOutputStream ds;
+
+    try {
+      //
+      // create job and filesystem and reporter and such.
+      //
+      conf = new Configuration();
+      job = new JobConf(conf);
+      fs = FileSystem.getLocal(conf);
+      dir = new Path(System.getProperty("test.build.data", ".") + "/mapred");
+      file = new Path(dir, "test.txt");
+      reporter = Reporter.NULL;
+      fs.delete(dir, true);
+
+      job.setClass(FlatFileInputFormat.SerializationContextFromConf.SerializationImplKey,
+                   org.apache.hadoop.contrib.serialization.thrift.ThriftSerialization.class,
+                   org.apache.hadoop.io.serializer.Serialization.class);
+
+      job.setClass(FlatFileInputFormat.SerializationContextFromConf.SerializationSubclassKey,
+                   FlatFileThriftTestObj.class, TBase.class);
+
+      //
+      // Write some data out to a flat file
+      //
+      FileInputFormat.setInputPaths(job, dir);
+      ds = fs.create(file);
+      Serializer serializer = new ThriftSerialization().getSerializer(TBase.class);
+
+      // construct some data and write it
+      serializer.open(ds);
+      for (int i = 0; i < 10; i++) {
+        serializer.serialize(new FlatFileThriftTestObj("Hello World! " + String.valueOf(i), i));
+      }
+      serializer.close();
+
+      //
+      // Construct the reader
+      //
+      FileInputFormat<Void, FlatFileInputFormat.RowContainer<TBase>> format =
+        new FlatFileInputFormat<TBase>();
+      InputSplit[] splits = format.getSplits(job, 1);
+
+      // construct the record reader
+      RecordReader<Void, FlatFileInputFormat.RowContainer<TBase>> reader =
+        format.getRecordReader(splits[0], job, reporter);
+
+      // create key/value
+      Void key = reader.createKey();
+      FlatFileInputFormat.RowContainer<TBase> value = reader.createValue();
+
+      //
+      // read back the data using the FlatFileRecordReader
+      //
+      int count = 0;
+      while (reader.next(key, value)) {
+        assertTrue(key == null);
+        assertTrue(((FlatFileThriftTestObj)value.row).s.equals("Hello World! " + String.valueOf(count)));
+        assertTrue(((FlatFileThriftTestObj)value.row).num == count);
+        count++;
+      }
+      reader.close();
+
+    } catch(Exception e) {
+      System.err.println("caught: " + e);
+      e.printStackTrace();
+    } finally {
+    }
+
+  }
+
+
+
+  public static void main(String[] args) throws Exception {
+    new TestFlatFileInputFormat().testFlatFileInputJava();
+    new TestFlatFileInputFormat().testFlatFileInputRecord();
+    new TestFlatFileInputFormat().testFlatFileInputThrift();
+  }
+}
Index: src/contrib/hive/ql/src/test/org/apache/hadoop/hive/ql/io/FlatFileThriftTestObj.java
===================================================================
--- src/contrib/hive/ql/src/test/org/apache/hadoop/hive/ql/io/FlatFileThriftTestObj.java	(revision 0)
+++ src/contrib/hive/ql/src/test/org/apache/hadoop/hive/ql/io/FlatFileThriftTestObj.java	(revision 0)
@@ -0,0 +1,149 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package org.apache.hadoop.hive.ql.io;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import com.facebook.thrift.*;
+
+import com.facebook.thrift.protocol.*;
+import com.facebook.thrift.transport.*;
+
+public class FlatFileThriftTestObj implements TBase, java.io.Serializable {
+  public String s;
+  public int num;
+
+  public final Isset __isset = new Isset();
+  public static final class Isset implements java.io.Serializable {
+    public boolean s = false;
+    public boolean num = false;
+  }
+
+  public FlatFileThriftTestObj() {
+  }
+
+  public FlatFileThriftTestObj(
+    String s,
+    int num)
+  {
+    this();
+    this.s = s;
+    this.__isset.s = true;
+    this.num = num;
+    this.__isset.num = true;
+  }
+
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof FlatFileThriftTestObj)
+      return this.equals((FlatFileThriftTestObj)that);
+    return false;
+  }
+
+  public boolean equals(FlatFileThriftTestObj that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_s = true && (this.s != null);
+    boolean that_present_s = true && (that.s != null);
+    if (this_present_s || that_present_s) {
+      if (!(this_present_s && that_present_s))
+        return false;
+      if (!this.s.equals(that.s))
+        return false;
+    }
+
+    boolean this_present_num = true;
+    boolean that_present_num = true;
+    if (this_present_num || that_present_num) {
+      if (!(this_present_num && that_present_num))
+        return false;
+      if (this.num != that.num)
+        return false;
+    }
+
+    return true;
+  }
+
+  public int hashCode() {
+    return 0;
+  }
+
+  public void read(TProtocol iprot) throws TException {
+    TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == TType.STOP) {
+        break;
+      }
+      switch (field.id)
+      {
+        case -1:
+          if (field.type == TType.STRING) {
+            this.s = iprot.readString();
+            this.__isset.s = true;
+          } else {
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case -2:
+          if (field.type == TType.I32) {
+            this.num = iprot.readI32();
+            this.__isset.num = true;
+          } else {
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          TProtocolUtil.skip(iprot, field.type);
+          break;
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+  }
+
+  public void write(TProtocol oprot) throws TException {
+    TStruct struct = new TStruct("FlatFileThriftTestObj");
+    oprot.writeStructBegin(struct);
+    TField field = new TField();
+    if (this.s != null) {
+      field.name = "s";
+      field.type = TType.STRING;
+      field.id = -1;
+      oprot.writeFieldBegin(field);
+      oprot.writeString(this.s);
+      oprot.writeFieldEnd();
+    }
+    field.name = "num";
+    field.type = TType.I32;
+    field.id = -2;
+    oprot.writeFieldBegin(field);
+    oprot.writeI32(this.num);
+    oprot.writeFieldEnd();
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder("FlatFileThriftTestObj(");
+    sb.append("s:");
+    sb.append(this.s);
+    sb.append(",num:");
+    sb.append(this.num);
+    sb.append(")");
+    return sb.toString();
+  }
+
+}
+
Index: src/contrib/hive/ql/src/test/org/apache/hadoop/hive/ql/io/RecordTestObj.java
===================================================================
--- src/contrib/hive/ql/src/test/org/apache/hadoop/hive/ql/io/RecordTestObj.java	(revision 0)
+++ src/contrib/hive/ql/src/test/org/apache/hadoop/hive/ql/io/RecordTestObj.java	(revision 0)
@@ -0,0 +1,212 @@
+// File generated by hadoop record compiler. Do not edit.
+package org.apache.hadoop.hive.ql.io;
+
+public class RecordTestObj extends org.apache.hadoop.record.Record {
+  private static final org.apache.hadoop.record.meta.RecordTypeInfo _rio_recTypeInfo;
+  private static org.apache.hadoop.record.meta.RecordTypeInfo _rio_rtiFilter;
+  private static int[] _rio_rtiFilterFields;
+  static {
+    _rio_recTypeInfo = new org.apache.hadoop.record.meta.RecordTypeInfo("RecordTestObj");
+    _rio_recTypeInfo.addField("s", org.apache.hadoop.record.meta.TypeID.StringTypeID);
+    _rio_recTypeInfo.addField("num", org.apache.hadoop.record.meta.TypeID.LongTypeID);
+  }
+
+  private String s;
+  private long num;
+  public RecordTestObj() { }
+  public RecordTestObj(
+    final String s,
+    final long num) {
+    this.s = s;
+    this.num = num;
+  }
+  public static org.apache.hadoop.record.meta.RecordTypeInfo getTypeInfo() {
+    return _rio_recTypeInfo;
+  }
+  public static void setTypeFilter(org.apache.hadoop.record.meta.RecordTypeInfo rti) {
+    if (null == rti) return;
+    _rio_rtiFilter = rti;
+    _rio_rtiFilterFields = null;
+  }
+  private static void setupRtiFields()
+  {
+    if (null == _rio_rtiFilter) return;
+    // we may already have done this
+    if (null != _rio_rtiFilterFields) return;
+    int _rio_i, _rio_j;
+    _rio_rtiFilterFields = new int[_rio_rtiFilter.getFieldTypeInfos().size()];
+    for (_rio_i=0; _rio_i<_rio_rtiFilterFields.length; _rio_i++) {
+      _rio_rtiFilterFields[_rio_i] = 0;
+    }
+    java.util.Iterator<org.apache.hadoop.record.meta.FieldTypeInfo> _rio_itFilter = _rio_rtiFilter.getFieldTypeInfos().iterator();
+    _rio_i=0;
+    while (_rio_itFilter.hasNext()) {
+      org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfoFilter = _rio_itFilter.next();
+      java.util.Iterator<org.apache.hadoop.record.meta.FieldTypeInfo> _rio_it = _rio_recTypeInfo.getFieldTypeInfos().iterator();
+      _rio_j=1;
+      while (_rio_it.hasNext()) {
+        org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfo = _rio_it.next();
+        if (_rio_tInfo.equals(_rio_tInfoFilter)) {
+          _rio_rtiFilterFields[_rio_i] = _rio_j;
+          break;
+        }
+        _rio_j++;
+      }
+      _rio_i++;
+    }
+  }
+  public String getS() {
+    return s;
+  }
+  public void setS(final String s) {
+    this.s=s;
+  }
+  public long getNum() {
+    return num;
+  }
+  public void setNum(final long num) {
+    this.num=num;
+  }
+  public void serialize(final org.apache.hadoop.record.RecordOutput _rio_a, final String _rio_tag)
+  throws java.io.IOException {
+    _rio_a.startRecord(this,_rio_tag);
+    _rio_a.writeString(s,"s");
+    _rio_a.writeLong(num,"num");
+    _rio_a.endRecord(this,_rio_tag);
+  }
+  private void deserializeWithoutFilter(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag)
+  throws java.io.IOException {
+    _rio_a.startRecord(_rio_tag);
+    s=_rio_a.readString("s");
+    num=_rio_a.readLong("num");
+    _rio_a.endRecord(_rio_tag);
+  }
+  public void deserialize(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag)
+  throws java.io.IOException {
+    if (null == _rio_rtiFilter) {
+      deserializeWithoutFilter(_rio_a, _rio_tag);
+      return;
+    }
+    // if we're here, we need to read based on version info
+    _rio_a.startRecord(_rio_tag);
+    setupRtiFields();
+    for (int _rio_i=0; _rio_i<_rio_rtiFilter.getFieldTypeInfos().size(); _rio_i++) {
+      if (1 == _rio_rtiFilterFields[_rio_i]) {
+        s=_rio_a.readString("s");
+      }
+      else if (2 == _rio_rtiFilterFields[_rio_i]) {
+        num=_rio_a.readLong("num");
+      }
+      else {
+        java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo> typeInfos = (java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo>)(_rio_rtiFilter.getFieldTypeInfos());
+        org.apache.hadoop.record.meta.Utils.skip(_rio_a, typeInfos.get(_rio_i).getFieldID(), typeInfos.get(_rio_i).getTypeID());
+      }
+    }
+    _rio_a.endRecord(_rio_tag);
+  }
+  public int compareTo(final Object _rio_peer_) throws ClassCastException {
+    if (!(_rio_peer_ instanceof RecordTestObj)) {
+      throw new ClassCastException("Comparing different types of records.");
+    }
+    RecordTestObj _rio_peer = (RecordTestObj) _rio_peer_;
+    int _rio_ret = 0;
+    _rio_ret = s.compareTo(_rio_peer.s);
+    if (_rio_ret != 0) return _rio_ret;
+    _rio_ret = (num == _rio_peer.num)? 0 :((num<_rio_peer.num)?-1:1);
+    if (_rio_ret != 0) return _rio_ret;
+    return _rio_ret;
+  }
+  public boolean equals(final Object _rio_peer_) {
+    if (!(_rio_peer_ instanceof RecordTestObj)) {
+      return false;
+    }
+    if (_rio_peer_ == this) {
+      return true;
+    }
+    RecordTestObj _rio_peer = (RecordTestObj) _rio_peer_;
+    boolean _rio_ret = false;
+    _rio_ret = s.equals(_rio_peer.s);
+    if (!_rio_ret) return _rio_ret;
+    _rio_ret = (num==_rio_peer.num);
+    if (!_rio_ret) return _rio_ret;
+    return _rio_ret;
+  }
+  public Object clone() throws CloneNotSupportedException {
+    RecordTestObj _rio_other = new RecordTestObj();
+    _rio_other.s = this.s;
+    _rio_other.num = this.num;
+    return _rio_other;
+  }
+  public int hashCode() {
+    int _rio_result = 17;
+    int _rio_ret;
+    _rio_ret = s.hashCode();
+    _rio_result = 37*_rio_result + _rio_ret;
+    _rio_ret = (int) (num^(num>>>32));
+    _rio_result = 37*_rio_result + _rio_ret;
+    return _rio_result;
+  }
+  public static String signature() {
+    return "LRecordTestObj(sl)";
+  }
+  public static class Comparator extends org.apache.hadoop.record.RecordComparator {
+    public Comparator() {
+      super(RecordTestObj.class);
+    }
+    static public int slurpRaw(byte[] b, int s, int l) {
+      try {
+        int os = s;
+        {
+          int i = org.apache.hadoop.record.Utils.readVInt(b, s);
+          int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+          s+=(z+i); l-=(z+i);
+        }
+        {
+          long i = org.apache.hadoop.record.Utils.readVLong(b, s);
+          int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+          s+=z; l-=z;
+        }
+        return (os - s);
+      } catch(java.io.IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    static public int compareRaw(byte[] b1, int s1, int l1,
+                                 byte[] b2, int s2, int l2) {
+      try {
+        int os1 = s1;
+        {
+          int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1);
+          int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2);
+          int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+          int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+          s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+          int r1 = org.apache.hadoop.record.Utils.compareBytes(b1,s1,i1,b2,s2,i2);
+          if (r1 != 0) { return (r1<0)?-1:0; }
+          s1+=i1; s2+=i2; l1-=i1; l1-=i2;
+        }
+        {
+          long i1 = org.apache.hadoop.record.Utils.readVLong(b1, s1);
+          long i2 = org.apache.hadoop.record.Utils.readVLong(b2, s2);
+          if (i1 != i2) {
+            return ((i1-i2) < 0) ? -1 : 0;
+          }
+          int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+          int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+          s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+        }
+        return (os1 - s1);
+      } catch(java.io.IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    public int compare(byte[] b1, int s1, int l1,
+                       byte[] b2, int s2, int l2) {
+      int ret = compareRaw(b1,s1,l1,b2,s2,l2);
+      return (ret == -1)? -1 : ((ret==0)? 1 : 0);
+    }
+  }
+
+  static {
+    org.apache.hadoop.record.RecordComparator.define(RecordTestObj.class, new Comparator());
+  }
+}
Index: src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java
===================================================================
--- src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java	(revision 0)
+++ src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java	(revision 0)
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.InputStream;
+import java.io.DataInputStream;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configurable;
+
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Deserializer;
+
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** An {@link InputFormat} for plain files with {@link Deserializer} records. */
+public class FlatFileInputFormat<T> extends FileInputFormat<Void, FlatFileInputFormat.RowContainer<T>> {
+
+  /**
+   * A work-around until HADOOP-1230 is fixed.
+   *
+   * Allows boolean next(k,v) to be called by reference but still allow the
+   * deserializer to create a new object (i.e., row) on every call to next.
+   */
+  static public class RowContainer<T> {
+    T row;
+  }
+
+  /**
+   * An implementation of SerializationContext is responsible for looking up the
+   * Serialization implementation for the given RecordReader, potentially based on
+   * the Configuration or some other mechanism.
+   *
+   * The SerializationFactory does not provide this functionality because:
+   * 1. It requires Serialization implementations to be specified in the Configuration
+   *    a-priori (although this is the same as setting a SerializationContext).
+   * 2. It does not look up the actual subclass being deserialized, e.g., for
+   *    Serializable there is no way of configuring
+   *    the actual Java class being serialized/deserialized.
+   */
+  static public interface SerializationContext<S> extends Configurable {
+
+    /**
+     * A {@link Serialization} object for objects of type S.
+     *
+     * @return a serialization object for this context
+     */
+    public Serialization<S> getSerialization() throws IOException;
+
+    /**
+     * Produces the specific class to deserialize.
+     */
+    public Class<? extends S> getRealClass() throws IOException;
+  }
+
+  /**
+   * An implementation of {@link SerializationContext} that reads the Serialization class
+   * and the specific subclass to be deserialized from the JobConf.
+   */
+  static public class SerializationContextFromConf<S> implements FlatFileInputFormat.SerializationContext<S> {
+
+    /**
+     * The JobConf keys for the Serialization implementation and the Class that is being
+     * deserialized.
+     */
+    static public final String SerializationImplKey = "mapred.input.serialization.implKey";
+    static public final String SerializationSubclassKey = "mapred.input.serialization.subclassKey";
+
+
+    /**
+     * Implements Configurable so it can use the configuration to find the right classes.
+     * Note: ReflectionUtils will automatically call setConf with the right configuration.
+     */
+    private Configuration conf;
+
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+
+    public Configuration getConf() {
+      return conf;
+    }
+
+    /**
+     * @return the actual class being deserialized
+     * @exception IOException not currently thrown
+     */
+    public Class<? extends S> getRealClass() throws IOException {
+      return (Class<? extends S>)conf.getClass(SerializationSubclassKey, null, Object.class);
+    }
+
+    /**
+     * Looks up and instantiates the Serialization object.
+     *
+     * Important to note here that we are not relying on the Hadoop SerializationFactory part of the
+     * Serialization framework. This is because in the case of non-Writable objects, we cannot make any
+     * assumptions about the uniformity of the serialization class APIs - i.e., there may not be a "write"
+     * method call and a subclass may need to implement its own Serialization classes.
+     * The SerializationFactory currently returns the first (de)serializer that is compatible
+     * with the class to be deserialized; in this context, that assumption isn't necessarily true.
+     *
+     * @return the serialization object for this context
+     * @exception IOException not currently thrown
+     */
+    public Serialization<S> getSerialization() throws IOException {
+      Class<? extends Serialization<S>> tClass =
+        (Class<? extends Serialization<S>>)conf.getClass(SerializationImplKey, null, Serialization.class);
+      return tClass == null ? null : (Serialization<S>)ReflectionUtils.newInstance(tClass, conf);
+    }
+  }
+
+  /**
+   * A {@link RecordReader} for plain files with {@link Deserializer} records.
+   *
+   * Reads one row at a time of type R.
+   * R is intended to be a base class of something such as: Record, Writable, Text, ...
+   */
+  public class FlatFileRecordReader<R> implements RecordReader<Void, FlatFileInputFormat.RowContainer<R>> {
+
+    /**
+     * An interface for a helper class for instantiating {@link Serialization} classes.
+     */
+    /**
+     * The stream in use - is fsin if not compressed, otherwise, it is dcin.
+     */
+    private final DataInputStream in;
+
+    /**
+     * The decompressed stream, or null if the input is not compressed.
+     */
+    private final InputStream dcin;
+
+    /**
+     * The underlying stream.
+     */
+    private final FSDataInputStream fsin;
+
+    /**
+     * For calculating progress
+     */
+    private final long end;
+
+    /**
+     * The constructed deserializer
+     */
+    private final Deserializer<R> deserializer;
+
+    /**
+     * Once EOF is reached, stop calling the deserializer
+     */
+    private boolean isEOF;
+
+    /**
+     * The JobConf which contains information needed to instantiate the correct Deserializer
+     */
+    private Configuration conf;
+
+    /**
+     * The actual class of the rows we are deserializing, not just the base class
+     */
+    private Class<R> realRowClass;
+
+
+    /**
+     * FlatFileRecordReader constructor constructs the underlying stream (potentially decompressed) and
+     * creates the deserializer.
+     *
+     * @param conf the jobconf
+     * @param split the split for this file
+     */
+    public FlatFileRecordReader(Configuration conf,
+                                FileSplit split) throws IOException {
+      final Path path = split.getPath();
+      FileSystem fileSys = path.getFileSystem(conf);
+      CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
+      final CompressionCodec codec = compressionCodecs.getCodec(path);
+      this.conf = conf;
+
+      fsin = fileSys.open(path);
+      if (codec != null) {
+        dcin = codec.createInputStream(fsin);
+        in = new DataInputStream(dcin);
+      } else {
+        dcin = null;
+        in = fsin;
+      }
+
+      isEOF = false;
+      end = split.getLength();
+
+      // Instantiate a SerializationContext which this will use to lookup the Serialization class and the
+      // actual class being deserialized
+      SerializationContext<R> sinfo;
+      Class<? extends SerializationContext> sinfoClass =
+        (Class<? extends SerializationContext>)conf.getClass(SerializationContextImplKey, SerializationContextFromConf.class);
+
+      sinfo = (SerializationContext<R>)ReflectionUtils.newInstance(sinfoClass, conf);
+
+      // Get the Serialization object and the class being deserialized
+      Serialization<R> serialization = sinfo.getSerialization();
+      realRowClass = (Class<R>)sinfo.getRealClass();
+
+      deserializer = (Deserializer<R>)serialization.getDeserializer((Class<R>)realRowClass);
+      deserializer.open(in);
+    }
+
+    /**
+     * The actual class of the data being deserialized
+     */
+    private Class<R> realRowclass;
+
+    /**
+     * The JobConf key of the SerializationContext to use
+     */
+    static public final String SerializationContextImplKey = "mapred.input.serialization.context_impl";
+
+    /**
+     * @return null
+     */
+    public Void createKey() {
+      return null;
+    }
+
+    /**
+     * @return a new R instance.
+     */
+    public RowContainer<R> createValue() {
+      RowContainer<R> r = new RowContainer<R>();
+      r.row = (R)ReflectionUtils.newInstance(realRowClass, conf);
+      return r;
+    }
+
+    /**
+     * Returns the next row # and value
+     *
+     * @param key - void as these files have a value only
+     * @param value - the row container which is always re-used, but the internal value may be set to a new Object
+     * @return whether the key and value were read: true if they were and false if EOF
+     * @exception IOException from the deserializer
+     */
+    public synchronized boolean next(Void key, RowContainer<R> value) throws IOException {
+      if (isEOF || in.available() == 0) {
+        isEOF = true;
+        return false;
+      }
+
+      // the deserializer is responsible for actually reading each record from the stream
+      try {
+        value.row = deserializer.deserialize(value.row);
+        if (value.row == null) {
+          isEOF = true;
+          return false;
+        }
+        return true;
+      } catch(EOFException e) {
+        isEOF = true;
+        return false;
+      }
+    }
+
+    public synchronized float getProgress() throws IOException {
+      // this assumes no splitting
+      if (end == 0) {
+        return 0.0f;
+      } else {
+        // gives progress over the uncompressed stream
+        // assumes the deserializer is not buffering itself
+        return Math.min(1.0f, fsin.getPos()/(float)(end));
+      }
+    }
+
+    public synchronized long getPos() throws IOException {
+      // assumes the deserializer is not buffering itself
+      // position over the uncompressed stream; not sure what
+      // effect this has on stats about the job
+      return fsin.getPos();
+    }
+
+    public synchronized void close() throws IOException {
+      // assuming that this closes the underlying streams
+      deserializer.close();
+    }
+  }
+
+  protected boolean isSplittable(FileSystem fs, Path filename) {
+    return false;
+  }
+
+  public RecordReader<Void, FlatFileInputFormat.RowContainer<T>> getRecordReader(InputSplit split,
+      JobConf job, Reporter reporter)
+    throws IOException {
+
+    reporter.setStatus(split.toString());
+
+    return new FlatFileRecordReader<T>(job, (FileSplit) split);
+  }
+}
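
Note for reviewers trying the patch: besides the SerializationContextFromConf path exercised by the bundled tests, the SerializationContextImplKey hook in FlatFileRecordReader lets a job plug in its own SerializationContext. The sketch below is illustrative only and not part of the patch; the class HardCodedSerializationContext and its MyRow row type are hypothetical, while the key names and interfaces are the ones defined in FlatFileInputFormat above.

// Hypothetical example, not part of the patch: a SerializationContext that is
// hard-wired to JavaSerialization and one fixed Serializable row class, plugged
// in via FlatFileRecordReader.SerializationContextImplKey instead of the two
// SerializationContextFromConf keys.
package org.apache.hadoop.hive.ql.io;

import java.io.IOException;
import java.io.Serializable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.mapred.JobConf;

public class HardCodedSerializationContext
    implements FlatFileInputFormat.SerializationContext<Serializable> {

  // Hypothetical row class this context always deserializes into.
  public static class MyRow implements Serializable {
    public String s;
    public int num;
  }

  private Configuration conf;

  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  public Configuration getConf() {
    return conf;
  }

  // Always use plain Java serialization for the rows.
  public Serialization<Serializable> getSerialization() throws IOException {
    return new JavaSerialization();
  }

  // The concrete class the deserializer should instantiate.
  public Class<? extends Serializable> getRealClass() throws IOException {
    return MyRow.class;
  }

  // Wiring it into a job, bypassing SerializationContextFromConf entirely.
  public static void configure(JobConf job) {
    job.setInputFormat(FlatFileInputFormat.class);
    job.setClass(FlatFileInputFormat.FlatFileRecordReader.SerializationContextImplKey,
                 HardCodedSerializationContext.class,
                 FlatFileInputFormat.SerializationContext.class);
  }
}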