Index: src/java/org/apache/hadoop/io/SequenceFile.java =================================================================== --- src/java/org/apache/hadoop/io/SequenceFile.java (revision 656276) +++ src/java/org/apache/hadoop/io/SequenceFile.java (working copy) @@ -1038,6 +1038,18 @@ val.writeUncompressedBytes(out); // value } + public synchronized void appendRaw(byte[] keyData, int keyOffset, + int keyLength, byte[] value, int valOffset, int valLength) + throws IOException { + if (keyLength == 0) + throw new IOException("zero length keys not allowed: " + keyLength); + checkAndWriteSync(); + out.writeInt(keyLength); // total record length + out.writeInt(valLength); // key portion length + out.write(keyData, keyOffset, keyLength); // key + out.write(value, valOffset, valLength); // value + } + /** Returns the current length of the output file. * *

This always returns a synchronized position. In other words, @@ -2645,6 +2657,23 @@ writer.close(); } + /** Merge the keys/values contained in the byte arrays + * @param segments the list of byte-arrays + * @param outFile the final output file + * @throws IOException + */ + public void merge(ByteArrayManager ba, Path outFile) throws IOException { + if (fs.exists(outFile)) { + throw new IOException("already exists: " + outFile); + } + Writer writer = createWriter(fs, conf, outFile, keyClass, valClass); + InMemMergeQueue m = new InMemMergeQueue(ba); + while (m.next()) { + writer.appendRaw(m.getTop(), m.getOffsetInTop(), m.getKeyLength(), + m.getTop(), m.getOffsetInTop() + m.getKeyLength(), m.getValueLength()); + } + writer.close(); + } /** sort calls this to generate the final merged output */ private int mergePass(Path tmpDir) throws IOException { @@ -2678,6 +2707,93 @@ return mQueue.merge(); } + private class InMemMergeQueue extends PriorityQueue { + private class ByteArrayStream { + byte[] b; + int length; + int currentKeyLength; + int currentValLength; + int current; + public ByteArrayStream(byte[] b) { + this.b = b; + this.length = b.length; + } + boolean next() { + //First store the offset to read from. Then reset the offset to where + //it should be for the next read + int readFrom = current + currentKeyLength + currentValLength; + if (current == length) { + return false; + } else { + current += 8 + currentKeyLength + currentValLength; + } + currentKeyLength = readInt(readFrom); + currentValLength = readInt(readFrom + 4); + return true; + } + int readInt(int offset) { //this could be readVInt + int ch1 = b[offset]; + int ch2 = b[offset+1]; + int ch3 = b[offset+2]; + int ch4 = b[offset+3]; + return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); + } + void cleanup() { + b = null; //help GC + ba.unreserveByteArraySpace(length); + } + } + private ByteArrayStream minSegment; + private ByteArrayManager ba; + protected boolean lessThan(Object a, Object b) { + ByteArrayStream ba = (ByteArrayStream)a; + ByteArrayStream bb = (ByteArrayStream)b; + return comparator.compare(ba.b, ba.current, ba.currentKeyLength, + bb.b, bb.current, bb.currentKeyLength) < 0; + } + public InMemMergeQueue(ByteArrayManager ba) { + this.ba = ba; + for (byte[] segment : ba.getCommittedArrays()) { + ByteArrayStream b = new ByteArrayStream(segment); + if (b.next()) { + put(b); + } + } + } + private void adjustPriorityQueue(ByteArrayStream bs) { + if (bs.next()) { + adjustTop(); + } else { + pop(); + bs.cleanup(); + } + } + public boolean next() { + if (size() == 0) { + return false; + } + if (minSegment != null) { + adjustPriorityQueue(minSegment); + if (size() == 0) { + return false; + } + } + minSegment = (ByteArrayStream)top(); + return true; + } + public byte[] getTop() { + return minSegment.b; + } + public int getOffsetInTop() { + return minSegment.current; + } + public int getKeyLength() { + return minSegment.currentKeyLength; + } + public int getValueLength() { + return minSegment.currentValLength; + } + } /** This class implements the core of the merge logic */ private class MergeQueue extends PriorityQueue implements RawKeyValueIterator { Index: src/java/org/apache/hadoop/io/ByteArrayManager.java =================================================================== --- src/java/org/apache/hadoop/io/ByteArrayManager.java (revision 0) +++ src/java/org/apache/hadoop/io/ByteArrayManager.java (revision 0) @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io; + +import java.util.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +/** An implementation of a byte array pool. The pool is capped by the total + * amount of memory configured for it to use. The flow is: + * getNewByteArray - get a new array and write to it + * commitByteArray - commit the array when the writes are over + * unreserveByteArraySpace - free up the space when the array is no longer reqd + * getCommittedArrays - returns the list of arrays that have been committed + */ +public class ByteArrayManager { + + private final static Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.ByteArrayManager"); + private long maxSize; + private volatile long totalUsed; + + private List committedArrays = new ArrayList(); + + public void initialize(Configuration conf) { + int size = Integer.parseInt(conf.get("fs.inmemory.size.mb", "100")); + this.maxSize = size * 1024L * 1024L; + } + + public byte[] getNewByteArray(int size) { + synchronized (this) { + if (!canFitInMemory(size)) + return null; + byte[] b; + try { + b = new byte[(int)size]; + } catch (OutOfMemoryError o) { + return null; + } + totalUsed += size; + return b; + } + } + + public void commitByteArray(byte b[]) { + synchronized (this) { + committedArrays.add(b); + } + } + + public void unreserveByteArraySpace(int size) { + synchronized (this) { + totalUsed -= size; + } + } + + public List getCommittedArrays() { + synchronized (this) { + List arrays = + new ArrayList(committedArrays.size()); + int idx = 0; + for (byte[] b : committedArrays) { + arrays.set(idx++, b); + } + committedArrays.clear(); + return arrays; + } + } + + /** + * @TODO: Fix for Java6? + * As of Java5 it is safe to assume that if the buffer can fit + * in-memory then its buffer-size is less than Integer.MAX_VALUE. + */ + private boolean canFitInMemory(long size) { + if ((size <= Integer.MAX_VALUE) && ((size + totalUsed) < maxSize)) { + return true; + } + return false; + } + + public long getMaxSize() { + return maxSize; + } + + public float getPercentUsed() { + if (maxSize > 0) { + return (float)totalUsed/maxSize; + } + return 0; + } +}