Index: src/java/org/apache/hadoop/io/SequenceFile.java
===================================================================
--- src/java/org/apache/hadoop/io/SequenceFile.java (revision 656276)
+++ src/java/org/apache/hadoop/io/SequenceFile.java (working copy)
@@ -1038,6 +1038,35 @@
       val.writeUncompressedBytes(out); // value
     }
+    /**
+     * Appends one record whose key and value are already serialized.
+     *
+     * <p>After calling {@code checkAndWriteSync()}, writes the total record
+     * length ({@code keyLength + valLength}), the key length, the key bytes
+     * and then the value bytes.
+     *
+     * @param keyData array holding the serialized key
+     * @param keyOffset index of the first key byte in {@code keyData}
+     * @param keyLength number of key bytes; must not be zero
+     * @param value array holding the serialized value
+     * @param valOffset index of the first value byte in {@code value}
+     * @param valLength number of value bytes
+     * @throws IOException if {@code keyLength} is zero, or propagated from the
+     *         underlying writes
+     */
+    public synchronized void appendRaw(byte[] keyData, int keyOffset,
+        int keyLength, byte[] value, int valOffset, int valLength)
+        throws IOException {
+      if (keyLength == 0)
+        throw new IOException("zero length keys not allowed: " + keyLength);
+      checkAndWriteSync();
+      // The first int covers the whole record (key + value); the second is the
+      // key portion alone.
+      out.writeInt(keyLength + valLength); // total record length
+      out.writeInt(keyLength); // key portion length
+      out.write(keyData, keyOffset, keyLength); // key
+      out.write(value, valOffset, valLength); // value
+    }
+
     /** Returns the current length of the output file.
      *
      *
This always returns a synchronized position. In other words,
@@ -2645,6 +2657,23 @@
writer.close();
}
+    /**
+     * Merges the key/value records held in the committed byte arrays of
+     * {@code ba} and writes them to {@code outFile} in the order produced by
+     * the in-memory merge queue.
+     *
+     * @param ba the manager whose committed byte arrays are merged
+     * @param outFile the final output file; must not already exist
+     * @throws IOException if {@code outFile} already exists, or propagated
+     *         from creating, writing or closing the output
+     */
+    public void merge(ByteArrayManager ba, Path outFile) throws IOException {
+      if (fs.exists(outFile)) {
+        throw new IOException("already exists: " + outFile);
+      }
+      Writer writer = createWriter(fs, conf, outFile, keyClass, valClass);
+      try {
+        InMemMergeQueue m = new InMemMergeQueue(ba);
+        while (m.next()) {
+          // Key and value sit back to back in the same array: the value
+          // starts right after the key bytes.
+          int keyOffset = m.getOffsetInTop();
+          int keyLength = m.getKeyLength();
+          writer.appendRaw(m.getTop(), keyOffset, keyLength,
+                           m.getTop(), keyOffset + keyLength, m.getValueLength());
+        }
+      } finally {
+        // Close the writer even when the merge fails part-way.
+        writer.close();
+      }
+    }
/** sort calls this to generate the final merged output */
private int mergePass(Path tmpDir) throws IOException {
@@ -2678,6 +2707,93 @@
return mQueue.merge();
}
+    /**
+     * Priority queue over in-memory segments. Each segment is a byte array of
+     * back-to-back records laid out as
+     * [4-byte key length][4-byte value length][key bytes][value bytes],
+     * with both lengths stored big-endian. Segments are ordered by their
+     * current key using {@code comparator}.
+     */
+    private class InMemMergeQueue extends PriorityQueue {
+      /** Cursor over the records of one byte array. */
+      private class ByteArrayStream {
+        byte[] b;              // backing array; set to null by cleanup()
+        int length;            // b.length, kept so cleanup() can report it
+        int currentKeyLength;  // key length of the current record
+        int currentValLength;  // value length of the current record
+        int current;           // offset of the current record's key bytes
+        public ByteArrayStream(byte[] b) {
+          this.b = b;
+          this.length = b.length;
+        }
+        /**
+         * Advances to the next record.
+         * @return false when no record is left, true otherwise
+         */
+        boolean next() {
+          // The next record's header starts right after the current record's
+          // value bytes (offset 0 before the first call).
+          int readFrom = current + currentKeyLength + currentValLength;
+          // Test the header offset, not 'current': 'current' is the key offset
+          // of the record just consumed and never equals 'length' for a
+          // non-empty record, so testing it would read past the array.
+          if (readFrom == length) {
+            return false;
+          }
+          currentKeyLength = readInt(readFrom);
+          currentValLength = readInt(readFrom + 4);
+          current = readFrom + 8; // skip the two length ints
+          return true;
+        }
+        /** Reads a big-endian 4-byte int starting at {@code offset}. */
+        int readInt(int offset) { //this could be readVInt
+          // Mask each byte: Java bytes are signed, so an unmasked value
+          // >= 0x80 would sign-extend and corrupt the higher-order bits.
+          return ((b[offset] & 0xff) << 24)
+              | ((b[offset + 1] & 0xff) << 16)
+              | ((b[offset + 2] & 0xff) << 8)
+              | (b[offset + 3] & 0xff);
+        }
+        /** Drops the array reference and unreserves its space in the manager. */
+        void cleanup() {
+          b = null; //help GC
+          ba.unreserveByteArraySpace(length);
+        }
+      }
+      // Segment whose current record was handed out by the last next() call.
+      private ByteArrayStream minSegment;
+      private ByteArrayManager ba;
+      /** Orders two segments by the raw bytes of their current keys. */
+      protected boolean lessThan(Object a, Object b) {
+        ByteArrayStream left = (ByteArrayStream)a;
+        ByteArrayStream right = (ByteArrayStream)b;
+        return comparator.compare(left.b, left.current, left.currentKeyLength,
+                                  right.b, right.current, right.currentKeyLength) < 0;
+      }
+      /**
+       * Builds the queue from every committed array of {@code ba} that holds
+       * at least one record.
+       */
+      public InMemMergeQueue(ByteArrayManager ba) {
+        this.ba = ba;
+        java.util.List<ByteArrayStream> streams =
+          new java.util.ArrayList<ByteArrayStream>();
+        for (byte[] segment : ba.getCommittedArrays()) {
+          ByteArrayStream stream = new ByteArrayStream(segment);
+          if (stream.next()) {
+            streams.add(stream);
+          }
+        }
+        // NOTE(review): the base class is assumed to be
+        // org.apache.hadoop.util.PriorityQueue, whose heap is allocated by
+        // initialize(int) and must exist before the first put() - confirm.
+        initialize(streams.size());
+        for (ByteArrayStream stream : streams) {
+          put(stream);
+        }
+      }
+      /**
+       * Advances {@code bs}, which is expected to be the current top of the
+       * queue, and restores heap order; an exhausted segment is removed and
+       * cleaned up.
+       */
+      private void adjustPriorityQueue(ByteArrayStream bs) {
+        if (bs.next()) {
+          adjustTop();
+        } else {
+          pop();
+          bs.cleanup();
+        }
+      }
+      /**
+       * Moves to the next record in merged order.
+       * @return false when every segment is exhausted, true otherwise
+       */
+      public boolean next() {
+        if (size() == 0) {
+          return false;
+        }
+        if (minSegment != null) {
+          // Step past the record handed out by the previous call.
+          adjustPriorityQueue(minSegment);
+          if (size() == 0) {
+            return false;
+          }
+        }
+        minSegment = (ByteArrayStream)top();
+        return true;
+      }
+      /** Returns the byte array that holds the current record. */
+      public byte[] getTop() {
+        return minSegment.b;
+      }
+      /** Returns the offset of the current record's key in {@link #getTop()}. */
+      public int getOffsetInTop() {
+        return minSegment.current;
+      }
+      /** Returns the key length of the current record. */
+      public int getKeyLength() {
+        return minSegment.currentKeyLength;
+      }
+      /** Returns the value length of the current record. */
+      public int getValueLength() {
+        return minSegment.currentValLength;
+      }
+    }
/** This class implements the core of the merge logic */
private class MergeQueue extends PriorityQueue
implements RawKeyValueIterator {
Index: src/java/org/apache/hadoop/io/ByteArrayManager.java
===================================================================
--- src/java/org/apache/hadoop/io/ByteArrayManager.java (revision 0)
+++ src/java/org/apache/hadoop/io/ByteArrayManager.java (revision 0)
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/** An implementation of a byte array pool. The pool is capped by the total
+ * amount of memory configured for it to use. The flow is:
+ * getNewByteArray - get a new array and write to it
+ * commitByteArray - commit the array when the writes are over
+ * unreserveByteArraySpace - free up the space when the array is no longer required
+ * getCommittedArrays - returns the list of arrays that have been committed
+ */
+public class ByteArrayManager {
+
+ private final static Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.ByteArrayManager");
+ private long maxSize;
+ private volatile long totalUsed;
+
+ private List