Index: src/contrib/serialization/readers/src/test/org/apache/hadoop/contrib/serialization/readers/TestFlatFileDeserializerRecordReader.java
===================================================================
--- src/contrib/serialization/readers/src/test/org/apache/hadoop/contrib/serialization/readers/TestFlatFileDeserializerRecordReader.java	(revision 0)
+++ src/contrib/serialization/readers/src/test/org/apache/hadoop/contrib/serialization/readers/TestFlatFileDeserializerRecordReader.java	(revision 0)
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.serialization.readers;
+
+import java.io.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.serializer.*;
+import org.apache.hadoop.mapred.*;
+
+
+public class TestFlatFileDeserializerRecordReader extends TestCase {
+
+  /**
+   * Simple serializable test object. The no-arg constructor is required by
+   * ReflectionUtils.newInstance() in createValue(), and equals() is required
+   * for the assertions below to be meaningful.
+   */
+  static class TestObj implements Serializable {
+    private static final long serialVersionUID = 1L;
+    String s;
+    int num;
+
+    public TestObj() {
+    }
+
+    public TestObj(String s, int num) {
+      this.s = s;
+      this.num = num;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof TestObj)) {
+        return false;
+      }
+      TestObj other = (TestObj) o;
+      return s.equals(other.s) && num == other.num;
+    }
+
+    @Override
+    public int hashCode() {
+      return 31 * s.hashCode() + num;
+    }
+  }
+
+
+  public void testFlatFileDeserializer() throws Exception {
+    //
+    // create the job, filesystem, reporter and such
+    //
+    Configuration conf = new Configuration();
+    JobConf job = new JobConf(conf);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path dir = new Path(System.getProperty("test.build.data", ".") + "/mapred");
+    Path file = new Path(dir, "test.txt");
+    Reporter reporter = Reporter.NULL;
+
+    fs.delete(dir, true);
+
+    job.setClass(FlatFileDeserializerRecordReader.SerializationContextImplKey,
+                 FlatFileDeserializerRecordReader.SerializationContextFromConf.class,
+                 FlatFileDeserializerRecordReader.SerializationContext.class);
+
+    //
+    // Set this so the SerializationContextFromConf can look up our deserializer.
+    //
+    job.setClass(FlatFileDeserializerRecordReader.SerializationContextFromConf.SerializationImplKey,
+                 org.apache.hadoop.io.serializer.JavaSerialization.class,
+                 org.apache.hadoop.io.serializer.Serialization.class);
+
+    job.setClass(FlatFileDeserializerRecordReader.SerializationContextFromConf.SerializationSubclassKey,
+                 TestObj.class, java.io.Serializable.class);
+
+    //
+    // Write some data out to a flat file.
+    //
+    FileInputFormat.setInputPaths(job, dir);
+    FSDataOutputStream ds = fs.create(file);
+    Serializer<Serializable> serializer =
+      new JavaSerialization().getSerializer(Serializable.class);
+
+    // construct some data and write it
+    serializer.open(ds);
+    for (int i = 0; i < 10; i++) {
+      serializer.serialize(new TestObj("Hello World! " + String.valueOf(i), i));
+    }
+    serializer.close();
+
+    //
+    // Construct the reader.
+    //
+    FileInputFormat<LongWritable, FlatFileDeserializerRecordReader.RowContainer<Serializable>> format =
+      new FlatFileDeserializerInputFormat<Serializable>();
+    InputSplit[] splits = format.getSplits(job, 1);
+
+    // construct the record reader
+    RecordReader<LongWritable, FlatFileDeserializerRecordReader.RowContainer<Serializable>> reader =
+      format.getRecordReader(splits[0], job, reporter);
+
+    // create key/value
+    LongWritable key = reader.createKey();
+    FlatFileDeserializerRecordReader.RowContainer<Serializable> value = reader.createValue();
+
+    //
+    // Read the data back using the FlatFileDeserializerRecordReader. Exceptions
+    // propagate so that a failure actually fails the test.
+    //
+    int count = 0;
+    while (reader.next(key, value)) {
+      assertTrue(key.equals(new LongWritable(count)));
+      assertTrue(value.row.equals(new TestObj("Hello World! " + String.valueOf(count), count)));
+      count++;
+    }
+    assertEquals(10, count);
+    reader.close();
+  }
+
+
+  public static void main(String[] args) throws Exception {
+    new TestFlatFileDeserializerRecordReader().testFlatFileDeserializer();
+  }
+}
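
For reference, the three setClass calls exercised by this test are the entire contract between a job and the reader: choose a SerializationContext implementation, the Serialization framework it should look up, and the concrete row class. The following driver sketch condenses that wiring; it is illustrative only, and MyRecord is a hypothetical Serializable user type, not part of this patch.

import org.apache.hadoop.contrib.serialization.readers.FlatFileDeserializerInputFormat;
import org.apache.hadoop.contrib.serialization.readers.FlatFileDeserializerRecordReader;
import org.apache.hadoop.mapred.JobConf;

public class FlatFileDriverSketch {
  // hypothetical row type, standing in for a real user class
  static class MyRecord implements java.io.Serializable {
  }

  public static void configure(JobConf job) {
    // which SerializationContext the record reader instantiates
    job.setClass(FlatFileDeserializerRecordReader.SerializationContextImplKey,
                 FlatFileDeserializerRecordReader.SerializationContextFromConf.class,
                 FlatFileDeserializerRecordReader.SerializationContext.class);

    // which Serialization framework that context looks up ...
    job.setClass(FlatFileDeserializerRecordReader.SerializationContextFromConf.SerializationImplKey,
                 org.apache.hadoop.io.serializer.JavaSerialization.class,
                 org.apache.hadoop.io.serializer.Serialization.class);

    // ... and the concrete class it deserializes
    job.setClass(FlatFileDeserializerRecordReader.SerializationContextFromConf.SerializationSubclassKey,
                 MyRecord.class, java.io.Serializable.class);

    job.setInputFormat(FlatFileDeserializerInputFormat.class);
  }
}
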
+ */ + +package org.apache.hadoop.contrib.serialization.readers; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.*; +import org.apache.hadoop.fs.Path; + +/** An {@link InputFormat} for Plain files with {@link Deserializer} records */ +public class FlatFileDeserializerInputFormat extends FileInputFormat> { + + protected boolean isSplittable(FileSystem fs, Path filename) { + return false; + } + + public FlatFileDeserializerInputFormat() { + } + + @Override + protected FileStatus[] listStatus(JobConf job) throws IOException { + FileStatus[] files = super.listStatus(job); + for (int i = 0; i < files.length; i++) { + FileStatus file = files[i]; + } + return files; + } + + /** + * + */ + public RecordReader> getRecordReader(InputSplit split, + JobConf job, Reporter reporter) + throws IOException { + + reporter.setStatus(split.toString()); + + return new FlatFileDeserializerRecordReader(job, (FileSplit) split); + } +} Index: src/contrib/serialization/readers/src/java/org/apache/hadoop/contrib/serialization/readers/FlatFileDeserializerRecordReader.java =================================================================== --- src/contrib/serialization/readers/src/java/org/apache/hadoop/contrib/serialization/readers/FlatFileDeserializerRecordReader.java (revision 0) +++ src/contrib/serialization/readers/src/java/org/apache/hadoop/contrib/serialization/readers/FlatFileDeserializerRecordReader.java (revision 0) @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.serialization.readers; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.serializer.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.io.compress.*; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.mapred.*; + +import java.io.*; + + +/** + * An {@link RecordReader} for plain files with {@link Deserializer} records + * + * Reads one row at a time of type R. The key is the row number and the actual row is returned inside the RowContainer. + * R is intended to be a base class of something such as: Record, Writable, Text, ... + * + */ +public class FlatFileDeserializerRecordReader implements RecordReader> { + + + /** + * An interface for a helper class for instantiating {@link Serialization} classes. 
+ */ + static public interface SerializationContext extends Configurable { + + /** + * An {@link Serialization} object for objects of type T + * @return a serialization object for this context + */ + public Serialization getSerialization() throws IOException; + + /** + * Produces the specific class to deserialize + */ + public Class getRealClass() throws IOException; + } + + /** + * An implementation of {@link SerializationContext} that reads the Serialization class and + * specific subclass to be deserialized from the JobConf. + * + */ + static public class SerializationContextFromConf implements FlatFileDeserializerRecordReader.SerializationContext { + + /** + * The JobConf keys for the Serialization implementation and the Class that is being + * deserialized. + */ + static public final String SerializationImplKey = "mapred.input.serialization.implKey"; + static public final String SerializationSubclassKey = "mapred.input.serialization.subclassKey"; + + /** + * Implements configurable so it can use the configuration to find the right classes + */ + private Configuration conf; + public void setConf(Configuration conf) { this.conf = conf; } + public Configuration getConf() { return conf; } + + public SerializationContextFromConf() { + } + + /** + * @return the actual class being deserialized + * @exception does not currently throw IOException + */ + public Class getRealClass() throws IOException { + return (Class)conf.getClass(SerializationSubclassKey, null, Object.class); + + } + + /** + * Looks up and instantiates the Serialization Object + * @return the serialization object for this context + * @exception does not currently throw any IOException + */ + public Serialization getSerialization() throws IOException { + Class> tClass = (Class>)conf.getClass(SerializationImplKey, null, Serialization.class); + return (Serialization)ReflectionUtils.newInstance(tClass, conf); + } + } + + + /** + * A RowContainer is needed until HADOOP-1230 is implemented. + * This is because some deserializers need to construct a new Object every time and cannot return the Object passed in (in this case, the value). + * e.g., thrift has no clear method to clear an object for re-use. + * + */ + static public class RowContainer { + public R row; + } + + /** + * The current row within the file as our key is always a LongWritable with the row # + */ + private long rnum; + + /** + * The stream in use - is fsin if not compressed, otherwise, it is dcin. + */ + private final DataInputStream in; + + /** + * The decompressed stream or null if the input is not decompressed. + */ + private final InputStream dcin; + + /** + * The underlying stream. + */ + private final FSDataInputStream fsin; + + /** + * For calculating progress + */ + private final long end; + + /** + * The constructed deserializer + */ + private final Deserializer deserializer; + + /** + * Once EOF is reached, stop calling the deserializer + */ + private boolean isEOF; + + /** + * The JobConf which contains information needed to instantiate the correct Deserializer + */ + private Configuration conf; + + /** + * The actual class of the row's we are deserializing, not just the base class + */ + private Class realRowClass; + + + /** + * FlatFileDeserializerRecordReader constructor constructs the underlying stream (potentially decompressed) and + * creates the deserializer. 
+ * + * @param conf the jobconf + * @param split the split for this file + */ + public FlatFileDeserializerRecordReader(Configuration conf, + FileSplit split) throws IOException { + final Path path = split.getPath(); + FileSystem fileSys = path.getFileSystem(conf); + CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf); + final CompressionCodec codec = compressionCodecs.getCodec(path); + this.conf = conf; + + fsin = fileSys.open(path); + if(codec != null) { + dcin = codec.createInputStream(fsin); + in = new DataInputStream(dcin); + } else { + dcin = null; + in = fsin; + } + + isEOF = false; + rnum = 0; + end = split.getLength(); + + // Instantiate a SerializationContext which this will use to lookup the Serialization class and the + // actual class being deserialized + Class> sinfoClass = + (Class>)conf.getClass(SerializationContextImplKey, SerializationContext.class); + SerializationContext sinfo = (SerializationContext)ReflectionUtils.newInstance(sinfoClass, conf); + sinfo.setConf(conf); + + // Get the Serialization object and the class being deserialized + Serialization serialization = sinfo.getSerialization(); + realRowClass = (Class)sinfo.getRealClass(); + + deserializer = serialization.getDeserializer((Class)realRowClass); + deserializer.open(in); + } + + /** + * The actual class of the data being deserialized + */ + private Class realRowclass; + + /** + * The JobConf key of the SerializationContext to use + */ + static public final String SerializationContextImplKey = "mapred.input.serialization.context_impl"; + + /** + * The key is always a LongWritable of the row number + * @return LongWritable the current row number. + */ + public LongWritable createKey() { + return new LongWritable(); + } + + /** + * @return a new RowContainer instance. + */ + @SuppressWarnings("unchecked") + public RowContainer createValue() { + RowContainer container = new RowContainer(); + container.row = (R)ReflectionUtils.newInstance(realRowClass, conf); + return container; + } + + /** + * Returns the next row # and value + * + * @param key - the longwritable row # + * @param value - the row container which is always re-used, but the internal value may be set to a new Object + * @return whether the key and value were read. True if they were and false if EOF + * @exception IOException from the deserializer + */ + @SuppressWarnings("unchecked") + public synchronized boolean next(LongWritable key, RowContainer value) throws IOException { + if(isEOF) { + return false; + } + + // key here is just the row number + key.set(rnum++); + + if (key == null) + key = createKey(); + + if (value == null) + value = createValue(); + + // the deserializer is responsible for actually reading each record from the stream + try { + R row = (R)deserializer.deserialize(value.row); + if(row == null) { + isEOF = true; + return false; + } + value.row = row; + return true; + } catch(EOFException e) { + isEOF = true; + return false; + } + } + + public synchronized float getProgress() throws IOException { + // this assumes no splitting + if (end == 0) { + return 0.0f; + } else { + // gives progress over uncompressed stream + // assumes deserializer is not buffering itself + return Math.min(1.0f, fsin.getPos()/(float)(end)); + } + } + + public synchronized long getPos() throws IOException { + // assumes deserializer is not buffering itself + // position over uncompressed stream. 
not sure what + // effect this has on stats about job + return fsin.getPos(); + } + + public synchronized void close() throws IOException { + // assuming that this closes the underlying streams + deserializer.close(); + } +}
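
The SerializationContext indirection above means the JobConf-driven lookup is only one possible policy. As an illustration of why the interface exists, a hypothetical context (not part of this patch) could hard-code WritableSerialization and a fixed row class, so that only SerializationContextImplKey would need to be set:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.serialization.readers.FlatFileDeserializerRecordReader;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.WritableSerialization;

// Hypothetical: a fixed context for Writable rows, bypassing the two extra
// JobConf keys that SerializationContextFromConf reads.
public class WritableSerializationContext
    implements FlatFileDeserializerRecordReader.SerializationContext<Writable> {

  private Configuration conf;
  public void setConf(Configuration conf) { this.conf = conf; }
  public Configuration getConf() { return conf; }

  public Serialization<Writable> getSerialization() throws IOException {
    return new WritableSerialization();
  }

  public Class<? extends Writable> getRealClass() throws IOException {
    return Text.class; // fixed row class, for illustration
  }
}
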