Index: src/core/org/apache/hadoop/io/SequenceFile.java
===================================================================
--- src/core/org/apache/hadoop/io/SequenceFile.java (revision 693297)
+++ src/core/org/apache/hadoop/io/SequenceFile.java (working copy)
@@ -1350,7 +1350,7 @@
} // BlockCompressionWriter
/** Reads key/value pairs from a sequence-format file. */
- public static class Reader implements java.io.Closeable {
+ public static class Reader implements java.io.Closeable, TypedSplittableFile {
private Path file;
private FSDataInputStream in;
private DataOutputBuffer outBuf = new DataOutputBuffer();
@@ -1406,12 +1406,41 @@
private Deserializer keyDeserializer;
private Deserializer valDeserializer;
+    /**
+     * Creates a reader that is not attached to any file yet.
+     * {@link #initialize(FileSystem, Path, Configuration)} must be called
+     * before any other method is used; this lets the reader be created up
+     * front and handed to code that works with {@link TypedSplittableFile}.
+     */
+    public Reader() { }
+
/** Open the named file. */
public Reader(FileSystem fs, Path file, Configuration conf)
throws IOException {
this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false);
}
+    /**
+     * Opens {@code file} on a reader created with {@link #Reader()}. The
+     * whole file is covered (offset 0, length {@code fs.getLength(file)}) and
+     * the buffer size comes from {@code io.file.buffer.size} (default 4096).
+     *
+     * @param fs the file system that holds {@code file}
+     * @param file the sequence file to read
+     * @param conf configuration for this reader
+     * @throws IllegalStateException if this reader was already opened, by a
+     *         constructor or an earlier call; re-opening would leak its stream
+     * @throws IOException if opening or reading the file fails
+     */
+    public void initialize(FileSystem fs, Path file, Configuration conf)
+        throws IOException {
+      if (this.in != null) {
+        throw new IllegalStateException("Reader already initialized for " + this.file);
+      }
+      initializeReader(fs, file, conf.getInt("io.file.buffer.size", 4096), 0,
+                       fs.getLength(file), conf, false);
+    }
+
private Reader(FileSystem fs, Path file, int bufferSize,
Configuration conf, boolean tempReader) throws IOException {
this(fs, file, bufferSize, 0, fs.getLength(file), conf, tempReader);
@@ -1420,6 +1449,16 @@
private Reader(FileSystem fs, Path file, int bufferSize, long start,
long length, Configuration conf, boolean tempReader)
throws IOException {
+      initializeReader(fs, file, bufferSize, start, length, conf, tempReader);
+    }
+
+    /**
+     * Common set-up for the constructors and {@link #initialize}: opens
+     * {@code file} and populates this reader's fields.
+     */
+    private void initializeReader(FileSystem fs, Path file, int bufferSize,
+        long start, long length, Configuration conf, boolean tempReader)
+        throws IOException {
this.file = file;
this.in = openFile(fs, file, bufferSize, length);
this.conf = conf;
Index: src/core/org/apache/hadoop/io/TypedSplittableFile.java
===================================================================
--- src/core/org/apache/hadoop/io/TypedSplittableFile.java (revision 0)
+++ src/core/org/apache/hadoop/io/TypedSplittableFile.java (revision 0)
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import java.io.*;
+
+/**
+ * A file of typed key/value records that can be read starting from an
+ * arbitrary byte offset by skipping ahead to the next sync point.
+ *
+ * <p>Instances are created unopened and then opened with
+ * {@link #initialize(FileSystem, Path, Configuration)}, which must be
+ * called before any other method. {@link SequenceFile.Reader} is an
+ * implementation.
+ */
+public interface TypedSplittableFile extends Closeable {
+
+  /**
+   * Opens {@code path} for reading. Must be called before any other method.
+   *
+   * @param fileSys the file system that holds {@code path}
+   * @param path the file to read
+   * @param conf configuration for the reader
+   * @throws IOException if the file cannot be opened or read
+   */
+  void initialize(FileSystem fileSys, Path path, Configuration conf)
+      throws IOException;
+
+  /** Returns the class of the keys stored in the file. */
+  Class<?> getKeyClass();
+
+  /** Returns the class of the values stored in the file. */
+  Class<?> getValueClass();
+
+  /**
+   * Reads the next key. The value that belongs to it can then be read with
+   * {@link #getCurrentValue(Object)}.
+   *
+   * @param key an instance the implementation may reuse to hold the key
+   * @return the key read, or {@code null} if there are no more records
+   */
+  Object next(Object key) throws IOException;
+
+  /**
+   * Reads the value belonging to the key most recently returned by
+   * {@link #next(Object)}.
+   *
+   * @param val an instance the implementation may reuse to hold the value
+   * @return the value read
+   */
+  Object getCurrentValue(Object val) throws IOException;
+
+  /**
+   * Returns {@code true} if the most recent call to {@link #next(Object)}
+   * passed a sync point. This is not an end-of-file indicator.
+   */
+  boolean syncSeen();
+
+  /**
+   * Skips forward to the next sync point after {@code position}, so that
+   * reading resumes on a record boundary.
+   */
+  void sync(long position) throws IOException;
+
+  /** Returns the current byte offset in the file. */
+  long getPosition() throws IOException;
+
+  /**
+   * Moves to byte offset {@code position}, which should be a value
+   * previously returned by {@link #getPosition()} for this file.
+   */
+  void seek(long position) throws IOException;
+
+  /** Closes the underlying file. */
+  void close() throws IOException;
+}
Index: src/mapred/org/apache/hadoop/mapred/TypedSplittableFileRecordReader.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TypedSplittableFileRecordReader.java (revision 0)
+++ src/mapred/org/apache/hadoop/mapred/TypedSplittableFileRecordReader.java (revision 0)
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A {@link RecordReader} over one {@link FileSplit} of a
+ * {@link TypedSplittableFile}.
+ *
+ * <p>Split boundaries are resolved with sync points. A reader whose split
+ * starts beyond the file's initial read position first skips ahead to a
+ * sync point. Reading then runs past the end of the split until a record
+ * that begins at or after the split end crosses a sync point; that record
+ * and everything after it are left to the next split's reader.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class TypedSplittableFileRecordReader<K, V>
+    implements RecordReader<K, V> {
+
+  private final TypedSplittableFile in;
+  private long start;
+  private final long end;
+  private boolean more = true;
+  protected Configuration conf;
+
+  /**
+   * Opens {@code in} on the file named by {@code split} and positions it at
+   * the first record of the split.
+   *
+   * @param conf configuration used to open the file and to create keys and
+   *        values
+   * @param split the file and byte range to read
+   * @param in a reader that has not been initialized yet; this record reader
+   *        initializes it and closes it in {@link #close()}
+   * @throws IOException if the file cannot be opened or positioned; if
+   *         positioning fails, {@code in} is closed before the exception
+   *         propagates
+   */
+  public TypedSplittableFileRecordReader(Configuration conf, FileSplit split,
+      TypedSplittableFile in) throws IOException {
+    Path path = split.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    this.in = in;
+    this.conf = conf;
+    this.end = split.getStart() + split.getLength();
+    in.initialize(fs, path, conf);
+    try {
+      if (split.getStart() > in.getPosition()) {
+        // Skip ahead to a sync point so reading starts on a record boundary.
+        in.sync(split.getStart());
+      }
+      this.start = in.getPosition();
+    } catch (IOException e) {
+      // Close the file opened by initialize() instead of leaking it.
+      try {
+        in.close();
+      } catch (IOException ignored) {
+        // Report the original failure, not the cleanup one.
+      }
+      throw e;
+    }
+    more = start < end;
+  }
+
+  /** The class of key that must be passed to {@link #next(Object, Object)}. */
+  public Class<?> getKeyClass() { return in.getKeyClass(); }
+
+  /** The class of value that must be passed to {@link #next(Object, Object)}. */
+  public Class<?> getValueClass() { return in.getValueClass(); }
+
+  /** Creates a new, empty key of class {@link #getKeyClass()}. */
+  @SuppressWarnings("unchecked")
+  public K createKey() {
+    return (K) ReflectionUtils.newInstance(getKeyClass(), conf);
+  }
+
+  /** Creates a new, empty value of class {@link #getValueClass()}. */
+  @SuppressWarnings("unchecked")
+  public V createValue() {
+    return (V) ReflectionUtils.newInstance(getValueClass(), conf);
+  }
+
+  /**
+   * Reads the next key/value pair of the split into {@code key} and
+   * {@code value}.
+   *
+   * @return {@code true} if a pair was read, {@code false} at the end of the
+   *         split
+   */
+  public synchronized boolean next(K key, V value) throws IOException {
+    return advance(key, value, true);
+  }
+
+  /**
+   * Reads the next key of the split into {@code key} without reading its
+   * value; call {@link #getCurrentValue} afterwards to fetch the value.
+   *
+   * @return {@code true} if a key was read, {@code false} at the end of the
+   *         split
+   */
+  protected synchronized boolean next(K key) throws IOException {
+    return advance(key, null, false);
+  }
+
+  /**
+   * Shared body of both {@code next} methods: reads the next key (and the
+   * value too when {@code readValue} is set) and updates {@code more}.
+   * Callers hold this object's lock.
+   */
+  private boolean advance(K key, V value, boolean readValue)
+      throws IOException {
+    if (!more) {
+      return false;
+    }
+    long pos = in.getPosition();
+    boolean remaining = (in.next(key) != null);
+    if (remaining && readValue) {
+      getCurrentValue(value);
+    }
+    if (pos >= end && in.syncSeen()) {
+      // This record started at or past the split end and crossed a sync
+      // point, so it belongs to the next split.
+      more = false;
+    } else {
+      more = remaining;
+    }
+    return more;
+  }
+
+  /** Reads the value belonging to the key most recently read by {@code next}. */
+  protected synchronized void getCurrentValue(V value) throws IOException {
+    in.getCurrentValue(value);
+  }
+
+  /**
+   * Returns the progress within the input split.
+   *
+   * @return 0.0 to 1.0 of the input byte range
+   */
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    }
+    return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
+  }
+
+  /** Returns the current byte offset in the underlying file. */
+  public synchronized long getPos() throws IOException {
+    return in.getPosition();
+  }
+
+  /** Moves the underlying file to byte offset {@code pos}. */
+  protected synchronized void seek(long pos) throws IOException {
+    in.seek(pos);
+  }
+
+  /** Closes the underlying file. */
+  public synchronized void close() throws IOException {
+    in.close();
+  }
+}
Index: src/mapred/org/apache/hadoop/mapred/SequenceFileRecordReader.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/SequenceFileRecordReader.java (revision 693316)
+++ src/mapred/org/apache/hadoop/mapred/SequenceFileRecordReader.java (working copy)
@@ -20,7 +20,6 @@
import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -28,101 +27,19 @@
import org.apache.hadoop.util.ReflectionUtils;
/** An {@link RecordReader} for {@link SequenceFile}s. */
-public class SequenceFileRecordReader implements RecordReader {
-
- private SequenceFile.Reader in;
- private long start;
- private long end;
- private boolean more = true;
- protected Configuration conf;
-
- public SequenceFileRecordReader(Configuration conf, FileSplit split)
- throws IOException {
- Path path = split.getPath();
- FileSystem fs = path.getFileSystem(conf);
- this.in = new SequenceFile.Reader(fs, path, conf);
- this.end = split.getStart() + split.getLength();
- this.conf = conf;
-
- if (split.getStart() > in.getPosition())
- in.sync(split.getStart()); // sync to start
-
- this.start = in.getPosition();
- more = start < end;
+public class SequenceFileRecordReader<K, V>
+    extends TypedSplittableFileRecordReader<K, V> {
+
+  /**
+   * Opens the part of a {@link SequenceFile} described by {@code split},
+   * reading it through a {@link SequenceFile.Reader}.
+   *
+   * @param conf job configuration
+   * @param split the file and byte range to read
+   * @throws IOException if the file cannot be opened or positioned
+   */
+  public SequenceFileRecordReader(Configuration conf, FileSplit split)
+      throws IOException {
+    super(conf, split, new SequenceFile.Reader());
}
-
-
- /** The class of key that must be passed to {@link
- * #next(Object, Object)}.. */
- public Class getKeyClass() { return in.getKeyClass(); }
-
- /** The class of value that must be passed to {@link
- * #next(Object, Object)}.. */
- public Class getValueClass() { return in.getValueClass(); }
-
- @SuppressWarnings("unchecked")
- public K createKey() {
- return (K) ReflectionUtils.newInstance(getKeyClass(), conf);
- }
-
- @SuppressWarnings("unchecked")
- public V createValue() {
- return (V) ReflectionUtils.newInstance(getValueClass(), conf);
- }
-
- public synchronized boolean next(K key, V value) throws IOException {
- if (!more) return false;
- long pos = in.getPosition();
- boolean remaining = (in.next(key) != null);
- if (remaining) {
- getCurrentValue(value);
- }
- if (pos >= end && in.syncSeen()) {
- more = false;
- } else {
- more = remaining;
- }
- return more;
- }
-
- protected synchronized boolean next(K key)
- throws IOException {
- if (!more) return false;
- long pos = in.getPosition();
- boolean remaining = (in.next(key) != null);
- if (pos >= end && in.syncSeen()) {
- more = false;
- } else {
- more = remaining;
- }
- return more;
- }
-
- protected synchronized void getCurrentValue(V value)
- throws IOException {
- in.getCurrentValue(value);
- }
-
- /**
- * Return the progress within the input split
- * @return 0.0 to 1.0 of the input byte range
- */
- public float getProgress() throws IOException {
- if (end == start) {
- return 0.0f;
- } else {
- return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
- }
- }
-
- public synchronized long getPos() throws IOException {
- return in.getPosition();
- }
-
- protected synchronized void seek(long pos) throws IOException {
- in.seek(pos);
- }
- public synchronized void close() throws IOException { in.close(); }
-
}
-