Index: src/core/org/apache/hadoop/io/SequenceFile.java
===================================================================
--- src/core/org/apache/hadoop/io/SequenceFile.java	(revision 693297)
+++ src/core/org/apache/hadoop/io/SequenceFile.java	(working copy)
@@ -1350,7 +1350,7 @@
   } // BlockCompressionWriter
 
   /** Reads key/value pairs from a sequence-format file. */
-  public static class Reader implements java.io.Closeable {
+  public static class Reader implements java.io.Closeable, TypedSplittableFile {
     private Path file;
     private FSDataInputStream in;
     private DataOutputBuffer outBuf = new DataOutputBuffer();
@@ -1406,12 +1406,31 @@
     private Deserializer keyDeserializer;
     private Deserializer valDeserializer;
 
+    /**
+     * Create an unopened reader.  {@link #initialize(FileSystem, Path,
+     * Configuration)} must be called before any other method.
+     */
+    public Reader() { }
+
     /** Open the named file. */
     public Reader(FileSystem fs, Path file, Configuration conf)
       throws IOException {
       this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false);
     }
 
+    /**
+     * Open the named file in a reader created with {@link #Reader()}.
+     * @throws IllegalStateException if this reader has already been opened
+     */
+    public void initialize(FileSystem fs, Path file, Configuration conf)
+      throws IOException {
+      if (this.in != null) {
+        throw new IllegalStateException("Reader already opened: " + this.file);
+      }
+      initializeReader(fs, file, conf.getInt("io.file.buffer.size", 4096),
+                       0, fs.getLength(file), conf, false);
+    }
+
     private Reader(FileSystem fs, Path file, int bufferSize,
                    Configuration conf, boolean tempReader) throws IOException {
       this(fs, file, bufferSize, 0, fs.getLength(file), conf, tempReader);
@@ -1420,6 +1439,13 @@
     private Reader(FileSystem fs, Path file, int bufferSize, long start,
                    long length, Configuration conf, boolean tempReader)
     throws IOException {
+      initializeReader(fs, file, bufferSize, start, length, conf, tempReader);
+    }
+
+    /**
+     * Shared body of the constructors and {@link #initialize}: opens
+     * {@code file} and sets up this reader's state.
+     */
+    private void initializeReader(FileSystem fs, Path file, int bufferSize,
+                                  long start, long length, Configuration conf,
+                                  boolean tempReader) throws IOException {
       this.file = file;
       this.in = openFile(fs, file, bufferSize, length);
       this.conf = conf;
Index: src/core/org/apache/hadoop/io/TypedSplittableFile.java
===================================================================
--- src/core/org/apache/hadoop/io/TypedSplittableFile.java	(revision 0)
+++ src/core/org/apache/hadoop/io/TypedSplittableFile.java	(revision 0)
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A file of typed key/value records that can be read from an arbitrary
+ * byte offset by skipping forward to the next synchronization point.
+ * Implementations are constructed unopened and opened with
+ * {@link #initialize}, so that a generic record reader can drive any
+ * such format.
+ */
+public interface TypedSplittableFile extends Closeable {
+
+  /**
+   * Open {@code path} for reading.  Call exactly once, before any other
+   * method.
+   */
+  public void initialize(FileSystem fileSys, Path path, Configuration conf)
+    throws IOException;
+
+  /** @return the class of the keys in this file. */
+  public Class getKeyClass();
+
+  /** @return the class of the values in this file. */
+  public Class getValueClass();
+
+  /**
+   * Read the next key into {@code key}.
+   * @return the key read, or {@code null} at end of file
+   */
+  public Object next(Object key) throws IOException;
+
+  /**
+   * Read the value belonging to the most recently read key into
+   * {@code val}.
+   * @return the value read
+   */
+  public Object getCurrentValue(Object val) throws IOException;
+
+  /**
+   * @return true if the most recent call to {@link #next(Object)} passed a
+   *         synchronization point.  This is not an end-of-file test: split
+   *         readers use it to stop after reading past the end of their split.
+   */
+  public boolean syncSeen();
+
+  /** Skip forward to the next synchronization point past {@code position}. */
+  public void sync(long position) throws IOException;
+
+  /** @return the current byte position in the file. */
+  public long getPosition() throws IOException;
+
+  /**
+   * Reposition the reader.  {@code position} should be a value previously
+   * returned by {@link #getPosition()} for this file.
+   */
+  public void seek(long position) throws IOException;
+
+  /** Close the file and release the resources it holds. */
+  public void close() throws IOException;
+}
Index: src/mapred/org/apache/hadoop/mapred/TypedSplittableFileRecordReader.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TypedSplittableFileRecordReader.java	(revision 0)
+++ src/mapred/org/apache/hadoop/mapred/TypedSplittableFileRecordReader.java	(revision 0)
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.TypedSplittableFile;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A {@link RecordReader} for one {@link FileSplit} of any
+ * {@link TypedSplittableFile}.  Reading starts at the first synchronization
+ * point after the split start (or at the first record, for the split that
+ * begins the file) and ends once a read that starts at or after the split
+ * end crosses a synchronization point.
+ */
+public class TypedSplittableFileRecordReader<K, V> implements RecordReader<K, V> {
+
+  private TypedSplittableFile in;
+  private long start;
+  private long end;
+  private boolean more = true;
+  protected Configuration conf;
+
+  /**
+   * Open {@code in} on the file named by {@code split} and position it at
+   * the first record of the split.
+   *
+   * @param conf the job configuration
+   * @param split the byte range of the file to read
+   * @param in an unopened reader; it is initialized here and closed by
+   *           {@link #close()}, or here if positioning it at the split fails
+   */
+  public TypedSplittableFileRecordReader(Configuration conf, FileSplit split,
+                                         TypedSplittableFile in)
+    throws IOException {
+    Path path = split.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    this.in = in;
+    this.in.initialize(fs, path, conf);
+    this.end = split.getStart() + split.getLength();
+    this.conf = conf;
+
+    try {
+      if (split.getStart() > in.getPosition()) {
+        in.sync(split.getStart());                  // sync to start
+      }
+      this.start = in.getPosition();
+    } catch (IOException e) {
+      try {
+        in.close();  // don't leak the file opened by initialize()
+      } catch (IOException ignored) {
+        // best-effort cleanup; the original failure is rethrown below
+      }
+      throw e;
+    }
+    more = start < end;
+  }
+
+  /** The class of key that must be passed to {@link #next(Object, Object)}. */
+  public Class getKeyClass() { return in.getKeyClass(); }
+
+  /** The class of value that must be passed to {@link #next(Object, Object)}. */
+  public Class getValueClass() { return in.getValueClass(); }
+
+  @SuppressWarnings("unchecked")
+  public K createKey() {
+    return (K) ReflectionUtils.newInstance(getKeyClass(), conf);
+  }
+
+  @SuppressWarnings("unchecked")
+  public V createValue() {
+    return (V) ReflectionUtils.newInstance(getValueClass(), conf);
+  }
+
+  /**
+   * Read the next key/value pair of this split into {@code key} and
+   * {@code value}.
+   * @return false once this split (or the file) has no more records
+   */
+  public synchronized boolean next(K key, V value) throws IOException {
+    if (!more) return false;
+    long pos = in.getPosition();
+    boolean remaining = (in.next(key) != null);
+    if (remaining) {
+      getCurrentValue(value);
+    }
+    return finishRead(pos, remaining);
+  }
+
+  /**
+   * Read only the next key of this split; its value can then be read with
+   * {@link #getCurrentValue(Object)}.
+   * @return false once this split (or the file) has no more records
+   */
+  protected synchronized boolean next(K key) throws IOException {
+    if (!more) return false;
+    long pos = in.getPosition();
+    boolean remaining = (in.next(key) != null);
+    return finishRead(pos, remaining);
+  }
+
+  /**
+   * Update and return {@code more} after a read that began at {@code pos}.
+   * A read starting at or past the split end still belongs to this split
+   * unless it crossed a synchronization point; from there on the data
+   * belongs to the next split, so this split is finished.
+   */
+  private boolean finishRead(long pos, boolean remaining) {
+    if (pos >= end && in.syncSeen()) {
+      more = false;
+    } else {
+      more = remaining;
+    }
+    return more;
+  }
+
+  /** Read the value of the most recently read key into {@code value}. */
+  protected synchronized void getCurrentValue(V value) throws IOException {
+    in.getCurrentValue(value);
+  }
+
+  /**
+   * Return the progress within the input split
+   * @return 0.0 to 1.0 of the input byte range
+   */
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
+    }
+  }
+
+  /** @return the current byte position in the underlying file. */
+  public synchronized long getPos() throws IOException {
+    return in.getPosition();
+  }
+
+  /** Reposition the underlying reader; see {@link TypedSplittableFile#seek(long)}. */
+  protected synchronized void seek(long pos) throws IOException {
+    in.seek(pos);
+  }
+
+  /** Close the underlying file. */
+  public synchronized void close() throws IOException { in.close(); }
+}
Index: src/mapred/org/apache/hadoop/mapred/SequenceFileRecordReader.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/SequenceFileRecordReader.java	(revision 693316)
+++ src/mapred/org/apache/hadoop/mapred/SequenceFileRecordReader.java	(working copy)
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -28,101 +27,10 @@
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** An {@link RecordReader} for {@link SequenceFile}s. */
-public class SequenceFileRecordReader<K, V> implements RecordReader<K, V> {
-
-  private SequenceFile.Reader in;
-  private long start;
-  private long end;
-  private boolean more = true;
-  protected Configuration conf;
-
-  public SequenceFileRecordReader(Configuration conf, FileSplit split)
-    throws IOException {
-    Path path = split.getPath();
-    FileSystem fs = path.getFileSystem(conf);
-    this.in = new SequenceFile.Reader(fs, path, conf);
-    this.end = split.getStart() + split.getLength();
-    this.conf = conf;
-
-    if (split.getStart() > in.getPosition())
-      in.sync(split.getStart());                  // sync to start
-
-    this.start = in.getPosition();
-    more = start < end;
+public class SequenceFileRecordReader<K, V> extends TypedSplittableFileRecordReader<K, V> {
+  /** Read {@code split} with a new, unopened {@link SequenceFile.Reader}. */
+  public SequenceFileRecordReader(Configuration conf, FileSplit split)
+    throws IOException {
+    super(conf, split, new SequenceFile.Reader());
   }
-
-
-  /** The class of key that must be passed to {@link
-   * #next(Object, Object)}.. */
-  public Class getKeyClass() { return in.getKeyClass(); }
-
-  /** The class of value that must be passed to {@link
-   * #next(Object, Object)}.. */
-  public Class getValueClass() { return in.getValueClass(); }
-
-  @SuppressWarnings("unchecked")
-  public K createKey() {
-    return (K) ReflectionUtils.newInstance(getKeyClass(), conf);
-  }
-
-  @SuppressWarnings("unchecked")
-  public V createValue() {
-    return (V) ReflectionUtils.newInstance(getValueClass(), conf);
-  }
-
-  public synchronized boolean next(K key, V value) throws IOException {
-    if (!more) return false;
-    long pos = in.getPosition();
-    boolean remaining = (in.next(key) != null);
-    if (remaining) {
-      getCurrentValue(value);
-    }
-    if (pos >= end && in.syncSeen()) {
-      more = false;
-    } else {
-      more = remaining;
-    }
-    return more;
-  }
-
-  protected synchronized boolean next(K key)
-    throws IOException {
-    if (!more) return false;
-    long pos = in.getPosition();
-    boolean remaining = (in.next(key) != null);
-    if (pos >= end && in.syncSeen()) {
-      more = false;
-    } else {
-      more = remaining;
-    }
-    return more;
-  }
-
-  protected synchronized void getCurrentValue(V value)
-    throws IOException {
-    in.getCurrentValue(value);
-  }
-
-  /**
-   * Return the progress within the input split
-   * @return 0.0 to 1.0 of the input byte range
-   */
-  public float getProgress() throws IOException {
-    if (end == start) {
-      return 0.0f;
-    } else {
-      return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
-    }
-  }
-
-  public synchronized long getPos() throws IOException {
-    return in.getPosition();
-  }
-
-  protected synchronized void seek(long pos) throws IOException {
-    in.seek(pos);
-  }
-  public synchronized void close() throws IOException { in.close(); }
-
 }
-