/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.Path;

/**
 * An {@link InputFormat} for plain files whose records are decoded by a
 * {@link org.apache.hadoop.io.serializer.Deserializer} configured via the
 * "mapred.input.deserializer" job property (see
 * {@link FlatFileDeserializerRecordReader.SerializerFromConf}).
 *
 * <p>Keys are the row number ({@link LongWritable}); values are
 * {@link FlatFileDeserializerRecordReader.RowContainer}s holding the
 * deserialized row of type R.
 *
 * @param <R> the deserialized record type
 */
public class FlatFileDeserializerInputFormat<R>
    extends FileInputFormat<LongWritable, FlatFileDeserializerRecordReader.RowContainer<R>> {

  public FlatFileDeserializerInputFormat() {
    // Permit splits down to a single byte; record boundaries are found by
    // the deserializer at read time, not by the split machinery.
    setMinSplitSize(1);
  }

  // NOTE(review): the original overrode listStatus(JobConf) with a loop that
  // bound each FileStatus to an unused local and returned super's result
  // unchanged — pure dead code, so the override has been removed; behavior is
  // identical to inheriting FileInputFormat.listStatus.

  /**
   * Creates a {@link FlatFileDeserializerRecordReader} over the given split.
   *
   * @param split the file split to read (must be a {@link FileSplit})
   * @param job the job configuration, consulted for the deserializer class
   * @param reporter used only to publish the split as the task status
   * @return a record reader producing (row number, row container) pairs
   * @throws IOException if the underlying file cannot be opened
   */
  public RecordReader<LongWritable, FlatFileDeserializerRecordReader.RowContainer<R>>
      getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {

    reporter.setStatus(split.toString());

    return new FlatFileDeserializerRecordReader<R>(job, (FileSplit) split);
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.serializer.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.mapred.*;

import java.io.*;

/**
 * A {@link RecordReader} for plain files whose records are decoded by a
 * configured {@link Deserializer}. Reads one row at a time of type R; the key
 * is the row number and the row itself is returned inside a
 * {@link RowContainer}.
 *
 * @param <R> the deserialized record type
 */
public class FlatFileDeserializerRecordReader<R>
    implements RecordReader<LongWritable, FlatFileDeserializerRecordReader.RowContainer<R>> {

  /**
   * A RowContainer is needed until HADOOP-1230 is implemented, because some
   * deserializers must construct a fresh object per record and cannot reuse
   * the value passed in (e.g. thrift has no clear() for object re-use). The
   * container is reused across calls; only its {@link #row} field changes.
   */
  static public class RowContainer<R> {
    public R row;
  }

  /**
   * A {@link Serialization} implementation that looks the serializer and
   * deserializer classes up in the {@link Configuration} rather than choosing
   * by the record class.
   */
  static public class SerializerFromConf<R> implements Serialization<R>, Configurable {

    /** The conf object from which the serializer/deserializer classes are read. */
    private Configuration conf;

    public SerializerFromConf() {
    }

    /**
     * Always returns true, since this instance was created specifically for
     * objects of type R.
     */
    public boolean accept(Class<?> c) {
      return true;
    }

    /**
     * Instantiates the serializer named by the "mapred.input.serializer"
     * configuration key.
     *
     * @param c ignored; the class comes from the configuration
     * @return the serializer instance named in the configuration
     */
    @SuppressWarnings("unchecked")
    public Serializer<R> getSerializer(Class<R> c) {
      // ignore c — the implementation class comes from the configuration
      return ReflectionUtils.newInstance(
          conf.getClass("mapred.input.serializer", null, Serializer.class), conf);
    }

    /**
     * Instantiates the deserializer named by the "mapred.input.deserializer"
     * configuration key.
     *
     * @param c ignored; the class comes from the configuration
     * @return the deserializer instance named in the configuration
     */
    @SuppressWarnings("unchecked")
    public Deserializer<R> getDeserializer(Class<R> c) {
      // ignore c — the implementation class comes from the configuration
      Class<? extends Deserializer> t =
          conf.getClass("mapred.input.deserializer", null, Deserializer.class);
      return ReflectionUtils.newInstance(t, conf);
    }

    public void setConf(Configuration conf) {
      this.conf = conf;
    }

    public Configuration getConf() {
      return conf;
    }
  }

  /** The current row within the file; the key is always this row number. */
  private long rnum;

  /** The stream in use — fsin if not compressed, otherwise dcin. */
  private final DataInputStream in;

  /** The decompressed stream, or null if the input is not compressed. */
  private final InputStream dcin;

  /** The underlying file stream. */
  private final FSDataInputStream fsin;

  /** Split length, for calculating progress. */
  private final long end;

  /** The constructed deserializer. */
  private final Deserializer<R> deserializer;

  /** Once EOF is reached, stop calling the deserializer. */
  private boolean isEOF;

  /**
   * Opens the underlying stream (wrapping it in a decompressor when the path's
   * extension names a codec) and constructs the configured deserializer.
   *
   * @param conf the job configuration
   * @param split the split for this file
   * @throws IOException if the file cannot be opened
   */
  public FlatFileDeserializerRecordReader(Configuration conf, FileSplit split)
      throws IOException {
    final Path path = split.getPath();
    FileSystem fileSys = path.getFileSystem(conf);
    CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
    final CompressionCodec codec = compressionCodecs.getCodec(path);

    fsin = fileSys.open(path);

    if (codec != null) {
      dcin = codec.createInputStream(fsin);
      in = new DataInputStream(dcin);
    } else {
      dcin = null;
      in = fsin;
    }
    isEOF = false;
    rnum = 0;
    end = split.getLength();

    SerializerFromConf<R> lookup = new SerializerFromConf<R>();
    lookup.setConf(conf);
    deserializer = lookup.getDeserializer(null);
    deserializer.open(in);
  }

  /**
   * The key is always a LongWritable holding the row number.
   *
   * @return a new, empty LongWritable
   */
  public LongWritable createKey() {
    return new LongWritable();
  }

  /**
   * @return a new RowContainer instance
   */
  public RowContainer<R> createValue() {
    return new RowContainer<R>();
  }

  /**
   * Reads the next record, setting the key to the row number and storing the
   * deserialized row in the container (possibly as a freshly allocated
   * object — see {@link RowContainer}).
   *
   * <p>Note: the original version null-checked {@code key}/{@code value}
   * AFTER dereferencing {@code key}, and reassigned the local parameters —
   * invisible to the caller either way. That dead code has been removed; per
   * the RecordReader contract the caller supplies non-null key/value from
   * {@link #createKey()}/{@link #createValue()}.
   *
   * @param key set to the current row number
   * @param value container whose {@code row} field receives the record
   * @return true if a record was read, false on EOF
   * @throws IOException from the deserializer
   */
  @SuppressWarnings("unchecked")
  public synchronized boolean next(LongWritable key, RowContainer<R> value)
      throws IOException {
    if (isEOF) {
      return false;
    }

    // The key here is just the row number.
    key.set(rnum++);

    // The deserializer is responsible for actually reading each record from
    // the stream. Some implementations signal EOF by returning null, others
    // by throwing EOFException — treat both as end of input.
    try {
      R row = (R) deserializer.deserialize(value.row);
      if (row == null) {
        isEOF = true;
        return false;
      }
      value.row = row;
      return true;
    } catch (EOFException e) {
      isEOF = true;
      return false;
    }
  }

  public synchronized float getProgress() throws IOException {
    // This assumes no splitting. Progress is measured over the raw
    // (possibly compressed) stream and assumes the deserializer is not
    // buffering on its own.
    if (end == 0) {
      return 0.0f;
    } else {
      return Math.min(1.0f, fsin.getPos() / (float) end);
    }
  }

  public synchronized long getPos() throws IOException {
    // Position over the raw stream; assumes the deserializer is not
    // buffering on its own.
    return fsin.getPos();
  }

  public synchronized void close() throws IOException {
    // Deserializer.close() is not guaranteed to close the streams it wraps,
    // so close the underlying file stream explicitly as well (closing an
    // already-closed stream is a no-op).
    try {
      deserializer.close();
    } finally {
      fsin.close();
    }
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.*;
import java.util.*;
import junit.framework.TestCase;

import org.apache.commons.logging.*;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.io.serializer.*;
import org.apache.hadoop.conf.*;

public class TestFlatFileDeserializerRecordReader extends TestCase {

  /**
   * A deserializer for lines of plain text; a thin wrapper around
   * {@link LineRecordReader.LineReader}. It shows that everything in
   * LineRecordReader other than the line reader could go at some point.
   */
  public static class DeserializeTextLines implements Deserializer<Text> {

    /** The line reader does the real work. */
    private LineRecordReader.LineReader lr;

    public DeserializeTextLines() { }

    /**
     * Constructs the LineReader over the given stream.
     *
     * @param in the input stream containing the rows to be deserialized
     * @throws IOException if the underlying LineReader throws it
     */
    public void open(InputStream in) throws IOException {
      lr = new LineRecordReader.LineReader(in, 1024);
    }

    /**
     * Reads the next line.
     *
     * @param t the row being deserialized; re-used when non-null, otherwise a
     *          new Text is constructed
     * @return the next row, or null on EOF
     */
    public Text deserialize(Text t) throws IOException {
      if (t == null) {
        t = new Text();
      }
      // readLine returns the number of bytes consumed; 0 means EOF.
      return lr.readLine(t) > 0 ? t : null;
    }

    /**
     * Closes the line reader.
     *
     * @throws IOException if the line reader throws it
     */
    public void close() throws IOException {
      lr.close();
    }
  }

  private static final Log LOG = FileInputFormat.LOG;
  private static Configuration conf = new Configuration();

  /**
   * Writes ten lines of text, reads them back through
   * {@link FlatFileDeserializerInputFormat}, and checks every key/value pair.
   *
   * <p>The original wrapped the body in a catch-all that only printed the
   * stack trace, so any failure still let the test pass; exceptions now
   * propagate and fail the test, and the final record count is asserted.
   */
  public void testReader() throws Exception {

    // Create job, filesystem, reporter and such.
    JobConf job = new JobConf(conf);
    FileSystem fs = FileSystem.getLocal(conf);
    Path dir = new Path(System.getProperty("test.build.data", ".") + "/mapred");
    Path file = new Path(dir, "test.txt");
    Reporter reporter = Reporter.NULL;
    fs.delete(dir, true);

    // Set this so the SerializerFromConf can look up our deserializer.
    job.setClass("mapred.input.deserializer", DeserializeTextLines.class,
        Deserializer.class);

    FileInputFormat.setInputPaths(job, dir);

    // Construct some data and write it.
    FSDataOutputStream ds = fs.create(file);
    for (int i = 0; i < 10; i++) {
      ds.writeBytes("Hello World! " + String.valueOf(i) + "\n");
    }
    ds.close();

    // Construct the split.
    FileInputFormat<LongWritable, FlatFileDeserializerRecordReader.RowContainer<Text>> format =
        new FlatFileDeserializerInputFormat<Text>();
    InputSplit[] splits = format.getSplits(job, 1);

    // Construct the record reader.
    RecordReader<LongWritable, FlatFileDeserializerRecordReader.RowContainer<Text>> reader =
        format.getRecordReader(splits[0], job, reporter);

    // Create key/value.
    LongWritable key = reader.createKey();
    FlatFileDeserializerRecordReader.RowContainer<Text> value = reader.createValue();

    // Read back the data using the FlatFileDeserializerRecordReader.
    int count = 0;
    while (reader.next(key, value)) {
      assertEquals(count, key.get());
      assertEquals(new Text("Hello World! " + String.valueOf(count)), value.row);
      count++;
    }
    reader.close();

    // Every written line must have been read back.
    assertEquals(10, count);
  }

  public static void main(String[] args) throws Exception {
    new TestFlatFileDeserializerRecordReader().testReader();
  }
}